diff --git a/src/pretix/base/apps.py b/src/pretix/base/apps.py index 6b88ff62d2..c32df61139 100644 --- a/src/pretix/base/apps.py +++ b/src/pretix/base/apps.py @@ -46,8 +46,7 @@ class PretixBaseConfig(AppConfig): from . import invoice # NOQA from . import notifications # NOQA from . import email # NOQA - from .services import auth, checkin, currencies, export, mail, tickets, cart, modelimport, orders, invoices, cleanup, update_check, quotas, notifications, vouchers # NOQA - from .datasync import datasync # NOQA + from .services import auth, checkin, currencies, datasync, export, mail, tickets, cart, modelimport, orders, invoices, cleanup, update_check, quotas, notifications, vouchers # NOQA from .models import _transactions # NOQA from django.conf import settings diff --git a/src/pretix/base/datasync/datasync.py b/src/pretix/base/datasync/datasync.py index 06404c24db..6a6c78e403 100644 --- a/src/pretix/base/datasync/datasync.py +++ b/src/pretix/base/datasync/datasync.py @@ -25,77 +25,26 @@ import logging from collections import namedtuple from datetime import timedelta from functools import cached_property -from itertools import groupby from typing import Protocol import sentry_sdk from django.db import DatabaseError, transaction -from django.db.models import F, Window -from django.db.models.functions import RowNumber -from django.dispatch import receiver from django.utils.timezone import now -from django_scopes import scope, scopes_disabled from pretix.base.datasync.sourcefields import ( EVENT, EVENT_OR_SUBEVENT, ORDER, ORDER_POSITION, get_data_fields, ) from pretix.base.logentrytype_registry import make_link from pretix.base.models.datasync import OrderSyncQueue, OrderSyncResult -from pretix.base.signals import EventPluginRegistry, periodic_task -from pretix.celery_app import app +from pretix.base.signals import EventPluginRegistry from pretix.helpers import OF_SELF logger = logging.getLogger(__name__) -@receiver(periodic_task, dispatch_uid="data_sync_periodic_sync_all") -def periodic_sync_all(sender, **kwargs): - sync_all.apply_async() - - -@receiver(periodic_task, dispatch_uid="data_sync_periodic_reset_in_flight") -def periodic_reset_in_flight(sender, **kwargs): - for sq in OrderSyncQueue.objects.filter( - in_flight=True, - in_flight_since__lt=now() - timedelta(minutes=20), - ): - sq.set_sync_error('timeout', [], 'Timeout') - - sync_targets = EventPluginRegistry({"identifier": lambda o: o.identifier}) -@app.task() -def sync_all(): - with scopes_disabled(): - queue = ( - OrderSyncQueue.objects - .filter( - in_flight=False, - not_before__lt=now(), - need_manual_retry__isnull=True, - ) - .order_by(Window( - expression=RowNumber(), - partition_by=[F("event_id")], - order_by="not_before", - )) - .prefetch_related("event") - [:1000] - ) - grouped = groupby(sorted(queue, key=lambda q: (q.sync_provider, q.event.pk)), lambda q: (q.sync_provider, q.event)) - for (target, event), queued_orders in grouped: - target_cls, meta = sync_targets.get(identifier=target, active_in=event) - - if not target_cls: - # sync plugin not found (plugin deactivated or uninstalled) -> drop outstanding jobs - OrderSyncQueue.objects.filter(pk__in=[sq.pk for sq in queued_orders]).delete() - - with scope(organizer=event.organizer): - with target_cls(event=event) as p: - p.sync_queued_orders(queued_orders) - - class BaseSyncError(Exception): def __init__(self, messages, full_message=None): self.messages = messages diff --git a/src/pretix/base/services/datasync.py b/src/pretix/base/services/datasync.py new file mode 100644 index 0000000000..eda9199c55 --- /dev/null +++ b/src/pretix/base/services/datasync.py @@ -0,0 +1,83 @@ +# +# This file is part of pretix (Community Edition). +# +# Copyright (C) 2014-2020 Raphael Michel and contributors +# Copyright (C) 2020-2021 rami.io GmbH and contributors +# +# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General +# Public License as published by the Free Software Foundation in version 3 of the License. +# +# ADDITIONAL TERMS APPLY: Pursuant to Section 7 of the GNU Affero General Public License, additional terms are +# applicable granting you additional permissions and placing additional restrictions on your usage of this software. +# Please refer to the pretix LICENSE file to obtain the full terms applicable to this work. If you did not receive +# this file, see . +# +# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied +# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You should have received a copy of the GNU Affero General Public License along with this program. If not, see +# . +# + +import logging +from datetime import timedelta +from itertools import groupby + +from django.db.models import F, Window +from django.db.models.functions import RowNumber +from django.dispatch import receiver +from django.utils.timezone import now +from django_scopes import scope, scopes_disabled + +from pretix.base.datasync.datasync import sync_targets +from pretix.base.models.datasync import OrderSyncQueue +from pretix.base.signals import periodic_task +from pretix.celery_app import app + +logger = logging.getLogger(__name__) + + +@receiver(periodic_task, dispatch_uid="data_sync_periodic_sync_all") +def periodic_sync_all(sender, **kwargs): + sync_all.apply_async() + + +@receiver(periodic_task, dispatch_uid="data_sync_periodic_reset_in_flight") +def periodic_reset_in_flight(sender, **kwargs): + for sq in OrderSyncQueue.objects.filter( + in_flight=True, + in_flight_since__lt=now() - timedelta(minutes=20), + ): + sq.set_sync_error('timeout', [], 'Timeout') + + +@app.task() +def sync_all(): + with scopes_disabled(): + queue = ( + OrderSyncQueue.objects + .filter( + in_flight=False, + not_before__lt=now(), + need_manual_retry__isnull=True, + ) + .order_by(Window( + expression=RowNumber(), + partition_by=[F("event_id")], + order_by="not_before", + )) + .prefetch_related("event") + [:1000] + ) + grouped = groupby(sorted(queue, key=lambda q: (q.sync_provider, q.event.pk)), lambda q: (q.sync_provider, q.event)) + for (target, event), queued_orders in grouped: + target_cls, meta = sync_targets.get(identifier=target, active_in=event) + + if not target_cls: + # sync plugin not found (plugin deactivated or uninstalled) -> drop outstanding jobs + OrderSyncQueue.objects.filter(pk__in=[sq.pk for sq in queued_orders]).delete() + + with scope(organizer=event.organizer): + with target_cls(event=event) as p: + p.sync_queued_orders(queued_orders)