Always process tasks through celery (#245)

This commit is contained in:
Raphael Michel
2016-09-21 10:38:31 +02:00
committed by GitHub
parent d014a92cef
commit 1faacef9d4
22 changed files with 145 additions and 213 deletions

View File

@@ -273,9 +273,3 @@ class EventLock(models.Model):
event = models.CharField(max_length=36, primary_key=True)
date = models.DateTimeField(auto_now=True)
token = models.UUIDField(default=uuid.uuid4)
class LockTimeoutException(Exception):
pass
class LockReleaseException(Exception):
pass

View File

@@ -2,7 +2,7 @@ from datetime import datetime, timedelta
from decimal import Decimal
from typing import List, Optional
from django.conf import settings
from celery.exceptions import MaxRetriesExceededError
from django.db.models import Q
from django.utils.translation import ugettext as _
@@ -10,6 +10,8 @@ from pretix.base.i18n import LazyLocaleException
from pretix.base.models import (
CartPosition, Event, EventLock, Item, ItemVariation, Quota, Voucher,
)
from pretix.base.services.locking import LockTimeoutException
from pretix.celery import app
class CartError(LazyLocaleException):
@@ -205,7 +207,8 @@ def _add_items_to_cart(event: Event, items: List[dict], cart_id: str=None) -> No
raise CartError(err)
def add_items_to_cart(event: int, items: List[dict], cart_id: str=None) -> None:
@app.task(bind=True, max_retries=5, default_retry_delay=1)
def add_items_to_cart(self, event: int, items: List[dict], cart_id: str=None) -> None:
"""
Adds a list of items to a user's cart.
:param event: The event ID in question
@@ -216,8 +219,11 @@ def add_items_to_cart(event: int, items: List[dict], cart_id: str=None) -> None:
"""
event = Event.objects.get(id=event)
try:
_add_items_to_cart(event, items, cart_id)
except EventLock.LockTimeoutException:
try:
_add_items_to_cart(event, items, cart_id)
except LockTimeoutException:
self.retry()
except (MaxRetriesExceededError, LockTimeoutException):
raise CartError(error_messages['busy'])
@@ -242,7 +248,8 @@ def _remove_items_from_cart(event: Event, items: List[dict], cart_id: str) -> No
cp.delete()
def remove_items_from_cart(event: int, items: List[dict], cart_id: str=None) -> None:
@app.task(bind=True, max_retries=5, default_retry_delay=1)
def remove_items_from_cart(self, event: int, items: List[dict], cart_id: str=None) -> None:
"""
Removes a list of items from a user's cart.
:param event: The event ID in question
@@ -251,35 +258,9 @@ def remove_items_from_cart(event: int, items: List[dict], cart_id: str=None) ->
"""
event = Event.objects.get(id=event)
try:
_remove_items_from_cart(event, items, cart_id)
except EventLock.LockTimeoutException:
try:
_remove_items_from_cart(event, items, cart_id)
except LockTimeoutException:
self.retry()
except (MaxRetriesExceededError, LockTimeoutException):
raise CartError(error_messages['busy'])
if settings.HAS_CELERY:
from pretix.celery import app
@app.task(bind=True, max_retries=5, default_retry_delay=1)
def add_items_to_cart_task(self, event: int, items: List[dict], cart_id: str):
event = Event.objects.get(id=event)
try:
try:
_add_items_to_cart(event, items, cart_id)
except EventLock.LockTimeoutException:
self.retry(exc=CartError(error_messages['busy']))
except CartError as e:
return e
@app.task(bind=True, max_retries=5, default_retry_delay=1)
def remove_items_from_cart_task(self, event: int, items: List[dict], cart_id: str):
event = Event.objects.get(id=event)
try:
try:
_remove_items_from_cart(event, items, cart_id)
except EventLock.LockTimeoutException:
self.retry(exc=CartError(error_messages['busy']))
except CartError as e:
return e
add_items_to_cart.task = add_items_to_cart_task
remove_items_from_cart.task = remove_items_from_cart_task

View File

@@ -5,8 +5,10 @@ from django.core.files.base import ContentFile
from pretix.base.models import CachedFile, Event, cachedfile_name
from pretix.base.signals import register_data_exporters
from pretix.celery import app
@app.task()
def export(event: str, fileid: str, provider: str, form_data: Dict[str, Any]) -> None:
event = Event.objects.get(id=event)
file = CachedFile.objects.get(id=fileid)
@@ -17,12 +19,3 @@ def export(event: str, fileid: str, provider: str, form_data: Dict[str, Any]) ->
file.filename, file.type, data = ex.render(form_data)
file.file.save(cachedfile_name(file, file.filename), ContentFile(data))
file.save()
if settings.HAS_CELERY:
from pretix.celery import app
export_task = app.task(export)
def export(*args, **kwargs):
export_task.apply_async(args=args, kwargs=kwargs)

View File

@@ -24,6 +24,7 @@ from reportlab.platypus import (
from pretix.base.i18n import LazyI18nString, language
from pretix.base.models import Invoice, InvoiceAddress, InvoiceLine, Order
from pretix.base.signals import register_payment_providers
from pretix.celery import app
@transaction.atomic
@@ -374,7 +375,8 @@ def _invoice_generate_german(invoice, f):
return doc
def invoice_pdf(invoice: int):
@app.task
def invoice_pdf_task(invoice: int):
i = Invoice.objects.get(pk=invoice)
with language(i.locale):
with tempfile.NamedTemporaryFile(suffix=".pdf") as f:
@@ -391,13 +393,8 @@ def invoice_qualified(order: Order):
return True
if settings.HAS_CELERY:
from pretix.celery import app
invoice_pdf_task = app.task(invoice_pdf)
def invoice_pdf(*args, **kwargs):
# We introduce a 2 second delay, because otherwise we run into conditions where
# the task worker tries to generate the PDF even before our database transaction
# was committed and therefore fails to find the invoice object.
invoice_pdf_task.apply_async(args=args, kwargs=kwargs, countdown=2)
def invoice_pdf(*args, **kwargs):
# We introduce a 2 second delay, because otherwise we run into conditions where
# the task worker tries to generate the PDF even before our database transaction
# was committed and therefore fails to find the invoice object.
invoice_pdf_task.apply_async(args=args, kwargs=kwargs, countdown=2)

View File

@@ -27,13 +27,21 @@ class LockManager:
return False
class LockTimeoutException(Exception):
pass
class LockReleaseException(Exception):
pass
def lock_event(event):
"""
Issue a lock on this event so nobody can book tickets for this event until
you release the lock. Will retry 5 times on failure.
:raises EventLock.LockTimeoutException: if the event is locked every time we try
to obtain the lock
:raises LockTimeoutException: if the event is locked every time we try
to obtain the lock
"""
if hasattr(event, '_lock') and event._lock:
return True
@@ -50,10 +58,10 @@ def release_event(event):
the lock will only be released if it was issued in _this_ python
representation of the database object.
:raises EventLock.LockReleaseException: if we do not own the lock
:raises LockReleaseException: if we do not own the lock
"""
if not hasattr(event, '_lock') or not event._lock:
raise EventLock.LockReleaseException('Lock is not owned by this thread')
raise LockReleaseException('Lock is not owned by this thread')
if settings.HAS_REDIS:
return release_event_redis(event)
else:
@@ -70,26 +78,26 @@ def lock_event_db(event):
event._lock = l
return True
elif l.date < now() - timedelta(seconds=LOCK_TIMEOUT):
newtoken = uuid.uuid4()
newtoken = str(uuid.uuid4())
updated = EventLock.objects.filter(event=event.id, token=l.token).update(date=dt, token=newtoken)
if updated:
l.token = newtoken
event._lock = l
return True
time.sleep(2 ** i / 100)
raise EventLock.LockTimeoutException()
raise LockTimeoutException()
@transaction.atomic
def release_event_db(event):
if not hasattr(event, '_lock') or not event._lock:
raise EventLock.LockReleaseException('Lock is not owned by this thread')
raise LockReleaseException('Lock is not owned by this thread')
try:
lock = EventLock.objects.get(event=event.id, token=event._lock.token)
lock.delete()
event._lock = None
except EventLock.DoesNotExist:
raise EventLock.LockReleaseException('Lock is no longer owned by this thread')
raise LockReleaseException('Lock is no longer owned by this thread')
def redis_lock_from_event(event):
@@ -113,9 +121,9 @@ def lock_event_redis(event):
return True
except RedisError:
logger.exception('Error locking an event')
raise EventLock.LockTimeoutException()
raise LockTimeoutException()
time.sleep(2 ** i / 100)
raise EventLock.LockTimeoutException()
raise LockTimeoutException()
def release_event_redis(event):
@@ -126,5 +134,5 @@ def release_event_redis(event):
lock.release()
except RedisError:
logger.exception('Error releasing an event lock')
raise EventLock.LockTimeoutException()
raise LockTimeoutException()
event._lock = None

View File

@@ -8,6 +8,7 @@ from django.utils.translation import ugettext as _
from pretix.base.i18n import LazyI18nString, language
from pretix.base.models import Event, Order
from pretix.celery import app
from pretix.multidomain.urlreverse import build_absolute_uri
logger = logging.getLogger('pretix.base.mail')
@@ -90,7 +91,8 @@ def mail(email: str, subject: str, template: str,
return mail_send([email], subject, body, sender, event.id if event else None, headers)
def mail_send(to: str, subject: str, body: str, sender: str, event: int=None, headers: dict=None) -> bool:
@app.task
def mail_send_task(to: str, subject: str, body: str, sender: str, event: int=None, headers: dict=None) -> bool:
email = EmailMessage(subject, body, sender, to=to, headers=headers)
if event:
event = Event.objects.get(id=event)
@@ -105,10 +107,5 @@ def mail_send(to: str, subject: str, body: str, sender: str, event: int=None, he
raise SendMailException('Failed to send an email to {}.'.format(to))
if settings.HAS_CELERY and settings.EMAIL_BACKEND != 'django.core.mail.outbox':
from pretix.celery import app
mail_send_task = app.task(mail_send)
def mail_send(*args, **kwargs):
mail_send_task.apply_async(args=args, kwargs=kwargs)
def mail_send(*args, **kwargs):
mail_send_task.apply_async(args=args, kwargs=kwargs)

View File

@@ -4,7 +4,7 @@ from datetime import datetime, timedelta
from decimal import Decimal
from typing import List, Optional
from django.conf import settings
from celery.exceptions import MaxRetriesExceededError
from django.db import transaction
from django.dispatch import receiver
from django.utils.formats import date_format
@@ -23,10 +23,12 @@ from pretix.base.payment import BasePaymentProvider
from pretix.base.services.invoices import (
generate_cancellation, generate_invoice, invoice_qualified,
)
from pretix.base.services.locking import LockTimeoutException
from pretix.base.services.mail import SendMailException, mail
from pretix.base.signals import (
order_paid, order_placed, periodic_task, register_payment_providers,
)
from pretix.celery import app
from pretix.multidomain.urlreverse import build_absolute_uri
error_messages = {
@@ -137,7 +139,7 @@ def mark_order_refunded(order, user=None):
@transaction.atomic
def cancel_order(order, user=None):
def _cancel_order(order, user=None):
"""
Mark this order as canceled
:param order: The order to change
@@ -348,16 +350,6 @@ def _perform_order(event: str, payment_provider: str, position_ids: List[str],
return order.id
def perform_order(event: str, payment_provider: str, positions: List[str],
email: str=None, locale: str=None, address: int=None):
try:
return _perform_order(event, payment_provider, positions, email, locale, address)
except EventLock.LockTimeoutException:
# Is raised when there are too many threads asking for event locks and we were
# unable to get one
raise OrderError(error_messages['busy'])
@receiver(signal=periodic_task)
def expire_orders(sender, **kwargs):
eventcache = {}
@@ -571,29 +563,24 @@ class OrderChangeManager:
raise OrderError(error_messages['internal'])
if settings.HAS_CELERY:
from pretix.celery import app
@app.task(bind=True, max_retries=5, default_retry_delay=1)
def perform_order_task(self, event: str, payment_provider: str, positions: List[str],
email: str=None, locale: str=None, address: int=None):
@app.task(bind=True, max_retries=5, default_retry_delay=1)
def perform_order(self, event: str, payment_provider: str, positions: List[str],
email: str=None, locale: str=None, address: int=None):
try:
try:
try:
return _perform_order(event, payment_provider, positions, email, locale, address)
except EventLock.LockTimeoutException:
self.retry(exc=OrderError(error_messages['busy']))
except OrderError as e:
return e
return _perform_order(event, payment_provider, positions, email, locale, address)
except LockTimeoutException:
self.retry()
except (MaxRetriesExceededError, LockTimeoutException):
return OrderError(error_messages['busy'])
@app.task(bind=True, max_retries=5, default_retry_delay=1)
def cancel_order_task(self, order: int, user: int=None):
@app.task(bind=True, max_retries=5, default_retry_delay=1)
def cancel_order(self, order: int, user: int=None):
try:
try:
try:
return cancel_order(order, user)
except EventLock.LockTimeoutException:
self.retry(exc=OrderError(error_messages['busy']))
except OrderError as e:
return e
perform_order.task = perform_order_task
cancel_order.task = cancel_order_task
return _cancel_order(order, user)
except LockTimeoutException:
self.retry(exc=OrderError(error_messages['busy']))
except (MaxRetriesExceededError, LockTimeoutException):
return OrderError(error_messages['busy'])

View File

@@ -1,13 +1,14 @@
from datetime import timedelta
from django.conf import settings
from django.core.files.base import ContentFile
from django.utils.timezone import now
from pretix.base.models import CachedFile, CachedTicket, Order, cachedfile_name
from pretix.base.signals import register_ticket_outputs
from pretix.celery import app
@app.task
def generate(order: str, provider: str):
order = Order.objects.select_related('event').get(id=order)
ct = CachedTicket.objects.get_or_create(order=order, provider=provider)[0]
@@ -26,12 +27,3 @@ def generate(order: str, provider: str):
ct.cachedfile.filename, ct.cachedfile.type, data = prov.generate(order)
ct.cachedfile.file.save(cachedfile_name(ct.cachedfile, ct.cachedfile.filename), ContentFile(data))
ct.cachedfile.save()
if settings.HAS_CELERY:
from pretix.celery import app
generate_task = app.task(generate)
def generate(*args, **kwargs):
generate_task.apply_async(args=args, kwargs=kwargs)

View File

@@ -1,11 +1,15 @@
import logging
import celery.exceptions
from celery.result import AsyncResult
from django.conf import settings
from django.contrib import messages
from django.http import JsonResponse
from django.shortcuts import redirect, render
from django.utils.translation import ugettext as _
from pretix.celery import app
logger = logging.getLogger('pretix.base.async')
@@ -15,15 +19,22 @@ class AsyncAction:
error_url = None
def do(self, *args):
if settings.HAS_CELERY:
from pretix.celery import app
if not isinstance(self.task, app.Task):
raise TypeError('Method has no task attached')
if hasattr(self.task, 'task') and isinstance(self.task.task, app.Task):
return self._do_celery(args)
else:
raise TypeError('Method has no task attached')
res = self.task.apply_async(args=args)
if 'ajax' in self.request.GET or 'ajax' in self.request.POST:
data = self._return_ajax_result(res)
data['check_url'] = self.get_check_url(res.id, True)
return JsonResponse(data)
else:
return self._do_sync(args)
if res.ready():
if res.successful():
return self.success(res.info)
else:
return self.error(res.info)
return redirect(self.get_check_url(res.id, False))
def get_success_url(self, value):
return self.success_url
@@ -39,14 +50,13 @@ class AsyncAction:
return self.get_result(request)
return self.http_method_not_allowed(request)
def _return_celery_result(self, res, timeout=.5):
import celery.exceptions
def _return_ajax_result(self, res, timeout=.5):
if not res.ready():
try:
res.get(timeout=timeout)
except celery.exceptions.TimeoutError:
pass
ready = res.ready()
data = {
'async_id': res.id,
@@ -57,7 +67,7 @@ class AsyncAction:
smes = self.get_success_message(res.info)
if smes:
messages.success(self.request, smes)
# TODO: Do not store message if the ajax client stats that it will not redirect
# TODO: Do not store message if the ajax client states that it will not redirect
# but handle the mssage itself
data.update({
'redirect': self.get_success_url(res.info),
@@ -65,7 +75,7 @@ class AsyncAction:
})
else:
messages.error(self.request, self.get_error_message(res.info))
# TODO: Do not store message if the ajax client stats that it will not redirect
# TODO: Do not store message if the ajax client states that it will not redirect
# but handle the mssage itself
data.update({
'redirect': self.get_error_url(),
@@ -74,11 +84,9 @@ class AsyncAction:
return data
def get_result(self, request):
from celery.result import AsyncResult
res = AsyncResult(request.GET.get('async_id'))
if 'ajax' in self.request.GET:
return JsonResponse(self._return_celery_result(res, timeout=0.25))
return JsonResponse(self._return_ajax_result(res, timeout=0.25))
else:
if res.ready():
if res.successful():
@@ -87,23 +95,6 @@ class AsyncAction:
return self.error(res.info)
return render(request, 'pretixpresale/waiting.html')
def _do_celery(self, args):
res = self.task.task.apply_async(args=args)
if 'ajax' in self.request.GET or 'ajax' in self.request.POST:
data = self._return_celery_result(res)
data['check_url'] = self.get_check_url(res.id, True)
return JsonResponse(data)
else:
return redirect(self.get_check_url(res.id, False))
def _do_sync(self, args):
try:
rs = getattr(self.__class__, 'task')(*args)
return self.success(rs)
except Exception as e:
logger.exception('Error while executing task synchronously')
return self.error(e)
def success(self, value):
smes = self.get_success_message(value)
if smes:

View File

@@ -1,12 +1,12 @@
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "pretix.settings")
from django.conf import settings
if settings.HAS_CELERY:
from celery import Celery
app = Celery('pretix')
app = Celery('pretix')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

View File

@@ -22,6 +22,7 @@ from pretix.base.services.invoices import (
generate_cancellation, generate_invoice, invoice_pdf, invoice_qualified,
regenerate_invoice,
)
from pretix.base.services.locking import LockTimeoutException
from pretix.base.services.mail import SendMailException, mail
from pretix.base.services.orders import (
OrderChangeManager, OrderError, cancel_order, mark_order_paid,
@@ -433,7 +434,7 @@ class OrderDownload(OrderView):
ct.cachedfile = cf
ct.save()
if not ct.cachedfile.file.name:
tickets.generate(self.order.id, self.output.identifier)
tickets.generate.apply_async(args=(self.order.id, self.output.identifier))
return redirect(reverse('cachedfile.download', kwargs={'id': ct.cachedfile.id}))
@@ -465,7 +466,7 @@ class OrderExtend(OrderView):
messages.success(self.request, _('The payment term has been changed.'))
else:
messages.error(self.request, is_available)
except EventLock.LockTimeoutException:
except LockTimeoutException:
messages.error(self.request, _('We were not able to process the request completely as the '
'server was too busy.'))
return self._redirect_back()
@@ -660,5 +661,6 @@ class ExportView(EventPermissionRequiredMixin, TemplateView):
cf.date = now()
cf.expires = now() + timedelta(days=3)
cf.save()
export(self.request.event.id, str(cf.id), self.exporter.identifier, self.exporter.form.cleaned_data)
export.apply_async(args=(self.request.event.id, str(cf.id), self.exporter.identifier,
self.exporter.form.cleaned_data))
return redirect(reverse('cachedfile.download', kwargs={'id': str(cf.id)}))

View File

@@ -11,6 +11,7 @@ from pretix.base.i18n import language
from pretix.base.models import Event, Order, Quota
from pretix.base.services.mail import SendMailException
from pretix.base.services.orders import mark_order_paid
from pretix.celery import app
from .models import BankImportJob, BankTransaction
@@ -94,6 +95,7 @@ def _get_unknown_transactions(event: Event, job: BankImportJob, data: list):
return transactions
@app.task
def process_banktransfers(event: int, job: int, data: list) -> None:
with language("en"): # We'll translate error messages at display time
event = Event.objects.get(pk=event)
@@ -127,12 +129,3 @@ def process_banktransfers(event: int, job: int, data: list) -> None:
else:
job.state = BankImportJob.STATE_COMPLETED
job.save()
if settings.HAS_CELERY:
from pretix.celery import app
process_task = app.task(process_banktransfers)
def process_banktransfers(*args, **kwargs):
process_task.apply_async(args=args, kwargs=kwargs)

View File

@@ -311,7 +311,11 @@ class ImportView(EventPermissionRequiredMixin, ListView):
def start_processing(self, parsed):
job = BankImportJob.objects.create(event=self.request.event)
process_banktransfers(event=self.request.event.pk, job=job.pk, data=parsed)
process_banktransfers.apply_async(kwargs={
'event': self.request.event.pk,
'job': job.pk,
'data': parsed
})
return redirect(reverse('plugins:banktransfer:import.job', kwargs={
'event': self.request.event.slug,
'organizer': self.request.event.organizer.slug,

View File

@@ -9,10 +9,12 @@ from django.core.files.base import ContentFile
from django.core.files.storage import default_storage
from pretix.base.models import Event
from pretix.celery import app
logger = logging.getLogger('pretix.presale.style')
@app.task
def regenerate_css(event_id: int):
event = Event.objects.select_related('organizer').get(pk=event_id)
sassdir = os.path.join(settings.STATIC_ROOT, 'pretixpresale/scss')
@@ -36,12 +38,3 @@ def regenerate_css(event_id: int):
newname = default_storage.save(fname, ContentFile(css))
event.settings.set('presale_css_file', newname)
event.settings.set('presale_css_checksum', checksum)
if settings.HAS_CELERY:
from pretix.celery import app
regenerate_css_task = app.task(regenerate_css)
def regenerate_css(*args, **kwargs):
regenerate_css_task.apply_async(args=args, kwargs=kwargs)

View File

@@ -500,7 +500,7 @@ class OrderDownload(EventViewMixin, OrderDetailMixin, View):
cf.save()
ct.cachedfile = cf
ct.save()
generate(self.order.id, self.output.identifier)
generate.apply_async(args=(self.order.id, self.output.identifier))
return redirect(reverse('cachedfile.download', kwargs={'id': ct.cachedfile.id}))

View File

@@ -132,6 +132,8 @@ if HAS_CELERY:
BROKER_URL = config.get('celery', 'broker')
CELERY_RESULT_BACKEND = config.get('celery', 'backend')
CELERY_SEND_TASK_ERROR_EMAILS = bool(ADMINS)
else:
CELERY_ALWAYS_EAGER = True
SESSION_COOKIE_DOMAIN = config.get('pretix', 'cookie_domain', fallback=None)