from weakref import WeakValueDictionary
from itertools import ifilterfalse
from twisted.python import log
from twisted.internet import defer
from collections import deque
from buildbot.util.bbcollections import defaultdict

class AsyncLRUCache(object):
    """

    A least-recently-used cache, with a fixed maximum size.  This cache is
    designed to control memory usage by minimizing duplication of objects,
    while avoiding unnecessary re-fetching of the same rows from the database.

    Asynchronous locking is used to ensure that in the common case of multiple
    concurrent requests for the same key, only one fetch is performed.

    All values are also stored in a weak valued dictionary, even after they
    have expired from the cache.  This allows values that are used elsewhere
    in Buildbot to "stick" in the cache in case they are needed by another
    component.  Weak references cannot be used for some types, so those types
    are not compatible with this class.  Note that dictionaries can be weakly
    referenced if they are instances of a subclass of C{dict}.

    If the result of the C{miss_fn} is C{None}, then the value is not cached;
    this is intended to avoid caching negative results.

    This is based on Raymond Hettinger's implementation in
    U{http://code.activestate.com/recipes/498245-lru-and-lfu-cache-decorators/},
    licensed under the PSF license, which is GPL-compatible.

    @ivar hits: cache hits so far
    @ivar refhits: cache misses found in the weak reference dictionary, so far
    @ivar misses: cache misses leading to re-fetches, so far
    @ivar max_size: maximum allowed size of the cache
    """

    __slots__ = ('max_size max_queue miss_fn '
                 'queue cache weakrefs refcount concurrent '
                 'hits refhits misses'.split())
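    # unique marker used to delimit the queue when it is compacted in get()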
    sentinel = object()
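    # the queue may grow to QUEUE_SIZE_FACTOR * max_size entries before it is
    # compacted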
    QUEUE_SIZE_FACTOR = 10

    def __init__(self, miss_fn, max_size=50):
        """
        Constructor.

        @param miss_fn: function to call, with the key as its first argument,
        on a cache miss; any keyword arguments given to C{get} are passed
        along.  This function should return a Deferred.

        @param max_size: maximum number of objects in the cache
        """
        self.max_size = max_size
        self.max_queue = max_size * self.QUEUE_SIZE_FACTOR
        self.queue = deque()
        self.cache = {}
        self.weakrefs = WeakValueDictionary()
        self.hits = self.misses = self.refhits = 0
        self.refcount = defaultdict(int)
        self.miss_fn = miss_fn
        self.concurrent = {}

    def get(self, key, **miss_fn_kwargs):
        """
        Fetch a value from the cache by key, invoking C{self.miss_fn(key)} if
        the key is not in the cache.

        Any additional keyword arguments are passed to the C{miss_fn} as
        keyword arguments; these can supply additional information relating to
        the key.  It is up to the caller to ensure that this information is
        functionally identical for each key value: if the key is already in
        the cache, the C{miss_fn} will not be invoked, even if the keyword
        arguments differ.

        @param key: cache key
        @param **miss_fn_kwargs: keyword arguments to the miss_fn
        @returns: value via Deferred
        """
        # bind the instance attributes to locals, for brevity in the nested
        # functions below
        cache = self.cache
        weakrefs = self.weakrefs
        refcount = self.refcount
        concurrent = self.concurrent
        queue = self.queue

        # utility function to record a reference to this key in the queue
        def ref_key():
            queue.append(key)
            refcount[key] = refcount[key] + 1

            # periodically compact the queue by eliminating duplicate keys,
            # while preserving the order of most recent access; each surviving
            # key is left with a refcount of 1
            if len(queue) > self.max_queue:
                refcount.clear()
                queue_appendleft = queue.appendleft
                queue_appendleft(self.sentinel)
                for k in ifilterfalse(refcount.__contains__,
                                        iter(queue.pop, self.sentinel)):
                    queue_appendleft(k)
                    refcount[k] = 1

        try:
            result = cache[key]
            self.hits += 1
            ref_key()
            return defer.succeed(result)
        except KeyError:
            try:
                result = weakrefs[key]
                self.refhits += 1
                cache[key] = result
                ref_key()
                return defer.succeed(result)
            except KeyError:
                # if a fetch of this key is already in progress, add a new
                # Deferred to the list of waiters and share its result
                conc = concurrent.get(key)
                if conc:
                    self.hits += 1
                    d = defer.Deferred()
                    conc.append(d)
                    return d

        # a miss: this call is responsible for invoking the miss_fn
        self.misses += 1

        # record the waiting Deferreds for this key, so that requests arriving
        # while the fetch is in progress can share its result
        d = defer.Deferred()
        assert key not in concurrent
        concurrent[key] = [ d ]

        miss_d = self.miss_fn(key, **miss_fn_kwargs)

        def handle_result(result):
            if result is not None:
                cache[key] = result
                weakrefs[key] = result

                # reference the key once, standing in for all of the
                # concurrent requests waiting on it
                ref_key()

            self._purge()

            # fire all of the waiting Deferreds with the result
            dlist = concurrent.pop(key)
            for d in dlist:
                d.callback(result)

        def handle_failure(f):
            # errback all of the waiting Deferreds with the failure
            dlist = concurrent.pop(key)
            for d in dlist:
                d.errback(f)

        miss_d.addCallbacks(handle_result, handle_failure)
        miss_d.addErrback(log.err)

        return d

    def _purge(self):
        """
        Remove least-recently-used entries until the cache is no larger than
        max_size.
        """
        if len(self.cache) <= self.max_size:
            return

        cache = self.cache
        refcount = self.refcount
        queue = self.queue
        max_size = self.max_size

        # evict from the left (least recently used) end of the queue; a key is
        # only removed from the cache once its last reference has left the
        # queue, since it may appear in the queue more than once
        while len(cache) > max_size:
            refc = 1
            while refc:
                k = queue.popleft()
                refc = refcount[k] = refcount[k] - 1
            del cache[k]
            del refcount[k]

    def inv(self):
        """Check invariants and log if they are not met; used for debugging"""

        # the sets of keys in the queue and in the cache should be identical
        cache_keys = set(self.cache.keys())
        queue_keys = set(self.queue)
        if queue_keys - cache_keys:
            log.msg("INV: uncached keys in queue:", queue_keys - cache_keys)
        if cache_keys - queue_keys:
            log.msg("INV: unqueued keys in cache:", cache_keys - queue_keys)

        # the refcount should record the number of times each key appears in
        # the queue
        exp_refcount = dict()
        for k in self.queue:
            exp_refcount[k] = exp_refcount.get(k, 0) + 1
        if exp_refcount != self.refcount:
            log.msg("INV: refcounts differ:")
            log.msg(" expected:", sorted(exp_refcount.items()))
            log.msg("      got:", sorted(self.refcount.items()))


    def put(self, key, value):
        """
        Update the cache with the given key and value, but only if the key is
        already in the cache or in the weak reference dictionary.  This is
        intended to be used when an updated value becomes available for an
        existing cached object; it does not record a reference to the key.

        @param key: key to update
        @param value: new value
        @returns: nothing
        """
        if key in self.cache:
            self.cache[key] = value
            self.weakrefs[key] = value
        elif key in self.weakrefs:
            self.weakrefs[key] = value

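
# Usage sketch (illustrative only, not part of the module above): a
# hypothetical miss_fn that returns an already-fired Deferred, so the example
# runs to completion without a reactor.  The names Row, fetch_row, and the
# key values are invented for this demonstration.
if __name__ == '__main__':
    results = []

    class Row(dict):
        # a dict subclass, so instances can be weakly referenced as the class
        # docstring requires
        pass

    def fetch_row(rowid, extra=None):
        # stand-in for a real database fetch; a real miss_fn would typically
        # return a Deferred that fires later
        return defer.succeed(Row(id=rowid, extra=extra))

    lru = AsyncLRUCache(fetch_row, max_size=2)

    # the first get() misses and invokes fetch_row; the second hits the cache,
    # so fetch_row is not called again even though the keyword argument
    # differs
    lru.get(1, extra='a').addCallback(results.append)
    lru.get(1, extra='b').addCallback(results.append)

    # put() replaces the cached value, since key 1 is already in the cache
    lru.put(1, Row(id=1, extra='updated'))

    print "results:", results
    print "hits=%d misses=%d refhits=%d" % (lru.hits, lru.misses, lru.refhits)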