 4915205678
			
		
	
	4915205678
	
	
	
		
			
			* core: start migrating to flows for authorisation * sources/oauth: start type-hinting * core: create default user * core: only show user delete button if an unenrollment flow exists * flows: Correctly check initial policies on flow with context * policies: add more verbosity to engine * sources/oauth: migrate to flows * sources/oauth: fix typing errors * flows: add more tests * sources/oauth: start implementing unittests * sources/ldap: add option to disable user sync, move connection init to model * sources/ldap: re-add default PropertyMappings * providers/saml: re-add default PropertyMappings * admin: fix missing stage count * stages/identification: fix sources not being shown * crypto: fix being unable to save with private key * crypto: re-add default self-signed keypair * policies: rewrite cache_key to prevent wrong cache * sources/saml: migrate to flows for auth and enrollment * stages/consent: add new stage * admin: fix PropertyMapping widget not rendering properly * core: provider.authorization_flow is mandatory * flows: add support for "autosubmit" attribute on form * flows: add InMemoryStage for dynamic stages * flows: optionally allow empty flows from FlowPlanner * providers/saml: update to authorization_flow * sources/*: fix flow executor URL * flows: fix pylint error * flows: wrap responses in JSON object to easily handle redirects * flow: dont cache plan's context * providers/oauth: rewrite OAuth2 Provider to use flows * providers/*: update docstrings of models * core: fix forms not passing help_text through safe * flows: fix HttpResponses not being converted to JSON * providers/oidc: rewrite to use flows * flows: fix linting
		
			
				
	
	
		
			143 lines
		
	
	
		
			4.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			143 lines
		
	
	
		
			4.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """Flow models"""
 | |
| from typing import Callable, Optional
 | |
| from uuid import uuid4
 | |
| 
 | |
| from django.db import models
 | |
| from django.http import HttpRequest
 | |
| from django.utils.translation import gettext_lazy as _
 | |
| from model_utils.managers import InheritanceManager
 | |
| from structlog import get_logger
 | |
| 
 | |
| from passbook.core.types import UIUserSettings
 | |
| from passbook.lib.utils.reflection import class_to_path
 | |
| from passbook.policies.models import PolicyBindingModel
 | |
| 
 | |
| LOGGER = get_logger()
 | |
| 
 | |
| 
 | |
class FlowDesignation(models.TextChoices):
    """Enumerates the purposes a Flow can be designated for.

    The original author notes that, at a later point, this should be
    replaced by a database entry."""

    AUTHENTICATION = "authentication"
    AUTHORIZATION = "authorization"
    INVALIDATION = "invalidation"
    ENROLLMENT = "enrollment"
    # NOTE(review): the member *name* is misspelled while its value is spelled
    # correctly. The name is kept as-is because renaming it would change the
    # public attribute other modules may reference.
    UNRENOLLMENT = "unenrollment"
    RECOVERY = "recovery"
    PASSWORD_CHANGE = "password_change"  # nosec # noqa
 | |
| 
 | |
| 
 | |
class Stage(models.Model):
    """A Stage is an instance of a component used in a flow. It can verify
    the user, enroll the user or offer a way of recovery."""

    stage_uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)

    name = models.TextField()

    objects = InheritanceManager()
    # Plain class attributes (not model fields); both are empty strings on
    # this base class.
    type = ""
    form = ""

    def __str__(self) -> str:
        return f"Stage {self.name}"

    @property
    def ui_user_settings(self) -> Optional[UIUserSettings]:
        """Entrypoint to integrate with User settings.

        Returns either None if no user settings are available, or an
        instance of UIUserSettings. This base implementation always
        returns None."""
        return None
 | |
| 
 | |
| 
 | |
def in_memory_stage(_type: Callable) -> Stage:
    """Build a Stage instance that only lives in memory, using `_type` as view.

    The returned stage has its `type` attribute set to the result of
    `class_to_path(_type)`. It is not saved by this function."""
    view_path = class_to_path(_type)
    dynamic_stage = Stage()
    dynamic_stage.type = view_path
    return dynamic_stage
 | |
| 
 | |
| 
 | |
class Flow(PolicyBindingModel):
    """Flow describes how a series of Stages should be executed to
    authenticate/enroll/recover a user. Additionally, policies can be applied,
    to specify which users have access to this flow."""

    flow_uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)

    name = models.TextField()
    slug = models.SlugField(unique=True)

    designation = models.CharField(max_length=100, choices=FlowDesignation.choices)

    stages = models.ManyToManyField(Stage, through="FlowStageBinding", blank=True)

    # Explicit parent link to the PolicyBindingModel base; related_name="+"
    # disables the reverse accessor on the parent.
    pbm = models.OneToOneField(
        PolicyBindingModel, parent_link=True, on_delete=models.CASCADE, related_name="+"
    )

    @staticmethod
    def with_policy(request: HttpRequest, **flow_filter) -> Optional["Flow"]:
        """Get a Flow by `**flow_filter` and check if `request` can access it.

        Every flow matching `flow_filter` is evaluated with a PolicyEngine for
        `request.user`; the first flow whose result is passing is returned.
        Candidates are visited in ascending `slug` order, so the choice is
        deterministic when several flows match. Returns None if no matching
        flow passes."""
        # Kept as a function-local import, as in the original; presumably this
        # avoids a circular import with the policy engine - TODO confirm.
        from passbook.policies.engine import PolicyEngine

        # Without an explicit ordering the database may return rows in any
        # order, which made the selected flow unpredictable. `slug` is unique,
        # so it yields a total, stable order.
        candidates = Flow.objects.filter(**flow_filter).order_by("slug")
        for flow in candidates:
            engine = PolicyEngine(flow, request.user, request)
            engine.build()
            result = engine.result
            if result.passing:
                LOGGER.debug("with_policy: flow passing", flow=flow)
                return flow
            LOGGER.warning(
                "with_policy: flow not passing", flow=flow, messages=result.messages
            )
        LOGGER.debug("with_policy: no flow found", filters=flow_filter)
        return None

    def related_flow(self, designation: str, request: HttpRequest) -> Optional["Flow"]:
        """Get a related flow with `designation`. Currently this only queries
        Flows by `designation` (via `with_policy`), but will eventually use
        `self` for related lookups."""
        return Flow.with_policy(request, designation=designation)

    def __str__(self) -> str:
        return f"Flow {self.name} ({self.slug})"

    class Meta:

        verbose_name = _("Flow")
        verbose_name_plural = _("Flows")
 | |
| 
 | |
| 
 | |
class FlowStageBinding(PolicyBindingModel):
    """Relationship between a Flow and a Stage.

    `order` is required, and the combination of flow, stage and order must be
    unique. Additionally, policies can be specified, which determine if this
    Binding applies to the current user."""

    fsb_uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)

    flow = models.ForeignKey("Flow", on_delete=models.CASCADE)
    stage = models.ForeignKey(Stage, on_delete=models.CASCADE)

    re_evaluate_policies = models.BooleanField(
        default=False,
        help_text=_(
            "When this option is enabled, the planner will re-evaluate policies bound to this."
        ),
    )

    order = models.IntegerField()

    objects = InheritanceManager()

    class Meta:

        # Default ordering: by `order` first, then by the related flow.
        ordering = ["order", "flow"]

        verbose_name = _("Flow Stage Binding")
        verbose_name_plural = _("Flow Stage Bindings")
        unique_together = (("flow", "stage", "order"),)

    def __str__(self) -> str:
        return f"Flow Stage Binding #{self.order} {self.flow} -> {self.stage}"
 |