Move quota cache from database to redis (#2010)

This commit is contained in:
Raphael Michel
2021-03-29 09:42:27 +02:00
committed by GitHub
parent a927b47b8b
commit d3748a6194
14 changed files with 148 additions and 164 deletions

View File

@@ -0,0 +1,29 @@
# Generated by Django 3.0.12 on 2021-03-24 13:09
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('pretixbase', '0179_auto_20210311_1653'),
]
operations = [
migrations.RemoveField(
model_name='quota',
name='cached_availability_number',
),
migrations.RemoveField(
model_name='quota',
name='cached_availability_paid_orders',
),
migrations.RemoveField(
model_name='quota',
name='cached_availability_state',
),
migrations.RemoveField(
model_name='quota',
name='cached_availability_time',
),
]

View File

@@ -657,10 +657,6 @@ class Event(EventMixin, LoggedModel):
oldid = q.pk
q.pk = None
q.event = self
q.cached_availability_state = None
q.cached_availability_number = None
q.cached_availability_paid_orders = None
q.cached_availability_time = None
q.closed = False
q.save()
q.log_action('pretix.object.cloned')

View File

@@ -17,6 +17,7 @@ from django.utils.functional import cached_property
from django.utils.timezone import is_naive, make_aware, now
from django.utils.translation import gettext_lazy as _, pgettext_lazy
from django_countries.fields import Country
from django_redis import get_redis_connection
from django_scopes import ScopedManager
from i18nfield.fields import I18nCharField, I18nTextField
@@ -25,6 +26,7 @@ from pretix.base.models.base import LoggedModel
from pretix.base.models.fields import MultiStringField
from pretix.base.models.tax import TaxedPrice
from ... import settings
from .event import Event, SubEvent
@@ -1374,10 +1376,6 @@ class Quota(LoggedModel):
blank=True,
verbose_name=_("Variations")
)
cached_availability_state = models.PositiveIntegerField(null=True, blank=True)
cached_availability_number = models.PositiveIntegerField(null=True, blank=True)
cached_availability_paid_orders = models.PositiveIntegerField(null=True, blank=True)
cached_availability_time = models.DateTimeField(null=True, blank=True)
close_when_sold_out = models.BooleanField(
verbose_name=_('Close this quota permanently once it is sold out'),
@@ -1422,14 +1420,10 @@ class Quota(LoggedModel):
self.event.cache.clear()
def rebuild_cache(self, now_dt=None):
self.cached_availability_time = None
self.cached_availability_number = None
self.cached_availability_state = None
self.availability(now_dt=now_dt)
def cache_is_hot(self, now_dt=None):
now_dt = now_dt or now()
return self.cached_availability_time and (now_dt - self.cached_availability_time).total_seconds() < 120
if settings.HAS_REDIS:
rc = get_redis_connection("redis")
rc.hdel(f'quotas:{self.event_id}:availabilitycache', str(self.pk))
self.availability(now_dt=now_dt)
def availability(
self, now_dt: datetime=None, count_waitinglist=True, _cache=None, allow_cache=False
@@ -1452,9 +1446,6 @@ class Quota(LoggedModel):
"""
from ..services.quotas import QuotaAvailability
if allow_cache and self.cache_is_hot() and count_waitinglist:
return self.cached_availability_state, self.cached_availability_number
if _cache and count_waitinglist is not _cache.get('_count_waitinglist', True):
_cache.clear()
@@ -1462,7 +1453,7 @@ class Quota(LoggedModel):
return _cache[self.pk]
qa = QuotaAvailability(count_waitinglist=count_waitinglist, early_out=False)
qa.queue(self)
qa.compute(now_dt=now_dt)
qa.compute(now_dt=now_dt, allow_cache=allow_cache)
res = qa.results[self]
if _cache is not None:

View File

@@ -1,25 +1,22 @@
import sys
import time
from collections import Counter, defaultdict
from datetime import timedelta
from itertools import zip_longest
from django.conf import settings
from django.db import OperationalError, models
from django.db import models
from django.db.models import (
Case, Count, F, Func, Max, OuterRef, Q, Subquery, Sum, Value, When,
)
from django.dispatch import receiver
from django.utils.timezone import now
from django_scopes import scopes_disabled
from django_redis import get_redis_connection
from pretix.base.models import (
CartPosition, Checkin, Event, LogEntry, Order, OrderPosition, Quota,
Voucher, WaitingListEntry,
CartPosition, Checkin, Order, OrderPosition, Quota, Voucher,
WaitingListEntry,
)
from pretix.celery_app import app
from ...helpers.periodic import minimum_interval
from ..signals import periodic_task, quota_availability
from ..signals import quota_availability
class QuotaAvailability:
@@ -89,7 +86,11 @@ class QuotaAvailability:
def queue(self, *quota):
self._queue += quota
def compute(self, now_dt=None):
def compute(self, now_dt=None, allow_cache=False, allow_cache_stale=False):
"""
Compute the queued quotas. If ``allow_cache`` is set, results may also be taken from a cache that might
be a few minutes outdated. In this case, you may not rely on the results in the ``count_*`` properties.
"""
now_dt = now_dt or now()
quotas = list(set(self._queue))
quotas_original = list(self._queue)
@@ -97,6 +98,35 @@ class QuotaAvailability:
if not quotas:
return
if allow_cache:
if self._full_results:
raise ValueError("You cannot combine full_results and allow_cache.")
elif not self._count_waitinglist:
raise ValueError("If you set allow_cache, you need to set count_waitinglist.")
elif settings.HAS_REDIS:
rc = get_redis_connection("redis")
quotas_by_event = defaultdict(list)
for q in quotas_original:
quotas_by_event[q.event_id].append(q)
for eventid, evquotas in quotas_by_event.items():
d = rc.hmget(f'quotas:{eventid}:availabilitycache', [str(q.pk) for q in evquotas])
for redisval, q in zip(d, evquotas):
if redisval is not None:
data = [rv for rv in redisval.decode().split(',')]
if time.time() - int(data[2]) < 120 or allow_cache_stale:
quotas_original.remove(q)
quotas.remove(q)
if data[1] == "None":
self.results[q] = int(data[0]), None, int(data[2])
else:
self.results[q] = int(data[0]), int(data[1]), int(data[2])
if not quotas:
return
self._compute(quotas, now_dt)
for q in quotas_original:
@@ -105,36 +135,48 @@ class QuotaAvailability:
self.results[q] = resp
self._close(quotas)
try:
self._write_cache(quotas, now_dt)
except OperationalError as e:
# Ignore deadlocks when multiple threads try to write to the cache
if 'deadlock' not in str(e).lower():
raise e
self._write_cache(quotas, now_dt)
def _write_cache(self, quotas, now_dt):
if not settings.HAS_REDIS or not quotas:
return
rc = get_redis_connection("redis")
# We write the computed availability to redis in a per-event hash as
#
# quota_id -> (availability_state, availability_number, timestamp).
#
# We store this in a hash instead of inidividual values to avoid making two many redis requests
# which would introduce latency.
# The individual entries in the hash are "valid" for 120 seconds. This means in a typical peak scenario with
# high load *to a specific calendar or event*, lots of parallel web requests will receive an "expired" result
# around the same time, recompute quotas and write back to the cache. To avoid overloading redis with lots of
# simultaneous write queries for the same page, we place a very naive and simple "lock" on the write process for
# these quotas.
lock_name = '_'.join([str(p) for p in sorted([q.pk for q in quotas])])
if rc.exists(f'quotas:availabilitycachewrite:{lock_name}'):
return
rc.setex(f'quotas:availabilitycachewrite:{lock_name}', '1', 10)
update = defaultdict(list)
for q in quotas:
update[q.event_id].append(q)
for eventid, quotas in update.items():
rc.hmset(f'quotas:{eventid}:availabilitycache', {
str(q.id): ",".join(
[str(i) for i in self.results[q]] +
[str(int(time.time()))]
) for q in quotas
})
rc.expire(f'quotas:{eventid}:availabilitycache', 3600 * 24 * 7)
# We used to also delete item_quota_cache:* from the event cache here, but as the cache
# gets more complex, this does not seem worth it. The cache is only present for up to
# 5 seconds to prevent high peaks, and a 5-second delay in availability is usually
# tolerable
update = []
for q in quotas:
rewrite_cache = self._count_waitinglist and (
not q.cache_is_hot(now_dt) or self.results[q][0] > q.cached_availability_state
or q.cached_availability_paid_orders is None
)
if rewrite_cache:
q.cached_availability_state = self.results[q][0]
q.cached_availability_number = self.results[q][1]
q.cached_availability_time = now_dt
if q in self.count_paid_orders:
q.cached_availability_paid_orders = self.count_paid_orders[q]
update.append(q)
if update:
Quota.objects.using('default').bulk_update(update, [
'cached_availability_state', 'cached_availability_number', 'cached_availability_time',
'cached_availability_paid_orders'
], batch_size=50)
def _close(self, quotas):
for q in quotas:
@@ -404,44 +446,8 @@ class QuotaAvailability:
self.results[q] = Quota.AVAILABILITY_GONE, 0
@receiver(signal=periodic_task)
@minimum_interval(minutes_after_success=60)
def build_all_quota_caches(sender, **kwargs):
refresh_quota_caches.apply()
def grouper(iterable, n, fillvalue=None):
"""Collect data into fixed-length chunks or blocks"""
# grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
args = [iter(iterable)] * n
return zip_longest(fillvalue=fillvalue, *args)
@app.task
@scopes_disabled()
def refresh_quota_caches():
# Active events
active = LogEntry.objects.using(settings.DATABASE_REPLICA).filter(
datetime__gt=now() - timedelta(days=7)
).order_by().values('event').annotate(
last_activity=Max('datetime')
)
for a in active:
try:
e = Event.objects.using(settings.DATABASE_REPLICA).get(pk=a['event'])
except Event.DoesNotExist:
continue
quotas = e.quotas.filter(
Q(cached_availability_time__isnull=True) |
Q(cached_availability_time__lt=a['last_activity']) |
Q(cached_availability_time__lt=now() - timedelta(hours=2))
).filter(
Q(subevent__isnull=True) |
Q(subevent__date_to__isnull=False, subevent__date_to__gte=now() - timedelta(days=14)) |
Q(subevent__date_from__gte=now() - timedelta(days=14))
)
for qs in grouper(quotas, 100, None):
qa = QuotaAvailability(early_out=False)
qa.queue(*[q for q in qs if q is not None])
qa.compute()