Package buildbot :: Package util :: Module lru
[frames] | [no frames]

Source Code for Module buildbot.util.lru

  1  # This file is part of Buildbot.  Buildbot is free software: you can 
  2  # redistribute it and/or modify it under the terms of the GNU General Public 
  3  # License as published by the Free Software Foundation, version 2. 
  4  # 
  5  # This program is distributed in the hope that it will be useful, but WITHOUT 
  6  # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more 
  8  # details. 
  9  # 
 10  # You should have received a copy of the GNU General Public License along with 
 11  # this program; if not, write to the Free Software Foundation, Inc., 51 
 12  # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
 13  # 
 14  # Copyright Buildbot Team Members 
 15   
 16  from weakref import WeakValueDictionary 
 17  from itertools import ifilterfalse 
 18  from twisted.python import log 
 19  from twisted.internet import defer 
 20  from collections import deque 
 21  from collections import defaultdict 
 22   
 23   
class LRUCache(object):
    """
    A least-recently-used cache, with a fixed maximum size.

    Values that have fallen out of the strong cache are still served from
    ``weakrefs`` as long as some other code holds a reference to them (a
    "refhit"), and are resurrected into the strong cache on access.

    See buildbot manual for more information.
    """

    __slots__ = ('max_size max_queue miss_fn queue cache weakrefs '
                 'refcount hits refhits misses'.split())
    # unique marker used to terminate queue-compaction scans
    sentinel = object()
    # the access queue is allowed to grow to this multiple of max_size
    # before it is compacted down to one entry per key
    QUEUE_SIZE_FACTOR = 10

    def __init__(self, miss_fn, max_size=50):
        """
        @param miss_fn: function called with the key (plus any keyword
            arguments given to L{get}) on a cache miss; returns the value,
            or None for a negative result (which is not cached)
        @param max_size: maximum number of entries kept in the strong cache
        """
        self.max_size = max_size
        self.max_queue = max_size * self.QUEUE_SIZE_FACTOR
        # 'queue' records accesses in order (may contain duplicate keys);
        # 'refcount' counts how many times each key appears in the queue
        self.queue = deque()
        self.cache = {}
        # values evicted from 'cache' survive here while externally referenced
        self.weakrefs = WeakValueDictionary()
        self.hits = self.misses = self.refhits = 0
        self.refcount = defaultdict(int)
        self.miss_fn = miss_fn

    def put(self, key, value):
        """Replace the cached value for an already-known key; unknown keys
        are ignored (use L{get} to populate the cache)."""
        if key in self.cache:
            self.cache[key] = value
            self.weakrefs[key] = value
        elif key in self.weakrefs:
            self.weakrefs[key] = value

    def get(self, key, **miss_fn_kwargs):
        """
        Fetch the value for C{key}, calling C{miss_fn} on a cache miss.
        Extra keyword arguments are passed through to C{miss_fn}.
        A None result from C{miss_fn} is returned but never cached.
        """
        try:
            return self._get_hit(key)
        except KeyError:
            pass

        self.misses += 1

        result = self.miss_fn(key, **miss_fn_kwargs)
        if result is not None:
            self.cache[key] = result
            self.weakrefs[key] = result
            self._ref_key(key)
            self._purge()

        return result

    def keys(self):
        """Return the keys currently held in the strong cache (as a list,
        so the result is uniform across Python versions)."""
        return list(self.cache.keys())

    def set_max_size(self, max_size):
        """Change the maximum cache size, evicting entries if it shrank."""
        if self.max_size == max_size:
            return

        self.max_size = max_size
        self.max_queue = max_size * self.QUEUE_SIZE_FACTOR
        self._purge()

    def inv(self):
        """Check internal invariants, setting the module-level
        C{inv_failed} flag on any violation (used by the test suite)."""
        global inv_failed

        # the keys of the queue and cache should be identical
        cache_keys = set(self.cache.keys())
        queue_keys = set(self.queue)
        if queue_keys - cache_keys:
            log.msg("INV: uncached keys in queue:", queue_keys - cache_keys)
            inv_failed = True
        if cache_keys - queue_keys:
            log.msg("INV: unqueued keys in cache:", cache_keys - queue_keys)
            inv_failed = True

        # refcount should always represent the number of times each key appears
        # in the queue
        exp_refcount = dict()
        for k in self.queue:
            exp_refcount[k] = exp_refcount.get(k, 0) + 1
        if exp_refcount != self.refcount:
            log.msg("INV: refcounts differ:")
            log.msg(" expected:", sorted(exp_refcount.items()))
            log.msg(" got:", sorted(self.refcount.items()))
            inv_failed = True

    def _ref_key(self, key):
        """Record a reference to the argument key."""
        queue = self.queue
        refcount = self.refcount

        queue.append(key)
        refcount[key] = refcount[key] + 1

        # periodically compact the queue by eliminating duplicate keys
        # while preserving order of most recent access.  Note that this
        # is only required when the cache does not exceed its maximum
        # size.  (Portable rewrite: the original used the Python-2-only
        # itertools.ifilterfalse with iter(queue.pop, sentinel).)
        if len(queue) > self.max_queue:
            refcount.clear()
            queue.appendleft(self.sentinel)
            # pop from the newest end; the first occurrence seen for each
            # key is its most recent access, and appendleft preserves the
            # relative ordering; the sentinel marks where to stop
            while True:
                k = queue.pop()
                if k is self.sentinel:
                    break
                if k not in refcount:
                    queue.appendleft(k)
                    refcount[k] = 1

    def _get_hit(self, key):
        """Try to do a value lookup from the existing cache entries,
        raising KeyError if the key is in neither the strong cache nor
        the weak-reference dictionary."""
        try:
            result = self.cache[key]
            self.hits += 1
            self._ref_key(key)
            return result
        except KeyError:
            pass

        # not in the strong cache; the value may still be alive via a
        # weak reference -- resurrect it into the strong cache
        result = self.weakrefs[key]
        self.refhits += 1
        self.cache[key] = result
        self._ref_key(key)
        return result

    def _purge(self):
        """
        Trim the cache down to max_size by evicting the
        least-recently-used entries.
        """
        if len(self.cache) <= self.max_size:
            return

        cache = self.cache
        refcount = self.refcount
        queue = self.queue
        max_size = self.max_size

        # purge least recently used entries, using refcount to count entries
        # that appear multiple times in the queue; a key is only evicted
        # once its last queued occurrence has been consumed
        while len(cache) > max_size:
            refc = 1
            while refc:
                k = queue.popleft()
                refc = refcount[k] = refcount[k] - 1
            del cache[k]
            del refcount[k]
166 -class AsyncLRUCache(LRUCache):
167 """ 168 An LRU cache with asynchronous locking to ensure that in the common case of 169 multiple concurrent requests for the same key, only one fetch is performed. 170 """ 171 172 __slots__ = ['concurrent'] 173
174 - def __init__(self, miss_fn, max_size=50):
177
178 - def get(self, key, **miss_fn_kwargs):
179 try: 180 result = self._get_hit(key) 181 return defer.succeed(result) 182 except KeyError: 183 pass 184 185 concurrent = self.concurrent 186 conc = concurrent.get(key) 187 if conc: 188 self.hits += 1 189 d = defer.Deferred() 190 conc.append(d) 191 return d 192 193 # if we're here, we've missed and need to fetch 194 self.misses += 1 195 196 # create a list of waiting deferreds for this key 197 d = defer.Deferred() 198 assert key not in concurrent 199 concurrent[key] = [ d ] 200 201 miss_d = self.miss_fn(key, **miss_fn_kwargs) 202 203 def handle_result(result): 204 if result is not None: 205 self.cache[key] = result 206 self.weakrefs[key] = result 207 208 # reference the key once, possibly standing in for multiple 209 # concurrent accesses 210 self._ref_key(key) 211 212 self._purge() 213 214 # and fire all of the waiting Deferreds 215 dlist = concurrent.pop(key) 216 for d in dlist: 217 d.callback(result)
218 219 def handle_failure(f): 220 # errback all of the waiting Deferreds 221 dlist = concurrent.pop(key) 222 for d in dlist: 223 d.errback(f)
224 225 miss_d.addCallbacks(handle_result, handle_failure) 226 miss_d.addErrback(log.err) 227 228 return d 229 230 231 # for tests 232 inv_failed = False 233