Coverage for hermes/__init__.py: 100%

221 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-18 20:48 +0000

1import asyncio 

2import base64 

3import functools 

4import hashlib 

5import inspect 

6import os 

7import pickle 

8import types 

9import typing 

10import warnings 

11import zlib 

12from typing import Any, Callable, Coroutine, Dict, Iterable, Optional, Sequence, Tuple, Type, Union 

13 

14from .backend import AbstractBackend 

15 

16 

# Names exported by ``from hermes import *``.
__all__ = (
    'Hermes',
    'HermesError',
    'Mangler',
    'Cached',
    'CachedCoro',
    'Serialiser',
    'Compressor',
)

18 

19 

class Serialiser(typing.NamedTuple):
    '''Serialisation delegate: a pair of callables converting values to bytes and back.'''

    #: Serialise cache value.
    dumps: Callable[[Any], bytes]

    #: Deserialise cache value.
    loads: Callable[[bytes], Any]

28 

29 

class Compressor(typing.NamedTuple):
    '''
    Compression delegate.

    Bundles the compress/decompress callables with the exception type(s)
    that signal an uncompressed payload, and the size threshold below
    which payloads are stored uncompressed.
    '''

    #: Compress serialised cache value.
    compress: Callable[[bytes], bytes]

    #: Decompress serialised cache value.
    decompress: Callable[[bytes], bytes]

    #: Decompression error(s) that indicate uncompressed payload.
    decompressError: Union[Type[Exception], Tuple[Type[Exception], ...]]

    #: Minimal length of payload in bytes to trigger compression.
    compressMinLength: int = 0

44 

45 

class Mangler:
    '''Key manager responsible for creating keys, hashing and serialisation.'''

    prefix = 'cache'
    '''Prefix for cache and tag entries.'''

    serialiser = Serialiser(pickle.dumps, pickle.loads)
    '''Serialisation delegate.'''

    compressor = Compressor(zlib.compress, zlib.decompress, zlib.error, 100)
    '''Optional compression delegate.'''

    def hash(self, value: bytes) -> str:
        '''
        Hash value.

        :return: URL-safe base64 encoded MD5 hash of the value, with the
            ``=`` padding stripped.
        '''

        return base64.urlsafe_b64encode(hashlib.md5(value).digest()).strip(b'=').decode()

    def dumps(self, value) -> bytes:
        '''
        Serialise and conditionally compress value.

        Compression is applied only when :attr:`compressor` is set and the
        serialised payload is at least ``compressor.compressMinLength``
        bytes long.
        '''

        result = self.serialiser.dumps(value)
        if self.compressor and len(result) >= self.compressor.compressMinLength:
            result = self.compressor.compress(result)

        return result

    def loads(self, value: bytes):
        '''
        Conditionally decompress and deserialise value.

        NOTE: with the default :attr:`serialiser` this is ``pickle.loads``,
        so the cache backend must only contain trusted data.
        '''

        if self.compressor:
            try:
                value = self.compressor.decompress(value)
            except self.compressor.decompressError:
                # It is expected that the error indicates that the value is
                # shorter than compressMinLength, i.e. stored uncompressed
                pass

        return self.serialiser.loads(value)

    def nameEntry(self, fn: Callable, *args, **kwargs) -> str:
        '''
        Return cache key for given callable and its positional and
        keyword arguments.

        Note how callable, ``fn``, is represented in the cache key:

        1) a ``types.MethodType`` instance -> names of
           ``(module, class, method)``, where the class is the class of
           ``fn.__self__`` (for a method bound to a class, as with
           ``classmethod``, that is ``type``)
        2) a ``types.FunctionType`` instance -> names of
           ``(module, function)``
        3) other callable objects with ``__name__`` -> name of
           ``(module, object)``

        This means that if two functions are defined dynamically in the
        same module with the same names, like::

            def createF1():
                @cache
                def f(a, b):
                    return a + b
                return f

            def createF2():
                @cache
                def f(a, b):
                    return a * b
                return f

            print(createF1()(1, 2))
            print(createF2()(1, 2))

        Both will return `3`, because cache keys will clash. In such cases
        you need to pass ``key`` with custom key function.

        It can also be that an object in case 3 doesn't have a name, or its
        name isn't unique, then ``nameEntry`` should be overridden with
        something that represents it uniquely, like
        ``repr(fn).rsplit(' at 0x', 1)[0]`` (address should be stripped so
        after Python process restart the cache can still be valid
        and usable).

        :raises HermesError: if ``fn`` is not callable, or it is callable
            but has no name.
        '''

        result = [self.prefix, 'entry']
        if callable(fn):
            try:
                # types.MethodType
                result.extend([
                    fn.__module__,
                    fn.__self__.__class__.__name__,  # type: ignore[attribute-error]
                    fn.__name__,
                ])
            except AttributeError:
                try:
                    # types.FunctionType and other object with __name__
                    result.extend([fn.__module__, fn.__name__])
                except AttributeError:
                    # The AttributeError context is noise for the caller
                    raise HermesError(
                        'fn is callable but its name is undefined, consider overriding Mangler.nameEntry'
                    ) from None
        else:
            raise HermesError('fn is expected to be callable')

        # kwargs are sorted so the key doesn't depend on keyword order
        arguments = args, tuple(sorted(kwargs.items()))
        result.append(self.hash(self.dumps(arguments)))

        return ':'.join(result)

    def nameTag(self, tag: str) -> str:
        '''Build fully qualified backend tag name.'''

        return ':'.join([self.prefix, 'tag', tag])

    def mapTags(self, tagKeys: Iterable[str]) -> Dict[str, str]:
        '''
        Map tags to random values for seeding.

        Every call draws a new random salt, so re-seeding a tag (after it
        was removed) yields a different value than before.
        '''

        rnd = os.urandom(4).hex()
        return {key: self.hash(':'.join((key, rnd)).encode()) for key in tagKeys}

    def hashTags(self, tagMap: Dict[str, str]) -> str:
        '''
        Hash tags of a cache entry for the entry key.

        Tag values are joined in the order of their (sorted) tag names, so
        the result doesn't depend on the mapping's iteration order. An
        empty mapping hashes the empty string.
        '''

        values = [value for _, value in sorted(tagMap.items())]
        return self.hash(':'.join(values).encode())

    def nameLock(self, entryKey: str) -> str:
        '''
        Create fully qualified backend lock key for the entry key.

        :param entryKey:
            Entry key to create a lock key for. If given entry key is already
            a colon-separated key name with first component equal to
            :attr:`prefix`, the first two components are dropped. For
            instance:

            - ``foo`` → ``cache:lock:foo``
            - ``cache:entry:fn:tagged:78d64ea049a57494`` →
              ``cache:lock:fn:tagged:78d64ea049a57494``

        '''

        parts = entryKey.split(':')
        if parts[0] == self.prefix:
            entryKey = ':'.join(parts[2:])

        return ':'.join([self.prefix, 'lock', entryKey])

195 

# Custom cache key function, called as ``key(fn, *args, **kwargs)``.
KeyFunc = typing.Callable[..., str]
# TTL function, called as ``ttl(returnValue, fn, *args, **kwargs)``.
TtlFunc = typing.Callable[..., int]

198 

199 

class Cached:
    '''Cache-point wrapper for callables and descriptors.'''

    _frontend: 'Hermes'
    '''
    Hermes instance which provides backend and mangler instances, and
    TTL fallback value.
    '''

    _callable: Callable
    '''
    The decorated callable, stays ``types.FunctionType`` if a function
    is decorated, otherwise it is transformed to ``types.MethodType``
    on the instance clone by descriptor protocol implementation. It can
    also be a method descriptor which is also transformed accordingly to
    the descriptor protocol (e.g. ``staticmethod`` and ``classmethod``).
    '''

    _isDescriptor: bool
    '''Flag defining if the callable is a method descriptor.'''

    _isMethod: bool
    '''Flag defining if the callable is a method.'''

    _ttl: Optional[Union[int, TtlFunc]]
    '''
    Optional cache entry Time To Live for decorated callable.

    It can be either a number of seconds, or a function to calculate it.
    If no value is provided the frontend default, :attr:`Hermes.ttl`, is
    used.
    '''

    _keyFunc: Optional[KeyFunc]
    '''Key creation function.'''

    _tags: Sequence[str]
    '''Cache entry tags for decorated callable, duplicates removed.'''

    def __init__(
        self,
        frontend: 'Hermes',
        callable: Callable,
        *,
        ttl: Optional[Union[int, TtlFunc]] = None,
        key: Optional[KeyFunc] = None,
        tags: Sequence[str] = (),
    ):
        self._frontend = frontend
        self._ttl = ttl
        self._keyFunc = key
        # Drop duplicate tags (keeping first-seen order). Tag lookups compare
        # the size of the loaded tag mapping with the number of tags, so a
        # repeated tag would make every load a miss and fail the assertion
        # in ``_save``.
        self._tags = tuple(dict.fromkeys(tags))

        self._callable = callable
        self._isDescriptor = inspect.ismethoddescriptor(callable)
        self._isMethod = inspect.ismethod(callable)

        # preserve ``__name__``, ``__doc__``, etc
        functools.update_wrapper(self, callable)

    def _resolveKey(self, key: str) -> Optional[str]:
        '''
        Return the backend key for the entry key.

        Untagged entries use ``key`` as is. Tagged entries get a suffix
        with the hash of their tags' current values. ``None`` is returned
        when any of the tags is missing in the backend, as the tagged key
        can't be built then.
        '''

        if not self._tags:
            return key

        tagMap = self._frontend.backend.load(map(self._frontend.mangler.nameTag, self._tags))
        if len(tagMap) != len(self._tags):
            return None

        return key + ':' + self._frontend.mangler.hashTags(tagMap)

    def _load(self, key):
        '''Load the entry value from the backend, ``None`` if absent.'''

        resolvedKey = self._resolveKey(key)
        if resolvedKey is None:
            return None

        return self._frontend.backend.load(resolvedKey)

    def _save(self, key, value, ttl: int):
        '''
        Save the entry value to the backend.

        Tags that don't exist in the backend yet are seeded with random
        values (saved with ``ttl=None``) before the tagged key is built.
        '''

        if self._tags:
            namedTags = tuple(map(self._frontend.mangler.nameTag, self._tags))
            tagMap = self._frontend.backend.load(namedTags)
            missingTags = set(namedTags) - set(tagMap.keys())
            if missingTags:
                missingTagMap = self._frontend.mangler.mapTags(missingTags)
                self._frontend.backend.save(mapping=missingTagMap, ttl=None)
                tagMap.update(missingTagMap)
            assert len(self._tags) == len(tagMap)

            key += ':' + self._frontend.mangler.hashTags(tagMap)

        return self._frontend.backend.save({key: value}, ttl=ttl)

    def _remove(self, key):
        '''Remove the entry from the backend, if it can be addressed.'''

        resolvedKey = self._resolveKey(key)
        if resolvedKey is None:
            return

        self._frontend.backend.remove(resolvedKey)

    def _get_key(self, *args, **kwargs) -> str:
        '''Build the entry key with the custom key function, or the mangler's.'''

        keyFunc = self._keyFunc or self._frontend.mangler.nameEntry
        return keyFunc(self._callable, *args, **kwargs)

    def _get_ttl(self, return_value, *args, **kwargs) -> int:
        '''
        Resolve the entry TTL: own value first, frontend default otherwise.
        A callable TTL is called with the return value, the callable and
        the call arguments.
        '''

        result = self._ttl if self._ttl is not None else self._frontend.ttl
        if callable(result):
            result = result(return_value, self._callable, *args, **kwargs)
        return result

    def invalidate(self, *args, **kwargs):
        '''
        Invalidate the cache entry.

        Invalidated entry corresponds to the wrapped callable called with
        given ``args`` and ``kwargs``.
        '''

        self._remove(self._get_key(*args, **kwargs))

    def __call__(self, *args, **kwargs):
        '''
        Get the value of the wrapped callable.

        On a miss the backend lock for the key is taken, the entry is
        re-read, and only if it is still missing the callable is called and
        its result saved. A ``None`` value is treated as a miss.
        '''

        key = self._get_key(*args, **kwargs)
        value = self._load(key)
        if value is None:
            with self._frontend.backend.lock(key):
                # it's better to read twice than lock every read
                value = self._load(key)
                if value is None:
                    value = self._callable(*args, **kwargs)
                    ttl = self._get_ttl(value, *args, **kwargs)
                    self._save(key, value, ttl)

        return value

    def __get__(self, instance, type):
        '''
        Implements non-data descriptor protocol.

        The invocation happens only when instance method is decorated,
        so we can distinguish between decorated ``types.MethodType`` and
        ``types.FunctionType``. Python class declaration mechanics prevent
        a decorator from having awareness of the class type, as the
        function is received by the decorator before it becomes an
        instance method.

        How it works::

            cache = hermes.Hermes()

            class Model:

                @cache
                def calc(self):
                    return 42

            m = Model()
            m.calc

        Last attribute access results in the call, ``calc.__get__(m, Model)``,
        where ``calc`` is instance of :class:`Cached` which decorates the
        original ``Model.calc``.

        Note, initially :class:`Cached` is created on decoration per
        class method, when class type is created by the interpreter, and
        is shared among all instances. Later, on attribute access, a copy
        is returned with bound ``_callable``, just like ordinary Python
        method descriptor works.

        A wrapped method descriptor (e.g. ``classmethod``) is bound through
        its own ``__get__`` also on access via the class itself, so
        ``Model.cmeth()`` works like ``Model().cmeth()``.

        For more details, `descriptor-protocol
        <http://docs.python.org/3/howto/descriptor.html#descriptor-protocol>`_.
        '''

        if self._isDescriptor and (instance is not None or type is not None):
            # Delegate binding to the descriptor. Without this, class access
            # would return the wrapper around the raw (uncallable) descriptor.
            return self._copy(self._callable.__get__(instance, type))  # type: ignore[attribute-error]
        elif instance is not None and not self._isMethod:
            return self._copy(types.MethodType(self._callable, instance))
        else:
            return self

    def _copy(self, callable):
        '''
        Create a shallow copy of self with ``_callable``
        replaced to given instance.
        '''

        boundCached = object.__new__(self.__class__)
        boundCached.__dict__ = self.__dict__.copy()
        boundCached._callable = callable
        return boundCached

386 

387 

class CachedCoro(Cached):
    '''
    Cache-point wrapper for coroutine functions.

    The implementation uses the default thread pool of ``asyncio`` to
    execute synchronous functions of the cache backend, and manage their
    (distributed) locks.
    '''

    async def _run(self, fn, *args, **kwargs) -> Any:
        '''
        Run given function or coroutine function and return its result.

        If ``fn`` is a coroutine function it's called and awaited.
        Otherwise it's run in the running event loop's default executor,
        so blocking backend calls don't block the loop.
        '''

        if inspect.iscoroutinefunction(fn):
            return await fn(*args, **kwargs)

        # Called from a coroutine, so a loop is always running here;
        # get_running_loop is the documented API for that case.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, functools.partial(fn, *args, **kwargs))

    async def invalidate(self, *args, **kwargs):
        '''
        Invalidate the cache entry.

        Invalidated entry corresponds to the wrapped coroutine function
        called with given ``args`` and ``kwargs``.
        '''

        await self._run(super().invalidate, *args, **kwargs)

    async def __call__(self, *args, **kwargs):
        '''
        Get the value of the wrapped coroutine function's coroutine.

        Mirrors :meth:`Cached.__call__`: backend reads, writes and lock
        operations run in the executor, while the wrapped coroutine is
        awaited directly on the event loop.
        '''

        key = self._get_key(*args, **kwargs)
        value = await self._run(self._load, key)
        if value is None:
            lock = self._frontend.backend.lock(key)
            await self._run(lock.acquire)
            try:
                # re-read under the lock, another caller may have filled it
                value = await self._run(self._load, key)
                if value is None:
                    value = await self._callable(*args, **kwargs)
                    ttl = self._get_ttl(value, *args, **kwargs)
                    await self._run(self._save, key, value, ttl)
            finally:
                await self._run(lock.release)

        return value

439 

440 

def cachedfactory(frontend: 'Hermes', fn, **kwargs) -> Cached:
    '''
    Build the cache-point object that fits ``fn``.

    Coroutine functions, and method descriptors wrapping one, get a
    :class:`CachedCoro`; any other callable or method descriptor gets a
    :class:`Cached`. Remaining keyword arguments go to the constructor.

    :argument frontend:
        Cache frontend instance.
    :argument fn:
        Must be coroutine function, callable or method descriptor.
    :raises HermesError:
        If ``fn`` is none of the above.
    '''

    isDescriptor = inspect.ismethoddescriptor(fn)
    wrapsCoroutine = inspect.iscoroutinefunction(fn) or (
        isDescriptor and inspect.iscoroutinefunction(getattr(fn, '__func__', None))
    )
    if wrapsCoroutine:
        return CachedCoro(frontend, fn, **kwargs)

    if not (callable(fn) or isDescriptor):
        raise HermesError(
            'First positional argument must be coroutine function, callable or method descriptor'
        )

    return Cached(frontend, fn, **kwargs)

463 

464 

class Hermes:
    '''
    Cache façade.

    :argument backend:
        Class or instance of cache backend. If a class is passed, keyword
        arguments passed to the :obj:`Hermes` constructor are passed
        through to the class' constructor.

        If the argument is omitted the no-op backend,
        :class:`AbstractBackend`, is used.
    :argument mangler:
        Optional, typically of a subclass, mangler instance.
    :argument cachedfactory:
        Optional, a cache-point factory for functions and coroutines.
    :argument ttl:
        Default cache entry time-to-live.

    Usage::

        import hermes.backend.redis


        cache = hermes.Hermes(
            hermes.backend.redis.Backend, ttl = 600, host = 'localhost', db = 1
        )

        @cache
        def foo(a, b):
            return a * b

        class Example:

            @cache(tags = ('math', 'power'), ttl = 1200)
            def bar(self, a, b):
                return a ** b

            @cache(
                tags = ('math', 'avg'),
                key = lambda fn, *args, **kwargs: 'avg:{0}:{1}'.format(*args),
            )
            def baz(self, a, b):
                return (a + b) / 2.0

        print(foo(2, 333))

        example = Example()
        print(example.bar(2, 10))
        print(example.baz(2, 10))

        foo.invalidate(2, 333)
        example.bar.invalidate(2, 10)
        example.baz.invalidate(2, 10)

        cache.clean(['math'])  # invalidate entries tagged 'math'
        cache.clean()          # flush cache

    '''

    backend: AbstractBackend
    '''Cache backend.'''

    mangler: Mangler
    '''Key manager responsible for creating keys, hashing and serialisation.'''

    cachedfactory: Callable[..., Cached]
    '''Cache-point callable object factory.'''

    ttl: Union[int, TtlFunc]
    '''
    Default cache entry time-to-live.

    It can be either a number of seconds, or a function to calculate
    it. The latter is given:

    - the return value of the decorated callable's call
    - the decorated callable object
    - actual positional arguments of the call
    - actual keyword arguments of the call

    '''

    def __init__(
        self,
        backend: Union[Type[AbstractBackend], AbstractBackend] = AbstractBackend,
        *,
        mangler: Optional[Mangler] = None,
        cachedfactory: Callable[..., Cached] = cachedfactory,
        ttl: Union[int, TtlFunc] = 3600,
        **backendconf
    ):
        self.ttl = ttl

        mangler = mangler or Mangler()
        assert isinstance(mangler, Mangler)
        self.mangler = mangler

        if isinstance(backend, AbstractBackend):
            if backendconf:
                warnings.warn('Backend options ignored because backend instance is passed')

            self.backend = backend
        elif isinstance(backend, type) and issubclass(backend, AbstractBackend):
            # The backend class receives the mangler and the pass-through options
            self.backend = backend(self.mangler, **backendconf)
        else:
            raise HermesError('Expected class or instance of AbstractBackend')  # type: ignore

        assert callable(cachedfactory)
        self.cachedfactory = cachedfactory

    def __call__(
        self,
        *args,
        ttl: Optional[Union[int, TtlFunc]] = None,
        tags: Sequence[str] = (),
        key: Optional[KeyFunc] = None,
    ):
        '''
        Wrap the callable in a cache-point instance.

        Decorator that caches method or function result. Bare decorator,
        ``@cache``, is supported as well as a call with keyword arguments
        ``@cache(ttl = 7200)``. The callable can also be wrapped directly,
        ``cache(fn, ttl = 7200)``, in which case the keyword arguments are
        applied as well.

        The following keyword arguments are optional:

        :argument ttl:
            Cache entry Time To Live. See :attr:`ttl`.
        :argument tags:
            Cache entry tag list.
        :argument key:
            Lambda that provides custom key, otherwise
            :obj:`Mangler.nameEntry` is used.
        '''

        if args:
            # @cache, or cache(fn, ...). Only the options actually given are
            # forwarded, so a custom factory that doesn't accept them keeps
            # working with the bare decorator.
            options: Dict[str, Any] = {}
            if ttl is not None:
                options['ttl'] = ttl
            if tags:
                options['tags'] = tags
            if key is not None:
                options['key'] = key
            return self.cachedfactory(self, args[0], **options)
        else:
            # @cache()
            return functools.partial(self.cachedfactory, self, ttl=ttl, tags=tags, key=key)

    def clean(self, tags: Sequence[str] = ()):
        '''
        Clean all, or tagged with given tags, cache entries.

        :argument tags:
            If this argument is omitted the call flushes all cache entries,
            otherwise only the entries tagged by given tags are flushed
            (by removing the tag entries, which changes the keys tagged
            cache entries are looked up by).
        '''

        if tags:
            self.backend.remove(map(self.mangler.nameTag, tags))
        else:
            self.backend.clean()

619 

620 

class HermesError(Exception):
    '''
    Generic Hermes error.

    Raised on library misuse, such as passing an invalid backend or a
    non-callable object to be cached.
    '''