diff --git a/src/pretix/base/services/datasync.py b/src/pretix/base/services/datasync.py index 6e327e207c..c4d48da7ea 100644 --- a/src/pretix/base/services/datasync.py +++ b/src/pretix/base/services/datasync.py @@ -52,6 +52,22 @@ def periodic_reset_in_flight(sender, **kwargs): sq.set_sync_error('timeout', [], 'Timeout') +def run_sync(queue): + grouped = groupby(sorted(queue, key=lambda q: (q.sync_provider, q.event.pk)), lambda q: (q.sync_provider, q.event)) + for (target, event), queued_orders in grouped: + target_cls, meta = datasync_providers.get(identifier=target, active_in=event) + + if not target_cls: + # sync plugin not found (plugin deactivated or uninstalled) -> drop outstanding jobs + num_deleted, _ = OrderSyncQueue.objects.filter(pk__in=[sq.pk for sq in queued_orders]).delete() + logger.info("Deleted %d queue entries from %r because plugin %s inactive", num_deleted, event, target) + continue + + with scope(organizer=event.organizer): + with target_cls(event=event) as p: + p.sync_queued_orders(queued_orders) + + @app.task() def sync_all(): with scopes_disabled(): @@ -70,16 +86,20 @@ def sync_all(): .prefetch_related("event") [:1000] ) - grouped = groupby(sorted(queue, key=lambda q: (q.sync_provider, q.event.pk)), lambda q: (q.sync_provider, q.event)) - for (target, event), queued_orders in grouped: - target_cls, meta = datasync_providers.get(identifier=target, active_in=event) + run_sync(queue) - if not target_cls: - # sync plugin not found (plugin deactivated or uninstalled) -> drop outstanding jobs - num_deleted, _ = OrderSyncQueue.objects.filter(pk__in=[sq.pk for sq in queued_orders]).delete() - logger.info("Deleted %d queue entries from %r because plugin %s inactive", num_deleted, event, target) - continue - with scope(organizer=event.organizer): - with target_cls(event=event) as p: - p.sync_queued_orders(queued_orders) +@app.task() +def sync_single(queue_item_id: int): + with scopes_disabled(): + queue = ( + OrderSyncQueue.objects + .filter( + pk=queue_item_id, + in_flight=False, + not_before__lt=now(), + need_manual_retry__isnull=True, + ) + .prefetch_related("event") + ) + run_sync(queue) diff --git a/src/pretix/control/views/datasync.py b/src/pretix/control/views/datasync.py index 02a1098429..6a8e3fcb26 100644 --- a/src/pretix/control/views/datasync.py +++ b/src/pretix/control/views/datasync.py @@ -23,6 +23,7 @@ from itertools import groupby from django.contrib import messages +from django.db import transaction from django.db.models import Q from django.dispatch import receiver from django.http import HttpResponseNotAllowed @@ -35,12 +36,14 @@ from django.views.generic import ListView from pretix.base.datasync.datasync import datasync_providers from pretix.base.models import Event, Order from pretix.base.models.datasync import OrderSyncQueue +from pretix.base.services.datasync import sync_single from pretix.control.permissions import ( AdministratorPermissionRequiredMixin, EventPermissionRequiredMixin, OrganizerPermissionRequiredMixin, ) from pretix.control.signals import order_info from pretix.control.views.orders import OrderView +from pretix.helpers import OF_SELF @receiver(order_info, dispatch_uid="datasync_control_order_info") @@ -78,18 +81,28 @@ class ControlSyncJob(OrderView): prov.enqueue_order(self.order, 'user') messages.success(self.request, _('The sync job has been enqueued and will run in the next minutes.')) elif self.request.POST.get("cancel_job"): - job = self.order.queued_sync_jobs.get(pk=self.request.POST.get("cancel_job")) - if job.in_flight: - messages.warning(self.request, _('The sync job is already in progress.')) - else: - job.delete() - messages.success(self.request, _('The sync job has been canceled.')) + with transaction.atomic(): + job = self.order.queued_sync_jobs.select_for_update(of=OF_SELF).get( + pk=self.request.POST.get("cancel_job") + ) + if job.in_flight: + messages.warning(self.request, _('The sync job is already in progress.')) + else: + job.delete() + messages.success(self.request, _('The sync job has been canceled.')) elif self.request.POST.get("run_job_now"): - job = self.order.queued_sync_jobs.get(pk=self.request.POST.get("run_job_now")) - job.not_before = now() - job.need_manual_retry = None - job.save() - messages.success(self.request, _('The sync job has been set to run as soon as possible.')) + with transaction.atomic(): + job = self.order.queued_sync_jobs.select_for_update(of=OF_SELF).get( + pk=self.request.POST.get("run_job_now") + ) + if job.in_flight: + messages.success(self.request, _('The sync job is already in progress.')) + else: + job.not_before = now() + job.need_manual_retry = None + job.save() + sync_single.apply_async(args=(job.pk,)) + messages.success(self.request, _('The sync job has been set to run as soon as possible.')) return redirect(self.get_order_url())