sources/scim: cleanup service account when source is deleted (#9319)

Signed-off-by: Jens Langhammer <jens@goauthentik.io>
This commit is contained in:
Jens L
2024-04-17 22:57:05 +02:00
committed by GitHub
parent 8935ca65a7
commit 49ac0eb662
11 changed files with 109 additions and 104 deletions

View File

@ -7,7 +7,6 @@ from rest_framework.viewsets import ModelViewSet
from authentik.core.api.sources import SourceSerializer
from authentik.core.api.tokens import TokenSerializer
from authentik.core.api.used_by import UsedByMixin
from authentik.core.models import Token, TokenIntents, User, UserTypes
from authentik.sources.scim.models import SCIMSource
@ -27,25 +26,6 @@ class SCIMSourceSerializer(SourceSerializer):
return relative_url
return self.context["request"].build_absolute_uri(relative_url)
def create(self, validated_data):
instance: SCIMSource = super().create(validated_data)
identifier = f"ak-source-scim-{instance.pk}"
user = User.objects.create(
username=identifier,
name=f"SCIM Source {instance.name} Service-Account",
type=UserTypes.SERVICE_ACCOUNT,
)
token = Token.objects.create(
user=user,
identifier=identifier,
intent=TokenIntents.INTENT_API,
expiring=False,
managed=f"goauthentik.io/sources/scim/{instance.pk}",
)
instance.token = token
instance.save()
return instance
class Meta:
model = SCIMSource

View File

@ -1,12 +1,13 @@
"""Authentik SCIM app config"""
from django.apps import AppConfig
from authentik.blueprints.apps import ManagedAppConfig
class AuthentikSourceSCIMConfig(AppConfig):
class AuthentikSourceSCIMConfig(ManagedAppConfig):
"""authentik SCIM Source app config"""
name = "authentik.sources.scim"
label = "authentik_sources_scim"
verbose_name = "authentik Sources.SCIM"
mountpoint = "source/scim/"
default = True

View File

@ -1,5 +1,7 @@
"""SCIM Source"""
from uuid import uuid4
from django.db import models
from django.utils.translation import gettext_lazy as _
from rest_framework.serializers import BaseSerializer
@ -14,6 +16,12 @@ class SCIMSource(Source):
token = models.ForeignKey(Token, on_delete=models.CASCADE, null=True, default=None)
@property
def service_account_identifier(self) -> str:
if not self.pk:
self.pk = uuid4()
return f"ak-source-scim-{self.pk}"
@property
def component(self) -> str:
"""Return component used to edit this object"""

View File

@ -0,0 +1,38 @@
from django.db.models import Model
from django.db.models.signals import pre_delete, pre_save
from django.dispatch import receiver
from authentik.core.models import Token, TokenIntents, User, UserTypes
from authentik.sources.scim.models import SCIMSource
@receiver(pre_save, sender=SCIMSource)
def scim_source_pre_save(sender: type[Model], instance: SCIMSource, **_):
"""Create service account before source is saved"""
# .service_account_identifier will auto-assign a primary key uuid to the source
# if none is set yet, just so we can get the identifier before we save
identifier = instance.service_account_identifier
user = User.objects.create(
username=identifier,
name=f"SCIM Source {instance.name} Service-Account",
type=UserTypes.SERVICE_ACCOUNT,
)
token = Token.objects.create(
user=user,
identifier=identifier,
intent=TokenIntents.INTENT_API,
expiring=False,
managed=f"goauthentik.io/sources/scim/{instance.pk}",
)
instance.token = token
@receiver(pre_delete, sender=SCIMSource)
def scim_source_pre_delete(sender: type[Model], instance: SCIMSource, **_):
"""Delete SCIM Source service account before deleting source"""
Token.objects.filter(
identifier=instance.service_account_identifier, intent=TokenIntents.INTENT_API
).delete()
User.objects.filter(
username=instance.service_account_identifier, type=UserTypes.SERVICE_ACCOUNT
).delete()

View File

@ -14,27 +14,13 @@ class TestSCIMAuth(APITestCase):
def setUp(self) -> None:
self.user = create_test_admin_user()
self.token = Token.objects.create(
user=self.user,
identifier=generate_id(),
intent=TokenIntents.INTENT_API,
)
self.token2 = Token.objects.create(
user=self.user,
identifier=generate_id(),
intent=TokenIntents.INTENT_API,
)
self.token3 = Token.objects.create(
user=self.user,
identifier=generate_id(),
intent=TokenIntents.INTENT_API,
)
self.source = SCIMSource.objects.create(
name=generate_id(), slug=generate_id(), token=self.token
)
self.source2 = SCIMSource.objects.create(
name=generate_id(), slug=generate_id(), token=self.token2
)
self.source = SCIMSource.objects.create(name=generate_id(), slug=generate_id())
self.source2 = SCIMSource.objects.create(name=generate_id(), slug=generate_id())
def test_auth_ok(self):
"""Test successful auth"""
@ -45,7 +31,7 @@ class TestSCIMAuth(APITestCase):
"source_slug": self.source.slug,
},
),
HTTP_AUTHORIZATION=f"Bearer {self.token.key}",
HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}",
)
self.assertEqual(response.status_code, 200)
@ -71,7 +57,7 @@ class TestSCIMAuth(APITestCase):
"source_slug": self.source.slug,
},
),
HTTP_AUTHORIZATION=f"Bearer {self.token2.key}",
HTTP_AUTHORIZATION=f"Bearer {self.source2.token.key}",
)
self.assertEqual(response.status_code, 403)
# Token for no source

View File

@ -3,8 +3,6 @@
from django.urls import reverse
from rest_framework.test import APITestCase
from authentik.core.models import Token, TokenIntents
from authentik.core.tests.utils import create_test_admin_user
from authentik.lib.generators import generate_id
from authentik.sources.scim.models import SCIMSource
@ -13,14 +11,9 @@ class TestSCIMResourceTypes(APITestCase):
"""Test SCIM ResourceTypes view"""
def setUp(self) -> None:
self.user = create_test_admin_user()
self.token = Token.objects.create(
user=self.user,
identifier=generate_id(),
intent=TokenIntents.INTENT_API,
)
self.source = SCIMSource.objects.create(
name=generate_id(), slug=generate_id(), token=self.token
name=generate_id(),
slug=generate_id(),
)
def test_resource_type(self):
@ -32,7 +25,7 @@ class TestSCIMResourceTypes(APITestCase):
"source_slug": self.source.slug,
},
),
HTTP_AUTHORIZATION=f"Bearer {self.token.key}",
HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}",
)
self.assertEqual(response.status_code, 200)
@ -46,7 +39,7 @@ class TestSCIMResourceTypes(APITestCase):
"resource_type": "ServiceProviderConfig",
},
),
HTTP_AUTHORIZATION=f"Bearer {self.token.key}",
HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}",
)
self.assertEqual(response.status_code, 200)
@ -60,6 +53,6 @@ class TestSCIMResourceTypes(APITestCase):
"resource_type": "foo",
},
),
HTTP_AUTHORIZATION=f"Bearer {self.token.key}",
HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}",
)
self.assertEqual(response.status_code, 404)

View File

@ -3,8 +3,6 @@
from django.urls import reverse
from rest_framework.test import APITestCase
from authentik.core.models import Token, TokenIntents
from authentik.core.tests.utils import create_test_admin_user
from authentik.lib.generators import generate_id
from authentik.sources.scim.models import SCIMSource
@ -13,15 +11,7 @@ class TestSCIMSchemas(APITestCase):
"""Test SCIM Schema view"""
def setUp(self) -> None:
self.user = create_test_admin_user()
self.token = Token.objects.create(
user=self.user,
identifier=generate_id(),
intent=TokenIntents.INTENT_API,
)
self.source = SCIMSource.objects.create(
name=generate_id(), slug=generate_id(), token=self.token
)
self.source = SCIMSource.objects.create(name=generate_id(), slug=generate_id())
def test_schema(self):
"""Test full schema view"""
@ -32,7 +22,7 @@ class TestSCIMSchemas(APITestCase):
"source_slug": self.source.slug,
},
),
HTTP_AUTHORIZATION=f"Bearer {self.token.key}",
HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}",
)
self.assertEqual(response.status_code, 200)
@ -46,7 +36,7 @@ class TestSCIMSchemas(APITestCase):
"schema_uri": "urn:ietf:params:scim:schemas:core:2.0:Meta",
},
),
HTTP_AUTHORIZATION=f"Bearer {self.token.key}",
HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}",
)
self.assertEqual(response.status_code, 200)
@ -60,6 +50,6 @@ class TestSCIMSchemas(APITestCase):
"schema_uri": "foo",
},
),
HTTP_AUTHORIZATION=f"Bearer {self.token.key}",
HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}",
)
self.assertEqual(response.status_code, 404)

View File

@ -3,8 +3,6 @@
from django.urls import reverse
from rest_framework.test import APITestCase
from authentik.core.models import Token, TokenIntents
from authentik.core.tests.utils import create_test_admin_user
from authentik.lib.generators import generate_id
from authentik.sources.scim.models import SCIMSource
@ -13,14 +11,9 @@ class TestSCIMServiceProviderConfig(APITestCase):
"""Test SCIM ServiceProviderConfig view"""
def setUp(self) -> None:
self.user = create_test_admin_user()
self.token = Token.objects.create(
user=self.user,
identifier=generate_id(),
intent=TokenIntents.INTENT_API,
)
self.source = SCIMSource.objects.create(
name=generate_id(), slug=generate_id(), token=self.token
name=generate_id(),
slug=generate_id(),
)
def test_config(self):
@ -32,6 +25,6 @@ class TestSCIMServiceProviderConfig(APITestCase):
"source_slug": self.source.slug,
},
),
HTTP_AUTHORIZATION=f"Bearer {self.token.key}",
HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}",
)
self.assertEqual(response.status_code, 200)

View File

@ -0,0 +1,27 @@
"""Test SCIM Source creation"""
from rest_framework.test import APITestCase
from authentik.core.models import Token, User
from authentik.lib.generators import generate_id
from authentik.sources.scim.models import SCIMSource
class TestSCIMSignals(APITestCase):
"""Test SCIM Signals view"""
def setUp(self) -> None:
self.uid = generate_id()
def test_create(self) -> None:
source = SCIMSource.objects.create(name=self.uid, slug=self.uid)
self.assertIsNotNone(source.token)
self.assertIsNotNone(source.token.user)
def test_delete(self):
self.test_create()
source = SCIMSource.objects.filter(slug=self.uid).first()
identifier = source.service_account_identifier
source.delete()
self.assertFalse(User.objects.filter(username=identifier).exists())
self.assertFalse(Token.objects.filter(identifier=identifier).exists())

View File

@ -6,8 +6,8 @@ from uuid import uuid4
from django.urls import reverse
from rest_framework.test import APITestCase
from authentik.core.models import Token, TokenIntents
from authentik.core.tests.utils import create_test_admin_user
from authentik.core.tests.utils import create_test_user
from authentik.events.models import Event, EventAction
from authentik.lib.generators import generate_id
from authentik.providers.scim.clients.schema import User as SCIMUserSchema
from authentik.sources.scim.models import SCIMSource, SCIMSourceUser
@ -18,15 +18,7 @@ class TestSCIMUsers(APITestCase):
"""Test SCIM User view"""
def setUp(self) -> None:
self.user = create_test_admin_user()
self.token = Token.objects.create(
user=self.user,
identifier=generate_id(),
intent=TokenIntents.INTENT_API,
)
self.source = SCIMSource.objects.create(
name=generate_id(), slug=generate_id(), token=self.token
)
self.source = SCIMSource.objects.create(name=generate_id(), slug=generate_id())
def test_user_list(self):
"""Test full user list"""
@ -37,15 +29,16 @@ class TestSCIMUsers(APITestCase):
"source_slug": self.source.slug,
},
),
HTTP_AUTHORIZATION=f"Bearer {self.token.key}",
HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}",
)
self.assertEqual(response.status_code, 200)
def test_user_list_single(self):
"""Test full user list (single user)"""
user = create_test_user()
SCIMSourceUser.objects.create(
source=self.source,
user=self.user,
user=user,
id=str(uuid4()),
)
response = self.client.get(
@ -53,16 +46,17 @@ class TestSCIMUsers(APITestCase):
"authentik_sources_scim:v2-users",
kwargs={
"source_slug": self.source.slug,
"user_id": str(self.user.uuid),
"user_id": str(user.uuid),
},
),
HTTP_AUTHORIZATION=f"Bearer {self.token.key}",
HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}",
)
self.assertEqual(response.status_code, 200)
SCIMUserSchema.model_validate_json(response.content, strict=True)
def test_user_create(self):
"""Test user create"""
user = create_test_user()
ext_id = generate_id()
response = self.client.post(
reverse(
@ -78,13 +72,18 @@ class TestSCIMUsers(APITestCase):
"emails": [
{
"primary": True,
"value": self.user.email,
"value": user.email,
}
],
}
),
content_type=SCIM_CONTENT_TYPE,
HTTP_AUTHORIZATION=f"Bearer {self.token.key}",
HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}",
)
self.assertEqual(response.status_code, 201)
self.assertTrue(SCIMSourceUser.objects.filter(source=self.source, id=ext_id).exists())
self.assertTrue(
Event.objects.filter(
action=EventAction.MODEL_CREATED, user__username=self.source.token.user.username
).exists()
)

View File

@ -6,7 +6,6 @@ from typing import Any
from docker.types import Healthcheck
from authentik.core.models import Token, TokenIntents, User
from authentik.lib.generators import generate_id
from authentik.lib.utils.http import get_http_session
from authentik.sources.scim.models import SCIMSource
@ -40,18 +39,9 @@ class TestSourceSCIM(SeleniumTestCase):
@retry()
def test_scim_conformance(self):
user = User.objects.create(
username=generate_id(),
)
token = Token.objects.create(
user=user,
intent=TokenIntents.INTENT_API,
expiring=False,
)
source = SCIMSource.objects.create(
name=generate_id(),
slug=generate_id(),
token=token,
)
session = get_http_session()
test_launch = session.post(
@ -59,7 +49,7 @@ class TestSourceSCIM(SeleniumTestCase):
data={
"endPoint": self.live_server_url + f"/source/scim/{source.slug}/v2",
"username": "foo",
"password": token.key,
"password": source.token.key,
"jwtToken": None,
"usersCheck": 1,
"groupsCheck": 1,