policies/hibp: update for flows, add unittests
This commit is contained in:
		| @ -6,8 +6,8 @@ from django.utils.translation import gettext as _ | ||||
| from requests import get | ||||
| from structlog import get_logger | ||||
|  | ||||
| from passbook.core.models import User | ||||
| from passbook.policies.models import Policy, PolicyResult | ||||
| from passbook.policies.types import PolicyRequest | ||||
|  | ||||
| LOGGER = get_logger() | ||||
|  | ||||
| @ -16,20 +16,31 @@ class HaveIBeenPwendPolicy(Policy): | ||||
|     """Check if password is on HaveIBeenPwned's list by uploading the first | ||||
|     5 characters of the SHA1 Hash.""" | ||||
|  | ||||
|     password_field = models.TextField( | ||||
|         default="password", | ||||
|         help_text=_( | ||||
|             "Field key to check, field keys defined in Prompt stages are available." | ||||
|         ), | ||||
|     ) | ||||
|  | ||||
|     allowed_count = models.IntegerField(default=0) | ||||
|  | ||||
|     form = "passbook.policies.hibp.forms.HaveIBeenPwnedPolicyForm" | ||||
|  | ||||
|     def passes(self, user: User) -> PolicyResult: | ||||
|     def passes(self, request: PolicyRequest) -> PolicyResult: | ||||
|         """Check if password is in HIBP DB. Hashes given Password with SHA1, uses the first 5 | ||||
|         characters of Password in request and checks if full hash is in response. Returns 0 | ||||
|         if Password is not in result otherwise the count of how many times it was used.""" | ||||
|         # Only check if password is being set | ||||
|         if not hasattr(user, "__password__"): | ||||
|             return PolicyResult(True) | ||||
|         password = getattr(user, "__password__") | ||||
|         if self.password_field not in request.context: | ||||
|             LOGGER.warning( | ||||
|                 "Password field not set in Policy Request", | ||||
|                 field=self.password_field, | ||||
|                 fields=request.context.keys(), | ||||
|             ) | ||||
|         password = request.context[self.password_field] | ||||
|  | ||||
|         pw_hash = sha1(password.encode("utf-8")).hexdigest()  # nosec | ||||
|         url = "https://api.pwnedpasswords.com/range/%s" % pw_hash[:5] | ||||
|         url = f"https://api.pwnedpasswords.com/range/{pw_hash[:5]}" | ||||
|         result = get(url).text | ||||
|         final_count = 0 | ||||
|         for line in result.split("\r\n"): | ||||
|  | ||||
		Reference in New Issue
	
	Block a user
	 Jens Langhammer
					Jens Langhammer