core: fix applications API not being same format as other APIs
This commit is contained in:
		| @ -20,46 +20,45 @@ from rest_framework.viewsets import ViewSet | ||||
| from passbook.audit.models import Event, EventAction | ||||
|  | ||||
|  | ||||
| def get_events_per_1h(**filter_kwargs) -> List[Dict[str, int]]: | ||||
|     """Get event count by hour in the last day, fill with zeros""" | ||||
|     date_from = now() - timedelta(days=1) | ||||
|     result = ( | ||||
|         Event.objects.filter(created__gte=date_from, **filter_kwargs) | ||||
|         .annotate( | ||||
|             age=ExpressionWrapper(now() - F("created"), output_field=DurationField()) | ||||
|         ) | ||||
|         .annotate(age_hours=ExtractHour("age")) | ||||
|         .values("age_hours") | ||||
|         .annotate(count=Count("pk")) | ||||
|         .order_by("age_hours") | ||||
|     ) | ||||
|     data = Counter({d["age_hours"]: d["count"] for d in result}) | ||||
|     results = [] | ||||
|     _now = now() | ||||
|     for hour in range(0, -24, -1): | ||||
|         results.append( | ||||
|             { | ||||
|                 "x": time.mktime((_now + timedelta(hours=hour)).timetuple()) * 1000, | ||||
|                 "y": data[hour * -1], | ||||
|             } | ||||
|         ) | ||||
|     return results | ||||
|  | ||||
|  | ||||
| class AdministrationMetricsSerializer(Serializer): | ||||
|     """Overview View""" | ||||
|  | ||||
|     logins_per_1h = SerializerMethodField() | ||||
|     logins_failed_per_1h = SerializerMethodField() | ||||
|  | ||||
|     def get_events_per_1h(self, action: str) -> List[Dict[str, int]]: | ||||
|         """Get event count by hour in the last day, fill with zeros""" | ||||
|         date_from = now() - timedelta(days=1) | ||||
|         result = ( | ||||
|             Event.objects.filter(action=action, created__gte=date_from) | ||||
|             .annotate( | ||||
|                 age=ExpressionWrapper( | ||||
|                     now() - F("created"), output_field=DurationField() | ||||
|                 ) | ||||
|             ) | ||||
|             .annotate(age_hours=ExtractHour("age")) | ||||
|             .values("age_hours") | ||||
|             .annotate(count=Count("pk")) | ||||
|             .order_by("age_hours") | ||||
|         ) | ||||
|         data = Counter({d["age_hours"]: d["count"] for d in result}) | ||||
|         results = [] | ||||
|         _now = now() | ||||
|         for hour in range(0, -24, -1): | ||||
|             results.append( | ||||
|                 { | ||||
|                     "x": time.mktime((_now + timedelta(hours=hour)).timetuple()) * 1000, | ||||
|                     "y": data[hour * -1], | ||||
|                 } | ||||
|             ) | ||||
|         return results | ||||
|  | ||||
|     def get_logins_per_1h(self, _): | ||||
|         """Get successful logins per hour for the last 24 hours""" | ||||
|         return self.get_events_per_1h(EventAction.LOGIN) | ||||
|         return get_events_per_1h(action=EventAction.LOGIN) | ||||
|  | ||||
|     def get_logins_failed_per_1h(self, _): | ||||
|         """Get failed logins per hour for the last 24 hours""" | ||||
|         return self.get_events_per_1h(EventAction.LOGIN_FAILED) | ||||
|         return get_events_per_1h(action=EventAction.LOGIN_FAILED) | ||||
|  | ||||
|     def create(self, request: Request) -> response: | ||||
|         raise NotImplementedError | ||||
|  | ||||
| @ -44,9 +44,13 @@ class ApplicationViewSet(ModelViewSet): | ||||
|             queryset = backend().filter_queryset(self.request, queryset, self) | ||||
|         return queryset | ||||
|  | ||||
|     def list(self, request: Request, *_, **__) -> Response: | ||||
|     def list(self, request: Request, *args, **kwargs) -> Response: | ||||
|         """Custom list method that checks Policy based access instead of guardian""" | ||||
|         if request.user.is_superuser: | ||||
|             # pylint: disable=no-member | ||||
|             return super().list(request, *args, **kwargs) | ||||
|         queryset = self._filter_queryset_for_list(self.get_queryset()) | ||||
|         self.paginate_queryset(queryset) | ||||
|         allowed_applications = [] | ||||
|         for application in queryset.order_by("name"): | ||||
|             engine = PolicyEngine(application, self.request.user, self.request) | ||||
| @ -54,4 +58,4 @@ class ApplicationViewSet(ModelViewSet): | ||||
|             if engine.passing: | ||||
|                 allowed_applications.append(application) | ||||
|         serializer = self.get_serializer(allowed_applications, many=True) | ||||
|         return Response(serializer.data) | ||||
|         return self.get_paginated_response(serializer.data) | ||||
|  | ||||
		Reference in New Issue
	
	Block a user
	 Jens Langhammer
					Jens Langhammer