all: implement black as code formatter
This commit is contained in:
@ -28,6 +28,7 @@ class PolicyProcessInfo:
|
||||
self.policy = policy
|
||||
self.result = None
|
||||
|
||||
|
||||
class PolicyEngine:
|
||||
"""Orchestrate policy checking, launch tasks and return result"""
|
||||
|
||||
@ -46,12 +47,13 @@ class PolicyEngine:
|
||||
|
||||
def _select_subclasses(self) -> List[Policy]:
|
||||
"""Make sure all Policies are their respective classes"""
|
||||
return Policy.objects \
|
||||
.filter(pk__in=[x.pk for x in self.policies]) \
|
||||
.select_subclasses() \
|
||||
.order_by('order')
|
||||
return (
|
||||
Policy.objects.filter(pk__in=[x.pk for x in self.policies])
|
||||
.select_subclasses()
|
||||
.order_by("order")
|
||||
)
|
||||
|
||||
def build(self) -> 'PolicyEngine':
|
||||
def build(self) -> "PolicyEngine":
|
||||
"""Build task group"""
|
||||
cached_policies = []
|
||||
for policy in self._select_subclasses():
|
||||
@ -65,8 +67,9 @@ class PolicyEngine:
|
||||
task = PolicyProcess(policy, self.request, task_end)
|
||||
LOGGER.debug("Starting Process", policy=policy)
|
||||
task.start()
|
||||
self.__processes.append(PolicyProcessInfo(process=task,
|
||||
connection=our_end, policy=policy))
|
||||
self.__processes.append(
|
||||
PolicyProcessInfo(process=task, connection=our_end, policy=policy)
|
||||
)
|
||||
# If all policies are cached, we have an empty list here.
|
||||
for proc_info in self.__processes:
|
||||
proc_info.process.join(proc_info.policy.timeout)
|
||||
@ -80,7 +83,9 @@ class PolicyEngine:
|
||||
"""Get policy-checking result"""
|
||||
messages: List[str] = []
|
||||
for proc_info in self.__processes:
|
||||
LOGGER.debug("Result", policy=proc_info.policy, passing=proc_info.result.passing)
|
||||
LOGGER.debug(
|
||||
"Result", policy=proc_info.policy, passing=proc_info.result.passing
|
||||
)
|
||||
if proc_info.result.messages:
|
||||
messages += proc_info.result.messages
|
||||
if not proc_info.result.passing:
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
"""policy exceptions"""
|
||||
|
||||
|
||||
class PolicyException(Exception):
|
||||
"""Exception that should be raised during Policy Evaluation, and can be recovered from."""
|
||||
|
||||
@ -2,4 +2,4 @@
|
||||
|
||||
from passbook.lib.admin import admin_autoregister
|
||||
|
||||
admin_autoregister('passbook_policies_expiry')
|
||||
admin_autoregister("passbook_policies_expiry")
|
||||
|
||||
@ -11,7 +11,7 @@ class PasswordExpiryPolicySerializer(ModelSerializer):
|
||||
|
||||
class Meta:
|
||||
model = PasswordExpiryPolicy
|
||||
fields = GENERAL_SERIALIZER_FIELDS + ['days', 'deny_only']
|
||||
fields = GENERAL_SERIALIZER_FIELDS + ["days", "deny_only"]
|
||||
|
||||
|
||||
class PasswordExpiryPolicyViewSet(ModelViewSet):
|
||||
|
||||
@ -6,6 +6,6 @@ from django.apps import AppConfig
|
||||
class PassbookPolicyExpiryConfig(AppConfig):
|
||||
"""Passbook policy_expiry app config"""
|
||||
|
||||
name = 'passbook.policies.expiry'
|
||||
label = 'passbook_policies_expiry'
|
||||
verbose_name = 'passbook Policies.Expiry'
|
||||
name = "passbook.policies.expiry"
|
||||
label = "passbook_policies_expiry"
|
||||
verbose_name = "passbook Policies.Expiry"
|
||||
|
||||
@ -14,13 +14,11 @@ class PasswordExpiryPolicyForm(forms.ModelForm):
|
||||
class Meta:
|
||||
|
||||
model = PasswordExpiryPolicy
|
||||
fields = GENERAL_FIELDS + ['days', 'deny_only']
|
||||
fields = GENERAL_FIELDS + ["days", "deny_only"]
|
||||
widgets = {
|
||||
'name': forms.TextInput(),
|
||||
'order': forms.NumberInput(),
|
||||
'days': forms.NumberInput(),
|
||||
'policies': FilteredSelectMultiple(_('policies'), False)
|
||||
}
|
||||
labels = {
|
||||
'deny_only': _("Only fail the policy, don't set user's password.")
|
||||
"name": forms.TextInput(),
|
||||
"order": forms.NumberInput(),
|
||||
"days": forms.NumberInput(),
|
||||
"policies": FilteredSelectMultiple(_("policies"), False),
|
||||
}
|
||||
labels = {"deny_only": _("Only fail the policy, don't set user's password.")}
|
||||
|
||||
@ -9,21 +9,31 @@ class Migration(migrations.Migration):
|
||||
initial = True
|
||||
|
||||
dependencies = [
|
||||
('passbook_core', '0001_initial'),
|
||||
("passbook_core", "0001_initial"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='PasswordExpiryPolicy',
|
||||
name="PasswordExpiryPolicy",
|
||||
fields=[
|
||||
('policy_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='passbook_core.Policy')),
|
||||
('deny_only', models.BooleanField(default=False)),
|
||||
('days', models.IntegerField()),
|
||||
(
|
||||
"policy_ptr",
|
||||
models.OneToOneField(
|
||||
auto_created=True,
|
||||
on_delete=django.db.models.deletion.CASCADE,
|
||||
parent_link=True,
|
||||
primary_key=True,
|
||||
serialize=False,
|
||||
to="passbook_core.Policy",
|
||||
),
|
||||
),
|
||||
("deny_only", models.BooleanField(default=False)),
|
||||
("days", models.IntegerField()),
|
||||
],
|
||||
options={
|
||||
'verbose_name': 'Password Expiry Policy',
|
||||
'verbose_name_plural': 'Password Expiry Policies',
|
||||
"verbose_name": "Password Expiry Policy",
|
||||
"verbose_name_plural": "Password Expiry Policies",
|
||||
},
|
||||
bases=('passbook_core.policy',),
|
||||
bases=("passbook_core.policy",),
|
||||
),
|
||||
]
|
||||
|
||||
@ -19,27 +19,31 @@ class PasswordExpiryPolicy(Policy):
|
||||
deny_only = models.BooleanField(default=False)
|
||||
days = models.IntegerField()
|
||||
|
||||
form = 'passbook.policies.expiry.forms.PasswordExpiryPolicyForm'
|
||||
form = "passbook.policies.expiry.forms.PasswordExpiryPolicyForm"
|
||||
|
||||
def passes(self, request: PolicyRequest) -> PolicyResult:
|
||||
"""If password change date is more than x days in the past, call set_unusable_password
|
||||
and show a notice"""
|
||||
actual_days = (now() - request.user.password_change_date).days
|
||||
days_since_expiry = (now() - (request.user.password_change_date + timedelta(days=self.days)
|
||||
)).days
|
||||
days_since_expiry = (
|
||||
now() - (request.user.password_change_date + timedelta(days=self.days))
|
||||
).days
|
||||
if actual_days >= self.days:
|
||||
if not self.deny_only:
|
||||
request.user.set_unusable_password()
|
||||
request.user.save()
|
||||
message = _(('Password expired %(days)d days ago. '
|
||||
'Please update your password.') % {
|
||||
'days': days_since_expiry
|
||||
})
|
||||
message = _(
|
||||
(
|
||||
"Password expired %(days)d days ago. "
|
||||
"Please update your password."
|
||||
)
|
||||
% {"days": days_since_expiry}
|
||||
)
|
||||
return PolicyResult(False, message)
|
||||
return PolicyResult(False, _('Password has expired.'))
|
||||
return PolicyResult(False, _("Password has expired."))
|
||||
return PolicyResult(True)
|
||||
|
||||
class Meta:
|
||||
|
||||
verbose_name = _('Password Expiry Policy')
|
||||
verbose_name_plural = _('Password Expiry Policies')
|
||||
verbose_name = _("Password Expiry Policy")
|
||||
verbose_name_plural = _("Password Expiry Policies")
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
"""General fields"""
|
||||
|
||||
GENERAL_FIELDS = ['name', 'negate', 'order', 'timeout']
|
||||
GENERAL_SERIALIZER_FIELDS = ['pk', 'name', 'negate', 'order', 'timeout']
|
||||
GENERAL_FIELDS = ["name", "negate", "order", "timeout"]
|
||||
GENERAL_SERIALIZER_FIELDS = ["pk", "name", "negate", "order", "timeout"]
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
"""autodiscover admin"""
|
||||
from passbook.lib.admin import admin_autoregister
|
||||
|
||||
admin_autoregister('passbook_policies_group')
|
||||
admin_autoregister("passbook_policies_group")
|
||||
|
||||
@ -11,7 +11,7 @@ class GroupMembershipPolicySerializer(ModelSerializer):
|
||||
|
||||
class Meta:
|
||||
model = GroupMembershipPolicy
|
||||
fields = GENERAL_SERIALIZER_FIELDS + ['group']
|
||||
fields = GENERAL_SERIALIZER_FIELDS + ["group"]
|
||||
|
||||
|
||||
class GroupMembershipPolicyViewSet(ModelViewSet):
|
||||
|
||||
@ -6,6 +6,6 @@ from django.apps import AppConfig
|
||||
class PassbookPoliciesGroupConfig(AppConfig):
|
||||
"""passbook Group policy app config"""
|
||||
|
||||
name = 'passbook.policies.group'
|
||||
label = 'passbook_policies_group'
|
||||
verbose_name = 'passbook Policies.Group'
|
||||
name = "passbook.policies.group"
|
||||
label = "passbook_policies_group"
|
||||
verbose_name = "passbook Policies.Group"
|
||||
|
||||
@ -12,8 +12,10 @@ class GroupMembershipPolicyForm(forms.ModelForm):
|
||||
class Meta:
|
||||
|
||||
model = GroupMembershipPolicy
|
||||
fields = GENERAL_FIELDS + ['group', ]
|
||||
fields = GENERAL_FIELDS + [
|
||||
"group",
|
||||
]
|
||||
widgets = {
|
||||
'name': forms.TextInput(),
|
||||
'order': forms.NumberInput(),
|
||||
"name": forms.TextInput(),
|
||||
"order": forms.NumberInput(),
|
||||
}
|
||||
|
||||
@ -9,20 +9,36 @@ class Migration(migrations.Migration):
|
||||
initial = True
|
||||
|
||||
dependencies = [
|
||||
('passbook_core', '0001_initial'),
|
||||
("passbook_core", "0001_initial"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='GroupMembershipPolicy',
|
||||
name="GroupMembershipPolicy",
|
||||
fields=[
|
||||
('policy_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='passbook_core.Policy')),
|
||||
('group', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='passbook_core.Group')),
|
||||
(
|
||||
"policy_ptr",
|
||||
models.OneToOneField(
|
||||
auto_created=True,
|
||||
on_delete=django.db.models.deletion.CASCADE,
|
||||
parent_link=True,
|
||||
primary_key=True,
|
||||
serialize=False,
|
||||
to="passbook_core.Policy",
|
||||
),
|
||||
),
|
||||
(
|
||||
"group",
|
||||
models.ForeignKey(
|
||||
on_delete=django.db.models.deletion.CASCADE,
|
||||
to="passbook_core.Group",
|
||||
),
|
||||
),
|
||||
],
|
||||
options={
|
||||
'verbose_name': 'Group Membership Policy',
|
||||
'verbose_name_plural': 'Group Membership Policies',
|
||||
"verbose_name": "Group Membership Policy",
|
||||
"verbose_name_plural": "Group Membership Policies",
|
||||
},
|
||||
bases=('passbook_core.policy',),
|
||||
bases=("passbook_core.policy",),
|
||||
),
|
||||
]
|
||||
|
||||
@ -11,12 +11,12 @@ class GroupMembershipPolicy(Policy):
|
||||
|
||||
group = models.ForeignKey(Group, on_delete=models.CASCADE)
|
||||
|
||||
form = 'passbook.policies.group.forms.GroupMembershipPolicyForm'
|
||||
form = "passbook.policies.group.forms.GroupMembershipPolicyForm"
|
||||
|
||||
def passes(self, request: PolicyRequest) -> PolicyResult:
|
||||
return PolicyResult(self.group.user_set.filter(pk=request.user.pk).exists())
|
||||
|
||||
class Meta:
|
||||
|
||||
verbose_name = _('Group Membership Policy')
|
||||
verbose_name_plural = _('Group Membership Policies')
|
||||
verbose_name = _("Group Membership Policy")
|
||||
verbose_name_plural = _("Group Membership Policies")
|
||||
|
||||
@ -2,4 +2,4 @@
|
||||
|
||||
from passbook.lib.admin import admin_autoregister
|
||||
|
||||
admin_autoregister('passbook_policies_hibp')
|
||||
admin_autoregister("passbook_policies_hibp")
|
||||
|
||||
@ -11,7 +11,7 @@ class HaveIBeenPwendPolicySerializer(ModelSerializer):
|
||||
|
||||
class Meta:
|
||||
model = HaveIBeenPwendPolicy
|
||||
fields = GENERAL_SERIALIZER_FIELDS + ['allowed_count']
|
||||
fields = GENERAL_SERIALIZER_FIELDS + ["allowed_count"]
|
||||
|
||||
|
||||
class HaveIBeenPwendPolicyViewSet(ModelViewSet):
|
||||
|
||||
@ -6,6 +6,6 @@ from django.apps import AppConfig
|
||||
class PassbookPolicyHIBPConfig(AppConfig):
|
||||
"""Passbook hibp app config"""
|
||||
|
||||
name = 'passbook.policies.hibp'
|
||||
label = 'passbook_policies_hibp'
|
||||
verbose_name = 'passbook Policies.HaveIBeenPwned'
|
||||
name = "passbook.policies.hibp"
|
||||
label = "passbook_policies_hibp"
|
||||
verbose_name = "passbook Policies.HaveIBeenPwned"
|
||||
|
||||
@ -14,9 +14,9 @@ class HaveIBeenPwnedPolicyForm(forms.ModelForm):
|
||||
class Meta:
|
||||
|
||||
model = HaveIBeenPwendPolicy
|
||||
fields = GENERAL_FIELDS + ['allowed_count']
|
||||
fields = GENERAL_FIELDS + ["allowed_count"]
|
||||
widgets = {
|
||||
'name': forms.TextInput(),
|
||||
'order': forms.NumberInput(),
|
||||
'policies': FilteredSelectMultiple(_('policies'), False)
|
||||
"name": forms.TextInput(),
|
||||
"order": forms.NumberInput(),
|
||||
"policies": FilteredSelectMultiple(_("policies"), False),
|
||||
}
|
||||
|
||||
@ -9,20 +9,30 @@ class Migration(migrations.Migration):
|
||||
initial = True
|
||||
|
||||
dependencies = [
|
||||
('passbook_core', '0001_initial'),
|
||||
("passbook_core", "0001_initial"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='HaveIBeenPwendPolicy',
|
||||
name="HaveIBeenPwendPolicy",
|
||||
fields=[
|
||||
('policy_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='passbook_core.Policy')),
|
||||
('allowed_count', models.IntegerField(default=0)),
|
||||
(
|
||||
"policy_ptr",
|
||||
models.OneToOneField(
|
||||
auto_created=True,
|
||||
on_delete=django.db.models.deletion.CASCADE,
|
||||
parent_link=True,
|
||||
primary_key=True,
|
||||
serialize=False,
|
||||
to="passbook_core.Policy",
|
||||
),
|
||||
),
|
||||
("allowed_count", models.IntegerField(default=0)),
|
||||
],
|
||||
options={
|
||||
'verbose_name': 'Have I Been Pwned Policy',
|
||||
'verbose_name_plural': 'Have I Been Pwned Policies',
|
||||
"verbose_name": "Have I Been Pwned Policy",
|
||||
"verbose_name_plural": "Have I Been Pwned Policies",
|
||||
},
|
||||
bases=('passbook_core.policy',),
|
||||
bases=("passbook_core.policy",),
|
||||
),
|
||||
]
|
||||
|
||||
@ -10,37 +10,40 @@ from passbook.core.models import Policy, PolicyResult, User
|
||||
|
||||
LOGGER = get_logger()
|
||||
|
||||
|
||||
class HaveIBeenPwendPolicy(Policy):
|
||||
"""Check if password is on HaveIBeenPwned's list by upload the first
|
||||
5 characters of the SHA1 Hash."""
|
||||
|
||||
allowed_count = models.IntegerField(default=0)
|
||||
|
||||
form = 'passbook.policies.hibp.forms.HaveIBeenPwnedPolicyForm'
|
||||
form = "passbook.policies.hibp.forms.HaveIBeenPwnedPolicyForm"
|
||||
|
||||
def passes(self, user: User) -> PolicyResult:
|
||||
"""Check if password is in HIBP DB. Hashes given Password with SHA1, uses the first 5
|
||||
characters of Password in request and checks if full hash is in response. Returns 0
|
||||
if Password is not in result otherwise the count of how many times it was used."""
|
||||
# Only check if password is being set
|
||||
if not hasattr(user, '__password__'):
|
||||
if not hasattr(user, "__password__"):
|
||||
return PolicyResult(True)
|
||||
password = getattr(user, '__password__')
|
||||
pw_hash = sha1(password.encode('utf-8')).hexdigest() # nosec
|
||||
url = 'https://api.pwnedpasswords.com/range/%s' % pw_hash[:5]
|
||||
password = getattr(user, "__password__")
|
||||
pw_hash = sha1(password.encode("utf-8")).hexdigest() # nosec
|
||||
url = "https://api.pwnedpasswords.com/range/%s" % pw_hash[:5]
|
||||
result = get(url).text
|
||||
final_count = 0
|
||||
for line in result.split('\r\n'):
|
||||
full_hash, count = line.split(':')
|
||||
for line in result.split("\r\n"):
|
||||
full_hash, count = line.split(":")
|
||||
if pw_hash[5:] == full_hash.lower():
|
||||
final_count = int(count)
|
||||
LOGGER.debug("Got count %d for hash %s", final_count, pw_hash[:5])
|
||||
if final_count > self.allowed_count:
|
||||
message = _("Password exists on %(count)d online lists." % {'count': final_count})
|
||||
message = _(
|
||||
"Password exists on %(count)d online lists." % {"count": final_count}
|
||||
)
|
||||
return PolicyResult(False, message)
|
||||
return PolicyResult(True)
|
||||
|
||||
class Meta:
|
||||
|
||||
verbose_name = _('Have I Been Pwned Policy')
|
||||
verbose_name_plural = _('Have I Been Pwned Policies')
|
||||
verbose_name = _("Have I Been Pwned Policy")
|
||||
verbose_name_plural = _("Have I Been Pwned Policies")
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
"""autodiscover admin"""
|
||||
from passbook.lib.admin import admin_autoregister
|
||||
|
||||
admin_autoregister('passbook_policies_matcher')
|
||||
admin_autoregister("passbook_policies_matcher")
|
||||
|
||||
@ -11,7 +11,11 @@ class FieldMatcherPolicySerializer(ModelSerializer):
|
||||
|
||||
class Meta:
|
||||
model = FieldMatcherPolicy
|
||||
fields = GENERAL_SERIALIZER_FIELDS + ['user_field', 'match_action', 'value', ]
|
||||
fields = GENERAL_SERIALIZER_FIELDS + [
|
||||
"user_field",
|
||||
"match_action",
|
||||
"value",
|
||||
]
|
||||
|
||||
|
||||
class FieldMatcherPolicyViewSet(ModelViewSet):
|
||||
|
||||
@ -6,6 +6,6 @@ from django.apps import AppConfig
|
||||
class PassbookPoliciesMatcherConfig(AppConfig):
|
||||
"""passbook Matcher policy app config"""
|
||||
|
||||
name = 'passbook.policies.matcher'
|
||||
label = 'passbook_policies_matcher'
|
||||
verbose_name = 'passbook Policies.Matcher'
|
||||
name = "passbook.policies.matcher"
|
||||
label = "passbook_policies_matcher"
|
||||
verbose_name = "passbook Policies.Matcher"
|
||||
|
||||
@ -12,8 +12,12 @@ class FieldMatcherPolicyForm(forms.ModelForm):
|
||||
class Meta:
|
||||
|
||||
model = FieldMatcherPolicy
|
||||
fields = GENERAL_FIELDS + ['user_field', 'match_action', 'value', ]
|
||||
fields = GENERAL_FIELDS + [
|
||||
"user_field",
|
||||
"match_action",
|
||||
"value",
|
||||
]
|
||||
widgets = {
|
||||
'name': forms.TextInput(),
|
||||
'value': forms.TextInput(),
|
||||
"name": forms.TextInput(),
|
||||
"value": forms.TextInput(),
|
||||
}
|
||||
|
||||
@ -9,22 +9,56 @@ class Migration(migrations.Migration):
|
||||
initial = True
|
||||
|
||||
dependencies = [
|
||||
('passbook_core', '0001_initial'),
|
||||
("passbook_core", "0001_initial"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='FieldMatcherPolicy',
|
||||
name="FieldMatcherPolicy",
|
||||
fields=[
|
||||
('policy_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='passbook_core.Policy')),
|
||||
('user_field', models.TextField(choices=[('username', 'Username'), ('name', 'Name'), ('email', 'E-Mail'), ('is_staff', 'Is staff'), ('is_active', 'Is active'), ('data_joined', 'Date joined')])),
|
||||
('match_action', models.CharField(choices=[('startswith', 'Starts with'), ('endswith', 'Ends with'), ('contains', 'Contains'), ('regexp', 'Regexp'), ('exact', 'Exact')], max_length=50)),
|
||||
('value', models.TextField()),
|
||||
(
|
||||
"policy_ptr",
|
||||
models.OneToOneField(
|
||||
auto_created=True,
|
||||
on_delete=django.db.models.deletion.CASCADE,
|
||||
parent_link=True,
|
||||
primary_key=True,
|
||||
serialize=False,
|
||||
to="passbook_core.Policy",
|
||||
),
|
||||
),
|
||||
(
|
||||
"user_field",
|
||||
models.TextField(
|
||||
choices=[
|
||||
("username", "Username"),
|
||||
("name", "Name"),
|
||||
("email", "E-Mail"),
|
||||
("is_staff", "Is staff"),
|
||||
("is_active", "Is active"),
|
||||
("data_joined", "Date joined"),
|
||||
]
|
||||
),
|
||||
),
|
||||
(
|
||||
"match_action",
|
||||
models.CharField(
|
||||
choices=[
|
||||
("startswith", "Starts with"),
|
||||
("endswith", "Ends with"),
|
||||
("contains", "Contains"),
|
||||
("regexp", "Regexp"),
|
||||
("exact", "Exact"),
|
||||
],
|
||||
max_length=50,
|
||||
),
|
||||
),
|
||||
("value", models.TextField()),
|
||||
],
|
||||
options={
|
||||
'verbose_name': 'Field matcher Policy',
|
||||
'verbose_name_plural': 'Field matcher Policies',
|
||||
"verbose_name": "Field matcher Policy",
|
||||
"verbose_name_plural": "Field matcher Policies",
|
||||
},
|
||||
bases=('passbook_core.policy',),
|
||||
bases=("passbook_core.policy",),
|
||||
),
|
||||
]
|
||||
|
||||
@ -10,41 +10,44 @@ from passbook.policies.struct import PolicyRequest, PolicyResult
|
||||
|
||||
LOGGER = get_logger()
|
||||
|
||||
|
||||
class FieldMatcherPolicy(Policy):
|
||||
"""Policy which checks if a field of the User model matches/doesn't match a
|
||||
certain pattern"""
|
||||
|
||||
MATCH_STARTSWITH = 'startswith'
|
||||
MATCH_ENDSWITH = 'endswith'
|
||||
MATCH_CONTAINS = 'contains'
|
||||
MATCH_REGEXP = 'regexp'
|
||||
MATCH_EXACT = 'exact'
|
||||
MATCH_STARTSWITH = "startswith"
|
||||
MATCH_ENDSWITH = "endswith"
|
||||
MATCH_CONTAINS = "contains"
|
||||
MATCH_REGEXP = "regexp"
|
||||
MATCH_EXACT = "exact"
|
||||
|
||||
MATCHES = (
|
||||
(MATCH_STARTSWITH, _('Starts with')),
|
||||
(MATCH_ENDSWITH, _('Ends with')),
|
||||
(MATCH_CONTAINS, _('Contains')),
|
||||
(MATCH_REGEXP, _('Regexp')),
|
||||
(MATCH_EXACT, _('Exact')),
|
||||
(MATCH_STARTSWITH, _("Starts with")),
|
||||
(MATCH_ENDSWITH, _("Ends with")),
|
||||
(MATCH_CONTAINS, _("Contains")),
|
||||
(MATCH_REGEXP, _("Regexp")),
|
||||
(MATCH_EXACT, _("Exact")),
|
||||
)
|
||||
|
||||
USER_FIELDS = (
|
||||
('username', _('Username'),),
|
||||
('name', _('Name'),),
|
||||
('email', _('E-Mail'),),
|
||||
('is_staff', _('Is staff'),),
|
||||
('is_active', _('Is active'),),
|
||||
('data_joined', _('Date joined'),),
|
||||
("username", _("Username"),),
|
||||
("name", _("Name"),),
|
||||
("email", _("E-Mail"),),
|
||||
("is_staff", _("Is staff"),),
|
||||
("is_active", _("Is active"),),
|
||||
("data_joined", _("Date joined"),),
|
||||
)
|
||||
|
||||
user_field = models.TextField(choices=USER_FIELDS)
|
||||
match_action = models.CharField(max_length=50, choices=MATCHES)
|
||||
value = models.TextField()
|
||||
|
||||
form = 'passbook.policies.matcher.forms.FieldMatcherPolicyForm'
|
||||
form = "passbook.policies.matcher.forms.FieldMatcherPolicyForm"
|
||||
|
||||
def __str__(self):
|
||||
description = f"{self.name}, user.{self.user_field} {self.match_action} '{self.value}'"
|
||||
description = (
|
||||
f"{self.name}, user.{self.user_field} {self.match_action} '{self.value}'"
|
||||
)
|
||||
if self.name:
|
||||
description = f"{self.name}: {description}"
|
||||
return description
|
||||
@ -54,8 +57,12 @@ class FieldMatcherPolicy(Policy):
|
||||
if not hasattr(request.user, self.user_field):
|
||||
raise ValueError("Field does not exist")
|
||||
user_field_value = getattr(request.user, self.user_field, None)
|
||||
LOGGER.debug("Checking field", value=user_field_value,
|
||||
action=self.match_action, should_be=self.value)
|
||||
LOGGER.debug(
|
||||
"Checking field",
|
||||
value=user_field_value,
|
||||
action=self.match_action,
|
||||
should_be=self.value,
|
||||
)
|
||||
passes = False
|
||||
if self.match_action == FieldMatcherPolicy.MATCH_STARTSWITH:
|
||||
passes = user_field_value.startswith(self.value)
|
||||
@ -72,5 +79,5 @@ class FieldMatcherPolicy(Policy):
|
||||
|
||||
class Meta:
|
||||
|
||||
verbose_name = _('Field matcher Policy')
|
||||
verbose_name_plural = _('Field matcher Policies')
|
||||
verbose_name = _("Field matcher Policy")
|
||||
verbose_name_plural = _("Field matcher Policies")
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
"""autodiscover admin"""
|
||||
from passbook.lib.admin import admin_autoregister
|
||||
|
||||
admin_autoregister('passbook_policies_password')
|
||||
admin_autoregister("passbook_policies_password")
|
||||
|
||||
@ -11,9 +11,14 @@ class PasswordPolicySerializer(ModelSerializer):
|
||||
|
||||
class Meta:
|
||||
model = PasswordPolicy
|
||||
fields = GENERAL_SERIALIZER_FIELDS + ['amount_uppercase', 'amount_lowercase',
|
||||
'amount_symbols', 'length_min', 'symbol_charset',
|
||||
'error_message']
|
||||
fields = GENERAL_SERIALIZER_FIELDS + [
|
||||
"amount_uppercase",
|
||||
"amount_lowercase",
|
||||
"amount_symbols",
|
||||
"length_min",
|
||||
"symbol_charset",
|
||||
"error_message",
|
||||
]
|
||||
|
||||
|
||||
class PasswordPolicyViewSet(ModelViewSet):
|
||||
|
||||
@ -6,6 +6,6 @@ from django.apps import AppConfig
|
||||
class PassbookPoliciesPasswordConfig(AppConfig):
|
||||
"""passbook Password policy app config"""
|
||||
|
||||
name = 'passbook.policies.password'
|
||||
label = 'passbook_policies_password'
|
||||
verbose_name = 'passbook Policies.Password'
|
||||
name = "passbook.policies.password"
|
||||
label = "passbook_policies_password"
|
||||
verbose_name = "passbook Policies.Password"
|
||||
|
||||
@ -13,17 +13,22 @@ class PasswordPolicyForm(forms.ModelForm):
|
||||
class Meta:
|
||||
|
||||
model = PasswordPolicy
|
||||
fields = GENERAL_FIELDS + ['amount_uppercase', 'amount_lowercase',
|
||||
'amount_symbols', 'length_min', 'symbol_charset',
|
||||
'error_message']
|
||||
fields = GENERAL_FIELDS + [
|
||||
"amount_uppercase",
|
||||
"amount_lowercase",
|
||||
"amount_symbols",
|
||||
"length_min",
|
||||
"symbol_charset",
|
||||
"error_message",
|
||||
]
|
||||
widgets = {
|
||||
'name': forms.TextInput(),
|
||||
'symbol_charset': forms.TextInput(),
|
||||
'error_message': forms.TextInput(),
|
||||
"name": forms.TextInput(),
|
||||
"symbol_charset": forms.TextInput(),
|
||||
"error_message": forms.TextInput(),
|
||||
}
|
||||
labels = {
|
||||
'amount_uppercase': _('Minimum amount of Uppercase Characters'),
|
||||
'amount_lowercase': _('Minimum amount of Lowercase Characters'),
|
||||
'amount_symbols': _('Minimum amount of Symbols Characters'),
|
||||
'length_min': _('Minimum Length'),
|
||||
"amount_uppercase": _("Minimum amount of Uppercase Characters"),
|
||||
"amount_lowercase": _("Minimum amount of Lowercase Characters"),
|
||||
"amount_symbols": _("Minimum amount of Symbols Characters"),
|
||||
"length_min": _("Minimum Length"),
|
||||
}
|
||||
|
||||
@ -9,25 +9,38 @@ class Migration(migrations.Migration):
|
||||
initial = True
|
||||
|
||||
dependencies = [
|
||||
('passbook_core', '0001_initial'),
|
||||
("passbook_core", "0001_initial"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='PasswordPolicy',
|
||||
name="PasswordPolicy",
|
||||
fields=[
|
||||
('policy_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='passbook_core.Policy')),
|
||||
('amount_uppercase', models.IntegerField(default=0)),
|
||||
('amount_lowercase', models.IntegerField(default=0)),
|
||||
('amount_symbols', models.IntegerField(default=0)),
|
||||
('length_min', models.IntegerField(default=0)),
|
||||
('symbol_charset', models.TextField(default='!\\"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~ ')),
|
||||
('error_message', models.TextField()),
|
||||
(
|
||||
"policy_ptr",
|
||||
models.OneToOneField(
|
||||
auto_created=True,
|
||||
on_delete=django.db.models.deletion.CASCADE,
|
||||
parent_link=True,
|
||||
primary_key=True,
|
||||
serialize=False,
|
||||
to="passbook_core.Policy",
|
||||
),
|
||||
),
|
||||
("amount_uppercase", models.IntegerField(default=0)),
|
||||
("amount_lowercase", models.IntegerField(default=0)),
|
||||
("amount_symbols", models.IntegerField(default=0)),
|
||||
("length_min", models.IntegerField(default=0)),
|
||||
(
|
||||
"symbol_charset",
|
||||
models.TextField(default="!\\\"#$%&'()*+,-./:;<=>?@[\\]^_`{|}~ "),
|
||||
),
|
||||
("error_message", models.TextField()),
|
||||
],
|
||||
options={
|
||||
'verbose_name': 'Password Policy',
|
||||
'verbose_name_plural': 'Password Policies',
|
||||
"verbose_name": "Password Policy",
|
||||
"verbose_name_plural": "Password Policies",
|
||||
},
|
||||
bases=('passbook_core.policy',),
|
||||
bases=("passbook_core.policy",),
|
||||
),
|
||||
]
|
||||
|
||||
@ -21,21 +21,21 @@ class PasswordPolicy(Policy):
|
||||
symbol_charset = models.TextField(default=r"!\"#$%&'()*+,-./:;<=>?@[\]^_`{|}~ ")
|
||||
error_message = models.TextField()
|
||||
|
||||
form = 'passbook.policies.password.forms.PasswordPolicyForm'
|
||||
form = "passbook.policies.password.forms.PasswordPolicyForm"
|
||||
|
||||
def passes(self, request: PolicyRequest) -> PolicyResult:
|
||||
# Only check if password is being set
|
||||
if not hasattr(request.user, '__password__'):
|
||||
if not hasattr(request.user, "__password__"):
|
||||
return PolicyResult(True)
|
||||
password = getattr(request.user, '__password__')
|
||||
password = getattr(request.user, "__password__")
|
||||
|
||||
filter_regex = r''
|
||||
filter_regex = r""
|
||||
if self.amount_lowercase > 0:
|
||||
filter_regex += r'[a-z]{%d,}' % self.amount_lowercase
|
||||
filter_regex += r"[a-z]{%d,}" % self.amount_lowercase
|
||||
if self.amount_uppercase > 0:
|
||||
filter_regex += r'[A-Z]{%d,}' % self.amount_uppercase
|
||||
filter_regex += r"[A-Z]{%d,}" % self.amount_uppercase
|
||||
if self.amount_symbols > 0:
|
||||
filter_regex += r'[%s]{%d,}' % (self.symbol_charset, self.amount_symbols)
|
||||
filter_regex += r"[%s]{%d,}" % (self.symbol_charset, self.amount_symbols)
|
||||
result = bool(re.compile(filter_regex).match(password))
|
||||
if not result:
|
||||
return PolicyResult(result, self.error_message)
|
||||
@ -43,5 +43,5 @@ class PasswordPolicy(Policy):
|
||||
|
||||
class Meta:
|
||||
|
||||
verbose_name = _('Password Policy')
|
||||
verbose_name_plural = _('Password Policies')
|
||||
verbose_name = _("Password Policy")
|
||||
verbose_name_plural = _("Password Policies")
|
||||
|
||||
@ -16,6 +16,7 @@ def cache_key(policy, user):
|
||||
"""Generate Cache key for policy"""
|
||||
return f"policy_{policy.pk}#{user.pk}"
|
||||
|
||||
|
||||
class PolicyProcess(Process):
|
||||
"""Evaluate a single policy within a seprate process"""
|
||||
|
||||
@ -31,8 +32,12 @@ class PolicyProcess(Process):
|
||||
|
||||
def run(self):
|
||||
"""Task wrapper to run policy checking"""
|
||||
LOGGER.debug("Running policy", policy=self.policy,
|
||||
user=self.request.user, process="PolicyProcess")
|
||||
LOGGER.debug(
|
||||
"Running policy",
|
||||
policy=self.policy,
|
||||
user=self.request.user,
|
||||
process="PolicyProcess",
|
||||
)
|
||||
try:
|
||||
policy_result = self.policy.passes(self.request)
|
||||
except PolicyException as exc:
|
||||
@ -41,8 +46,14 @@ class PolicyProcess(Process):
|
||||
# Invert result if policy.negate is set
|
||||
if self.policy.negate:
|
||||
policy_result.passing = not policy_result.passing
|
||||
LOGGER.debug("Got result", policy=self.policy, result=policy_result,
|
||||
process="PolicyProcess", passing=policy_result.passing, user=self.request.user)
|
||||
LOGGER.debug(
|
||||
"Got result",
|
||||
policy=self.policy,
|
||||
result=policy_result,
|
||||
process="PolicyProcess",
|
||||
passing=policy_result.passing,
|
||||
user=self.request.user,
|
||||
)
|
||||
key = cache_key(self.policy, self.request.user)
|
||||
cache.set(key, policy_result)
|
||||
LOGGER.debug("Cached policy evaluation", key=key)
|
||||
|
||||
@ -2,4 +2,4 @@
|
||||
|
||||
from passbook.lib.admin import admin_autoregister
|
||||
|
||||
admin_autoregister('passbook_policies_reputation')
|
||||
admin_autoregister("passbook_policies_reputation")
|
||||
|
||||
@ -11,7 +11,7 @@ class ReputationPolicySerializer(ModelSerializer):
|
||||
|
||||
class Meta:
|
||||
model = ReputationPolicy
|
||||
fields = GENERAL_SERIALIZER_FIELDS + ['check_ip', 'check_username', 'threshold']
|
||||
fields = GENERAL_SERIALIZER_FIELDS + ["check_ip", "check_username", "threshold"]
|
||||
|
||||
|
||||
class ReputationPolicyViewSet(ModelViewSet):
|
||||
|
||||
@ -7,9 +7,9 @@ from django.apps import AppConfig
|
||||
class PassbookPolicyReputationConfig(AppConfig):
|
||||
"""Passbook reputation app config"""
|
||||
|
||||
name = 'passbook.policies.reputation'
|
||||
label = 'passbook_policies_reputation'
|
||||
verbose_name = 'passbook Policies.Reputation'
|
||||
name = "passbook.policies.reputation"
|
||||
label = "passbook_policies_reputation"
|
||||
verbose_name = "passbook Policies.Reputation"
|
||||
|
||||
def ready(self):
|
||||
import_module('passbook.policies.reputation.signals')
|
||||
import_module("passbook.policies.reputation.signals")
|
||||
|
||||
@ -12,11 +12,11 @@ class ReputationPolicyForm(forms.ModelForm):
|
||||
class Meta:
|
||||
|
||||
model = ReputationPolicy
|
||||
fields = GENERAL_FIELDS + ['check_ip', 'check_username', 'threshold']
|
||||
fields = GENERAL_FIELDS + ["check_ip", "check_username", "threshold"]
|
||||
widgets = {
|
||||
'name': forms.TextInput(),
|
||||
'value': forms.TextInput(),
|
||||
"name": forms.TextInput(),
|
||||
"value": forms.TextInput(),
|
||||
}
|
||||
labels = {
|
||||
'check_ip': _('Check IP'),
|
||||
"check_ip": _("Check IP"),
|
||||
}
|
||||
|
||||
@ -10,41 +10,73 @@ class Migration(migrations.Migration):
|
||||
initial = True
|
||||
|
||||
dependencies = [
|
||||
('passbook_core', '0001_initial'),
|
||||
("passbook_core", "0001_initial"),
|
||||
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='IPReputation',
|
||||
name="IPReputation",
|
||||
fields=[
|
||||
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
|
||||
('ip', models.GenericIPAddressField(unique=True)),
|
||||
('score', models.IntegerField(default=0)),
|
||||
('updated', models.DateTimeField(auto_now=True)),
|
||||
(
|
||||
"id",
|
||||
models.AutoField(
|
||||
auto_created=True,
|
||||
primary_key=True,
|
||||
serialize=False,
|
||||
verbose_name="ID",
|
||||
),
|
||||
),
|
||||
("ip", models.GenericIPAddressField(unique=True)),
|
||||
("score", models.IntegerField(default=0)),
|
||||
("updated", models.DateTimeField(auto_now=True)),
|
||||
],
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='ReputationPolicy',
|
||||
name="ReputationPolicy",
|
||||
fields=[
|
||||
('policy_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='passbook_core.Policy')),
|
||||
('check_ip', models.BooleanField(default=True)),
|
||||
('check_username', models.BooleanField(default=True)),
|
||||
('threshold', models.IntegerField(default=-5)),
|
||||
(
|
||||
"policy_ptr",
|
||||
models.OneToOneField(
|
||||
auto_created=True,
|
||||
on_delete=django.db.models.deletion.CASCADE,
|
||||
parent_link=True,
|
||||
primary_key=True,
|
||||
serialize=False,
|
||||
to="passbook_core.Policy",
|
||||
),
|
||||
),
|
||||
("check_ip", models.BooleanField(default=True)),
|
||||
("check_username", models.BooleanField(default=True)),
|
||||
("threshold", models.IntegerField(default=-5)),
|
||||
],
|
||||
options={
|
||||
'verbose_name': 'Reputation Policy',
|
||||
'verbose_name_plural': 'Reputation Policies',
|
||||
"verbose_name": "Reputation Policy",
|
||||
"verbose_name_plural": "Reputation Policies",
|
||||
},
|
||||
bases=('passbook_core.policy',),
|
||||
bases=("passbook_core.policy",),
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='UserReputation',
|
||||
name="UserReputation",
|
||||
fields=[
|
||||
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
|
||||
('score', models.IntegerField(default=0)),
|
||||
('updated', models.DateTimeField(auto_now=True)),
|
||||
('user', models.OneToOneField(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL)),
|
||||
(
|
||||
"id",
|
||||
models.AutoField(
|
||||
auto_created=True,
|
||||
primary_key=True,
|
||||
serialize=False,
|
||||
verbose_name="ID",
|
||||
),
|
||||
),
|
||||
("score", models.IntegerField(default=0)),
|
||||
("updated", models.DateTimeField(auto_now=True)),
|
||||
(
|
||||
"user",
|
||||
models.OneToOneField(
|
||||
on_delete=django.db.models.deletion.CASCADE,
|
||||
to=settings.AUTH_USER_MODEL,
|
||||
),
|
||||
),
|
||||
],
|
||||
),
|
||||
]
|
||||
|
||||
@ -14,24 +14,27 @@ class ReputationPolicy(Policy):
|
||||
check_username = models.BooleanField(default=True)
|
||||
threshold = models.IntegerField(default=-5)
|
||||
|
||||
form = 'passbook.policies.reputation.forms.ReputationPolicyForm'
|
||||
form = "passbook.policies.reputation.forms.ReputationPolicyForm"
|
||||
|
||||
def passes(self, request: PolicyRequest) -> PolicyResult:
|
||||
remote_ip = get_client_ip(request.http_request)
|
||||
passing = True
|
||||
if self.check_ip:
|
||||
ip_scores = IPReputation.objects.filter(ip=remote_ip, score__lte=self.threshold)
|
||||
ip_scores = IPReputation.objects.filter(
|
||||
ip=remote_ip, score__lte=self.threshold
|
||||
)
|
||||
passing = passing and ip_scores.exists()
|
||||
if self.check_username:
|
||||
user_scores = UserReputation.objects.filter(user=request.user,
|
||||
score__lte=self.threshold)
|
||||
user_scores = UserReputation.objects.filter(
|
||||
user=request.user, score__lte=self.threshold
|
||||
)
|
||||
passing = passing and user_scores.exists()
|
||||
return PolicyResult(passing)
|
||||
|
||||
class Meta:
|
||||
|
||||
verbose_name = _('Reputation Policy')
|
||||
verbose_name_plural = _('Reputation Policies')
|
||||
verbose_name = _("Reputation Policy")
|
||||
verbose_name_plural = _("Reputation Policies")
|
||||
|
||||
|
||||
class IPReputation(models.Model):
|
||||
|
||||
@ -12,7 +12,7 @@ LOGGER = get_logger()
|
||||
|
||||
def update_score(request, username, amount):
|
||||
"""Update score for IP and User"""
|
||||
remote_ip = get_client_ip(request) or '255.255.255.255.'
|
||||
remote_ip = get_client_ip(request) or "255.255.255.255."
|
||||
ip_score, _ = IPReputation.objects.update_or_create(ip=remote_ip)
|
||||
ip_score.score += amount
|
||||
ip_score.save()
|
||||
@ -30,7 +30,7 @@ def update_score(request, username, amount):
|
||||
# pylint: disable=unused-argument
|
||||
def handle_failed_login(sender, request, credentials, **_):
|
||||
"""Lower Score for failed loging attempts"""
|
||||
update_score(request, credentials.get('username'), -1)
|
||||
update_score(request, credentials.get("username"), -1)
|
||||
|
||||
|
||||
@receiver(user_logged_in)
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
"""autodiscover admin"""
|
||||
from passbook.lib.admin import admin_autoregister
|
||||
|
||||
admin_autoregister('passbook_policies_sso')
|
||||
admin_autoregister("passbook_policies_sso")
|
||||
|
||||
@ -6,6 +6,6 @@ from django.apps import AppConfig
|
||||
class PassbookPoliciesSSOConfig(AppConfig):
|
||||
"""passbook sso policy app config"""
|
||||
|
||||
name = 'passbook.policies.sso'
|
||||
label = 'passbook_policies_sso'
|
||||
verbose_name = 'passbook Policies.SSO'
|
||||
name = "passbook.policies.sso"
|
||||
label = "passbook_policies_sso"
|
||||
verbose_name = "passbook Policies.SSO"
|
||||
|
||||
@ -14,6 +14,6 @@ class SSOLoginPolicyForm(forms.ModelForm):
|
||||
model = SSOLoginPolicy
|
||||
fields = GENERAL_FIELDS
|
||||
widgets = {
|
||||
'name': forms.TextInput(),
|
||||
'order': forms.NumberInput(),
|
||||
"name": forms.TextInput(),
|
||||
"order": forms.NumberInput(),
|
||||
}
|
||||
|
||||
@ -9,19 +9,29 @@ class Migration(migrations.Migration):
|
||||
initial = True
|
||||
|
||||
dependencies = [
|
||||
('passbook_core', '0001_initial'),
|
||||
("passbook_core", "0001_initial"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='SSOLoginPolicy',
|
||||
name="SSOLoginPolicy",
|
||||
fields=[
|
||||
('policy_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='passbook_core.Policy')),
|
||||
(
|
||||
"policy_ptr",
|
||||
models.OneToOneField(
|
||||
auto_created=True,
|
||||
on_delete=django.db.models.deletion.CASCADE,
|
||||
parent_link=True,
|
||||
primary_key=True,
|
||||
serialize=False,
|
||||
to="passbook_core.Policy",
|
||||
),
|
||||
),
|
||||
],
|
||||
options={
|
||||
'verbose_name': 'SSO Login Policy',
|
||||
'verbose_name_plural': 'SSO Login Policies',
|
||||
"verbose_name": "SSO Login Policy",
|
||||
"verbose_name_plural": "SSO Login Policies",
|
||||
},
|
||||
bases=('passbook_core.policy',),
|
||||
bases=("passbook_core.policy",),
|
||||
),
|
||||
]
|
||||
|
||||
@ -8,15 +8,18 @@ from passbook.policies.struct import PolicyRequest, PolicyResult
|
||||
class SSOLoginPolicy(Policy):
|
||||
"""Policy that applies to users that have authenticated themselves through SSO"""
|
||||
|
||||
form = 'passbook.policies.sso.forms.SSOLoginPolicyForm'
|
||||
form = "passbook.policies.sso.forms.SSOLoginPolicyForm"
|
||||
|
||||
def passes(self, request: PolicyRequest) -> PolicyResult:
|
||||
"""Check if user instance passes this policy"""
|
||||
from passbook.factors.view import AuthenticationView
|
||||
is_sso_login = request.user.session.get(AuthenticationView.SESSION_IS_SSO_LOGIN, False)
|
||||
|
||||
is_sso_login = request.user.session.get(
|
||||
AuthenticationView.SESSION_IS_SSO_LOGIN, False
|
||||
)
|
||||
return PolicyResult(is_sso_login)
|
||||
|
||||
class Meta:
|
||||
|
||||
verbose_name = _('SSO Login Policy')
|
||||
verbose_name_plural = _('SSO Login Policies')
|
||||
verbose_name = _("SSO Login Policy")
|
||||
verbose_name_plural = _("SSO Login Policies")
|
||||
|
||||
@ -9,6 +9,7 @@ from django.http import HttpRequest
|
||||
if TYPE_CHECKING:
|
||||
from passbook.core.models import User
|
||||
|
||||
|
||||
class PolicyRequest:
|
||||
"""Data-class to hold policy request data"""
|
||||
|
||||
|
||||
@ -11,23 +11,17 @@ class PolicyTestEngine(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
cache.clear()
|
||||
self.user = User.objects.create_user(
|
||||
username="policyuser")
|
||||
self.user = User.objects.create_user(username="policyuser")
|
||||
self.policy_false = DebugPolicy.objects.create(
|
||||
result=False,
|
||||
wait_min=0,
|
||||
wait_max=1)
|
||||
result=False, wait_min=0, wait_max=1
|
||||
)
|
||||
self.policy_true = DebugPolicy.objects.create(
|
||||
result=True,
|
||||
wait_min=0,
|
||||
wait_max=1)
|
||||
result=True, wait_min=0, wait_max=1
|
||||
)
|
||||
self.policy_negate = DebugPolicy.objects.create(
|
||||
negate=True,
|
||||
result=True,
|
||||
wait_min=0,
|
||||
wait_max=1)
|
||||
self.policy_raises = Policy.objects.create(
|
||||
name='raises')
|
||||
negate=True, result=True, wait_min=0, wait_max=1
|
||||
)
|
||||
self.policy_raises = Policy.objects.create(name="raises")
|
||||
|
||||
def test_engine_empty(self):
|
||||
"""Ensure empty policy list passes"""
|
||||
@ -36,7 +30,9 @@ class PolicyTestEngine(TestCase):
|
||||
|
||||
def test_engine(self):
|
||||
"""Ensure all policies passes (Mix of false and true -> false)"""
|
||||
engine = PolicyEngine(DebugPolicy.objects.filter(negate__exact=False), self.user)
|
||||
engine = PolicyEngine(
|
||||
DebugPolicy.objects.filter(negate__exact=False), self.user
|
||||
)
|
||||
self.assertEqual(engine.build().passing, False)
|
||||
|
||||
def test_engine_negate(self):
|
||||
@ -46,14 +42,16 @@ class PolicyTestEngine(TestCase):
|
||||
|
||||
def test_engine_policy_error(self):
|
||||
"""Test negate flag"""
|
||||
engine = PolicyEngine(Policy.objects.filter(name='raises'), self.user)
|
||||
engine = PolicyEngine(Policy.objects.filter(name="raises"), self.user)
|
||||
self.assertEqual(engine.build().passing, False)
|
||||
|
||||
def test_engine_cache(self):
|
||||
"""Ensure empty policy list passes"""
|
||||
engine = PolicyEngine(DebugPolicy.objects.filter(negate__exact=False), self.user)
|
||||
self.assertEqual(len(cache.keys('policy_*')), 0)
|
||||
engine = PolicyEngine(
|
||||
DebugPolicy.objects.filter(negate__exact=False), self.user
|
||||
)
|
||||
self.assertEqual(len(cache.keys("policy_*")), 0)
|
||||
self.assertEqual(engine.build().passing, False)
|
||||
self.assertEqual(len(cache.keys('policy_*')), 2)
|
||||
self.assertEqual(len(cache.keys("policy_*")), 2)
|
||||
self.assertEqual(engine.build().passing, False)
|
||||
self.assertEqual(len(cache.keys('policy_*')), 2)
|
||||
self.assertEqual(len(cache.keys("policy_*")), 2)
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
"""autodiscover admin"""
|
||||
from passbook.lib.admin import admin_autoregister
|
||||
|
||||
admin_autoregister('passbook_policies_webhook')
|
||||
admin_autoregister("passbook_policies_webhook")
|
||||
|
||||
@ -11,8 +11,14 @@ class WebhookPolicySerializer(ModelSerializer):
|
||||
|
||||
class Meta:
|
||||
model = WebhookPolicy
|
||||
fields = GENERAL_SERIALIZER_FIELDS + ['url', 'method', 'json_body', 'json_headers',
|
||||
'result_jsonpath', 'result_json_value', ]
|
||||
fields = GENERAL_SERIALIZER_FIELDS + [
|
||||
"url",
|
||||
"method",
|
||||
"json_body",
|
||||
"json_headers",
|
||||
"result_jsonpath",
|
||||
"result_json_value",
|
||||
]
|
||||
|
||||
|
||||
class WebhookPolicyViewSet(ModelViewSet):
|
||||
|
||||
@ -6,6 +6,6 @@ from django.apps import AppConfig
|
||||
class PassbookPoliciesWebhookConfig(AppConfig):
|
||||
"""passbook Webhook policy app config"""
|
||||
|
||||
name = 'passbook.policies.webhook'
|
||||
label = 'passbook_policies_webhook'
|
||||
verbose_name = 'passbook Policies.Webhook'
|
||||
name = "passbook.policies.webhook"
|
||||
label = "passbook_policies_webhook"
|
||||
verbose_name = "passbook Policies.Webhook"
|
||||
|
||||
@ -12,12 +12,18 @@ class WebhookPolicyForm(forms.ModelForm):
|
||||
class Meta:
|
||||
|
||||
model = WebhookPolicy
|
||||
fields = GENERAL_FIELDS + ['url', 'method', 'json_body', 'json_headers',
|
||||
'result_jsonpath', 'result_json_value', ]
|
||||
fields = GENERAL_FIELDS + [
|
||||
"url",
|
||||
"method",
|
||||
"json_body",
|
||||
"json_headers",
|
||||
"result_jsonpath",
|
||||
"result_json_value",
|
||||
]
|
||||
widgets = {
|
||||
'name': forms.TextInput(),
|
||||
'json_body': forms.TextInput(),
|
||||
'json_headers': forms.TextInput(),
|
||||
'result_jsonpath': forms.TextInput(),
|
||||
'result_json_value': forms.TextInput(),
|
||||
"name": forms.TextInput(),
|
||||
"json_body": forms.TextInput(),
|
||||
"json_headers": forms.TextInput(),
|
||||
"result_jsonpath": forms.TextInput(),
|
||||
"result_json_value": forms.TextInput(),
|
||||
}
|
||||
|
||||
@ -9,25 +9,47 @@ class Migration(migrations.Migration):
|
||||
initial = True
|
||||
|
||||
dependencies = [
|
||||
('passbook_core', '0001_initial'),
|
||||
("passbook_core", "0001_initial"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='WebhookPolicy',
|
||||
name="WebhookPolicy",
|
||||
fields=[
|
||||
('policy_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='passbook_core.Policy')),
|
||||
('url', models.URLField()),
|
||||
('method', models.CharField(choices=[('GET', 'GET'), ('POST', 'POST'), ('PATCH', 'PATCH'), ('DELETE', 'DELETE'), ('PUT', 'PUT')], max_length=10)),
|
||||
('json_body', models.TextField()),
|
||||
('json_headers', models.TextField()),
|
||||
('result_jsonpath', models.TextField()),
|
||||
('result_json_value', models.TextField()),
|
||||
(
|
||||
"policy_ptr",
|
||||
models.OneToOneField(
|
||||
auto_created=True,
|
||||
on_delete=django.db.models.deletion.CASCADE,
|
||||
parent_link=True,
|
||||
primary_key=True,
|
||||
serialize=False,
|
||||
to="passbook_core.Policy",
|
||||
),
|
||||
),
|
||||
("url", models.URLField()),
|
||||
(
|
||||
"method",
|
||||
models.CharField(
|
||||
choices=[
|
||||
("GET", "GET"),
|
||||
("POST", "POST"),
|
||||
("PATCH", "PATCH"),
|
||||
("DELETE", "DELETE"),
|
||||
("PUT", "PUT"),
|
||||
],
|
||||
max_length=10,
|
||||
),
|
||||
),
|
||||
("json_body", models.TextField()),
|
||||
("json_headers", models.TextField()),
|
||||
("result_jsonpath", models.TextField()),
|
||||
("result_json_value", models.TextField()),
|
||||
],
|
||||
options={
|
||||
'verbose_name': 'Webhook Policy',
|
||||
'verbose_name_plural': 'Webhook Policies',
|
||||
"verbose_name": "Webhook Policy",
|
||||
"verbose_name_plural": "Webhook Policies",
|
||||
},
|
||||
bases=('passbook_core.policy',),
|
||||
bases=("passbook_core.policy",),
|
||||
),
|
||||
]
|
||||
|
||||
@ -9,11 +9,11 @@ from passbook.policies.struct import PolicyRequest, PolicyResult
|
||||
class WebhookPolicy(Policy):
|
||||
"""Policy that asks webhook"""
|
||||
|
||||
METHOD_GET = 'GET'
|
||||
METHOD_POST = 'POST'
|
||||
METHOD_PATCH = 'PATCH'
|
||||
METHOD_DELETE = 'DELETE'
|
||||
METHOD_PUT = 'PUT'
|
||||
METHOD_GET = "GET"
|
||||
METHOD_POST = "POST"
|
||||
METHOD_PATCH = "PATCH"
|
||||
METHOD_DELETE = "DELETE"
|
||||
METHOD_PUT = "PUT"
|
||||
|
||||
METHODS = (
|
||||
(METHOD_GET, METHOD_GET),
|
||||
@ -30,7 +30,7 @@ class WebhookPolicy(Policy):
|
||||
result_jsonpath = models.TextField()
|
||||
result_json_value = models.TextField()
|
||||
|
||||
form = 'passbook.policies.webhook.forms.WebhookPolicyForm'
|
||||
form = "passbook.policies.webhook.forms.WebhookPolicyForm"
|
||||
|
||||
def passes(self, request: PolicyRequest) -> PolicyResult:
|
||||
"""Call webhook asynchronously and report back"""
|
||||
@ -38,5 +38,5 @@ class WebhookPolicy(Policy):
|
||||
|
||||
class Meta:
|
||||
|
||||
verbose_name = _('Webhook Policy')
|
||||
verbose_name_plural = _('Webhook Policies')
|
||||
verbose_name = _("Webhook Policy")
|
||||
verbose_name_plural = _("Webhook Policies")
|
||||
|
||||
Reference in New Issue
Block a user