Data shredder optimizations (#3429)

Co-authored-by: Martin Gross <gross@rami.io>
Raphael Michel
2023-06-23 16:56:19 +02:00
committed by GitHub
parent 84dbd93d9e
commit b415393ccf
8 changed files with 343 additions and 77 deletions

View File

@@ -20,6 +20,7 @@
# <https://www.gnu.org/licenses/>.
#
# This file is based on an earlier version of pretix which was released under the Apache License 2.0. The full text of
# the Apache License 2.0 can be obtained at <http://www.apache.org/licenses/LICENSE-2.0>.
#
@@ -31,7 +32,7 @@
# Unless required by applicable law or agreed to in writing, software distributed under the Apache License 2.0 is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under the License.
import inspect
import json
from datetime import timedelta
from tempfile import NamedTemporaryFile
@@ -41,10 +42,13 @@ from zipfile import ZipFile
from dateutil.parser import parse
from django.conf import settings
from django.utils.crypto import get_random_string
from django.utils.formats import date_format
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from pretix.base.models import CachedFile, Event, cachedfile_name
from pretix.base.i18n import language
from pretix.base.models import CachedFile, Event, User, cachedfile_name
from pretix.base.services.mail import SendMailException, mail
from pretix.base.services.tasks import ProfiledEventTask
from pretix.base.shredder import ShredError
from pretix.celery_app import app
@@ -101,8 +105,17 @@ def export(event: Event, shredders: List[str], session_key=None, cfid=None) -> N
return cf.pk
@app.task(base=ProfiledEventTask, throws=(ShredError,))
def shred(event: Event, fileid: str, confirm_code: str) -> None:
@app.task(base=ProfiledEventTask, throws=(ShredError,), bind=True)
def shred(self, event: Event, fileid: str, confirm_code: str, user: int=None, locale: str='en') -> None:
steps = []
def set_progress(val):
if not self.request.called_directly:
self.update_state(
state='PROGRESS',
meta={'value': val, 'steps': steps}
)
known_shredders = event.get_data_shredders()
try:
cf = CachedFile.objects.get(pk=fileid)
@@ -124,8 +137,41 @@ def shred(event: Event, fileid: str, confirm_code: str) -> None:
if event.logentry_set.filter(datetime__gte=parse(indexdata['time'])):
raise ShredError(_("Something happened in your event after the export, please try again."))
for shredder in shredders:
shredder.shred_data()
for i, shredder in enumerate(shredders):
with language(locale):
steps.append({'label': str(shredder.verbose_name), 'done': False})
set_progress(i * 100 / len(shredders))
if 'progress_callback' in inspect.signature(shredder.shred_data).parameters:
shredder.shred_data(
progress_callback=lambda y: set_progress(
i * 100 / len(shredders) + min(max(y, 0), 100) / 100 * 100 / len(shredders)
)
)
else:
shredder.shred_data()
steps[-1]['done'] = True
cf.file.delete(save=False)
cf.delete()
if user:
user = User.objects.get(pk=user)
with language(user.locale):
try:
mail(
user.email,
_('Data shredding completed'),
'pretixbase/email/shred_completed.txt',
{
'user': user,
'organizer': event.organizer.name,
'event': str(event.name),
'start_time': date_format(parse(indexdata['time']).astimezone(event.timezone), 'SHORT_DATETIME_FORMAT'),
'shredders': ', '.join([str(s.verbose_name) for s in shredders])
},
event=None,
user=user,
locale=user.locale,
)
except SendMailException:
pass # Already logged
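For consumers of the task, the PROGRESS state can be read back from the Celery result backend. A minimal sketch, assuming task_id holds the id of a running shred task (the helper name is illustrative; pretix's own UI polls this through the async-task views further below):

from celery.result import AsyncResult

from pretix.celery_app import app


def print_shred_progress(task_id):
    # Illustrative helper: task_id is the id of a running shred() task.
    result = AsyncResult(task_id, app=app)
    if result.state == 'PROGRESS':
        # meta dict written by set_progress() above
        print(result.info['value'])  # overall percentage, 0-100
        for step in result.info['steps']:
            print(step['label'], 'done' if step['done'] else 'in progress')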

View File

@@ -32,11 +32,12 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under the License.
import copy
import json
import os
import time
from typing import List, Tuple
from django.db import transaction
from django.db.models import Max, Q
from django.db.models.functions import Greatest
from django.dispatch import receiver
@@ -99,11 +100,13 @@ class BaseDataShredder:
"""
raise NotImplementedError() # NOQA
def shred_data(self):
def shred_data(self, progress_callback=None):
"""
This method is called to actually remove the data from the system. You should remove any database objects
here.
You can call ``progress_callback`` with an integer value between 0 and 100 to communicate back your progress.
You should never delete ``LogEntry`` objects, but you might modify them to remove personal data. In this
case, set the ``LogEntry.shredded`` attribute to ``True`` to show that this is no longer original log data.
"""
@@ -151,6 +154,7 @@ class BaseDataShredder:
def shred_log_fields(logentry, banlist=None, whitelist=None):
d = logentry.parsed_data
initial_data = copy.copy(d)
shredded = False
if whitelist:
for k, v in d.items():
@@ -162,9 +166,61 @@ def shred_log_fields(logentry, banlist=None, whitelist=None):
if f in d:
d[f] = ''
shredded = True
logentry.data = json.dumps(d)
logentry.shredded = logentry.shredded or shredded
logentry.save(update_fields=['data', 'shredded'])
if d != initial_data:
logentry.data = json.dumps(d)
logentry.shredded = logentry.shredded or shredded
logentry.save(update_fields=['data', 'shredded'])
def slow_update(qs, batch_size=1000, sleep_time=.5, progress_callback=None, progress_offset=0, progress_total=None, **update):
"""
Doing UPDATE queries on hundreds of thousands of rows can cause outages due to high write load on the database.
This provides a throttled way to update rows. The condition for this to work properly is that the queryset has a
filter condition that no longer applies after the update!
Otherwise, this will be an endless loop!
"""
total_updated = 0
while True:
updated = qs.order_by().filter(
pk__in=qs.order_by().values_list('pk', flat=True)[:batch_size]
).update(**update)
total_updated += updated
if not updated:
break
if total_updated >= 0.8 * batch_size:
time.sleep(sleep_time)
if progress_callback and progress_total:
progress_callback((progress_offset + total_updated) / progress_total)
return total_updated
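The caveat above is easy to miss in practice: each pass re-evaluates the queryset, so the filter must stop matching rows once they have been updated, otherwise the same rows are rewritten forever. A usage sketch, assuming event is an Event instance:

# Terminates: once phone is cleared, rows no longer match phone__isnull=False.
slow_update(event.orders.filter(phone__isnull=False), phone=None)

# Would never terminate: every row still matches .all() after the update.
# slow_update(event.orders.all(), comment='')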
def slow_delete(qs, batch_size=1000, sleep_time=.5, progress_callback=None, progress_offset=0, progress_total=None):
"""
Doing DELETE queries on hundreds of thousands of rows can cause outages due to high write load on the database.
This provides a throttled way to delete rows.
"""
total_deleted = 0
while True:
deleted = qs.order_by().filter(
pk__in=qs.order_by().values_list('pk', flat=True)[:batch_size]
).delete()[0]
total_deleted += deleted
if not deleted:
break
if total_deleted >= 0.8 * batch_size:
time.sleep(sleep_time)
if progress_callback and progress_total:
progress_callback((progress_offset + total_deleted) / progress_total)
return total_deleted
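slow_delete uses the same batching pattern; since deleted rows drop out of the queryset by themselves, any filter is safe here. A short sketch, again assuming event is an Event instance:

# Deletes in batches of 1000, pausing between batches to limit write load.
slow_delete(QuestionAnswer.objects.filter(orderposition__order__event=event))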
def _progress_helper(queryset, progress_callback, offset, total):
if not progress_callback:
yield from queryset
else:
for i, o in enumerate(queryset):
yield o
if i % 10 == 0:
progress_callback((i + offset) / total * 100)
class PhoneNumberShredder(BaseDataShredder):
@@ -177,18 +233,26 @@ class PhoneNumberShredder(BaseDataShredder):
o.code: o.phone for o in self.event.orders.filter(phone__isnull=False)
}, cls=CustomJSONEncoder, indent=4)
@transaction.atomic
def shred_data(self):
for o in self.event.orders.all():
def shred_data(self, progress_callback=None):
qs_orders = self.event.orders.all()
qs_orders_cnt = qs_orders.count()
qs_le = self.event.logentry_set.filter(action_type="pretix.event.order.phone.changed")
qs_le_cnt = qs_le.count()
total = qs_le_cnt + qs_orders_cnt
for o in _progress_helper(qs_orders, progress_callback, 0, total):
changed = bool(o.phone)
o.phone = None
d = o.meta_info_data
if d:
if 'contact_form_data' in d and 'phone' in d['contact_form_data']:
changed = True
del d['contact_form_data']['phone']
o.meta_info = json.dumps(d)
o.save(update_fields=['meta_info', 'phone'])
o.meta_info = json.dumps(d)
if changed:
o.save(update_fields=['meta_info', 'phone'])
for le in self.event.logentry_set.filter(action_type="pretix.event.order.phone.changed"):
for le in _progress_helper(qs_le, progress_callback, qs_orders_cnt, total):
shred_log_fields(le, banlist=['old_phone', 'new_phone'])
@@ -207,37 +271,66 @@ class EmailAddressShredder(BaseDataShredder):
for op in OrderPosition.all.filter(order__event=self.event, attendee_email__isnull=False)
}, indent=4)
@transaction.atomic
def shred_data(self):
OrderPosition.all.filter(order__event=self.event, attendee_email__isnull=False).update(attendee_email=None)
def shred_data(self, progress_callback=None):
qs_op = OrderPosition.all.filter(order__event=self.event, attendee_email__isnull=False)
qs_op_cnt = qs_op.count()
for o in self.event.orders.all():
qs_orders = self.event.orders.all()
qs_orders_cnt = qs_orders.count()
qs_le = self.event.logentry_set.filter(
Q(action_type__contains="order.email") | Q(action_type__contains="position.email") |
Q(action_type="pretix.event.order.contact.changed") |
Q(action_type="pretix.event.order.modified")
).exclude(data="")
qs_le_cnt = qs_le.count()
total = qs_op_cnt + qs_orders_cnt + qs_le_cnt
slow_update(
qs_op,
attendee_email=None,
progress_callback=progress_callback,
progress_offset=0,
progress_total=total,
# Updates to order position table are slow, since PostgreSQL needs to update many indexes, so let's
# take them really slowly to not overwhelm the database.
batch_size=100,
sleep_time=2,
)
for o in _progress_helper(qs_orders, progress_callback, qs_op_cnt, total):
changed = bool(o.email) or bool(o.customer)
o.email = None
o.customer = None
d = o.meta_info_data
if d:
if 'contact_form_data' in d and 'email' in d['contact_form_data']:
del d['contact_form_data']['email']
o.meta_info = json.dumps(d)
o.save(update_fields=['meta_info', 'email', 'customer'])
changed = True
o.meta_info = json.dumps(d)
if 'contact_form_data' in d and 'email_repeat' in d['contact_form_data']:
del d['contact_form_data']['email_repeat']
changed = True
if changed:
if d:
o.meta_info = json.dumps(d)
o.save(update_fields=['meta_info', 'email', 'customer'])
for le in self.event.logentry_set.filter(
Q(action_type__contains="order.email") | Q(action_type__contains="position.email"),
):
shred_log_fields(le, banlist=['recipient', 'message', 'subject', 'full_mail'])
for le in self.event.logentry_set.filter(action_type="pretix.event.order.contact.changed"):
shred_log_fields(le, banlist=['old_email', 'new_email'])
for le in self.event.logentry_set.filter(action_type="pretix.event.order.modified").exclude(data=""):
d = le.parsed_data
if 'data' in d:
for row in d['data']:
if 'attendee_email' in row:
row['attendee_email'] = ''
le.data = json.dumps(d)
le.shredded = True
le.save(update_fields=['data', 'shredded'])
for le in _progress_helper(qs_le, progress_callback, qs_op_cnt + qs_orders_cnt, total):
if le.action_type == "pretix.event.order.modified":
d = le.parsed_data
if 'data' in d:
for row in d['data']:
if 'attendee_email' in row:
row['attendee_email'] = ''
le.data = json.dumps(d)
le.shredded = True
le.save(update_fields=['data', 'shredded'])
else:
shred_log_fields(le, banlist=[
'recipient', 'message', 'subject', 'full_mail', 'old_email', 'new_email'
])
class WaitingListShredder(BaseDataShredder):
@@ -251,16 +344,35 @@ class WaitingListShredder(BaseDataShredder):
for wle in self.event.waitinglistentries.all()
], indent=4)
@transaction.atomic
def shred_data(self):
self.event.waitinglistentries.update(name_cached=None, name_parts={'_shredded': True}, email='', phone='')
def shred_data(self, progress_callback=None):
qs_wle = self.event.waitinglistentries.exclude(email='')
qs_wle_cnt = qs_wle.count()
for wle in self.event.waitinglistentries.select_related('voucher').filter(voucher__isnull=False):
qs_voucher = self.event.waitinglistentries.select_related('voucher').filter(voucher__isnull=False)
qs_voucher_cnt = qs_voucher.count()
qs_le = self.event.logentry_set.filter(action_type="pretix.voucher.added.waitinglist").exclude(data="")
qs_le_cnt = qs_le.count()
total = qs_voucher_cnt + qs_wle_cnt + qs_le_cnt
slow_update(
qs_wle,
name_cached=None,
name_parts={'_shredded': True},
email='',
phone='',
progress_callback=progress_callback,
progress_offset=0,
progress_total=total,
)
for wle in _progress_helper(qs_voucher, progress_callback, qs_wle_cnt, total):
if '@' in wle.voucher.comment:
wle.voucher.comment = ''
wle.voucher.save(update_fields=['comment'])
for le in self.event.logentry_set.filter(action_type="pretix.voucher.added.waitinglist").exclude(data=""):
for le in _progress_helper(qs_le, progress_callback, qs_wle_cnt + qs_voucher_cnt, total):
d = le.parsed_data
if 'name' in d:
d['name'] = ''
@@ -298,17 +410,41 @@ class AttendeeInfoShredder(BaseDataShredder):
)
}, indent=4)
@transaction.atomic
def shred_data(self):
OrderPosition.all.filter(
def shred_data(self, progress_callback=None):
qs_op = OrderPosition.all.filter(
order__event=self.event
).filter(
Q(attendee_name_cached__isnull=False) | Q(attendee_name_parts__isnull=False) |
Q(company__isnull=False) | Q(street__isnull=False) | Q(zipcode__isnull=False) | Q(city__isnull=False)
).update(attendee_name_cached=None, attendee_name_parts={'_shredded': True}, company=None, street=None,
zipcode=None, city=None)
Q(attendee_name_cached__isnull=False) |
Q(company__isnull=False) |
Q(street__isnull=False) |
Q(zipcode__isnull=False) |
Q(city__isnull=False)
)
qs_op_cnt = qs_op.count()
for le in self.event.logentry_set.filter(action_type="pretix.event.order.modified").exclude(data=""):
qs_le = self.event.logentry_set.filter(action_type="pretix.event.order.modified").exclude(data="")
qs_le_cnt = qs_le.count()
total = qs_op_cnt + qs_le_cnt
slow_update(
qs_op,
attendee_name_cached=None,
attendee_name_parts={'_shredded': True},
company=None,
street=None,
zipcode=None,
city=None,
progress_callback=progress_callback,
progress_total=total,
progress_offset=0,
# Updates to order position table are slow, since PostgreSQL needs to update many indexes, so let's
# take them really slowly to not overwhelm the database.
batch_size=100,
sleep_time=2,
)
for le in _progress_helper(qs_le, progress_callback, qs_op_cnt, total):
d = le.parsed_data
if 'data' in d:
for i, row in enumerate(d['data']):
@@ -343,11 +479,18 @@ class InvoiceAddressShredder(BaseDataShredder):
for ia in InvoiceAddress.objects.filter(order__event=self.event)
}, indent=4)
@transaction.atomic
def shred_data(self):
InvoiceAddress.objects.filter(order__event=self.event).delete()
def shred_data(self, progress_callback=None):
qs_ia = InvoiceAddress.objects.filter(order__event=self.event)
qs_ia_cnt = qs_ia.count()
for le in self.event.logentry_set.filter(action_type="pretix.event.order.modified").exclude(data=""):
qs_le = self.event.logentry_set.filter(action_type="pretix.event.order.modified").exclude(data="")
qs_le_cnt = qs_le.count()
total = qs_ia_cnt + qs_le_cnt
slow_delete(qs_ia, progress_callback=progress_callback, progress_total=total, progress_offset=0)
for le in _progress_helper(qs_le, progress_callback, qs_ia_cnt, total):
d = le.parsed_data
if 'invoice_data' in d and not isinstance(d['invoice_data'], bool):
for field in d['invoice_data']:
@@ -375,11 +518,18 @@ class QuestionAnswerShredder(BaseDataShredder):
).data
yield 'question-answers.json', 'application/json', json.dumps(d, indent=4)
@transaction.atomic
def shred_data(self):
QuestionAnswer.objects.filter(orderposition__order__event=self.event).delete()
def shred_data(self, progress_callback=None):
qs_qa = QuestionAnswer.objects.filter(orderposition__order__event=self.event)
qs_qa_cnt = qs_qa.count()
for le in self.event.logentry_set.filter(action_type="pretix.event.order.modified").exclude(data=""):
qs_le = self.event.logentry_set.filter(action_type="pretix.event.order.modified").exclude(data="")
qs_le_cnt = qs_le.count()
total = qs_qa_cnt + qs_le_cnt
slow_delete(qs_qa, progress_callback=progress_callback, progress_total=total, progress_offset=0)
for le in _progress_helper(qs_le, progress_callback, qs_qa_cnt, total):
d = le.parsed_data
if 'data' in d:
for i, row in enumerate(d['data']):
@@ -408,9 +558,11 @@ class InvoiceShredder(BaseDataShredder):
yield 'invoices/{}.pdf'.format(i.number), 'application/pdf', i.file.read()
i.file.close()
@transaction.atomic
def shred_data(self):
for i in self.event.invoices.filter(shredded=False):
def shred_data(self, progress_callback=None):
qs_i = self.event.invoices.filter(shredded=False)
total = qs_i.count()
for i in _progress_helper(qs_i, progress_callback, 0, total):
if i.file:
i.file.delete()
i.shredded = True
@@ -430,10 +582,17 @@ class CachedTicketShredder(BaseDataShredder):
def generate_files(self) -> List[Tuple[str, str, str]]:
pass
@transaction.atomic
def shred_data(self):
CachedTicket.objects.filter(order_position__order__event=self.event).delete()
CachedCombinedTicket.objects.filter(order__event=self.event).delete()
def shred_data(self, progress_callback=None):
qs_1 = CachedTicket.objects.filter(order_position__order__event=self.event)
qs_1_cnt = qs_1.count()
qs_2 = CachedCombinedTicket.objects.filter(order__event=self.event)
qs_2_cnt = qs_2.count()
total = qs_1_cnt + qs_2_cnt
slow_delete(qs_1, progress_callback=progress_callback, progress_total=total, progress_offset=0)
slow_delete(qs_2, progress_callback=progress_callback, progress_total=total, progress_offset=qs_1_cnt)
class PaymentInfoShredder(BaseDataShredder):
@@ -446,14 +605,21 @@ class PaymentInfoShredder(BaseDataShredder):
def generate_files(self) -> List[Tuple[str, str, str]]:
pass
@transaction.atomic
def shred_data(self):
def shred_data(self, progress_callback=None):
qs_p = OrderPayment.objects.filter(order__event=self.event)
qs_p_count = qs_p.count()
qs_r = OrderRefund.objects.filter(order__event=self.event)
qs_r_count = qs_r.count()
total = qs_p_count + qs_r_count
provs = self.event.get_payment_providers()
for obj in OrderPayment.objects.filter(order__event=self.event):
for obj in _progress_helper(qs_p, progress_callback, 0, total):
pprov = provs.get(obj.provider)
if pprov:
pprov.shred_payment_info(obj)
for obj in OrderRefund.objects.filter(order__event=self.event):
for obj in _progress_helper(qs_r, progress_callback, qs_p_count, total):
pprov = provs.get(obj.provider)
if pprov:
pprov.shred_payment_info(obj)

View File

@@ -0,0 +1,17 @@
{% load i18n %}{% blocktrans with url=url|safe %}Hello,
we hereby confirm that the following data shredding job has been completed:
Organizer: {{ organizer }}
Event: {{ event }}
Data selection: {{ shredders }}
Start time: {{ start_time }} (new data added after this time might not have been deleted)
Best regards,
Your pretix team
{% endblocktrans %}

View File

@@ -115,7 +115,8 @@ class AsyncMixin:
elif state == 'PROGRESS':
data.update({
'started': True,
'percentage': info.get('value', 0) if isinstance(info, dict) else 0
'percentage': info.get('value', 0) if isinstance(info, dict) else 0,
'steps': info.get('steps', []) if isinstance(info, dict) else None,
})
elif state == 'STARTED':
data.update({

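With that change, polling the async-task status endpoint while a shred task is in the PROGRESS state yields roughly the following payload (field values are illustrative); the JavaScript further below renders the steps list from it:

{
    "started": true,
    "percentage": 37.5,
    "steps": [
        {"label": "E-mail addresses", "done": true},
        {"label": "Phone numbers", "done": false}
    ]
}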
View File

@@ -470,6 +470,8 @@
<div class="progress-bar progress-bar-success">
</div>
</div>
<div class="steps">
</div>
</div>
</div>
</div>

View File

@@ -8,7 +8,7 @@
{% trans "Data shredder" %}
</h1>
<form action="{% url "control:event.shredder.shred" event=request.event.slug organizer=request.organizer.slug %}"
method="post" class="form-horizontal" data-asynctask>
method="post" class="form-horizontal" data-asynctask data-asynctask-long>
{% csrf_token %}
<fieldset>
{% if download_on_shred %}
@@ -55,6 +55,12 @@
</fieldset>
{% endif %}
<input type="hidden" name="file" value="{{ file.pk }}">
<div class="alert alert-info">
{% blocktrans trimmed %}
Depending on the amount of data in your event, the following step may take a while to complete.
We will inform you via email once it has been completed.
{% endblocktrans %}
</div>
<div class="form-group submit-group">
<button type="submit" class="btn btn-primary btn-save">
{% trans "Continue" %}

View File

@@ -37,10 +37,11 @@ import logging
from collections import OrderedDict
from zipfile import ZipFile
from django.shortcuts import get_object_or_404
from django.contrib import messages
from django.shortcuts import get_object_or_404, redirect
from django.urls import reverse
from django.utils.functional import cached_property
from django.utils.translation import gettext_lazy as _
from django.utils.translation import get_language, gettext_lazy as _
from django.views import View
from django.views.generic import TemplateView
@@ -62,6 +63,16 @@ class ShredderMixin:
sorted(self.request.event.get_data_shredders().items(), key=lambda s: s[1].verbose_name)
)
def dispatch(self, request, *args, **kwargs):
try:
return super().dispatch(request, *args, **kwargs)
except ShredError as e:
messages.error(request, str(e))
return redirect(reverse('control:event.shredder.start', kwargs={
'event': self.request.event.slug,
'organizer': self.request.event.organizer.slug
}))
class StartShredView(RecentAuthenticationRequiredMixin, EventPermissionRequiredMixin, ShredderMixin, TemplateView):
permission = 'can_change_orders'
@@ -167,4 +178,5 @@ class ShredDoView(RecentAuthenticationRequiredMixin, EventPermissionRequiredMixi
if request.event.slug != request.POST.get("slug"):
return self.error(ShredError(_("The slug you entered was not correct.")))
return self.do(self.request.event.id, request.POST.get("file"), request.POST.get("confirm_code"))
return self.do(self.request.event.id, request.POST.get("file"), request.POST.get("confirm_code"),
self.request.user.pk, get_language())

View File

@@ -34,6 +34,21 @@ function async_task_check_callback(data, textStatus, jqXHR) {
} else if (typeof data.percentage === "number") {
$("#loadingmodal .progress").show();
$("#loadingmodal .progress .progress-bar").css("width", data.percentage + "%");
if (typeof data.steps === "object" && Array.isArray(data.steps)) {
var $steps = $("#loadingmodal .steps");
$steps.html("").show()
for (var step of data.steps) {
$steps.append(
$("<span>").addClass("fa fa-fw")
.toggleClass("fa-check text-success", step.done)
.toggleClass("fa-cog fa-spin text-muted", !step.done)
).append(
$("<span>").text(step.label)
).append(
$("<br>")
)
}
}
}
async_task_timeout = window.setTimeout(async_task_check, 250);
@@ -267,6 +282,7 @@ var waitingDialog = {
"use strict";
$("#loadingmodal h3").html(message);
$("#loadingmodal .progress").hide();
$("#loadingmodal .steps").hide();
$("body").addClass("loading");
$("#loadingmodal").removeAttr("hidden");
},