core: don't delete expired tokens, rotate their key

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
Jens Langhammer
2021-07-14 21:47:32 +02:00
parent 6f98833150
commit aa701c5725
8 changed files with 134 additions and 22 deletions

View File

@ -381,6 +381,13 @@ class ExpiringModel(models.Model):
expires = models.DateTimeField(default=default_token_duration)
expiring = models.BooleanField(default=True)
def expire_action(self, *args, **kwargs):
"""Handler which is called when this object is expired. By
default the object is deleted. This is less efficient compared
to bulk deleting objects, but classes like Token() need to change
values instead of being deleted."""
return self.delete(*args, **kwargs)
@classmethod
def filter_not_expired(cls, **kwargs) -> QuerySet:
"""Filer for tokens which are not expired yet or are not expiring,
@ -425,6 +432,18 @@ class Token(ManagedModel, ExpiringModel):
user = models.ForeignKey("User", on_delete=models.CASCADE, related_name="+")
description = models.TextField(default="", blank=True)
def expire_action(self, *args, **kwargs):
"""Handler which is called when this object is expired."""
from authentik.events.models import Event, EventAction
self.key = default_token_key()
self.save(*args, **kwargs)
Event.new(
action=EventAction.SECRET_ROTATE,
token=self,
message=f"Token {self.identifier}'s secret was rotated.",
).save()
def __str__(self):
description = f"{self.identifier}"
if self.expiring:

View File

@ -26,14 +26,16 @@ def clean_expired_models(self: MonitoredTask):
messages = []
for cls in ExpiringModel.__subclasses__():
cls: ExpiringModel
amount, _ = (
objects = (
cls.objects.all()
.exclude(expiring=False)
.exclude(expiring=True, expires__gt=now())
.delete()
)
LOGGER.debug("Deleted expired models", model=cls, amount=amount)
messages.append(f"Deleted {amount} expired {cls._meta.verbose_name_plural}")
for obj in objects:
obj.expire_action()
amount = objects.count()
LOGGER.debug("Expired models", model=cls, amount=amount)
messages.append(f"Expired {amount} {cls._meta.verbose_name_plural}")
self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, messages))

View File

@ -1,18 +0,0 @@
"""authentik core task tests"""
from django.test import TestCase
from django.utils.timezone import now
from guardian.shortcuts import get_anonymous_user
from authentik.core.models import Token
from authentik.core.tasks import clean_expired_models
class TestTasks(TestCase):
"""Test Tasks"""
def test_token_cleanup(self):
"""Test Token cleanup task"""
Token.objects.create(expires=now(), user=get_anonymous_user())
self.assertEqual(Token.objects.all().count(), 1)
clean_expired_models.delay().get()
self.assertEqual(Token.objects.all().count(), 0)

View File

@ -1,5 +1,7 @@
"""Test token API"""
from django.urls.base import reverse
from django.utils.timezone import now
from guardian.shortcuts import get_anonymous_user
from rest_framework.test import APITestCase
from authentik.core.models import (
@ -8,6 +10,7 @@ from authentik.core.models import (
TokenIntents,
User,
)
from authentik.core.tasks import clean_expired_models
class TestTokenAPI(APITestCase):
@ -41,3 +44,11 @@ class TestTokenAPI(APITestCase):
self.assertEqual(token.user, self.user)
self.assertEqual(token.intent, TokenIntents.INTENT_API)
self.assertEqual(token.expiring, False)
def test_token_expire(self):
"""Test Token expire task"""
token: Token = Token.objects.create(expires=now(), user=get_anonymous_user())
key = token.key
clean_expired_models.delay().get()
token.refresh_from_db()
self.assertNotEqual(key, token.key)