ci: upgrade pylint to latest version
core: also upgrade kombu as https://github.com/celery/kombu/issues/1101 is fixed now
This commit is contained in:
@ -9,7 +9,7 @@ class OpenIDUserInfoView(ScopedResourceMixin, View):
|
||||
|
||||
required_scopes = ['openid:userinfo']
|
||||
|
||||
def get(self, request, *args, **kwargs):
|
||||
def get(self, request, *_, **__):
|
||||
"""Passbook v1 OpenID API"""
|
||||
payload = {
|
||||
'sub': request.user.uuid.int,
|
||||
|
||||
@ -8,26 +8,35 @@ from passbook.core.signals import (invitation_created, invitation_used,
|
||||
|
||||
|
||||
@receiver(user_logged_in)
|
||||
def on_user_logged_in(sender, request, user, **kwargs):
|
||||
# pylint: disable=unused-argument
|
||||
def on_user_logged_in(sender, request, user, **_):
|
||||
"""Log successful login"""
|
||||
Event.new(EventAction.LOGIN).from_http(request)
|
||||
|
||||
|
||||
@receiver(user_logged_out)
|
||||
def on_user_logged_out(sender, request, user, **kwargs):
|
||||
# pylint: disable=unused-argument
|
||||
def on_user_logged_out(sender, request, user, **_):
|
||||
"""Log successfully logout"""
|
||||
Event.new(EventAction.LOGOUT).from_http(request)
|
||||
|
||||
|
||||
@receiver(user_signed_up)
|
||||
def on_user_signed_up(sender, request, user, **kwargs):
|
||||
# pylint: disable=unused-argument
|
||||
def on_user_signed_up(sender, request, user, **_):
|
||||
"""Log successfully signed up"""
|
||||
Event.new(EventAction.SIGN_UP).from_http(request)
|
||||
|
||||
|
||||
@receiver(invitation_created)
|
||||
def on_invitation_created(sender, request, invitation, **kwargs):
|
||||
# pylint: disable=unused-argument
|
||||
def on_invitation_created(sender, request, invitation, **_):
|
||||
"""Log Invitation creation"""
|
||||
Event.new(EventAction.INVITE_CREATED, invitation_uuid=invitation.uuid.hex).from_http(request)
|
||||
|
||||
|
||||
@receiver(invitation_used)
|
||||
def on_invitation_used(sender, request, invitation, **kwargs):
|
||||
# pylint: disable=unused-argument
|
||||
def on_invitation_used(sender, request, invitation, **_):
|
||||
"""Log Invitation usage"""
|
||||
Event.new(EventAction.INVITE_USED, invitation_uuid=invitation.uuid.hex).from_http(request)
|
||||
|
||||
@ -12,6 +12,7 @@ invitation_created = Signal(providing_args=['request', 'invitation'])
|
||||
invitation_used = Signal(providing_args=['request', 'invitation', 'user'])
|
||||
password_changed = Signal(providing_args=['user', 'password'])
|
||||
|
||||
|
||||
@receiver(post_save)
|
||||
# pylint: disable=unused-argument
|
||||
def invalidate_policy_cache(sender, instance, **_):
|
||||
|
||||
@ -78,6 +78,7 @@ class LoginView(UserPassesTestMixin, FormView):
|
||||
|
||||
def invalid_login(self, request: HttpRequest, disabled_user: User = None) -> HttpResponse:
|
||||
"""Handle login for disabled users/invalid login attempts"""
|
||||
LOGGER.debug("invalid_login", user=disabled_user)
|
||||
messages.error(request, _('Failed to authenticate.'))
|
||||
return self.render_to_response(self.get_context_data())
|
||||
|
||||
|
||||
@ -1,6 +1,5 @@
|
||||
"""passbook OTP Settings"""
|
||||
|
||||
OTP_TOTP_ISSUER = 'passbook'
|
||||
MIDDLEWARE = [
|
||||
'django_otp.middleware.OTPMiddleware',
|
||||
]
|
||||
|
||||
@ -1,22 +1,17 @@
|
||||
"""passbook OTP Utils"""
|
||||
|
||||
from django.conf import settings
|
||||
from django.utils.http import urlencode
|
||||
|
||||
|
||||
def otpauth_url(accountname, secret, issuer=None, digits=6):
|
||||
"""Create otpauth according to
|
||||
https://github.com/google/google-authenticator/wiki/Key-Uri-Format"""
|
||||
|
||||
accountname = accountname
|
||||
issuer = issuer if issuer else getattr(settings, 'OTP_TOTP_ISSUER')
|
||||
|
||||
# Ensure that the secret parameter is the FIRST parameter of the URI, this
|
||||
# allows Microsoft Authenticator to work.
|
||||
query = [
|
||||
('secret', secret),
|
||||
('digits', digits),
|
||||
('issuer', issuer),
|
||||
('issuer', 'passbook'),
|
||||
]
|
||||
|
||||
return 'otpauth://totp/%s:%s?%s' % (issuer, accountname, urlencode(query))
|
||||
|
||||
@ -26,6 +26,7 @@ OTP_SESSION_KEY = 'passbook_factors_otp_key'
|
||||
OTP_SETTING_UP_KEY = 'passbook_factors_otp_setup'
|
||||
LOGGER = get_logger()
|
||||
|
||||
|
||||
class UserSettingsView(LoginRequiredMixin, TemplateView):
|
||||
"""View for user settings to control OTP"""
|
||||
|
||||
@ -37,15 +38,16 @@ class UserSettingsView(LoginRequiredMixin, TemplateView):
|
||||
static = StaticDevice.objects.filter(user=self.request.user, confirmed=True)
|
||||
if static.exists():
|
||||
kwargs['static_tokens'] = StaticToken.objects.filter(device=static.first()) \
|
||||
.order_by('token')
|
||||
.order_by('token')
|
||||
totp_devices = TOTPDevice.objects.filter(user=self.request.user, confirmed=True)
|
||||
kwargs['state'] = totp_devices.exists() and static.exists()
|
||||
return kwargs
|
||||
|
||||
|
||||
class DisableView(LoginRequiredMixin, View):
|
||||
"""Disable TOTP for user"""
|
||||
|
||||
def get(self, request, *args, **kwargs):
|
||||
def get(self, request: HttpRequest) -> HttpResponse:
|
||||
"""Delete all the devices for user"""
|
||||
static = get_object_or_404(StaticDevice, user=request.user, confirmed=True)
|
||||
static_tokens = StaticToken.objects.filter(device=static).order_by('token')
|
||||
@ -59,6 +61,7 @@ class DisableView(LoginRequiredMixin, View):
|
||||
Event.new(EventAction.CUSTOM, message='User disabled OTP.').from_http(request)
|
||||
return redirect(reverse('passbook_factors_otp:otp-user-settings'))
|
||||
|
||||
|
||||
class EnableView(LoginRequiredMixin, FormView):
|
||||
"""View to set up OTP"""
|
||||
|
||||
@ -133,6 +136,7 @@ class EnableView(LoginRequiredMixin, FormView):
|
||||
Event.new(EventAction.CUSTOM, message='User enabled OTP.').from_http(self.request)
|
||||
return redirect('passbook_factors_otp:otp-user-settings')
|
||||
|
||||
|
||||
class QRView(NeverCacheMixin, View):
|
||||
"""View returns an SVG image with the OTP token information"""
|
||||
|
||||
|
||||
@ -15,18 +15,18 @@ from passbook.lib.utils.urls import is_url_absolute
|
||||
from passbook.policies.engine import PolicyEngine
|
||||
|
||||
LOGGER = get_logger()
|
||||
# Argument used to redirect user after login
|
||||
NEXT_ARG_NAME = 'next'
|
||||
|
||||
|
||||
def _redirect_with_qs(view, get_query_set=None):
|
||||
"""Wrapper to redirect whilst keeping GET Parameters"""
|
||||
target = reverse(view)
|
||||
if get_query_set:
|
||||
target += '?' + urlencode({key: value for key, value in get_query_set.items()})
|
||||
target += '?' + urlencode(get_query_set)
|
||||
return redirect(target)
|
||||
|
||||
|
||||
# Argument used to redirect user after login
|
||||
NEXT_ARG_NAME = 'next'
|
||||
|
||||
class AuthenticationView(UserPassesTestMixin, View):
|
||||
"""Wizard-like Multi-factor authenticator"""
|
||||
|
||||
@ -165,5 +165,6 @@ class AuthenticationView(UserPassesTestMixin, View):
|
||||
del self.request.session[key]
|
||||
LOGGER.debug("Cleaned up sessions")
|
||||
|
||||
|
||||
class FactorPermissionDeniedView(PermissionDeniedView):
|
||||
"""User could not be authenticated"""
|
||||
|
||||
@ -81,7 +81,7 @@ class ConfigLoader:
|
||||
except yaml.YAMLError as exc:
|
||||
raise ImproperlyConfigured from exc
|
||||
except PermissionError as exc:
|
||||
LOGGER.warning('Permission denied while reading %s', path)
|
||||
LOGGER.warning('Permission denied while reading file', path=path, error=exc)
|
||||
|
||||
def update_from_dict(self, update: dict):
|
||||
"""Update config from dict"""
|
||||
@ -143,6 +143,7 @@ class ConfigLoader:
|
||||
|
||||
CONFIG = ConfigLoader()
|
||||
|
||||
|
||||
def signal_handler(sender, **_):
|
||||
"""Add all loaded config files to autoreload watcher"""
|
||||
for path in CONFIG.loaded_file:
|
||||
|
||||
@ -27,12 +27,14 @@ def update_score(request, username, amount):
|
||||
|
||||
|
||||
@receiver(user_login_failed)
|
||||
def handle_failed_login(sender, request, credentials, **kwargs):
|
||||
# pylint: disable=unused-argument
|
||||
def handle_failed_login(sender, request, credentials, **_):
|
||||
"""Lower Score for failed loging attempts"""
|
||||
update_score(request, credentials.get('username'), -1)
|
||||
|
||||
|
||||
@receiver(user_logged_in)
|
||||
def handle_successful_login(sender, request, user, **kwargs):
|
||||
# pylint: disable=unused-argument
|
||||
def handle_successful_login(sender, request, user, **_):
|
||||
"""Raise score for successful attempts"""
|
||||
update_score(request, user.username, 1)
|
||||
|
||||
@ -116,9 +116,10 @@ class LoginProcessView(AccessRequiredView):
|
||||
"""Processor-based login continuation.
|
||||
Presents a SAML 2.0 Assertion for POSTing back to the Service Provider."""
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
def get(self, request, application):
|
||||
"""Handle get request, i.e. render form"""
|
||||
LOGGER.debug("Request: %s", request)
|
||||
LOGGER.debug("SAMLLoginProcessView", request=request, method='get')
|
||||
# Check if user has access
|
||||
if self.provider.application.skip_authorization:
|
||||
ctx = self.provider.processor.generate_response()
|
||||
@ -137,9 +138,10 @@ class LoginProcessView(AccessRequiredView):
|
||||
except exceptions.CannotHandleAssertion as exc:
|
||||
LOGGER.debug(exc)
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
def post(self, request, application):
|
||||
"""Handle post request, return back to ACS"""
|
||||
LOGGER.debug("Request: %s", request)
|
||||
LOGGER.debug("SAMLLoginProcessView", request=request, method='post')
|
||||
# Check if user has access
|
||||
if request.POST.get('ACSUrl', None):
|
||||
# User accepted request
|
||||
@ -163,6 +165,7 @@ class LogoutView(CSRFExemptMixin, AccessRequiredView):
|
||||
returns a standard logged-out page. (SalesForce and others use this method,
|
||||
though it's technically not SAML 2.0)."""
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
def get(self, request, application):
|
||||
"""Perform logout"""
|
||||
logout(request)
|
||||
@ -183,6 +186,7 @@ class SLOLogout(CSRFExemptMixin, AccessRequiredView):
|
||||
"""Receives a SAML 2.0 LogoutRequest from a Service Provider,
|
||||
logs out the user and returns a standard logged-out page."""
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
def post(self, request, application):
|
||||
"""Perform logout"""
|
||||
request.session['SAMLRequest'] = request.POST['SAMLRequest']
|
||||
@ -224,6 +228,7 @@ class DescriptorDownloadView(AccessRequiredView):
|
||||
class InitiateLoginView(AccessRequiredView):
|
||||
"""IdP-initiated Login"""
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
def get(self, request, application):
|
||||
"""Initiates an IdP-initiated link to a simple SP resource/target URL."""
|
||||
self.provider.processor.init_deep_link(request, '')
|
||||
|
||||
@ -20,7 +20,7 @@ class Connector:
|
||||
|
||||
def __init__(self, source: LDAPSource):
|
||||
self._source = source
|
||||
self._server = ldap3.Server(source.server_uri) # Implement URI parsing
|
||||
self._server = ldap3.Server(source.server_uri) # Implement URI parsing
|
||||
|
||||
def bind(self):
|
||||
"""Bind using Source's Credentials"""
|
||||
@ -171,7 +171,7 @@ class Connector:
|
||||
temp_connection.bind()
|
||||
return user
|
||||
except ldap3.core.exceptions.LDAPInvalidCredentialsResult as exception:
|
||||
LOGGER.debug("LDAPInvalidCredentialsResult", user=user)
|
||||
LOGGER.debug("LDAPInvalidCredentialsResult", user=user, error=exception)
|
||||
except ldap3.core.exceptions.LDAPException as exception:
|
||||
LOGGER.warning(exception)
|
||||
return None
|
||||
|
||||
@ -20,7 +20,7 @@ class BaseOAuthClient:
|
||||
|
||||
_session = None
|
||||
|
||||
def __init__(self, source, token=''): # nosec
|
||||
def __init__(self, source, token=''): # nosec
|
||||
self.source = source
|
||||
self.token = token
|
||||
self._session = Session()
|
||||
@ -151,6 +151,7 @@ class OAuthClient(BaseOAuthClient):
|
||||
class OAuth2Client(BaseOAuthClient):
|
||||
"""OAuth2 Client"""
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
def check_application_state(self, request, callback):
|
||||
"Check optional state parameter."
|
||||
stored = request.session.get(self.session_key, None)
|
||||
@ -192,6 +193,7 @@ class OAuth2Client(BaseOAuthClient):
|
||||
else:
|
||||
return response.text
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
def get_application_state(self, request, callback):
|
||||
"Generate state optional parameter."
|
||||
return get_random_string(32)
|
||||
@ -238,7 +240,7 @@ class OAuth2Client(BaseOAuthClient):
|
||||
return 'oauth-client-{0}-request-state'.format(self.source.name)
|
||||
|
||||
|
||||
def get_client(source, token=''): # nosec
|
||||
def get_client(source, token=''): # nosec
|
||||
"Return the API client for the given source."
|
||||
cls = OAuth2Client
|
||||
if source.request_token_url:
|
||||
|
||||
@ -72,7 +72,7 @@ class OAuthCallback(OAuthClientMixin, View):
|
||||
source_id = None
|
||||
source = None
|
||||
|
||||
def get(self, request, *args, **kwargs):
|
||||
def get(self, request, *_, **kwargs):
|
||||
"""View Get handler"""
|
||||
slug = kwargs.get('source_slug', '')
|
||||
try:
|
||||
@ -221,7 +221,8 @@ class DisconnectView(LoginRequiredMixin, View):
|
||||
}))
|
||||
return self.get(request, source_slug)
|
||||
|
||||
def get(self, request, source):
|
||||
# pylint: disable=unused-argument
|
||||
def get(self, request, source_slug):
|
||||
"""Show delete form"""
|
||||
return render(request, 'generic/delete.html', {
|
||||
'object': self.source,
|
||||
|
||||
Reference in New Issue
Block a user