move datasync tasks into services/datasync.py

This commit is contained in:
Mira Weller
2025-07-01 16:11:43 +02:00
parent 2cfe2a753d
commit a56c83d78a
3 changed files with 85 additions and 54 deletions

View File

@@ -46,8 +46,7 @@ class PretixBaseConfig(AppConfig):
from . import invoice # NOQA from . import invoice # NOQA
from . import notifications # NOQA from . import notifications # NOQA
from . import email # NOQA from . import email # NOQA
from .services import auth, checkin, currencies, export, mail, tickets, cart, modelimport, orders, invoices, cleanup, update_check, quotas, notifications, vouchers # NOQA from .services import auth, checkin, currencies, datasync, export, mail, tickets, cart, modelimport, orders, invoices, cleanup, update_check, quotas, notifications, vouchers # NOQA
from .datasync import datasync # NOQA
from .models import _transactions # NOQA from .models import _transactions # NOQA
from django.conf import settings from django.conf import settings

View File

@@ -25,77 +25,26 @@ import logging
from collections import namedtuple from collections import namedtuple
from datetime import timedelta from datetime import timedelta
from functools import cached_property from functools import cached_property
from itertools import groupby
from typing import Protocol from typing import Protocol
import sentry_sdk import sentry_sdk
from django.db import DatabaseError, transaction from django.db import DatabaseError, transaction
from django.db.models import F, Window
from django.db.models.functions import RowNumber
from django.dispatch import receiver
from django.utils.timezone import now from django.utils.timezone import now
from django_scopes import scope, scopes_disabled
from pretix.base.datasync.sourcefields import ( from pretix.base.datasync.sourcefields import (
EVENT, EVENT_OR_SUBEVENT, ORDER, ORDER_POSITION, get_data_fields, EVENT, EVENT_OR_SUBEVENT, ORDER, ORDER_POSITION, get_data_fields,
) )
from pretix.base.logentrytype_registry import make_link from pretix.base.logentrytype_registry import make_link
from pretix.base.models.datasync import OrderSyncQueue, OrderSyncResult from pretix.base.models.datasync import OrderSyncQueue, OrderSyncResult
from pretix.base.signals import EventPluginRegistry, periodic_task from pretix.base.signals import EventPluginRegistry
from pretix.celery_app import app
from pretix.helpers import OF_SELF from pretix.helpers import OF_SELF
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@receiver(periodic_task, dispatch_uid="data_sync_periodic_sync_all")
def periodic_sync_all(sender, **kwargs):
sync_all.apply_async()
@receiver(periodic_task, dispatch_uid="data_sync_periodic_reset_in_flight")
def periodic_reset_in_flight(sender, **kwargs):
for sq in OrderSyncQueue.objects.filter(
in_flight=True,
in_flight_since__lt=now() - timedelta(minutes=20),
):
sq.set_sync_error('timeout', [], 'Timeout')
sync_targets = EventPluginRegistry({"identifier": lambda o: o.identifier}) sync_targets = EventPluginRegistry({"identifier": lambda o: o.identifier})
@app.task()
def sync_all():
with scopes_disabled():
queue = (
OrderSyncQueue.objects
.filter(
in_flight=False,
not_before__lt=now(),
need_manual_retry__isnull=True,
)
.order_by(Window(
expression=RowNumber(),
partition_by=[F("event_id")],
order_by="not_before",
))
.prefetch_related("event")
[:1000]
)
grouped = groupby(sorted(queue, key=lambda q: (q.sync_provider, q.event.pk)), lambda q: (q.sync_provider, q.event))
for (target, event), queued_orders in grouped:
target_cls, meta = sync_targets.get(identifier=target, active_in=event)
if not target_cls:
# sync plugin not found (plugin deactivated or uninstalled) -> drop outstanding jobs
OrderSyncQueue.objects.filter(pk__in=[sq.pk for sq in queued_orders]).delete()
with scope(organizer=event.organizer):
with target_cls(event=event) as p:
p.sync_queued_orders(queued_orders)
class BaseSyncError(Exception): class BaseSyncError(Exception):
def __init__(self, messages, full_message=None): def __init__(self, messages, full_message=None):
self.messages = messages self.messages = messages

View File

@@ -0,0 +1,83 @@
#
# This file is part of pretix (Community Edition).
#
# Copyright (C) 2014-2020 Raphael Michel and contributors
# Copyright (C) 2020-2021 rami.io GmbH and contributors
#
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General
# Public License as published by the Free Software Foundation in version 3 of the License.
#
# ADDITIONAL TERMS APPLY: Pursuant to Section 7 of the GNU Affero General Public License, additional terms are
# applicable granting you additional permissions and placing additional restrictions on your usage of this software.
# Please refer to the pretix LICENSE file to obtain the full terms applicable to this work. If you did not receive
# this file, see <https://pretix.eu/about/en/license>.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <https://www.gnu.org/licenses/>.
#
import logging
from datetime import timedelta
from itertools import groupby
from django.db.models import F, Window
from django.db.models.functions import RowNumber
from django.dispatch import receiver
from django.utils.timezone import now
from django_scopes import scope, scopes_disabled
from pretix.base.datasync.datasync import sync_targets
from pretix.base.models.datasync import OrderSyncQueue
from pretix.base.signals import periodic_task
from pretix.celery_app import app
logger = logging.getLogger(__name__)
@receiver(periodic_task, dispatch_uid="data_sync_periodic_sync_all")
def periodic_sync_all(sender, **kwargs):
sync_all.apply_async()
@receiver(periodic_task, dispatch_uid="data_sync_periodic_reset_in_flight")
def periodic_reset_in_flight(sender, **kwargs):
for sq in OrderSyncQueue.objects.filter(
in_flight=True,
in_flight_since__lt=now() - timedelta(minutes=20),
):
sq.set_sync_error('timeout', [], 'Timeout')
@app.task()
def sync_all():
with scopes_disabled():
queue = (
OrderSyncQueue.objects
.filter(
in_flight=False,
not_before__lt=now(),
need_manual_retry__isnull=True,
)
.order_by(Window(
expression=RowNumber(),
partition_by=[F("event_id")],
order_by="not_before",
))
.prefetch_related("event")
[:1000]
)
grouped = groupby(sorted(queue, key=lambda q: (q.sync_provider, q.event.pk)), lambda q: (q.sync_provider, q.event))
for (target, event), queued_orders in grouped:
target_cls, meta = sync_targets.get(identifier=target, active_in=event)
if not target_cls:
# sync plugin not found (plugin deactivated or uninstalled) -> drop outstanding jobs
OrderSyncQueue.objects.filter(pk__in=[sq.pk for sq in queued_orders]).delete()
with scope(organizer=event.organizer):
with target_cls(event=event) as p:
p.sync_queued_orders(queued_orders)