Files
pretix_cgo/src/pretix/base/models/auth.py
2020-03-22 11:04:51 +01:00

475 lines
17 KiB
Python

import binascii
import json
from datetime import timedelta
from urllib.parse import urlparse
import webauthn
from django.conf import settings
from django.contrib.auth.models import (
AbstractBaseUser, BaseUserManager, PermissionsMixin,
)
from django.contrib.auth.tokens import default_token_generator
from django.contrib.contenttypes.models import ContentType
from django.db import models
from django.db.models import Q
from django.utils.crypto import get_random_string, salted_hmac
from django.utils.timezone import now
from django.utils.translation import ugettext_lazy as _
from django_otp.models import Device
from django_scopes import scopes_disabled
from u2flib_server.utils import (
pub_key_from_der, websafe_decode, websafe_encode,
)
from pretix.base.i18n import language
from pretix.helpers.urls import build_absolute_uri
from .base import LoggingMixin
class UserManager(BaseUserManager):
"""
This is the user manager for our custom user model. See the User
model documentation to see what's so special about our user model.
"""
def create_user(self, email: str, password: str = None, **kwargs):
user = self.model(email=email, **kwargs)
user.set_password(password)
user.save()
return user
def create_superuser(self, email: str, password: str = None): # NOQA
# Not used in the software but required by Django
if password is None:
raise Exception("You must provide a password")
user = self.model(email=email)
user.is_staff = True
user.set_password(password)
user.save()
return user
def generate_notifications_token():
return get_random_string(length=32)
def generate_session_token():
return get_random_string(length=32)
class SuperuserPermissionSet:
def __contains__(self, item):
return True
class User(AbstractBaseUser, PermissionsMixin, LoggingMixin):
"""
This is the user model used by pretix for authentication.
:param email: The user's email address, used for identification.
:type email: str
:param fullname: The user's full name. May be empty or null.
:type fullname: str
:param is_active: Whether this user account is activated.
:type is_active: bool
:param is_staff: ``True`` for system operators.
:type is_staff: bool
:param date_joined: The datetime of the user's registration.
:type date_joined: datetime
:param locale: The user's preferred locale code.
:type locale: str
:param timezone: The user's preferred timezone.
:type timezone: str
"""
USERNAME_FIELD = 'email'
REQUIRED_FIELDS = []
email = models.EmailField(unique=True, db_index=True, null=True, blank=True,
verbose_name=_('E-mail'), max_length=190)
fullname = models.CharField(max_length=255, blank=True, null=True,
verbose_name=_('Full name'))
is_active = models.BooleanField(default=True,
verbose_name=_('Is active'))
is_staff = models.BooleanField(default=False,
verbose_name=_('Is site admin'))
date_joined = models.DateTimeField(auto_now_add=True,
verbose_name=_('Date joined'))
locale = models.CharField(max_length=50,
choices=settings.LANGUAGES,
default=settings.LANGUAGE_CODE,
verbose_name=_('Language'))
timezone = models.CharField(max_length=100,
default=settings.TIME_ZONE,
verbose_name=_('Timezone'))
require_2fa = models.BooleanField(
default=False,
verbose_name=_('Two-factor authentication is required to log in')
)
notifications_send = models.BooleanField(
default=True,
verbose_name=_('Receive notifications according to my settings below'),
help_text=_('If turned off, you will not get any notifications.')
)
notifications_token = models.CharField(max_length=255, default=generate_notifications_token)
auth_backend = models.CharField(max_length=255, default='native')
session_token = models.CharField(max_length=32, default=generate_session_token)
objects = UserManager()
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._teamcache = {}
class Meta:
verbose_name = _("User")
verbose_name_plural = _("Users")
ordering = ('email',)
def save(self, *args, **kwargs):
self.email = self.email.lower()
is_new = not self.pk
super().save(*args, **kwargs)
if is_new:
self.notification_settings.create(
action_type='pretix.event.order.refund.requested',
event=None,
method='mail',
enabled=True
)
def __str__(self):
return self.email
@property
def is_superuser(self):
return False
def get_short_name(self) -> str:
"""
Returns the first of the following user properties that is found to exist:
* Full name
* Email address
Only present for backwards compatibility
"""
if self.fullname:
return self.fullname
else:
return self.email
def get_full_name(self) -> str:
"""
Returns the first of the following user properties that is found to exist:
* Full name
* Email address
"""
if self.fullname:
return self.fullname
else:
return self.email
def send_security_notice(self, messages, email=None):
from pretix.base.services.mail import mail, SendMailException
try:
with language(self.locale):
msg = '- ' + '\n- '.join(str(m) for m in messages)
mail(
email or self.email,
_('Account information changed'),
'pretixcontrol/email/security_notice.txt',
{
'user': self,
'messages': msg,
'url': build_absolute_uri('control:user.settings')
},
event=None,
user=self,
locale=self.locale
)
except SendMailException:
pass # Already logged
def send_password_reset(self):
from pretix.base.services.mail import mail
mail(
self.email, _('Password recovery'), 'pretixcontrol/email/forgot.txt',
{
'user': self,
'url': (build_absolute_uri('control:auth.forgot.recover')
+ '?id=%d&token=%s' % (self.id, default_token_generator.make_token(self)))
},
None, locale=self.locale, user=self
)
@property
def top_logentries(self):
return self.all_logentries
@property
def all_logentries(self):
from pretix.base.models import LogEntry
return LogEntry.objects.filter(content_type=ContentType.objects.get_for_model(User),
object_id=self.pk)
def _get_teams_for_organizer(self, organizer):
if 'o{}'.format(organizer.pk) not in self._teamcache:
self._teamcache['o{}'.format(organizer.pk)] = list(self.teams.filter(organizer=organizer))
return self._teamcache['o{}'.format(organizer.pk)]
def _get_teams_for_event(self, organizer, event):
if 'e{}'.format(event.pk) not in self._teamcache:
self._teamcache['e{}'.format(event.pk)] = list(self.teams.filter(organizer=organizer).filter(
Q(all_events=True) | Q(limit_events=event)
))
return self._teamcache['e{}'.format(event.pk)]
def get_event_permission_set(self, organizer, event) -> set:
"""
Gets a set of permissions (as strings) that a user holds for a particular event
:param organizer: The organizer of the event
:param event: The event to check
:return: set
"""
teams = self._get_teams_for_event(organizer, event)
sets = [t.permission_set() for t in teams]
if sets:
return set.union(*sets)
else:
return set()
def get_organizer_permission_set(self, organizer) -> set:
"""
Gets a set of permissions (as strings) that a user holds for a particular organizer
:param organizer: The organizer of the event
:return: set
"""
teams = self._get_teams_for_organizer(organizer)
sets = [t.permission_set() for t in teams]
if sets:
return set.union(*sets)
else:
return set()
def has_event_permission(self, organizer, event, perm_name=None, request=None) -> bool:
"""
Checks if this user is part of any team that grants access of type ``perm_name``
to the event ``event``.
:param organizer: The organizer of the event
:param event: The event to check
:param perm_name: The permission, e.g. ``can_change_teams``
:param request: The current request (optional). Required to detect staff sessions properly.
:return: bool
"""
if request and self.has_active_staff_session(request.session.session_key):
return True
teams = self._get_teams_for_event(organizer, event)
if teams:
self._teamcache['e{}'.format(event.pk)] = teams
if isinstance(perm_name, (tuple, list)):
return any([any(team.has_permission(p) for team in teams) for p in perm_name])
if not perm_name or any([team.has_permission(perm_name) for team in teams]):
return True
return False
def has_organizer_permission(self, organizer, perm_name=None, request=None):
"""
Checks if this user is part of any team that grants access of type ``perm_name``
to the organizer ``organizer``.
:param organizer: The organizer to check
:param perm_name: The permission, e.g. ``can_change_teams``
:param request: The current request (optional). Required to detect staff sessions properly.
:return: bool
"""
if request and self.has_active_staff_session(request.session.session_key):
return True
teams = self._get_teams_for_organizer(organizer)
if teams:
if isinstance(perm_name, (tuple, list)):
return any([any(team.has_permission(p) for team in teams) for p in perm_name])
if not perm_name or any([team.has_permission(perm_name) for team in teams]):
return True
return False
@scopes_disabled()
def get_events_with_any_permission(self, request=None):
"""
Returns a queryset of events the user has any permissions to.
:param request: The current request (optional). Required to detect staff sessions properly.
:return: Iterable of Events
"""
from .event import Event
if request and self.has_active_staff_session(request.session.session_key):
return Event.objects.all()
return Event.objects.filter(
Q(organizer_id__in=self.teams.filter(all_events=True).values_list('organizer', flat=True))
| Q(id__in=self.teams.values_list('limit_events__id', flat=True))
)
@scopes_disabled()
def get_events_with_permission(self, permission, request=None):
"""
Returns a queryset of events the user has a specific permissions to.
:param request: The current request (optional). Required to detect staff sessions properly.
:return: Iterable of Events
"""
from .event import Event
if request and self.has_active_staff_session(request.session.session_key):
return Event.objects.all()
kwargs = {permission: True}
return Event.objects.filter(
Q(organizer_id__in=self.teams.filter(all_events=True, **kwargs).values_list('organizer', flat=True))
| Q(id__in=self.teams.filter(**kwargs).values_list('limit_events__id', flat=True))
)
@scopes_disabled()
def get_organizers_with_permission(self, permission, request=None):
"""
Returns a queryset of organizers the user has a specific permissions to.
:param request: The current request (optional). Required to detect staff sessions properly.
:return: Iterable of Organizers
"""
from .event import Organizer
if request and self.has_active_staff_session(request.session.session_key):
return Organizer.objects.all()
kwargs = {permission: True}
return Organizer.objects.filter(
id__in=self.teams.filter(**kwargs).values_list('organizer', flat=True)
)
def has_active_staff_session(self, session_key=None):
"""
Returns whether or not a user has an active staff session (formerly known as superuser session)
with the given session key.
"""
return self.get_active_staff_session(session_key) is not None
def get_active_staff_session(self, session_key=None):
if not self.is_staff:
return None
if not hasattr(self, '_staff_session_cache'):
self._staff_session_cache = {}
if session_key not in self._staff_session_cache:
qs = StaffSession.objects.filter(
user=self, date_end__isnull=True
)
if session_key:
qs = qs.filter(session_key=session_key)
sess = qs.first()
if sess:
if sess.date_start < now() - timedelta(seconds=settings.PRETIX_SESSION_TIMEOUT_ABSOLUTE):
sess.date_end = now()
sess.save()
sess = None
self._staff_session_cache[session_key] = sess
return self._staff_session_cache[session_key]
def get_session_auth_hash(self):
"""
Return an HMAC that needs to
"""
key_salt = "pretix.base.models.User.get_session_auth_hash"
payload = self.password
payload += self.email
payload += self.session_token
return salted_hmac(key_salt, payload).hexdigest()
def update_session_token(self):
self.session_token = generate_session_token()
self.save(update_fields=['session_token'])
class StaffSession(models.Model):
user = models.ForeignKey('User', on_delete=models.PROTECT)
date_start = models.DateTimeField(auto_now_add=True)
date_end = models.DateTimeField(null=True, blank=True)
session_key = models.CharField(max_length=255)
comment = models.TextField()
class Meta:
ordering = ('date_start',)
class StaffSessionAuditLog(models.Model):
session = models.ForeignKey('StaffSession', related_name='logs', on_delete=models.PROTECT)
datetime = models.DateTimeField(auto_now_add=True)
url = models.CharField(max_length=255)
method = models.CharField(max_length=255)
impersonating = models.ForeignKey('User', null=True, blank=True, on_delete=models.PROTECT)
class Meta:
ordering = ('datetime',)
class U2FDevice(Device):
json_data = models.TextField()
@property
def webauthnuser(self):
d = json.loads(self.json_data)
# We manually need to convert the pubkey from DER format (used in our
# former U2F implementation) to the format required by webauthn. This
# is based on the following example:
# https://www.w3.org/TR/webauthn/#sctn-encoded-credPubKey-examples
pub_key = pub_key_from_der(websafe_decode(d['publicKey'].replace('+', '-').replace('/', '_')))
pub_key = binascii.unhexlify(
'A5010203262001215820{:064x}225820{:064x}'.format(
pub_key.public_numbers().x, pub_key.public_numbers().y
)
)
return webauthn.WebAuthnUser(
d['keyHandle'],
self.user.email,
str(self.user),
settings.SITE_URL,
d['keyHandle'],
websafe_encode(pub_key),
1,
urlparse(settings.SITE_URL).netloc
)
class WebAuthnDevice(Device):
credential_id = models.CharField(max_length=255, null=True, blank=True)
rp_id = models.CharField(max_length=255, null=True, blank=True)
icon_url = models.CharField(max_length=255, null=True, blank=True)
ukey = models.TextField(null=True)
pub_key = models.TextField(null=True)
sign_count = models.IntegerField(default=0)
@property
def webauthnuser(self):
return webauthn.WebAuthnUser(
self.ukey,
self.user.email,
str(self.user),
settings.SITE_URL,
self.credential_id,
self.pub_key,
self.sign_count,
urlparse(settings.SITE_URL).netloc
)