diff --git a/authentik/sources/scim/tests/test_users.py b/authentik/sources/scim/tests/test_users.py index 45730ce5c5..fe025a97f0 100644 --- a/authentik/sources/scim/tests/test_users.py +++ b/authentik/sources/scim/tests/test_users.py @@ -88,6 +88,55 @@ class TestSCIMUsers(APITestCase): ).exists() ) + def test_user_create_duplicate_by_username(self): + """Test user create""" + user = create_test_user() + username = generate_id() + obj1 = { + "userName": username, + "externalId": generate_id(), + "emails": [ + { + "primary": True, + "value": user.email, + } + ], + } + obj2 = obj1.copy() + obj2.update({"externalId": generate_id()}) + response = self.client.post( + reverse( + "authentik_sources_scim:v2-users", + kwargs={ + "source_slug": self.source.slug, + }, + ), + data=dumps(obj1), + content_type=SCIM_CONTENT_TYPE, + HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", + ) + self.assertEqual(response.status_code, 201) + self.assertTrue( + SCIMSourceUser.objects.filter(source=self.source, user__username=username).exists() + ) + self.assertTrue( + Event.objects.filter( + action=EventAction.MODEL_CREATED, user__username=self.source.token.user.username + ).exists() + ) + response = self.client.post( + reverse( + "authentik_sources_scim:v2-users", + kwargs={ + "source_slug": self.source.slug, + }, + ), + data=dumps(obj2), + content_type=SCIM_CONTENT_TYPE, + HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", + ) + self.assertEqual(response.status_code, 409) + def test_user_property_mappings(self): """Test user property_mappings""" self.source.user_property_mappings.set( diff --git a/authentik/sources/scim/views/v2/users.py b/authentik/sources/scim/views/v2/users.py index d2c6cf73b8..93761a3ac5 100644 --- a/authentik/sources/scim/views/v2/users.py +++ b/authentik/sources/scim/views/v2/users.py @@ -2,6 +2,7 @@ from uuid import uuid4 +from django.db.models import Q from django.db.transaction import atomic from django.http import Http404, QueryDict from django.urls import reverse @@ -113,8 +114,11 @@ class UsersView(SCIMObjectView): def post(self, request: Request, **kwargs) -> Response: """Create user handler""" connection = SCIMSourceUser.objects.filter( + Q( + Q(user__uuid=request.data.get("id")) + | Q(user__username=request.data.get("userName")) + ), source=self.source, - user__uuid=request.data.get("id"), ).first() if connection: self.logger.debug("Found existing user")