mirror of
https://github.com/pretix/pretix.git
synced 2025-12-19 16:22:26 +00:00
Compare commits
2 Commits
fix-datasy
...
sync-singl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2ac8f4bba7 | ||
|
|
38b9457e28 |
@@ -52,6 +52,22 @@ def periodic_reset_in_flight(sender, **kwargs):
|
|||||||
sq.set_sync_error('timeout', [], 'Timeout')
|
sq.set_sync_error('timeout', [], 'Timeout')
|
||||||
|
|
||||||
|
|
||||||
|
def run_sync(queue):
|
||||||
|
grouped = groupby(sorted(queue, key=lambda q: (q.sync_provider, q.event.pk)), lambda q: (q.sync_provider, q.event))
|
||||||
|
for (target, event), queued_orders in grouped:
|
||||||
|
target_cls, meta = datasync_providers.get(identifier=target, active_in=event)
|
||||||
|
|
||||||
|
if not target_cls:
|
||||||
|
# sync plugin not found (plugin deactivated or uninstalled) -> drop outstanding jobs
|
||||||
|
num_deleted, _ = OrderSyncQueue.objects.filter(pk__in=[sq.pk for sq in queued_orders]).delete()
|
||||||
|
logger.info("Deleted %d queue entries from %r because plugin %s inactive", num_deleted, event, target)
|
||||||
|
continue
|
||||||
|
|
||||||
|
with scope(organizer=event.organizer):
|
||||||
|
with target_cls(event=event) as p:
|
||||||
|
p.sync_queued_orders(queued_orders)
|
||||||
|
|
||||||
|
|
||||||
@app.task()
|
@app.task()
|
||||||
def sync_all():
|
def sync_all():
|
||||||
with scopes_disabled():
|
with scopes_disabled():
|
||||||
@@ -70,16 +86,20 @@ def sync_all():
|
|||||||
.prefetch_related("event")
|
.prefetch_related("event")
|
||||||
[:1000]
|
[:1000]
|
||||||
)
|
)
|
||||||
grouped = groupby(sorted(queue, key=lambda q: (q.sync_provider, q.event.pk)), lambda q: (q.sync_provider, q.event))
|
run_sync(queue)
|
||||||
for (target, event), queued_orders in grouped:
|
|
||||||
target_cls, meta = datasync_providers.get(identifier=target, active_in=event)
|
|
||||||
|
|
||||||
if not target_cls:
|
|
||||||
# sync plugin not found (plugin deactivated or uninstalled) -> drop outstanding jobs
|
|
||||||
num_deleted, _ = OrderSyncQueue.objects.filter(pk__in=[sq.pk for sq in queued_orders]).delete()
|
|
||||||
logger.info("Deleted %d queue entries from %r because plugin %s inactive", num_deleted, event, target)
|
|
||||||
continue
|
|
||||||
|
|
||||||
with scope(organizer=event.organizer):
|
@app.task()
|
||||||
with target_cls(event=event) as p:
|
def sync_single(queue_item_id: int):
|
||||||
p.sync_queued_orders(queued_orders)
|
with scopes_disabled():
|
||||||
|
queue = (
|
||||||
|
OrderSyncQueue.objects
|
||||||
|
.filter(
|
||||||
|
pk=queue_item_id,
|
||||||
|
in_flight=False,
|
||||||
|
not_before__lt=now(),
|
||||||
|
need_manual_retry__isnull=True,
|
||||||
|
)
|
||||||
|
.prefetch_related("event")
|
||||||
|
)
|
||||||
|
run_sync(queue)
|
||||||
|
|||||||
@@ -23,6 +23,7 @@
|
|||||||
from itertools import groupby
|
from itertools import groupby
|
||||||
|
|
||||||
from django.contrib import messages
|
from django.contrib import messages
|
||||||
|
from django.db import transaction
|
||||||
from django.db.models import Q
|
from django.db.models import Q
|
||||||
from django.dispatch import receiver
|
from django.dispatch import receiver
|
||||||
from django.http import HttpResponseNotAllowed
|
from django.http import HttpResponseNotAllowed
|
||||||
@@ -35,12 +36,14 @@ from django.views.generic import ListView
|
|||||||
from pretix.base.datasync.datasync import datasync_providers
|
from pretix.base.datasync.datasync import datasync_providers
|
||||||
from pretix.base.models import Event, Order
|
from pretix.base.models import Event, Order
|
||||||
from pretix.base.models.datasync import OrderSyncQueue
|
from pretix.base.models.datasync import OrderSyncQueue
|
||||||
|
from pretix.base.services.datasync import sync_single
|
||||||
from pretix.control.permissions import (
|
from pretix.control.permissions import (
|
||||||
AdministratorPermissionRequiredMixin, EventPermissionRequiredMixin,
|
AdministratorPermissionRequiredMixin, EventPermissionRequiredMixin,
|
||||||
OrganizerPermissionRequiredMixin,
|
OrganizerPermissionRequiredMixin,
|
||||||
)
|
)
|
||||||
from pretix.control.signals import order_info
|
from pretix.control.signals import order_info
|
||||||
from pretix.control.views.orders import OrderView
|
from pretix.control.views.orders import OrderView
|
||||||
|
from pretix.helpers import OF_SELF
|
||||||
|
|
||||||
|
|
||||||
@receiver(order_info, dispatch_uid="datasync_control_order_info")
|
@receiver(order_info, dispatch_uid="datasync_control_order_info")
|
||||||
@@ -78,17 +81,27 @@ class ControlSyncJob(OrderView):
|
|||||||
prov.enqueue_order(self.order, 'user')
|
prov.enqueue_order(self.order, 'user')
|
||||||
messages.success(self.request, _('The sync job has been enqueued and will run in the next minutes.'))
|
messages.success(self.request, _('The sync job has been enqueued and will run in the next minutes.'))
|
||||||
elif self.request.POST.get("cancel_job"):
|
elif self.request.POST.get("cancel_job"):
|
||||||
job = self.order.queued_sync_jobs.get(pk=self.request.POST.get("cancel_job"))
|
with transaction.atomic():
|
||||||
|
job = self.order.queued_sync_jobs.select_for_update(of=OF_SELF).get(
|
||||||
|
pk=self.request.POST.get("cancel_job")
|
||||||
|
)
|
||||||
if job.in_flight:
|
if job.in_flight:
|
||||||
messages.warning(self.request, _('The sync job is already in progress.'))
|
messages.warning(self.request, _('The sync job is already in progress.'))
|
||||||
else:
|
else:
|
||||||
job.delete()
|
job.delete()
|
||||||
messages.success(self.request, _('The sync job has been canceled.'))
|
messages.success(self.request, _('The sync job has been canceled.'))
|
||||||
elif self.request.POST.get("run_job_now"):
|
elif self.request.POST.get("run_job_now"):
|
||||||
job = self.order.queued_sync_jobs.get(pk=self.request.POST.get("run_job_now"))
|
with transaction.atomic():
|
||||||
|
job = self.order.queued_sync_jobs.select_for_update(of=OF_SELF).get(
|
||||||
|
pk=self.request.POST.get("run_job_now")
|
||||||
|
)
|
||||||
|
if job.in_flight:
|
||||||
|
messages.success(self.request, _('The sync job is already in progress.'))
|
||||||
|
else:
|
||||||
job.not_before = now()
|
job.not_before = now()
|
||||||
job.need_manual_retry = None
|
job.need_manual_retry = None
|
||||||
job.save()
|
job.save()
|
||||||
|
sync_single.apply_async(args=(job.pk,))
|
||||||
messages.success(self.request, _('The sync job has been set to run as soon as possible.'))
|
messages.success(self.request, _('The sync job has been set to run as soon as possible.'))
|
||||||
|
|
||||||
return redirect(self.get_order_url())
|
return redirect(self.get_order_url())
|
||||||
|
|||||||
Reference in New Issue
Block a user