Coverage for torrelque/__init__.py: 100%

278 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-05 19:52 +0000

1'''Asynchronous Redis-backed reliable queue package.''' 

2 

3import asyncio 

4import enum 

5import json 

6import logging 

7import time 

8import uuid 

9from importlib.resources import files 

10from typing import Any, AsyncIterable, Callable, Dict, List, NamedTuple, Optional, Tuple, Union 

11 

12import redis.asyncio 

13from redis.asyncio.client import Pipeline 

14 

15 

# ``contextlib.aclosing`` exists since Python 3.10; older interpreters get a
# minimal drop-in replacement defined below.
try:
    from contextlib import aclosing
except ImportError:  # nocov

    class aclosing:
        '''Async context manager that awaits ``thing.aclose()`` on exit.'''

        def __init__(self, thing):
            self.thing = thing

        async def __aenter__(self):
            return self.thing

        async def __aexit__(self, *exc_info):
            # Returning ``None`` lets an exception from the body propagate.
            await self.thing.aclose()

28 

29 

# Public names of the package.
__all__ = (
    'Torrelque',
    'TorrelqueQueueStats',
    'TorrelqueTaskStatus',
    'TorrelqueTaskState',
    'TorrelqueError',
    'TorrelqueTimeoutError',
    'TorrelqueLookupError',
    'TorrelqueTaskSerialiser',
)

# Logger named after the package.
logger = logging.getLogger(__package__)

# Task identifiers are plain strings.
TaskId = str
# Pair of a task identifier and a dict.
TaskPair = Tuple[TaskId, dict]

45 

46 

class TorrelqueTaskStatus(enum.IntEnum):
    '''Status of a task in the queue.'''

    PENDING = 0
    '''The task has been enqueued.'''

    WORKING = 1
    '''The task has been dequeued.'''

    DELAYED = 2
    '''The task has been delayed.'''

    COMPLETED = 3
    '''The task has been released after successful completion.'''

    REJECTED = 4
    '''The task has been released after (multiple) failed attempts.'''

    def isfinal(self):
        '''Return ``True`` if the status is ``COMPLETED`` or ``REJECTED``.'''

        members = type(self)
        return self is members.COMPLETED or self is members.REJECTED

69 

70 

class TorrelqueQueueStats(NamedTuple):
    '''Counters describing the current content of a queue.'''

    tasks: int
    '''Total count of tasks held by the queue.'''

    pending: int
    '''Count of tasks that are pending.'''

    working: int
    '''Count of tasks that are being worked on.'''

    delayed: int
    '''Count of tasks that are delayed.'''

85 

86 

class TorrelqueTaskState(NamedTuple):
    '''State of a single task.'''

    status: TorrelqueTaskStatus
    '''Current status of the task.'''

    timeout: float
    '''
    Execution timeout of the task, after which it is considered stale.
    '''

    result: Any
    '''Optional result of a task that is in a final status.'''

    dequeue_count: int
    '''How many times the task has been dequeued.'''

    requeue_count: int
    '''How many times the task has been requeued.'''

    enqueue_time: float
    '''Unix timestamp of the moment the task was enqueued.'''

    last_dequeue_time: Optional[float]
    '''Unix timestamp of the most recent dequeue, if any.'''

    last_requeue_time: Optional[float]
    '''Unix timestamp of the most recent requeue, if any.'''

    release_time: Optional[float]
    '''
    Unix timestamp of the moment the task was released in a final
    status, if any.
    '''

121 

122 

class TorrelqueTaskSerialiser(NamedTuple):
    '''Pair of callables used to (de)serialise task data.'''

    dumps: Callable[[Any], bytes]
    '''Convert task data into bytes.'''

    loads: Callable[[bytes], Any]
    '''Convert bytes back into task data.'''

131 

132 

class TorrelqueError(Exception):
    '''Base class of all Torrelque errors.'''

135 

136 

class TorrelqueTimeoutError(TorrelqueError):
    '''Raised when a Torrelque operation times out.'''

139 

140 

class TorrelqueLookupError(TorrelqueError):
    '''Raised when a Torrelque lookup finds nothing.'''

143 

144 

class Torrelque:
    '''
    Reliable work queue.

    :argument client:
        Redis client instance.
    :argument queue:
        Name of the queue. Must match across producers and consumers.
    :argument serialiser:
        An object with ``dumps`` and ``loads`` that (de)serialises
        task bodies.
    :raises TorrelqueError:
        Not a ``redis.asyncio.Redis`` instance passed.
    '''

    task_timeout = 300
    '''
    Default timeout for a task in the "working" set to be
    considered stale.
    '''

    sweep_interval = 30
    '''Default interval between sweep calls, when sweep is scheduled.'''

    result_ttl = 3600
    '''Default time-to-live of a task result (when applicable).'''

    keys = {
        'pending'    : 'pending',     # list
        'dequeueing' : 'dequeueing',  # list
        'undequeued' : 'undequeued',  # set
        'working'    : 'working',     # sorted set
        'delayed'    : 'delayed',     # sorted set
        'tasks'      : 'tasks',       # hash
        'task'       : 'task'         # prefix for hashes
    }
    '''
    Queue Redis key name mapping.

    On initialisation the values are prefixed with the queue name, and
    the new dictionary is rebound to the instance.

    .. list-table:: Redis key description

       * - ``pending``
         - a list containing enqueued task ids
       * - ``dequeueing``
         - a short-living task id list where :py:meth:`.dequeue`
           ``RPOPLPUSH``-es task ids from ``pending`` list
       * - ``undequeued``
         - a set where :py:meth:`.sweep` stored potentially (evaluated
           on the next sweep) stale, "undequeued", task ids
       * - ``working``
         - a sorted set with successfully dequeued task ids where the
           score is the Unix timestamp when the task becomes stale
           according to its timeout
       * - ``delayed``
         - a sorted set with delayed task ids where the score is the
           Unix timestamp when the task becomes due
       * - ``tasks``
         - a hash mapping a task id to its serialised representation
       * - ``task``
         - a string prefix, followed by task id, for hashes storing
           individual task stats

    '''

    _serialiser: TorrelqueTaskSerialiser
    '''
    Task data serialiser that converts task object into and from string
    representation.
    '''

    _client: redis.asyncio.Redis
    '''Redis client.'''

    _sweep_task: Optional[asyncio.Task] = None
    '''
    Periodic sweep ``asyncio`` task, or ``None`` when the sweep has
    never been scheduled.
    '''

    _keyspace_notification_enabled = False
    '''
    Flag indicates that Redis keyspace notifications were found
    correctly configured, and further tests should be omitted.
    '''

    _scripts: Dict[str, 'redis.commands.core.AsyncScript']
    '''Dictionary with Lua scripts read on initialisation.'''

    def __init__(
        self,
        client: redis.asyncio.Redis,
        *,
        queue: str = 'trq',
        serialiser: TorrelqueTaskSerialiser = TorrelqueTaskSerialiser(
            lambda obj: json.dumps(obj).encode(), json.loads
        ),
    ):
        if not isinstance(client, redis.asyncio.Redis):
            raise TorrelqueError('redis.asyncio.Redis instance expected')

        self._client = client
        self._serialiser = serialiser
        # The Lua sources are shipped next to this module as package data.
        self._scripts = {
            n: client.register_script((files(__package__) / f'{n}.lua').read_text())
            for n in ('dequeue', 'sweep')
        }

        # Rebind a queue-prefixed copy on the instance; the class-level
        # mapping stays untouched.
        self.keys = {k: f'{queue}:{v}' for k, v in self.keys.items()}

    def _get_state_key(self, task_id):
        '''Return the name of the hash that stores the state of the task.'''

        return '{}:{}'.format(self.keys['task'], task_id)

    async def _enqueue(
        self,
        pipeline: Pipeline,
        task,
        task_timeout: Optional[float] = None,
        delay: Optional[float] = None,
    ) -> TaskId:
        '''
        Queue the commands that enqueue ``task`` on ``pipeline``.

        The pipeline is not executed here. A falsy ``task_timeout``
        falls back to :py:attr:`task_timeout`, and a falsy ``delay``
        means the task goes straight to the "pending" list.
        '''

        task_timeout = task_timeout or self.task_timeout
        task_id = uuid.uuid1().hex
        task_data = self._serialiser.dumps(task)
        task_state_key = self._get_state_key(task_id)

        if delay:
            await pipeline.zadd(self.keys['delayed'], {task_id: time.time() + delay})
            await pipeline.hset(task_state_key, 'status', TorrelqueTaskStatus.DELAYED.value)
        else:
            await pipeline.lpush(self.keys['pending'], task_id)
            await pipeline.hset(task_state_key, 'status', TorrelqueTaskStatus.PENDING.value)

        await pipeline.hset(self.keys['tasks'], task_id, task_data)
        await pipeline.hset(task_state_key, 'enqueue_time', time.time())
        await pipeline.hset(task_state_key, 'timeout', task_timeout)

        return task_id

    async def enqueue(
        self,
        task,
        *,
        task_timeout: Optional[float] = None,
        delay: Optional[float] = None,
        pipeline: Optional[Pipeline] = None
    ) -> TaskId:
        '''
        Put a task on the queue optionally providing its timeout and delay.

        :argument task:
            Arbitrary serialisable task payload.
        :argument task_timeout:
            Time since the task's processing start after which it is
            considered stale.
        :argument delay:
            Number of seconds to delay processing of the task, i.e.
            putting it into the "pending" list. Note that the sweep
            must be scheduled for the delayed tasks to return, and
            the delay only has effect after the sweep execution.
        :argument pipeline:
            External Redis pipeline that allows for bulk enqueue. When
            given, the commands are only queued on it and the caller
            is responsible for executing it.
        :return:
            Task identifier.
        '''

        if pipeline is not None:
            task_id = await self._enqueue(pipeline, task, task_timeout, delay)
        else:
            async with await self._client.pipeline(transaction=True) as pipeline:
                task_id = await self._enqueue(pipeline, task, task_timeout, delay)
                await pipeline.execute()

        return task_id

    async def _dequeue(self, timeout: int, max_tasks: int) -> List[TaskPair]:
        '''
        Move task ids from "pending" to "dequeueing" and run the
        ``dequeue`` script on them.

        :raises TorrelqueTimeoutError:
            If the blocking pop has returned nothing.
        :raises TorrelqueLookupError:
            If none of the popped ids has yielded task data.
        '''

        pending_key = self.keys['pending']
        dequeueing_key = self.keys['dequeueing']

        # Try to move max_tasks task ids to dequeueing without blocking
        task_ids: List[bytes] = []
        if max_tasks > 1:
            async with await self._client.pipeline(transaction=True) as pipeline:
                for _ in range(max_tasks):
                    await pipeline.rpoplpush(pending_key, dequeueing_key)
                task_ids.extend(task_id for task_id in await pipeline.execute() if task_id)

        # If the above hasn't succeeded guarantee at least one task
        if not task_ids:
            task_id = await self._client.brpoplpush(
                pending_key, dequeueing_key, timeout=timeout or 0
            )
            if not task_id:
                raise TorrelqueTimeoutError
            else:
                task_ids.append(task_id)

        keys = [self.keys[k] for k in ('dequeueing', 'working', 'tasks', 'task')]
        args = [time.time(), TorrelqueTaskStatus.WORKING.value] + task_ids
        pairs = await self._scripts['dequeue'](keys, args)

        result = []
        for task_id, task_data in pairs:
            task_id = task_id.decode()
            task_data = self._serialiser.loads(task_data)
            # NOTE(review): a legitimately falsy payload (``0``, ``{}``,
            # ``[]``, ``''``) is indistinguishable from "not found" here;
            # confirm against what the ``dequeue`` script returns for a
            # missing task.
            if not task_data:
                logger.warning('Task:%s not found in dequeueing list', task_id)
                continue

            result.append((task_id, task_data))

        if not result:
            raise TorrelqueLookupError('Failed to dequeue pending task')

        return result

    async def dequeue(
        self, timeout: Optional[int] = None, *, max_tasks: int = 1
    ) -> Union[TaskPair, List[TaskPair]]:
        '''
        Get a task or a task list from the queue with optional timeout.

        :argument timeout:
            Time to wait until a task is available. Timeout applies
            only to fetching single task. Note that Redis only supports
            an integer timeout.
        :argument max_tasks:
            If greater than 1 the method will try to optimistically
            dequeue as many tasks. That means that only 1 task is
            guaranteed.
        :raises TorrelqueTimeoutError:
            If timeout was provided and there was no result within it.
        :raises TorrelqueLookupError:
            Indicates that the task id has become stale during the
            runtime of this method. This is not expected under normal
            circumstances. It can happen if this method is paused, say
            on a debugger breakpoint, for a duration of 2 sweeps.
        :return:
            Tuple of the task identifier and the deserialised task
            payload. If ``max_tasks`` is greater than 1, no matter if
            it dequeues more than 1 task, the return value is a list of
            said 2-tuples.
        '''

        assert max_tasks > 0

        task_pairs = await self._dequeue(timeout, max_tasks)
        return task_pairs if max_tasks > 1 else task_pairs[0]

    async def _requeue(
        self,
        pipeline: Pipeline,
        task_id: TaskId,
        delay: Optional[float] = None,
        task_timeout: Optional[float] = None
    ) -> list:
        '''
        Queue the commands that requeue the task on ``pipeline``.

        :return:
            Expected reply of each queued command, in order, where
            ``...`` stands for "any reply".
        '''

        expected = []
        await pipeline.zrem(self.keys['working'], task_id)
        expected.append(1)

        task_state_key = self._get_state_key(task_id)
        if not delay:
            await pipeline.lpush(self.keys['pending'], task_id)
            await pipeline.hset(task_state_key, 'last_requeue_time', time.time())
            await pipeline.hincrby(task_state_key, 'requeue_count', 1)
            await pipeline.hset(task_state_key, 'status', TorrelqueTaskStatus.PENDING.value)
            expected.extend([..., ..., ..., 0])
        else:
            await pipeline.zadd(self.keys['delayed'], {task_id: time.time() + delay})
            await pipeline.hset(task_state_key, 'status', TorrelqueTaskStatus.DELAYED.value)
            expected.extend([1, 0])

        if task_timeout:
            await pipeline.hset(task_state_key, 'timeout', task_timeout)
            expected.append(0)

        return expected

    async def requeue(
        self,
        task_id: TaskId,
        delay: Optional[float] = None,
        *,
        task_timeout: Optional[float] = None,
        pipeline: Optional[Pipeline] = None,
    ):
        '''
        Return failed task into the queue with optional delay.

        :argument task_id:
            Task identifier.
        :argument delay:
            Number of seconds to delay putting the task into "pending"
            list. Note that the sweep must be scheduled in order for
            tasks from "delayed" to return to "pending" list.
        :argument task_timeout:
            Redefine task timeout, which is the time since the task's
            processing start after which it is considered stale.
        :argument pipeline:
            External Redis pipeline that allows for bulk requeue. When
            given, the commands are only queued on it and no
            consistency check of the replies is made.
        '''

        if pipeline is not None:
            await self._requeue(pipeline, task_id, delay, task_timeout)
        else:
            async with await self._client.pipeline(transaction=True) as pipeline:
                expected = await self._requeue(pipeline, task_id, delay, task_timeout)
                actual = await pipeline.execute()

            # ``...`` in ``expected`` matches any reply
            if not all(expc == actl or expc is ... for expc, actl in zip(expected, actual)):
                logger.warning('Inconsistent requeue of task:%s: %s', task_id, actual)

    async def _release(
        self,
        pipeline: Pipeline,
        task_id: TaskId,
        result=None,
        result_ttl: Optional[int] = None,
        status: TorrelqueTaskStatus = TorrelqueTaskStatus.COMPLETED,
    ) -> list:
        '''
        Queue the commands that release the task on ``pipeline``.

        :raises TorrelqueError:
            If the status is given and is not final.
        :return:
            Expected reply of each queued command, in order.
        '''

        # NOTE(review): ``status=None`` passes this check but fails on
        # ``status.value`` below when ``result`` is given; confirm whether
        # ``None`` is meant to be accepted.
        if status is not None and not status.isfinal():
            raise TorrelqueError(f'Invalid status for released task: {status}')

        expected = []
        await pipeline.zrem(self.keys['working'], task_id)
        await pipeline.hdel(self.keys['tasks'], task_id)
        expected.extend([1, 1])

        task_state_key = self._get_state_key(task_id)
        if result is not None:
            await pipeline.hset(task_state_key, 'result', self._serialiser.dumps(result))
            await pipeline.hset(task_state_key, 'release_time', time.time())
            await pipeline.hset(task_state_key, 'status', status.value)
            expected.extend([1, 1, 0])

            result_ttl = result_ttl if result_ttl is not None else self.result_ttl
            await pipeline.expire(task_state_key, result_ttl)
            expected.append(1)
        else:
            await pipeline.delete(task_state_key)
            expected.append(1)

        return expected

    async def release(
        self,
        task_id: TaskId,
        *,
        result=None,
        result_ttl: Optional[int] = None,
        status: TorrelqueTaskStatus = TorrelqueTaskStatus.COMPLETED,
        pipeline: Optional[Pipeline] = None,
    ):
        '''
        Remove finished task from the queue.

        Unless ``result`` is specified, all task information is removed
        from the queue immediately.

        Since there's no dead letter queue, tasks that have exceeded
        allowed number of retries should also be released, possibly
        with ``TorrelqueTaskStatus.REJECTED`` status if producer is
        interested in the status.

        :argument task_id:
            Task identifier.
        :argument result:
            Arbitrary serialisable task result. If ``result`` is
            ``None`` task state key is removed immediately on release.
        :argument result_ttl:
            Number of seconds to keep task state key after release.
            Override of default result TTL.
        :argument status:
            Task status to set on release. It only applies when result
            is not ``None``.
        :argument pipeline:
            External Redis pipeline that allows for bulk release. When
            given, the commands are only queued on it and no
            consistency check of the replies is made.
        :raises TorrelqueError:
            If the status is not final.
        '''

        if pipeline is not None:
            await self._release(pipeline, task_id, result, result_ttl, status)
        else:
            async with await self._client.pipeline(transaction=True) as pipeline:
                expected = await self._release(pipeline, task_id, result, result_ttl, status)
                actual = await pipeline.execute()

            if expected != actual:
                logger.warning('Inconsistent release of task:%s: %s', task_id, actual)

    async def _check_keyspace_notification_config(self):
        '''
        Verify once that ``notify-keyspace-events`` includes ``KA`` or ``Kgh``.

        :raises TorrelqueError:
            If neither flag combination is configured.
        '''

        if not self._keyspace_notification_enabled:
            config = await self._client.config_get('notify-keyspace-events')
            notify_config = set(config['notify-keyspace-events'])
            # See https://redis.io/topics/notifications#configuration
            if {'K', 'A'} - notify_config and {'K', 'g', 'h'} - notify_config:
                raise TorrelqueError('Redis notify-keyspace-events must include KA or Kgh')
            self._keyspace_notification_enabled = True

    async def _get_keyspace_notification_message(self, pubsub, timeout: float) -> Optional[dict]:
        '''
        Get the next pub/sub message, translating an ``asyncio`` timeout
        into :py:class:`TorrelqueTimeoutError`.
        '''

        try:
            message = await asyncio.wait_for(pubsub.get_message(), timeout)
        except asyncio.TimeoutError as ex:
            raise TorrelqueTimeoutError from ex
        else:
            return message

    async def watch(
        self, task_id: TaskId, *, timeout: Optional[float] = None
    ) -> AsyncIterable[TorrelqueTaskState]:
        '''
        Watch task status change until it's released from the queue.

        .. note::

           This method relies on ``notify-keyspace-events`` introduced
           in Redis 2.8. The configuration must have generic and hash
           commands enabled. That is, the configuration must include
           either ``KA`` or ``Kgh``.

        :argument task_id:
            Task identifier.
        :argument timeout:
            Timeout for watching.
        :raises TorrelqueError:
            If ``notify-keyspace-events`` is not configured properly.
        :raises TorrelqueTimeoutError:
            If ``watch`` has taken longer than ``timeout``.
        :raises TorrelqueLookupError:
            If the task state key is not found.
        :return:
            Asynchronous generator that yields
            :py:class:`TorrelqueTaskState` tuples as returned by
            :py:meth:`.get_task_state`. Generator stops when the task
            is released. If the task is released without result,
            generator won't yield a state with final status.
        '''

        start = time.monotonic()

        await self._check_keyspace_notification_config()

        task_state = await self.get_task_state(task_id)
        yield task_state

        status = task_state.status
        if status.isfinal():
            return

        async with aclosing(self._client.pubsub(ignore_subscribe_messages=True)) as pubsub:
            dbn = self._client.connection_pool.connection_kwargs.get('db', 0)
            await pubsub.subscribe('__keyspace@{}__:{}'.format(dbn, self._get_state_key(task_id)))

            iter_timeout = timeout
            while True:
                # NOTE(review): ``get_message()`` is called with its default
                # timeout; confirm it waits for a message rather than
                # returning ``None`` at once, otherwise this loop polls.
                message = await self._get_keyspace_notification_message(pubsub, iter_timeout)
                if message and message['data'] == b'del':
                    return  # Released without result
                elif message and message['data'] == b'hset':
                    try:
                        task_state = await self.get_task_state(task_id)
                    except TorrelqueLookupError:
                        return  # Race condition with release

                    if task_state.status != status:
                        status = task_state.status
                        yield task_state
                        if status.isfinal():
                            return

                # Shrink the per-iteration timeout so that the overall
                # watch duration is bound by ``timeout``
                if timeout is not None:
                    iter_timeout = timeout - (time.monotonic() - start)

    async def sweep(self) -> Tuple[int, int, int]:
        '''
        Execute the task sweep.

        :return:
            3-tuple with counts of:

            - stale tasks from "working" set returned into "pending" list
            - due delayed tasks from "delayed" set returned into "pending" list
            - stale dequeueing task ids returned into "pending" list
        '''

        keys = [
            self.keys[k]
            for k in ('pending', 'dequeueing', 'undequeued', 'working', 'delayed', 'task')
        ]
        args = [time.time(), TorrelqueTaskStatus.PENDING.value]
        result = await self._scripts['sweep'](keys, args)
        return tuple(result)

    async def _sweep_runner(self, interval: float):
        '''
        Call :py:meth:`.sweep` every ``interval`` seconds.

        Redis errors are logged and the loop continues. Cancellation
        or any other exception stops the loop.
        '''

        while True:
            start = time.monotonic()
            try:
                result = await self.sweep()
            except redis.RedisError:
                logger.exception('Sweep has failed with Redis error, continuing')
            except asyncio.CancelledError:
                break
            except Exception:
                logger.exception('Sweep has failed with unexpected error, stopping')
                break
            else:
                logger.debug('Sweep has requeued: stale=%d, delayed=%d, undequeued=%d', *result)

            # Subtract the time the sweep itself has taken
            await asyncio.sleep(interval - (time.monotonic() - start))

    def schedule_sweep(self, interval: Optional[float] = None):
        '''
        Schedule the sweep in a background coroutine.

        If a previously scheduled sweep is still running, it is
        cancelled first, so that at most one sweep runner is owned by
        the instance.

        :argument interval:
            Override of default sweep interval.
        '''

        interval = interval or self.sweep_interval

        # Without this the previous runner would keep sweeping with no
        # handle left to cancel it
        previous = self._sweep_task
        if previous is not None and not previous.done():
            previous.cancel()

        self._sweep_task = asyncio.get_event_loop().create_task(self._sweep_runner(interval))

    def unschedule_sweep(self):
        '''
        Unschedule the sweep in a background coroutine.

        The sweep must have been scheduled with
        :py:meth:`.schedule_sweep` before.
        '''

        assert self._sweep_task, 'Sweep is not scheduled'
        self._sweep_task.cancel()

    async def get_queue_stats(self) -> TorrelqueQueueStats:
        '''Get queue counters.'''

        async with await self._client.pipeline(transaction=True) as pipe:
            await pipe.hlen(self.keys['tasks'])
            await pipe.llen(self.keys['pending'])
            await pipe.zcard(self.keys['working'])
            await pipe.zcard(self.keys['delayed'])
            result = await pipe.execute()

        return TorrelqueQueueStats(*result)

    async def get_task_state(self, task_id: TaskId) -> TorrelqueTaskState:
        '''
        Get task state.

        :argument task_id:
            Task identifier.
        :raises TorrelqueLookupError:
            If the task state key is not found.
        '''

        result = await self._client.hgetall(self._get_state_key(task_id))
        if not result:
            raise TorrelqueLookupError

        # Missing timestamps default to 0 and are reported as ``None``
        return TorrelqueTaskState(
            status=TorrelqueTaskStatus(int(result[b'status'])),
            timeout=float(result[b'timeout']),
            enqueue_time=float(result[b'enqueue_time']),
            last_dequeue_time=float(result.get(b'last_dequeue_time', 0)) or None,
            dequeue_count=int(result.get(b'dequeue_count', 0)),
            last_requeue_time=float(result.get(b'last_requeue_time', 0)) or None,
            requeue_count=int(result.get(b'requeue_count', 0)),
            release_time=float(result.get(b'release_time', 0)) or None,
            result=(
                self._serialiser.loads(result[b'result'])
                if result.get(b'result') is not None else None
            )
        )