Metrics: Fix length and age of of queues (broken after #5513) (#5865)

This commit is contained in:
Raphael Michel
2026-02-02 13:37:16 +01:00
committed by GitHub
parent 0e7bb43a5a
commit 6a594a6166

View File

@@ -294,14 +294,28 @@ def metric_values():
channel = app.broker_connection().channel() channel = app.broker_connection().channel()
if hasattr(channel, 'client') and channel.client is not None: if hasattr(channel, 'client') and channel.client is not None:
client = channel.client client = channel.client
priority_steps = settings.CELERY_BROKER_TRANSPORT_OPTIONS.get("priority_steps", [0])
sep = settings.CELERY_BROKER_TRANSPORT_OPTIONS.get("sep", ":")
for q in settings.CELERY_TASK_QUEUES: for q in settings.CELERY_TASK_QUEUES:
llen = client.llen(q.name) queue_lengths = []
lfirst = client.lindex(q.name, -1) queue_delays = []
metrics['pretix_celery_tasks_queued_count']['{queue="%s"}' % q.name] = llen for prio in priority_steps:
if lfirst: if prio:
ldata = json.loads(lfirst) qname = f"{q.name}{sep}{prio}"
dt = time.time() - ldata.get('created', 0) else:
metrics['pretix_celery_tasks_queued_age_seconds']['{queue="%s"}' % q.name] = dt qname = q.name
queue_length = client.llen(qname)
queue_lengths.append(queue_length)
oldest_queue_item = client.lindex(qname, -1)
if oldest_queue_item:
ldata = json.loads(oldest_queue_item)
oldest_item_age = time.time() - ldata.get('created', 0)
queue_delays.append(oldest_item_age)
metrics['pretix_celery_tasks_queued_count']['{queue="%s"}' % q.name] = sum(queue_lengths)
if queue_delays:
metrics['pretix_celery_tasks_queued_age_seconds']['{queue="%s"}' % q.name] = max(queue_delays)
else: else:
metrics['pretix_celery_tasks_queued_age_seconds']['{queue="%s"}' % q.name] = 0 metrics['pretix_celery_tasks_queued_age_seconds']['{queue="%s"}' % q.name] = 0