Coverage for torrelque/__init__.py: 100%
278 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-05 19:52 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-05 19:52 +0000
1'''Asynchronous Redis-backed reliable queue package.'''
3import asyncio
4import enum
5import json
6import logging
7import time
8import uuid
9from importlib.resources import files
10from typing import Any, AsyncIterable, Callable, Dict, List, NamedTuple, Optional, Tuple, Union
12import redis.asyncio
13from redis.asyncio.client import Pipeline
16# aclosing is available since Python 3.10
17try:
18 from contextlib import aclosing
19except ImportError: # nocov
20 from contextlib import asynccontextmanager
22 @asynccontextmanager
23 async def aclosing(thing):
24 try:
25 yield thing
26 finally:
27 await thing.aclose()
30__all__ = (
31 'Torrelque',
32 'TorrelqueQueueStats',
33 'TorrelqueTaskStatus',
34 'TorrelqueTaskState',
35 'TorrelqueError',
36 'TorrelqueTimeoutError',
37 'TorrelqueLookupError',
38 'TorrelqueTaskSerialiser',
39)
41logger = logging.getLogger(__package__)
43TaskId = str
44TaskPair = Tuple[TaskId, dict]
47class TorrelqueTaskStatus(enum.IntEnum):
48 '''Task status.'''
50 PENDING = 0
51 '''Task is enqueued.'''
53 WORKING = 1
54 '''Task is dequeued.'''
56 DELAYED = 2
57 '''Task is delayed.'''
59 COMPLETED = 3
60 '''Task is released, as the result of successful completion.'''
62 REJECTED = 4
63 '''Task is released, as the result of (multiple) failed attempts.'''
65 def isfinal(self):
66 '''Tells whether the status is final.'''
68 return self in (self.COMPLETED, self.REJECTED) 1aeumkAhEcdbfgp
71class TorrelqueQueueStats(NamedTuple):
72 '''Queue counters.'''
74 tasks: int
75 '''Total number of tasks in the queue.'''
77 pending: int
78 '''Number of pending tasks in the queue.'''
80 working: int
81 '''Number of working tasks in the queue.'''
83 delayed: int
84 '''Number of delayed tasks in the queue.'''
87class TorrelqueTaskState(NamedTuple):
88 '''Task state.'''
90 status: TorrelqueTaskStatus
91 '''Status of the task.'''
93 timeout: float
94 '''
95 Execution timeout of the task after while it's considered stale.
96 '''
98 result: Any
99 '''Optional result of the task with in a final state.'''
101 dequeue_count: int
102 '''Number of times the task was dequeued from the queue.'''
104 requeue_count: int
105 '''Number of times the task was requeued from the queue.'''
107 enqueue_time: float
108 '''Unix timestamp of the enqueue time of the task.'''
110 last_dequeue_time: Optional[float]
111 '''Optional Unix timestamp of the last dequeue time of the task.'''
113 last_requeue_time: Optional[float]
114 '''Optional Unix timestamp of the last requeue time of the task.'''
116 release_time: Optional[float]
117 '''
118 Optional Unix timestamp of the release time of the task in a final
119 status.
120 '''
123class TorrelqueTaskSerialiser(NamedTuple):
124 '''Task serialisation delegate.'''
126 dumps: Callable[[Any], bytes]
127 '''Serialise task data.'''
129 loads: Callable[[bytes], Any]
130 '''Deserialise task data.'''
133class TorrelqueError(Exception):
134 '''Generic Torrelque error.'''
137class TorrelqueTimeoutError(TorrelqueError):
138 '''Torrelque timeout error.'''
141class TorrelqueLookupError(TorrelqueError):
142 '''Torrelque lookup error.'''
145class Torrelque:
146 '''
147 Reliable work queue.
149 :argument client:
150 Redis client instance.
151 :argument queue:
152 Name of the queue. Must match across producers and consumers.
153 :argument serialiser:
154 An object with ``dumps`` and ``loads`` that (de)serialises
155 task bodies.
156 :raises TorrelqueError:
157 Not a ``redis.asyncio.Redis`` instance passed.
158 '''
160 task_timeout = 300
161 '''
162 Default timeout for a task in the "working" set to be
163 considered stale.
164 '''
166 sweep_interval = 30
167 '''Default interval between sweep calls, when sweep is scheduled.'''
169 result_ttl = 3600
170 '''Default time-to-live of a task result (when applicable).'''
172 keys = {
173 'pending' : 'pending', # list
174 'dequeueing' : 'dequeueing', # list
175 'undequeued' : 'undequeued', # set
176 'working' : 'working', # sorted set
177 'delayed' : 'delayed', # sorted set
178 'tasks' : 'tasks', # hash
179 'task' : 'task' # prefix for hashes
180 }
181 '''
182 Queue Redis key name mapping.
184 On initialisation the values are prefixed with the queue name, and
185 the new dictionary is rebound to the instance.
187 .. list-table:: Redis key description
189 * - ``pending``
190 - a list containing enqueued task ids
191 * - ``dequeueing``
192 - a short-living task id list where :py:meth:`.dequeue`
193 ``RPOPLPUSH``-es task ids from ``pending`` list
194 * - ``undequeued``
195 - a set where :py:meth:`.sweep` stored potentially (evaluated
196 on the next sweep) stale, "undequeued", task ids
197 * - ``working``
198 - a sorted set with successfully dequeued task ids where the
199 score is the Unix timestamp when the task becomes stale
200 according to its timeout
201 * - ``delayed``
202 - a sorted set with delayed task ids where the score is the
203 Unix timestamp when the task becomes due
204 * - ``tasks``
205 - a hash mapping a task id to its serialised representation
206 * - ``task``
207 - a string prefix, followed by task id, for hashes storing
208 individual task stats
210 '''
212 _serialiser: TorrelqueTaskSerialiser
213 '''
214 Task data serialiser that converts task object into and from string
215 representation.
216 '''
218 _client: redis.asyncio.Redis
219 '''Redis client.'''
221 _sweep_task: asyncio.Task
222 '''Periodic sweep ``asyncio`` task.'''
224 _keyspace_notification_enabled = False
225 '''
226 Flag indicates that Redis keyspace notification were found
227 correctly configured, and further tests should be omitted.
228 '''
230 _scripts: Dict[str, 'redis.commands.core.AsyncScript']
231 '''Dictionary with Lua scripts read on initialisation.'''
233 def __init__(
234 self,
235 client: redis.asyncio.Redis,
236 *,
237 queue: str = 'trq',
238 serialiser=TorrelqueTaskSerialiser(lambda obj: json.dumps(obj).encode(), json.loads),
239 ):
240 if not isinstance(client, redis.asyncio.Redis): 1aeJd
241 raise TorrelqueError('redis.asyncio.Redis instance expected') 1J
243 self._client = client 1aed
244 self._serialiser = serialiser 1aed
245 self._scripts = { 1aed
246 n: client.register_script((files(__package__) / f'{n}.lua').read_text())
247 for n in ('dequeue', 'sweep')
248 }
250 self.keys = {k: f'{queue}:{v}' for k, v in self.keys.items()} 1aed
252 def _get_state_key(self, task_id):
253 return '{}:{}'.format(self.keys['task'], task_id) 1aesrutvwxmkAhlonyicjdzbfBgpq
255 async def _enqueue(
256 self,
257 pipeline: Pipeline,
258 task,
259 task_timeout: Optional[float] = None,
260 delay: Optional[float] = None,
261 ) -> TaskId:
262 task_timeout = task_timeout or self.task_timeout 1esrutvwxmkhlonicjdzbfgpq
263 task_id = uuid.uuid1().hex 1esrutvwxmkhlonicjdzbfgpq
264 task_data = self._serialiser.dumps(task) 1esrutvwxmkhlonicjdzbfgpq
265 task_state_key = self._get_state_key(task_id) 1esrutvwxmkhlonicjdzbfgpq
267 if delay: 1esrutvwxmkhlonicjdzbfgpq
268 await pipeline.zadd(self.keys['delayed'], {task_id: time.time() + delay}) 1x
269 await pipeline.hset(task_state_key, 'status', TorrelqueTaskStatus.DELAYED.value) 1x
270 else:
271 await pipeline.lpush(self.keys['pending'], task_id) 1esrutvwmkhlonicjdzbfgpq
272 await pipeline.hset(task_state_key, 'status', TorrelqueTaskStatus.PENDING.value) 1esrutvwmkhlonicjdzbfgpq
274 await pipeline.hset(self.keys['tasks'], task_id, task_data) 1esrutvwxmkhlonicjdzbfgpq
275 await pipeline.hset(task_state_key, 'enqueue_time', time.time()) 1esrutvwxmkhlonicjdzbfgpq
276 await pipeline.hset(task_state_key, 'timeout', task_timeout) 1esrutvwxmkhlonicjdzbfgpq
278 return task_id 1esrutvwxmkhlonicjdzbfgpq
280 async def enqueue(
281 self,
282 task,
283 *,
284 task_timeout: Optional[float] = None,
285 delay: Optional[float] = None,
286 pipeline: Optional[Pipeline] = None
287 ) -> TaskId:
288 '''
289 Put a task on the queue optionally providing its timeout and delay.
291 :argument task:
292 Arbitrary serialisable task payload.
293 :argument task_timeout:
294 Time since the task's processing start after which it is
295 considered stale.
296 :argument delay:
297 Number of seconds to delay processing of the task, i.e.
298 putting it into the "pending" list. Note that the sweep
299 must be scheduled for the delayed tasks to return, and
300 the delay only has effect after the sweep execution.
301 :argument pipeline:
302 External Redis pipeline that allows for bulk enqueue.
303 :return:
304 Task identifier.
305 '''
307 if pipeline is not None: 1esrutvwxmkhlonicjdzbfgpq
308 task_id = await self._enqueue(pipeline, task, task_timeout, delay) 1rwkoz
309 else:
310 async with await self._client.pipeline(transaction=True) as pipeline: 1esutvxmhlnicjdbfgpq
311 task_id = await self._enqueue(pipeline, task, task_timeout, delay) 1esutvxmhlnicjdbfgpq
312 await pipeline.execute() 1esutvxmhlnicjdbfgpq
314 return task_id 1esrutvwxmkhlonicjdzbfgpq
316 async def _dequeue(self, timeout: int, max_tasks: int) -> List[TaskPair]:
317 pending_key = self.keys['pending'] 1aesrtCmkhlonicjdbfgpq
318 dequeueing_key = self.keys['dequeueing'] 1aesrtCmkhlonicjdbfgpq
320 # Try to move max_tasks task ids to dequeueing without blocking
321 task_ids: List[bytes] = [] 1aesrtCmkhlonicjdbfgpq
322 if max_tasks > 1: 1aesrtCmkhlonicjdbfgpq
323 async with await self._client.pipeline(transaction=True) as pipeline: 1rko
324 for _ in range(max_tasks): 1rko
325 await pipeline.rpoplpush(pending_key, dequeueing_key) 1rko
326 task_ids.extend(task_id for task_id in await pipeline.execute() if task_id) 1rko
328 # If the above hasn't succeeded guarantee at least one task
329 if not task_ids: 1aesrtCmkhlonicjdbfgpq
330 task_id = await self._client.brpoplpush( 1aesrtCmhlnicjdbfgpq
331 pending_key, dequeueing_key, timeout=timeout or 0
332 )
333 if not task_id: 1aesrtCmhlnicjdbfgpq
334 raise TorrelqueTimeoutError 1srC
335 else:
336 task_ids.append(task_id) 1aestmhlnicjdbfgpq
338 keys = [self.keys[k] for k in ('dequeueing', 'working', 'tasks', 'task')] 1aesrtmkhlonicjdbfgpq
339 args = [time.time(), TorrelqueTaskStatus.WORKING.value] + task_ids 1aesrtmkhlonicjdbfgpq
340 pairs = await self._scripts['dequeue'](keys, args) 1aesrtmkhlonicjdbfgpq
342 result = [] 1aesrtmkhlonicjdbfgpq
343 for task_id, task_data in pairs: 1aesrtmkhlonicjdbfgpq
344 task_id = task_id.decode() 1aesrtmkhlonicjdbfgpq
345 task_data = self._serialiser.loads(task_data) 1aesrtmkhlonicjdbfgpq
346 if not task_data: 1aesrtmkhlonicjdbfgpq
347 logger.warning('Task:%s not found in dequeueing list', task_id) 1t
348 continue 1t
350 result.append((task_id, task_data)) 1aesrmkhlonicjdbfgpq
352 if not result: 1aesrtmkhlonicjdbfgpq
353 raise TorrelqueLookupError('Failed to dequeue pending task') 1t
355 return result 1aesrmkhlonicjdbfgpq
357 async def dequeue(
358 self, timeout: Optional[int] = None, *, max_tasks: int = 1
359 ) -> Union[TaskPair, List[TaskPair]]:
360 '''
361 Get a task or a task list from the queue with optional timeout.
363 :argument timeout:
364 Time to wait until a task is available. Timeout applies
365 only to fetching single task. Note that Redis only supports
366 an integer timeout.
367 :argument max_tasks:
368 If greater than 1 the method will try to optimistically
369 dequeue as many tasks. That means that only 1 task is
370 guaranteed.
371 :raises TorrelqueTimeoutError:
372 If timeout was provided and there was no result within it.
373 :raises TorrelqueLookupError:
374 Indicates that the task id has become staling during the
375 runtime of this method. This is not expected under normal
376 circumstances. It can happen if this method is paused, say
377 on a debugger breakpoint, for a duration of 2 sweeps.
378 :return:
379 Tuple of the task identifier and the deserialised task
380 payload. If ``max_tasks`` is greater than 1, no matter if
381 it dequeues more than 1 task, the return value is a list of
382 said 2-tuples.
383 '''
385 assert max_tasks > 0 1aesrtCmkhlonicjdbfgpq
387 task_pairs = await self._dequeue(timeout, max_tasks) 1aesrtCmkhlonicjdbfgpq
388 return task_pairs if max_tasks > 1 else task_pairs[0] 1aesrmkhlonicjdbfgpq
390 async def _requeue(
391 self,
392 pipeline: Pipeline,
393 task_id: TaskId,
394 delay: Optional[float] = None,
395 task_timeout: Optional[float] = None
396 ) -> list:
397 expected = [] 1elonyicjdbq
398 await pipeline.zrem(self.keys['working'], task_id) 1elonyicjdbq
399 expected.append(1) 1elonyicjdbq
401 task_state_key = self._get_state_key(task_id) 1elonyicjdbq
402 if not delay: 1elonyicjdbq
403 await pipeline.lpush(self.keys['pending'], task_id) 1eloyicbq
404 await pipeline.hset(task_state_key, 'last_requeue_time', time.time()) 1eloyicbq
405 await pipeline.hincrby(task_state_key, 'requeue_count', 1) 1eloyicbq
406 await pipeline.hset(task_state_key, 'status', TorrelqueTaskStatus.PENDING.value) 1eloyicbq
407 expected.extend([..., ..., ..., 0]) 1eloyicbq
408 else:
409 await pipeline.zadd(self.keys['delayed'], {task_id: time.time() + delay}) 1njdb
410 await pipeline.hset(task_state_key, 'status', TorrelqueTaskStatus.DELAYED.value) 1njdb
411 expected.extend([1, 0]) 1njdb
413 if task_timeout: 1elonyicjdbq
414 await pipeline.hset(task_state_key, 'timeout', task_timeout) 1ibq
415 expected.append(0) 1ibq
417 return expected 1elonyicjdbq
419 async def requeue(
420 self,
421 task_id: TaskId,
422 delay: Optional[float] = None,
423 *,
424 task_timeout: Optional[float] = None,
425 pipeline: Optional[Pipeline] = None,
426 ):
427 '''
428 Return failed task into the queue with optional delay.
430 :argument task_id:
431 Task identifier.
432 :argument delay:
433 Number of seconds to delay putting the task into "pending"
434 list. Note that the sweep must be scheduled in order for
435 tasks from "delayed" to return to "pending" list.
436 :argument task_timeout:
437 Redefine task timeout, which is the time since the task's
438 processing start after which it is considered stale.
439 :argument pipeline:
440 External Redis pipeline that allows for bulk requeue.
441 '''
443 if pipeline is not None: 1elonyicjdbq
444 await self._requeue(pipeline, task_id, delay, task_timeout) 1o
445 else:
446 async with await self._client.pipeline(transaction=True) as pipeline: 1elnyicjdbq
447 expected = await self._requeue(pipeline, task_id, delay, task_timeout) 1elnyicjdbq
448 actual = await pipeline.execute() 1elnyicjdbq
450 if not all(expc == actl or expc is ... for expc, actl in zip(expected, actual)): 1elnyicjdbq
451 logger.warning('Inconsistent requeue of task:%s: %s', task_id, actual) 1y
453 async def _release(
454 self,
455 pipeline: Pipeline,
456 task_id: TaskId,
457 result=None,
458 result_ttl: Optional[int] = None,
459 status: TorrelqueTaskStatus = TorrelqueTaskStatus.COMPLETED,
460 ) -> list:
461 if status is not None and not status.isfinal(): 1aemkAhEcdbfgp
462 raise TorrelqueError(f'Invalid status for released task: {status}') 1E
464 expected = [] 1aemkAhcdbfgp
465 await pipeline.zrem(self.keys['working'], task_id) 1aemkAhcdbfgp
466 await pipeline.hdel(self.keys['tasks'], task_id) 1aemkAhcdbfgp
467 expected.extend([1, 1]) 1aemkAhcdbfgp
469 task_state_key = self._get_state_key(task_id) 1aemkAhcdbfgp
470 if result is not None: 1aemkAhcdbfgp
471 await pipeline.hset(task_state_key, 'result', self._serialiser.dumps(result)) 1khcfgp
472 await pipeline.hset(task_state_key, 'release_time', time.time()) 1khcfgp
473 await pipeline.hset(task_state_key, 'status', status.value) 1khcfgp
474 expected.extend([1, 1, 0]) 1khcfgp
476 result_ttl = result_ttl if result_ttl is not None else self.result_ttl 1khcfgp
477 await pipeline.expire(task_state_key, result_ttl) 1khcfgp
478 expected.append(1) 1khcfgp
479 else:
480 await pipeline.delete(task_state_key) 1aemAdb
481 expected.append(1) 1aemAdb
483 return expected 1aemkAhcdbfgp
485 async def release(
486 self,
487 task_id: TaskId,
488 *,
489 result=None,
490 result_ttl: Optional[int] = None,
491 status: TorrelqueTaskStatus = TorrelqueTaskStatus.COMPLETED,
492 pipeline: Optional[Pipeline] = None,
493 ):
494 '''
495 Remove finished task from the queue.
497 Unless ``result`` is specified, all task information is removed
498 from the queue immediately.
500 Since there's no dead letter queue, tasks that have exceeded
501 allowed number of retries should also be released, possibly
502 with ``TorrelqueTaskStatus.REJECTED`` status if producer is
503 interested in the status.
505 :argument task_id:
506 Task identifier.
507 :argument result:
508 Arbitrary serialisable task result. If ``result`` is
509 ``None`` task state key is removed immediately on release.
510 :argument result_ttl:
511 Number of seconds to keep task state key after release.
512 Override of default result TTL.
513 :argument status:
514 Task status to set on release. It only apples when result
515 is not ``None``.
516 :argument pipeline:
517 External Redis pipeline that allows for bulk release.
518 :raises TorrelqueError:
519 If the status is not final.
520 '''
522 if pipeline is not None: 1aemkAhEcdbfgp
523 await self._release(pipeline, task_id, result, result_ttl, status) 1k
524 else:
525 async with await self._client.pipeline(transaction=True) as pipeline: 1aemAhEcdbfgp
526 expected = await self._release(pipeline, task_id, result, result_ttl, status) 1aemAhEcdbfgp
527 actual = await pipeline.execute() 1aemAhcdbfgp
529 if expected != actual: 1aemAhcdbfgp
530 logger.warning('Inconsistent release of task:%s: %s', task_id, actual) 1A
532 async def _check_keyspace_notification_config(self):
533 if not self._keyspace_notification_enabled: 1aufDBg
534 config = await self._client.config_get('notify-keyspace-events') 1aufDBg
535 notify_config = set(config['notify-keyspace-events']) 1aufDBg
536 # See https://redis.io/topics/notifications#configuration
537 if {'K', 'A'} - notify_config and {'K', 'g', 'h'} - notify_config: 1aufDBg
538 raise TorrelqueError('Redis notify-keyspace-events must include KA or Kgh') 1D
539 self._keyspace_notification_enabled = True 1aufBg
541 async def _get_keyspace_notification_message(self, pubsub, timeout: float) -> Optional[dict]:
542 try: 1au
543 message = await asyncio.wait_for(pubsub.get_message(), timeout) 1au
544 except asyncio.TimeoutError as ex:
545 raise TorrelqueTimeoutError from ex
546 else:
547 return message
549 async def watch(
550 self, task_id: TaskId, *, timeout: Optional[float] = None
551 ) -> AsyncIterable[TorrelqueTaskState]:
552 '''
553 Watch task status change until it's released from the queue.
555 .. note::
557 This method relies on ``notify-keyspace-events`` introduced
558 in Redis 2.8. The configuration must have generic and hash
559 commands enabled. That is, the configuration must include
560 either ``KA`` or ``Kgh``.
562 :argument task_id:
563 Task identifier.
564 :argument timeout:
565 Timeout for watching.
566 :raises TorrelqueError:
567 If ``notify-keyspace-events`` is not configured properly.
568 :raises TorrelqueTimeoutError:
569 If ``watch`` has taken longer than ``timeout``.
570 :raises TorrelqueLookupError:
571 If the task state key is not found.
572 :return:
573 Asynchronous generator that yields task state dictionaries
574 as returned by :py:meth:`.get_task_state`. Generator stops
575 when the task is released. If the task is released without
576 result, generator won't yield ``dict`` with final status.
577 '''
579 start = time.monotonic() 1aufDBg
581 await self._check_keyspace_notification_config() 1aufDBg
583 task_state = await self.get_task_state(task_id) 1aufBg
584 yield task_state 1aufg
586 status = task_state.status 1aufg
587 if status.isfinal(): 1aufg
588 return 1fg
590 async with aclosing(self._client.pubsub(ignore_subscribe_messages=True)) as pubsub: 1au
591 dbn = self._client.connection_pool.connection_kwargs.get('db', 0) 1au
592 await pubsub.subscribe('__keyspace@{}__:{}'.format(dbn, self._get_state_key(task_id))) 1au
594 iter_timeout = timeout 1au
595 while True: 1au
596 message = await self._get_keyspace_notification_message(pubsub, iter_timeout) 1au
597 if message and message['data'] == b'del':
598 return # Released without result
599 elif message and message['data'] == b'hset':
600 try:
601 task_state = await self.get_task_state(task_id)
602 except TorrelqueLookupError:
603 return # Race condition with release
605 if task_state.status != status:
606 status = task_state.status
607 yield task_state
608 if status.isfinal():
609 return
611 if timeout is not None:
612 iter_timeout = timeout - (time.monotonic() - start)
614 async def sweep(self) -> Tuple[int, int, int]:
615 '''
616 Execute the task sweep.
618 :return:
619 3-tuple with counts of:
621 - stale tasks from "working" set returned into "pending" list
622 - due delayed tasks from "delayed" set returned into "pending" list
623 - stale dequeueing task ids returned into "pending" list
624 '''
626 keys = [ 1aFjb
627 self.keys[k]
628 for k in ('pending', 'dequeueing', 'undequeued', 'working', 'delayed', 'task')
629 ]
630 args = [time.time(), TorrelqueTaskStatus.PENDING.value] 1aFjb
631 result = await self._scripts['sweep'](keys, args) 1aFjb
632 return tuple(result) 1aFjb
634 async def _sweep_runner(self, interval: float):
635 while True:
636 start = time.monotonic()
637 try:
638 result = await self.sweep()
639 except redis.RedisError:
640 logger.exception('Sweep has failed with Redis error, continuing')
641 except asyncio.CancelledError:
642 break
643 except Exception:
644 logger.exception('Sweep has failed with unexpected error, stopping')
645 break
646 else:
647 logger.debug('Sweep has requeued: stale=%d, delayed=%d, undequeued=%d', *result)
649 await asyncio.sleep(interval - (time.monotonic() - start))
651 def schedule_sweep(self, interval: Optional[float] = None):
652 '''
653 Schedule the sweep in a background coroutine.
655 :argument interval:
656 Override of default sweep interval.
657 '''
659 interval = interval or self.sweep_interval 1tGdHIz
660 self._sweep_task = asyncio.get_event_loop().create_task(self._sweep_runner(interval)) 1tGdHIz
662 def unschedule_sweep(self):
663 '''Unschedule the sweep in a background coroutine.'''
665 assert self._sweep_task 1tGdHIz
666 self._sweep_task.cancel() 1tGdHIz
668 async def get_queue_stats(self) -> TorrelqueQueueStats:
669 '''Get queue counters.'''
671 async with await self._client.pipeline(transaction=True) as pipe: 1srvwmkhlonij
672 await pipe.hlen(self.keys['tasks']) 1srvwmkhlonij
673 await pipe.llen(self.keys['pending']) 1srvwmkhlonij
674 await pipe.zcard(self.keys['working']) 1srvwmkhlonij
675 await pipe.zcard(self.keys['delayed']) 1srvwmkhlonij
676 result = await pipe.execute() 1srvwmkhlonij
678 return TorrelqueQueueStats(*result) 1srvwmkhlonij
680 async def get_task_state(self, task_id: TaskId) -> TorrelqueTaskState:
681 '''
682 Get task state.
684 :argument task_id:
685 Task identifier.
686 :raises TorrelqueLookupError:
687 If the task state key is not found.
688 '''
690 result = await self._client.hgetall(self._get_state_key(task_id)) 1asruvwxmkhlonicjfBg
691 if not result: 1asruvwxmkhlonicjfBg
692 raise TorrelqueLookupError 1amB
694 return TorrelqueTaskState( 1asruvwxkhlonicjfg
695 status = TorrelqueTaskStatus(int(result[b'status'])),
696 timeout = float(result[b'timeout']),
697 enqueue_time = float(result[b'enqueue_time']),
698 last_dequeue_time = float(result.get(b'last_dequeue_time', 0)) or None,
699 dequeue_count = int(result.get(b'dequeue_count', 0)),
700 last_requeue_time = float(result.get(b'last_requeue_time', 0)) or None,
701 requeue_count = int(result.get(b'requeue_count', 0)),
702 release_time = float(result.get(b'release_time', 0)) or None,
703 result = (
704 self._serialiser.loads(result[b'result'])
705 if result.get(b'result') is not None else None
706 )
707 )