policies/expression: migrate to raw python instead of jinja2 (#49)

* policies/expression: migrate to raw python instead of jinja2

* lib/expression: create base evaluator, custom subclass for policies

* core: rewrite propertymappings to use python

* providers/saml: update to new PropertyMappings

* sources/ldap: update to new PropertyMappings

* docs: update docs for new propertymappings

* root: remove jinja2

* root: re-add jinja to lock file as its implicitly required
This commit is contained in:
Jens L
2020-06-05 12:00:27 +02:00
committed by GitHub
parent 147212a5f9
commit 73116b9d1a
28 changed files with 322 additions and 190 deletions

View File

@ -1,81 +1,30 @@
"""passbook expression policy evaluator"""
import re
from typing import Any, Dict, List, Optional
from typing import List
from django.core.exceptions import ValidationError
from django.http import HttpRequest
from jinja2 import Undefined
from jinja2.exceptions import TemplateSyntaxError
from jinja2.nativetypes import NativeEnvironment
from requests import Session
from structlog import get_logger
from passbook.core.models import User
from passbook.flows.planner import PLAN_CONTEXT_SSO
from passbook.flows.views import SESSION_KEY_PLAN
from passbook.lib.expression.evaluator import BaseEvaluator
from passbook.lib.utils.http import get_client_ip
from passbook.policies.types import PolicyRequest, PolicyResult
LOGGER = get_logger()
class Evaluator:
"""Validate and evaluate jinja2-based expressions"""
class PolicyEvaluator(BaseEvaluator):
"""Validate and evaluate python-based expressions"""
_env: NativeEnvironment
_context: Dict[str, Any]
_messages: List[str]
def __init__(self):
self._env = NativeEnvironment(
extensions=["jinja2.ext.do"],
trim_blocks=True,
lstrip_blocks=True,
line_statement_prefix=">",
)
# update passbook/policies/expression/templates/policy/expression/form.html
# update docs/policies/expression/index.md
self._env.filters["regex_match"] = Evaluator.jinja2_filter_regex_match
self._env.filters["regex_replace"] = Evaluator.jinja2_filter_regex_replace
self._env.globals["pb_message"] = self.jinja2_func_message
self._context = {
"pb_is_group_member": Evaluator.jinja2_func_is_group_member,
"pb_user_by": Evaluator.jinja2_func_user_by,
"pb_logger": get_logger(),
"requests": Session(),
}
def __init__(self, policy_name: str):
super().__init__()
self._messages = []
self._context["pb_message"] = self.expr_func_message
self._filename = policy_name
@property
def env(self) -> NativeEnvironment:
"""Access to our custom NativeEnvironment"""
return self._env
@staticmethod
def jinja2_filter_regex_match(value: Any, regex: str) -> bool:
"""Jinja2 Filter to run re.search"""
return re.search(regex, value) is None
@staticmethod
def jinja2_filter_regex_replace(value: Any, regex: str, repl: str) -> str:
"""Jinja2 Filter to run re.sub"""
return re.sub(regex, repl, value)
@staticmethod
def jinja2_func_user_by(**filters) -> Optional[User]:
"""Get user by filters"""
users = User.objects.filter(**filters)
if users:
return users.first()
return None
@staticmethod
def jinja2_func_is_group_member(user: User, group_name: str) -> bool:
"""Check if `user` is member of group with name `group_name`"""
return user.groups.filter(name=group_name).exists()
def jinja2_func_message(self, message: str):
def expr_func_message(self, message: str):
"""Wrapper to append to messages list, which is returned with PolicyResult"""
self._messages.append(message)
@ -84,41 +33,35 @@ class Evaluator:
# update passbook/policies/expression/templates/policy/expression/form.html
# update docs/policies/expression/index.md
self._context["pb_is_sso_flow"] = request.context.get(PLAN_CONTEXT_SSO, False)
self._context["request"] = request
if request.http_request:
self.set_http_request(request.http_request)
self._context["request"] = request
def set_http_request(self, request: HttpRequest):
"""Update context based on http request"""
# update passbook/policies/expression/templates/policy/expression/form.html
# update docs/policies/expression/index.md
self._context["pb_client_ip"] = (
get_client_ip(request.http_request) or "255.255.255.255"
)
self._context["pb_client_ip"] = get_client_ip(request) or "255.255.255.255"
self._context["request"] = request
if SESSION_KEY_PLAN in request.http_request.session:
self._context["pb_flow_plan"] = request.http_request.session[
SESSION_KEY_PLAN
]
if SESSION_KEY_PLAN in request.session:
self._context["pb_flow_plan"] = request.session[SESSION_KEY_PLAN]
def evaluate(self, expression_source: str) -> PolicyResult:
"""Parse and evaluate expression. Policy is expected to return a truthy object.
Messages can be added using 'do pb_message()'."""
try:
expression = self._env.from_string(expression_source.lstrip().rstrip())
except TemplateSyntaxError as exc:
result = super().evaluate(expression_source)
except (ValueError, SyntaxError) as exc:
return PolicyResult(False, str(exc))
try:
result: Optional[Any] = expression.render(self._context)
except Exception as exc: # pylint: disable=broad-except
LOGGER.warning("Expression error", exc=exc)
return PolicyResult(False, str(exc))
else:
policy_result = PolicyResult(False)
policy_result.messages = tuple(self._messages)
if isinstance(result, Undefined):
if result is None:
LOGGER.warning(
"Expression policy returned undefined",
"Expression policy returned None",
src=expression_source,
req=self._context,
)
@ -126,11 +69,3 @@ class Evaluator:
if result:
policy_result.passing = bool(result)
return policy_result
def validate(self, expression: str):
"""Validate expression's syntax, raise ValidationError if Syntax is invalid"""
try:
self._env.from_string(expression)
return True
except TemplateSyntaxError as exc:
raise ValidationError(f"Expression Syntax Error: {str(exc)}") from exc