Files
pretix_cgo/src/pretix/base/models/base.py
2019-07-09 16:13:37 +02:00

209 lines
7.3 KiB
Python

import json
import uuid
from django.contrib.contenttypes.models import ContentType
from django.db import models
from django.db.models.constants import LOOKUP_SEP
from django.db.models.signals import post_delete
from django.dispatch import receiver
from django.urls import reverse
from django.utils.crypto import get_random_string
from django.utils.functional import cached_property
from pretix.helpers.json import CustomJSONEncoder
def cachedfile_name(instance, filename: str) -> str:
secret = get_random_string(length=12)
return 'cachedfiles/%s.%s.%s' % (instance.id, secret, filename.split('.')[-1])
class CachedFile(models.Model):
"""
An uploaded file, with an optional expiry date.
"""
id = models.UUIDField(primary_key=True, default=uuid.uuid4)
expires = models.DateTimeField(null=True, blank=True)
date = models.DateTimeField(null=True, blank=True)
filename = models.CharField(max_length=255)
type = models.CharField(max_length=255)
file = models.FileField(null=True, blank=True, upload_to=cachedfile_name)
@receiver(post_delete, sender=CachedFile)
def cached_file_delete(sender, instance, **kwargs):
if instance.file:
# Pass false so FileField doesn't save the model.
instance.file.delete(False)
class LoggingMixin:
def log_action(self, action, data=None, user=None, api_token=None, auth=None, save=True):
"""
Create a LogEntry object that is related to this object.
See the LogEntry documentation for details.
:param action: The namespaced action code
:param data: Any JSON-serializable object
:param user: The user performing the action (optional)
"""
from .log import LogEntry
from .event import Event
from .devices import Device
from pretix.api.models import OAuthAccessToken, OAuthApplication
from .organizer import TeamAPIToken
from ..notifications import get_all_notification_types
from ..services.notifications import notify
from pretix.api.webhooks import get_all_webhook_events, notify_webhooks
event = None
if isinstance(self, Event):
event = self
elif hasattr(self, 'event'):
event = self.event
if user and not user.is_authenticated:
user = None
kwargs = {}
if isinstance(auth, OAuthAccessToken):
kwargs['oauth_application'] = auth.application
elif isinstance(auth, OAuthApplication):
kwargs['oauth_application'] = auth
elif isinstance(auth, TeamAPIToken):
kwargs['api_token'] = auth
elif isinstance(auth, Device):
kwargs['device'] = auth
elif isinstance(api_token, TeamAPIToken):
kwargs['api_token'] = api_token
logentry = LogEntry(content_object=self, user=user, action_type=action, event=event, **kwargs)
if isinstance(data, dict):
sensitivekeys = ['password', 'secret', 'api_key']
for sensitivekey in sensitivekeys:
for k, v in data.items():
if (sensitivekey in k) and v:
data[k] = "********"
logentry.data = json.dumps(data, cls=CustomJSONEncoder, sort_keys=True)
elif data:
raise TypeError("You should only supply dictionaries as log data.")
if save:
logentry.save()
no_types = get_all_notification_types()
wh_types = get_all_webhook_events()
no_type = None
wh_type = None
typepath = logentry.action_type
while (not no_type or not wh_types) and '.' in typepath:
wh_type = wh_type or wh_types.get(typepath + ('.*' if typepath != logentry.action_type else ''))
no_type = no_type or no_types.get(typepath + ('.*' if typepath != logentry.action_type else ''))
typepath = typepath.rsplit('.', 1)[0]
if no_type:
notify.apply_async(args=(logentry.pk,))
if wh_type:
notify_webhooks.apply_async(args=(logentry.pk,))
return logentry
class LoggedModel(models.Model, LoggingMixin):
class Meta:
abstract = True
@cached_property
def logs_content_type(self):
return ContentType.objects.get_for_model(type(self))
@cached_property
def all_logentries_link(self):
from pretix.base.models import Event
if isinstance(self, Event):
event = self
elif hasattr(self, 'event'):
event = self.event
else:
return None
return reverse(
'control:event.log',
kwargs={
'event': event.slug,
'organizer': event.organizer.slug,
}
) + '?content_type={}&object={}'.format(
self.logs_content_type.pk,
self.pk
)
def top_logentries(self):
qs = self.all_logentries()
if self.all_logentries_link:
qs = qs[:25]
return qs
def top_logentries_has_more(self):
return self.all_logentries().count() > 25
def all_logentries(self):
"""
Returns all log entries that are attached to this object.
:return: A QuerySet of LogEntry objects
"""
from .log import LogEntry
return LogEntry.objects.filter(
content_type=self.logs_content_type, object_id=self.pk
).select_related('user', 'event', 'oauth_application', 'api_token', 'device')
class LockModel:
def refresh_for_update(self, fields=None, using=None, **kwargs):
"""
Like refresh_from_db(), but with select_for_update().
See also https://code.djangoproject.com/ticket/28344
"""
if fields is not None:
if not fields:
return
if any(LOOKUP_SEP in f for f in fields):
raise ValueError(
'Found "%s" in fields argument. Relations and transforms '
'are not allowed in fields.' % LOOKUP_SEP)
hints = {'instance': self}
db_instance_qs = self.__class__._base_manager.db_manager(using, hints=hints).filter(pk=self.pk).select_for_update(**kwargs)
# Use provided fields, if not set then reload all non-deferred fields.
deferred_fields = self.get_deferred_fields()
if fields is not None:
fields = list(fields)
db_instance_qs = db_instance_qs.only(*fields)
elif deferred_fields:
fields = [f.attname for f in self._meta.concrete_fields
if f.attname not in deferred_fields]
db_instance_qs = db_instance_qs.only(*fields)
db_instance = db_instance_qs.get()
non_loaded_fields = db_instance.get_deferred_fields()
for field in self._meta.concrete_fields:
if field.attname in non_loaded_fields:
# This field wasn't refreshed - skip ahead.
continue
setattr(self, field.attname, getattr(db_instance, field.attname))
# Clear cached foreign keys.
if field.is_relation and field.is_cached(self):
field.delete_cached_value(self)
# Clear cached relations.
for field in self._meta.related_objects:
if field.is_cached(self):
field.delete_cached_value(self)
self._state.db = db_instance._state.db