Add auditable superuser mode (#824)

* Remove is_superuser everywhere

* Session handling

* List of sessions, relative timeout

* Absolute timeout

* Optionally pseudo-force audit comments

* Fix failing tests

* Add tests

* Add docs

* Rebsae migration

* Typos

* Fix tests
This commit is contained in:
Raphael Michel
2018-03-28 14:16:58 +02:00
committed by GitHub
parent 558c920181
commit a284e0c2f7
56 changed files with 965 additions and 130 deletions

View File

@@ -1,4 +1,4 @@
from typing import Union
from datetime import timedelta
from django.conf import settings
from django.contrib.auth.models import (
@@ -9,6 +9,7 @@ from django.contrib.contenttypes.models import ContentType
from django.db import models
from django.db.models import Q
from django.utils.crypto import get_random_string
from django.utils.timezone import now
from django.utils.translation import ugettext_lazy as _
from django_otp.models import Device
@@ -36,7 +37,6 @@ class UserManager(BaseUserManager):
raise Exception("You must provide a password")
user = self.model(email=email)
user.is_staff = True
user.is_superuser = True
user.set_password(password)
user.save()
return user
@@ -46,6 +46,11 @@ def generate_notifications_token():
return get_random_string(length=32)
class SuperuserPermissionSet:
def __contains__(self, item):
return True
class User(AbstractBaseUser, PermissionsMixin, LoggingMixin):
"""
This is the user model used by pretix for authentication.
@@ -114,6 +119,10 @@ class User(AbstractBaseUser, PermissionsMixin, LoggingMixin):
def __str__(self):
return self.email
@property
def is_superuser(self):
return False
def get_short_name(self) -> str:
"""
Returns the first of the following user properties that is found to exist:
@@ -194,40 +203,36 @@ class User(AbstractBaseUser, PermissionsMixin, LoggingMixin):
))
return self._teamcache['e{}'.format(event.pk)]
class SuperuserPermissionSet:
def __contains__(self, item):
return True
def get_event_permission_set(self, organizer, event) -> Union[set, SuperuserPermissionSet]:
def get_event_permission_set(self, organizer, event) -> set:
"""
Gets a set of permissions (as strings) that a user holds for a particular event
:param organizer: The organizer of the event
:param event: The event to check
:return: set in case of a normal user and a SuperuserPermissionSet in case of a superuser (fake object where
a in b always returns true).
:return: set
"""
if self.is_superuser:
return self.SuperuserPermissionSet()
teams = self._get_teams_for_event(organizer, event)
return set.union(*[t.permission_set() for t in teams])
sets = [t.permission_set() for t in teams]
if sets:
return set.union(*sets)
else:
return set()
def get_organizer_permission_set(self, organizer) -> Union[set, SuperuserPermissionSet]:
def get_organizer_permission_set(self, organizer) -> set:
"""
Gets a set of permissions (as strings) that a user holds for a particular organizer
:param organizer: The organizer of the event
:return: set in case of a normal user and a SuperuserPermissionSet in case of a superuser (fake object where
a in b always returns true).
:return: set
"""
if self.is_superuser:
return self.SuperuserPermissionSet()
teams = self._get_teams_for_organizer(organizer)
return set.union(*[t.permission_set() for t in teams])
sets = [t.permission_set() for t in teams]
if sets:
return set.union(*sets)
else:
return set()
def has_event_permission(self, organizer, event, perm_name=None) -> bool:
def has_event_permission(self, organizer, event, perm_name=None, request=None) -> bool:
"""
Checks if this user is part of any team that grants access of type ``perm_name``
to the event ``event``.
@@ -235,9 +240,10 @@ class User(AbstractBaseUser, PermissionsMixin, LoggingMixin):
:param organizer: The organizer of the event
:param event: The event to check
:param perm_name: The permission, e.g. ``can_change_teams``
:param request: The current request (optional). Required to detect staff sessions properly.
:return: bool
"""
if self.is_superuser:
if request and self.has_active_staff_session(request.session.session_key):
return True
teams = self._get_teams_for_event(organizer, event)
if teams:
@@ -246,16 +252,17 @@ class User(AbstractBaseUser, PermissionsMixin, LoggingMixin):
return True
return False
def has_organizer_permission(self, organizer, perm_name=None):
def has_organizer_permission(self, organizer, perm_name=None, request=None):
"""
Checks if this user is part of any team that grants access of type ``perm_name``
to the organizer ``organizer``.
:param organizer: The organizer to check
:param perm_name: The permission, e.g. ``can_change_teams``
:param request: The current request (optional). Required to detect staff sessions properly.
:return: bool
"""
if self.is_superuser:
if request and self.has_active_staff_session(request.session.session_key):
return True
teams = self._get_teams_for_organizer(organizer)
if teams:
@@ -263,15 +270,16 @@ class User(AbstractBaseUser, PermissionsMixin, LoggingMixin):
return True
return False
def get_events_with_any_permission(self):
def get_events_with_any_permission(self, request=None):
"""
Returns a queryset of events the user has any permissions to.
:param request: The current request (optional). Required to detect staff sessions properly.
:return: Iterable of Events
"""
from .event import Event
if self.is_superuser:
if request and self.has_active_staff_session(request.session.session_key):
return Event.objects.all()
return Event.objects.filter(
@@ -279,15 +287,16 @@ class User(AbstractBaseUser, PermissionsMixin, LoggingMixin):
| Q(id__in=self.teams.values_list('limit_events__id', flat=True))
)
def get_events_with_permission(self, permission):
def get_events_with_permission(self, permission, request=None):
"""
Returns a queryset of events the user has a specific permissions to.
:param request: The current request (optional). Required to detect staff sessions properly.
:return: Iterable of Events
"""
from .event import Event
if self.is_superuser:
if request and self.has_active_staff_session(request.session.session_key):
return Event.objects.all()
kwargs = {permission: True}
@@ -297,6 +306,56 @@ class User(AbstractBaseUser, PermissionsMixin, LoggingMixin):
| Q(id__in=self.teams.filter(**kwargs).values_list('limit_events__id', flat=True))
)
def has_active_staff_session(self, session_key=None):
"""
Returns whether or not a user has an active staff session (formerly known as superuser session)
with the given session key.
"""
return self.get_active_staff_session(session_key) is not None
def get_active_staff_session(self, session_key=None):
if not self.is_staff:
return None
if not hasattr(self, '_staff_session_cache'):
self._staff_session_cache = {}
if session_key not in self._staff_session_cache:
qs = StaffSession.objects.filter(
user=self, date_end__isnull=True
)
if session_key:
qs = qs.filter(session_key=session_key)
sess = qs.first()
if sess:
if sess.date_start < now() - timedelta(seconds=settings.PRETIX_SESSION_TIMEOUT_ABSOLUTE):
sess.date_end = now()
sess.save()
sess = None
self._staff_session_cache[session_key] = sess
return self._staff_session_cache[session_key]
class StaffSession(models.Model):
user = models.ForeignKey('User')
date_start = models.DateTimeField(auto_now_add=True)
date_end = models.DateTimeField(null=True, blank=True)
session_key = models.CharField(max_length=255)
comment = models.TextField()
class Meta:
ordering = ('date_start',)
class StaffSessionAuditLog(models.Model):
session = models.ForeignKey('StaffSession', related_name='logs')
datetime = models.DateTimeField(auto_now_add=True)
url = models.CharField(max_length=255)
method = models.CharField(max_length=255)
impersonating = models.ForeignKey('User', null=True, blank=True)
class Meta:
ordering = ('datetime',)
class U2FDevice(Device):
json_data = models.TextField()