Files
authentik/authentik/policies/reputation/signals.py
dependabot[bot] bc9e7e8b93 build(deps): bump structlog from 20.1.0 to 20.2.0 (#445)
* build(deps): bump structlog from 20.1.0 to 20.2.0

Bumps [structlog](https://github.com/hynek/structlog) from 20.1.0 to 20.2.0.
- [Release notes](https://github.com/hynek/structlog/releases)
- [Changelog](https://github.com/hynek/structlog/blob/master/CHANGELOG.rst)
- [Commits](https://github.com/hynek/structlog/compare/20.1.0...20.2.0)

Signed-off-by: dependabot[bot] <support@github.com>

* *: use structlog.stdlib instead of structlog for type-hints

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jens Langhammer <jens.langhammer@beryju.org>
2021-01-01 15:39:43 +01:00

44 lines
1.5 KiB
Python

"""authentik reputation request signals"""
from django.contrib.auth.signals import user_logged_in, user_login_failed
from django.core.cache import cache
from django.dispatch import receiver
from django.http import HttpRequest
from structlog.stdlib import get_logger
from authentik.lib.utils.http import get_client_ip
from authentik.policies.reputation.models import (
CACHE_KEY_IP_PREFIX,
CACHE_KEY_USER_PREFIX,
)
# Module-level structlog logger used by update_score to report score changes
LOGGER = get_logger()
def update_score(request: HttpRequest, username: str, amount: int):
    """Update the cached reputation score for the client IP and the user.

    Both scores live only in the cache; each one is initialised to 0 if it
    does not exist yet and then incremented by ``amount``.

    Args:
        request: Request used to determine the client IP. When
            ``get_client_ip`` returns a falsy value, ``255.255.255.255``
            is used as the IP instead.
        username: Name the user score is keyed on.
        amount: Value added to both scores (negative to lower them).

    A failing cache update is logged as a warning instead of being raised,
    so that a cache hiccup cannot break the login flow that triggered it.
    """
    remote_ip = get_client_ip(request) or "255.255.255.255"

    # We only update the cache here, as it's faster than writing to the DB
    targets = (
        (CACHE_KEY_IP_PREFIX, remote_ip),
        (CACHE_KEY_USER_PREFIX, username),
    )
    all_updated = True
    for prefix, identifier in targets:
        key = prefix + identifier
        try:
            cache.get_or_set(key, 0)
            # incr raises ValueError when the key is missing, which can happen
            # if the entry expires or is evicted right after get_or_set
            cache.incr(key, amount)
        except ValueError as exc:
            all_updated = False
            LOGGER.warning("Failed to update score", key=key, exc=exc)

    if all_updated:
        LOGGER.debug(
            "Updated score", amount=amount, for_user=username, for_ip=remote_ip
        )
@receiver(user_login_failed)
# pylint: disable=unused-argument
def handle_failed_login(sender, request, credentials, **_):
    """Lower the score after a failed login attempt.

    Nothing happens when the supplied credentials carry no ``username`` key.
    """
    if "username" not in credentials:
        return
    attempted_username = credentials.get("username")
    update_score(request, attempted_username, -1)
@receiver(user_logged_in)
# pylint: disable=unused-argument
def handle_successful_login(sender, request, user, **_):
    """Raise the score after a successful login."""
    logged_in_username = user.username
    update_score(request, logged_in_username, 1)