1   
  2   
  3   
  4   
  5   
  6   
  7   
  8   
  9   
 10   
 11   
 12   
 13   
 14   
 15   
 16  from weakref import WeakValueDictionary 
 17  from itertools import ifilterfalse 
 18  from twisted.python import log 
 19  from twisted.internet import defer 
 20  from collections import deque 
 21  from buildbot.util.bbcollections import defaultdict 
 22   
 24      """ 
 25   
 26      A least-recently-used cache, with a fixed maximum size.  This cache is 
 27      designed to control memory usage by minimizing duplication of objects, 
 28      while avoiding unnecessary re-fetching of the same rows from the database. 
 29   
 30      Asynchronous locking is used to ensure that in the common case of multiple 
 31      concurrent requests for the same key, only one fetch is performed. 
 32   
 33      All values are also stored in a weak valued dictionary, even after they 
 34      have expired from the cache.  This allows values that are used elsewhere in 
 35      Buildbot to "stick" in the cache in case they are needed by another 
 36      component.  Weak references cannot be used for some types, so these types 
 37      are not compatible with this class.  Note that dictionaries can be weakly 
 38      referenced if they are an instance of a subclass of C{dict}. 
 39   
 40      If the result of the C{miss_fn} is C{None}, then the value is not cached; 
 41      this is intended to avoid caching negative results. 
 42   
 43      This is based on Raymond Hettinger's implementation in 
 44      U{http://code.activestate.com/recipes/498245-lru-and-lfu-cache-decorators/} 
 45      licensed under the PSF license, which is GPL-compatible.
 46   
 47      @ivar hits: cache hits so far 
 48      @ivar refhits: cache misses found in the weak ref dictionary, so far 
 49      @ivar misses: cache misses leading to re-fetches, so far 
 50      @ivar max_size: maximum allowed size of the cache 
 51      """ 
 52   
 53      __slots__ = ('max_size max_queue miss_fn ' 
 54                   'queue cache weakrefs refcount concurrent ' 
 55                   'hits refhits misses'.split()) 
 56      sentinel = object() 
 57      QUEUE_SIZE_FACTOR = 10 
 58   
 59 -    def __init__(self, miss_fn, max_size=50): 
  77   
 78 -    def get(self, key, **miss_fn_kwargs): 
  79          """ 
 80          Fetch a value from the cache by key, invoking C{self.miss_fn(key)} if 
 81          the key is not in the cache. 
 82   
 83          Any additional keyword arguments are passed to the C{miss_fn} as 
 84          keyword arguments; these can supply additional information relating to 
 85          the key.  It is up to the caller to ensure that this information is 
 86          functionally identical for each key value: if the key is already in the 
 87          cache, the C{miss_fn} will not be invoked, even if the keyword 
 88          arguments differ. 
 89   
 90          @param key: cache key 
 91          @param **miss_fn_kwargs: keyword arguments to  the miss_fn 
 92          @returns: value via Deferred 
 93          """ 
 94          cache = self.cache 
 95          weakrefs = self.weakrefs 
 96          refcount = self.refcount 
 97          concurrent = self.concurrent 
 98          queue = self.queue 
 99   
100           
101          def ref_key(): 
102              queue.append(key) 
103              refcount[key] = refcount[key] + 1 
104   
105               
106               
107               
108               
109              if len(queue) > self.max_queue: 
110                  refcount.clear() 
111                  queue_appendleft = queue.appendleft 
112                  queue_appendleft(self.sentinel) 
113                  for k in ifilterfalse(refcount.__contains__, 
114                                          iter(queue.pop, self.sentinel)): 
115                      queue_appendleft(k) 
116                      refcount[k] = 1 
 117   
118          try: 
119              result = cache[key] 
120              self.hits += 1 
121              ref_key() 
122              return defer.succeed(result) 
123          except KeyError: 
124              try: 
125                  result = weakrefs[key] 
126                  self.refhits += 1 
127                  cache[key] = result 
128                  ref_key() 
129                  return defer.succeed(result) 
130              except KeyError: 
131                   
132                   
133                  conc = concurrent.get(key) 
134                  if conc: 
135                      self.hits += 1 
136                      d = defer.Deferred() 
137                      conc.append(d) 
138                      return d 
139   
140           
141          self.misses += 1 
142   
143           
144          d = defer.Deferred() 
145          assert key not in concurrent 
146          concurrent[key] = [ d ] 
147   
148          miss_d = self.miss_fn(key, **miss_fn_kwargs) 
149   
150          def handle_result(result): 
151              if result is not None: 
152                  cache[key] = result 
153                  weakrefs[key] = result 
154   
155                   
156                   
157                  ref_key() 
158   
159              self.inv() 
160              self._purge() 
161   
162               
163              dlist = concurrent.pop(key) 
164              for d in dlist: 
165                  d.callback(result) 
 166   
167          def handle_failure(f): 
168               
169              dlist = concurrent.pop(key) 
170              for d in dlist: 
171                  d.errback(f) 
172   
173          miss_d.addCallbacks(handle_result, handle_failure) 
174          miss_d.addErrback(log.err) 
175   
176          return d 
177   
196   
197 -    def put(self, key, value): 
 198          """ 
199          Update the cache with the given key and value, if the key is already in 
200          the cache.  This is intended to be used when updated values are 
201          available for an existing cached object, and does not record a 
202          reference to the key. 
203   
204          @param key: key to update 
205          @param value: new value 
206          @returns: nothing 
207          """ 
208          if key in self.cache: 
209              self.cache[key] = value 
210              self.weakrefs[key] = value 
211          elif key in self.weakrefs: 
212              self.weakrefs[key] = value 
 213   
221   
223          """Check invariants and log if they are not met; used for debugging""" 
224          global inv_failed 
225   
226           
227          cache_keys = set(self.cache.keys()) 
228          queue_keys = set(self.queue) 
229          if queue_keys - cache_keys: 
230              log.msg("INV: uncached keys in queue:", queue_keys - cache_keys) 
231              inv_failed = True 
232          if cache_keys - queue_keys: 
233              log.msg("INV: unqueued keys in cache:", cache_keys - queue_keys) 
234              inv_failed = True 
235   
236           
237           
238          exp_refcount = dict() 
239          for k in self.queue: 
240              exp_refcount[k] = exp_refcount.get(k, 0) + 1 
241          if exp_refcount != self.refcount: 
242              log.msg("INV: refcounts differ:") 
243              log.msg(" expected:", sorted(exp_refcount.items())) 
244              log.msg("      got:", sorted(self.refcount.items())) 
245              inv_failed = True 
 246   
247   
# Module-level debugging flag: starts out False and is set to True by the
# invariant check whenever it logs an "INV:" violation.
inv_failed = False
249