from weakref import WeakValueDictionary
from itertools import ifilterfalse
from twisted.python import log
from twisted.internet import defer
from collections import deque
from buildbot.util.bbcollections import defaultdict

class AsyncLRUCache(object):
    """

    A least-recently-used cache, with a fixed maximum size. This cache is
    designed to control memory usage by minimizing duplication of objects,
    while avoiding unnecessary re-fetching of the same rows from the database.

    Asynchronous locking is used to ensure that in the common case of multiple
    concurrent requests for the same key, only one fetch is performed.

    All values are also stored in a weak valued dictionary, even after they
    have expired from the cache. This allows values that are used elsewhere in
    Buildbot to "stick" in the cache in case they are needed by another
    component. Weak references cannot be used for some types, so these types
    are not compatible with this class. Note that dictionaries can be weakly
    referenced if they are an instance of a subclass of C{dict}.

    If the result of the C{miss_fn} is C{None}, then the value is not cached;
    this is intended to avoid caching negative results.

    This is based on Raymond Hettinger's implementation in
    U{http://code.activestate.com/recipes/498245-lru-and-lfu-cache-decorators/},
    licensed under the PSF license, which is GPL-compatible.

    @ivar hits: cache hits so far
    @ivar refhits: cache misses found in the weak ref dictionary, so far
    @ivar misses: cache misses leading to re-fetches, so far
    @ivar max_size: maximum allowed size of the cache
    """

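    # Usage sketch (illustrative only; C{fetch_row} and the key shown here are
    # hypothetical, not part of this module). The miss_fn must return a
    # Deferred, and concurrent get() calls for the same key share one fetch:
    #
    #     def fetch_row(key):
    #         return some_db_component.getRow(key)   # must return a Deferred
    #
    #     cache = AsyncLRUCache(fetch_row, max_size=100)
    #     d = cache.get(13)
    #     d.addCallback(lambda row: ...)
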
    __slots__ = ('max_size max_queue miss_fn '
                 'queue cache weakrefs refcount concurrent '
                 'hits refhits misses'.split())
    sentinel = object()
    QUEUE_SIZE_FACTOR = 10

    def __init__(self, miss_fn, max_size=50):
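        # The constructor body is not shown in this excerpt. This is a sketch
        # of the initialization implied by __slots__ and by the attributes the
        # methods below rely on; details such as the max_queue ratio are
        # assumptions.
        self.miss_fn = miss_fn
        self.max_size = max_size
        self.max_queue = max_size * self.QUEUE_SIZE_FACTOR
        self.queue = deque()
        self.cache = {}
        self.weakrefs = WeakValueDictionary()
        self.concurrent = {}
        self.refcount = defaultdict(int)
        self.hits = self.refhits = self.misses = 0
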
    def get(self, key, **miss_fn_kwargs):
        """
        Fetch a value from the cache by key, invoking C{self.miss_fn(key)} if
        the key is not in the cache.

        Any additional keyword arguments are passed to the C{miss_fn} as
        keyword arguments; these can supply additional information relating
        to the key. It is up to the caller to ensure that this information is
        functionally identical for each key value: if the key is already in
        the cache, the C{miss_fn} will not be invoked, even if the keyword
        arguments differ.

        @param key: cache key
        @param **miss_fn_kwargs: keyword arguments to the miss_fn
        @returns: value via Deferred
        """
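        # For example (hypothetical key and kwarg): cache.get(13, no_cache=True)
        # calls self.miss_fn(13, no_cache=True) only on a miss; if key 13 is
        # already cached, the kwargs are ignored and the cached value returned.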
        cache = self.cache
        weakrefs = self.weakrefs
        refcount = self.refcount
        concurrent = self.concurrent
        queue = self.queue

        # utility function to record a reference to this key in the LRU queue
        def ref_key():
            queue.append(key)
            refcount[key] = refcount[key] + 1

            # periodically compact the queue by dropping duplicate keys,
            # keeping only the most recent entry for each key and preserving
            # the access order, so the queue length stays bounded even when
            # the same keys are referenced repeatedly
            if len(queue) > self.max_queue:
                refcount.clear()
                queue_appendleft = queue.appendleft
                queue_appendleft(self.sentinel)
                for k in ifilterfalse(refcount.__contains__,
                                      iter(queue.pop, self.sentinel)):
                    queue_appendleft(k)
                    refcount[k] = 1

        try:
            result = cache[key]
            self.hits += 1
            ref_key()
            return defer.succeed(result)
        except KeyError:
            try:
                result = weakrefs[key]
                self.refhits += 1
                cache[key] = result
                ref_key()
                return defer.succeed(result)
            except KeyError:
                # cache and weakref dict both missed; if a fetch for this key
                # is already in flight, chain onto it with a new Deferred
                conc = concurrent.get(key)
                if conc:
                    self.hits += 1
                    d = defer.Deferred()
                    conc.append(d)
                    return d

                # no fetch in progress, so this is a genuine miss
                self.misses += 1

                # record a Deferred for this request; concurrent requests for
                # the same key will append to this list instead of re-fetching
                d = defer.Deferred()
                assert key not in concurrent
                concurrent[key] = [ d ]

                miss_d = self.miss_fn(key, **miss_fn_kwargs)

                def handle_result(result):
                    if result is not None:
                        cache[key] = result
                        weakrefs[key] = result

                        # reference the key once, standing in for all of the
                        # requests that were waiting on this fetch
                        ref_key()

                    self.inv()
                    self._purge()

                    # fire all of the Deferreds that were waiting on this key
                    dlist = concurrent.pop(key)
                    for d in dlist:
                        d.callback(result)

                def handle_failure(f):
                    # errback all of the Deferreds waiting on this key
                    dlist = concurrent.pop(key)
                    for d in dlist:
                        d.errback(f)

                miss_d.addCallbacks(handle_result, handle_failure)
                miss_d.addErrback(log.err)

                return d

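    # get() calls self._purge() above, but that method's body is not shown in
    # this excerpt. The following is a sketch of the expected eviction logic,
    # inferred from the queue/refcount bookkeeping in ref_key() and from the
    # class docstring; treat the details as assumptions rather than the
    # original implementation.
    def _purge(self):
        if len(self.cache) <= self.max_size:
            return

        cache = self.cache
        refcount = self.refcount
        queue = self.queue

        while len(cache) > self.max_size:
            # pop references from the old (left) end of the queue until some
            # key's last reference is popped; that key is the least recently
            # used and is evicted from the strong cache (the weak-value
            # dictionary may still hold it if it is referenced elsewhere)
            refc = 1
            while refc:
                k = queue.popleft()
                refc = refcount[k] = refcount[k] - 1
            del cache[k]
            del refcount[k]
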
    def put(self, key, value):
        """
        Update the cache with the given key and value, but only if the key is
        already in the cache (or still held in the weak-value dictionary).
        This is intended to be used when updated values are available for an
        existing cached object, and does not record a reference to the key.

        @param key: key to update
        @param value: new value
        @returns: nothing
        """
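        # For example (hypothetical key/value): if row 13 is modified in the
        # database, put(13, new_row) refreshes any cached or weakly-held copy
        # without counting as a reference; it does nothing if 13 is no longer
        # held anywhere.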
        if key in self.cache:
            self.cache[key] = value
            self.weakrefs[key] = value
        elif key in self.weakrefs:
            self.weakrefs[key] = value

    def inv(self):
        """Check invariants and log if they are not met; used for debugging"""
        global inv_failed

        # the queue and the cache should always hold exactly the same keys
        cache_keys = set(self.cache.keys())
        queue_keys = set(self.queue)
        if queue_keys - cache_keys:
            log.msg("INV: uncached keys in queue:", queue_keys - cache_keys)
            inv_failed = True
        if cache_keys - queue_keys:
            log.msg("INV: unqueued keys in cache:", cache_keys - queue_keys)
            inv_failed = True

        # refcount should record how many times each key appears in the queue
        exp_refcount = dict()
        for k in self.queue:
            exp_refcount[k] = exp_refcount.get(k, 0) + 1
        if exp_refcount != self.refcount:
            log.msg("INV: refcounts differ:")
            log.msg(" expected:", sorted(exp_refcount.items()))
            log.msg(" got:", sorted(self.refcount.items()))
            inv_failed = True


# set to True by inv() when an invariant check fails
inv_failed = False