policy(minor): add data class for policy request
This commit is contained in:
		| @ -3,7 +3,6 @@ import re | ||||
| from datetime import timedelta | ||||
| from random import SystemRandom | ||||
| from time import sleep | ||||
| from typing import List | ||||
| from uuid import uuid4 | ||||
|  | ||||
| from django.contrib.auth.models import AbstractUser | ||||
| @ -18,6 +17,7 @@ from structlog import get_logger | ||||
| from passbook.core.signals import password_changed | ||||
| from passbook.lib.models import CreatedUpdatedModel, UUIDModel | ||||
| from passbook.policy.exceptions import PolicyException | ||||
| from passbook.policy.struct import PolicyRequest, PolicyResult | ||||
|  | ||||
| LOGGER = get_logger(__name__) | ||||
|  | ||||
| @ -26,20 +26,6 @@ def default_nonce_duration(): | ||||
|     """Default duration a Nonce is valid""" | ||||
|     return now() + timedelta(hours=4) | ||||
|  | ||||
|  | ||||
| class PolicyResult: | ||||
|     """Small data-class to hold policy results""" | ||||
|  | ||||
|     passing: bool = False | ||||
|     messages: List[str] = [] | ||||
|  | ||||
|     def __init__(self, passing: bool, *messages: str): | ||||
|         self.passing = passing | ||||
|         self.messages = messages | ||||
|  | ||||
|     def __str__(self): | ||||
|         return f"<PolicyResult passing={self.passing}>" | ||||
|  | ||||
| class Group(UUIDModel): | ||||
|     """Custom Group model which supports a basic hierarchy""" | ||||
|  | ||||
| @ -244,7 +230,7 @@ class Policy(UUIDModel, CreatedUpdatedModel): | ||||
|             return self.name | ||||
|         return "%s action %s" % (self.name, self.action) | ||||
|  | ||||
|     def passes(self, user: User) -> PolicyResult: | ||||
|     def passes(self, request: PolicyRequest) -> PolicyResult: | ||||
|         """Check if user instance passes this policy""" | ||||
|         raise PolicyException() | ||||
|  | ||||
| @ -288,11 +274,11 @@ class FieldMatcherPolicy(Policy): | ||||
|             description = "%s: %s" % (self.name, description) | ||||
|         return description | ||||
|  | ||||
|     def passes(self, user: User) -> PolicyResult: | ||||
|     def passes(self, request: PolicyRequest) -> PolicyResult: | ||||
|         """Check if user instance passes this role""" | ||||
|         if not hasattr(user, self.user_field): | ||||
|         if not hasattr(request.user, self.user_field): | ||||
|             raise ValueError("Field does not exist") | ||||
|         user_field_value = getattr(user, self.user_field, None) | ||||
|         user_field_value = getattr(request.user, self.user_field, None) | ||||
|         LOGGER.debug("Checked '%s' %s with '%s'...", | ||||
|                      user_field_value, self.match_action, self.value) | ||||
|         passes = False | ||||
| @ -328,11 +314,11 @@ class PasswordPolicy(Policy): | ||||
|  | ||||
|     form = 'passbook.core.forms.policies.PasswordPolicyForm' | ||||
|  | ||||
|     def passes(self, user: User) -> PolicyResult: | ||||
|     def passes(self, request: PolicyRequest) -> PolicyResult: | ||||
|         # Only check if password is being set | ||||
|         if not hasattr(user, '__password__'): | ||||
|         if not hasattr(request.user, '__password__'): | ||||
|             return PolicyResult(True) | ||||
|         password = getattr(user, '__password__') | ||||
|         password = getattr(request.user, '__password__') | ||||
|  | ||||
|         filter_regex = r'' | ||||
|         if self.amount_lowercase > 0: | ||||
| @ -379,7 +365,7 @@ class WebhookPolicy(Policy): | ||||
|  | ||||
|     form = 'passbook.core.forms.policies.WebhookPolicyForm' | ||||
|  | ||||
|     def passes(self, user: User) -> PolicyResult: | ||||
|     def passes(self, request: PolicyRequest) -> PolicyResult: | ||||
|         """Call webhook asynchronously and report back""" | ||||
|         raise NotImplementedError() | ||||
|  | ||||
| @ -398,7 +384,7 @@ class DebugPolicy(Policy): | ||||
|  | ||||
|     form = 'passbook.core.forms.policies.DebugPolicyForm' | ||||
|  | ||||
|     def passes(self, user: User) -> PolicyResult: | ||||
|     def passes(self, request: PolicyRequest) -> PolicyResult: | ||||
|         """Wait random time then return result""" | ||||
|         wait = SystemRandom().randrange(self.wait_min, self.wait_max) | ||||
|         LOGGER.debug("Policy '%s' waiting for %ds", self.name, wait) | ||||
| @ -417,8 +403,8 @@ class GroupMembershipPolicy(Policy): | ||||
|  | ||||
|     form = 'passbook.core.forms.policies.GroupMembershipPolicyForm' | ||||
|  | ||||
|     def passes(self, user: User) -> PolicyResult: | ||||
|         return PolicyResult(self.group.user_set.filter(pk=user.pk).exists()) | ||||
|     def passes(self, request: PolicyRequest) -> PolicyResult: | ||||
|         return PolicyResult(self.group.user_set.filter(pk=request.user.pk).exists()) | ||||
|  | ||||
|     class Meta: | ||||
|  | ||||
| @ -430,10 +416,11 @@ class SSOLoginPolicy(Policy): | ||||
|  | ||||
|     form = 'passbook.core.forms.policies.SSOLoginPolicyForm' | ||||
|  | ||||
|     def passes(self, user) -> PolicyResult: | ||||
|     def passes(self, request: PolicyRequest) -> PolicyResult: | ||||
|         """Check if user instance passes this policy""" | ||||
|         from passbook.core.auth.view import AuthenticationView | ||||
|         return PolicyResult(user.session.get(AuthenticationView.SESSION_IS_SSO_LOGIN, False)) | ||||
|         is_sso_login = request.user.session.get(AuthenticationView.SESSION_IS_SSO_LOGIN, False) | ||||
|         return PolicyResult(is_sso_login) | ||||
|  | ||||
|     class Meta: | ||||
|  | ||||
|  | ||||
		Reference in New Issue
	
	Block a user
	 Jens Langhammer
					Jens Langhammer