Replace Elastic APM with Sentry APM (#183)
This commit is contained in:
@ -1,13 +1,14 @@
|
||||
"""Flows Planner"""
|
||||
from dataclasses import dataclass, field
|
||||
from time import time
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from django.core.cache import cache
|
||||
from django.http import HttpRequest
|
||||
from elasticapm import capture_span
|
||||
from sentry_sdk import start_span
|
||||
from sentry_sdk.tracing import Span
|
||||
from structlog import get_logger
|
||||
|
||||
from passbook.audit.models import cleanse_dict
|
||||
from passbook.core.models import User
|
||||
from passbook.flows.exceptions import EmptyFlowException, FlowNonApplicableException
|
||||
from passbook.flows.markers import ReevaluateMarker, StageMarker
|
||||
@ -90,47 +91,53 @@ class FlowPlanner:
|
||||
self.allow_empty_flows = False
|
||||
self.flow = flow
|
||||
|
||||
@capture_span(name="FlowPlanner", span_type="flow.planner.plan")
|
||||
def plan(
|
||||
self, request: HttpRequest, default_context: Optional[Dict[str, Any]] = None
|
||||
) -> FlowPlan:
|
||||
"""Check each of the flows' policies, check policies for each stage with PolicyBinding
|
||||
and return ordered list"""
|
||||
LOGGER.debug("f(plan): Starting planning process", flow=self.flow)
|
||||
# Bit of a workaround here, if there is a pending user set in the default context
|
||||
# we use that user for our cache key
|
||||
# to make sure they don't get the generic response
|
||||
if default_context and PLAN_CONTEXT_PENDING_USER in default_context:
|
||||
user = default_context[PLAN_CONTEXT_PENDING_USER]
|
||||
else:
|
||||
user = request.user
|
||||
# First off, check the flow's direct policy bindings
|
||||
# to make sure the user even has access to the flow
|
||||
engine = PolicyEngine(self.flow, user, request)
|
||||
if default_context:
|
||||
engine.request.context = default_context
|
||||
engine.build()
|
||||
result = engine.result
|
||||
if not result.passing:
|
||||
raise FlowNonApplicableException(result.messages)
|
||||
# User is passing so far, check if we have a cached plan
|
||||
cached_plan_key = cache_key(self.flow, user)
|
||||
cached_plan = cache.get(cached_plan_key, None)
|
||||
if cached_plan and self.use_cache:
|
||||
LOGGER.debug(
|
||||
"f(plan): Taking plan from cache", flow=self.flow, key=cached_plan_key
|
||||
)
|
||||
# Reset the context as this isn't factored into caching
|
||||
cached_plan.context = default_context or {}
|
||||
return cached_plan
|
||||
LOGGER.debug("f(plan): building plan", flow=self.flow)
|
||||
plan = self._build_plan(user, request, default_context)
|
||||
cache.set(cache_key(self.flow, user), plan)
|
||||
if not plan.stages and not self.allow_empty_flows:
|
||||
raise EmptyFlowException()
|
||||
return plan
|
||||
with start_span(op="flow.planner.plan") as span:
|
||||
span: Span
|
||||
span.set_data("flow", self.flow)
|
||||
span.set_data("request", request)
|
||||
|
||||
LOGGER.debug("f(plan): Starting planning process", flow=self.flow)
|
||||
# Bit of a workaround here, if there is a pending user set in the default context
|
||||
# we use that user for our cache key
|
||||
# to make sure they don't get the generic response
|
||||
if default_context and PLAN_CONTEXT_PENDING_USER in default_context:
|
||||
user = default_context[PLAN_CONTEXT_PENDING_USER]
|
||||
else:
|
||||
user = request.user
|
||||
# First off, check the flow's direct policy bindings
|
||||
# to make sure the user even has access to the flow
|
||||
engine = PolicyEngine(self.flow, user, request)
|
||||
if default_context:
|
||||
span.set_data("default_context", cleanse_dict(default_context))
|
||||
engine.request.context = default_context
|
||||
engine.build()
|
||||
result = engine.result
|
||||
if not result.passing:
|
||||
raise FlowNonApplicableException(result.messages)
|
||||
# User is passing so far, check if we have a cached plan
|
||||
cached_plan_key = cache_key(self.flow, user)
|
||||
cached_plan = cache.get(cached_plan_key, None)
|
||||
if cached_plan and self.use_cache:
|
||||
LOGGER.debug(
|
||||
"f(plan): Taking plan from cache",
|
||||
flow=self.flow,
|
||||
key=cached_plan_key,
|
||||
)
|
||||
# Reset the context as this isn't factored into caching
|
||||
cached_plan.context = default_context or {}
|
||||
return cached_plan
|
||||
LOGGER.debug("f(plan): building plan", flow=self.flow)
|
||||
plan = self._build_plan(user, request, default_context)
|
||||
cache.set(cache_key(self.flow, user), plan)
|
||||
if not plan.stages and not self.allow_empty_flows:
|
||||
raise EmptyFlowException()
|
||||
return plan
|
||||
|
||||
@capture_span(name="FlowPlanner", span_type="flow.planner.build_plan")
|
||||
def _build_plan(
|
||||
self,
|
||||
user: User,
|
||||
@ -139,38 +146,40 @@ class FlowPlanner:
|
||||
) -> FlowPlan:
|
||||
"""Build flow plan by checking each stage in their respective
|
||||
order and checking the applied policies"""
|
||||
start_time = time()
|
||||
plan = FlowPlan(flow_pk=self.flow.pk.hex)
|
||||
if default_context:
|
||||
plan.context = default_context
|
||||
# Check Flow policies
|
||||
for stage in (
|
||||
self.flow.stages.order_by("flowstagebinding__order")
|
||||
.select_subclasses()
|
||||
.select_related()
|
||||
):
|
||||
binding: FlowStageBinding = stage.flowstagebinding_set.get(
|
||||
target__pk=self.flow.pk
|
||||
)
|
||||
engine = PolicyEngine(binding, user, request)
|
||||
engine.request.context = plan.context
|
||||
engine.build()
|
||||
if engine.passing:
|
||||
LOGGER.debug("f(plan): Stage passing", stage=stage, flow=self.flow)
|
||||
plan.stages.append(stage)
|
||||
marker = StageMarker()
|
||||
if binding.re_evaluate_policies:
|
||||
LOGGER.debug(
|
||||
"f(plan): Stage has re-evaluate marker",
|
||||
stage=stage,
|
||||
flow=self.flow,
|
||||
)
|
||||
marker = ReevaluateMarker(binding=binding, user=user)
|
||||
plan.markers.append(marker)
|
||||
end_time = time()
|
||||
with start_span(op="flow.planner.build_plan") as span:
|
||||
span: Span
|
||||
span.set_data("flow", self.flow)
|
||||
span.set_data("user", user)
|
||||
span.set_data("request", request)
|
||||
|
||||
plan = FlowPlan(flow_pk=self.flow.pk.hex)
|
||||
if default_context:
|
||||
plan.context = default_context
|
||||
# Check Flow policies
|
||||
for stage in (
|
||||
self.flow.stages.order_by("flowstagebinding__order")
|
||||
.select_subclasses()
|
||||
.select_related()
|
||||
):
|
||||
binding: FlowStageBinding = stage.flowstagebinding_set.get(
|
||||
target__pk=self.flow.pk
|
||||
)
|
||||
engine = PolicyEngine(binding, user, request)
|
||||
engine.request.context = plan.context
|
||||
engine.build()
|
||||
if engine.passing:
|
||||
LOGGER.debug("f(plan): Stage passing", stage=stage, flow=self.flow)
|
||||
plan.stages.append(stage)
|
||||
marker = StageMarker()
|
||||
if binding.re_evaluate_policies:
|
||||
LOGGER.debug(
|
||||
"f(plan): Stage has re-evaluate marker",
|
||||
stage=stage,
|
||||
flow=self.flow,
|
||||
)
|
||||
marker = ReevaluateMarker(binding=binding, user=user)
|
||||
plan.markers.append(marker)
|
||||
LOGGER.debug(
|
||||
"f(plan): Finished building",
|
||||
flow=self.flow,
|
||||
duration_s=end_time - start_time,
|
||||
"f(plan): Finished building", flow=self.flow, duration_s=span.timestamp,
|
||||
)
|
||||
return plan
|
||||
|
||||
Reference in New Issue
Block a user