Files
authentik/authentik/core/middleware.py
dependabot[bot] bd8794f646 core: bump structlog from 21.5.0 to 22.1.0 (#3294)
* core: bump structlog from 21.5.0 to 22.1.0

Bumps [structlog](https://github.com/hynek/structlog) from 21.5.0 to 22.1.0.
- [Release notes](https://github.com/hynek/structlog/releases)
- [Changelog](https://github.com/hynek/structlog/blob/main/CHANGELOG.md)
- [Commits](https://github.com/hynek/structlog/compare/21.5.0...22.1.0)

---
updated-dependencies:
- dependency-name: structlog
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>

* migrate threaedlocal to contextvars

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jens Langhammer <jens.langhammer@beryju.org>
2022-07-23 22:40:56 +02:00

70 lines
2.6 KiB
Python

"""authentik admin Middleware to impersonate users"""
from contextvars import ContextVar
from typing import Callable
from uuid import uuid4
from django.http import HttpRequest, HttpResponse
from sentry_sdk.api import set_tag
from structlog.contextvars import STRUCTLOG_KEY_PREFIX
SESSION_KEY_IMPERSONATE_USER = "authentik/impersonate/user"
SESSION_KEY_IMPERSONATE_ORIGINAL_USER = "authentik/impersonate/original_user"
RESPONSE_HEADER_ID = "X-authentik-id"
KEY_AUTH_VIA = "auth_via"
KEY_USER = "user"
CTX_REQUEST_ID = ContextVar(STRUCTLOG_KEY_PREFIX + "request_id", default=None)
CTX_HOST = ContextVar(STRUCTLOG_KEY_PREFIX + "host", default=None)
CTX_AUTH_VIA = ContextVar(STRUCTLOG_KEY_PREFIX + KEY_AUTH_VIA, default=None)
class ImpersonateMiddleware:
"""Middleware to impersonate users"""
get_response: Callable[[HttpRequest], HttpResponse]
def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]):
self.get_response = get_response
def __call__(self, request: HttpRequest) -> HttpResponse:
# No permission checks are done here, they need to be checked before
# SESSION_KEY_IMPERSONATE_USER is set.
if SESSION_KEY_IMPERSONATE_USER in request.session:
request.user = request.session[SESSION_KEY_IMPERSONATE_USER]
# Ensure that the user is active, otherwise nothing will work
request.user.is_active = True
return self.get_response(request)
class RequestIDMiddleware:
"""Add a unique ID to every request"""
get_response: Callable[[HttpRequest], HttpResponse]
def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]):
self.get_response = get_response
def __call__(self, request: HttpRequest) -> HttpResponse:
if not hasattr(request, "request_id"):
request_id = uuid4().hex
setattr(request, "request_id", request_id)
CTX_REQUEST_ID.set(request_id)
CTX_HOST.set(request.get_host())
set_tag("authentik.request_id", request_id)
if hasattr(request, "user") and getattr(request.user, "is_authenticated", False):
CTX_AUTH_VIA.set("session")
else:
CTX_AUTH_VIA.set("unauthenticated")
response = self.get_response(request)
response[RESPONSE_HEADER_ID] = request.request_id
setattr(response, "ak_context", {})
response.ak_context["request_id"] = CTX_REQUEST_ID.get()
response.ak_context["host"] = CTX_HOST.get()
response.ak_context[KEY_AUTH_VIA] = CTX_AUTH_VIA.get()
response.ak_context[KEY_USER] = request.user.username
return response