Coverage for hermes/backend/inprocess.py: 100%

143 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-18 20:48 +0000

1import asyncio 

2import heapq 

3import threading 

4import time 

5import weakref 

6from typing import Any, Dict, Iterable, List, Optional, Tuple, Union 

7 

8from . import AbstractBackend, AbstractLock 

9 

10 

__all__ = ('AsyncBackend', 'Backend')

12 

13 

class Lock(AbstractLock):
    '''
    Key-unaware reentrant thread lock.

    A ``key`` argument is accepted for interface compatibility and ignored:
    one process-wide reentrant lock is shared for all keys.
    '''

    _lock: threading.RLock
    '''Threading lock instance.'''

    def __init__(self, key = None):
        self._lock = threading.RLock()

    def acquire(self, wait = True):
        '''Acquire the ``RLock``, optionally without blocking.'''

        return self._lock.acquire(blocking = wait)

    def release(self):
        '''Release the ``RLock``.'''

        self._lock.release()

32 

33 

class BaseBackend(AbstractBackend):
    '''Base dictionary backend without key expiration.'''

    cache: dict
    '''A ``dict`` used to store cache entries.'''

    def __init__(self, mangler):
        super().__init__(mangler)

        self.cache = {}

    def save(self, mapping: Dict[str, Any], *, ttl: Optional[int] = None):
        # ttl is accepted for interface compatibility -- this base class
        # never expires entries
        for key, value in mapping.items():
            self.cache[key] = self.mangler.dumps(value)

    def load(self, keys: Union[str, Iterable[str]]) -> Optional[Union[Any, Dict[str, Any]]]:
        if isinstance(keys, str):
            raw = self.cache.get(keys, None)
            if raw is None:
                return None
            return self.mangler.loads(raw)

        result = {}
        for key in keys:
            try:
                result[key] = self.mangler.loads(self.cache[key])
            except KeyError:
                # missing keys are simply omitted from the result
                pass
        return result

    def remove(self, keys: Union[str, Iterable[str]]):
        if isinstance(keys, str):
            keys = (keys,)

        for key in keys:
            self.cache.pop(key, None)

    def clean(self):
        self.cache.clear()

    def dump(self) -> Dict[str, Any]:
        '''Dump the cache entries. Sorry, Barbara.'''

        return {key: self.mangler.loads(raw) for key, raw in self.cache.items()}

71 

72 

class Backend(BaseBackend):
    '''
    Simple in-process dictionary-based backend implementation.

    In-process in-memory cache without memory limit, but with
    expiration. Besides testing, it may be suitable for limited number of
    real-world use-cases with a priori small cached data.
    '''

    _lock: Lock
    '''Lock instance, shared for all keys.'''

    _ttlHeap: List[Tuple[float, str]]
    '''TTL heap used by the thread to remove the expired entries.'''

    _ttlWatchThread: threading.Thread
    '''An instance of TTL watcher thread.'''

    _ttlWatchSleep: float
    '''Seconds for the expiration watcher to sleep in the loop.'''

    _ttlWatchThreadRunning = False
    '''Run flag of the while-loop of the thread.'''

    def __init__(self, mangler, *, ttlWatchSleep: float = 1):
        super().__init__(mangler)

        self._lock = Lock()
        self._ttlHeap = []

        self._ttlWatchSleep = ttlWatchSleep
        self.startWatch()

    def lock(self, key: str) -> Lock:
        return self._lock

    def save(self, mapping: Dict[str, Any], *, ttl: Optional[int] = None):
        super().save(mapping, ttl = ttl)

        if ttl:
            # The heap is also mutated by the watcher thread, so pushes must
            # be synchronised like the other heap accesses in this class.
            # The lock is reentrant, hence this is safe even when the caller
            # already holds it.
            with self._lock:
                expiry = time.time() + ttl
                for k in mapping:
                    heapq.heappush(self._ttlHeap, (expiry, k))

    def clean(self):
        # It touches the heap and needs to be synchronised
        with self._lock:
            super().clean()
            self._ttlHeap.clear()

    def startWatch(self):
        '''Start the daemon TTL watcher thread.'''

        self._ttlWatchThread = threading.Thread(target = self._watchExpiry, daemon = True)
        self._ttlWatchThreadRunning = True
        self._ttlWatchThread.start()

    def stopWatch(self):
        '''Ask TTL watch thread to stop and join it.'''

        self._ttlWatchThreadRunning = False
        self._ttlWatchThread.join(2 * self._ttlWatchSleep)

    def dump(self) -> Dict[str, Any]:
        # It iterates the cache and needs to be synchronised
        with self._lock:
            return super().dump()

    def _watchExpiry(self):
        '''Thread target that periodically evicts expired entries.'''

        while self._ttlWatchThreadRunning:
            with self._lock:
                # May contain manually invalidated keys
                expiredKeys = []
                now = time.time()
                while self._ttlHeap and self._ttlHeap[0][0] < now:
                    _, key = heapq.heappop(self._ttlHeap)
                    expiredKeys.append(key)
                self.remove(expiredKeys)

            time.sleep(self._ttlWatchSleep)

150 

151 

class AsyncLock(AbstractLock):
    '''
    Key-aware asynchronous lock.

    Note that instances of this class are used for both synchronous and
    asynchronous cases. For asynchronous cases ``asyncio.Lock`` is used
    per key. When a synchronous callable is cached in an asynchronous
    application, synchronous code is by definition executed serially in
    single-threaded Python process running an ``asyncio`` IO loop. Hence,
    for synchronous code this class does nothing.

    The trick that makes it work for the both cases is that
    :class:`hermes.Cached` uses the context manager protocol, and
    :class:`hermes.CachedCoro` uses :obj:`.acquire` and
    :obj:`.release` directly.
    '''

    _lock: asyncio.Lock = None
    '''Lock instance, created lazily on :obj:`.acquire` call.'''

    def __enter__(self):
        '''
        No-op context manager implementation.

        Used by :class:`hermes.Cached` for synchronous code.
        '''

    def __exit__(self, *args):
        '''
        No-op context manager implementation.

        Used by :class:`hermes.Cached` for synchronous code.
        '''

    async def acquire(self, wait = True) -> bool:  # type: ignore[signature-mismatch]
        '''
        Acquire the asynchronous lock.

        Used by ``CachedCoro`` for asynchronous code.
        '''

        if self._lock is None:
            # Created here rather than in __init__ so the instance can be
            # constructed outside a running IO loop
            self._lock = asyncio.Lock()

        if self._lock.locked() and not wait:
            return False

        await self._lock.acquire()
        return True

    async def release(self):  # type: ignore[signature-mismatch]
        '''
        Release the asynchronous lock.

        Used by ``CachedCoro`` for asynchronous code.

        This method does not have to a coroutine itself, because underlying
        ``release`` is synchronous. But because :class:`hermes.CachedCoro`
        runs regular synchronous callables in a thread pool, and the thread
        won't have running IO loop, making this a coroutine lead to desired
        behaviour.
        '''

        self._lock.release()

216 

217 

class AsyncBackend(BaseBackend):
    '''
    Simple in-process dictionary-based backend for ``asyncio`` programs.

    For cache entries to expire according to their TTL,
    :meth:`.startWatch` must be called manually when the IO loop is
    already running.
    '''

    _lockMap: weakref.WeakValueDictionary
    '''
    Mapping between cache keys and :class:`.AsyncLock` instances.

    ``WeakValueDictionary`` makes cleanup of released locks automatic.
    '''

    _ttlHeap: List[Tuple[float, str]]
    '''TTL heap used to remove the expired entries.'''

    _ttlWatchSleep: float
    '''Seconds for the expiration watcher to sleep in the loop.'''

    _ttlWatchTask: Optional[asyncio.Task] = None
    '''An instance of TTL watcher task, created by ``startWatch``.'''

    def __init__(self, mangler, *, ttlWatchSleep: float = 1):
        super().__init__(mangler)

        self._ttlWatchSleep = ttlWatchSleep

        self._lockMap = weakref.WeakValueDictionary()
        self._ttlHeap = []

    def lock(self, key: str) -> AsyncLock:
        try:
            return self._lockMap[key]
        except KeyError:
            # setdefault covers the case where another coroutine inserted
            # the lock between the failed lookup and this call
            return self._lockMap.setdefault(key, AsyncLock(key))

    def save(self, mapping: Dict[str, Any], *, ttl: Optional[int] = None):
        super().save(mapping, ttl = ttl)

        if ttl:
            expiry = time.time() + ttl
            for k in mapping:
                heapq.heappush(self._ttlHeap, (expiry, k))

    def clean(self):
        super().clean()
        self._ttlHeap.clear()

    def startWatch(self):
        '''
        Start TTL watching task.

        It must be called when ``asyncio`` IO loop is running.
        '''

        # get_running_loop is used instead of the deprecated get_event_loop:
        # it enforces the documented requirement of a running loop
        self._ttlWatchTask = asyncio.get_running_loop().create_task(self._watchExpiry())

    def stopWatch(self):
        '''Stop TTL watching task, if it was started.'''

        if self._ttlWatchTask is not None:
            self._ttlWatchTask.cancel()

    async def _watchExpiry(self):
        '''Coroutine that periodically evicts expired entries.'''

        while True:
            # May contain manually invalidated keys
            expiredKeys = []
            now = time.time()
            # The removal uses synchronous API so it's atomic for single-threaded
            # program by definition, and doesn't need any locking
            while self._ttlHeap and self._ttlHeap[0][0] < now:
                _, key = heapq.heappop(self._ttlHeap)
                expiredKeys.append(key)
            self.remove(expiredKeys)

            await asyncio.sleep(self._ttlWatchSleep)