Package buildbot :: Package util :: Module lru
[frames] | [no frames]

Source Code for Module buildbot.util.lru

  1  # This file is part of Buildbot.  Buildbot is free software: you can 
  2  # redistribute it and/or modify it under the terms of the GNU General Public 
  3  # License as published by the Free Software Foundation, version 2. 
  4  # 
  5  # This program is distributed in the hope that it will be useful, but WITHOUT 
  6  # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more 
  8  # details. 
  9  # 
 10  # You should have received a copy of the GNU General Public License along with 
 11  # this program; if not, write to the Free Software Foundation, Inc., 51 
 12  # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
 13  # 
 14  # Copyright Buildbot Team Members 
 15   
 16  from weakref import WeakValueDictionary 
 17  from itertools import ifilterfalse 
 18  from twisted.python import log 
 19  from twisted.internet import defer 
 20  from collections import deque 
 21  from buildbot.util.bbcollections import defaultdict 
 22   
class AsyncLRUCache(object):
    """
    A least-recently-used cache, with a fixed maximum size.  This cache is
    designed to control memory usage by minimizing duplication of objects,
    while avoiding unnecessary re-fetching of the same rows from the database.

    Asynchronous locking is used to ensure that in the common case of multiple
    concurrent requests for the same key, only one fetch is performed.

    All values are also stored in a weak valued dictionary, even after they
    have expired from the cache.  This allows values that are used elsewhere in
    Buildbot to "stick" in the cache in case they are needed by another
    component.  Weak references cannot be used for some types, so these types
    are not compatible with this class.  Note that dictionaries can be weakly
    referenced if they are an instance of a subclass of C{dict}.

    If the result of the C{miss_fn} is C{None}, then the value is not cached;
    this is intended to avoid caching negative results.

    This is based on Raymond Hettinger's implementation in
    U{http://code.activestate.com/recipes/498245-lru-and-lfu-cache-decorators/}
    licensed under the PSF license, which is GPL-compatible.

    @ivar hits: cache hits so far
    @ivar refhits: cache misses found in the weak ref dictionary, so far
    @ivar misses: cache misses leading to re-fetches, so far
    @ivar max_size: maximum allowed size of the cache
    """

    # __slots__ keeps per-instance memory low; note that the parenthesized
    # string literals are concatenated before .split() is applied, so this is
    # a list of attribute names.
    __slots__ = ('max_size max_queue miss_fn '
            'queue cache weakrefs refcount concurrent '
            'hits refhits misses'.split())
    # unique marker object used to delimit a compaction pass over the queue;
    # it can never collide with a real cache key
    sentinel = object()
    # the recently-used queue may grow to QUEUE_SIZE_FACTOR * max_size
    # entries before it is compacted (duplicates removed)
    QUEUE_SIZE_FACTOR = 10

    def __init__(self, miss_fn, max_size=50):
        """
        Constructor.

        @param miss_fn: function to call, with key as parameter, for cache
        misses.  This function I{must} return a deferred.

        @param max_size: maximum number of objects in the cache
        """
        self.miss_fn = miss_fn
        self.max_size = max_size
        self.max_queue = max_size * self.QUEUE_SIZE_FACTOR
        # self.queue records key accesses, most recent at the right; a key
        # may appear multiple times (see self.refcount)
        self.queue = deque()
        # strong references to at most max_size cached values
        self.cache = {}
        # weak references to every value ever cached, kept alive only while
        # some other component still holds the value
        self.weakrefs = WeakValueDictionary()
        # key -> list of Deferreds awaiting an in-flight miss_fn fetch
        self.concurrent = {}
        self.hits = self.misses = self.refhits = 0
        # number of times each key currently appears in self.queue
        self.refcount = defaultdict(lambda : 0)

    def get(self, key, **miss_fn_kwargs):
        """
        Fetch a value from the cache by key, invoking C{self.miss_fn(key)} if
        the key is not in the cache.

        Any additional keyword arguments are passed to the C{miss_fn} as
        keyword arguments; these can supply additional information relating to
        the key.  It is up to the caller to ensure that this information is
        functionally identical for each key value: if the key is already in the
        cache, the C{miss_fn} will not be invoked, even if the keyword
        arguments differ.

        @param key: cache key
        @param **miss_fn_kwargs: keyword arguments to the miss_fn
        @returns: value via Deferred
        """
        # bind instance attributes to locals for speed and brevity in the
        # closures below
        cache = self.cache
        weakrefs = self.weakrefs
        refcount = self.refcount
        concurrent = self.concurrent
        queue = self.queue

        # utility function to record recent use of this key
        def ref_key():
            queue.append(key)
            refcount[key] = refcount[key] + 1

            # periodically compact the queue by eliminating duplicate keys
            # while preserving order of most recent access.  Note that this
            # is only required when the cache does not exceed its maximum
            # size
            if len(queue) > self.max_queue:
                # rebuild refcount while draining the queue from the right
                # (most recent first): only the first occurrence of each key
                # is kept, so each surviving key ends with refcount 1
                refcount.clear()
                queue_appendleft = queue.appendleft
                queue_appendleft(self.sentinel)
                for k in ifilterfalse(refcount.__contains__,
                        iter(queue.pop, self.sentinel)):
                    queue_appendleft(k)
                    refcount[k] = 1

        try:
            # fast path: key is in the strong cache
            result = cache[key]
            self.hits += 1
            ref_key()
            return defer.succeed(result)
        except KeyError:
            try:
                # the value expired from the cache but is still alive
                # elsewhere; promote it back into the strong cache
                result = weakrefs[key]
                self.refhits += 1
                cache[key] = result
                ref_key()
                return defer.succeed(result)
            except KeyError:
                # if there's already a fetch going on, add
                # to the list of waiting deferreds
                conc = concurrent.get(key)
                if conc:
                    self.hits += 1
                    d = defer.Deferred()
                    conc.append(d)
                    return d

        # if we're here, we've missed and need to fetch
        self.misses += 1

        # create a list of waiting deferreds for this key; the assert holds
        # because a concurrent fetch would have been caught above
        d = defer.Deferred()
        assert key not in concurrent
        concurrent[key] = [ d ]

        miss_d = self.miss_fn(key, **miss_fn_kwargs)

        def handle_result(result):
            # a None result is deliberately not cached (negative results)
            if result is not None:
                cache[key] = result
                weakrefs[key] = result

                # reference the key once, possibly standing in for multiple
                # concurrent accesses
                ref_key()

                self._purge()

            # and fire all of the waiting Deferreds; pop first so a
            # re-entrant get() for the same key starts a fresh fetch
            dlist = concurrent.pop(key)
            for d in dlist:
                d.callback(result)

        def handle_failure(f):
            # errback all of the waiting Deferreds
            dlist = concurrent.pop(key)
            for d in dlist:
                d.errback(f)

        miss_d.addCallbacks(handle_result, handle_failure)
        # any exception raised by the handlers themselves is logged rather
        # than silently dropped
        miss_d.addErrback(log.err)

        return d

    def _purge(self):
        """
        Trim the strong cache down to C{max_size} entries by evicting the
        least recently used keys.  No-op while the cache is within bounds.
        """
        if len(self.cache) <= self.max_size:
            return

        cache = self.cache
        refcount = self.refcount
        queue = self.queue
        max_size = self.max_size

        # purge least recently used entries, using refcount to count entries
        # that appear multiple times in the queue
        while len(cache) > max_size:
            refc = 1
            while refc:
                # pop stale queue entries for k until its last (most recent)
                # occurrence is consumed, i.e. its refcount reaches zero
                k = queue.popleft()
                refc = refcount[k] = refcount[k] - 1
            del cache[k]
            del refcount[k]

    def inv(self):
        """Check invariants and log if they are not met; used for debugging"""

        # the keys of the queue and cache should be identical
        cache_keys = set(self.cache.keys())
        queue_keys = set(self.queue)
        if queue_keys - cache_keys:
            log.msg("INV: uncached keys in queue:", queue_keys - cache_keys)
        if cache_keys - queue_keys:
            log.msg("INV: unqueued keys in cache:", cache_keys - queue_keys)

        # refcount should always represent the number of times each key appears
        # in the queue
        exp_refcount = dict()
        for k in self.queue:
            exp_refcount[k] = exp_refcount.get(k, 0) + 1
        if exp_refcount != self.refcount:
            log.msg("INV: refcounts differ:")
            log.msg(" expected:", sorted(exp_refcount.items()))
            log.msg(" got:", sorted(self.refcount.items()))

    def put(self, key, value):
        """
        Update the cache with the given key and value, if the key is already in
        the cache.  This is intended to be used when updated values are
        available for an existing cached object, and does not record a
        reference to the key.

        @param key: key to update
        @param value: new value
        @returns: nothing
        """
        # only replace existing entries; a key seen for the first time is
        # ignored so that put() never grows the cache
        if key in self.cache:
            self.cache[key] = value
            self.weakrefs[key] = value
        elif key in self.weakrefs:
            self.weakrefs[key] = value

    def set_max_size(self, max_size):
        """
        Change the cache's maximum size, evicting entries immediately if the
        new size is smaller than the current contents.

        @param max_size: new maximum number of objects in the cache
        """
        if self.max_size == max_size:
            return

        self.max_size = max_size
        self.max_queue = max_size * self.QUEUE_SIZE_FACTOR
        self._purge()