diff --git a/src/pretix/base/datasync/datasync.py b/src/pretix/base/datasync/datasync.py index a6ec47fc29..fcf522febb 100644 --- a/src/pretix/base/datasync/datasync.py +++ b/src/pretix/base/datasync/datasync.py @@ -28,6 +28,7 @@ from functools import cached_property from itertools import groupby import sentry_sdk +from django.db import transaction, DatabaseError from django.db.models import F, Window from django.db.models.functions import RowNumber from django.dispatch import receiver @@ -41,6 +42,7 @@ from pretix.base.logentrytype_registry import make_link from pretix.base.models.datasync import OrderSyncQueue, OrderSyncResult from pretix.base.signals import EventPluginRegistry, periodic_task from pretix.celery_app import app +from pretix.helpers import OF_SELF logger = logging.getLogger(__name__) @@ -56,7 +58,6 @@ sync_targets = EventPluginRegistry({"identifier": lambda o: o.identifier}) def sync_event_to_target(event, target_cls, queued_orders): with scope(organizer=event.organizer): with target_cls(event=event) as p: - # TODO: should I somehow lock the queued orders or events, to avoid syncing them twice at the same time? p.sync_queued_orders(queued_orders) @@ -71,11 +72,10 @@ def sync_all(): partition_by=[F("event_id")], order_by="not_before", )) - .select_related("order") .prefetch_related("event") [:1000] ) - grouped = groupby(sorted(queue, key=lambda q: (q.sync_provider, q.order.event.pk)), lambda q: (q.sync_provider, q.order.event)) + grouped = groupby(sorted(queue, key=lambda q: (q.sync_provider, q.event.pk)), lambda q: (q.sync_provider, q.event)) for (target, event), queued_orders in grouped: target_cls, meta = sync_targets.get(identifier=target, active_in=event) @@ -207,54 +207,64 @@ class OutboundSyncProvider: raise NotImplementedError def sync_queued_orders(self, queued_orders): - for sq in queued_orders: - try: - mapped_objects = self.sync_order(sq.order) - except UnrecoverableSyncError as e: - logger.warning( - f"Could not sync order {sq.order.code} to {type(self).__name__}", - exc_info=True, - ) - sq.order.log_action(e.log_action_type, { - "provider": self.identifier, - "error": e.messages, - "full_message": e.full_message, - }) - sq.delete() - except RecoverableSyncError as e: - sq.failed_attempts += 1 - sq.not_before = self.next_retry_date(sq) - logger.info( - f"Could not sync order {sq.order.code} to {type(self).__name__} (transient error, attempt #{sq.failed_attempts})", - exc_info=True, - ) - if sq.failed_attempts >= self.max_attempts: - sentry_sdk.capture_exception(e) - sq.order.log_action("pretix.event.order.data_sync.failed.exceeded", { + for queue_item in queued_orders: + with transaction.atomic(): + try: + sq = ( + OrderSyncQueue.objects + .select_for_update(of=OF_SELF, nowait=True) + .select_related("order") + .get(pk=queue_item.pk) + ) + except DatabaseError: + continue + try: + mapped_objects = self.sync_order(sq.order) + except UnrecoverableSyncError as e: + logger.warning( + f"Could not sync order {sq.order.code} to {type(self).__name__}", + exc_info=True, + ) + sq.order.log_action(e.log_action_type, { "provider": self.identifier, "error": e.messages, "full_message": e.full_message, }) sq.delete() + except RecoverableSyncError as e: + sq.failed_attempts += 1 + sq.not_before = self.next_retry_date(sq) + logger.info( + f"Could not sync order {sq.order.code} to {type(self).__name__} (transient error, attempt #{sq.failed_attempts})", + exc_info=True, + ) + if sq.failed_attempts >= self.max_attempts: + sentry_sdk.capture_exception(e) + sq.order.log_action("pretix.event.order.data_sync.failed.exceeded", { + "provider": self.identifier, + "error": e.messages, + "full_message": e.full_message, + }) + sq.delete() + else: + sq.save() + except Exception as e: + logger.exception( + f"Could not sync order {sq.order.code} to {type(self).__name__} (unhandled exception)" + ) + sentry_sdk.capture_exception(e) + sq.order.log_action("pretix.event.order.data_sync.failed.internal", { + "provider": self.identifier, + "error": [], + "full_message": str(e), + }) + sq.delete() else: - sq.save() - except Exception as e: - logger.exception( - f"Could not sync order {sq.order.code} to {type(self).__name__} (unhandled exception)" - ) - sentry_sdk.capture_exception(e) - sq.order.log_action("pretix.event.order.data_sync.failed.internal", { - "provider": self.identifier, - "error": [], - "full_message": str(e), - }) - sq.delete() - else: - sq.order.log_action("pretix.event.order.data_sync.success", { - "provider": self.identifier, - "objects": mapped_objects - }) - sq.delete() + sq.order.log_action("pretix.event.order.data_sync.success", { + "provider": self.identifier, + "objects": mapped_objects + }) + sq.delete() @cached_property def data_fields(self):