Files
authentik/authentik/policies/reputation/tests.py
Jens L 55aa1897af root: use single redis db (#4009)
* use single redis db

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>

* cleanup prefixes

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>

* ensure __str__ always returns string

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>

* fix remaining old prefixes

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>

* add release notes

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
2022-11-15 14:31:29 +01:00

64 lines
2.5 KiB
Python

"""test reputation signals and policy"""
from django.core.cache import cache
from django.test import RequestFactory, TestCase
from authentik.core.models import User
from authentik.policies.reputation.models import CACHE_KEY_PREFIX, Reputation, ReputationPolicy
from authentik.policies.reputation.tasks import save_reputation
from authentik.policies.types import PolicyRequest
from authentik.stages.password import BACKEND_INBUILT
from authentik.stages.password.stage import authenticate
class TestReputationPolicy(TestCase):
"""test reputation signals and policy"""
def setUp(self):
self.request_factory = RequestFactory()
self.request = self.request_factory.get("/")
self.test_ip = "127.0.0.1"
self.test_username = "test"
keys = cache.keys(CACHE_KEY_PREFIX + "*")
cache.delete_many(keys)
# We need a user for the one-to-one in userreputation
self.user = User.objects.create(username=self.test_username)
self.backends = [BACKEND_INBUILT]
def test_ip_reputation(self):
"""test IP reputation"""
# Trigger negative reputation
authenticate(
self.request, self.backends, username=self.test_username, password=self.test_username
)
# Test value in cache
self.assertEqual(
cache.get(CACHE_KEY_PREFIX + self.test_ip + "/" + self.test_username),
{"ip": "127.0.0.1", "identifier": "test", "score": -1},
)
# Save cache and check db values
save_reputation.delay().get()
self.assertEqual(Reputation.objects.get(ip=self.test_ip).score, -1)
def test_user_reputation(self):
"""test User reputation"""
# Trigger negative reputation
authenticate(
self.request, self.backends, username=self.test_username, password=self.test_username
)
# Test value in cache
self.assertEqual(
cache.get(CACHE_KEY_PREFIX + self.test_ip + "/" + self.test_username),
{"ip": "127.0.0.1", "identifier": "test", "score": -1},
)
# Save cache and check db values
save_reputation.delay().get()
self.assertEqual(Reputation.objects.get(identifier=self.test_username).score, -1)
def test_policy(self):
"""Test Policy"""
request = PolicyRequest(user=self.user)
policy: ReputationPolicy = ReputationPolicy.objects.create(
name="reputation-test", threshold=0
)
self.assertTrue(policy.passes(request).passing)