Package buildbot :: Package util :: Module lru
[frames] | [no frames]

Source Code for Module buildbot.util.lru

  1  # This file is part of Buildbot.  Buildbot is free software: you can 
  2  # redistribute it and/or modify it under the terms of the GNU General Public 
  3  # License as published by the Free Software Foundation, version 2. 
  4  # 
  5  # This program is distributed in the hope that it will be useful, but WITHOUT 
  6  # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more 
  8  # details. 
  9  # 
 10  # You should have received a copy of the GNU General Public License along with 
 11  # this program; if not, write to the Free Software Foundation, Inc., 51 
 12  # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
 13  # 
 14  # Copyright Buildbot Team Members 
 15   
 16  from weakref import WeakValueDictionary 
 17  from itertools import ifilterfalse 
 18  from twisted.python import log 
 19  from twisted.internet import defer 
 20  from collections import deque 
 21  from buildbot.util.bbcollections import defaultdict 
 22   
class AsyncLRUCache(object):
    """
    A least-recently-used cache, with a fixed maximum size.  This cache is
    designed to control memory usage by minimizing duplication of objects,
    while avoiding unnecessary re-fetching of the same rows from the database.

    Asynchronous locking is used to ensure that in the common case of multiple
    concurrent requests for the same key, only one fetch is performed.

    All values are also stored in a weak valued dictionary, even after they
    have expired from the cache.  This allows values that are used elsewhere in
    Buildbot to "stick" in the cache in case they are needed by another
    component.  Weak references cannot be used for some types, so these types
    are not compatible with this class.  Note that dictionaries can be weakly
    referenced if they are an instance of a subclass of C{dict}.

    If the result of the C{miss_fn} is C{None}, then the value is not cached;
    this is intended to avoid caching negative results.

    This is based on Raymond Hettinger's implementation in
    U{http://code.activestate.com/recipes/498245-lru-and-lfu-cache-decorators/}
    licensed under the PSF license, which is GPL-compatible.

    @ivar hits: cache hits so far
    @ivar refhits: cache misses found in the weak ref dictionary, so far
    @ivar misses: cache misses leading to re-fetches, so far
    @ivar max_size: maximum allowed size of the cache
    """

    # restrict instances to exactly these attributes (no per-instance __dict__)
    __slots__ = ('max_size max_queue miss_fn '
            'queue cache weakrefs refcount concurrent '
            'hits refhits misses'.split())

    # unique marker object used to delimit one compaction pass over the queue;
    # it can never collide with a real cache key
    sentinel = object()

    # the access-order queue may grow to this multiple of max_size before it
    # is compacted (duplicate key entries squeezed out)
    QUEUE_SIZE_FACTOR = 10

    def __init__(self, miss_fn, max_size=50):
        """
        Constructor.

        @param miss_fn: function to call, with key as parameter, for cache
        misses.  This function I{must} return a deferred.

        @param max_size: maximum number of objects in the cache
        """
        self.miss_fn = miss_fn
        self.max_size = max_size
        self.max_queue = max_size * self.QUEUE_SIZE_FACTOR
        # access-order queue: most recent accesses appended at the right; a
        # key may appear multiple times, counted in self.refcount
        self.queue = deque()
        # strong references to the cached values, keyed by cache key
        self.cache = {}
        # weak references to every value ever cached, so values still alive
        # elsewhere in the process can be resurrected after expiry
        self.weakrefs = WeakValueDictionary()
        # key -> list of Deferreds waiting on an in-flight miss_fn fetch
        self.concurrent = {}
        self.hits = self.misses = self.refhits = 0
        # number of times each key currently appears in self.queue
        self.refcount = defaultdict(lambda : 0)

    def get(self, key, **miss_fn_kwargs):
        """
        Fetch a value from the cache by key, invoking C{self.miss_fn(key)} if
        the key is not in the cache.

        Any additional keyword arguments are passed to the C{miss_fn} as
        keyword arguments; these can supply additional information relating to
        the key.  It is up to the caller to ensure that this information is
        functionally identical for each key value: if the key is already in the
        cache, the C{miss_fn} will not be invoked, even if the keyword
        arguments differ.

        @param key: cache key
        @param **miss_fn_kwargs: keyword arguments to the miss_fn
        @returns: value via Deferred
        """
        # bind instance attributes to locals; the closures below use these
        cache = self.cache
        weakrefs = self.weakrefs
        refcount = self.refcount
        concurrent = self.concurrent
        queue = self.queue

        # utility function to record recent use of this key
        def ref_key():
            queue.append(key)
            refcount[key] = refcount[key] + 1

            # periodically compact the queue by eliminating duplicate keys
            # while preserving order of most recent access.  Note that this
            # is only required when the cache does not exceed its maximum
            # size
            if len(queue) > self.max_queue:
                # rebuild refcount from scratch: walk the queue from the
                # most-recent end (popping from the right), keeping only the
                # first (most recent) occurrence of each key
                refcount.clear()
                queue_appendleft = queue.appendleft
                # the sentinel marks where this pass started; iter(queue.pop,
                # sentinel) pops entries until it reappears
                queue_appendleft(self.sentinel)
                for k in ifilterfalse(refcount.__contains__,
                                      iter(queue.pop, self.sentinel)):
                    queue_appendleft(k)
                    refcount[k] = 1

        try:
            # hit: value is strongly cached
            result = cache[key]
            self.hits += 1
            ref_key()
            return defer.succeed(result)
        except KeyError:
            try:
                # expired from the cache, but the value is still alive
                # elsewhere in the process -- resurrect it via the weakref
                result = weakrefs[key]
                self.refhits += 1
                cache[key] = result
                ref_key()
                return defer.succeed(result)
            except KeyError:
                # if there's already a fetch going on, add
                # to the list of waiting deferreds
                conc = concurrent.get(key)
                if conc:
                    # counted as a hit: no extra fetch is performed
                    self.hits += 1
                    d = defer.Deferred()
                    conc.append(d)
                    return d

        # if we're here, we've missed and need to fetch
        self.misses += 1

        # create a list of waiting deferreds for this key
        d = defer.Deferred()
        assert key not in concurrent
        concurrent[key] = [ d ]

        miss_d = self.miss_fn(key, **miss_fn_kwargs)

        def handle_result(result):
            # a None result is deliberately not cached (negative result)
            if result is not None:
                cache[key] = result
                weakrefs[key] = result

                # reference the key once, possibly standing in for multiple
                # concurrent accesses
                ref_key()

                # debugging invariant check, then trim to max_size
                self.inv()
                self._purge()

            # and fire all of the waiting Deferreds
            dlist = concurrent.pop(key)
            for d in dlist:
                d.callback(result)

        def handle_failure(f):
            # errback all of the waiting Deferreds
            dlist = concurrent.pop(key)
            for d in dlist:
                d.errback(f)

        miss_d.addCallbacks(handle_result, handle_failure)
        # log (rather than lose) any error raised by the handlers themselves
        miss_d.addErrback(log.err)

        return d

    def _purge(self):
        """
        Trim the strong cache back down to C{max_size} entries by evicting
        the least-recently-used keys.  The weakref dictionary is untouched,
        so evicted values still in use elsewhere remain resurrectable.
        """
        if len(self.cache) <= self.max_size:
            return

        cache = self.cache
        refcount = self.refcount
        queue = self.queue
        max_size = self.max_size

        # purge least recently used entries, using refcount to count entries
        # that appear multiple times in the queue
        while len(cache) > max_size:
            refc = 1
            # pop queue entries (oldest first) until some key's count drops
            # to zero -- that key has no more recent accesses and is the LRU
            while refc:
                k = queue.popleft()
                refc = refcount[k] = refcount[k] - 1
            del cache[k]
            del refcount[k]

    def put(self, key, value):
        """
        Update the cache with the given key and value, if the key is already in
        the cache.  This is intended to be used when updated values are
        available for an existing cached object, and does not record a
        reference to the key.

        @param key: key to update
        @param value: new value
        @returns: nothing
        """
        if key in self.cache:
            self.cache[key] = value
            self.weakrefs[key] = value
        elif key in self.weakrefs:
            # not strongly cached, but still tracked: refresh the weakref only
            self.weakrefs[key] = value

    def set_max_size(self, max_size):
        """
        Change the cache's maximum size, evicting entries immediately if the
        new size is smaller than the current population.

        @param max_size: new maximum number of objects in the cache
        """
        if self.max_size == max_size:
            return

        self.max_size = max_size
        self.max_queue = max_size * self.QUEUE_SIZE_FACTOR
        self._purge()

    def inv(self):
        """Check invariants and log if they are not met; used for debugging"""
        global inv_failed

        # the keys of the queue and cache should be identical
        cache_keys = set(self.cache.keys())
        queue_keys = set(self.queue)
        if queue_keys - cache_keys:
            log.msg("INV: uncached keys in queue:", queue_keys - cache_keys)
            inv_failed = True
        if cache_keys - queue_keys:
            log.msg("INV: unqueued keys in cache:", cache_keys - queue_keys)
            inv_failed = True

        # refcount should always represent the number of times each key appears
        # in the queue
        exp_refcount = dict()
        for k in self.queue:
            exp_refcount[k] = exp_refcount.get(k, 0) + 1
        if exp_refcount != self.refcount:
            log.msg("INV: refcounts differ:")
            log.msg(" expected:", sorted(exp_refcount.items()))
            log.msg(" got:", sorted(self.refcount.items()))
            inv_failed = True
# for tests: AsyncLRUCache.inv() sets this flag (via `global inv_failed`)
# whenever an invariant check fails, so the test suite can detect corruption
inv_failed = False