Add simple locking mechanism

This commit is contained in:
Mira Weller
2025-05-07 16:52:54 +02:00
parent 244cdb22b0
commit 8028879159

View File

@@ -28,6 +28,7 @@ from functools import cached_property
from itertools import groupby from itertools import groupby
import sentry_sdk import sentry_sdk
from django.db import transaction, DatabaseError
from django.db.models import F, Window from django.db.models import F, Window
from django.db.models.functions import RowNumber from django.db.models.functions import RowNumber
from django.dispatch import receiver from django.dispatch import receiver
@@ -41,6 +42,7 @@ from pretix.base.logentrytype_registry import make_link
from pretix.base.models.datasync import OrderSyncQueue, OrderSyncResult from pretix.base.models.datasync import OrderSyncQueue, OrderSyncResult
from pretix.base.signals import EventPluginRegistry, periodic_task from pretix.base.signals import EventPluginRegistry, periodic_task
from pretix.celery_app import app from pretix.celery_app import app
from pretix.helpers import OF_SELF
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -56,7 +58,6 @@ sync_targets = EventPluginRegistry({"identifier": lambda o: o.identifier})
def sync_event_to_target(event, target_cls, queued_orders): def sync_event_to_target(event, target_cls, queued_orders):
with scope(organizer=event.organizer): with scope(organizer=event.organizer):
with target_cls(event=event) as p: with target_cls(event=event) as p:
# TODO: should I somehow lock the queued orders or events, to avoid syncing them twice at the same time?
p.sync_queued_orders(queued_orders) p.sync_queued_orders(queued_orders)
@@ -71,11 +72,10 @@ def sync_all():
partition_by=[F("event_id")], partition_by=[F("event_id")],
order_by="not_before", order_by="not_before",
)) ))
.select_related("order")
.prefetch_related("event") .prefetch_related("event")
[:1000] [:1000]
) )
grouped = groupby(sorted(queue, key=lambda q: (q.sync_provider, q.order.event.pk)), lambda q: (q.sync_provider, q.order.event)) grouped = groupby(sorted(queue, key=lambda q: (q.sync_provider, q.event.pk)), lambda q: (q.sync_provider, q.event))
for (target, event), queued_orders in grouped: for (target, event), queued_orders in grouped:
target_cls, meta = sync_targets.get(identifier=target, active_in=event) target_cls, meta = sync_targets.get(identifier=target, active_in=event)
@@ -207,54 +207,64 @@ class OutboundSyncProvider:
raise NotImplementedError raise NotImplementedError
def sync_queued_orders(self, queued_orders): def sync_queued_orders(self, queued_orders):
for sq in queued_orders: for queue_item in queued_orders:
try: with transaction.atomic():
mapped_objects = self.sync_order(sq.order) try:
except UnrecoverableSyncError as e: sq = (
logger.warning( OrderSyncQueue.objects
f"Could not sync order {sq.order.code} to {type(self).__name__}", .select_for_update(of=OF_SELF, nowait=True)
exc_info=True, .select_related("order")
) .get(pk=queue_item.pk)
sq.order.log_action(e.log_action_type, { )
"provider": self.identifier, except DatabaseError:
"error": e.messages, continue
"full_message": e.full_message, try:
}) mapped_objects = self.sync_order(sq.order)
sq.delete() except UnrecoverableSyncError as e:
except RecoverableSyncError as e: logger.warning(
sq.failed_attempts += 1 f"Could not sync order {sq.order.code} to {type(self).__name__}",
sq.not_before = self.next_retry_date(sq) exc_info=True,
logger.info( )
f"Could not sync order {sq.order.code} to {type(self).__name__} (transient error, attempt #{sq.failed_attempts})", sq.order.log_action(e.log_action_type, {
exc_info=True,
)
if sq.failed_attempts >= self.max_attempts:
sentry_sdk.capture_exception(e)
sq.order.log_action("pretix.event.order.data_sync.failed.exceeded", {
"provider": self.identifier, "provider": self.identifier,
"error": e.messages, "error": e.messages,
"full_message": e.full_message, "full_message": e.full_message,
}) })
sq.delete() sq.delete()
except RecoverableSyncError as e:
sq.failed_attempts += 1
sq.not_before = self.next_retry_date(sq)
logger.info(
f"Could not sync order {sq.order.code} to {type(self).__name__} (transient error, attempt #{sq.failed_attempts})",
exc_info=True,
)
if sq.failed_attempts >= self.max_attempts:
sentry_sdk.capture_exception(e)
sq.order.log_action("pretix.event.order.data_sync.failed.exceeded", {
"provider": self.identifier,
"error": e.messages,
"full_message": e.full_message,
})
sq.delete()
else:
sq.save()
except Exception as e:
logger.exception(
f"Could not sync order {sq.order.code} to {type(self).__name__} (unhandled exception)"
)
sentry_sdk.capture_exception(e)
sq.order.log_action("pretix.event.order.data_sync.failed.internal", {
"provider": self.identifier,
"error": [],
"full_message": str(e),
})
sq.delete()
else: else:
sq.save() sq.order.log_action("pretix.event.order.data_sync.success", {
except Exception as e: "provider": self.identifier,
logger.exception( "objects": mapped_objects
f"Could not sync order {sq.order.code} to {type(self).__name__} (unhandled exception)" })
) sq.delete()
sentry_sdk.capture_exception(e)
sq.order.log_action("pretix.event.order.data_sync.failed.internal", {
"provider": self.identifier,
"error": [],
"full_message": str(e),
})
sq.delete()
else:
sq.order.log_action("pretix.event.order.data_sync.success", {
"provider": self.identifier,
"objects": mapped_objects
})
sq.delete()
@cached_property @cached_property
def data_fields(self): def data_fields(self):