Coverage for hermes/backend/inprocess.py: 100%
143 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-18 20:48 +0000
1import asyncio
2import heapq
3import threading
4import time
5import weakref
6from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
8from . import AbstractBackend, AbstractLock
11__all__ = 'AsyncBackend', 'Backend'
class Lock(AbstractLock):
    '''Key-unaware reentrant thread lock.'''

    _lock: threading.RLock
    '''Threading lock instance.'''

    def __init__(self, key = None):
        # The key is accepted for interface compatibility and ignored,
        # because a single process-wide reentrant lock is used
        self._lock = threading.RLock()

    def acquire(self, wait = True):
        '''Acquire the ``RLock``.'''

        return self._lock.acquire(blocking = wait)

    def release(self):
        '''Release the ``RLock``.'''

        self._lock.release()
class BaseBackend(AbstractBackend):
    '''Base dictionary backend without key expiration.'''

    cache: dict
    '''A ``dict`` used to store cache entries.'''

    def __init__(self, mangler):
        super().__init__(mangler)

        self.cache = {}

    def save(self, mapping: Dict[str, Any], *, ttl: Optional[int] = None):
        # ttl is accepted for interface compatibility and ignored here
        for key, value in mapping.items():
            self.cache[key] = self.mangler.dumps(value)

    def load(self, keys: Union[str, Iterable[str]]) -> Optional[Union[Any, Dict[str, Any]]]:
        if not isinstance(keys, str):
            # Bulk load: missing keys are silently omitted from the result
            return {k: self.mangler.loads(self.cache[k]) for k in keys if k in self.cache}

        raw = self.cache.get(keys, None)
        return self.mangler.loads(raw) if raw is not None else None

    def remove(self, keys: Union[str, Iterable[str]]):
        single = isinstance(keys, str)
        for key in ((keys,) if single else keys):
            self.cache.pop(key, None)

    def clean(self):
        self.cache.clear()

    def dump(self) -> Dict[str, Any]:
        '''Dump the cache entries. Sorry, Barbara.'''

        return {k: self.mangler.loads(v) for k, v in self.cache.items()}
class Backend(BaseBackend):
    '''
    Simple in-process dictionary-based backend implementation.

    In-process in-memory cache without memory limit, but with
    expiration. Besides testing, it may be suitable for limited number of
    real-world use-cases with a priori small cached data.
    '''

    _lock: Lock
    '''Lock instance.'''

    _ttlHeap: list
    '''TTL heap used by the thread to remove the expired entries.'''

    _ttlWatchThread: threading.Thread
    '''An instance of TTL watcher thread.'''

    _ttlWatchSleep: float
    '''Seconds for the expiration watcher to sleep in the loop.'''

    _ttlWatchThreadRunning = False
    '''Run flag of the while-loop of the thread.'''

    def __init__(self, mangler, *, ttlWatchSleep: float = 1):
        super().__init__(mangler)

        self._lock = Lock()
        self._ttlHeap = []

        self._ttlWatchSleep = ttlWatchSleep
        self.startWatch()

    def lock(self, key: str) -> Lock:
        # Key-unaware: every key shares the same process-wide lock
        return self._lock

    def save(self, mapping: Dict[str, Any], *, ttl: Optional[int] = None):
        super().save(mapping, ttl = ttl)

        if ttl:
            # The watcher thread pops from the heap while holding the lock, so
            # pushes must be synchronised too or the heap invariant can be
            # corrupted. The lock is reentrant, so it's safe if the caller
            # already holds it.
            with self._lock:
                expiry = time.time() + ttl
                for k in mapping:
                    heapq.heappush(self._ttlHeap, (expiry, k))

    def clean(self):
        # It touches the heap and needs to be synchronised
        with self._lock:
            super().clean()
            self._ttlHeap.clear()

    def startWatch(self):
        '''Start the daemon thread that evicts expired entries.'''

        self._ttlWatchThread = threading.Thread(target = self._watchExpiry, daemon = True)
        self._ttlWatchThreadRunning = True
        self._ttlWatchThread.start()

    def stopWatch(self):
        '''Ask TTL watch thread to stop and join it.'''

        self._ttlWatchThreadRunning = False
        # Bounded join so a stuck sleep cycle cannot hang the caller forever
        self._ttlWatchThread.join(2 * self._ttlWatchSleep)

    def dump(self) -> Dict[str, Any]:
        # It iterates the cache and needs to be synchronised
        with self._lock:
            return super().dump()

    def _watchExpiry(self):
        while self._ttlWatchThreadRunning:
            with self._lock:
                # May contain manually invalidated keys
                expiredKeys = []
                now = time.time()
                while self._ttlHeap and self._ttlHeap[0][0] < now:
                    _, key = heapq.heappop(self._ttlHeap)
                    expiredKeys.append(key)
                self.remove(expiredKeys)

            time.sleep(self._ttlWatchSleep)
class AsyncLock(AbstractLock):
    '''
    Key-aware asynchronous lock.

    Note that instances of this class are used for both synchronous and
    asynchronous cases. For asynchronous cases ``asyncio.Lock`` is used
    per key. When a synchronous callable is cached in an asynchronous
    application, synchronous code is by definition executed serially in
    single-threaded Python process running an ``asyncio`` IO loop. Hence,
    for synchronous code this class does nothing.

    The trick that makes it work for the both cases is that
    :class:`hermes.Cached` uses the context manager protocol, and
    :class:`hermes.CachedCoro` uses :obj:`.acquire` and
    :obj:`.release` directly.
    '''

    _lock: Optional[asyncio.Lock] = None
    '''Lock instance, created lazily on :obj:`.acquire` call.'''

    def __enter__(self):
        '''
        No-op context manager implementation.

        Used by :class:`hermes.Cached` for synchronous code.
        '''

    def __exit__(self, *args):
        '''
        No-op context manager implementation.

        Used by :class:`hermes.Cached` for synchronous code.
        '''

    async def acquire(self, wait = True) -> bool: # type: ignore[signature-mismatch]
        '''
        Acquire the asynchronous lock.

        Used by ``CachedCoro`` for asynchronous code.
        '''

        # Lazy creation defers binding the lock to an IO loop until first use
        if not self._lock:
            self._lock = asyncio.Lock()

        # Non-blocking mode: bail out immediately when already held
        if not wait and self._lock.locked():
            return False

        await self._lock.acquire()
        return True

    async def release(self): # type: ignore[signature-mismatch]
        '''
        Release the asynchronous lock.

        Used by ``CachedCoro`` for asynchronous code.

        This method does not have to be a coroutine itself, because the
        underlying ``release`` is synchronous. But because
        :class:`hermes.CachedCoro` runs regular synchronous callables in a
        thread pool, and the thread won't have a running IO loop, making
        this a coroutine leads to the desired behaviour.
        '''

        self._lock.release()
class AsyncBackend(BaseBackend):
    '''
    Simple in-process dictionary-based backend for ``asyncio`` programs.

    For cache entries to expire according to their TTL,
    :meth:`.startWatch` must be called manually when the IO loop is
    already running.
    '''

    _lockMap: weakref.WeakValueDictionary
    '''
    Mapping between cache keys and :class:`.AsyncLock` instances.

    ``WeakValueDictionary`` makes cleanup of released locks automatic.
    '''

    _ttlHeap: List[Tuple[float, str]]
    '''TTL heap used to remove the expired entries.'''

    _ttlWatchSleep: float
    '''Seconds for the expiration watcher to sleep in the loop.'''

    _ttlWatchTask: Optional[asyncio.Task] = None
    '''An instance of TTL watcher task, created by ``startWatch``.'''

    def __init__(self, mangler, *, ttlWatchSleep: float = 1):
        super().__init__(mangler)

        self._ttlWatchSleep = ttlWatchSleep

        self._lockMap = weakref.WeakValueDictionary()
        self._ttlHeap = []

    def lock(self, key: str) -> AsyncLock:
        try:
            return self._lockMap[key]
        except KeyError:
            # setdefault resolves the race of two coroutines creating the
            # lock for the same key, both getting the same instance
            return self._lockMap.setdefault(key, AsyncLock(key))

    def save(self, mapping: Dict[str, Any], *, ttl: Optional[int] = None):
        super().save(mapping, ttl = ttl)

        if ttl:
            # Single-threaded asyncio program: no locking needed for the heap
            expiry = time.time() + ttl
            for k in mapping:
                heapq.heappush(self._ttlHeap, (expiry, k))

    def clean(self):
        super().clean()
        self._ttlHeap.clear()

    def startWatch(self):
        '''
        Start TTL watching task.

        It must be called when ``asyncio`` IO loop is running.
        '''

        # get_running_loop enforces the documented requirement of a running
        # loop, unlike the deprecated get_event_loop which may create one
        self._ttlWatchTask = asyncio.get_running_loop().create_task(self._watchExpiry())

    def stopWatch(self):
        '''Stop TTL watching task, if it has been started.'''

        # Guard makes it safe to call before startWatch or repeatedly
        if self._ttlWatchTask is not None:
            self._ttlWatchTask.cancel()
            self._ttlWatchTask = None

    async def _watchExpiry(self):
        while True:
            expiredKeys = []
            now = time.time()
            # The removal uses synchronous API so it's atomic for single-threaded
            # program by definition, and doesn't need any locking
            while self._ttlHeap and self._ttlHeap[0][0] < now:
                _, key = heapq.heappop(self._ttlHeap)
                expiredKeys.append(key)
            self.remove(expiredKeys)

            await asyncio.sleep(self._ttlWatchSleep)