sources/ldap: implement LDAP password validation and syncing
This commit is contained in:
@ -1,4 +1,6 @@
|
||||
"""Wrapper for ldap3 to easily manage user"""
|
||||
from enum import IntFlag
|
||||
from re import split
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
import ldap3
|
||||
@ -12,6 +14,20 @@ from passbook.sources.ldap.models import LDAPPropertyMapping, LDAPSource
|
||||
|
||||
LOGGER = get_logger()
|
||||
|
||||
NON_ALPHA = r"~!@#$%^&*_-+=`|\(){}[]:;\"'<>,.?/"
|
||||
RE_DISPLAYNAME_SEPARATORS = r",\.–—_\s#\t"
|
||||
|
||||
|
||||
class PwdProperties(IntFlag):
|
||||
"""Possible values for the pwdProperties attribute"""
|
||||
|
||||
DOMAIN_PASSWORD_COMPLEX = 1
|
||||
DOMAIN_PASSWORD_NO_ANON_CHANGE = 2
|
||||
DOMAIN_PASSWORD_NO_CLEAR_CHANGE = 4
|
||||
DOMAIN_LOCKOUT_ADMINS = 8
|
||||
DOMAIN_PASSWORD_STORE_CLEARTEXT = 16
|
||||
DOMAIN_REFUSE_PASSWORD_CHANGE = 32
|
||||
|
||||
|
||||
class Connector:
|
||||
"""Wrapper for ldap3 to easily manage user authentication and creation"""
|
||||
@ -21,11 +37,6 @@ class Connector:
|
||||
def __init__(self, source: LDAPSource):
|
||||
self._source = source
|
||||
|
||||
@staticmethod
|
||||
def encode_pass(password: str) -> bytes:
|
||||
"""Encodes a plain-text password so it can be used by AD"""
|
||||
return '"{}"'.format(password).encode("utf-16-le")
|
||||
|
||||
@property
|
||||
def base_dn_users(self) -> str:
|
||||
"""Shortcut to get full base_dn for user lookups"""
|
||||
@ -206,7 +217,7 @@ class Connector:
|
||||
if self.auth_user_by_bind(user, password):
|
||||
# Password given successfully binds to LDAP, so we save it in our Database
|
||||
LOGGER.debug("Updating user's password in DB", user=user)
|
||||
user.set_password(password)
|
||||
user.set_password(password, signal=False)
|
||||
user.save()
|
||||
return user
|
||||
# Password doesn't match
|
||||
@ -232,3 +243,106 @@ class Connector:
|
||||
except ldap3.core.exceptions.LDAPException as exception:
|
||||
LOGGER.warning(exception)
|
||||
return None
|
||||
|
||||
def get_domain_root_dn(self) -> str:
|
||||
"""Attempt to get root DN via MS specific fields or generic LDAP fields"""
|
||||
info = self._source.connection.server.info
|
||||
if "rootDomainNamingContext" in info.other:
|
||||
return info.other["rootDomainNamingContext"][0]
|
||||
naming_contexts = info.naming_contexts
|
||||
naming_contexts.sort(key=len)
|
||||
return naming_contexts[0]
|
||||
|
||||
def check_ad_password_complexity_enabled(self) -> bool:
|
||||
"""Check if DOMAIN_PASSWORD_COMPLEX is enabled"""
|
||||
root_dn = self.get_domain_root_dn()
|
||||
root_attrs = self._source.connection.extend.standard.paged_search(
|
||||
search_base=root_dn,
|
||||
search_filter="(objectClass=*)",
|
||||
search_scope=ldap3.BASE,
|
||||
attributes=["pwdProperties"],
|
||||
)
|
||||
root_attrs = list(root_attrs)[0]
|
||||
pwd_properties = PwdProperties(root_attrs["attributes"]["pwdProperties"])
|
||||
if PwdProperties.DOMAIN_PASSWORD_COMPLEX in pwd_properties:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def change_password(self, user: User, password: str):
|
||||
"""Change user's password"""
|
||||
user_dn = user.attributes.get("distinguishedName", None)
|
||||
if not user_dn:
|
||||
raise AttributeError("User has no distinguishedName set.")
|
||||
self._source.connection.extend.microsoft.modify_password(user_dn, password)
|
||||
|
||||
def _ad_check_password_existing(self, password: str, user_dn: str) -> bool:
|
||||
"""Check if a password contains sAMAccount or displayName"""
|
||||
users = self._source.connection.extend.standard.paged_search(
|
||||
search_base=user_dn,
|
||||
search_filter="(objectClass=*)",
|
||||
search_scope=ldap3.BASE,
|
||||
attributes=["displayName", "sAMAccountName"],
|
||||
)
|
||||
if len(users) != 1:
|
||||
raise AssertionError()
|
||||
user = users[0]
|
||||
# If sAMAccountName is longer than 3 chars, check if its contained in password
|
||||
if len(user.sAMAccountName.value) >= 3:
|
||||
if password.lower() in user.sAMAccountName.value.lower():
|
||||
return False
|
||||
display_name_tokens = split(RE_DISPLAYNAME_SEPARATORS, user.displayName.value)
|
||||
for token in display_name_tokens:
|
||||
# Ignore tokens under 3 chars
|
||||
if len(token) < 3:
|
||||
continue
|
||||
if token.lower() in password.lower():
|
||||
return False
|
||||
return True
|
||||
|
||||
def ad_password_complexity(
|
||||
self, password: str, user: Optional[User] = None
|
||||
) -> bool:
|
||||
"""Check if password matches Active direcotry password policies
|
||||
|
||||
https://docs.microsoft.com/en-us/windows/security/threat-protection/
|
||||
security-policy-settings/password-must-meet-complexity-requirements
|
||||
"""
|
||||
if user:
|
||||
# Check if password contains sAMAccountName or displayNames
|
||||
if "distinguishedName" in user.attributes:
|
||||
existing_user_check = self._ad_check_password_existing(
|
||||
password, user.attributes.get("distinguishedName")
|
||||
)
|
||||
if not existing_user_check:
|
||||
LOGGER.debug("Password failed name check", user=user)
|
||||
return existing_user_check
|
||||
|
||||
# Step 2, match at least 3 of 5 categories
|
||||
matched_categories = 0
|
||||
required = 3
|
||||
for letter in password:
|
||||
# Only match one category per letter,
|
||||
if letter.islower():
|
||||
matched_categories += 1
|
||||
elif letter.isupper():
|
||||
matched_categories += 1
|
||||
elif not letter.isascii() and letter.isalpha():
|
||||
# Not exactly matching microsoft's policy, but count it as "Other unicode" char
|
||||
# when its alpha and not ascii
|
||||
matched_categories += 1
|
||||
elif letter.isnumeric():
|
||||
matched_categories += 1
|
||||
elif letter in NON_ALPHA:
|
||||
matched_categories += 1
|
||||
if matched_categories < required:
|
||||
LOGGER.debug(
|
||||
"Password didn't match enough categories",
|
||||
has=matched_categories,
|
||||
must=required,
|
||||
)
|
||||
return False
|
||||
LOGGER.debug(
|
||||
"Password matched categories", has=matched_categories, must=required
|
||||
)
|
||||
return True
|
||||
|
||||
Reference in New Issue
Block a user