Coverage for hermes/__init__.py: 100%
221 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-18 20:48 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-18 20:48 +0000
1import asyncio
2import base64
3import functools
4import hashlib
5import inspect
6import os
7import pickle
8import types
9import typing
10import warnings
11import zlib
12from typing import Any, Callable, Coroutine, Dict, Iterable, Optional, Sequence, Tuple, Type, Union
14from .backend import AbstractBackend
17__all__ = 'Hermes', 'HermesError', 'Mangler', 'Cached', 'CachedCoro', 'Serialiser', 'Compressor'
20class Serialiser(typing.NamedTuple):
21 '''Serialisation delegate.'''
23 dumps: Callable[[Any], bytes]
24 '''Serialise cache value.'''
26 loads: Callable[[bytes], Any]
27 '''Deserialise cache value.'''
30class Compressor(typing.NamedTuple):
31 '''Compression delegate.'''
33 compress: Callable[[bytes], bytes]
34 '''Compress serialised cache value.'''
36 decompress: Callable[[bytes], bytes]
37 '''Decompress serialised cache value.'''
39 decompressError: Union[Type[Exception], Tuple[Type[Exception], ...]]
40 '''Decompression error(s) that indicate uncompressed payload.'''
42 compressMinLength: int = 0
43 '''Minimal length of payload in bytes to trigger compression.'''
46class Mangler:
47 '''Key manager responsible for creating keys, hashing and serialisation.'''
49 prefix = 'cache'
50 '''Prefix for cache and tag entries.'''
52 serialiser = Serialiser(pickle.dumps, pickle.loads)
53 '''Serialisation delegate.'''
55 compressor = Compressor(zlib.compress, zlib.decompress, zlib.error, 100)
56 '''Optional compression delegate.'''
58 def hash(self, value: bytes) -> str:
59 '''
60 Hash value.
62 :return: base64 encoded MD5 hash of the value.
63 '''
65 return base64.urlsafe_b64encode(hashlib.md5(value).digest()).strip(b'=').decode()
67 def dumps(self, value) -> bytes:
68 '''Serialise and conditionally compress value.'''
70 result = self.serialiser.dumps(value)
71 if self.compressor and len(result) >= self.compressor.compressMinLength:
72 result = self.compressor.compress(result)
74 return result
76 def loads(self, value: bytes):
77 '''Conditionally decompress and deserialise value.'''
79 if self.compressor:
80 try:
81 value = self.compressor.decompress(value)
82 except self.compressor.decompressError:
83 # It is expected that the error indicates that the value is
84 # shorter than compressMinLength
85 pass
87 return self.serialiser.loads(value)
89 def nameEntry(self, fn: Callable, *args, **kwargs) -> str:
90 '''
91 Return cache key for given callable and its positional and
92 keyword arguments.
94 Note how callable, ``fn``, is represented in the cache key:
96 1) a ``types.MethodType`` instance -> names of
97 ``(module, class, method)``
98 2) a ``types.FunctionType`` instance -> names of
99 ``(module, function)``
100 3) other callalbe objects with ``__name__`` -> name of
101 ``(module, object)``
103 This means that if two function are defined dynamically in the
104 same module with same names, like::
106 def createF1():
107 @cache
108 def f(a, b):
109 return a + b
110 return f
112 def createF2():
113 @cache
114 def f(a, b):
115 return a * b
116 return f
118 print(createF1()(1, 2))
119 print(createF2()(1, 2))
121 Both will return `3`, because cache keys will clash. In such cases
122 you need to pass ``key`` with custom key function.
124 It can also be that an object in case 3 doesn't have a name, or its
125 name isn't unique, then a ``nameEntry`` should be overridden with
126 something that represents it uniquely, like
127 ``repr(fn).rsplit(' at 0x', 1)[0]`` (address should be stripped so
128 after Python process restart the cache can still be valid
129 and usable).
130 '''
132 result = [self.prefix, 'entry']
133 if callable(fn):
134 try:
135 # types.MethodType
136 result.extend([
137 fn.__module__,
138 fn.__self__.__class__.__name__, # type: ignore[attribute-error]
139 fn.__name__,
140 ])
141 except AttributeError:
142 try:
143 # types.FunctionType and other object with __name__
144 result.extend([fn.__module__, fn.__name__])
145 except AttributeError:
146 raise HermesError(
147 'fn is callable but its name is undefined, consider overriding Mangler.nameEntry'
148 )
149 else:
150 raise HermesError('fn is expected to be callable')
152 arguments = args, tuple(sorted(kwargs.items()))
153 result.append(self.hash(self.dumps(arguments)))
155 return ':'.join(result)
157 def nameTag(self, tag: str) -> str:
158 '''Build fully qualified backend tag name.'''
160 return ':'.join([self.prefix, 'tag', tag])
162 def mapTags(self, tagKeys: Iterable[str]) -> Dict[str, str]:
163 '''Map tags to random values for seeding.'''
165 rnd = os.urandom(4).hex()
166 return {key: self.hash(':'.join((key, rnd)).encode()) for key in tagKeys}
168 def hashTags(self, tagMap: Dict[str, str]) -> str:
169 '''Hash tags of a cache entry for the entry key,'''
171 values = tuple(zip(*sorted(tagMap.items())))[1] # sorted by key dict values
172 return self.hash(':'.join(values).encode())
174 def nameLock(self, entryKey: str) -> str:
175 '''
176 Create fully qualified backend lock key for the entry key.
178 :param entryKey:
179 Entry key to create a lock key for. If given entry key is already
180 a colon-separated key name with first component equal to
181 :attr:`prefix`, first to components are dropped. For instance:
183 - ``foo`` → ``cache:lock:foo``
184 - ``cache:entry:fn:tagged:78d64ea049a57494`` →
185 ``cache:lock:fn:tagged:78d64ea049a57494``
187 '''
189 parts = entryKey.split(':')
190 if parts[0] == self.prefix:
191 entryKey = ':'.join(parts[2:])
193 return ':'.join([self.prefix, 'lock', entryKey])
196KeyFunc = Callable[..., str]
197TtlFunc = Callable[..., int]
200class Cached:
201 '''Cache-point wrapper for callables and descriptors.'''
203 _frontend: 'Hermes'
204 '''
205 Hermes instance which provides backend and mangler instances, and
206 TTL fallback value.
207 '''
209 _callable: Callable
210 '''
211 The decorated callable, stays ``types.FunctionType`` if a function
212 is decorated, otherwise it is transformed to ``types.MethodType``
213 on the instance clone by descriptor protocol implementation. It can
214 also be a method descriptor which is also transformed accordingly to
215 the descriptor protocol (e.g. ``staticmethod`` and ``classmethod``).
216 '''
218 _isDescriptor: bool
219 '''Flag defining if the callable is a method descriptor.'''
221 _isMethod: bool
222 '''Flag defining if the callable is a method.'''
224 _ttl: Optional[Union[int, TtlFunc]]
225 '''
226 Optional cache entry Time To Live for decorated callable.
228 It can be either a number of seconds, or a function to calculate it.
229 If no value is provided the frontend default, :attr:`Hermes.ttl`, is
230 used.
231 '''
233 _keyFunc: Optional[KeyFunc]
234 '''Key creation function.'''
236 _tags: Sequence[str]
237 '''Cache entry tags for decorated callable.'''
239 def __init__(
240 self,
241 frontend: 'Hermes',
242 callable: Callable,
243 *,
244 ttl: Optional[Union[int, TtlFunc]] = None,
245 key: Optional[KeyFunc] = None,
246 tags: Sequence[str] = (),
247 ):
248 self._frontend = frontend
249 self._ttl = ttl
250 self._keyFunc = key
251 self._tags = tags
253 self._callable = callable
254 self._isDescriptor = inspect.ismethoddescriptor(callable)
255 self._isMethod = inspect.ismethod(callable)
257 # preserve ``__name__``, ``__doc__``, etc
258 functools.update_wrapper(self, callable)
260 def _load(self, key):
261 if self._tags:
262 tagMap = self._frontend.backend.load(map(self._frontend.mangler.nameTag, self._tags))
263 if len(tagMap) != len(self._tags):
264 return None
265 else:
266 key += ':' + self._frontend.mangler.hashTags(tagMap)
268 return self._frontend.backend.load(key)
270 def _save(self, key, value, ttl: int):
271 if self._tags:
272 namedTags = tuple(map(self._frontend.mangler.nameTag, self._tags))
273 tagMap = self._frontend.backend.load(namedTags)
274 missingTags = set(namedTags) - set(tagMap.keys())
275 if missingTags:
276 missingTagMap = self._frontend.mangler.mapTags(missingTags)
277 self._frontend.backend.save(mapping = missingTagMap, ttl = None)
278 tagMap.update(missingTagMap)
279 assert len(self._tags) == len(tagMap)
281 key += ':' + self._frontend.mangler.hashTags(tagMap)
283 return self._frontend.backend.save({key: value}, ttl = ttl)
285 def _remove(self, key):
286 if self._tags:
287 tagMap = self._frontend.backend.load(map(self._frontend.mangler.nameTag, self._tags))
288 if len(tagMap) != len(self._tags):
289 return
290 else:
291 key += ':' + self._frontend.mangler.hashTags(tagMap)
293 self._frontend.backend.remove(key)
295 def _get_key(self, *args, **kwargs) -> str:
296 keyFunc = self._keyFunc or self._frontend.mangler.nameEntry
297 return keyFunc(self._callable, *args, **kwargs)
299 def _get_ttl(self, return_value, *args, **kwargs) -> int:
300 result = self._ttl if self._ttl is not None else self._frontend.ttl
301 if callable(result):
302 result = result(return_value, self._callable, *args, **kwargs)
303 return result
305 def invalidate(self, *args, **kwargs):
306 '''
307 Invalidate the cache entry.
309 Invalidated entry corresponds to the wrapped callable called with
310 given ``args`` and ``kwargs``.
311 '''
313 self._remove(self._get_key(*args, **kwargs))
315 def __call__(self, *args, **kwargs):
316 '''Get the value of the wrapped callable.'''
318 key = self._get_key(*args, **kwargs)
319 value = self._load(key)
320 if value is None:
321 with self._frontend.backend.lock(key):
322 # it's better to read twice than lock every read
323 value = self._load(key)
324 if value is None:
325 value = self._callable(*args, **kwargs)
326 ttl = self._get_ttl(value, *args, **kwargs)
327 self._save(key, value, ttl)
329 return value
331 def __get__(self, instance, type):
332 '''
333 Implements non-data descriptor protocol.
335 The invocation happens only when instance method is decorated,
336 so we can distinguish between decorated ``types.MethodType`` and
337 ``types.FunctionType``. Python class declaration mechanics prevent
338 a decorator from having awareness of the class type, as the
339 function is received by the decorator before it becomes an
340 instance method.
342 How it works::
344 cache = hermes.Hermes()
346 class Model:
348 @cache
349 def calc(self):
350 return 42
352 m = Model()
353 m.calc
355 Last attribute access results in the call, ``calc.__get__(m, Model)``,
356 where ``calc`` is instance of :class:`Cached` which decorates the
357 original ``Model.calc``.
359 Note, initially :class:`Cached` is created on decoration per
360 class method, when class type is created by the interpreter, and
361 is shared among all instances. Later, on attribute access, a copy
362 is returned with bound ``_callable``, just like ordinary Python
363 method descriptor works.
365 For more details, `descriptor-protocol
366 <http://docs.python.org/3/howto/descriptor.html#descriptor-protocol>`_.
367 '''
369 if instance is not None and self._isDescriptor:
370 return self._copy(self._callable.__get__(instance, type)) # type: ignore[attribute-error]
371 elif instance is not None and not self._isMethod:
372 return self._copy(types.MethodType(self._callable, instance))
373 else:
374 return self
376 def _copy(self, callable):
377 '''
378 Create a shallow copy of self with ``_callable``
379 replaced to given instance.
380 '''
382 boundCached = object.__new__(self.__class__)
383 boundCached.__dict__ = self.__dict__.copy()
384 boundCached._callable = callable
385 return boundCached
388class CachedCoro(Cached):
389 '''
390 Cache-point wrapper for coroutine functions.
392 The implementation uses the default thread pool of ``asyncio`` to
393 execute synchronous functions of the cache backend, and manage their
394 (distributed) locks.
395 '''
397 async def _run(self, fn, *args, **kwargs) -> Coroutine:
398 ''''
399 Run run given function or coroutine function.
401 If ``fn`` is a coroutine function it's called and awaited.
402 Otherwise it's run in the thread pool.
403 '''
405 if inspect.iscoroutinefunction(fn):
406 return await fn(*args, **kwargs)
408 loop = asyncio.get_event_loop()
409 return await loop.run_in_executor(None, functools.partial(fn, *args, **kwargs))
411 async def invalidate(self, *args, **kwargs):
412 '''
413 Invalidate the cache entry.
415 Invalidated entry corresponds to the wrapped coroutine function
416 called with given ``args`` and ``kwargs``.
417 '''
419 await self._run(super().invalidate, *args, **kwargs)
421 async def __call__(self, *args, **kwargs):
422 '''Get the value of the wrapped coroutine function's coroutine.'''
424 key = self._get_key(*args, **kwargs)
425 value = await self._run(self._load, key)
426 if value is None:
427 lock = self._frontend.backend.lock(key)
428 await self._run(lock.acquire)
429 try:
430 value = await self._run(self._load, key)
431 if value is None:
432 value = await self._callable(*args, **kwargs)
433 ttl = self._get_ttl(value, *args, **kwargs)
434 await self._run(self._save, key, value, ttl)
435 finally:
436 await self._run(lock.release)
438 return value
441def cachedfactory(frontend: 'Hermes', fn, **kwargs) -> Cached:
442 '''
443 Create a cache-point object from the callable.
445 :argument frontend:
446 Cache frontend instance.
447 :argument fn:
448 Must be coroutine function, callable or method descriptor.
449 '''
451 isdescr = inspect.ismethoddescriptor(fn)
452 if (
453 inspect.iscoroutinefunction(fn)
454 or isdescr and inspect.iscoroutinefunction(getattr(fn, '__func__', None))
455 ):
456 return CachedCoro(frontend, fn, **kwargs)
457 elif callable(fn) or isdescr:
458 return Cached(frontend, fn, **kwargs)
459 else:
460 raise HermesError(
461 'First positional argument must be coroutine function, callable or method descriptor'
462 )
465class Hermes:
466 '''
467 Cache façade.
469 :argument backend:
470 Class or instance of cache backend. If a class is passed, keyword
471 arguments of passed to :obj:`Hermes` constructor will be bypassed
472 to the class' constructor.
474 If the argument is omitted no-op backend will be be used.
475 :argument mangler:
476 Optional, typically of a subclass, mangler instance.
477 :argument cachedfactory:
478 Optional, a cache-point factory for functions and coroutines.
479 :argument ttl:
480 Default cache entry time-to-live.
482 Usage::
484 import hermes.backend.redis
487 cache = hermes.Hermes(
488 hermes.backend.redis.Backend, ttl = 600, host = 'localhost', db = 1
489 )
491 @cache
492 def foo(a, b):
493 return a * b
495 class Example:
497 @cache(tags = ('math', 'power'), ttl = 1200)
498 def bar(self, a, b):
499 return a ** b
501 @cache(
502 tags = ('math', 'avg'),
503 key = lambda fn, *args, **kwargs: 'avg:{0}:{1}'.format(*args),
504 )
505 def baz(self, a, b):
506 return (a + b) / 2.0
508 print(foo(2, 333))
510 example = Example()
511 print(example.bar(2, 10))
512 print(example.baz(2, 10))
514 foo.invalidate(2, 333)
515 example.bar.invalidate(2, 10)
516 example.baz.invalidate(2, 10)
518 cache.clean(['math']) # invalidate entries tagged 'math'
519 cache.clean() # flush cache
521 '''
523 backend: AbstractBackend
524 '''Cache backend.'''
526 mangler: Mangler
527 '''Key manager responsible for creating keys, hashing and serialisation.'''
529 cachedfactory: Callable[..., Cached]
530 '''Cache-point callable object factory.'''
532 ttl: Union[int, TtlFunc]
533 '''
534 Default cache entry time-to-live.
536 It can be either a number of seconds, or a function to calculate
537 it. The latter is given:
539 - the return value of the decorated callable's call
540 - the decorated callable object
541 - actual positional arguments of the call
542 - actual keyword arguments of the call
544 '''
546 def __init__(
547 self,
548 backend: Union[Type[AbstractBackend], AbstractBackend] = AbstractBackend,
549 *,
550 mangler: Optional[Mangler] = None,
551 cachedfactory: Callable[..., Cached] = cachedfactory,
552 ttl: Union[int, TtlFunc] = 3600,
553 **backendconf
554 ):
555 self.ttl = ttl
557 mangler = mangler or Mangler()
558 assert isinstance(mangler, Mangler)
559 self.mangler = mangler
561 if isinstance(backend, AbstractBackend):
562 if backendconf:
563 warnings.warn('Backend options ignored because backend instance is passed')
565 self.backend = backend
566 elif isinstance(backend, type) and issubclass(backend, AbstractBackend):
567 self.backend = backend(self.mangler, **backendconf)
568 else:
569 raise HermesError('Expected class or instance of AbstractBackend') # type: ignore
571 assert callable(cachedfactory)
572 self.cachedfactory = cachedfactory
574 def __call__(
575 self,
576 *args,
577 ttl: Optional[Union[int, TtlFunc]] = None,
578 tags: Sequence[str] = (),
579 key: Optional[KeyFunc] = None,
580 ):
581 '''
582 Wrap the callable in a cache-point instance.
584 Decorator that caches method or function result. The following key
585 arguments are optional:
587 Bare decorator, ``@cache``, is supported as well as a call with
588 keyword arguments ``@cache(ttl = 7200)``.
590 :argument ttl:
591 Cache entry Time To Live. See :attr:`ttl`.
592 :argument tags:
593 Cache entry tag list.
594 :argument key:
595 Lambda that provides custom key, otherwise
596 :obj:`Mangler.nameEntry` is used.
597 '''
599 if args:
600 # @cache
601 return self.cachedfactory(self, args[0])
602 else:
603 # @cache()
604 return functools.partial(self.cachedfactory, self, ttl = ttl, tags = tags, key = key)
606 def clean(self, tags: Sequence[str] = ()):
607 '''
608 Clean all, or tagged with given tags, cache entries.
610 :argument tags:
611 If this argument is omitted the call flushes all cache entries,
612 otherwise only the entries tagged by given tags are flushed.
613 '''
615 if tags:
616 self.backend.remove(map(self.mangler.nameTag, tags))
617 else:
618 self.backend.clean()
621class HermesError(Exception):
622 '''Generic Hermes error.'''