Always make explicit which tables to lock (#3058)

Co-authored-by: Richard Schreiber <schreiber@rami.io>
This commit is contained in:
Raphael Michel
2023-01-25 11:44:11 +01:00
committed by GitHub
parent 8ca128912e
commit 3c1f3a26cf
16 changed files with 59 additions and 34 deletions

View File

@@ -32,6 +32,7 @@ from rest_framework import status
from pretix.api.models import ApiCall
from pretix.base.models import Organizer
from pretix.helpers import OF_SELF
class IdempotencyMiddleware:
@@ -56,7 +57,7 @@ class IdempotencyMiddleware:
idempotency_key = request.headers.get('X-Idempotency-Key', '')
with transaction.atomic():
call, created = ApiCall.objects.select_for_update().get_or_create(
call, created = ApiCall.objects.select_for_update(of=OF_SELF).get_or_create(
auth_hash=auth_hash,
idempotency_key=idempotency_key,
defaults={

View File

@@ -51,6 +51,7 @@ from pretix.base.models import (
User,
)
from pretix.base.settings import SETTINGS_AFFECTING_CSS
from pretix.helpers import OF_SELF
from pretix.helpers.dicts import merge_dicts
from pretix.presale.style import regenerate_organizer_css
@@ -178,7 +179,7 @@ class GiftCardViewSet(viewsets.ModelViewSet):
def perform_update(self, serializer):
if 'include_accepted' in self.request.GET:
raise PermissionDenied("Accepted gift cards cannot be updated, use transact instead.")
GiftCard.objects.select_for_update().get(pk=self.get_object().pk)
GiftCard.objects.select_for_update(of=OF_SELF).get(pk=self.get_object().pk)
old_value = serializer.instance.value
value = serializer.validated_data.pop('value')
inst = serializer.save(secret=serializer.instance.secret, currency=serializer.instance.currency,
@@ -196,7 +197,7 @@ class GiftCardViewSet(viewsets.ModelViewSet):
@action(detail=True, methods=["POST"])
@transaction.atomic()
def transact(self, request, **kwargs):
gc = GiftCard.objects.select_for_update().get(pk=self.get_object().pk)
gc = GiftCard.objects.select_for_update(of=OF_SELF).get(pk=self.get_object().pk)
value = serializers.DecimalField(max_digits=10, decimal_places=2).to_internal_value(
request.data.get('value')
)

View File

@@ -42,6 +42,7 @@ from pretix.base.models import LogEntry
from pretix.base.services.tasks import ProfiledTask, TransactionAwareTask
from pretix.base.signals import periodic_task
from pretix.celery_app import app
from pretix.helpers import OF_SELF
logger = logging.getLogger(__name__)
_ALL_EVENTS = None
@@ -502,7 +503,8 @@ def manually_retry_all_calls(webhook_id: int):
webhook = WebHook.objects.get(id=webhook_id)
with scope(organizer=webhook.organizer), transaction.atomic():
for whcr in webhook.retries.select_for_update(
skip_locked=connection.features.has_select_for_update_skip_locked
skip_locked=connection.features.has_select_for_update_skip_locked,
of=OF_SELF
):
send_webhook.apply_async(
args=(whcr.logentry_id, whcr.action_type, whcr.webhook_id, whcr.retry_count),
@@ -515,7 +517,8 @@ def manually_retry_all_calls(webhook_id: int):
def schedule_webhook_retries_on_celery(sender, **kwargs):
with transaction.atomic():
for whcr in WebHookCallRetry.objects.select_for_update(
skip_locked=connection.features.has_select_for_update_skip_locked
skip_locked=connection.features.has_select_for_update_skip_locked,
of=OF_SELF
).filter(retry_not_before__lt=now()):
send_webhook.apply_async(
args=(whcr.logentry_id, whcr.action_type, whcr.webhook_id, whcr.retry_count),

View File

@@ -79,6 +79,7 @@ from pretix.base.services.locking import LOCK_TIMEOUT, NoLockManager
from pretix.base.settings import PERSON_NAME_SCHEMES
from pretix.base.signals import order_gracefully_delete
from ...helpers import OF_SELF
from ...helpers.countries import CachedCountries, FastCountryField
from ...helpers.format import format_map
from ._transactions import (
@@ -1628,7 +1629,7 @@ class OrderPayment(models.Model):
been marked as paid.
"""
with transaction.atomic():
locked_instance = OrderPayment.objects.select_for_update().get(pk=self.pk)
locked_instance = OrderPayment.objects.select_for_update(of=OF_SELF).get(pk=self.pk)
if locked_instance.state not in (OrderPayment.PAYMENT_STATE_CREATED, OrderPayment.PAYMENT_STATE_PENDING):
# Race condition detected, this payment is already confirmed
logger.info('Failed payment {} but ignored due to likely race condition.'.format(
@@ -1673,7 +1674,7 @@ class OrderPayment(models.Model):
:raises Quota.QuotaExceededException: if the quota is exceeded and ``force`` is ``False``
"""
with transaction.atomic():
locked_instance = OrderPayment.objects.select_for_update().get(pk=self.pk)
locked_instance = OrderPayment.objects.select_for_update(of=OF_SELF).get(pk=self.pk)
if locked_instance.state == self.PAYMENT_STATE_CONFIRMED:
# Race condition detected, this payment is already confirmed
logger.info('Confirmed payment {} but ignored due to likely race condition.'.format(

View File

@@ -67,6 +67,7 @@ from pretix.base.settings import SettingsSandbox
from pretix.base.signals import register_payment_providers
from pretix.base.templatetags.money import money_filter
from pretix.base.templatetags.rich_text import rich_text
from pretix.helpers import OF_SELF
from pretix.helpers.countries import CachedCountries
from pretix.helpers.format import format_map
from pretix.helpers.money import DecimalTextInput
@@ -1399,7 +1400,7 @@ class GiftCardPayment(BasePaymentProvider):
try:
with transaction.atomic():
try:
gc = GiftCard.objects.select_for_update().get(pk=gcpk)
gc = GiftCard.objects.select_for_update(of=OF_SELF).get(pk=gcpk)
except GiftCard.DoesNotExist:
raise PaymentException(_("This gift card does not support this currency."))
if gc.currency != self.event.currency: # noqa - just a safeguard

View File

@@ -41,6 +41,7 @@ from pretix.base.services.orders import (
)
from pretix.base.services.tasks import ProfiledEventTask
from pretix.celery_app import app
from pretix.helpers import OF_SELF
from pretix.helpers.format import format_map
logger = logging.getLogger(__name__)
@@ -239,7 +240,7 @@ def cancel_event(self, event: Event, subevent: int, auto_refund: bool,
for o in orders_to_change.values_list('id', flat=True).iterator():
with transaction.atomic():
o = event.orders.select_for_update().get(pk=o)
o = event.orders.select_for_update(of=OF_SELF).get(pk=o)
total = Decimal('0.00')
fee = Decimal('0.00')
positions = []

View File

@@ -56,6 +56,7 @@ from pretix.base.models import (
Checkin, CheckinList, Device, Order, OrderPosition, QuestionOption,
)
from pretix.base.signals import checkin_created, order_placed, periodic_task
from pretix.helpers import OF_SELF
from pretix.helpers.jsonlogic import Logic
from pretix.helpers.jsonlogic_boolalg import convert_to_dnf
from pretix.helpers.jsonlogic_query import (
@@ -729,8 +730,11 @@ def perform_checkin(op: OrderPosition, clist: CheckinList, given_answers: dict,
_save_answers(op, answers, given_answers)
with transaction.atomic():
# Lock order positions
op = OrderPosition.all.select_for_update().get(pk=op.pk)
# Lock order positions, if it is an entry. We don't need it for exits, as a race condition wouldn't be problematic
opqs = OrderPosition.all
if type != Checkin.TYPE_EXIT:
opqs = opqs.select_for_update(of=OF_SELF)
op = opqs.get(pk=op.pk)
if not clist.all_products and op.item_id not in [i.pk for i in clist.limit_products.all()]:
raise CheckInError(

View File

@@ -49,6 +49,7 @@ from pretix.base.signals import (
periodic_task, register_data_exporters, register_multievent_data_exporters,
)
from pretix.celery_app import app
from pretix.helpers import OF_SELF
from pretix.helpers.urls import build_absolute_uri
logger = logging.getLogger(__name__)
@@ -344,7 +345,7 @@ def run_scheduled_exports(sender, **kwargs):
qs = ScheduledEventExport.objects.filter(
schedule_next_run__lt=now(),
error_counter__lt=5,
).select_for_update(skip_locked=connection.features.has_select_for_update_skip_locked).select_related('event')
).select_for_update(skip_locked=connection.features.has_select_for_update_skip_locked, of=OF_SELF).select_related('event')
for s in qs:
scheduled_event_export.apply_async(kwargs={
'event': s.event_id,
@@ -355,7 +356,7 @@ def run_scheduled_exports(sender, **kwargs):
qs = ScheduledOrganizerExport.objects.filter(
schedule_next_run__lt=now(),
error_counter__lt=5,
).select_for_update(skip_locked=connection.features.has_select_for_update_skip_locked).select_related('organizer')
).select_for_update(skip_locked=connection.features.has_select_for_update_skip_locked, of=OF_SELF).select_related('organizer')
for s in qs:
scheduled_organizer_export.apply_async(kwargs={
'organizer': s.organizer_id,

View File

@@ -63,7 +63,7 @@ from pretix.base.services.tasks import TransactionAwareTask
from pretix.base.settings import GlobalSettingsObject
from pretix.base.signals import invoice_line_text, periodic_task
from pretix.celery_app import app
from pretix.helpers.database import rolledback_transaction
from pretix.helpers.database import OF_SELF, rolledback_transaction
from pretix.helpers.models import modelcopy
logger = logging.getLogger(__name__)
@@ -500,7 +500,7 @@ def send_invoices_to_organizer(sender, **kwargs):
with transaction.atomic():
qs = Invoice.objects.filter(
sent_to_organizer__isnull=True
).prefetch_related('event').select_for_update(skip_locked=connection.features.has_select_for_update_skip_locked)
).prefetch_related('event').select_for_update(of=OF_SELF, skip_locked=connection.features.has_select_for_update_skip_locked)
for i in qs[:batch_size]:
if i.event.settings.invoice_email_organizer:
with language(i.event.settings.locale):

View File

@@ -32,6 +32,7 @@ from pretix.base.models import (
AbstractPosition, Customer, Event, Item, Membership, Order, OrderPosition,
SubEvent,
)
from pretix.helpers import OF_SELF
def membership_validity(item: Item, subevent: Optional[SubEvent], event: Event):
@@ -118,7 +119,7 @@ def validate_memberships_in_order(customer: Customer, positions: List[AbstractPo
base_qs = Membership.objects.with_usages(ignored_order=ignored_order)
if lock:
base_qs = base_qs.select_for_update()
base_qs = base_qs.select_for_update(of=OF_SELF)
membership_cache = base_qs\
.select_related('membership_type')\

View File

@@ -97,6 +97,7 @@ from pretix.base.signals import (
order_placed, order_split, periodic_task, validate_order,
)
from pretix.celery_app import app
from pretix.helpers import OF_SELF
from pretix.helpers.models import modelcopy
from pretix.helpers.periodic import minimum_interval
@@ -184,7 +185,7 @@ def reactivate_order(order: Order, force: bool=False, user: User=None, auth=None
Voucher.objects.filter(pk=position.voucher.pk).update(redeemed=Greatest(0, F('redeemed') + 1))
for gc in position.issued_gift_cards.all():
gc = GiftCard.objects.select_for_update().get(pk=gc.pk)
gc = GiftCard.objects.select_for_update(of=OF_SELF).get(pk=gc.pk)
gc.transactions.create(value=position.price, order=order)
break
@@ -397,7 +398,7 @@ def _cancel_order(order, user=None, send_mail: bool=True, api_token=None, device
# If new actions are added to this function, make sure to add the reverse operation to reactivate_order()
with transaction.atomic():
if isinstance(order, int):
order = Order.objects.select_for_update().get(pk=order)
order = Order.objects.select_for_update(of=OF_SELF).get(pk=order)
if isinstance(user, int):
user = User.objects.get(pk=user)
if isinstance(api_token, int):
@@ -419,7 +420,7 @@ def _cancel_order(order, user=None, send_mail: bool=True, api_token=None, device
for position in order.positions.all():
for gc in position.issued_gift_cards.all():
gc = GiftCard.objects.select_for_update().get(pk=gc.pk)
gc = GiftCard.objects.select_for_update(of=OF_SELF).get(pk=gc.pk)
if gc.value < position.price:
raise OrderError(
_('This order can not be canceled since the gift card {card} purchased in '
@@ -1205,7 +1206,7 @@ def send_expiry_warnings(sender, **kwargs):
if days and (o.expires - today).days <= days:
with transaction.atomic():
o = Order.objects.select_related('event').select_for_update().get(pk=o.pk)
o = Order.objects.select_related('event').select_for_update(of=OF_SELF).get(pk=o.pk)
if o.status != Order.STATUS_PENDING or o.expiry_reminder_sent:
# Race condition
continue
@@ -1264,7 +1265,7 @@ def send_download_reminders(sender, **kwargs):
continue
with transaction.atomic():
o = Order.objects.select_for_update().get(pk=o.pk)
o = Order.objects.select_for_update(of=OF_SELF).get(pk=o.pk)
if o.download_reminder_sent:
# Race condition
continue
@@ -2059,7 +2060,7 @@ class OrderChangeManager:
op.fee.save(update_fields=['canceled'])
elif isinstance(op, self.CancelOperation):
for gc in op.position.issued_gift_cards.all():
gc = GiftCard.objects.select_for_update().get(pk=gc.pk)
gc = GiftCard.objects.select_for_update(of=OF_SELF).get(pk=gc.pk)
if gc.value < op.position.price:
raise OrderError(_(
'A position can not be canceled since the gift card {card} purchased in this order has '
@@ -2075,7 +2076,7 @@ class OrderChangeManager:
for opa in op.position.addons.all():
for gc in opa.issued_gift_cards.all():
gc = GiftCard.objects.select_for_update().get(pk=gc.pk)
gc = GiftCard.objects.select_for_update(of=OF_SELF).get(pk=gc.pk)
if gc.value < opa.position.price:
raise OrderError(_(
'A position can not be canceled since the gift card {card} purchased in this order has '
@@ -2648,9 +2649,9 @@ def change_payment_provider(order: Order, payment_provider, amount=None, new_pay
open_payment = None
if new_payment:
lp = order.payments.select_for_update().exclude(pk=new_payment.pk).last()
lp = order.payments.select_for_update(of=OF_SELF).exclude(pk=new_payment.pk).last()
else:
lp = order.payments.select_for_update().last()
lp = order.payments.select_for_update(of=OF_SELF).last()
if lp and lp.state in (OrderPayment.PAYMENT_STATE_PENDING, OrderPayment.PAYMENT_STATE_CREATED):
open_payment = lp

View File

@@ -110,7 +110,7 @@ from pretix.control.permissions import (
from pretix.control.signals import nav_organizer
from pretix.control.views import PaginationMixin
from pretix.control.views.mailsetup import MailSettingsSetupView
from pretix.helpers import GroupConcat
from pretix.helpers import OF_SELF, GroupConcat
from pretix.helpers.compat import CompatDeleteView
from pretix.helpers.dicts import merge_dicts
from pretix.helpers.format import format_map
@@ -1381,7 +1381,7 @@ class GiftCardDetailView(OrganizerDetailViewMixin, OrganizerPermissionRequiredMi
@transaction.atomic()
def post(self, request, *args, **kwargs):
self.object = GiftCard.objects.select_for_update().get(pk=self.get_object().pk)
self.object = GiftCard.objects.select_for_update(of=OF_SELF).get(pk=self.get_object().pk)
if 'revert' in request.POST:
t = get_object_or_404(self.object.transactions.all(), pk=request.POST.get('revert'), order__isnull=False)
if self.object.value - t.value < Decimal('0.00'):

View File

@@ -21,8 +21,9 @@
#
import contextlib
from django.db import transaction
from django.db import connection, transaction
from django.db.models import Aggregate, Expression, Field, Lookup, Value
from django.utils.functional import lazy
class DummyRollbackException(Exception):
@@ -142,3 +143,8 @@ class PostgresWindowFrame(Expression):
"start": self.start.value,
"end": self.end.value,
}
# This is a short-hand for .select_for_update(of=("self,")), that falls back gracefully on databases that don't support
# the SELECT FOR UPDATE OF ... query.
OF_SELF = lazy(lambda: ("self",) if connection.features.has_select_for_update_of else (), tuple)()

View File

@@ -51,6 +51,7 @@ from pretix.base.signals import (
EventPluginSignal, event_copy_data, logentry_display, periodic_task,
)
from pretix.control.signals import nav_event
from pretix.helpers import OF_SELF
from pretix.plugins.sendmail.models import ScheduledMail
from pretix.plugins.sendmail.views import OrderSendView, WaitinglistSendView
@@ -187,6 +188,7 @@ def sendmail_run_rules(sender, **kwargs):
with transaction.atomic(durable=True):
m = ScheduledMail.objects.select_for_update(
of=OF_SELF,
skip_locked=connection.features.has_select_for_update_skip_locked
).filter(pk=m_id).first()
if not m or m.state not in (ScheduledMail.STATE_SCHEDULED, ScheduledMail.STATE_FAILED):

View File

@@ -63,6 +63,7 @@ from pretix.base.payment import BasePaymentProvider, PaymentException
from pretix.base.plugins import get_all_plugins
from pretix.base.services.mail import SendMailException
from pretix.base.settings import SettingsSandbox
from pretix.helpers import OF_SELF
from pretix.helpers.urls import build_absolute_uri as build_global_uri
from pretix.multidomain.urlreverse import build_absolute_uri, eventreverse
from pretix.plugins.stripe.forms import StripeKeyValidator
@@ -586,7 +587,7 @@ class StripeMethod(BasePaymentProvider):
self._init_api()
payment_info = refund.payment.info_data
OrderPayment.objects.select_for_update().get(pk=refund.payment.pk)
OrderPayment.objects.select_for_update(of=OF_SELF).get(pk=refund.payment.pk)
if not payment_info:
raise PaymentException(_('No payment information found.'))

View File

@@ -64,6 +64,7 @@ from pretix.control.permissions import (
)
from pretix.control.views.event import DecoupleMixin
from pretix.control.views.organizer import OrganizerDetailViewMixin
from pretix.helpers import OF_SELF
from pretix.multidomain.urlreverse import build_absolute_uri, eventreverse
from pretix.plugins.stripe.forms import OrganizerStripeSettingsForm
from pretix.plugins.stripe.models import ReferencedStripeObject
@@ -297,13 +298,13 @@ def charge_webhook(event, event_json, charge_id, rso):
with transaction.atomic():
if payment:
payment = OrderPayment.objects.select_for_update().get(pk=payment.pk)
payment = OrderPayment.objects.select_for_update(of=OF_SELF).get(pk=payment.pk)
else:
payment = order.payments.filter(
info__icontains=charge['id'],
provider__startswith='stripe',
amount=prov._amount_to_decimal(charge['amount']),
).select_for_update().last()
).select_for_update(of=OF_SELF).last()
if not payment:
payment = order.payments.create(
state=OrderPayment.PAYMENT_STATE_CREATED,
@@ -393,13 +394,13 @@ def source_webhook(event, event_json, source_id, rso):
payment = None
if payment:
payment = OrderPayment.objects.select_for_update().get(pk=payment.pk)
payment = OrderPayment.objects.select_for_update(of=OF_SELF).get(pk=payment.pk)
else:
payment = order.payments.filter(
info__icontains=src['id'],
provider__startswith='stripe',
amount=prov._amount_to_decimal(src['amount']) if src['amount'] is not None else order.total,
).select_for_update().last()
).select_for_update(of=OF_SELF).last()
if not payment:
payment = order.payments.create(
state=OrderPayment.PAYMENT_STATE_CREATED,
@@ -534,7 +535,7 @@ class ReturnView(StripeOrderView, View):
with transaction.atomic():
self.order.refresh_from_db()
self.payment = OrderPayment.objects.select_for_update().get(pk=self.payment.pk)
self.payment = OrderPayment.objects.select_for_update(of=OF_SELF).get(pk=self.payment.pk)
if self.payment.state == OrderPayment.PAYMENT_STATE_CONFIRMED:
if 'payment_stripe_token' in request.session:
del request.session['payment_stripe_token']