Coverage for torrelque / __init__.py: 99%
276 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-07 17:47 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-07 17:47 +0000
1"""Asynchronous Redis-backed reliable queue package."""
3import asyncio
4import enum
5import json
6import logging
7import time
8import uuid
9from importlib.resources import files
10from typing import Any, AsyncIterable, Callable, NamedTuple
12import redis.asyncio
13from redis.asyncio.client import Pipeline
16__all__ = (
17 'Torrelque',
18 'TorrelqueQueueStats',
19 'TorrelqueTaskStatus',
20 'TorrelqueTaskState',
21 'TorrelqueError',
22 'TorrelqueTimeoutError',
23 'TorrelqueLookupError',
24 'TorrelqueTaskSerialiser',
25)
27logger = logging.getLogger(__package__)
29TaskId = str
30TaskPair = tuple[TaskId, dict]
33class TorrelqueTaskStatus(enum.IntEnum):
34 """Task status."""
36 PENDING = 0
37 """Task is enqueued."""
39 WORKING = 1
40 """Task is dequeued."""
42 DELAYED = 2
43 """Task is delayed."""
45 COMPLETED = 3
46 """Task is released, as the result of successful completion."""
48 REJECTED = 4
49 """Task is released, as the result of (multiple) failed attempts."""
51 def isfinal(self):
52 """Tells whether the status is final."""
54 return self in (self.COMPLETED, self.REJECTED) 1aetmkAhEcdbfgp
57class TorrelqueQueueStats(NamedTuple):
58 """Queue counters."""
60 tasks: int
61 """Total number of tasks in the queue."""
63 pending: int
64 """Number of pending tasks in the queue."""
66 working: int
67 """Number of working tasks in the queue."""
69 delayed: int
70 """Number of delayed tasks in the queue."""
73class TorrelqueTaskState(NamedTuple):
74 """Task state."""
76 status: TorrelqueTaskStatus
77 """Status of the task."""
79 timeout: float
80 """
81 Execution timeout of the task after while it's considered stale.
82 """
84 result: Any
85 """Optional result of the task with in a final state."""
87 dequeue_count: int
88 """Number of times the task was dequeued from the queue."""
90 requeue_count: int
91 """Number of times the task was requeued from the queue."""
93 enqueue_time: float
94 """Unix timestamp of the enqueue time of the task."""
96 last_dequeue_time: float | None
97 """Optional Unix timestamp of the last dequeue time of the task."""
99 last_requeue_time: float | None
100 """Optional Unix timestamp of the last requeue time of the task."""
102 release_time: float | None
103 """
104 Optional Unix timestamp of the release time of the task in a final
105 status.
106 """
109class TorrelqueTaskSerialiser(NamedTuple):
110 """Task serialisation delegate."""
112 dumps: Callable[[Any], bytes]
113 """Serialise task data."""
115 loads: Callable[[bytes], Any]
116 """Deserialise task data."""
119class TorrelqueError(Exception):
120 """Generic Torrelque error."""
123class TorrelqueTimeoutError(TorrelqueError):
124 """Torrelque timeout error."""
127class TorrelqueLookupError(TorrelqueError):
128 """Torrelque lookup error."""
131class Torrelque:
132 """
133 Reliable work queue.
135 :argument client:
136 Redis client instance.
137 :argument queue:
138 Name of the queue. Must match across producers and consumers.
139 :argument serialiser:
140 An object with ``dumps`` and ``loads`` that (de)serialises
141 task bodies.
142 :raises TorrelqueError:
143 Not a ``redis.asyncio.Redis`` instance passed.
144 """
146 task_timeout = 300
147 """
148 Default timeout for a task in the "working" set to be
149 considered stale.
150 """
152 sweep_interval = 30
153 """Default interval between sweep calls, when sweep is scheduled."""
155 result_ttl = 3600
156 """Default time-to-live of a task result (when applicable)."""
158 keys = {
159 'pending' : 'pending', # list
160 'dequeueing' : 'dequeueing', # list
161 'undequeued' : 'undequeued', # set
162 'working' : 'working', # sorted set
163 'delayed' : 'delayed', # sorted set
164 'tasks' : 'tasks', # hash
165 'task' : 'task' # prefix for hashes
166 }
167 """
168 Queue Redis key name mapping.
170 On initialisation the values are prefixed with the queue name, and
171 the new dictionary is rebound to the instance.
173 .. list-table:: Redis key description
175 * - ``pending``
176 - a list containing enqueued task ids
177 * - ``dequeueing``
178 - a short-living task id list where :py:meth:`.dequeue`
179 ``RPOPLPUSH``-es task ids from ``pending`` list
180 * - ``undequeued``
181 - a set where :py:meth:`.sweep` stored potentially (evaluated
182 on the next sweep) stale, "undequeued", task ids
183 * - ``working``
184 - a sorted set with successfully dequeued task ids where the
185 score is the Unix timestamp when the task becomes stale
186 according to its timeout
187 * - ``delayed``
188 - a sorted set with delayed task ids where the score is the
189 Unix timestamp when the task becomes due
190 * - ``tasks``
191 - a hash mapping a task id to its serialised representation
192 * - ``task``
193 - a string prefix, followed by task id, for hashes storing
194 individual task stats
196 """
198 _serialiser: TorrelqueTaskSerialiser
199 """
200 Task data serialiser that converts task object into and from string
201 representation.
202 """
204 _client: redis.asyncio.Redis
205 """Redis client."""
207 _sweep_task: asyncio.Task
208 """Periodic sweep ``asyncio`` task."""
210 _keyspace_notification_enabled = False
211 """
212 Flag indicates that Redis keyspace notification were found
213 correctly configured, and further tests should be omitted.
214 """
216 _scripts: dict[str, 'redis.commands.core.AsyncScript']
217 """Dictionary with Lua scripts read on initialisation."""
219 def __init__(
220 self,
221 client: redis.asyncio.Redis,
222 *,
223 queue: str = 'trq',
224 serialiser=TorrelqueTaskSerialiser(lambda obj: json.dumps(obj).encode(), json.loads),
225 ):
226 if not isinstance(client, redis.asyncio.Redis): 1aeJd
227 raise TorrelqueError('redis.asyncio.Redis instance expected') 1J
229 self._client = client 1aed
230 self._serialiser = serialiser 1aed
231 self._scripts = { 1aed
232 n: client.register_script((files(__package__) / f'{n}.lua').read_text())
233 for n in ('dequeue', 'sweep')
234 }
236 self.keys = {k: f'{queue}:{v}' for k, v in self.keys.items()} 1aed
238 def _get_state_key(self, task_id):
239 return '{}:{}'.format(self.keys['task'], task_id) 1aesrtuvwxmkAhlonyicjdzbfBgpq
241 async def _enqueue(
242 self,
243 pipeline: Pipeline,
244 task,
245 task_timeout: float | None = None,
246 delay: float | None = None,
247 ) -> TaskId:
248 task_timeout = task_timeout or self.task_timeout 1esrtuvwxmkhlonicjdzbfgpq
249 task_id = uuid.uuid1().hex 1esrtuvwxmkhlonicjdzbfgpq
250 task_data = self._serialiser.dumps(task) 1esrtuvwxmkhlonicjdzbfgpq
251 task_state_key = self._get_state_key(task_id) 1esrtuvwxmkhlonicjdzbfgpq
253 if delay: 1esrtuvwxmkhlonicjdzbfgpq
254 await pipeline.zadd(self.keys['delayed'], {task_id: time.time() + delay}) 1x
255 await pipeline.hset(task_state_key, 'status', TorrelqueTaskStatus.DELAYED.value) 1x
256 else:
257 await pipeline.lpush(self.keys['pending'], task_id) 1esrtuvwmkhlonicjdzbfgpq
258 await pipeline.hset(task_state_key, 'status', TorrelqueTaskStatus.PENDING.value) 1esrtuvwmkhlonicjdzbfgpq
260 await pipeline.hset(self.keys['tasks'], task_id, task_data) 1esrtuvwxmkhlonicjdzbfgpq
261 await pipeline.hset(task_state_key, 'enqueue_time', time.time()) 1esrtuvwxmkhlonicjdzbfgpq
262 await pipeline.hset(task_state_key, 'timeout', task_timeout) 1esrtuvwxmkhlonicjdzbfgpq
264 return task_id 1esrtuvwxmkhlonicjdzbfgpq
266 async def enqueue(
267 self,
268 task,
269 *,
270 task_timeout: float | None = None,
271 delay: float | None = None,
272 pipeline: Pipeline | None = None
273 ) -> TaskId:
274 """
275 Put a task on the queue optionally providing its timeout and delay.
277 :argument task:
278 Arbitrary serialisable task payload.
279 :argument task_timeout:
280 Time since the task's processing start after which it is
281 considered stale.
282 :argument delay:
283 Number of seconds to delay processing of the task, i.e.
284 putting it into the "pending" list. Note that the sweep
285 must be scheduled for the delayed tasks to return, and
286 the delay only has effect after the sweep execution.
287 :argument pipeline:
288 External Redis pipeline that allows for bulk enqueue.
289 :return:
290 Task identifier.
291 """
293 if pipeline is not None: 1esrtuvwxmkhlonicjdzbfgpq
294 task_id = await self._enqueue(pipeline, task, task_timeout, delay) 1rwkoz
295 else:
296 async with await self._client.pipeline(transaction=True) as pipeline: 1estuvxmhlnicjdbfgpq
297 task_id = await self._enqueue(pipeline, task, task_timeout, delay) 1estuvxmhlnicjdbfgpq
298 await pipeline.execute() 1estuvxmhlnicjdbfgpq
300 return task_id 1esrtuvwxmkhlonicjdzbfgpq
302 async def _dequeue(self, timeout: int, max_tasks: int) -> list[TaskPair]:
303 pending_key = self.keys['pending'] 1aesruCmkhlonicjdbfgpq
304 dequeueing_key = self.keys['dequeueing'] 1aesruCmkhlonicjdbfgpq
306 # Try to move max_tasks task ids to dequeueing without blocking
307 task_ids: list[bytes] = [] 1aesruCmkhlonicjdbfgpq
308 if max_tasks > 1: 1aesruCmkhlonicjdbfgpq
309 async with await self._client.pipeline(transaction=True) as pipeline: 1rko
310 for _ in range(max_tasks): 1rko
311 await pipeline.rpoplpush(pending_key, dequeueing_key) 1rko
312 task_ids.extend(task_id for task_id in await pipeline.execute() if task_id) 1rko
314 # If the above hasn't succeeded guarantee at least one task
315 if not task_ids: 1aesruCmkhlonicjdbfgpq
316 task_id = await self._client.brpoplpush( 1aesruCmhlnicjdbfgpq
317 pending_key, dequeueing_key, timeout=timeout or 0
318 )
319 if not task_id: 1aesruCmhlnicjdbfgpq
320 raise TorrelqueTimeoutError 1srC
321 else:
322 task_ids.append(task_id) 1aesumhlnicjdbfgpq
324 keys = [self.keys[k] for k in ('dequeueing', 'working', 'tasks', 'task')] 1aesrumkhlonicjdbfgpq
325 args = [time.time(), TorrelqueTaskStatus.WORKING.value] + task_ids 1aesrumkhlonicjdbfgpq
326 pairs = await self._scripts['dequeue'](keys, args) 1aesrumkhlonicjdbfgpq
328 result = [] 1aesrumkhlonicjdbfgpq
329 for task_id, task_data in pairs: 1aesrumkhlonicjdbfgpq
330 task_id = task_id.decode() 1aesrumkhlonicjdbfgpq
331 task_data = self._serialiser.loads(task_data) 1aesrumkhlonicjdbfgpq
332 if not task_data: 1aesrumkhlonicjdbfgpq
333 logger.warning('Task:%s not found in dequeueing list', task_id) 1u
334 continue 1u
336 result.append((task_id, task_data)) 1aesrmkhlonicjdbfgpq
338 if not result: 1aesrumkhlonicjdbfgpq
339 raise TorrelqueLookupError('Failed to dequeue pending task') 1u
341 return result 1aesrmkhlonicjdbfgpq
343 async def dequeue(
344 self, timeout: int | None = None, *, max_tasks: int = 1
345 ) -> TaskPair | list[TaskPair]:
346 """
347 Get a task or a task list from the queue with optional timeout.
349 :argument timeout:
350 Time to wait until a task is available. Timeout applies
351 only to fetching single task. Note that Redis only supports
352 an integer timeout.
353 :argument max_tasks:
354 If greater than 1 the method will try to optimistically
355 dequeue as many tasks. That means that only 1 task is
356 guaranteed.
357 :raises TorrelqueTimeoutError:
358 If timeout was provided and there was no result within it.
359 :raises TorrelqueLookupError:
360 Indicates that the task id has become staling during the
361 runtime of this method. This is not expected under normal
362 circumstances. It can happen if this method is paused, say
363 on a debugger breakpoint, for a duration of 2 sweeps.
364 :return:
365 Tuple of the task identifier and the deserialised task
366 payload. If ``max_tasks`` is greater than 1, no matter if
367 it dequeues more than 1 task, the return value is a list of
368 said 2-tuples.
369 """
371 assert max_tasks > 0 1aesruCmkhlonicjdbfgpq
373 task_pairs = await self._dequeue(timeout, max_tasks) 1aesruCmkhlonicjdbfgpq
374 return task_pairs if max_tasks > 1 else task_pairs[0] 1aesrmkhlonicjdbfgpq
376 async def _requeue(
377 self,
378 pipeline: Pipeline,
379 task_id: TaskId,
380 delay: float | None = None,
381 task_timeout: float | None = None
382 ) -> list:
383 expected = [] 1elonyicjdbq
384 await pipeline.zrem(self.keys['working'], task_id) 1elonyicjdbq
385 expected.append(1) 1elonyicjdbq
387 task_state_key = self._get_state_key(task_id) 1elonyicjdbq
388 if not delay: 1elonyicjdbq
389 await pipeline.lpush(self.keys['pending'], task_id) 1eloyicbq
390 await pipeline.hset(task_state_key, 'last_requeue_time', time.time()) 1eloyicbq
391 await pipeline.hincrby(task_state_key, 'requeue_count', 1) 1eloyicbq
392 await pipeline.hset(task_state_key, 'status', TorrelqueTaskStatus.PENDING.value) 1eloyicbq
393 expected.extend([..., ..., ..., 0]) 1eloyicbq
394 else:
395 await pipeline.zadd(self.keys['delayed'], {task_id: time.time() + delay}) 1njdb
396 await pipeline.hset(task_state_key, 'status', TorrelqueTaskStatus.DELAYED.value) 1njdb
397 expected.extend([1, 0]) 1njdb
399 if task_timeout: 1elonyicjdbq
400 await pipeline.hset(task_state_key, 'timeout', task_timeout) 1ibq
401 expected.append(0) 1ibq
403 return expected 1elonyicjdbq
405 async def requeue(
406 self,
407 task_id: TaskId,
408 delay: float | None = None,
409 *,
410 task_timeout: float | None = None,
411 pipeline: Pipeline | None = None,
412 ):
413 """
414 Return failed task into the queue with optional delay.
416 :argument task_id:
417 Task identifier.
418 :argument delay:
419 Number of seconds to delay putting the task into "pending"
420 list. Note that the sweep must be scheduled in order for
421 tasks from "delayed" to return to "pending" list.
422 :argument task_timeout:
423 Redefine task timeout, which is the time since the task's
424 processing start after which it is considered stale.
425 :argument pipeline:
426 External Redis pipeline that allows for bulk requeue.
427 """
429 if pipeline is not None: 1elonyicjdbq
430 await self._requeue(pipeline, task_id, delay, task_timeout) 1o
431 else:
432 async with await self._client.pipeline(transaction=True) as pipeline: 1elnyicjdbq
433 expected = await self._requeue(pipeline, task_id, delay, task_timeout) 1elnyicjdbq
434 actual = await pipeline.execute() 1elnyicjdbq
436 if not all(expc == actl or expc is ... for expc, actl in zip(expected, actual)): 1elnyicjdbq
437 logger.warning('Inconsistent requeue of task:%s: %s', task_id, actual) 1y
439 async def _release(
440 self,
441 pipeline: Pipeline,
442 task_id: TaskId,
443 result=None,
444 result_ttl: int | None = None,
445 status: TorrelqueTaskStatus = TorrelqueTaskStatus.COMPLETED,
446 ) -> list:
447 if status is not None and not status.isfinal(): 1aemkAhEcdbfgp
448 raise TorrelqueError(f'Invalid status for released task: {status}') 1E
450 expected = [] 1aemkAhcdbfgp
451 await pipeline.zrem(self.keys['working'], task_id) 1aemkAhcdbfgp
452 await pipeline.hdel(self.keys['tasks'], task_id) 1aemkAhcdbfgp
453 expected.extend([1, 1]) 1aemkAhcdbfgp
455 task_state_key = self._get_state_key(task_id) 1aemkAhcdbfgp
456 if result is not None: 1aemkAhcdbfgp
457 await pipeline.hset(task_state_key, 'result', self._serialiser.dumps(result)) 1khcfgp
458 await pipeline.hset(task_state_key, 'release_time', time.time()) 1khcfgp
459 await pipeline.hset(task_state_key, 'status', status.value) 1khcfgp
460 expected.extend([1, 1, 0]) 1khcfgp
462 result_ttl = result_ttl if result_ttl is not None else self.result_ttl 1khcfgp
463 await pipeline.expire(task_state_key, result_ttl) 1khcfgp
464 expected.append(1) 1khcfgp
465 else:
466 await pipeline.delete(task_state_key) 1aemAdb
467 expected.append(1) 1aemAdb
469 return expected 1aemkAhcdbfgp
471 async def release(
472 self,
473 task_id: TaskId,
474 *,
475 result=None,
476 result_ttl: int | None = None,
477 status: TorrelqueTaskStatus = TorrelqueTaskStatus.COMPLETED,
478 pipeline: Pipeline | None = None,
479 ):
480 """
481 Remove finished task from the queue.
483 Unless ``result`` is specified, all task information is removed
484 from the queue immediately.
486 Since there's no dead letter queue, tasks that have exceeded
487 allowed number of retries should also be released, possibly
488 with ``TorrelqueTaskStatus.REJECTED`` status if producer is
489 interested in the status.
491 :argument task_id:
492 Task identifier.
493 :argument result:
494 Arbitrary serialisable task result. If ``result`` is
495 ``None`` task state key is removed immediately on release.
496 :argument result_ttl:
497 Number of seconds to keep task state key after release.
498 Override of default result TTL.
499 :argument status:
500 Task status to set on release. It only apples when result
501 is not ``None``.
502 :argument pipeline:
503 External Redis pipeline that allows for bulk release.
504 :raises TorrelqueError:
505 If the status is not final.
506 """
508 if pipeline is not None: 1aemkAhEcdbfgp
509 await self._release(pipeline, task_id, result, result_ttl, status) 1k
510 else:
511 async with await self._client.pipeline(transaction=True) as pipeline: 1aemAhEcdbfgp
512 expected = await self._release(pipeline, task_id, result, result_ttl, status) 1aemAhEcdbfgp
513 actual = await pipeline.execute() 1aemAhcdbfgp
515 if expected != actual: 1aemAhcdbfgp
516 logger.warning('Inconsistent release of task:%s: %s', task_id, actual) 1A
518 async def _check_keyspace_notification_config(self):
519 if not self._keyspace_notification_enabled: 1atfDBg
520 config = await self._client.config_get('notify-keyspace-events') 1atfDBg
521 notify_config = set(config['notify-keyspace-events']) 1atfDBg
522 # See https://redis.io/topics/notifications#configuration
523 if {'K', 'A'} - notify_config and {'K', 'g', 'h'} - notify_config: 1atfDBg
524 raise TorrelqueError('Redis notify-keyspace-events must include KA or Kgh') 1D
525 self._keyspace_notification_enabled = True 1atfBg
527 async def _get_keyspace_notification_message(self, pubsub, timeout: float) -> dict | None:
528 try: 1at
529 message = await asyncio.wait_for(pubsub.get_message(), timeout) 1at
530 except asyncio.TimeoutError as ex:
531 raise TorrelqueTimeoutError from ex
532 else:
533 return message
535 async def watch(
536 self, task_id: TaskId, *, timeout: float | None = None
537 ) -> AsyncIterable[TorrelqueTaskState]:
538 """
539 Watch task status change until it's released from the queue.
541 .. note::
543 This method relies on ``notify-keyspace-events`` introduced
544 in Redis 2.8. The configuration must have generic and hash
545 commands enabled. That is, the configuration must include
546 either ``KA`` or ``Kgh``.
548 :argument task_id:
549 Task identifier.
550 :argument timeout:
551 Timeout for watching.
552 :raises TorrelqueError:
553 If ``notify-keyspace-events`` is not configured properly.
554 :raises TorrelqueTimeoutError:
555 If ``watch`` has taken longer than ``timeout``.
556 :raises TorrelqueLookupError:
557 If the task state key is not found.
558 :return:
559 Asynchronous generator that yields task state dictionaries
560 as returned by :py:meth:`.get_task_state`. Generator stops
561 when the task is released. If the task is released without
562 result, generator won't yield ``dict`` with final status.
563 """
565 start = time.monotonic() 1atfDBg
567 await self._check_keyspace_notification_config() 1atfDBg
569 task_state = await self.get_task_state(task_id) 1atfBg
570 yield task_state 1atfg
572 status = task_state.status 1atfg
573 if status.isfinal(): 1atfg
574 return 1fg
576 async with self._client.pubsub(ignore_subscribe_messages=True) as pubsub: 1at
577 dbn = self._client.connection_pool.connection_kwargs.get('db', 0) 1at
578 await pubsub.subscribe('__keyspace@{}__:{}'.format(dbn, self._get_state_key(task_id))) 1at
580 iter_timeout = timeout 1at
581 while True: 1at
582 message = await self._get_keyspace_notification_message(pubsub, iter_timeout) 1at
583 if message and message['data'] == b'del': 583 ↛ 584line 583 didn't jump to line 584 because the condition on line 583 was never true
584 return # Released without result
585 elif message and message['data'] == b'hset':
586 try:
587 task_state = await self.get_task_state(task_id) 1at
588 except TorrelqueLookupError: 1at
589 return # Race condition with release 1at
591 if task_state.status != status:
592 status = task_state.status
593 yield task_state
594 if status.isfinal():
595 return
597 if timeout is not None:
598 iter_timeout = timeout - (time.monotonic() - start)
600 async def sweep(self) -> tuple[int, int, int]:
601 """
602 Execute the task sweep.
604 :return:
605 3-tuple with counts of:
607 - stale tasks from "working" set returned into "pending" list
608 - due delayed tasks from "delayed" set returned into "pending" list
609 - stale dequeueing task ids returned into "pending" list
610 """
612 keys = [ 1aFjb
613 self.keys[k]
614 for k in ('pending', 'dequeueing', 'undequeued', 'working', 'delayed', 'task')
615 ]
616 args = [time.time(), TorrelqueTaskStatus.PENDING.value] 1aFjb
617 result = await self._scripts['sweep'](keys, args) 1aFjb
618 return tuple(result) 1aFjb
620 async def _sweep_runner(self, interval: float):
621 while True:
622 start = time.monotonic()
623 try:
624 result = await self.sweep()
625 except redis.RedisError:
626 logger.exception('Sweep has failed with Redis error, continuing')
627 except asyncio.CancelledError:
628 break
629 except Exception:
630 logger.exception('Sweep has failed with unexpected error, stopping')
631 break
632 else:
633 logger.debug('Sweep has requeued: stale=%d, delayed=%d, undequeued=%d', *result)
635 await asyncio.sleep(interval - (time.monotonic() - start))
637 def schedule_sweep(self, interval: float | None = None):
638 """
639 Schedule the sweep in a background coroutine.
641 :argument interval:
642 Override of default sweep interval.
643 """
645 interval = interval or self.sweep_interval 1uGdHIz
646 self._sweep_task = asyncio.get_event_loop().create_task(self._sweep_runner(interval)) 1uGdHIz
648 def unschedule_sweep(self):
649 """Unschedule the sweep in a background coroutine."""
651 assert self._sweep_task 1uGdHIz
652 self._sweep_task.cancel() 1uGdHIz
654 async def get_queue_stats(self) -> TorrelqueQueueStats:
655 """Get queue counters."""
657 async with await self._client.pipeline(transaction=True) as pipe: 1srvwmkhlonij
658 await pipe.hlen(self.keys['tasks']) 1srvwmkhlonij
659 await pipe.llen(self.keys['pending']) 1srvwmkhlonij
660 await pipe.zcard(self.keys['working']) 1srvwmkhlonij
661 await pipe.zcard(self.keys['delayed']) 1srvwmkhlonij
662 result = await pipe.execute() 1srvwmkhlonij
664 return TorrelqueQueueStats(*result) 1srvwmkhlonij
666 async def get_task_state(self, task_id: TaskId) -> TorrelqueTaskState:
667 """
668 Get task state.
670 :argument task_id:
671 Task identifier.
672 :raises TorrelqueLookupError:
673 If the task state key is not found.
674 """
676 result = await self._client.hgetall(self._get_state_key(task_id)) 1asrtvwxmkhlonicjfBg
677 if not result: 1asrtvwxmkhlonicjfBg
678 raise TorrelqueLookupError 1atmB
680 return TorrelqueTaskState( 1asrtvwxkhlonicjfg
681 status = TorrelqueTaskStatus(int(result[b'status'])),
682 timeout = float(result[b'timeout']),
683 enqueue_time = float(result[b'enqueue_time']),
684 last_dequeue_time = float(result.get(b'last_dequeue_time', 0)) or None,
685 dequeue_count = int(result.get(b'dequeue_count', 0)),
686 last_requeue_time = float(result.get(b'last_requeue_time', 0)) or None,
687 requeue_count = int(result.get(b'requeue_count', 0)),
688 release_time = float(result.get(b'release_time', 0)) or None,
689 result = (
690 self._serialiser.loads(result[b'result'])
691 if result.get(b'result') is not None else None
692 )
693 )