diff --git a/src/pretix/api/webhooks.py b/src/pretix/api/webhooks.py index b14bd7139e..ac3889157c 100644 --- a/src/pretix/api/webhooks.py +++ b/src/pretix/api/webhooks.py @@ -43,6 +43,7 @@ from pretix.base.services.tasks import ProfiledTask, TransactionAwareTask from pretix.base.signals import periodic_task from pretix.celery_app import app from pretix.helpers import OF_SELF +from pretix.helpers.celery import get_task_priority logger = logging.getLogger(__name__) _ALL_EVENTS = None @@ -474,7 +475,10 @@ def notify_webhooks(logentry_ids: list): ) for wh in webhooks: - send_webhook.apply_async(args=(logentry.id, notification_type.action_type, wh.pk)) + send_webhook.apply_async( + args=(logentry.id, notification_type.action_type, wh.pk), + priority=get_task_priority("notifications", logentry.organizer_id), + ) @app.task(base=ProfiledTask, bind=True, max_retries=5, default_retry_delay=60, acks_late=True, autoretry_for=(DatabaseError,),) diff --git a/src/pretix/base/models/base.py b/src/pretix/base/models/base.py index d8154aa8f9..397168c017 100644 --- a/src/pretix/base/models/base.py +++ b/src/pretix/base/models/base.py @@ -31,6 +31,7 @@ from django.urls import reverse from django.utils.crypto import get_random_string from django.utils.functional import cached_property +from pretix.helpers.celery import get_task_priority from pretix.helpers.json import CustomJSONEncoder @@ -131,9 +132,15 @@ class LoggingMixin: logentry.save() if logentry.notification_type: - notify.apply_async(args=(logentry.pk,)) + notify.apply_async( + args=(logentry.pk,), + priority=get_task_priority("notifications", logentry.organizer_id), + ) if logentry.webhook_type: - notify_webhooks.apply_async(args=(logentry.pk,)) + notify_webhooks.apply_async( + args=(logentry.pk,), + priority=get_task_priority("notifications", logentry.organizer_id), + ) return logentry diff --git a/src/pretix/base/models/log.py b/src/pretix/base/models/log.py index 2ccf5c20e1..52ce878169 100644 --- a/src/pretix/base/models/log.py +++ b/src/pretix/base/models/log.py @@ -35,11 +35,14 @@ import json import logging +from django.conf import settings from django.contrib.contenttypes.fields import GenericForeignKey from django.contrib.contenttypes.models import ContentType from django.db import connections, models from django.utils.functional import cached_property +from pretix.helpers.celery import get_task_priority + class VisibleOnlyManager(models.Manager): def get_queryset(self): @@ -186,7 +189,19 @@ class LogEntry(models.Model): to_notify = [o.id for o in objects if o.notification_type] if to_notify: - notify.apply_async(args=(to_notify,)) + organizer_ids = set(o.organizer_id for o in objects if o.notification_type) + notify.apply_async( + args=(to_notify,), + priority=settings.PRIORITY_CELERY_HIGHEST_FUNC( + get_task_priority("notifications", oid) for oid in organizer_ids + ), + ) to_wh = [o.id for o in objects if o.webhook_type] if to_wh: - notify_webhooks.apply_async(args=(to_wh,)) + organizer_ids = set(o.organizer_id for o in objects if o.webhook_type) + notify_webhooks.apply_async( + args=(to_wh,), + priority=settings.PRIORITY_CELERY_HIGHEST_FUNC( + get_task_priority("notifications", oid) for oid in organizer_ids + ), + ) diff --git a/src/pretix/base/services/notifications.py b/src/pretix/base/services/notifications.py index 357ad8fc64..5a75dfafbe 100644 --- a/src/pretix/base/services/notifications.py +++ b/src/pretix/base/services/notifications.py @@ -32,6 +32,7 @@ from pretix.base.services.mail import mail_send_task from pretix.base.services.tasks import ProfiledTask, TransactionAwareTask from pretix.base.signals import notification from pretix.celery_app import app +from pretix.helpers.celery import get_task_priority from pretix.helpers.urls import build_absolute_uri @@ -88,12 +89,18 @@ def notify(logentry_ids: list): for um, enabled in notify_specific.items(): user, method = um if enabled: - send_notification.apply_async(args=(logentry.id, notification_type.action_type, user.pk, method)) + send_notification.apply_async( + args=(logentry.id, notification_type.action_type, user.pk, method), + priority=get_task_priority("notifications", logentry.organizer_id), + ) for um, enabled in notify_global.items(): user, method = um if enabled and um not in notify_specific: - send_notification.apply_async(args=(logentry.id, notification_type.action_type, user.pk, method)) + send_notification.apply_async( + args=(logentry.id, notification_type.action_type, user.pk, method), + priority=get_task_priority("notifications", logentry.organizer_id), + ) notification.send(logentry.event, logentry_id=logentry.id, notification_type=notification_type.action_type) diff --git a/src/pretix/helpers/celery.py b/src/pretix/helpers/celery.py new file mode 100644 index 0000000000..de2b5d9a3d --- /dev/null +++ b/src/pretix/helpers/celery.py @@ -0,0 +1,62 @@ +# +# This file is part of pretix (Community Edition). +# +# Copyright (C) 2014-2020 Raphael Michel and contributors +# Copyright (C) 2020-today pretix GmbH and contributors +# +# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General +# Public License as published by the Free Software Foundation in version 3 of the License. +# +# ADDITIONAL TERMS APPLY: Pursuant to Section 7 of the GNU Affero General Public License, additional terms are +# applicable granting you additional permissions and placing additional restrictions on your usage of this software. +# Please refer to the pretix LICENSE file to obtain the full terms applicable to this work. If you did not receive +# this file, see . +# +# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied +# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You should have received a copy of the GNU Affero General Public License along with this program. If not, see +# . +# +from django.conf import settings + +THRESHOLD_DOWNGRADE_TO_MID = 50 +THRESHOLD_DOWNGRADE_TO_LOW = 250 + + +def get_task_priority(shard, organizer_id): + """ + This is an attempt to build a simple "fair-use" policy for webhooks and notifications. The problem is that when + one organizer creates e.g. 20,000 orders through the API, that might schedule 20,000 webhooks and every other + organizer will need to wait for these webhooks to go through. + + We try to fix that by building three queues: high-prio, mid-prio, and low-prio. Every organizer starts in the + high-prio queue, and all their tasks are routed immediately. Once an organizer submits more than X jobs of a + certain type per minute, they get downgraded to the mid-prio queue, and then – if they submit even more – to the + low-prio queue. That way, if another organizer has "regular usage", they are prioritized over the organizer with + high load. + """ + from django_redis import get_redis_connection + + if not settings.HAS_REDIS: + return settings.PRIORITY_CELERY_HIGH + + # We use redis directly instead of the Django cache API since the Django cache API does not support INCR for + # nonexistant keys + rc = get_redis_connection("redis") + + cache_key = f"pretix:task_priority:{shard}:{organizer_id}" + + # Make sure counters expire after a while when not used + p = rc.pipeline() + p.incr(cache_key) + p.expire(cache_key, 60) + new_counter = p.execute()[0] + + if new_counter >= THRESHOLD_DOWNGRADE_TO_LOW: + return settings.PRIORITY_CELERY_LOW + elif new_counter >= THRESHOLD_DOWNGRADE_TO_MID: + return settings.PRIORITY_CELERY_MID + else: + return settings.PRIORITY_CELERY_HIGH diff --git a/src/pretix/settings.py b/src/pretix/settings.py index 7f78458581..a269f5b194 100644 --- a/src/pretix/settings.py +++ b/src/pretix/settings.py @@ -347,11 +347,53 @@ if HAS_CELERY: CELERY_RESULT_BACKEND = config.get('celery', 'backend') if HAS_CELERY_BROKER_TRANSPORT_OPTS: CELERY_BROKER_TRANSPORT_OPTIONS = loads(config.get('celery', 'broker_transport_options')) + else: + CELERY_BROKER_TRANSPORT_OPTIONS = {} if HAS_CELERY_BACKEND_TRANSPORT_OPTS: CELERY_RESULT_BACKEND_TRANSPORT_OPTIONS = loads(config.get('celery', 'backend_transport_options')) CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True + + if CELERY_BROKER_URL.startswith("amqp://"): + # https://docs.celeryq.dev/en/latest/userguide/routing.html#routing-options-rabbitmq-priorities + # Enable priorities for all queues + CELERY_TASK_QUEUE_MAX_PRIORITY = 3 + # On RabbitMQ, higher number is higher priority, and having less levels makes rabbitmq use less CPU and RAM + PRIORITY_CELERY_LOW = 1 + PRIORITY_CELERY_MID = 2 + PRIORITY_CELERY_HIGH = 3 + PRIORITY_CELERY_LOWEST_FUNC = min + PRIORITY_CELERY_HIGHEST_FUNC = max + # Set default + CELERY_TASK_DEFAULT_PRIORITY = PRIORITY_CELERY_MID + elif CELERY_BROKER_URL.startswith("redis://"): + # https://docs.celeryq.dev/en/latest/userguide/routing.html#redis-message-priorities + CELERY_BROKER_TRANSPORT_OPTIONS.update({ + "queue_order_strategy": "priority", + "sep": ":", + "priority_steps": [0, 4, 8] + }) + # On redis, lower number is higher priority, and it appears that there are always levels 0-9 even though it + # is only really executed based on the 3 steps listed above. + PRIORITY_CELERY_LOW = 9 + PRIORITY_CELERY_MID = 5 + PRIORITY_CELERY_HIGH = 0 + PRIORITY_CELERY_LOWEST_FUNC = max + PRIORITY_CELERY_HIGHEST_FUNC = min + CELERY_TASK_DEFAULT_PRIORITY = PRIORITY_CELERY_MID + else: + # No priority support assumed + PRIORITY_CELERY_LOW = 0 + PRIORITY_CELERY_MID = 0 + PRIORITY_CELERY_HIGH = 0 + PRIORITY_CELERY_LOWEST_FUNC = min + PRIORITY_CELERY_HIGHEST_FUNC = max else: CELERY_TASK_ALWAYS_EAGER = True + PRIORITY_CELERY_LOW = 0 + PRIORITY_CELERY_MID = 0 + PRIORITY_CELERY_HIGH = 0 + PRIORITY_CELERY_LOWEST_FUNC = min + PRIORITY_CELERY_HIGHEST_FUNC = max CACHE_TICKETS_HOURS = config.getint('cache', 'tickets', fallback=24 * 3)