Package buildbot :: Package status :: Module persistent_queue
[frames] | no frames]

Source Code for Module buildbot.status.persistent_queue

  1  # This file is part of Buildbot.  Buildbot is free software: you can 
  2  # redistribute it and/or modify it under the terms of the GNU General Public 
  3  # License as published by the Free Software Foundation, version 2. 
  4  # 
  5  # This program is distributed in the hope that it will be useful, but WITHOUT 
  6  # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more 
  8  # details. 
  9  # 
 10  # You should have received a copy of the GNU General Public License along with 
 11  # this program; if not, write to the Free Software Foundation, Inc., 51 
 12  # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
 13  # 
 14  # Copyright Buildbot Team Members 
 15   
 16  from __future__ import with_statement 
 17   
 18   
 19  from collections import deque 
 20  import os 
 21  import cPickle as pickle 
 22   
 23  from zope.interface import implements, Interface 
 24   
 25   
def ReadFile(path):
    """Returns the whole content of the file at path, read in binary mode."""
    with open(path, 'rb') as stream:
        content = stream.read()
    return content
29 30
def WriteFile(path, buf):
    """Writes buf to the file at path in binary mode, replacing any previous
    content."""
    with open(path, 'wb') as stream:
        stream.write(buf)
34 35
class IQueue(Interface):
    """Contract shared by every queue implementation in this module."""

    def pushItem(item):
        """Appends a single item as the newest entry of the queue.

        Returns the item that had to be dropped if the queue overflowed."""

    def insertBackChunk(items):
        """Puts a list of items back in the queue as its oldest entries.

        Typically used after a failure to process data that was popped, so
        that the same data can be retrieved again later.

        Returns the list of items that did not fit if the queue overflowed."""

    def popChunk(nbItems=None):
        """Removes and returns several items in one call.

        When nbItems is omitted, self.maxItems() is used."""

    def save():
        """Persists the queue to storage, for implementations that support
        it."""

    def items():
        """Returns the items held by the queue.

        Warning: may be extremely slow for a queue stored on disk."""

    def nbItems():
        """Returns how many items are currently in the queue."""

    def maxItems():
        """Returns the largest number of items the queue is able to hold."""
67 68
class MemoryQueue(object):
    """In-memory queue with a bounded length, backed by a deque.

    A deque is used instead of a list because list.pop(0) is O(n) and gets
    slow around 10000 items, whereas deque.popleft() is O(1) most of the
    time. See http://docs.python.org/library/collections.html for details.
    """
    implements(IQueue)

    def __init__(self, maxItems=None):
        """
        @maxItems: maximum number of items to hold; defaults to 10000.
        """
        self._maxItems = 10000 if maxItems is None else maxItems
        self._items = deque()

    def pushItem(self, item):
        """Appends item; returns the oldest item if it had to be evicted to
        make room, None otherwise."""
        queue = self._items
        evicted = queue.popleft() if len(queue) == self._maxItems else None
        queue.append(item)
        return evicted

    def insertBackChunk(self, chunk):
        """Re-inserts chunk as the oldest entries.

        If the whole chunk does not fit, its leading items are left out and
        returned; otherwise None is returned."""
        overflow = None
        surplus = len(self._items) + len(chunk) - self._maxItems
        if surplus > 0:
            overflow, chunk = chunk[0:surplus], chunk[surplus:]
        # extendleft() prepends one element at a time, hence the reversal to
        # keep the chunk in its original order.
        self._items.extendleft(reversed(chunk))
        return overflow

    def popChunk(self, nbItems=None):
        """Removes and returns up to nbItems of the oldest items (defaults
        to the queue capacity)."""
        if nbItems is None:
            nbItems = self._maxItems
        if nbItems > len(self._items):
            # More was asked than is available: hand over everything.
            drained, self._items = list(self._items), deque()
            return drained
        popleft = self._items.popleft
        return [popleft() for _ in range(nbItems)]

    def save(self):
        """Does nothing; this queue has no backing storage."""
        pass

    def items(self):
        """Returns a list copy of the queued items, oldest first."""
        return list(self._items)

    def nbItems(self):
        """Returns the number of items currently queued."""
        return len(self._items)

    def maxItems(self):
        """Returns the capacity of the queue."""
        return self._maxItems
124 125
class DiskQueue(object):
    """Keeps a list of abstract items and serializes it to the disk.

    Every item is stored in its own file inside the queue directory; the
    file name is the decimal integer id of the item. pushItem() allocates
    increasing ids and insertBackChunk() allocates decreasing ids, so the
    smallest id is always the oldest item. Ids may therefore be zero or
    negative.

    Use pickle for serialization.

    NOTE: unpickleFn defaults to pickle.loads, which must only be fed
    trusted data; the queue directory should not be writable by untrusted
    parties."""
    implements(IQueue)

    def __init__(self, path, maxItems=None, pickleFn=pickle.dumps,
                 unpickleFn=pickle.loads):
        """
        @path: directory to save the items.
        @maxItems: maximum number of items to keep on disk, flush the
        older ones.
        @pickleFn: function used to pack the items to disk.
        @unpickleFn: function used to unpack items from disk.
        """
        self.path = path
        self._maxItems = maxItems
        if self._maxItems is None:
            self._maxItems = 100000
        if not os.path.isdir(self.path):
            os.mkdir(self.path)
        self.pickleFn = pickleFn
        self.unpickleFn = unpickleFn

        # Total number of items.
        self._nbItems = 0
        # The actual items id start at one.
        self.firstItemId = 0
        self.lastItemId = 0
        self._loadFromDisk()

    def pushItem(self, item):
        """Stores item as the newest entry.

        Returns the oldest item if it was dropped to make room, else None.
        Raises IOError if the file for the new id already exists."""
        ret = None
        if self._nbItems == self._maxItems:
            # Full: the oldest item makes room, the count stays the same.
            ret = self._popOldest()
        else:
            self._nbItems += 1
        self.lastItemId += 1
        path = self._itemPath(self.lastItemId)
        if os.path.exists(path):
            raise IOError('%s already exists.' % path)
        WriteFile(path, self.pickleFn(item))
        return ret

    def insertBackChunk(self, chunk):
        """Stores chunk as the oldest entries, keeping its order.

        Returns the leading items of chunk that did not fit, or None.
        Raises IOError if a file for one of the new ids already exists."""
        ret = None
        excess = self._nbItems + len(chunk) - self._maxItems
        if excess > 0:
            ret = chunk[0:excess]
            chunk = chunk[excess:]
        # Walk the chunk backward so that its first element ends up with the
        # smallest id, i.e. as the oldest item.
        for item in reversed(chunk):
            self.firstItemId -= 1
            path = self._itemPath(self.firstItemId)
            if os.path.exists(path):
                raise IOError('%s already exists.' % path)
            WriteFile(path, self.pickleFn(item))
            self._nbItems += 1
        return ret

    def popChunk(self, nbItems=None):
        """Removes and returns up to nbItems of the oldest items (defaults
        to the queue capacity), deleting their files."""
        if nbItems is None:
            nbItems = self._maxItems
        ret = []
        # Never iterate past the number of stored items.
        for _ in range(min(nbItems, self._nbItems)):
            ret.append(self._popOldest())
            self._nbItems -= 1
        return ret

    def save(self):
        """Does nothing; every item is written when it is queued."""
        pass

    def items(self):
        """Warning, very slow.

        Reads and unpickles every stored item, oldest first, without
        removing anything."""
        ret = []
        for itemId in range(self.firstItemId, self.lastItemId + 1):
            path = self._itemPath(itemId)
            if os.path.exists(path):
                ret.append(self.unpickleFn(ReadFile(path)))
        return ret

    def nbItems(self):
        """Returns the number of items tracked by the queue."""
        return self._nbItems

    def maxItems(self):
        """Returns the capacity of the queue."""
        return self._maxItems

    #### Protected functions

    def _itemPath(self, itemId):
        """Returns the path of the file holding the item with this id."""
        return os.path.join(self.path, str(itemId))

    def _popOldest(self):
        """Reads, deletes and returns the oldest item on disk.

        Moves firstItemId past the removed item. _nbItems is left untouched
        so that callers decide how the count changes. Raises IOError when no
        item file can be found."""
        itemId = self._findNext(self.firstItemId)
        if itemId is None:
            raise IOError('No item file found in %s.' % self.path)
        path = self._itemPath(itemId)
        item = self.unpickleFn(ReadFile(path))
        os.remove(path)
        self.firstItemId = itemId + 1
        return item

    def _findNext(self, id):
        """Returns the first id >= id that has a file on disk, or None if
        there is none up to lastItemId.

        Ids are not necessarily contiguous, hence the scan. The scan stops
        at lastItemId, the highest id ever allocated, so it terminates even
        when the directory content and the bookkeeping disagree."""
        while id <= self.lastItemId:
            if os.path.isfile(self._itemPath(id)):
                return id
            id += 1
        return None

    def _loadFromDisk(self):
        """Restores the item count and the first/last ids from the names of
        the files found in self.path. No item is read or unpickled."""
        ids = []
        for name in os.listdir(self.path):
            try:
                itemId = int(name)
            except ValueError:
                continue
            # Keep only names this class could have written itself. Zero is
            # a valid id (insertBackChunk counts downward) and must not be
            # discarded; names like '01' or '+1' parse as integers but never
            # match str(id), so counting them would corrupt _nbItems.
            if str(itemId) == name:
                ids.append(itemId)
        ids.sort()
        self._nbItems = len(ids)
        if ids:
            self.firstItemId = ids[0]
            self.lastItemId = ids[-1]
248 249
class PersistentQueue(object):
    """Keeps a list of abstract items and serializes it to the disk.

    It has 2 layers of queue, normally an in-memory queue and an on-disk queue.
    When the number of items in the primary queue gets too large, the new items
    are automatically saved to the secondary queue. The older items are kept in
    the primary queue.
    """
    implements(IQueue)

    def __init__(self, primaryQueue=None, secondaryQueue=None, path=None):
        """
        @primaryQueue: memory queue to use before buffering to disk.
        @secondaryQueue: disk queue to use as permanent buffer.
        @path: path is a shortcut when using default DiskQueue settings.
        """
        self.primaryQueue = primaryQueue
        if self.primaryQueue is None:
            self.primaryQueue = MemoryQueue()
        self.secondaryQueue = secondaryQueue
        if self.secondaryQueue is None:
            self.secondaryQueue = DiskQueue(path)
        # Preload data from the secondary queue only if we know we won't start
        # using the secondary queue right away.
        if self.secondaryQueue.nbItems() < self.primaryQueue.maxItems():
            self.primaryQueue.insertBackChunk(
                self.secondaryQueue.popChunk(self.primaryQueue.maxItems()))

    def pushItem(self, item):
        """Adds item as the newest entry.

        Returns the oldest item if one had to be dropped, else None."""
        # If there is already items in secondaryQueue, we'd need to pop them
        # all to start inserting them into primaryQueue so don't bother and
        # just push it in secondaryQueue.
        if (self.secondaryQueue.nbItems() or
            self.primaryQueue.nbItems() == self.primaryQueue.maxItems()):
            item = self.secondaryQueue.pushItem(item)
            if item is None:
                return item
            # If item is not None, secondaryQueue overflowed. We need to push it
            # back to primaryQueue so the oldest item is dumped.
        # Or everything fit in the primaryQueue.
        return self.primaryQueue.pushItem(item)

    def insertBackChunk(self, chunk):
        """Puts chunk back as the oldest entries of the queue.

        Items that no longer fit in the primary queue are moved to the front
        of the secondary queue so the overall order is preserved.

        Returns the leading items of chunk that did not fit in the two
        queues combined, or None."""
        ret = None
        # Overall excess
        excess = self.nbItems() + len(chunk) - self.maxItems()
        if excess > 0:
            ret = chunk[0:excess]
            chunk = chunk[excess:]
        # Memory excess
        primaryMax = self.primaryQueue.maxItems()
        chunk2 = []
        if self.primaryQueue.nbItems() + len(chunk) > primaryMax:
            # IQueue offers no way to remove items from the newest end, and
            # items() may return a copy, so popping from it changes nothing.
            # Drain the primary queue instead (popChunk() defaults to its
            # capacity, as in save()) and split the merged, ordered list.
            merged = list(chunk) + self.primaryQueue.popChunk()
            chunk = merged[:primaryMax]
            chunk2 = merged[primaryMax:]
        x = self.primaryQueue.insertBackChunk(chunk)
        assert x is None, "primaryQueue.insertBackChunk did not return None"
        if chunk2:
            x = self.secondaryQueue.insertBackChunk(chunk2)
            assert x is None, ("secondaryQueue.insertBackChunk did not return "
                               " None")
        return ret

    def popChunk(self, nbItems=None):
        """Removes and returns up to nbItems of the oldest items, taking
        them from the primary queue first and then from the secondary one.

        Defaults to the capacity of the primary queue."""
        if nbItems is None:
            nbItems = self.primaryQueue.maxItems()
        ret = self.primaryQueue.popChunk(nbItems)
        nbItems -= len(ret)
        if nbItems and self.secondaryQueue.nbItems():
            ret.extend(self.secondaryQueue.popChunk(nbItems))
        return ret

    def save(self):
        """Moves the content of the primary queue to the front of the
        secondary queue."""
        self.secondaryQueue.insertBackChunk(self.primaryQueue.popChunk())

    def items(self):
        """Returns the items of the primary queue followed by those of the
        secondary queue."""
        return self.primaryQueue.items() + self.secondaryQueue.items()

    def nbItems(self):
        """Returns the number of items held by both queues."""
        return self.primaryQueue.nbItems() + self.secondaryQueue.nbItems()

    def maxItems(self):
        """Returns the combined capacity of both queues."""
        return self.primaryQueue.maxItems() + self.secondaryQueue.maxItems()
335 336
class IndexedQueue(object):
    """Wraps an IQueue object to keep track of how much of it was consumed.

    Provides an extra getIndex() member and overrides popChunk() and
    insertBackChunk() so that a virtual index of the queue's first entry is
    maintained."""
    implements(IQueue)

    def __init__(self, queue):
        """
        @queue: the IQueue provider to wrap.
        """
        assert IQueue.providedBy(queue)
        # Borrow every public callable of the wrapped queue that this class
        # does not define itself. The list is computed in full before any
        # attribute is set.
        borrowed = [name for name in dir(queue)
                    if name[0] != '_'
                    and callable(getattr(queue, name))
                    and not hasattr(self, name)]
        for name in borrowed:
            setattr(self, name, getattr(queue, name))
        self.queue = queue
        self._index = 0

    def getIndex(self):
        """Returns the current virtual index of the queue's first entry."""
        return self._index

    def popChunk(self, *args, **kwargs):
        """Forwards to the wrapped queue and moves the index forward by the
        number of items returned."""
        popped = self.queue.popChunk(*args, **kwargs)
        if popped:
            self._index += len(popped)
        return popped

    def insertBackChunk(self, items):
        """Forwards to the wrapped queue and moves the index backward by the
        number of items actually re-inserted."""
        self._index -= len(items)
        rejected = self.queue.insertBackChunk(items)
        if rejected:
            # Items that overflowed were not put back: undo their share.
            self._index += len(rejected)
        return rejected
372 373
def ToIndexedQueue(queue):
    """Returns queue itself when it is already an IndexedQueue, otherwise a
    new IndexedQueue wrapping it.

    Raises TypeError if queue does not provide IQueue."""
    if IQueue.providedBy(queue):
        if isinstance(queue, IndexedQueue):
            return queue
        return IndexedQueue(queue)
    raise TypeError("queue doesn't implement IQueue", queue)
381 382 # vim: set ts=4 sts=4 sw=4 et: 383