*(minor): small refactor

This commit is contained in:
Langhammer, Jens
2019-10-07 16:33:48 +02:00
parent d21ec6c9a5
commit f2acc154cd
300 changed files with 1420 additions and 1788 deletions

View File

View File

@ -0,0 +1,5 @@
"""Passbook reputation Admin"""
from passbook.lib.admin import admin_autoregister
admin_autoregister('passbook_policies_reputation')

View File

@ -0,0 +1,15 @@
"""Passbook reputation_policy app config"""
from importlib import import_module
from django.apps import AppConfig
class PassbookPolicyReputationConfig(AppConfig):
"""Passbook reputation app config"""
name = 'passbook.policies.reputation'
label = 'passbook_policies_reputation'
verbose_name = 'passbook Policies.Reputation'
def ready(self):
import_module('passbook.policies.reputation.signals')

View File

@ -0,0 +1,18 @@
"""passbook reputation request forms"""
from django import forms
from passbook.core.forms.policies import GENERAL_FIELDS
from passbook.policies.reputation.models import ReputationPolicy
class ReputationPolicyForm(forms.ModelForm):
"""Form to edit ReputationPolicy"""
class Meta:
model = ReputationPolicy
fields = GENERAL_FIELDS + ['check_ip', 'check_username', 'threshold']
widgets = {
'name': forms.TextInput(),
'value': forms.TextInput(),
}

View File

@ -0,0 +1,50 @@
# Generated by Django 2.2.6 on 2019-10-07 14:07
import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
('passbook_core', '0001_initial'),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.CreateModel(
name='IPReputation',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('ip', models.GenericIPAddressField(unique=True)),
('score', models.IntegerField(default=0)),
('updated', models.DateTimeField(auto_now=True)),
],
),
migrations.CreateModel(
name='ReputationPolicy',
fields=[
('policy_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='passbook_core.Policy')),
('check_ip', models.BooleanField(default=True)),
('check_username', models.BooleanField(default=True)),
('threshold', models.IntegerField(default=-5)),
],
options={
'verbose_name': 'Reputation Policy',
'verbose_name_plural': 'Reputation Policies',
},
bases=('passbook_core.policy',),
),
migrations.CreateModel(
name='UserReputation',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('score', models.IntegerField(default=0)),
('updated', models.DateTimeField(auto_now=True)),
('user', models.OneToOneField(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL)),
],
),
]

View File

@ -0,0 +1,56 @@
"""passbook reputation request policy"""
from django.db import models
from django.utils.translation import gettext as _
from ipware import get_client_ip
from passbook.core.models import Policy, User
from passbook.policies.struct import PolicyRequest, PolicyResult
class ReputationPolicy(Policy):
"""Return true if request IP/target username's score is below a certain threshold"""
check_ip = models.BooleanField(default=True)
check_username = models.BooleanField(default=True)
threshold = models.IntegerField(default=-5)
form = 'passbook.policies.reputation.forms.ReputationPolicyForm'
def passes(self, request: PolicyRequest) -> PolicyResult:
remote_ip, _ = get_client_ip(request.http_request)
passing = True
if self.check_ip:
ip_scores = IPReputation.objects.filter(ip=remote_ip, score__lte=self.threshold)
passing = passing and ip_scores.exists()
if self.check_username:
user_scores = UserReputation.objects.filter(user=request.user,
score__lte=self.threshold)
passing = passing and user_scores.exists()
return PolicyResult(passing)
class Meta:
verbose_name = _('Reputation Policy')
verbose_name_plural = _('Reputation Policies')
class IPReputation(models.Model):
"""Store score coming from the same IP"""
ip = models.GenericIPAddressField(unique=True)
score = models.IntegerField(default=0)
updated = models.DateTimeField(auto_now=True)
def __str__(self):
return f"IPReputation for {self.ip} @ {self.score}"
class UserReputation(models.Model):
"""Store score attempting to log in as the same username"""
user = models.OneToOneField(User, on_delete=models.CASCADE)
score = models.IntegerField(default=0)
updated = models.DateTimeField(auto_now=True)
def __str__(self):
return f"UserReputation for {self.user} @ {self.score}"

View File

@ -0,0 +1,48 @@
"""passbook reputation request signals"""
from django.contrib.auth.signals import user_logged_in, user_login_failed
from django.dispatch import receiver
from ipware import get_client_ip
from structlog import get_logger
from passbook.core.models import User
from passbook.policies.reputation.models import IPReputation, UserReputation
LOGGER = get_logger()
def get_remote_ip(request):
"""Small wrapper of get_client_ip to catch errors"""
try:
remote_ip, _ = get_client_ip(request)
if remote_ip:
return remote_ip
if 'ip' in request:
return request['ip']
except (AttributeError, ValueError):
pass
return '255.255.255.255'
def update_score(request, username, amount):
"""Update score for IP and User"""
remote_ip = get_remote_ip(request)
ip_score, _ = IPReputation.objects.update_or_create(ip=remote_ip)
ip_score.score += amount
ip_score.save()
LOGGER.debug("Updated score", amount=amount, for_ip=remote_ip)
user = User.objects.filter(username=username)
if not user.exists():
return
user_score, _ = UserReputation.objects.update_or_create(user=user.first())
user_score.score += amount
user_score.save()
LOGGER.debug("Updated score", amount=amount, for_user=username)
@receiver(user_login_failed)
def handle_failed_login(sender, request, credentials, **kwargs):
"""Lower Score for failed loging attempts"""
update_score(request, credentials.get('username'), -1)
@receiver(user_logged_in)
def handle_successful_login(sender, request, user, **kwargs):
"""Raise score for successful attempts"""
update_score(request, user.username, 1)