Compare commits

...

1 Commits

Author SHA1 Message Date
Raphael Michel
39c16bd69d Metrics: Fix length and age of of queues (broken after #5513) 2026-01-30 15:33:28 +01:00

View File

@@ -294,14 +294,28 @@ def metric_values():
channel = app.broker_connection().channel()
if hasattr(channel, 'client') and channel.client is not None:
client = channel.client
priority_steps = settings.CELERY_BROKER_TRANSPORT_OPTIONS.get("priority_steps", [0])
sep = settings.CELERY_BROKER_TRANSPORT_OPTIONS.get("sep", ":")
for q in settings.CELERY_TASK_QUEUES:
llen = client.llen(q.name)
lfirst = client.lindex(q.name, -1)
metrics['pretix_celery_tasks_queued_count']['{queue="%s"}' % q.name] = llen
if lfirst:
ldata = json.loads(lfirst)
dt = time.time() - ldata.get('created', 0)
metrics['pretix_celery_tasks_queued_age_seconds']['{queue="%s"}' % q.name] = dt
queue_lengths = []
queue_delays = []
for prio in priority_steps:
if prio:
qname = f"{q.name}{sep}{prio}"
else:
qname = q.name
queue_length = client.llen(qname)
queue_lengths.append(queue_length)
oldest_queue_item = client.lindex(qname, -1)
if oldest_queue_item:
ldata = json.loads(oldest_queue_item)
oldest_item_age = time.time() - ldata.get('created', 0)
queue_delays.append(oldest_item_age)
metrics['pretix_celery_tasks_queued_count']['{queue="%s"}' % q.name] = sum(queue_lengths)
if queue_delays:
metrics['pretix_celery_tasks_queued_age_seconds']['{queue="%s"}' % q.name] = max(queue_delays)
else:
metrics['pretix_celery_tasks_queued_age_seconds']['{queue="%s"}' % q.name] = 0