Coverage for hermes/backend/redis.py: 100%
59 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-18 20:48 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-18 20:48 +0000
1import time
2from typing import Any, Dict, Iterable, Optional, Union
4import redis
6from . import AbstractBackend, AbstractLock
9__all__ = 'Backend',
12class Lock(AbstractLock):
13 '''
14 Key-aware distributed lock. "Distributed" is in sense of clients,
15 not Redis instances. Implemented as described in `Correct
16 implementation with a single instance <lock_>`_, but without
17 setting unique value to the lock entry and later checking it, because
18 it is expected for a cached function to complete before lock timeout.
20 .. _lock: http://redis.io/topics/distlock#correct-implementation-with-a-single-instance
21 '''
23 client: redis.StrictRedis
24 '''Redis client.'''
26 timeout: int
27 '''Maximum TTL of lock.'''
29 sleep: float
30 '''Amount of time to sleep per ``while True`` iteration when waiting.'''
32 def __init__(
33 self, key: str, client: redis.StrictRedis, *, sleep: float = 0.1, timeout: int = 900
34 ):
35 super().__init__(key)
37 self.client = client
38 self.sleep = sleep
39 self.timeout = timeout
41 def acquire(self, wait = True):
42 while True:
43 if self.client.set(self.key, 'locked', nx = True, ex = self.timeout):
44 return True
45 elif not wait:
46 return False
47 else:
48 time.sleep(self.sleep)
50 def release(self):
51 self.client.delete(self.key)
54class Backend(AbstractBackend):
55 '''Redis backend implementation.'''
57 client: redis.StrictRedis
58 '''Redis client.'''
60 _lockconf: dict
61 '''Lock config.'''
63 def __init__(
64 self,
65 mangler,
66 *,
67 host: str = 'localhost',
68 password: Optional[str] = None,
69 port: int = 6379,
70 db: int = 0,
71 lockconf: Optional[dict] = None,
72 **kwargs
73 ):
74 super().__init__(mangler)
76 # Redis client creates a pool that connects lazily
77 self.client = redis.StrictRedis(host, port, db, password, **kwargs)
79 self._lockconf = lockconf or {}
81 def lock(self, key: str) -> Lock:
82 return Lock(self.mangler.nameLock(key), self.client, **self._lockconf)
84 def save(self, mapping: Dict[str, Any], *, ttl: Optional[int] = None):
85 mapping = {k: self.mangler.dumps(v) for k, v in mapping.items()}
87 if not ttl:
88 self.client.mset(mapping)
89 else:
90 pipeline = self.client.pipeline()
91 for k, v in mapping.items():
92 pipeline.setex(k, ttl, v)
93 pipeline.execute()
95 def load(self, keys: Union[str, Iterable[str]]) -> Optional[Union[Any, Dict[str, Any]]]:
96 if isinstance(keys, str):
97 value = self.client.get(keys)
98 if value is not None:
99 value = self.mangler.loads(value)
100 return value
101 else:
102 keys = tuple(keys)
103 return {
104 k: self.mangler.loads(v)
105 for k, v in zip(keys, self.client.mget(keys))
106 if v is not None
107 }
109 def remove(self, keys: Union[str, Iterable[str]]):
110 if isinstance(keys, str):
111 keys = (keys,)
113 self.client.delete(*keys)
115 def clean(self):
116 self.client.flushdb()