policies/reputation: fix existing reputation update (cherry-pick #10124) (#10125)

policies/reputation: fix existing reputation update (#10124)

* add failing test case



* fix reputation update



* lint



---------

Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
Co-authored-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
This commit is contained in:
gcp-cherry-pick-bot[bot]
2024-06-16 19:52:04 +02:00
committed by GitHub
parent 35cd126406
commit b6157ecaf1
2 changed files with 26 additions and 10 deletions

View File

@ -1,6 +1,8 @@
"""authentik reputation request signals"""
from django.contrib.auth.signals import user_logged_in
from django.db import transaction
from django.db.models import F
from django.dispatch import receiver
from django.http import HttpRequest
from structlog.stdlib import get_logger
@ -19,16 +21,21 @@ def update_score(request: HttpRequest, identifier: str, amount: int):
"""Update score for IP and User"""
remote_ip = ClientIPMiddleware.get_client_ip(request)
Reputation.objects.update_or_create(
ip=remote_ip,
identifier=identifier,
defaults={
"score": amount,
"ip_geo_data": GEOIP_CONTEXT_PROCESSOR.city_dict(remote_ip) or {},
"ip_asn_data": ASN_CONTEXT_PROCESSOR.asn_dict(remote_ip) or {},
"expires": reputation_expiry(),
},
)
with transaction.atomic():
reputation, created = Reputation.objects.select_for_update().get_or_create(
ip=remote_ip,
identifier=identifier,
defaults={
"score": amount,
"ip_geo_data": GEOIP_CONTEXT_PROCESSOR.city_dict(remote_ip) or {},
"ip_asn_data": ASN_CONTEXT_PROCESSOR.asn_dict(remote_ip) or {},
"expires": reputation_expiry(),
},
)
if not created:
reputation.score = F("score") + amount
reputation.save()
LOGGER.debug("Updated score", amount=amount, for_user=identifier, for_ip=remote_ip)

View File

@ -39,6 +39,15 @@ class TestReputationPolicy(TestCase):
)
self.assertEqual(Reputation.objects.get(identifier=self.test_username).score, -1)
def test_update_reputation(self):
"""test reputation update"""
Reputation.objects.create(identifier=self.test_username, ip=self.test_ip, score=43)
# Trigger negative reputation
authenticate(
self.request, self.backends, username=self.test_username, password=self.test_username
)
self.assertEqual(Reputation.objects.get(identifier=self.test_username).score, 42)
def test_policy(self):
"""Test Policy"""
request = PolicyRequest(user=self.user)