334 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			334 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #!/usr/bin/env python3
 | |
| import random
 | |
| import sys
 | |
| from collections.abc import Iterable
 | |
| from multiprocessing import Process
 | |
| from os import environ
 | |
| from uuid import uuid4
 | |
| 
 | |
| import django
 | |
| 
 | |
| environ.setdefault("DJANGO_SETTINGS_MODULE", "authentik.root.settings")
 | |
| environ.setdefault("AUTHENTIK_BOOTSTRAP_PASSWORD", "akadmin")
 | |
| environ.setdefault("AUTHENTIK_BOOTSTRAP_TOKEN", "akadmin")
 | |
| environ.setdefault("AUTHENTIK_BOOTSTRAP_EMAIL", "akadmin@authentik.test")
 | |
| django.setup()
 | |
| 
 | |
| from django.conf import settings
 | |
| 
 | |
| from authentik.core.models import Application, Group, User
 | |
| from authentik.crypto.models import CertificateKeyPair
 | |
| from authentik.events.models import Event, EventAction
 | |
| from authentik.flows.models import Flow
 | |
| from authentik.policies.expression.models import ExpressionPolicy
 | |
| from authentik.policies.models import PolicyBinding
 | |
| from authentik.providers.oauth2.models import OAuth2Provider
 | |
| from authentik.stages.authenticator_static.models import StaticToken
 | |
| from authentik.tenants.models import Domain, Tenant
 | |
| 
 | |
| settings.CELERY["task_always_eager"] = True
 | |
| 
 | |
| host = environ.get("BENCH_HOST", "localhost")
 | |
| 
 | |
| 
 | |
| class TestSuite:
 | |
|     TEST_NAME: str
 | |
|     TEST_CASES: Iterable[Iterable[int | str | bool]]
 | |
| 
 | |
|     @classmethod
 | |
|     def get_testcases(cls):
 | |
|         return [cls(params) for params in cls.TEST_CASES]
 | |
| 
 | |
|     def __init__(self, params: Iterable[int | str | bool]):
 | |
|         self.params = params
 | |
| 
 | |
|     def __str__(self):
 | |
|         return (
 | |
|             "-".join([self.TEST_NAME] + [str(param) for param in self.params])
 | |
|             .replace("_", "-")
 | |
|             .lower()
 | |
|         )
 | |
| 
 | |
|     @property
 | |
|     def schema_name(self):
 | |
|         return f"t_{str(self).replace('-', '_')}"
 | |
| 
 | |
|     @property
 | |
|     def domain_name(self):
 | |
|         return f"{str(self)}.{host}"
 | |
| 
 | |
|     def create(self):
 | |
|         created = False
 | |
|         t = Tenant.objects.filter(schema_name=self.schema_name).first()
 | |
|         if not t:
 | |
|             created = True
 | |
|             t = Tenant.objects.create(schema_name=self.schema_name, name=uuid4())
 | |
|         Domain.objects.get_or_create(tenant=t, domain=self.domain_name)
 | |
|         if created:
 | |
|             with t:
 | |
|                 self.create_data(*self.params)
 | |
| 
 | |
|     def create_data(self):
 | |
|         raise NotImplementedError
 | |
| 
 | |
|     def delete(self):
 | |
|         Tenant.objects.filter(schema_name=self.schema_name).delete()
 | |
| 
 | |
| 
 | |
| class UserList(TestSuite):
 | |
|     TEST_NAME = "user-list"
 | |
|     TEST_CASES = [
 | |
|         (1000, 0, 0),
 | |
|         (10000, 0, 0),
 | |
|         (1000, 3, 0),
 | |
|         (10000, 3, 0),
 | |
|         (1000, 20, 0),
 | |
|         (10000, 20, 0),
 | |
|         (1000, 20, 3),
 | |
|         (10000, 20, 3),
 | |
|     ]
 | |
| 
 | |
|     def create_data(self, user_count: int, groups_per_user: int, parents_per_group: int):
 | |
|         Group.objects.bulk_create([Group(name=uuid4()) for _ in range(groups_per_user * 5)])
 | |
|         for group in Group.objects.exclude(name="authentik Admins"):
 | |
|             for _ in range(parents_per_group):
 | |
|                 new_group = Group.objects.create(name=uuid4())
 | |
|                 group.parent = new_group
 | |
|                 group.save()
 | |
|                 group = new_group
 | |
|         User.objects.bulk_create(
 | |
|             [
 | |
|                 User(
 | |
|                     username=uuid4(),
 | |
|                     name=uuid4(),
 | |
|                 )
 | |
|                 for _ in range(user_count)
 | |
|             ]
 | |
|         )
 | |
|         if groups_per_user:
 | |
|             for user in User.objects.exclude_anonymous().exclude(username="akadmin"):
 | |
|                 user.ak_groups.set(
 | |
|                     Group.objects.exclude(name="authentik Admins").order_by("?")[:groups_per_user]
 | |
|                 )
 | |
| 
 | |
| 
 | |
| class GroupList(TestSuite):
 | |
|     TEST_NAME = "group-list"
 | |
|     TEST_CASES = [
 | |
|         (1000, 0, False),
 | |
|         (10000, 0, False),
 | |
|         (1000, 1000, False),
 | |
|         (1000, 10000, False),
 | |
|         (1000, 0, True),
 | |
|         (10000, 0, True),
 | |
|     ]
 | |
| 
 | |
|     def create_data(self, group_count, users_per_group, with_parent):
 | |
|         User.objects.bulk_create(
 | |
|             [
 | |
|                 User(
 | |
|                     username=uuid4(),
 | |
|                     name=uuid4(),
 | |
|                 )
 | |
|                 for _ in range(users_per_group * 5)
 | |
|             ]
 | |
|         )
 | |
|         if with_parent:
 | |
|             parents = Group.objects.bulk_create([Group(name=uuid4()) for _ in range(group_count)])
 | |
|         groups = Group.objects.bulk_create(
 | |
|             [
 | |
|                 Group(name=uuid4(), parent=(parents[i] if with_parent else None))
 | |
|                 for i in range(group_count)
 | |
|             ]
 | |
|         )
 | |
|         if users_per_group:
 | |
|             for group in groups:
 | |
|                 group.users.set(
 | |
|                     User.objects.exclude_anonymous()
 | |
|                     .exclude(username="akadmin")
 | |
|                     .order_by("?")[:users_per_group]
 | |
|                 )
 | |
| 
 | |
| 
 | |
| class Login(TestSuite):
 | |
|     TEST_NAME = "login"
 | |
|     TEST_CASES = [
 | |
|         ("no-mfa",),
 | |
|         ("with-mfa",),
 | |
|     ]
 | |
| 
 | |
|     def create_data(self, mfa: str):
 | |
|         user = User(username="test", name=uuid4())
 | |
|         user.set_password("verySecurePassword")
 | |
|         user.save()
 | |
| 
 | |
|         if mfa == "with-mfa":
 | |
|             device = user.staticdevice_set.create()
 | |
|             # Multiple token with same token for all the iterations in the test
 | |
|             device.token_set.bulk_create(
 | |
|                 [StaticToken(device=device, token=f"staticToken") for _ in range(1_000_000)]
 | |
|             )
 | |
| 
 | |
| 
 | |
| class ProviderOauth2(TestSuite):
 | |
|     TEST_NAME = "provider-oauth2"
 | |
|     TEST_CASES = [
 | |
|         (2, 50, 2),
 | |
|         (0, 0, 0),
 | |
|         (10, 0, 0),
 | |
|         (100, 0, 0),
 | |
|         (0, 10, 0),
 | |
|         (0, 100, 0),
 | |
|         (0, 0, 10),
 | |
|         (0, 0, 100),
 | |
|         (10, 10, 10),
 | |
|         (100, 100, 100),
 | |
|     ]
 | |
| 
 | |
|     def create_data(
 | |
|         self, user_policies_count: int, group_policies_count: int, expression_policies_count: int
 | |
|     ):
 | |
|         user = User(username="test", name=uuid4())
 | |
|         user.set_password("verySecurePassword")
 | |
|         user.save()
 | |
| 
 | |
|         provider = OAuth2Provider.objects.create(
 | |
|             name="test",
 | |
|             authorization_flow=Flow.objects.get(
 | |
|                 slug="default-provider-authorization-implicit-consent"
 | |
|             ),
 | |
|             signing_key=CertificateKeyPair.objects.get(name="authentik Self-signed Certificate"),
 | |
|             redirect_uris="http://test.localhost",
 | |
|             client_id="123456",
 | |
|             client_secret="123456",
 | |
|         )
 | |
|         application = Application.objects.create(slug="test", name="test", provider=provider)
 | |
| 
 | |
|         User.objects.bulk_create(
 | |
|             [
 | |
|                 User(
 | |
|                     username=uuid4(),
 | |
|                     name=uuid4(),
 | |
|                 )
 | |
|                 for _ in range(user_policies_count)
 | |
|             ]
 | |
|         )
 | |
|         PolicyBinding.objects.bulk_create(
 | |
|             [
 | |
|                 PolicyBinding(
 | |
|                     user=user,
 | |
|                     target=application,
 | |
|                     order=random.randint(1, 1_000_000),
 | |
|                 )
 | |
|                 for user in User.objects.exclude(username="akadmin").exclude_anonymous()
 | |
|             ]
 | |
|         )
 | |
| 
 | |
|         Group.objects.bulk_create([Group(name=uuid4()) for _ in range(group_policies_count)])
 | |
|         PolicyBinding.objects.bulk_create(
 | |
|             [
 | |
|                 PolicyBinding(
 | |
|                     group=group,
 | |
|                     target=application,
 | |
|                     order=random.randint(1, 1_000_000),
 | |
|                 )
 | |
|                 for group in Group.objects.exclude(name="authentik Admins")
 | |
|             ]
 | |
|         )
 | |
|         user.ak_groups.set(Group.objects.exclude(name="authentik Admins").order_by("?")[:1])
 | |
| 
 | |
|         [
 | |
|             ExpressionPolicy(
 | |
|                 name=f"test-{uuid4()}",
 | |
|                 expression="return True",
 | |
|             ).save()
 | |
|             for _ in range(expression_policies_count)
 | |
|         ]
 | |
|         PolicyBinding.objects.bulk_create(
 | |
|             [
 | |
|                 PolicyBinding(
 | |
|                     policy=policy,
 | |
|                     target=application,
 | |
|                     order=random.randint(1, 1_000_000),
 | |
|                 )
 | |
|                 for policy in ExpressionPolicy.objects.filter(name__startswith="test-")
 | |
|             ]
 | |
|         )
 | |
| 
 | |
| 
 | |
| class EventList(TestSuite):
 | |
|     TEST_NAME = "event-list"
 | |
|     TEST_CASES = [
 | |
|         (1_000,),
 | |
|         (10_000,),
 | |
|         (100_000,),
 | |
|         (1_000_000,),
 | |
|     ]
 | |
| 
 | |
|     def create_data(self, event_count: int):
 | |
|         Event.objects.bulk_create(
 | |
|             [
 | |
|                 Event(
 | |
|                     user={
 | |
|                         "pk": str(uuid4()),
 | |
|                         "name": str(uuid4()),
 | |
|                         "username": str(uuid4()),
 | |
|                         "email": f"{uuid4()}@example.org",
 | |
|                     },
 | |
|                     action="custom_benchmark",
 | |
|                     app="tests_benchmarks",
 | |
|                     context={
 | |
|                         str(uuid4()): str(uuid4()),
 | |
|                         str(uuid4()): str(uuid4()),
 | |
|                         str(uuid4()): str(uuid4()),
 | |
|                         str(uuid4()): str(uuid4()),
 | |
|                         str(uuid4()): str(uuid4()),
 | |
|                     },
 | |
|                     client_ip="192.0.2.42",
 | |
|                 )
 | |
|                 for _ in range(event_count)
 | |
|             ]
 | |
|         )
 | |
| 
 | |
| 
 | |
| def main(action: str, selected_suite: str | None = None):
 | |
|     testsuites = TestSuite.__subclasses__()
 | |
|     testcases = []
 | |
|     for testsuite in testsuites:
 | |
|         testcases += testsuite.get_testcases()
 | |
| 
 | |
|     match action:
 | |
|         case "create":
 | |
|             to_create = []
 | |
|             for testcase in testcases:
 | |
|                 if selected_suite and testcase.TEST_NAME != selected_suite:
 | |
|                     continue
 | |
|                 to_create.append(testcase)
 | |
|             processes = [Process(target=testcase.create) for testcase in to_create]
 | |
|             for p in processes:
 | |
|                 p.start()
 | |
|             for p in processes:
 | |
|                 p.join()
 | |
|         case "list":
 | |
|             print(*[testsuite.TEST_NAME for testsuite in testsuites], sep="\n")
 | |
|         case "delete":
 | |
|             for testcase in testcases:
 | |
|                 if selected_suite and testcase.TEST_NAME != selected_suite:
 | |
|                     continue
 | |
|                 testcase.delete()
 | |
|         case _:
 | |
|             print("Unknown action. Should be create, list or delete")
 | |
|             exit(1)
 | |
| 
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     if len(sys.argv) < 2:
 | |
|         action = "create"
 | |
|     else:
 | |
|         action = sys.argv[1]
 | |
|     if len(sys.argv) < 3:
 | |
|         testsuite = None
 | |
|     else:
 | |
|         testsuite = sys.argv[2]
 | |
|     main(action, testsuite)
 | 
