stages/user_login: fix session binding logging (#15175)

* add tests

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix logging

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* update test db?

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* ah there we go; fix mmdb not being reloaded with test settings

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

---------

Signed-off-by: Jens Langhammer <jens@goauthentik.io>
This commit is contained in:
Jens L.
2025-06-21 00:21:49 +02:00
committed by GitHub
parent 080e2311fe
commit f76becfd86
7 changed files with 80 additions and 14 deletions

View File

@ -86,6 +86,10 @@ dev-create-db:
dev-reset: dev-drop-db dev-create-db migrate ## Drop and restore the Authentik PostgreSQL instance to a "fresh install" state.
update-test-mmdb: ## Update test GeoIP and ASN Databases
curl -L https://raw.githubusercontent.com/maxmind/MaxMind-DB/refs/heads/main/test-data/GeoLite2-ASN-Test.mmdb -o ${PWD}/tests/GeoLite2-ASN-Test.mmdb
curl -L https://raw.githubusercontent.com/maxmind/MaxMind-DB/refs/heads/main/test-data/GeoLite2-City-Test.mmdb -o ${PWD}/tests/GeoLite2-City-Test.mmdb
#########################
## API Schema
#########################

View File

@ -15,13 +15,13 @@ class MMDBContextProcessor(EventContextProcessor):
self.reader: Reader | None = None
self._last_mtime: float = 0.0
self.logger = get_logger()
self.open()
self.load()
def path(self) -> str | None:
"""Get the path to the MMDB file to load"""
raise NotImplementedError
def open(self):
def load(self):
"""Get GeoIP Reader, if configured, otherwise none"""
path = self.path()
if path == "" or not path:
@ -44,7 +44,7 @@ class MMDBContextProcessor(EventContextProcessor):
diff = self._last_mtime < mtime
if diff > 0:
self.logger.info("Found new MMDB Database, reopening", diff=diff, path=path)
self.open()
self.load()
except OSError as exc:
self.logger.warning("Failed to check MMDB age", exc=exc)

View File

@ -11,6 +11,8 @@ from django.contrib.contenttypes.models import ContentType
from django.test.runner import DiscoverRunner
from structlog.stdlib import get_logger
from authentik.events.context_processors.asn import ASN_CONTEXT_PROCESSOR
from authentik.events.context_processors.geoip import GEOIP_CONTEXT_PROCESSOR
from authentik.lib.config import CONFIG
from authentik.lib.sentry import sentry_init
from authentik.root.signals import post_startup, pre_startup, startup
@ -76,6 +78,9 @@ class PytestTestRunner(DiscoverRunner): # pragma: no cover
for key, value in test_config.items():
CONFIG.set(key, value)
ASN_CONTEXT_PROCESSOR.load()
GEOIP_CONTEXT_PROCESSOR.load()
sentry_init()
self.logger.debug("Test environment configured")

View File

@ -101,9 +101,9 @@ class BoundSessionMiddleware(SessionMiddleware):
SESSION_KEY_BINDING_GEO, GeoIPBinding.NO_BINDING
)
if configured_binding_net != NetworkBinding.NO_BINDING:
self.recheck_session_net(configured_binding_net, last_ip, new_ip)
BoundSessionMiddleware.recheck_session_net(configured_binding_net, last_ip, new_ip)
if configured_binding_geo != GeoIPBinding.NO_BINDING:
self.recheck_session_geo(configured_binding_geo, last_ip, new_ip)
BoundSessionMiddleware.recheck_session_geo(configured_binding_geo, last_ip, new_ip)
# If we got to this point without any error being raised, we need to
# update the last saved IP to the current one
if SESSION_KEY_BINDING_NET in request.session or SESSION_KEY_BINDING_GEO in request.session:
@ -111,7 +111,8 @@ class BoundSessionMiddleware(SessionMiddleware):
# (== basically requires the user to be logged in)
request.session[request.session.model.Keys.LAST_IP] = new_ip
def recheck_session_net(self, binding: NetworkBinding, last_ip: str, new_ip: str):
@staticmethod
def recheck_session_net(binding: NetworkBinding, last_ip: str, new_ip: str):
"""Check network/ASN binding"""
last_asn = ASN_CONTEXT_PROCESSOR.asn(last_ip)
new_asn = ASN_CONTEXT_PROCESSOR.asn(new_ip)
@ -158,7 +159,8 @@ class BoundSessionMiddleware(SessionMiddleware):
new_ip,
)
def recheck_session_geo(self, binding: GeoIPBinding, last_ip: str, new_ip: str):
@staticmethod
def recheck_session_geo(binding: GeoIPBinding, last_ip: str, new_ip: str):
"""Check GeoIP binding"""
last_geo = GEOIP_CONTEXT_PROCESSOR.city(last_ip)
new_geo = GEOIP_CONTEXT_PROCESSOR.city(new_ip)
@ -179,8 +181,8 @@ class BoundSessionMiddleware(SessionMiddleware):
if last_geo.continent != new_geo.continent:
raise SessionBindingBroken(
"geoip.continent",
last_geo.continent,
new_geo.continent,
last_geo.continent.to_dict(),
new_geo.continent.to_dict(),
last_ip,
new_ip,
)
@ -192,8 +194,8 @@ class BoundSessionMiddleware(SessionMiddleware):
if last_geo.country != new_geo.country:
raise SessionBindingBroken(
"geoip.country",
last_geo.country,
new_geo.country,
last_geo.country.to_dict(),
new_geo.country.to_dict(),
last_ip,
new_ip,
)
@ -202,8 +204,8 @@ class BoundSessionMiddleware(SessionMiddleware):
if last_geo.city != new_geo.city:
raise SessionBindingBroken(
"geoip.city",
last_geo.city,
new_geo.city,
last_geo.city.to_dict(),
new_geo.city.to_dict(),
last_ip,
new_ip,
)

View File

@ -3,6 +3,7 @@
from time import sleep
from unittest.mock import patch
from django.http import HttpRequest
from django.urls import reverse
from django.utils.timezone import now
@ -17,7 +18,12 @@ from authentik.flows.views.executor import SESSION_KEY_PLAN
from authentik.lib.generators import generate_id
from authentik.lib.utils.time import timedelta_from_string
from authentik.root.middleware import ClientIPMiddleware
from authentik.stages.user_login.models import UserLoginStage
from authentik.stages.user_login.middleware import (
BoundSessionMiddleware,
SessionBindingBroken,
logout_extra,
)
from authentik.stages.user_login.models import GeoIPBinding, NetworkBinding, UserLoginStage
class TestUserLoginStage(FlowTestCase):
@ -192,3 +198,52 @@ class TestUserLoginStage(FlowTestCase):
self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
response = self.client.get(reverse("authentik_api:application-list"))
self.assertEqual(response.status_code, 403)
def test_binding_net_break_log(self):
"""Test logout_extra with exception"""
# IPs from https://github.com/maxmind/MaxMind-DB/blob/main/source-data/GeoLite2-ASN-Test.json
for args, expect in [
[[NetworkBinding.BIND_ASN, "8.8.8.8", "8.8.8.8"], ["network.missing"]],
[[NetworkBinding.BIND_ASN, "1.0.0.1", "1.128.0.1"], ["network.asn"]],
[
[NetworkBinding.BIND_ASN_NETWORK, "12.81.96.1", "12.81.128.1"],
["network.asn_network"],
],
[[NetworkBinding.BIND_ASN_NETWORK_IP, "1.0.0.1", "1.0.0.2"], ["network.ip"]],
]:
with self.subTest(args[0]):
with self.assertRaises(SessionBindingBroken) as cm:
BoundSessionMiddleware.recheck_session_net(*args)
self.assertEqual(cm.exception.reason, expect[0])
# Ensure the request can be logged without throwing errors
self.client.force_login(self.user)
request = HttpRequest()
request.session = self.client.session
request.user = self.user
logout_extra(request, cm.exception)
def test_binding_geo_break_log(self):
"""Test logout_extra with exception"""
# IPs from https://github.com/maxmind/MaxMind-DB/blob/main/source-data/GeoLite2-City-Test.json
for args, expect in [
[[GeoIPBinding.BIND_CONTINENT, "8.8.8.8", "8.8.8.8"], ["geoip.missing"]],
[[GeoIPBinding.BIND_CONTINENT, "2.125.160.216", "67.43.156.1"], ["geoip.continent"]],
[
[GeoIPBinding.BIND_CONTINENT_COUNTRY, "81.2.69.142", "89.160.20.112"],
["geoip.country"],
],
[
[GeoIPBinding.BIND_CONTINENT_COUNTRY_CITY, "2.125.160.216", "81.2.69.142"],
["geoip.city"],
],
]:
with self.subTest(args[0]):
with self.assertRaises(SessionBindingBroken) as cm:
BoundSessionMiddleware.recheck_session_geo(*args)
self.assertEqual(cm.exception.reason, expect[0])
# Ensure the request can be logged without throwing errors
self.client.force_login(self.user)
request = HttpRequest()
request.session = self.client.session
request.user = self.user
logout_extra(request, cm.exception)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 12 KiB

After

Width:  |  Height:  |  Size: 12 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 20 KiB

After

Width:  |  Height:  |  Size: 21 KiB