mirror of
https://github.com/pretix/pretix.git
synced 2026-05-21 17:54:08 +00:00
Periodic tasks
This commit is contained in:
@@ -128,11 +128,11 @@ class OutgoingMail(models.Model):
|
||||
should_attach_tickets = models.BooleanField(default=False)
|
||||
should_attach_ical = models.BooleanField(default=False)
|
||||
|
||||
# We need to make sure cached files are kept as l
|
||||
# clean_cached_files makes sure not to delete these as long as the email is in a retryable state
|
||||
should_attach_cached_files = models.ManyToManyField(
|
||||
'pretixbase.CachedFile',
|
||||
related_name='outgoing_mails',
|
||||
) # todo: prevent deletion?
|
||||
)
|
||||
|
||||
# This is used to send files stored in settings. In most cases, these aren't short-lived and should still be there
|
||||
# if the email is sent. Otherwise, they will be skipped. We accept that risk.
|
||||
|
||||
@@ -23,11 +23,12 @@ from datetime import timedelta
|
||||
|
||||
from django.conf import settings
|
||||
from django.core.management import call_command
|
||||
from django.db.models import Exists, OuterRef
|
||||
from django.dispatch import receiver
|
||||
from django.utils.timezone import now
|
||||
from django_scopes import scopes_disabled
|
||||
|
||||
from pretix.base.models import CachedCombinedTicket, CachedTicket
|
||||
from pretix.base.models import CachedCombinedTicket, CachedTicket, OutgoingMail
|
||||
from pretix.base.models.customers import CustomerSSOGrant
|
||||
|
||||
from ..models import CachedFile, CartPosition, InvoiceAddress
|
||||
@@ -49,7 +50,18 @@ def clean_cart_positions(sender, **kwargs):
|
||||
@receiver(signal=periodic_task)
|
||||
@scopes_disabled()
|
||||
def clean_cached_files(sender, **kwargs):
|
||||
for cf in CachedFile.objects.filter(expires__isnull=False, expires__lt=now()):
|
||||
has_queued_email = Exists(
|
||||
OutgoingMail.objects.filter(
|
||||
should_attach_cached_files__pk=OuterRef("pk"),
|
||||
status__in=(
|
||||
OutgoingMail.STATUS_QUEUED,
|
||||
OutgoingMail.STATUS_INFLIGHT,
|
||||
OutgoingMail.STATUS_AWAWITING_RETRY,
|
||||
OutgoingMail.STATUS_FAILED,
|
||||
),
|
||||
)
|
||||
)
|
||||
for cf in CachedFile.objects.filter(expires__isnull=False, expires__lt=now()).exclude(has_queued_email):
|
||||
cf.delete()
|
||||
|
||||
|
||||
|
||||
@@ -54,11 +54,14 @@ from django.conf import settings
|
||||
from django.core.files.storage import default_storage
|
||||
from django.core.mail import EmailMultiAlternatives, SafeMIMEMultipart
|
||||
from django.core.mail.message import SafeMIMEText
|
||||
from django.db import transaction
|
||||
from django.db import connection, transaction
|
||||
from django.db.models import Q
|
||||
from django.dispatch import receiver
|
||||
from django.template.loader import get_template
|
||||
from django.utils.html import escape
|
||||
from django.utils.timezone import now, override
|
||||
from django.utils.translation import gettext as _, pgettext
|
||||
from django_scopes import scopes_disabled
|
||||
from i18nfield.strings import LazyI18nString
|
||||
from text_unidecode import unidecode
|
||||
|
||||
@@ -72,7 +75,9 @@ from pretix.base.models.mail import OutgoingMail
|
||||
from pretix.base.services.invoices import invoice_pdf_task
|
||||
from pretix.base.services.tasks import TransactionAwareTask
|
||||
from pretix.base.services.tickets import get_tickets_for_order
|
||||
from pretix.base.signals import email_filter, global_email_filter
|
||||
from pretix.base.signals import (
|
||||
email_filter, global_email_filter, periodic_task,
|
||||
)
|
||||
from pretix.celery_app import app
|
||||
from pretix.helpers import OF_SELF
|
||||
from pretix.helpers.format import SafeFormatter, format_map
|
||||
@@ -910,3 +915,80 @@ def _format_error(e: Exception):
|
||||
return 'SMTP recipients refudes', '\n'.join(message)
|
||||
else:
|
||||
return 'Internal error', str(e)
|
||||
|
||||
|
||||
def _is_queue_long(queue_name="mail"):
|
||||
"""
|
||||
Checks an estimate if there is currently a long celery queue for emails. If so,
|
||||
there's no reason to retry stuck emails, because they are stuck because of the
|
||||
queue and we don't need to add more oil to the fire.
|
||||
|
||||
This does not need to be perfect, as it is safe to run the same task twice, it just
|
||||
wastes ressources.
|
||||
"""
|
||||
if not settings.HAS_CELERY:
|
||||
return False
|
||||
if not settings.CELERY_BROKER_URL.startswith("redis://"):
|
||||
return False # check not supported
|
||||
priority_steps = settings.CELERY_BROKER_TRANSPORT_OPTIONS.get("priority_steps", [0])
|
||||
sep = settings.CELERY_BROKER_TRANSPORT_OPTIONS.get("sep", ":")
|
||||
client = app.broker_connection().channel().client
|
||||
queue_length = 0
|
||||
for prio in priority_steps:
|
||||
if prio:
|
||||
qname = f"{queue_name}{sep}{prio}"
|
||||
else:
|
||||
qname = queue_name
|
||||
queue_length += client.llen(qname)
|
||||
|
||||
return queue_length > 100
|
||||
|
||||
|
||||
@receiver(signal=periodic_task)
|
||||
@scopes_disabled()
|
||||
def retry_stuck_inflight_mails(sender, **kwargs):
|
||||
"""
|
||||
Retry emails that are stuck in "inflight" state, e.g. their celery task just died.
|
||||
"""
|
||||
with transaction.atomic():
|
||||
for m in OutgoingMail.objects.filter(
|
||||
status=OutgoingMail.STATUS_INFLIGHT,
|
||||
inflight_since__lt=now() - timedelta(hours=1),
|
||||
).select_for_update(of=OF_SELF, skip_locked=connection.features.has_select_for_update_skip_locked):
|
||||
m.status = OutgoingMail.STATUS_QUEUED
|
||||
m.save()
|
||||
mail_send_task.apply_async(kwargs={"outgoing_mail": m.pk})
|
||||
|
||||
|
||||
@receiver(signal=periodic_task)
|
||||
@scopes_disabled()
|
||||
def retry_stuck_queued_mails(sender, **kwargs):
|
||||
"""
|
||||
Retry emails that are stuck in "queued" state, e.g. their celery task never started. We do this only
|
||||
when there is currently almost no queue, to avoid many tasks being scheduled for the same mail if that
|
||||
mail is still waiting in the queue (even if that would be safe, all tasks except the first one would be a no-op,
|
||||
but it would create many more useless tasks in a high-load situation).
|
||||
"""
|
||||
if _is_queue_long():
|
||||
logger.info("Do not retry stuck mails as the queue is long.")
|
||||
return
|
||||
|
||||
for m in OutgoingMail.objects.filter(
|
||||
status=OutgoingMail.STATUS_QUEUED,
|
||||
created__lt=now() - timedelta(hours=1),
|
||||
):
|
||||
mail_send_task.apply_async(kwargs={"outgoing_mail": m.pk})
|
||||
|
||||
|
||||
@receiver(signal=periodic_task)
|
||||
@scopes_disabled()
|
||||
def delete_old_emails(sender, **kwargs):
|
||||
"""
|
||||
OutgoingMail is currently not intended to be an archive, because it would be hard to do in a
|
||||
privacy-first design, so we delete after some time.
|
||||
"""
|
||||
cutoff = now() - timedelta(seconds=settings.OUTGOING_MAIL_RETENTION)
|
||||
OutgoingMail.objects.filter(
|
||||
Q(sent__lt=cutoff) |
|
||||
Q(sent__isnull=True, created__lt=cutoff)
|
||||
).delete()
|
||||
|
||||
@@ -51,7 +51,7 @@ from pretix.api.serializers.waitinglist import WaitingListSerializer
|
||||
from pretix.base.i18n import LazyLocaleException
|
||||
from pretix.base.models import (
|
||||
CachedCombinedTicket, CachedTicket, Event, InvoiceAddress, OrderPayment,
|
||||
OrderPosition, OrderRefund, QuestionAnswer,
|
||||
OrderPosition, OrderRefund, OutgoingMail, QuestionAnswer,
|
||||
)
|
||||
from pretix.base.services.invoices import invoice_pdf_task
|
||||
from pretix.base.signals import register_data_shredders
|
||||
@@ -329,6 +329,10 @@ class EmailAddressShredder(BaseDataShredder):
|
||||
sleep_time=2,
|
||||
)
|
||||
|
||||
slow_delete(
|
||||
OutgoingMail.objects.filter(event=self.event)
|
||||
)
|
||||
|
||||
for o in _progress_helper(qs_orders, progress_callback, qs_op_cnt, total):
|
||||
changed = bool(o.email) or bool(o.customer)
|
||||
o.email = None
|
||||
|
||||
@@ -855,6 +855,8 @@ COUNTRIES_OVERRIDE = {
|
||||
DATA_UPLOAD_MAX_NUMBER_FIELDS = 25000
|
||||
DATA_UPLOAD_MAX_MEMORY_SIZE = 10 * 1024 * 1024 # 10 MB
|
||||
|
||||
OUTGOING_MAIL_RETENTION = 14 * 24 * 3600 # 14 days in seonds
|
||||
|
||||
# File sizes are in MiB
|
||||
FILE_UPLOAD_MAX_SIZE_IMAGE = 1024 * 1024 * config.getint("pretix_file_upload", "max_size_image", fallback=10)
|
||||
FILE_UPLOAD_MAX_SIZE_FAVICON = 1024 * 1024 * config.getint("pretix_file_upload", "max_size_favicon", fallback=1)
|
||||
|
||||
@@ -31,7 +31,7 @@ from django_scopes import scope
|
||||
|
||||
from pretix.base.models import (
|
||||
CachedCombinedTicket, CachedTicket, Event, InvoiceAddress, Order,
|
||||
OrderPayment, OrderPosition, Organizer, QuestionAnswer,
|
||||
OrderPayment, OrderPosition, Organizer, OutgoingMail, QuestionAnswer,
|
||||
)
|
||||
from pretix.base.services.invoices import generate_invoice, invoice_pdf_task
|
||||
from pretix.base.services.tickets import generate
|
||||
@@ -111,6 +111,15 @@ def test_email_shredder(event, order):
|
||||
'new_email': 'foo@bar.com',
|
||||
}
|
||||
)
|
||||
m = OutgoingMail.objects.create(
|
||||
event=event,
|
||||
order=order,
|
||||
to=['recipient@example.com'],
|
||||
subject='Test',
|
||||
body_plain='Test',
|
||||
sender='sender@example.com',
|
||||
headers={},
|
||||
)
|
||||
|
||||
s = EmailAddressShredder(event)
|
||||
f = list(s.generate_files())
|
||||
@@ -129,6 +138,7 @@ def test_email_shredder(event, order):
|
||||
assert 'Foo' not in l1.data
|
||||
l2.refresh_from_db()
|
||||
assert '@' not in l2.data
|
||||
assert not OutgoingMail.objects.filter(pk=m.pk).exists()
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
|
||||
Reference in New Issue
Block a user