diff --git a/doc/api/webhooks.rst b/doc/api/webhooks.rst index c4937985e..e76db1cc9 100644 --- a/doc/api/webhooks.rst +++ b/doc/api/webhooks.rst @@ -92,9 +92,10 @@ If any other status code is returned, we will assume you did not receive the cal or ``304 Not Modified`` response will be treated as a failure. pretix will not follow any ``301`` or ``302`` redirect headers and pretix will ignore all other information in your response headers or body. -If we do not receive a status code in the range of ``200`` and ``299``, pretix will retry to deliver for up to three -days with an exponential back off. Therefore, we recommend that you implement your endpoint in a way where calling it -multiple times for the same event due to a perceived error does not do any harm. +If we do not receive a status code in the range of ``200`` and ``299`` or do not receive any response within a 30 second +time frame, pretix will retry to deliver for up to three days with an exponential back off. Therefore, we recommend that +you implement your endpoint in a way where calling it multiple times for the same event due to a perceived error does +not do any harm. There is only one exception: If status code ``410 Gone`` is returned, we will assume the endpoint does not exist any more and automatically disable the webhook. diff --git a/src/pretix/api/migrations/0008_webhookcallretry.py b/src/pretix/api/migrations/0008_webhookcallretry.py new file mode 100644 index 000000000..1dcf40b1d --- /dev/null +++ b/src/pretix/api/migrations/0008_webhookcallretry.py @@ -0,0 +1,29 @@ +# Generated by Django 3.2.12 on 2022-09-13 14:48 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('pretixbase', '0218_checkinlist_addon_match'), + ('pretixapi', '0007_alter_webhookcall_target_url'), + ] + + operations = [ + migrations.CreateModel( + name='WebHookCallRetry', + fields=[ + ('id', models.BigAutoField(primary_key=True, serialize=False)), + ('retry_not_before', models.DateTimeField(auto_now_add=True)), + ('retry_count', models.PositiveIntegerField(default=0)), + ('action_type', models.CharField(max_length=255)), + ('logentry', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='webhook_retries', to='pretixbase.logentry')), + ('webhook', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='retries', to='pretixapi.webhook')), + ], + options={ + 'unique_together': {('webhook', 'logentry')}, + }, + ), + ] diff --git a/src/pretix/api/models.py b/src/pretix/api/models.py index ed6784428..e25a9b0b9 100644 --- a/src/pretix/api/models.py +++ b/src/pretix/api/models.py @@ -133,6 +133,18 @@ class WebHookCall(models.Model): ordering = ("-datetime",) +class WebHookCallRetry(models.Model): + id = models.BigAutoField(primary_key=True) + webhook = models.ForeignKey('WebHook', on_delete=models.CASCADE, related_name='retries') + logentry = models.ForeignKey('pretixbase.LogEntry', on_delete=models.CASCADE, related_name='webhook_retries') + retry_not_before = models.DateTimeField(auto_now_add=True) + retry_count = models.PositiveIntegerField(default=0) + action_type = models.CharField(max_length=255) + + class Meta: + unique_together = (('webhook', 'logentry'),) + + class ApiCall(models.Model): idempotency_key = models.CharField(max_length=190, db_index=True) auth_hash = models.CharField(max_length=190, db_index=True) diff --git a/src/pretix/api/webhooks.py b/src/pretix/api/webhooks.py index 0c68ac96f..cf88ec4c5 100644 --- a/src/pretix/api/webhooks.py +++ b/src/pretix/api/webhooks.py @@ -23,19 +23,24 @@ import json import logging import time from collections import OrderedDict +from datetime import timedelta import requests -from celery.exceptions import MaxRetriesExceededError +from django.db import DatabaseError, connection, transaction from django.db.models import Exists, OuterRef, Q from django.dispatch import receiver +from django.utils.timezone import now from django.utils.translation import gettext_lazy as _, pgettext_lazy from django_scopes import scope, scopes_disabled from requests import RequestException -from pretix.api.models import WebHook, WebHookCall, WebHookEventListener +from pretix.api.models import ( + WebHook, WebHookCall, WebHookCallRetry, WebHookEventListener, +) from pretix.api.signals import register_webhook_events from pretix.base.models import LogEntry from pretix.base.services.tasks import ProfiledTask, TransactionAwareTask +from pretix.base.signals import periodic_task from pretix.celery_app import app logger = logging.getLogger(__name__) @@ -356,59 +361,162 @@ def notify_webhooks(logentry_ids: list): send_webhook.apply_async(args=(logentry.id, notification_type.action_type, wh.pk)) -@app.task(base=ProfiledTask, bind=True, max_retries=9, acks_late=True) -def send_webhook(self, logentry_id: int, action_type: str, webhook_id: int): - # 9 retries with 2**(2*x) timing is roughly 72 hours +@app.task(base=ProfiledTask, bind=True, max_retries=5, default_retry_delay=60, acks_late=True, autoretry_for=(DatabaseError,),) +def send_webhook(self, logentry_id: int, action_type: str, webhook_id: int, retry_count: int = 0): + """ + Sends out a specific webhook using adequate retry and error handling logic. + + Our retry logic is a little complex since we have different constraints here: + + 1. We historically documented that we retry for up to three days, so we want to keep that + promise. We want to use (approximately) exponentially increasing times to keep load + manageable. + + 2. We want to use Celery's ``acks_late=True`` options which prevents lost tasks if a worker + crashes. + + 3. A limitation of Celery's redis broker implementation is that it can not properly handle + tasks that *run or wait* longer than `visibility_timeout`, which defaults to 1h, when + ``acks_late`` is enabled. So any task with a *retry interval* of >1h will be restarted + many times because celery believes the worker has crashed. + + 4. We do like that the first few retries happen within a few seconds to work around very + intermittent connectivity issues quickly. For the longer retries with multiple hours, + we don't care if they are emitted a few minutes too late. + + We therefore have a two-phase retry process: + + - For all retry intervals below 5 minutes, which is the first 3 retries currently, we + schedule a new celery task directly with an increased retry_count. We do *not* use + celery's retry() call currently to make the retry process in both phases more similar, + there should not be much of a difference though (except that the initial task will be in + SUCCESS state, but we don't check that status anywhere). + + - For all retry intervals of at least 5 minutes, we create a database entry. Then, the + periodic task ``schedule_webhook_retries_on_celery`` will schedule celery tasks for them + once their time has come. + """ + retry_intervals = ( + 5, # + 5 seconds + 30, # + 30 seconds + 60, # + 1 minute + 300, # + 5 minutes + 1200, # + 20 minutes + 3600, # + 60 minutes + 1440, # + 4 hours + 21600, # + 6 hours + 43200, # + 12 hours + 43200, # + 24 hours + 86400, # + 24 hours + ) # added up, these are approximately 3 days, as documented + retry_celery_cutoff = 300 + with scopes_disabled(): webhook = WebHook.objects.get(id=webhook_id) - with scope(organizer=webhook.organizer): + + with scope(organizer=webhook.organizer), transaction.atomic(): logentry = LogEntry.all.get(id=logentry_id) types = get_all_webhook_events() event_type = types.get(action_type) if not event_type or not webhook.enabled: - return # Ignore, e.g. plugin not installed + return 'obsolete-webhook' # Ignore, e.g. plugin not installed payload = event_type.build_payload(logentry) if payload is None: # Content object deleted? - return + return 'obsolete-payload' t = time.time() try: - try: - resp = requests.post( - webhook.target_url, - json=payload, - allow_redirects=False + resp = requests.post( + webhook.target_url, + json=payload, + allow_redirects=False, + timeout=30, + ) + WebHookCall.objects.create( + webhook=webhook, + action_type=logentry.action_type, + target_url=webhook.target_url, + is_retry=self.request.retries > 0, + execution_time=time.time() - t, + return_code=resp.status_code, + payload=json.dumps(payload), + response_body=resp.text[:1024 * 1024], + success=200 <= resp.status_code <= 299 + ) + if resp.status_code == 410: + webhook.enabled = False + webhook.save() + return 'gone' + elif resp.status_code > 299: + if retry_count >= len(retry_intervals): + return 'retry-given-up' + elif retry_intervals[retry_count] < retry_celery_cutoff: + send_webhook.apply_async(args=(logentry_id, action_type, webhook_id, retry_count + 1)) + return 'retry-via-celery' + else: + webhook.retries.update_or_create( + logentry=logentry, + defaults=dict( + retry_not_before=now() + timedelta(seconds=retry_intervals[retry_count]), + retry_count=retry_count + 1, + action_type=action_type, + ), + ) + return 'retry-via-db' + return 'ok' + except RequestException as e: + WebHookCall.objects.create( + webhook=webhook, + action_type=logentry.action_type, + target_url=webhook.target_url, + is_retry=self.request.retries > 0, + execution_time=time.time() - t, + return_code=0, + payload=json.dumps(payload), + response_body=str(e)[:1024 * 1024] + ) + if retry_count >= len(retry_intervals): + return 'retry-given-up' + elif retry_intervals[retry_count] < retry_celery_cutoff: + send_webhook.apply_async(args=(logentry_id, action_type, webhook_id, retry_count + 1)) + return 'retry-via-celery' + else: + webhook.retries.update_or_create( + logentry=logentry, + defaults=dict( + retry_not_before=now() + timedelta(seconds=retry_intervals[retry_count]), + retry_count=retry_count + 1, + action_type=action_type, + ), ) - WebHookCall.objects.create( - webhook=webhook, - action_type=logentry.action_type, - target_url=webhook.target_url, - is_retry=self.request.retries > 0, - execution_time=time.time() - t, - return_code=resp.status_code, - payload=json.dumps(payload), - response_body=resp.text[:1024 * 1024], - success=200 <= resp.status_code <= 299 - ) - if resp.status_code == 410: - webhook.enabled = False - webhook.save() - elif resp.status_code > 299: - raise self.retry(countdown=2 ** (self.request.retries * 2)) # max is 2 ** (8*2) = 65536 seconds = ~18 hours - except RequestException as e: - WebHookCall.objects.create( - webhook=webhook, - action_type=logentry.action_type, - target_url=webhook.target_url, - is_retry=self.request.retries > 0, - execution_time=time.time() - t, - return_code=0, - payload=json.dumps(payload), - response_body=str(e)[:1024 * 1024] - ) - raise self.retry(countdown=2 ** (self.request.retries * 2)) # max is 2 ** (8*2) = 65536 seconds = ~18 hours - except MaxRetriesExceededError: - pass + return 'retry-via-db' + + +@app.task(base=TransactionAwareTask) +def manually_retry_all_calls(webhook_id: int): + with scopes_disabled(): + webhook = WebHook.objects.get(id=webhook_id) + with scope(organizer=webhook.organizer), transaction.atomic(): + for whcr in webhook.retries.select_for_update( + skip_locked=connection.features.has_select_for_update_skip_locked + ): + send_webhook.apply_async( + args=(whcr.logentry_id, whcr.action_type, whcr.webhook_id, whcr.retry_count), + ) + whcr.delete() + + +@receiver(signal=periodic_task, dispatch_uid='pretixapi_schedule_webhook_retries_on_celery') +@scopes_disabled() +def schedule_webhook_retries_on_celery(sender, **kwargs): + with transaction.atomic(): + for whcr in WebHookCallRetry.objects.select_for_update( + skip_locked=connection.features.has_select_for_update_skip_locked + ).filter(retry_not_before__lt=now()): + send_webhook.apply_async( + args=(whcr.logentry_id, whcr.action_type, whcr.webhook_id, whcr.retry_count), + ) + whcr.delete() diff --git a/src/pretix/control/logdisplay.py b/src/pretix/control/logdisplay.py index e099a3ec8..137ceba19 100644 --- a/src/pretix/control/logdisplay.py +++ b/src/pretix/control/logdisplay.py @@ -319,6 +319,8 @@ def pretixcontrol_logentry_display(sender: Event, logentry: LogEntry, **kwargs): 'pretix.giftcards.acceptance.removed': _('Gift card acceptance for another organizer has been removed.'), 'pretix.webhook.created': _('The webhook has been created.'), 'pretix.webhook.changed': _('The webhook has been changed.'), + 'pretix.webhook.retries.expedited': _('The webhook call retry jobs have been manually expedited.'), + 'pretix.webhook.retries.dropped': _('The webhook call retry jobs have been dropped.'), 'pretix.membershiptype.created': _('The membership type has been created.'), 'pretix.membershiptype.changed': _('The membership type has been changed.'), 'pretix.membershiptype.deleted': _('The membership type has been deleted.'), diff --git a/src/pretix/control/templates/pretixcontrol/organizers/webhook_logs.html b/src/pretix/control/templates/pretixcontrol/organizers/webhook_logs.html index af7d7e2f2..05fdec5f9 100644 --- a/src/pretix/control/templates/pretixcontrol/organizers/webhook_logs.html +++ b/src/pretix/control/templates/pretixcontrol/organizers/webhook_logs.html @@ -6,13 +6,42 @@

{% trans "This page shows all calls to your webhook in the past 30 days." %}

+ {% if retry_count %} +
+ {% csrf_token %} +
+

+ {% blocktranslate trimmed count count=retry_count %} + One webhook is scheduled to be retried. + {% plural %} + {{ count }} webhooks are scheduled to be retried. + {% endblocktranslate %} +

+

+ + +

+

+ {% blocktranslate trimmed with minutes=5 %} + Webhooks scheduled to be retried in less than {{ minutes }} minutes may not be listed here and can + no longer be stopped or expedited. + {% endblocktranslate %} +

+
+
+ {% endif %} {% for c in calls %}
{% if c.is_retry %} - + {% else %} {% endif %} diff --git a/src/pretix/control/views/organizer.py b/src/pretix/control/views/organizer.py index fe98d0e95..abfe0e07a 100644 --- a/src/pretix/control/views/organizer.py +++ b/src/pretix/control/views/organizer.py @@ -62,6 +62,7 @@ from django.views.generic import ( ) from pretix.api.models import WebHook +from pretix.api.webhooks import manually_retry_all_calls from pretix.base.auth import get_auth_backends from pretix.base.channels import get_all_sales_channels from pretix.base.i18n import language @@ -1252,6 +1253,7 @@ class WebHookLogsView(OrganizerDetailViewMixin, OrganizerPermissionRequiredMixin def get_context_data(self, **kwargs): ctx = super().get_context_data(**kwargs) ctx['webhook'] = self.webhook + ctx['retry_count'] = self.webhook.retries.count() return ctx @cached_property @@ -1263,6 +1265,25 @@ class WebHookLogsView(OrganizerDetailViewMixin, OrganizerPermissionRequiredMixin def get_queryset(self): return self.webhook.calls.order_by('-datetime') + def post(self, request, *args, **kwargs): + if request.POST.get("action") == "expedite": + self.request.organizer.log_action('pretix.webhook.retries.expedited', user=self.request.user, data={ + 'webhook': self.webhook.pk, + }) + manually_retry_all_calls.apply_async(args=(self.webhook.pk,)) + messages.success(request, _('All requests will now be scheduled for an immediate attempt. Please ' + 'allow for a few minutes before they are processed.')) + elif request.POST.get("action") == "drop": + self.request.organizer.log_action('pretix.webhook.retries.dropped', user=self.request.user, data={ + 'webhook': self.webhook.pk, + }) + self.webhook.retries.all().delete() + messages.success(request, _('All unprocessed webhooks have been stopped from retrying.')) + return redirect(reverse('control:organizer.webhook.logs', kwargs={ + 'organizer': self.request.organizer.slug, + 'webhook': self.webhook.pk, + })) + class GiftCardListView(OrganizerDetailViewMixin, OrganizerPermissionRequiredMixin, ListView): model = GiftCard