Coverage for torrelque / __init__.py: 99%

276 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-07 17:47 +0000

1"""Asynchronous Redis-backed reliable queue package.""" 

2 

3import asyncio 

4import enum 

5import json 

6import logging 

7import time 

8import uuid 

9from importlib.resources import files 

10from typing import Any, AsyncIterable, Callable, NamedTuple 

11 

12import redis.asyncio 

13from redis.asyncio.client import Pipeline 

14 

15 

# Public, re-exportable API of the package.
__all__ = (
    'Torrelque',
    'TorrelqueQueueStats',
    'TorrelqueTaskStatus',
    'TorrelqueTaskState',
    'TorrelqueError',
    'TorrelqueTimeoutError',
    'TorrelqueLookupError',
    'TorrelqueTaskSerialiser',
)

# Package-level logger used for sweep diagnostics and consistency warnings.
logger = logging.getLogger(__package__)

# Type aliases used throughout the module.
TaskId = str  # hex UUID string generated on enqueue
TaskPair = tuple[TaskId, dict]  # (task id, payload); payload is a dict with the default JSON serialiser

31 

32 

class TorrelqueTaskStatus(enum.IntEnum):
    """Lifecycle status of a task in the queue."""

    PENDING = 0
    """Task is enqueued."""

    WORKING = 1
    """Task is dequeued."""

    DELAYED = 2
    """Task is delayed."""

    COMPLETED = 3
    """Task is released, as the result of successful completion."""

    REJECTED = 4
    """Task is released, as the result of (multiple) failed attempts."""

    def isfinal(self):
        """Tell whether the status is final (the task has been released)."""

        return self in (TorrelqueTaskStatus.COMPLETED, TorrelqueTaskStatus.REJECTED)

55 

56 

class TorrelqueQueueStats(NamedTuple):
    """Queue counters, as returned by :py:meth:`Torrelque.get_queue_stats`."""

    tasks: int
    """Total number of tasks in the queue."""

    pending: int
    """Number of pending tasks in the queue."""

    working: int
    """Number of working tasks in the queue."""

    delayed: int
    """Number of delayed tasks in the queue."""

72 

class TorrelqueTaskState(NamedTuple):
    """Task state, as returned by :py:meth:`Torrelque.get_task_state`."""

    status: TorrelqueTaskStatus
    """Status of the task."""

    timeout: float
    """
    Execution timeout of the task, after which it's considered stale.
    """

    result: Any
    """Optional result of the task when in a final state."""

    dequeue_count: int
    """Number of times the task was dequeued from the queue."""

    requeue_count: int
    """Number of times the task was requeued from the queue."""

    enqueue_time: float
    """Unix timestamp of the enqueue time of the task."""

    last_dequeue_time: float | None
    """Optional Unix timestamp of the last dequeue time of the task."""

    last_requeue_time: float | None
    """Optional Unix timestamp of the last requeue time of the task."""

    release_time: float | None
    """
    Optional Unix timestamp of the release time of the task in a final
    status.
    """

107 

108 

class TorrelqueTaskSerialiser(NamedTuple):
    """
    Task serialisation delegate.

    The default used by :py:class:`Torrelque` is JSON-based; any pair of
    functions with the signatures below can be supplied instead.
    """

    dumps: Callable[[Any], bytes]
    """Serialise task data."""

    loads: Callable[[bytes], Any]
    """Deserialise task data."""

117 

118 

class TorrelqueError(Exception):
    """Generic Torrelque error, the root of the package's exception hierarchy."""

122 

class TorrelqueTimeoutError(TorrelqueError):
    """Torrelque timeout error, raised when a blocking operation exceeds its timeout."""

126 

class TorrelqueLookupError(TorrelqueError):
    """Torrelque lookup error, raised when a task or its state cannot be found."""

130 

class Torrelque:
    """
    Reliable work queue.

    :argument client:
        Redis client instance.
    :argument queue:
        Name of the queue. Must match across producers and consumers.
    :argument serialiser:
        An object with ``dumps`` and ``loads`` that (de)serialises
        task bodies.
    :raises TorrelqueError:
        Not a ``redis.asyncio.Redis`` instance passed.
    """

    task_timeout = 300
    """
    Default timeout for a task in the "working" set to be
    considered stale.
    """

    sweep_interval = 30
    """Default interval between sweep calls, when sweep is scheduled."""

    result_ttl = 3600
    """Default time-to-live of a task result (when applicable)."""

    keys = {
        'pending'    : 'pending',     # list
        'dequeueing' : 'dequeueing',  # list
        'undequeued' : 'undequeued',  # set
        'working'    : 'working',     # sorted set
        'delayed'    : 'delayed',     # sorted set
        'tasks'      : 'tasks',       # hash
        'task'       : 'task'         # prefix for hashes
    }
    """
    Queue Redis key name mapping.

    On initialisation the values are prefixed with the queue name, and
    the new dictionary is rebound to the instance.

    .. list-table:: Redis key description

       * - ``pending``
         - a list containing enqueued task ids
       * - ``dequeueing``
         - a short-living task id list where :py:meth:`.dequeue`
           ``RPOPLPUSH``-es task ids from ``pending`` list
       * - ``undequeued``
         - a set where :py:meth:`.sweep` stored potentially (evaluated
           on the next sweep) stale, "undequeued", task ids
       * - ``working``
         - a sorted set with successfully dequeued task ids where the
           score is the Unix timestamp when the task becomes stale
           according to its timeout
       * - ``delayed``
         - a sorted set with delayed task ids where the score is the
           Unix timestamp when the task becomes due
       * - ``tasks``
         - a hash mapping a task id to its serialised representation
       * - ``task``
         - a string prefix, followed by task id, for hashes storing
           individual task stats

    """

    _serialiser: TorrelqueTaskSerialiser
    """
    Task data serialiser that converts task object into and from string
    representation.
    """

    _client: redis.asyncio.Redis
    """Redis client."""

    _sweep_task: asyncio.Task
    """Periodic sweep ``asyncio`` task."""

    _keyspace_notification_enabled = False
    """
    Flag indicating that Redis keyspace notifications were found
    correctly configured, and further checks should be omitted.
    """

    _scripts: dict[str, 'redis.commands.core.AsyncScript']
    """Dictionary with Lua scripts read on initialisation."""

218 

219 def __init__( 

220 self, 

221 client: redis.asyncio.Redis, 

222 *, 

223 queue: str = 'trq', 

224 serialiser=TorrelqueTaskSerialiser(lambda obj: json.dumps(obj).encode(), json.loads), 

225 ): 

226 if not isinstance(client, redis.asyncio.Redis): 1aeJd

227 raise TorrelqueError('redis.asyncio.Redis instance expected') 1J

228 

229 self._client = client 1aed

230 self._serialiser = serialiser 1aed

231 self._scripts = { 1aed

232 n: client.register_script((files(__package__) / f'{n}.lua').read_text()) 

233 for n in ('dequeue', 'sweep') 

234 } 

235 

236 self.keys = {k: f'{queue}:{v}' for k, v in self.keys.items()} 1aed

237 

238 def _get_state_key(self, task_id): 

239 return '{}:{}'.format(self.keys['task'], task_id) 1aesrtuvwxmkAhlonyicjdzbfBgpq

240 

241 async def _enqueue( 

242 self, 

243 pipeline: Pipeline, 

244 task, 

245 task_timeout: float | None = None, 

246 delay: float | None = None, 

247 ) -> TaskId: 

248 task_timeout = task_timeout or self.task_timeout 1esrtuvwxmkhlonicjdzbfgpq

249 task_id = uuid.uuid1().hex 1esrtuvwxmkhlonicjdzbfgpq

250 task_data = self._serialiser.dumps(task) 1esrtuvwxmkhlonicjdzbfgpq

251 task_state_key = self._get_state_key(task_id) 1esrtuvwxmkhlonicjdzbfgpq

252 

253 if delay: 1esrtuvwxmkhlonicjdzbfgpq

254 await pipeline.zadd(self.keys['delayed'], {task_id: time.time() + delay}) 1x

255 await pipeline.hset(task_state_key, 'status', TorrelqueTaskStatus.DELAYED.value) 1x

256 else: 

257 await pipeline.lpush(self.keys['pending'], task_id) 1esrtuvwmkhlonicjdzbfgpq

258 await pipeline.hset(task_state_key, 'status', TorrelqueTaskStatus.PENDING.value) 1esrtuvwmkhlonicjdzbfgpq

259 

260 await pipeline.hset(self.keys['tasks'], task_id, task_data) 1esrtuvwxmkhlonicjdzbfgpq

261 await pipeline.hset(task_state_key, 'enqueue_time', time.time()) 1esrtuvwxmkhlonicjdzbfgpq

262 await pipeline.hset(task_state_key, 'timeout', task_timeout) 1esrtuvwxmkhlonicjdzbfgpq

263 

264 return task_id 1esrtuvwxmkhlonicjdzbfgpq

265 

266 async def enqueue( 

267 self, 

268 task, 

269 *, 

270 task_timeout: float | None = None, 

271 delay: float | None = None, 

272 pipeline: Pipeline | None = None 

273 ) -> TaskId: 

274 """ 

275 Put a task on the queue optionally providing its timeout and delay. 

276 

277 :argument task: 

278 Arbitrary serialisable task payload. 

279 :argument task_timeout: 

280 Time since the task's processing start after which it is 

281 considered stale. 

282 :argument delay: 

283 Number of seconds to delay processing of the task, i.e. 

284 putting it into the "pending" list. Note that the sweep 

285 must be scheduled for the delayed tasks to return, and 

286 the delay only has effect after the sweep execution. 

287 :argument pipeline: 

288 External Redis pipeline that allows for bulk enqueue. 

289 :return: 

290 Task identifier. 

291 """ 

292 

293 if pipeline is not None: 1esrtuvwxmkhlonicjdzbfgpq

294 task_id = await self._enqueue(pipeline, task, task_timeout, delay) 1rwkoz

295 else: 

296 async with await self._client.pipeline(transaction=True) as pipeline: 1estuvxmhlnicjdbfgpq

297 task_id = await self._enqueue(pipeline, task, task_timeout, delay) 1estuvxmhlnicjdbfgpq

298 await pipeline.execute() 1estuvxmhlnicjdbfgpq

299 

300 return task_id 1esrtuvwxmkhlonicjdzbfgpq

301 

302 async def _dequeue(self, timeout: int, max_tasks: int) -> list[TaskPair]: 

303 pending_key = self.keys['pending'] 1aesruCmkhlonicjdbfgpq

304 dequeueing_key = self.keys['dequeueing'] 1aesruCmkhlonicjdbfgpq

305 

306 # Try to move max_tasks task ids to dequeueing without blocking 

307 task_ids: list[bytes] = [] 1aesruCmkhlonicjdbfgpq

308 if max_tasks > 1: 1aesruCmkhlonicjdbfgpq

309 async with await self._client.pipeline(transaction=True) as pipeline: 1rko

310 for _ in range(max_tasks): 1rko

311 await pipeline.rpoplpush(pending_key, dequeueing_key) 1rko

312 task_ids.extend(task_id for task_id in await pipeline.execute() if task_id) 1rko

313 

314 # If the above hasn't succeeded guarantee at least one task 

315 if not task_ids: 1aesruCmkhlonicjdbfgpq

316 task_id = await self._client.brpoplpush( 1aesruCmhlnicjdbfgpq

317 pending_key, dequeueing_key, timeout=timeout or 0 

318 ) 

319 if not task_id: 1aesruCmhlnicjdbfgpq

320 raise TorrelqueTimeoutError 1srC

321 else: 

322 task_ids.append(task_id) 1aesumhlnicjdbfgpq

323 

324 keys = [self.keys[k] for k in ('dequeueing', 'working', 'tasks', 'task')] 1aesrumkhlonicjdbfgpq

325 args = [time.time(), TorrelqueTaskStatus.WORKING.value] + task_ids 1aesrumkhlonicjdbfgpq

326 pairs = await self._scripts['dequeue'](keys, args) 1aesrumkhlonicjdbfgpq

327 

328 result = [] 1aesrumkhlonicjdbfgpq

329 for task_id, task_data in pairs: 1aesrumkhlonicjdbfgpq

330 task_id = task_id.decode() 1aesrumkhlonicjdbfgpq

331 task_data = self._serialiser.loads(task_data) 1aesrumkhlonicjdbfgpq

332 if not task_data: 1aesrumkhlonicjdbfgpq

333 logger.warning('Task:%s not found in dequeueing list', task_id) 1u

334 continue 1u

335 

336 result.append((task_id, task_data)) 1aesrmkhlonicjdbfgpq

337 

338 if not result: 1aesrumkhlonicjdbfgpq

339 raise TorrelqueLookupError('Failed to dequeue pending task') 1u

340 

341 return result 1aesrmkhlonicjdbfgpq

342 

343 async def dequeue( 

344 self, timeout: int | None = None, *, max_tasks: int = 1 

345 ) -> TaskPair | list[TaskPair]: 

346 """ 

347 Get a task or a task list from the queue with optional timeout. 

348 

349 :argument timeout: 

350 Time to wait until a task is available. Timeout applies 

351 only to fetching single task. Note that Redis only supports 

352 an integer timeout. 

353 :argument max_tasks: 

354 If greater than 1 the method will try to optimistically 

355 dequeue as many tasks. That means that only 1 task is 

356 guaranteed. 

357 :raises TorrelqueTimeoutError: 

358 If timeout was provided and there was no result within it. 

359 :raises TorrelqueLookupError: 

360 Indicates that the task id has become staling during the 

361 runtime of this method. This is not expected under normal 

362 circumstances. It can happen if this method is paused, say 

363 on a debugger breakpoint, for a duration of 2 sweeps. 

364 :return: 

365 Tuple of the task identifier and the deserialised task 

366 payload. If ``max_tasks`` is greater than 1, no matter if 

367 it dequeues more than 1 task, the return value is a list of 

368 said 2-tuples. 

369 """ 

370 

371 assert max_tasks > 0 1aesruCmkhlonicjdbfgpq

372 

373 task_pairs = await self._dequeue(timeout, max_tasks) 1aesruCmkhlonicjdbfgpq

374 return task_pairs if max_tasks > 1 else task_pairs[0] 1aesrmkhlonicjdbfgpq

375 

    async def _requeue(
        self,
        pipeline: Pipeline,
        task_id: TaskId,
        delay: float | None = None,
        task_timeout: float | None = None
    ) -> list:
        """
        Queue the pipeline commands that return the task to the queue.

        Returns the list of Redis replies expected from the queued
        commands, which :py:meth:`.requeue` compares against the actual
        replies to detect inconsistency. ``...`` (Ellipsis) marks
        replies whose value is not checked. Each ``expected`` entry
        mirrors the pipeline command queued just before it -- the order
        is significant and must not change.
        """
        expected = []
        await pipeline.zrem(self.keys['working'], task_id)
        expected.append(1)  # the task is expected to be in the "working" set

        task_state_key = self._get_state_key(task_id)
        if not delay:
            # Return the task straight to the "pending" list
            await pipeline.lpush(self.keys['pending'], task_id)
            await pipeline.hset(task_state_key, 'last_requeue_time', time.time())
            await pipeline.hincrby(task_state_key, 'requeue_count', 1)
            await pipeline.hset(task_state_key, 'status', TorrelqueTaskStatus.PENDING.value)
            # First three replies vary; the final HSET updates an
            # existing field, hence 0 new fields
            expected.extend([..., ..., ..., 0])
        else:
            # Park the task in the "delayed" sorted set until it's due
            await pipeline.zadd(self.keys['delayed'], {task_id: time.time() + delay})
            await pipeline.hset(task_state_key, 'status', TorrelqueTaskStatus.DELAYED.value)
            expected.extend([1, 0])

        if task_timeout:
            # Redefine the task's staleness timeout on requeue
            await pipeline.hset(task_state_key, 'timeout', task_timeout)
            expected.append(0)  # updates an existing hash field

        return expected

404 

405 async def requeue( 

406 self, 

407 task_id: TaskId, 

408 delay: float | None = None, 

409 *, 

410 task_timeout: float | None = None, 

411 pipeline: Pipeline | None = None, 

412 ): 

413 """ 

414 Return failed task into the queue with optional delay. 

415 

416 :argument task_id: 

417 Task identifier. 

418 :argument delay: 

419 Number of seconds to delay putting the task into "pending" 

420 list. Note that the sweep must be scheduled in order for 

421 tasks from "delayed" to return to "pending" list. 

422 :argument task_timeout: 

423 Redefine task timeout, which is the time since the task's 

424 processing start after which it is considered stale. 

425 :argument pipeline: 

426 External Redis pipeline that allows for bulk requeue. 

427 """ 

428 

429 if pipeline is not None: 1elonyicjdbq

430 await self._requeue(pipeline, task_id, delay, task_timeout) 1o

431 else: 

432 async with await self._client.pipeline(transaction=True) as pipeline: 1elnyicjdbq

433 expected = await self._requeue(pipeline, task_id, delay, task_timeout) 1elnyicjdbq

434 actual = await pipeline.execute() 1elnyicjdbq

435 

436 if not all(expc == actl or expc is ... for expc, actl in zip(expected, actual)): 1elnyicjdbq

437 logger.warning('Inconsistent requeue of task:%s: %s', task_id, actual) 1y

438 

    async def _release(
        self,
        pipeline: Pipeline,
        task_id: TaskId,
        result=None,
        result_ttl: int | None = None,
        status: TorrelqueTaskStatus = TorrelqueTaskStatus.COMPLETED,
    ) -> list:
        """
        Queue the pipeline commands that remove the task from the queue.

        Returns the list of Redis replies expected from the queued
        commands, which :py:meth:`.release` compares against the actual
        replies to detect inconsistency. Each ``expected`` entry mirrors
        the pipeline command queued just before it -- the order is
        significant and must not change.

        :raises TorrelqueError: if ``status`` is not a final status.
        """
        if status is not None and not status.isfinal():
            raise TorrelqueError(f'Invalid status for released task: {status}')

        expected = []
        await pipeline.zrem(self.keys['working'], task_id)
        await pipeline.hdel(self.keys['tasks'], task_id)
        expected.extend([1, 1])  # task present in "working" set and "tasks" hash

        task_state_key = self._get_state_key(task_id)
        if result is not None:
            # Keep the task state hash around with the result, expiring
            # after result_ttl seconds
            await pipeline.hset(task_state_key, 'result', self._serialiser.dumps(result))
            await pipeline.hset(task_state_key, 'release_time', time.time())
            await pipeline.hset(task_state_key, 'status', status.value)
            # "result" and "release_time" are new fields (1 each);
            # "status" updates an existing field, hence 0
            expected.extend([1, 1, 0])

            result_ttl = result_ttl if result_ttl is not None else self.result_ttl
            await pipeline.expire(task_state_key, result_ttl)
            expected.append(1)
        else:
            # No result to expose -- drop the task state immediately
            await pipeline.delete(task_state_key)
            expected.append(1)

        return expected

470 

471 async def release( 

472 self, 

473 task_id: TaskId, 

474 *, 

475 result=None, 

476 result_ttl: int | None = None, 

477 status: TorrelqueTaskStatus = TorrelqueTaskStatus.COMPLETED, 

478 pipeline: Pipeline | None = None, 

479 ): 

480 """ 

481 Remove finished task from the queue. 

482 

483 Unless ``result`` is specified, all task information is removed 

484 from the queue immediately. 

485 

486 Since there's no dead letter queue, tasks that have exceeded 

487 allowed number of retries should also be released, possibly 

488 with ``TorrelqueTaskStatus.REJECTED`` status if producer is 

489 interested in the status. 

490 

491 :argument task_id: 

492 Task identifier. 

493 :argument result: 

494 Arbitrary serialisable task result. If ``result`` is 

495 ``None`` task state key is removed immediately on release. 

496 :argument result_ttl: 

497 Number of seconds to keep task state key after release. 

498 Override of default result TTL. 

499 :argument status: 

500 Task status to set on release. It only apples when result 

501 is not ``None``. 

502 :argument pipeline: 

503 External Redis pipeline that allows for bulk release. 

504 :raises TorrelqueError: 

505 If the status is not final. 

506 """ 

507 

508 if pipeline is not None: 1aemkAhEcdbfgp

509 await self._release(pipeline, task_id, result, result_ttl, status) 1k

510 else: 

511 async with await self._client.pipeline(transaction=True) as pipeline: 1aemAhEcdbfgp

512 expected = await self._release(pipeline, task_id, result, result_ttl, status) 1aemAhEcdbfgp

513 actual = await pipeline.execute() 1aemAhcdbfgp

514 

515 if expected != actual: 1aemAhcdbfgp

516 logger.warning('Inconsistent release of task:%s: %s', task_id, actual) 1A

517 

518 async def _check_keyspace_notification_config(self): 

519 if not self._keyspace_notification_enabled: 1atfDBg

520 config = await self._client.config_get('notify-keyspace-events') 1atfDBg

521 notify_config = set(config['notify-keyspace-events']) 1atfDBg

522 # See https://redis.io/topics/notifications#configuration 

523 if {'K', 'A'} - notify_config and {'K', 'g', 'h'} - notify_config: 1atfDBg

524 raise TorrelqueError('Redis notify-keyspace-events must include KA or Kgh') 1D

525 self._keyspace_notification_enabled = True 1atfBg

526 

527 async def _get_keyspace_notification_message(self, pubsub, timeout: float) -> dict | None: 

528 try: 1at

529 message = await asyncio.wait_for(pubsub.get_message(), timeout) 1at

530 except asyncio.TimeoutError as ex: 

531 raise TorrelqueTimeoutError from ex 

532 else: 

533 return message 

534 

    async def watch(
        self, task_id: TaskId, *, timeout: float | None = None
    ) -> AsyncIterable[TorrelqueTaskState]:
        """
        Watch task status change until it's released from the queue.

        .. note::

           This method relies on ``notify-keyspace-events`` introduced
           in Redis 2.8. The configuration must have generic and hash
           commands enabled. That is, the configuration must include
           either ``KA`` or ``Kgh``.

        :argument task_id:
            Task identifier.
        :argument timeout:
            Timeout for watching.
        :raises TorrelqueError:
            If ``notify-keyspace-events`` is not configured properly.
        :raises TorrelqueTimeoutError:
            If ``watch`` has taken longer than ``timeout``.
        :raises TorrelqueLookupError:
            If the task state key is not found.
        :return:
            Asynchronous generator that yields task state dictionaries
            as returned by :py:meth:`.get_task_state`. Generator stops
            when the task is released. If the task is released without
            result, generator won't yield ``dict`` with final status.
        """

        start = time.monotonic()

        await self._check_keyspace_notification_config()

        # Yield the current state up-front; stop if it's already final
        task_state = await self.get_task_state(task_id)
        yield task_state

        status = task_state.status
        if status.isfinal():
            return

        async with self._client.pubsub(ignore_subscribe_messages=True) as pubsub:
            # Subscribe to keyspace events of this task's state hash
            dbn = self._client.connection_pool.connection_kwargs.get('db', 0)
            await pubsub.subscribe('__keyspace@{}__:{}'.format(dbn, self._get_state_key(task_id)))

            iter_timeout = timeout
            while True:
                message = await self._get_keyspace_notification_message(pubsub, iter_timeout)
                if message and message['data'] == b'del':
                    return  # Released without result
                elif message and message['data'] == b'hset':
                    try:
                        task_state = await self.get_task_state(task_id)
                    except TorrelqueLookupError:
                        return  # Race condition with release

                    # Only yield on an actual status transition
                    if task_state.status != status:
                        status = task_state.status
                        yield task_state
                        if status.isfinal():
                            return

                # Shrink the per-iteration wait so the overall watch
                # respects the caller's timeout
                if timeout is not None:
                    iter_timeout = timeout - (time.monotonic() - start)

599 

600 async def sweep(self) -> tuple[int, int, int]: 

601 """ 

602 Execute the task sweep. 

603 

604 :return: 

605 3-tuple with counts of: 

606 

607 - stale tasks from "working" set returned into "pending" list 

608 - due delayed tasks from "delayed" set returned into "pending" list 

609 - stale dequeueing task ids returned into "pending" list 

610 """ 

611 

612 keys = [ 1aFjb

613 self.keys[k] 

614 for k in ('pending', 'dequeueing', 'undequeued', 'working', 'delayed', 'task') 

615 ] 

616 args = [time.time(), TorrelqueTaskStatus.PENDING.value] 1aFjb

617 result = await self._scripts['sweep'](keys, args) 1aFjb

618 return tuple(result) 1aFjb

619 

    async def _sweep_runner(self, interval: float):
        """
        Run :py:meth:`.sweep` every ``interval`` seconds until cancelled
        or until an unexpected error occurs.
        """
        while True:
            start = time.monotonic()
            try:
                result = await self.sweep()
            except redis.RedisError:
                # Transient Redis failures shouldn't stop the sweep loop
                logger.exception('Sweep has failed with Redis error, continuing')
            except asyncio.CancelledError:
                # Normal shutdown via unschedule_sweep
                break
            except Exception:
                logger.exception('Sweep has failed with unexpected error, stopping')
                break
            else:
                logger.debug('Sweep has requeued: stale=%d, delayed=%d, undequeued=%d', *result)

            # Subtract the sweep's own duration to keep a steady cadence
            await asyncio.sleep(interval - (time.monotonic() - start))

636 

637 def schedule_sweep(self, interval: float | None = None): 

638 """ 

639 Schedule the sweep in a background coroutine. 

640 

641 :argument interval: 

642 Override of default sweep interval. 

643 """ 

644 

645 interval = interval or self.sweep_interval 1uGdHIz

646 self._sweep_task = asyncio.get_event_loop().create_task(self._sweep_runner(interval)) 1uGdHIz

647 

    def unschedule_sweep(self):
        """Unschedule the sweep in a background coroutine."""

        # schedule_sweep must have been called first
        assert self._sweep_task
        self._sweep_task.cancel()

653 

654 async def get_queue_stats(self) -> TorrelqueQueueStats: 

655 """Get queue counters.""" 

656 

657 async with await self._client.pipeline(transaction=True) as pipe: 1srvwmkhlonij

658 await pipe.hlen(self.keys['tasks']) 1srvwmkhlonij

659 await pipe.llen(self.keys['pending']) 1srvwmkhlonij

660 await pipe.zcard(self.keys['working']) 1srvwmkhlonij

661 await pipe.zcard(self.keys['delayed']) 1srvwmkhlonij

662 result = await pipe.execute() 1srvwmkhlonij

663 

664 return TorrelqueQueueStats(*result) 1srvwmkhlonij

665 

666 async def get_task_state(self, task_id: TaskId) -> TorrelqueTaskState: 

667 """ 

668 Get task state. 

669 

670 :argument task_id: 

671 Task identifier. 

672 :raises TorrelqueLookupError: 

673 If the task state key is not found. 

674 """ 

675 

676 result = await self._client.hgetall(self._get_state_key(task_id)) 1asrtvwxmkhlonicjfBg

677 if not result: 1asrtvwxmkhlonicjfBg

678 raise TorrelqueLookupError 1atmB

679 

680 return TorrelqueTaskState( 1asrtvwxkhlonicjfg

681 status = TorrelqueTaskStatus(int(result[b'status'])), 

682 timeout = float(result[b'timeout']), 

683 enqueue_time = float(result[b'enqueue_time']), 

684 last_dequeue_time = float(result.get(b'last_dequeue_time', 0)) or None, 

685 dequeue_count = int(result.get(b'dequeue_count', 0)), 

686 last_requeue_time = float(result.get(b'last_requeue_time', 0)) or None, 

687 requeue_count = int(result.get(b'requeue_count', 0)), 

688 release_time = float(result.get(b'release_time', 0)) or None, 

689 result = ( 

690 self._serialiser.loads(result[b'result']) 

691 if result.get(b'result') is not None else None 

692 ) 

693 )