policies: add and/or mode (#463)

* policies: add mode to PolicyEngine for AND and OR modes

* events: use PolicyEngine in OR mode
This commit is contained in:
Jens L
2021-01-12 18:22:25 +01:00
committed by GitHub
parent b2b737e59e
commit c727c845df
4 changed files with 69 additions and 21 deletions

View File

@ -1,4 +1,5 @@
"""authentik policy engine"""
from enum import Enum
from multiprocessing import Pipe, set_start_method
from multiprocessing.connection import Connection
from typing import Iterator, List, Optional
@ -37,12 +38,23 @@ class PolicyProcessInfo:
self.result = None
class PolicyEngineMode(Enum):
"""Decide how results of multiple policies should be combined."""
MODE_AND = "and"
MODE_OR = "or"
class PolicyEngine:
"""Orchestrate policy checking, launch tasks and return result"""
use_cache: bool
request: PolicyRequest
mode: PolicyEngineMode
# Allow objects with no policies attached to pass
empty_result: bool
__pbm: PolicyBindingModel
__cached_policies: List[PolicyResult]
__processes: List[PolicyProcessInfo]
@ -52,6 +64,10 @@ class PolicyEngine:
def __init__(
self, pbm: PolicyBindingModel, user: User, request: HttpRequest = None
):
self.mode = PolicyEngineMode.MODE_AND
# For backwards compatibility, set empty_result to true
# objects with no policies attached will pass.
self.empty_result = True
if not isinstance(pbm, PolicyBindingModel): # pragma: no cover
raise ValueError(f"{pbm} is not instance of PolicyBindingModel")
self.__pbm = pbm
@ -119,24 +135,19 @@ class PolicyEngine:
x.result for x in self.__processes if x.result
]
all_results = list(process_results + self.__cached_policies)
final_result = PolicyResult(False)
final_result.messages = []
final_result.source_results = all_results
if len(all_results) < self.__expected_result_count: # pragma: no cover
raise AssertionError("Got less results than polices")
for result in all_results:
LOGGER.debug(
"P_ENG: result", passing=result.passing, messages=result.messages
)
if result.messages:
final_result.messages.extend(result.messages)
if not result.passing:
final_result.messages = tuple(final_result.messages)
final_result.passing = False
return final_result
final_result.messages = tuple(final_result.messages)
final_result.passing = True
return final_result
# No results, no policies attached -> passing
if len(all_results) == 0:
return PolicyResult(self.empty_result)
passing = False
if self.mode == PolicyEngineMode.MODE_AND:
passing = all([x.passing for x in all_results])
if self.mode == PolicyEngineMode.MODE_OR:
passing = any([x.passing for x in all_results])
result = PolicyResult(passing)
result.messages = tuple([y for x in all_results for y in x.messages])
return result
@property
def passing(self) -> bool: