60 lines
		
	
	
		
			2.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			60 lines
		
	
	
		
			2.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """passbook ldap source signals"""
 | |
| from typing import Any, Dict
 | |
| 
 | |
| from django.core.exceptions import ValidationError
 | |
| from django.db.models.signals import post_save
 | |
| from django.dispatch import receiver
 | |
| from django.utils.translation import gettext_lazy as _
 | |
| from ldap3.core.exceptions import LDAPException
 | |
| 
 | |
| from passbook.core.models import User
 | |
| from passbook.core.signals import password_changed
 | |
| from passbook.flows.planner import PLAN_CONTEXT_PENDING_USER
 | |
| from passbook.sources.ldap.models import LDAPSource
 | |
| from passbook.sources.ldap.password import LDAPPasswordChanger
 | |
| from passbook.sources.ldap.tasks import ldap_sync
 | |
| from passbook.stages.prompt.signals import password_validate
 | |
| 
 | |
| 
 | |
@receiver(post_save, sender=LDAPSource)
# pylint: disable=unused-argument
def sync_ldap_source_on_save(sender, instance: LDAPSource, **_):
    """Queue an LDAP sync task for a just-saved source, unless it is disabled"""
    if not instance.enabled:
        # Disabled sources are never handed to the sync task.
        return
    source_pk = instance.pk
    ldap_sync.delay(source_pk)
 | |
| 
 | |
| 
 | |
@receiver(password_validate)
# pylint: disable=unused-argument
def ldap_password_validate(sender, password: str, plan_context: Dict[str, Any], **__):
    """Check a candidate password against Active Directory complexity rules.

    Only the first LDAP source with ``sync_users_password`` set is consulted.
    The password is accepted without further checks when no such source
    exists, or when ``check_ad_password_complexity_enabled()`` is falsy.

    Args:
        sender: Signal sender (unused).
        password: The candidate password to validate.
        plan_context: Flow plan context; the pending user is read from it
            when present.

    Raises:
        ValidationError: the password fails the complexity check.
    """
    # first() yields None for an empty queryset, so one query replaces the
    # exists()/first() pair and the row cannot vanish between the two calls.
    source = LDAPSource.objects.filter(sync_users_password=True).first()
    if source is None:
        return
    changer = LDAPPasswordChanger(source)
    if not changer.check_ad_password_complexity_enabled():
        return
    # The pending user is optional; None is passed through when it is absent.
    pending_user = plan_context.get(PLAN_CONTEXT_PENDING_USER, None)
    if not changer.ad_password_complexity(password, pending_user):
        raise ValidationError(
            _("Password does not match Active Directory Complexity.")
        )
 | |
| 
 | |
| 
 | |
@receiver(password_changed)
# pylint: disable=unused-argument
def ldap_sync_password(sender, user: User, password: str, **_):
    """Push a changed password to LDAP.

    The change is performed inline, inside the signal handler, against the
    first LDAP source with ``sync_users_password`` set. Nothing happens when
    no such source exists.

    Args:
        sender: Signal sender (unused).
        user: The user whose password changed.
        password: The new password to write to LDAP.

    Raises:
        ValidationError: the LDAP operation raised an ``LDAPException``; the
            original exception is chained as the cause.
    """
    # first() yields None for an empty queryset, so one query replaces the
    # exists()/first() pair and the row cannot vanish between the two calls.
    source = LDAPSource.objects.filter(sync_users_password=True).first()
    if source is None:
        return
    changer = LDAPPasswordChanger(source)
    try:
        changer.change_password(user, password)
    except LDAPException as exc:
        # `_` is the kwargs catch-all inside this function, not the gettext
        # alias, so the message stays a plain, untranslated string.
        raise ValidationError("Failed to set password") from exc
 | 
