From 7889f7636fd29a402f2d6197c5708cb3cae7d704 Mon Sep 17 00:00:00 2001 From: Raphael Michel Date: Fri, 23 Jan 2026 12:41:44 +0100 Subject: [PATCH] Periodic tasks --- src/pretix/base/models/mail.py | 4 +- src/pretix/base/services/cleanup.py | 16 +++++- src/pretix/base/services/mail.py | 86 ++++++++++++++++++++++++++++- src/pretix/base/shredder.py | 6 +- src/pretix/settings.py | 2 + src/tests/base/test_shredders.py | 12 +++- 6 files changed, 118 insertions(+), 8 deletions(-) diff --git a/src/pretix/base/models/mail.py b/src/pretix/base/models/mail.py index 4a75f9fbcd..50cd15d2cf 100644 --- a/src/pretix/base/models/mail.py +++ b/src/pretix/base/models/mail.py @@ -128,11 +128,11 @@ class OutgoingMail(models.Model): should_attach_tickets = models.BooleanField(default=False) should_attach_ical = models.BooleanField(default=False) - # We need to make sure cached files are kept as l + # clean_cached_files makes sure not to delete these as long as the email is in a retryable state should_attach_cached_files = models.ManyToManyField( 'pretixbase.CachedFile', related_name='outgoing_mails', - ) # todo: prevent deletion? + ) # This is used to send files stored in settings. In most cases, these aren't short-lived and should still be there # if the email is sent. Otherwise, they will be skipped. We accept that risk. diff --git a/src/pretix/base/services/cleanup.py b/src/pretix/base/services/cleanup.py index 6667b2c3f5..0142f95752 100644 --- a/src/pretix/base/services/cleanup.py +++ b/src/pretix/base/services/cleanup.py @@ -23,11 +23,12 @@ from datetime import timedelta from django.conf import settings from django.core.management import call_command +from django.db.models import Exists, OuterRef from django.dispatch import receiver from django.utils.timezone import now from django_scopes import scopes_disabled -from pretix.base.models import CachedCombinedTicket, CachedTicket +from pretix.base.models import CachedCombinedTicket, CachedTicket, OutgoingMail from pretix.base.models.customers import CustomerSSOGrant from ..models import CachedFile, CartPosition, InvoiceAddress @@ -49,7 +50,18 @@ def clean_cart_positions(sender, **kwargs): @receiver(signal=periodic_task) @scopes_disabled() def clean_cached_files(sender, **kwargs): - for cf in CachedFile.objects.filter(expires__isnull=False, expires__lt=now()): + has_queued_email = Exists( + OutgoingMail.objects.filter( + should_attach_cached_files__pk=OuterRef("pk"), + status__in=( + OutgoingMail.STATUS_QUEUED, + OutgoingMail.STATUS_INFLIGHT, + OutgoingMail.STATUS_AWAWITING_RETRY, + OutgoingMail.STATUS_FAILED, + ), + ) + ) + for cf in CachedFile.objects.filter(expires__isnull=False, expires__lt=now()).exclude(has_queued_email): cf.delete() diff --git a/src/pretix/base/services/mail.py b/src/pretix/base/services/mail.py index df51bfbb9a..d0b9352b2f 100644 --- a/src/pretix/base/services/mail.py +++ b/src/pretix/base/services/mail.py @@ -54,11 +54,14 @@ from django.conf import settings from django.core.files.storage import default_storage from django.core.mail import EmailMultiAlternatives, SafeMIMEMultipart from django.core.mail.message import SafeMIMEText -from django.db import transaction +from django.db import connection, transaction +from django.db.models import Q +from django.dispatch import receiver from django.template.loader import get_template from django.utils.html import escape from django.utils.timezone import now, override from django.utils.translation import gettext as _, pgettext +from django_scopes import scopes_disabled from i18nfield.strings import LazyI18nString from text_unidecode import unidecode @@ -72,7 +75,9 @@ from pretix.base.models.mail import OutgoingMail from pretix.base.services.invoices import invoice_pdf_task from pretix.base.services.tasks import TransactionAwareTask from pretix.base.services.tickets import get_tickets_for_order -from pretix.base.signals import email_filter, global_email_filter +from pretix.base.signals import ( + email_filter, global_email_filter, periodic_task, +) from pretix.celery_app import app from pretix.helpers import OF_SELF from pretix.helpers.format import SafeFormatter, format_map @@ -910,3 +915,80 @@ def _format_error(e: Exception): return 'SMTP recipients refudes', '\n'.join(message) else: return 'Internal error', str(e) + + +def _is_queue_long(queue_name="mail"): + """ + Checks an estimate if there is currently a long celery queue for emails. If so, + there's no reason to retry stuck emails, because they are stuck because of the + queue and we don't need to add more oil to the fire. + + This does not need to be perfect, as it is safe to run the same task twice, it just + wastes ressources. + """ + if not settings.HAS_CELERY: + return False + if not settings.CELERY_BROKER_URL.startswith("redis://"): + return False # check not supported + priority_steps = settings.CELERY_BROKER_TRANSPORT_OPTIONS.get("priority_steps", [0]) + sep = settings.CELERY_BROKER_TRANSPORT_OPTIONS.get("sep", ":") + client = app.broker_connection().channel().client + queue_length = 0 + for prio in priority_steps: + if prio: + qname = f"{queue_name}{sep}{prio}" + else: + qname = queue_name + queue_length += client.llen(qname) + + return queue_length > 100 + + +@receiver(signal=periodic_task) +@scopes_disabled() +def retry_stuck_inflight_mails(sender, **kwargs): + """ + Retry emails that are stuck in "inflight" state, e.g. their celery task just died. + """ + with transaction.atomic(): + for m in OutgoingMail.objects.filter( + status=OutgoingMail.STATUS_INFLIGHT, + inflight_since__lt=now() - timedelta(hours=1), + ).select_for_update(of=OF_SELF, skip_locked=connection.features.has_select_for_update_skip_locked): + m.status = OutgoingMail.STATUS_QUEUED + m.save() + mail_send_task.apply_async(kwargs={"outgoing_mail": m.pk}) + + +@receiver(signal=periodic_task) +@scopes_disabled() +def retry_stuck_queued_mails(sender, **kwargs): + """ + Retry emails that are stuck in "queued" state, e.g. their celery task never started. We do this only + when there is currently almost no queue, to avoid many tasks being scheduled for the same mail if that + mail is still waiting in the queue (even if that would be safe, all tasks except the first one would be a no-op, + but it would create many more useless tasks in a high-load situation). + """ + if _is_queue_long(): + logger.info("Do not retry stuck mails as the queue is long.") + return + + for m in OutgoingMail.objects.filter( + status=OutgoingMail.STATUS_QUEUED, + created__lt=now() - timedelta(hours=1), + ): + mail_send_task.apply_async(kwargs={"outgoing_mail": m.pk}) + + +@receiver(signal=periodic_task) +@scopes_disabled() +def delete_old_emails(sender, **kwargs): + """ + OutgoingMail is currently not intended to be an archive, because it would be hard to do in a + privacy-first design, so we delete after some time. + """ + cutoff = now() - timedelta(seconds=settings.OUTGOING_MAIL_RETENTION) + OutgoingMail.objects.filter( + Q(sent__lt=cutoff) | + Q(sent__isnull=True, created__lt=cutoff) + ).delete() diff --git a/src/pretix/base/shredder.py b/src/pretix/base/shredder.py index a8110e9874..21b2803053 100644 --- a/src/pretix/base/shredder.py +++ b/src/pretix/base/shredder.py @@ -51,7 +51,7 @@ from pretix.api.serializers.waitinglist import WaitingListSerializer from pretix.base.i18n import LazyLocaleException from pretix.base.models import ( CachedCombinedTicket, CachedTicket, Event, InvoiceAddress, OrderPayment, - OrderPosition, OrderRefund, QuestionAnswer, + OrderPosition, OrderRefund, OutgoingMail, QuestionAnswer, ) from pretix.base.services.invoices import invoice_pdf_task from pretix.base.signals import register_data_shredders @@ -329,6 +329,10 @@ class EmailAddressShredder(BaseDataShredder): sleep_time=2, ) + slow_delete( + OutgoingMail.objects.filter(event=self.event) + ) + for o in _progress_helper(qs_orders, progress_callback, qs_op_cnt, total): changed = bool(o.email) or bool(o.customer) o.email = None diff --git a/src/pretix/settings.py b/src/pretix/settings.py index 627275449a..f02c840e68 100644 --- a/src/pretix/settings.py +++ b/src/pretix/settings.py @@ -855,6 +855,8 @@ COUNTRIES_OVERRIDE = { DATA_UPLOAD_MAX_NUMBER_FIELDS = 25000 DATA_UPLOAD_MAX_MEMORY_SIZE = 10 * 1024 * 1024 # 10 MB +OUTGOING_MAIL_RETENTION = 14 * 24 * 3600 # 14 days in seonds + # File sizes are in MiB FILE_UPLOAD_MAX_SIZE_IMAGE = 1024 * 1024 * config.getint("pretix_file_upload", "max_size_image", fallback=10) FILE_UPLOAD_MAX_SIZE_FAVICON = 1024 * 1024 * config.getint("pretix_file_upload", "max_size_favicon", fallback=1) diff --git a/src/tests/base/test_shredders.py b/src/tests/base/test_shredders.py index b8ebd7828c..d2f0c59c8e 100644 --- a/src/tests/base/test_shredders.py +++ b/src/tests/base/test_shredders.py @@ -31,7 +31,7 @@ from django_scopes import scope from pretix.base.models import ( CachedCombinedTicket, CachedTicket, Event, InvoiceAddress, Order, - OrderPayment, OrderPosition, Organizer, QuestionAnswer, + OrderPayment, OrderPosition, Organizer, OutgoingMail, QuestionAnswer, ) from pretix.base.services.invoices import generate_invoice, invoice_pdf_task from pretix.base.services.tickets import generate @@ -111,6 +111,15 @@ def test_email_shredder(event, order): 'new_email': 'foo@bar.com', } ) + m = OutgoingMail.objects.create( + event=event, + order=order, + to=['recipient@example.com'], + subject='Test', + body_plain='Test', + sender='sender@example.com', + headers={}, + ) s = EmailAddressShredder(event) f = list(s.generate_files()) @@ -129,6 +138,7 @@ def test_email_shredder(event, order): assert 'Foo' not in l1.data l2.refresh_from_db() assert '@' not in l2.data + assert not OutgoingMail.objects.filter(pk=m.pk).exists() @pytest.mark.django_db