Compare commits

...

2 Commits

Author SHA1 Message Date
Raphael Michel
2ac8f4bba7 Transactional safety for manual handling of sync jobs 2025-08-08 09:21:05 +02:00
Mira Weller
38b9457e28 Allow users to run sync jobs immediately 2025-08-07 14:32:41 +02:00
2 changed files with 55 additions and 22 deletions

View File

@@ -52,6 +52,22 @@ def periodic_reset_in_flight(sender, **kwargs):
sq.set_sync_error('timeout', [], 'Timeout')
def run_sync(queue):
grouped = groupby(sorted(queue, key=lambda q: (q.sync_provider, q.event.pk)), lambda q: (q.sync_provider, q.event))
for (target, event), queued_orders in grouped:
target_cls, meta = datasync_providers.get(identifier=target, active_in=event)
if not target_cls:
# sync plugin not found (plugin deactivated or uninstalled) -> drop outstanding jobs
num_deleted, _ = OrderSyncQueue.objects.filter(pk__in=[sq.pk for sq in queued_orders]).delete()
logger.info("Deleted %d queue entries from %r because plugin %s inactive", num_deleted, event, target)
continue
with scope(organizer=event.organizer):
with target_cls(event=event) as p:
p.sync_queued_orders(queued_orders)
@app.task()
def sync_all():
with scopes_disabled():
@@ -70,16 +86,20 @@ def sync_all():
.prefetch_related("event")
[:1000]
)
grouped = groupby(sorted(queue, key=lambda q: (q.sync_provider, q.event.pk)), lambda q: (q.sync_provider, q.event))
for (target, event), queued_orders in grouped:
target_cls, meta = datasync_providers.get(identifier=target, active_in=event)
run_sync(queue)
if not target_cls:
# sync plugin not found (plugin deactivated or uninstalled) -> drop outstanding jobs
num_deleted, _ = OrderSyncQueue.objects.filter(pk__in=[sq.pk for sq in queued_orders]).delete()
logger.info("Deleted %d queue entries from %r because plugin %s inactive", num_deleted, event, target)
continue
with scope(organizer=event.organizer):
with target_cls(event=event) as p:
p.sync_queued_orders(queued_orders)
@app.task()
def sync_single(queue_item_id: int):
with scopes_disabled():
queue = (
OrderSyncQueue.objects
.filter(
pk=queue_item_id,
in_flight=False,
not_before__lt=now(),
need_manual_retry__isnull=True,
)
.prefetch_related("event")
)
run_sync(queue)

View File

@@ -23,6 +23,7 @@
from itertools import groupby
from django.contrib import messages
from django.db import transaction
from django.db.models import Q
from django.dispatch import receiver
from django.http import HttpResponseNotAllowed
@@ -35,12 +36,14 @@ from django.views.generic import ListView
from pretix.base.datasync.datasync import datasync_providers
from pretix.base.models import Event, Order
from pretix.base.models.datasync import OrderSyncQueue
from pretix.base.services.datasync import sync_single
from pretix.control.permissions import (
AdministratorPermissionRequiredMixin, EventPermissionRequiredMixin,
OrganizerPermissionRequiredMixin,
)
from pretix.control.signals import order_info
from pretix.control.views.orders import OrderView
from pretix.helpers import OF_SELF
@receiver(order_info, dispatch_uid="datasync_control_order_info")
@@ -78,18 +81,28 @@ class ControlSyncJob(OrderView):
prov.enqueue_order(self.order, 'user')
messages.success(self.request, _('The sync job has been enqueued and will run in the next minutes.'))
elif self.request.POST.get("cancel_job"):
job = self.order.queued_sync_jobs.get(pk=self.request.POST.get("cancel_job"))
if job.in_flight:
messages.warning(self.request, _('The sync job is already in progress.'))
else:
job.delete()
messages.success(self.request, _('The sync job has been canceled.'))
with transaction.atomic():
job = self.order.queued_sync_jobs.select_for_update(of=OF_SELF).get(
pk=self.request.POST.get("cancel_job")
)
if job.in_flight:
messages.warning(self.request, _('The sync job is already in progress.'))
else:
job.delete()
messages.success(self.request, _('The sync job has been canceled.'))
elif self.request.POST.get("run_job_now"):
job = self.order.queued_sync_jobs.get(pk=self.request.POST.get("run_job_now"))
job.not_before = now()
job.need_manual_retry = None
job.save()
messages.success(self.request, _('The sync job has been set to run as soon as possible.'))
with transaction.atomic():
job = self.order.queued_sync_jobs.select_for_update(of=OF_SELF).get(
pk=self.request.POST.get("run_job_now")
)
if job.in_flight:
messages.success(self.request, _('The sync job is already in progress.'))
else:
job.not_before = now()
job.need_manual_retry = None
job.save()
sync_single.apply_async(args=(job.pk,))
messages.success(self.request, _('The sync job has been set to run as soon as possible.'))
return redirect(self.get_order_url())