New retry logic for webhooks (#2790)

Co-authored-by: Richard Schreiber <wiffbi@gmail.com>
This commit is contained in:
Raphael Michel
2022-09-15 09:41:39 +02:00
committed by GitHub
parent 1a401ec1e9
commit c1233ed692
7 changed files with 249 additions and 47 deletions

View File

@@ -23,19 +23,24 @@ import json
import logging
import time
from collections import OrderedDict
from datetime import timedelta
import requests
from celery.exceptions import MaxRetriesExceededError
from django.db import DatabaseError, connection, transaction
from django.db.models import Exists, OuterRef, Q
from django.dispatch import receiver
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _, pgettext_lazy
from django_scopes import scope, scopes_disabled
from requests import RequestException
from pretix.api.models import WebHook, WebHookCall, WebHookEventListener
from pretix.api.models import (
WebHook, WebHookCall, WebHookCallRetry, WebHookEventListener,
)
from pretix.api.signals import register_webhook_events
from pretix.base.models import LogEntry
from pretix.base.services.tasks import ProfiledTask, TransactionAwareTask
from pretix.base.signals import periodic_task
from pretix.celery_app import app
logger = logging.getLogger(__name__)
@@ -356,59 +361,162 @@ def notify_webhooks(logentry_ids: list):
send_webhook.apply_async(args=(logentry.id, notification_type.action_type, wh.pk))
def _schedule_webhook_retry(webhook, logentry, logentry_id, action_type, webhook_id, retry_count,
                            retry_intervals, retry_celery_cutoff):
    """
    Schedules the next delivery attempt after a failed webhook call.

    :param webhook: The ``WebHook`` the failed call belongs to.
    :param logentry: The ``LogEntry`` the webhook is being sent for.
    :param logentry_id: ID of the log entry, passed on to the follow-up task.
    :param action_type: Action type of the webhook, passed on to the follow-up task.
    :param webhook_id: ID of the webhook, passed on to the follow-up task.
    :param retry_count: Number of retries that have already been scheduled before this attempt.
    :param retry_intervals: Sequence of delays in seconds, indexed by ``retry_count``.
    :param retry_celery_cutoff: Delays strictly below this number of seconds are scheduled on
                                celery directly, all others are stored in the database.
    :return: ``'retry-given-up'``, ``'retry-via-celery'`` or ``'retry-via-db'``.
    """
    if retry_count >= len(retry_intervals):
        return 'retry-given-up'

    delay = retry_intervals[retry_count]
    if delay < retry_celery_cutoff:
        # The countdown is what actually applies the short retry interval. Without it, the
        # follow-up task would run immediately and the first entries of the table were ignored.
        send_webhook.apply_async(
            args=(logentry_id, action_type, webhook_id, retry_count + 1),
            countdown=delay,
        )
        return 'retry-via-celery'

    # Long intervals are stored in the database and picked up by the periodic task
    # ``schedule_webhook_retries_on_celery`` once ``retry_not_before`` has passed.
    webhook.retries.update_or_create(
        logentry=logentry,
        defaults=dict(
            retry_not_before=now() + timedelta(seconds=delay),
            retry_count=retry_count + 1,
            action_type=action_type,
        ),
    )
    return 'retry-via-db'


@app.task(
    base=ProfiledTask, bind=True, max_retries=5, default_retry_delay=60, acks_late=True,
    autoretry_for=(DatabaseError,),
)
def send_webhook(self, logentry_id: int, action_type: str, webhook_id: int, retry_count: int = 0):
    """
    Sends out a specific webhook using adequate retry and error handling logic.

    Our retry logic is a little complex since we have different constraints here:

    1. We historically documented that we retry for up to three days, so we want to keep that
       promise. We want to use (approximately) exponentially increasing times to keep load
       manageable.

    2. We want to use Celery's ``acks_late=True`` option which prevents lost tasks if a worker
       crashes.

    3. A limitation of Celery's redis broker implementation is that it can not properly handle
       tasks that *run or wait* longer than ``visibility_timeout``, which defaults to 1h, when
       ``acks_late`` is enabled. So any task with a *retry interval* of >1h will be restarted
       many times because celery believes the worker has crashed.

    4. We do like that the first few retries happen within a few seconds to work around very
       intermittent connectivity issues quickly. For the longer retries with multiple hours,
       we don't care if they are emitted a few minutes too late.

    We therefore have a two-phase retry process:

    - For all retry intervals below 5 minutes, which is the first 3 retries currently, we
      schedule a new celery task directly with an increased ``retry_count`` and a ``countdown``
      of the retry interval. We do *not* use celery's ``retry()`` call currently to make the
      retry process in both phases more similar, there should not be much of a difference though
      (except that the initial task will be in SUCCESS state, but we don't check that status
      anywhere).

    - For all retry intervals of at least 5 minutes, we create a database entry. Then, the
      periodic task ``schedule_webhook_retries_on_celery`` will schedule celery tasks for them
      once their time has come.

    :param logentry_id: ID of the ``LogEntry`` that triggered the webhook.
    :param action_type: Action type used to look up the webhook event type.
    :param webhook_id: ID of the ``WebHook`` to deliver to.
    :param retry_count: Number of retries already scheduled for this delivery, ``0`` for the
                        first attempt.
    :return: A status string: ``'obsolete-webhook'``, ``'obsolete-payload'``, ``'ok'``,
             ``'gone'``, ``'retry-given-up'``, ``'retry-via-celery'`` or ``'retry-via-db'``.
    """
    retry_intervals = (
        5,  # + 5 seconds
        30,  # + 30 seconds
        60,  # + 1 minute
        300,  # + 5 minutes
        1200,  # + 20 minutes
        3600,  # + 60 minutes
        14400,  # + 4 hours
        21600,  # + 6 hours
        43200,  # + 12 hours
        86400,  # + 24 hours
        86400,  # + 24 hours
    )  # added up, these are 257195 seconds (~71.4 hours), i.e. approximately 3 days, as documented
    retry_celery_cutoff = 300

    with scopes_disabled():
        webhook = WebHook.objects.get(id=webhook_id)

    with scope(organizer=webhook.organizer), transaction.atomic():
        logentry = LogEntry.all.get(id=logentry_id)
        types = get_all_webhook_events()
        event_type = types.get(action_type)
        if not event_type or not webhook.enabled:
            return 'obsolete-webhook'  # Ignore, e.g. plugin not installed

        payload = event_type.build_payload(logentry)
        if payload is None:
            # Content object deleted?
            return 'obsolete-payload'

        # Retries scheduled by ourselves are only visible through retry_count, since we do not
        # use celery's retry() for them. self.request.retries only counts retries performed by
        # celery itself (autoretry_for).
        is_retry = retry_count > 0 or self.request.retries > 0

        t = time.time()

        try:
            resp = requests.post(
                webhook.target_url,
                json=payload,
                allow_redirects=False,
                timeout=30,
            )

            WebHookCall.objects.create(
                webhook=webhook,
                action_type=logentry.action_type,
                target_url=webhook.target_url,
                is_retry=is_retry,
                execution_time=time.time() - t,
                return_code=resp.status_code,
                payload=json.dumps(payload),
                response_body=resp.text[:1024 * 1024],
                success=200 <= resp.status_code <= 299
            )

            if resp.status_code == 410:
                # The receiver told us the endpoint is gone, stop sending to it.
                webhook.enabled = False
                webhook.save()
                return 'gone'
            elif resp.status_code > 299:
                return _schedule_webhook_retry(
                    webhook, logentry, logentry_id, action_type, webhook_id, retry_count,
                    retry_intervals, retry_celery_cutoff,
                )

            return 'ok'
        except RequestException as e:
            # No HTTP response at all (connection error, timeout, ...), logged with return code 0
            WebHookCall.objects.create(
                webhook=webhook,
                action_type=logentry.action_type,
                target_url=webhook.target_url,
                is_retry=is_retry,
                execution_time=time.time() - t,
                return_code=0,
                payload=json.dumps(payload),
                response_body=str(e)[:1024 * 1024]
            )
            return _schedule_webhook_retry(
                webhook, logentry, logentry_id, action_type, webhook_id, retry_count,
                retry_intervals, retry_celery_cutoff,
            )
@app.task(base=TransactionAwareTask)
def manually_retry_all_calls(webhook_id: int):
    """
    Immediately re-enqueues every retry stored in the database for one webhook.

    For each stored retry of the webhook, a ``send_webhook`` task is scheduled with the stored
    retry count, and the database entry is deleted afterwards.

    :param webhook_id: ID of the ``WebHook`` whose stored retries should be sent now.
    """
    with scopes_disabled():
        webhook = WebHook.objects.get(id=webhook_id)

    with scope(organizer=webhook.organizer), transaction.atomic():
        # Rows are locked for the duration of the transaction. Rows locked elsewhere are
        # skipped if the database backend supports it.
        pending_retries = webhook.retries.select_for_update(
            skip_locked=connection.features.has_select_for_update_skip_locked
        )
        for pending in pending_retries:
            task_args = (pending.logentry_id, pending.action_type, pending.webhook_id, pending.retry_count)
            send_webhook.apply_async(args=task_args)
            pending.delete()
@receiver(signal=periodic_task, dispatch_uid='pretixapi_schedule_webhook_retries_on_celery')
@scopes_disabled()
def schedule_webhook_retries_on_celery(sender, **kwargs):
    """
    Moves due webhook retries from the database onto celery.

    Connected to the ``periodic_task`` signal. Every stored retry whose ``retry_not_before``
    lies in the past is handed to ``send_webhook`` with its stored retry count and is then
    deleted from the database.
    """
    with transaction.atomic():
        # Rows are locked for the duration of the transaction. Rows locked elsewhere are
        # skipped if the database backend supports it.
        locked_retries = WebHookCallRetry.objects.select_for_update(
            skip_locked=connection.features.has_select_for_update_skip_locked
        )
        due_retries = locked_retries.filter(retry_not_before__lt=now())
        for due in due_retries:
            task_args = (due.logentry_id, due.action_type, due.webhook_id, due.retry_count)
            send_webhook.apply_async(args=task_args)
            due.delete()