Add prioritization to webhook/notifications queue (#5513)

* Add prioritization to webhook/notifications queue

* Add missing code

* Missing license header

* Fix argument

* Use redis pipeline

* Update license header
This commit is contained in:
Raphael Michel
2025-12-02 09:13:01 +01:00
committed by GitHub
parent 1c907f6a6f
commit bdd94b1f8a
6 changed files with 144 additions and 7 deletions

View File

@@ -43,6 +43,7 @@ from pretix.base.services.tasks import ProfiledTask, TransactionAwareTask
from pretix.base.signals import periodic_task
from pretix.celery_app import app
from pretix.helpers import OF_SELF
from pretix.helpers.celery import get_task_priority
logger = logging.getLogger(__name__)
_ALL_EVENTS = None
@@ -474,7 +475,10 @@ def notify_webhooks(logentry_ids: list):
)
for wh in webhooks:
send_webhook.apply_async(args=(logentry.id, notification_type.action_type, wh.pk))
send_webhook.apply_async(
args=(logentry.id, notification_type.action_type, wh.pk),
priority=get_task_priority("notifications", logentry.organizer_id),
)
@app.task(base=ProfiledTask, bind=True, max_retries=5, default_retry_delay=60, acks_late=True, autoretry_for=(DatabaseError,),)

View File

@@ -31,6 +31,7 @@ from django.urls import reverse
from django.utils.crypto import get_random_string
from django.utils.functional import cached_property
from pretix.helpers.celery import get_task_priority
from pretix.helpers.json import CustomJSONEncoder
@@ -131,9 +132,15 @@ class LoggingMixin:
logentry.save()
if logentry.notification_type:
notify.apply_async(args=(logentry.pk,))
notify.apply_async(
args=(logentry.pk,),
priority=get_task_priority("notifications", logentry.organizer_id),
)
if logentry.webhook_type:
notify_webhooks.apply_async(args=(logentry.pk,))
notify_webhooks.apply_async(
args=(logentry.pk,),
priority=get_task_priority("notifications", logentry.organizer_id),
)
return logentry

View File

@@ -35,11 +35,14 @@
import json
import logging
from django.conf import settings
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.db import connections, models
from django.utils.functional import cached_property
from pretix.helpers.celery import get_task_priority
class VisibleOnlyManager(models.Manager):
def get_queryset(self):
@@ -186,7 +189,19 @@ class LogEntry(models.Model):
to_notify = [o.id for o in objects if o.notification_type]
if to_notify:
notify.apply_async(args=(to_notify,))
organizer_ids = set(o.organizer_id for o in objects if o.notification_type)
notify.apply_async(
args=(to_notify,),
priority=settings.PRIORITY_CELERY_HIGHEST_FUNC(
get_task_priority("notifications", oid) for oid in organizer_ids
),
)
to_wh = [o.id for o in objects if o.webhook_type]
if to_wh:
notify_webhooks.apply_async(args=(to_wh,))
organizer_ids = set(o.organizer_id for o in objects if o.webhook_type)
notify_webhooks.apply_async(
args=(to_wh,),
priority=settings.PRIORITY_CELERY_HIGHEST_FUNC(
get_task_priority("notifications", oid) for oid in organizer_ids
),
)

View File

@@ -32,6 +32,7 @@ from pretix.base.services.mail import mail_send_task
from pretix.base.services.tasks import ProfiledTask, TransactionAwareTask
from pretix.base.signals import notification
from pretix.celery_app import app
from pretix.helpers.celery import get_task_priority
from pretix.helpers.urls import build_absolute_uri
@@ -88,12 +89,18 @@ def notify(logentry_ids: list):
for um, enabled in notify_specific.items():
user, method = um
if enabled:
send_notification.apply_async(args=(logentry.id, notification_type.action_type, user.pk, method))
send_notification.apply_async(
args=(logentry.id, notification_type.action_type, user.pk, method),
priority=get_task_priority("notifications", logentry.organizer_id),
)
for um, enabled in notify_global.items():
user, method = um
if enabled and um not in notify_specific:
send_notification.apply_async(args=(logentry.id, notification_type.action_type, user.pk, method))
send_notification.apply_async(
args=(logentry.id, notification_type.action_type, user.pk, method),
priority=get_task_priority("notifications", logentry.organizer_id),
)
notification.send(logentry.event, logentry_id=logentry.id, notification_type=notification_type.action_type)

View File

@@ -0,0 +1,62 @@
#
# This file is part of pretix (Community Edition).
#
# Copyright (C) 2014-2020 Raphael Michel and contributors
# Copyright (C) 2020-today pretix GmbH and contributors
#
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General
# Public License as published by the Free Software Foundation in version 3 of the License.
#
# ADDITIONAL TERMS APPLY: Pursuant to Section 7 of the GNU Affero General Public License, additional terms are
# applicable granting you additional permissions and placing additional restrictions on your usage of this software.
# Please refer to the pretix LICENSE file to obtain the full terms applicable to this work. If you did not receive
# this file, see <https://pretix.eu/about/en/license>.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <https://www.gnu.org/licenses/>.
#
from django.conf import settings
THRESHOLD_DOWNGRADE_TO_MID = 50
THRESHOLD_DOWNGRADE_TO_LOW = 250
def get_task_priority(shard, organizer_id):
"""
This is an attempt to build a simple "fair-use" policy for webhooks and notifications. The problem is that when
one organizer creates e.g. 20,000 orders through the API, that might schedule 20,000 webhooks and every other
organizer will need to wait for these webhooks to go through.
We try to fix that by building three queues: high-prio, mid-prio, and low-prio. Every organizer starts in the
high-prio queue, and all their tasks are routed immediately. Once an organizer submits more than X jobs of a
certain type per minute, they get downgraded to the mid-prio queue, and then if they submit even more to the
low-prio queue. That way, if another organizer has "regular usage", they are prioritized over the organizer with
high load.
"""
from django_redis import get_redis_connection
if not settings.HAS_REDIS:
return settings.PRIORITY_CELERY_HIGH
# We use redis directly instead of the Django cache API since the Django cache API does not support INCR for
# nonexistant keys
rc = get_redis_connection("redis")
cache_key = f"pretix:task_priority:{shard}:{organizer_id}"
# Make sure counters expire after a while when not used
p = rc.pipeline()
p.incr(cache_key)
p.expire(cache_key, 60)
new_counter = p.execute()[0]
if new_counter >= THRESHOLD_DOWNGRADE_TO_LOW:
return settings.PRIORITY_CELERY_LOW
elif new_counter >= THRESHOLD_DOWNGRADE_TO_MID:
return settings.PRIORITY_CELERY_MID
else:
return settings.PRIORITY_CELERY_HIGH

View File

@@ -347,11 +347,53 @@ if HAS_CELERY:
CELERY_RESULT_BACKEND = config.get('celery', 'backend')
if HAS_CELERY_BROKER_TRANSPORT_OPTS:
CELERY_BROKER_TRANSPORT_OPTIONS = loads(config.get('celery', 'broker_transport_options'))
else:
CELERY_BROKER_TRANSPORT_OPTIONS = {}
if HAS_CELERY_BACKEND_TRANSPORT_OPTS:
CELERY_RESULT_BACKEND_TRANSPORT_OPTIONS = loads(config.get('celery', 'backend_transport_options'))
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
if CELERY_BROKER_URL.startswith("amqp://"):
# https://docs.celeryq.dev/en/latest/userguide/routing.html#routing-options-rabbitmq-priorities
# Enable priorities for all queues
CELERY_TASK_QUEUE_MAX_PRIORITY = 3
# On RabbitMQ, higher number is higher priority, and having less levels makes rabbitmq use less CPU and RAM
PRIORITY_CELERY_LOW = 1
PRIORITY_CELERY_MID = 2
PRIORITY_CELERY_HIGH = 3
PRIORITY_CELERY_LOWEST_FUNC = min
PRIORITY_CELERY_HIGHEST_FUNC = max
# Set default
CELERY_TASK_DEFAULT_PRIORITY = PRIORITY_CELERY_MID
elif CELERY_BROKER_URL.startswith("redis://"):
# https://docs.celeryq.dev/en/latest/userguide/routing.html#redis-message-priorities
CELERY_BROKER_TRANSPORT_OPTIONS.update({
"queue_order_strategy": "priority",
"sep": ":",
"priority_steps": [0, 4, 8]
})
# On redis, lower number is higher priority, and it appears that there are always levels 0-9 even though it
# is only really executed based on the 3 steps listed above.
PRIORITY_CELERY_LOW = 9
PRIORITY_CELERY_MID = 5
PRIORITY_CELERY_HIGH = 0
PRIORITY_CELERY_LOWEST_FUNC = max
PRIORITY_CELERY_HIGHEST_FUNC = min
CELERY_TASK_DEFAULT_PRIORITY = PRIORITY_CELERY_MID
else:
# No priority support assumed
PRIORITY_CELERY_LOW = 0
PRIORITY_CELERY_MID = 0
PRIORITY_CELERY_HIGH = 0
PRIORITY_CELERY_LOWEST_FUNC = min
PRIORITY_CELERY_HIGHEST_FUNC = max
else:
CELERY_TASK_ALWAYS_EAGER = True
PRIORITY_CELERY_LOW = 0
PRIORITY_CELERY_MID = 0
PRIORITY_CELERY_HIGH = 0
PRIORITY_CELERY_LOWEST_FUNC = min
PRIORITY_CELERY_HIGHEST_FUNC = max
CACHE_TICKETS_HOURS = config.getint('cache', 'tickets', fallback=24 * 3)