sources/ldap: implement MonitoredTask
This commit is contained in:
@ -7,7 +7,7 @@ AUTHENTICATION_BACKENDS = [
|
||||
|
||||
CELERY_BEAT_SCHEDULE = {
|
||||
"sources_ldap_sync": {
|
||||
"task": "passbook.sources.ldap.tasks.sync",
|
||||
"task": "passbook.sources.ldap.tasks.ldap_sync_all",
|
||||
"schedule": crontab(minute=0), # Run every hour
|
||||
"options": {"queue": "passbook_scheduled"},
|
||||
}
|
||||
|
@ -12,7 +12,7 @@ from passbook.core.signals import password_changed
|
||||
from passbook.flows.planner import PLAN_CONTEXT_PENDING_USER
|
||||
from passbook.sources.ldap.models import LDAPSource
|
||||
from passbook.sources.ldap.password import LDAPPasswordChanger
|
||||
from passbook.sources.ldap.tasks import sync_single
|
||||
from passbook.sources.ldap.tasks import ldap_sync
|
||||
from passbook.stages.prompt.signals import password_validate
|
||||
|
||||
|
||||
@ -21,7 +21,7 @@ from passbook.stages.prompt.signals import password_validate
|
||||
def sync_ldap_source_on_save(sender, instance: LDAPSource, **_):
|
||||
"""Ensure that source is synced on save (if enabled)"""
|
||||
if instance.enabled:
|
||||
sync_single.delay(instance.pk)
|
||||
ldap_sync.delay(instance.pk)
|
||||
|
||||
|
||||
@receiver(password_validate)
|
||||
|
@ -35,17 +35,18 @@ class LDAPSynchronizer:
|
||||
return f"{self._source.additional_group_dn},{self._source.base_dn}"
|
||||
return self._source.base_dn
|
||||
|
||||
def sync_groups(self):
|
||||
def sync_groups(self) -> int:
|
||||
"""Iterate over all LDAP Groups and create passbook_core.Group instances"""
|
||||
if not self._source.sync_groups:
|
||||
LOGGER.warning("Group syncing is disabled for this Source")
|
||||
return
|
||||
return -1
|
||||
groups = self._source.connection.extend.standard.paged_search(
|
||||
search_base=self.base_dn_groups,
|
||||
search_filter=self._source.group_object_filter,
|
||||
search_scope=ldap3.SUBTREE,
|
||||
attributes=ldap3.ALL_ATTRIBUTES,
|
||||
)
|
||||
group_count = 0
|
||||
for group in groups:
|
||||
attributes = group.get("attributes", {})
|
||||
if self._source.object_uniqueness_field not in attributes:
|
||||
@ -68,18 +69,21 @@ class LDAPSynchronizer:
|
||||
LOGGER.debug(
|
||||
"Synced group", group=attributes.get("name", ""), created=created
|
||||
)
|
||||
group_count += 1
|
||||
return group_count
|
||||
|
||||
def sync_users(self):
|
||||
def sync_users(self) -> int:
|
||||
"""Iterate over all LDAP Users and create passbook_core.User instances"""
|
||||
if not self._source.sync_users:
|
||||
LOGGER.warning("User syncing is disabled for this Source")
|
||||
return
|
||||
return -1
|
||||
users = self._source.connection.extend.standard.paged_search(
|
||||
search_base=self.base_dn_users,
|
||||
search_filter=self._source.user_object_filter,
|
||||
search_scope=ldap3.SUBTREE,
|
||||
attributes=ldap3.ALL_ATTRIBUTES,
|
||||
)
|
||||
user_count = 0
|
||||
for user in users:
|
||||
attributes = user.get("attributes", {})
|
||||
if self._source.object_uniqueness_field not in attributes:
|
||||
@ -109,6 +113,8 @@ class LDAPSynchronizer:
|
||||
LOGGER.debug(
|
||||
"Synced User", user=attributes.get("name", ""), created=created
|
||||
)
|
||||
user_count += 1
|
||||
return user_count
|
||||
|
||||
def sync_membership(self):
|
||||
"""Iterate over all Users and assign Groups using memberOf Field"""
|
||||
|
@ -2,26 +2,37 @@
|
||||
from time import time
|
||||
|
||||
from django.core.cache import cache
|
||||
from ldap3.core.exceptions import LDAPException
|
||||
|
||||
from passbook.lib.tasks import MonitoredTask, TaskResult, TaskResultStatus
|
||||
from passbook.root.celery import CELERY_APP
|
||||
from passbook.sources.ldap.models import LDAPSource
|
||||
from passbook.sources.ldap.sync import LDAPSynchronizer
|
||||
|
||||
|
||||
@CELERY_APP.task()
|
||||
def sync():
|
||||
def ldap_sync_all():
|
||||
"""Sync all sources"""
|
||||
for source in LDAPSource.objects.filter(enabled=True):
|
||||
sync_single.delay(source.pk)
|
||||
ldap_sync.delay(source.pk)
|
||||
|
||||
|
||||
@CELERY_APP.task()
|
||||
def sync_single(source_pk):
|
||||
@CELERY_APP.task(bind=True, base=MonitoredTask)
|
||||
def ldap_sync(self: MonitoredTask, source_pk: int):
|
||||
"""Sync a single source"""
|
||||
source: LDAPSource = LDAPSource.objects.get(pk=source_pk)
|
||||
syncer = LDAPSynchronizer(source)
|
||||
syncer.sync_users()
|
||||
syncer.sync_groups()
|
||||
syncer.sync_membership()
|
||||
cache_key = source.state_cache_prefix("last_sync")
|
||||
cache.set(cache_key, time(), timeout=60 * 60)
|
||||
try:
|
||||
syncer = LDAPSynchronizer(source)
|
||||
user_count = syncer.sync_users()
|
||||
group_count = syncer.sync_groups()
|
||||
syncer.sync_membership()
|
||||
cache_key = source.state_cache_prefix("last_sync")
|
||||
cache.set(cache_key, time(), timeout=60 * 60)
|
||||
self.set_status(
|
||||
TaskResult(
|
||||
TaskResultStatus.SUCCESSFUL,
|
||||
[f"Synced {user_count} users", f"Synced {group_count} groups"],
|
||||
)
|
||||
)
|
||||
except LDAPException as exc:
|
||||
self.set_status(TaskResult(TaskResultStatus.ERROR, [str(exc)], exc))
|
||||
|
@ -7,7 +7,7 @@ from passbook.core.models import Group, User
|
||||
from passbook.providers.oauth2.generators import generate_client_secret
|
||||
from passbook.sources.ldap.models import LDAPPropertyMapping, LDAPSource
|
||||
from passbook.sources.ldap.sync import LDAPSynchronizer
|
||||
from passbook.sources.ldap.tasks import sync
|
||||
from passbook.sources.ldap.tasks import ldap_sync_all
|
||||
from passbook.sources.ldap.tests.utils import _build_mock_connection
|
||||
|
||||
LDAP_PASSWORD = generate_client_secret()
|
||||
@ -48,4 +48,4 @@ class LDAPSyncTests(TestCase):
|
||||
@patch("passbook.sources.ldap.models.LDAPSource.connection", LDAP_CONNECTION_PATCH)
|
||||
def test_tasks(self):
|
||||
"""Test Scheduled tasks"""
|
||||
sync()
|
||||
ldap_sync_all.delay()
|
||||
|
Reference in New Issue
Block a user