Coverage for hermes/backend/memcached.py: 100%
53 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-18 20:48 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-18 20:48 +0000
1import time
2from typing import Any, Dict, Iterable, Optional, Union
4import pymemcache
6from . import AbstractBackend, AbstractLock
9__all__ = 'Backend',
class Lock(AbstractLock):
    '''Distributed lock bound to a single memcached key.'''

    client: pymemcache.PooledClient
    '''Pooled memcached client used to add and delete the lock key.'''

    timeout: int
    '''
    Expiry passed to memcached when the lock key is added. Keep it within
    30 days, otherwise memcached will treat it as a UNIX timestamp of an
    exact date instead of a relative TTL.
    '''

    sleep: float
    '''Amount of time to pause between attempts while waiting for the lock.'''

    def __init__(
        self, key: str, client: pymemcache.PooledClient, *, sleep: float = 0.1, timeout: int = 900
    ):
        super().__init__(key)

        self.timeout = timeout
        self.sleep = sleep
        self.client = client

    def acquire(self, wait = True):
        '''
        Try to take the lock by adding its key to memcached.

        Returns ``True`` as soon as the ``add`` call succeeds. When it fails
        and ``wait`` is falsy, returns ``False`` right away; otherwise the
        attempt is repeated after sleeping ``self.sleep`` each time.
        '''

        # A reply is requested explicitly so the result of ``add`` can be
        # used to tell whether the key was stored.
        while not self.client.add(self.key, 'locked', self.timeout, noreply = False):
            if not wait:
                return False
            time.sleep(self.sleep)

        return True

    def release(self):
        '''Release the lock by deleting its key from memcached.'''

        self.client.delete(self.key)
class Backend(AbstractBackend):
    '''Cache backend that keeps its entries in memcached.'''

    client: pymemcache.PooledClient
    '''Pooled memcached client used for cache operations and locks.'''

    _lockconf: dict
    '''Keyword arguments forwarded to every ``Lock`` created by ``lock``.'''

    def __init__(
        self,
        mangler,
        *,
        server: str = 'localhost:11211',
        lockconf: Optional[dict] = None,
        **kwargs,
    ):
        self.mangler = mangler

        # Memcached client creates a pool that connects lazily. Extra
        # keyword arguments are handed to the client constructor untouched.
        self.client = pymemcache.PooledClient(server, **kwargs)

        self._lockconf = lockconf or {}

    def lock(self, key) -> Lock:
        '''Create a ``Lock`` whose key is ``mangler.nameLock(key)``.'''

        lock_key = self.mangler.nameLock(key)
        return Lock(lock_key, self.client, **self._lockconf)

    def save(self, mapping: Dict[str, Any], *, ttl: Optional[int] = None):
        '''
        Store every entry of ``mapping``, serialising values via the mangler.

        When ``ttl`` is ``None``, ``0`` is sent as the expiry.
        '''

        expire = 0 if ttl is None else ttl

        payload = {}
        for name, value in mapping.items():
            payload[name] = self.mangler.dumps(value)

        self.client.set_multi(payload, expire)

    def load(self, keys: Union[str, Iterable[str]]) -> Optional[Union[Any, Dict[str, Any]]]:
        '''
        Fetch one value or several, deserialising them via the mangler.

        For a single ``str`` key the value is returned, or ``None`` when the
        client returns ``None``. For any other iterable of keys a dict is
        built from the entries the client returns.
        '''

        if not isinstance(keys, str):
            found = self.client.get_multi(tuple(keys))
            return {name: self.mangler.loads(raw) for name, raw in found.items()}

        raw = self.client.get(keys)
        return None if raw is None else self.mangler.loads(raw)

    def remove(self, keys: Union[str, Iterable[str]]):
        '''Delete the given key, or every key of the given iterable.'''

        # A lone string is wrapped so it is treated as one key
        targets = (keys,) if isinstance(keys, str) else keys
        self.client.delete_multi(targets)

    def clean(self):
        '''Clear the cache by calling ``flush_all`` on the client.'''

        self.client.flush_all()