""" This code has been taken from https://blog.hypertrack.io/2016/10/08/dealing-with-database-transactions-in-django-celery/ Usage: from pretix.base.services.async import TransactionAwareTask @task(base=TransactionAwareTask) def task_…(): """ import cProfile import os import random import time from django.conf import settings from django.db import transaction from django_scopes import scope, scopes_disabled from pretix.base.metrics import ( pretix_task_duration_seconds, pretix_task_runs_total, ) from pretix.base.models import Event, Organizer, User from pretix.celery_app import app class ProfiledTask(app.Task): def __call__(self, *args, **kwargs): if settings.PROFILING_RATE > 0 and random.random() < settings.PROFILING_RATE / 100: profiler = cProfile.Profile() profiler.enable() t0 = time.perf_counter() ret = super().__call__(*args, **kwargs) tottime = time.perf_counter() - t0 profiler.disable() profiler.dump_stats(os.path.join(settings.PROFILE_DIR, '{time:.0f}_{tottime:.3f}_celery_{t}.pstat'.format( t=self.name, tottime=tottime, time=time.time() ))) else: t0 = time.perf_counter() ret = super().__call__(*args, **kwargs) tottime = time.perf_counter() - t0 if settings.METRICS_ENABLED: pretix_task_duration_seconds.observe(tottime, task_name=self.name) return ret def on_failure(self, exc, task_id, args, kwargs, einfo): if settings.METRICS_ENABLED: expected = False for t in self.throws: if isinstance(exc, t): expected = True break pretix_task_runs_total.inc(1, task_name=self.name, status="expected-error" if expected else "error") return super().on_failure(exc, task_id, args, kwargs, einfo) def on_success(self, retval, task_id, args, kwargs): if settings.METRICS_ENABLED: pretix_task_runs_total.inc(1, task_name=self.name, status="success") return super().on_success(retval, task_id, args, kwargs) class EventTask(app.Task): def __call__(self, *args, **kwargs): if 'event_id' in kwargs: event_id = kwargs.get('event_id') with scopes_disabled(): event = Event.objects.select_related('organizer').get(pk=event_id) del kwargs['event_id'] kwargs['event'] = event elif 'event' in kwargs: event_id = kwargs.get('event') with scopes_disabled(): event = Event.objects.select_related('organizer').get(pk=event_id) kwargs['event'] = event else: args = list(args) event_id = args[0] with scopes_disabled(): event = Event.objects.select_related('organizer').get(pk=event_id) args[0] = event with scope(organizer=event.organizer): ret = super().__call__(*args, **kwargs) return ret class OrganizerUserTask(app.Task): def __call__(self, *args, **kwargs): organizer_id = kwargs['organizer'] with scopes_disabled(): organizer = Organizer.objects.get(pk=organizer_id) kwargs['organizer'] = organizer user_id = kwargs['user'] user = User.objects.get(pk=user_id) kwargs['user'] = user with scope(organizer=organizer): ret = super().__call__(*args, **kwargs) return ret class ProfiledEventTask(ProfiledTask, EventTask): pass class ProfiledOrganizerUserTask(ProfiledTask, OrganizerUserTask): pass class TransactionAwareTask(ProfiledTask): """ Task class which is aware of django db transactions and only executes tasks after transaction has been committed """ def apply_async(self, *args, **kwargs): """ Unlike the default task in celery, this task does not return an async result """ transaction.on_commit( lambda: super(TransactionAwareTask, self).apply_async(*args, **kwargs) ) class TransactionAwareProfiledEventTask(ProfiledEventTask): def apply_async(self, *args, **kwargs): """ Unlike the default task in celery, this task does not return an async result """ transaction.on_commit( lambda: super(TransactionAwareProfiledEventTask, self).apply_async(*args, **kwargs) )