core: add geo_ip to authenticated sessions if enabled

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
Jens Langhammer
2021-05-30 00:50:53 +02:00
parent 059da74d1c
commit 73338bdf32
5 changed files with 250 additions and 30 deletions

View File

@ -1,30 +1,103 @@
"""AuthenticatedSessions API Viewset"""
from typing import Optional, TypedDict
from geoip2.errors import GeoIP2Error
from guardian.utils import get_anonymous_user
from rest_framework import mixins
from rest_framework.fields import SerializerMethodField
from rest_framework.request import Request
from rest_framework.serializers import ModelSerializer
from rest_framework.viewsets import GenericViewSet
from ua_parser import user_agent_parser
from authentik.core.models import AuthenticatedSession
from authentik.events.geo import GEOIP_READER
class UserAgentDeviceDict(TypedDict):
"""User agent device"""
brand: str
family: str
model: str
class UserAgentOSDict(TypedDict):
"""User agent os"""
family: str
major: str
minor: str
patch: str
patch_minor: str
class UserAgentBrowserDict(TypedDict):
"""User agent browser"""
family: str
major: str
minor: str
patch: str
class UserAgentDict(TypedDict):
"""User agent details"""
device: UserAgentDeviceDict
os: UserAgentOSDict
user_agent: UserAgentBrowserDict
string: str
class GeoIPDict(TypedDict):
"""GeoIP Details"""
continent: str
country: str
lat: float
long: float
class AuthenticatedSessionSerializer(ModelSerializer):
"""AuthenticatedSession Serializer"""
current = SerializerMethodField()
user_agent = SerializerMethodField()
geo_ip = SerializerMethodField()
def get_current(self, instance: AuthenticatedSession) -> bool:
"""Check if session is currently active session"""
request: Request = self.context["request"]
return request._request.session.session_key == instance.session_key
def get_user_agent(self, instance: AuthenticatedSession) -> UserAgentDict:
"""Get parsed user agent"""
return user_agent_parser.Parse(instance.last_user_agent)
def get_geo_ip(self, instance: AuthenticatedSession) -> Optional[GeoIPDict]:
"""Get parsed user agent"""
if not GEOIP_READER:
return None
try:
city = GEOIP_READER.city(instance.last_ip)
return {
"continent": city.continent.code,
"country": city.country.iso_code,
"lat": city.location.latitude,
"long": city.location.longitude,
}
except GeoIP2Error:
return None
class Meta:
model = AuthenticatedSession
fields = [
"uuid",
"current",
"user_agent",
"geo_ip",
"user",
"last_ip",
"last_user_agent",