Coverage for hermes/backend/redis.py: 100%

59 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-18 20:48 +0000

1import time 

2from typing import Any, Dict, Iterable, Optional, Union 

3 

4import redis 

5 

6from . import AbstractBackend, AbstractLock 

7 

8 

# Public API of the module: only the backend class is exported.
__all__ = ('Backend',)

10 

11 

class Lock(AbstractLock):
    '''
    Key-aware lock shared between the clients of one Redis instance.

    "Distributed" refers to the clients competing for the lock, not to
    multiple Redis instances. The approach follows `Correct
    implementation with a single instance <lock_>`_, except that no unique
    value is written to the lock entry and verified afterwards, because
    a cached function is expected to complete before the lock timeout.

    .. _lock: http://redis.io/topics/distlock#correct-implementation-with-a-single-instance
    '''

    client: redis.StrictRedis
    '''Redis client.'''

    timeout: int
    '''Maximum TTL of lock.'''

    sleep: float
    '''Amount of time to sleep between acquisition attempts when waiting.'''

    def __init__(
        self,
        key: str,
        client: redis.StrictRedis,
        *,
        sleep: float = 0.1,
        timeout: int = 900
    ):
        '''
        Remember the client and the timing settings for the lock on ``key``.

        :param key: Name of the lock entry, passed to the base class.
        :param client: Redis client used to set and delete the entry.
        :param sleep: Pause between attempts while waiting for the lock.
        :param timeout: Expiry passed to Redis when the entry is set.
        '''

        super().__init__(key)

        self.timeout = timeout
        self.sleep = sleep
        self.client = client

    def acquire(self, wait=True):
        '''
        Try to take the lock, optionally polling until it is taken.

        :param wait: When true, retry every ``self.sleep`` seconds until
          the entry is set; when false, make a single attempt only.
        :return: ``True`` if the entry was set, ``False`` otherwise.
        '''

        acquired = self._set_entry()
        while not acquired and wait:
            time.sleep(self.sleep)
            acquired = self._set_entry()

        return bool(acquired)

    def _set_entry(self):
        '''Issue one ``SET`` with ``nx`` and the configured expiry.'''

        return self.client.set(self.key, 'locked', nx=True, ex=self.timeout)

    def release(self):
        '''Delete the lock entry, without checking who set it.'''

        self.client.delete(self.key)

52 

53 

class Backend(AbstractBackend):
    '''Redis backend implementation.'''

    client: redis.StrictRedis
    '''Redis client.'''

    _lockconf: dict
    '''Lock config.'''

    def __init__(
        self,
        mangler,
        *,
        host: str = 'localhost',
        password: Optional[str] = None,
        port: int = 6379,
        db: int = 0,
        lockconf: Optional[dict] = None,
        **kwargs
    ):
        '''
        Create the Redis client and remember the lock configuration.

        :param mangler: Object passed to the base class; its ``dumps``,
          ``loads`` and ``nameLock`` methods are used by this backend.
        :param host: Redis server host.
        :param password: Redis password, if any.
        :param port: Redis server port.
        :param db: Redis database number.
        :param lockconf: Keyword arguments forwarded to every ``Lock``
          created by :meth:`lock`.
        :param kwargs: Extra keyword arguments for ``redis.StrictRedis``.
        '''

        super().__init__(mangler)

        # Redis client creates a pool that connects lazily. Arguments are
        # passed by keyword so they do not depend on the positional order
        # of the client's constructor.
        self.client = redis.StrictRedis(
            host=host, port=port, db=db, password=password, **kwargs
        )

        self._lockconf = lockconf or {}

    def lock(self, key: str) -> Lock:
        '''Return a ``Lock`` for ``key`` using the configured lock options.'''

        return Lock(self.mangler.nameLock(key), self.client, **self._lockconf)

    def save(self, mapping: Dict[str, Any], *, ttl: Optional[int] = None):
        '''
        Store the serialised values of ``mapping`` under their keys.

        :param mapping: Key to value pairs; values go through
          ``self.mangler.dumps`` before being stored.
        :param ttl: Expiry for every entry. A falsy value (``None`` or
          ``0``) stores the entries without an expiry.
        '''

        if not mapping:
            # MSET with no arguments is rejected by Redis, whereas the
            # pipeline branch would succeed; make both a no-op.
            return

        dumped = {k: self.mangler.dumps(v) for k, v in mapping.items()}

        if not ttl:
            self.client.mset(dumped)
        else:
            pipeline = self.client.pipeline()
            for k, v in dumped.items():
                pipeline.setex(k, ttl, v)
            pipeline.execute()

    def load(self, keys: Union[str, Iterable[str]]) -> Optional[Union[Any, Dict[str, Any]]]:
        '''
        Fetch and deserialise one or several entries.

        :param keys: A single key, or an iterable of keys.
        :return: For a single key, the deserialised value or ``None`` when
          it is missing. For an iterable, a dict of the found keys to
          their deserialised values; missing keys are omitted.
        '''

        if isinstance(keys, str):
            value = self.client.get(keys)
            if value is not None:
                value = self.mangler.loads(value)
            return value

        # Materialise once: the keys are needed both for MGET and for zip
        keys = tuple(keys)
        if not keys:
            # Nothing to fetch, so skip the request altogether
            return {}

        return {
            k: self.mangler.loads(v)
            for k, v in zip(keys, self.client.mget(keys))
            if v is not None
        }

    def remove(self, keys: Union[str, Iterable[str]]):
        '''
        Delete one or several entries.

        :param keys: A single key, or an iterable of keys. An empty
          iterable is a no-op.
        '''

        keys = (keys,) if isinstance(keys, str) else tuple(keys)

        # DEL requires at least one key, so skip the call when there are none
        if keys:
            self.client.delete(*keys)

    def clean(self):
        '''Flush the whole Redis database the client is connected to.'''

        self.client.flushdb()