from __future__ import with_statement

from collections import deque
import os
import cPickle as pickle

from zope.interface import implements, Interface


def ReadFile(path):
    with open(path, 'rb') as f:
        return f.read()


def WriteFile(path, buf):
    with open(path, 'wb') as f:
        f.write(buf)


class IQueue(Interface):
    """Abstraction of a queue."""
    def pushItem(item):
        """Adds an individual item to the end of the queue.

        Returns an item if it was overflowed."""

    def insertBackChunk(items):
        """Adds a list of items as the oldest entries.

        Normally called in case of failure to process the data; the data is
        queued back so it can be retrieved at a later time.

        Returns a list of items if it was overflowed."""

    def popChunk(nbItems=None):
        """Pops many items at once. Defaults to self.maxItems()."""

    def save():
        """Saves the queue to storage if implemented."""

    def items():
        """Returns the items in the queue.

        Warning: can be extremely slow for a queue on disk."""

    def nbItems():
        """Returns the number of items in the queue."""

    def maxItems():
        """Returns the maximum number of items this queue can hold."""
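
# A typical consumer pattern for an IQueue implementation (illustrative
# sketch; 'process' is a hypothetical callback, not part of this module):
#
#   chunk = queue.popChunk()
#   try:
#       process(chunk)
#   except Exception:
#       # Processing failed: queue the data back as the oldest entries so it
#       # can be retried later, per insertBackChunk() above.
#       queue.insertBackChunk(chunk)
#       raise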
70 """Simple length bounded queue using deque.
71
72 list.pop(0) operation is O(n) so for a 10000 items list, it can start to
73 be real slow. On the contrary, deque.popleft() is O(1) most of the time.
74 See http://docs.python.org/library/collections.html for more
75 information.
76 """
77 implements(IQueue)
78
80 self._maxItems = maxItems
81 if self._maxItems is None:
82 self._maxItems = 10000
83 self._items = deque()
84
86 ret = None
87 if len(self._items) == self._maxItems:
88 ret = self._items.popleft()
89 self._items.append(item)
90 return ret
91
93 ret = None
94 excess = len(self._items) + len(chunk) - self._maxItems
95 if excess > 0:
96 ret = chunk[0:excess]
97 chunk = chunk[excess:]
98 self._items.extendleft(reversed(chunk))
99 return ret
100
112
115

    def items(self):
        return list(self._items)

    def nbItems(self):
        return len(self._items)

    def maxItems(self):
        return self._maxItems
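
# Illustrative sketch of MemoryQueue overflow behavior (hypothetical values):
#
#   q = MemoryQueue(maxItems=2)
#   q.pushItem('a')            # returns None
#   q.pushItem('b')            # returns None
#   q.pushItem('c')            # returns 'a', the overflowed oldest item
#   q.items()                  # ['b', 'c']
#   q.insertBackChunk(['z'])   # returns ['z']: no room left for the chunk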


class DiskQueue(object):
    """Keeps a list of abstract items and serializes it to the disk.

    Uses pickle for serialization."""
    implements(IQueue)

    def __init__(self, path, maxItems=None, pickleFn=pickle.dumps,
                 unpickleFn=pickle.loads):
        """
        @path: directory to save the items in.
        @maxItems: maximum number of items to keep on disk; older ones are
        flushed.
        @pickleFn: function used to pack the items to disk.
        @unpickleFn: function used to unpack items from disk.
        """
        self.path = path
        self._maxItems = maxItems
        if self._maxItems is None:
            self._maxItems = 100000
        if not os.path.isdir(self.path):
            os.mkdir(self.path)
        self.pickleFn = pickleFn
        self.unpickleFn = unpickleFn

        # Total number of items on disk.
        self._nbItems = 0
        # File names are the item ids; track the range currently in use.
        self.firstItemId = 0
        self.lastItemId = 0
        self._loadFromDisk()

    def pushItem(self, item):
        ret = None
        if self._nbItems == self._maxItems:
            # Full: drop the oldest item from disk and return it.
            id = self._findNext(self.firstItemId)
            path = os.path.join(self.path, str(id))
            ret = self.unpickleFn(ReadFile(path))
            os.remove(path)
            self.firstItemId = id + 1
        else:
            self._nbItems += 1
        self.lastItemId += 1
        path = os.path.join(self.path, str(self.lastItemId))
        if os.path.exists(path):
            raise IOError('%s already exists.' % path)
        WriteFile(path, self.pickleFn(item))
        return ret

    def insertBackChunk(self, chunk):
        ret = None
        excess = self._nbItems + len(chunk) - self._maxItems
        if excess > 0:
            ret = chunk[0:excess]
            chunk = chunk[excess:]
        # Write the chunk in front of the current items, newest first, so the
        # ids stay ordered.
        for i in reversed(chunk):
            self.firstItemId -= 1
            path = os.path.join(self.path, str(self.firstItemId))
            if os.path.exists(path):
                raise IOError('%s already exists.' % path)
            WriteFile(path, self.pickleFn(i))
            self._nbItems += 1
        return ret

    def popChunk(self, nbItems=None):
        if nbItems is None:
            nbItems = self._maxItems
        ret = []
        for i in range(nbItems):
            if self._nbItems == 0:
                break
            id = self._findNext(self.firstItemId)
            path = os.path.join(self.path, str(id))
            ret.append(self.unpickleFn(ReadFile(path)))
            os.remove(path)
            self._nbItems -= 1
            self.firstItemId = id + 1
        return ret

    def save(self):
        # Nothing to do, every mutation is written to disk immediately.
        pass
208 """Warning, very slow."""
209 ret = []
210 for id in range(self.firstItemId, self.lastItemId + 1):
211 path = os.path.join(self.path, str(id))
212 if os.path.exists(path):
213 ret.append(self.unpickleFn(ReadFile(path)))
214 return ret
215
218
220 return self._maxItems
221
222

    def _findNext(self, id):
        # Item ids are not necessarily contiguous (holes appear when files
        # are popped), so scan forward for the next existing file.
        while True:
            path = os.path.join(self.path, str(id))
            if os.path.isfile(path):
                return id
            id += 1

    def _loadFromDisk(self):
        """Scans the directory and initializes _nbItems, firstItemId and
        lastItemId from the item files already on disk.
        """
        def SafeInt(item):
            try:
                return int(item)
            except ValueError:
                return None

        files = [SafeInt(x) for x in os.listdir(self.path)]
        # Keep only numeric file names; None means the name was not an item
        # id, while an id of 0 is valid and must be kept.
        files = [x for x in files if x is not None]
        files.sort()
        self._nbItems = len(files)
        if self._nbItems:
            self.firstItemId = files[0]
            self.lastItemId = files[-1]
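
# Illustrative sketch of DiskQueue persistence (hypothetical values; '/tmp/q'
# is only an example path):
#
#   q = DiskQueue('/tmp/q', maxItems=3)
#   q.pushItem({'rev': 1})     # serialized to /tmp/q/1
#   q.pushItem({'rev': 2})     # serialized to /tmp/q/2
#   del q
#   q = DiskQueue('/tmp/q')    # _loadFromDisk() finds the item files again
#   q.popChunk(1)              # [{'rev': 1}]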


class PersistentQueue(object):
    """Keeps a list of abstract items and serializes it to the disk.

    It has two layers of queue, normally an in-memory queue and an on-disk
    queue. When the number of items in the primary queue gets too large, the
    new items are automatically saved to the secondary queue. The older items
    are kept in the primary queue.
    """
    implements(IQueue)

    def __init__(self, primaryQueue=None, secondaryQueue=None, path=None):
        """
        @primaryQueue: memory queue to use before buffering to disk.
        @secondaryQueue: disk queue to use as permanent buffer.
        @path: shortcut for the directory to use with default DiskQueue
        settings.
        """
        self.primaryQueue = primaryQueue
        if self.primaryQueue is None:
            self.primaryQueue = MemoryQueue()
        self.secondaryQueue = secondaryQueue
        if self.secondaryQueue is None:
            self.secondaryQueue = DiskQueue(path)
        # If everything on disk fits in memory, preload it into the primary
        # queue so the oldest items are served from memory.
        if self.secondaryQueue.nbItems() < self.primaryQueue.maxItems():
            self.primaryQueue.insertBackChunk(
                self.secondaryQueue.popChunk(self.primaryQueue.maxItems()))

    def pushItem(self, item):
        # If the secondary queue is already in use, or the primary queue is
        # full, push to the secondary queue.
        if (self.secondaryQueue.nbItems() or
                self.primaryQueue.nbItems() == self.primaryQueue.maxItems()):
            item = self.secondaryQueue.pushItem(item)
            if item is None:
                return item
            # The secondary queue overflowed and returned its oldest item,
            # which directly follows the primary queue's newest item; cycle
            # it through the primary queue below.
        # Push to the primary queue, which returns its own oldest item if it
        # overflows.
        return self.primaryQueue.pushItem(item)

    def insertBackChunk(self, chunk):
        ret = None
        # Overall excess: items that fit nowhere are rejected, oldest first.
        excess = self.nbItems() + len(chunk) - self.maxItems()
        if excess > 0:
            ret = chunk[0:excess]
            chunk = chunk[excess:]
        # Primary queue excess: spill its newest items to the secondary queue
        # to make room for the chunk being inserted at the front.
        excess = (self.primaryQueue.nbItems() + len(chunk) -
                  self.primaryQueue.maxItems())
        if excess > 0:
            items = self.primaryQueue.popChunk(self.primaryQueue.nbItems())
            chunk2 = items[len(items) - excess:]
            x = self.primaryQueue.insertBackChunk(items[:len(items) - excess])
            assert x is None, "primaryQueue.insertBackChunk did not return None"
        x = self.primaryQueue.insertBackChunk(chunk)
        assert x is None, "primaryQueue.insertBackChunk did not return None"
        if excess > 0:
            x = self.secondaryQueue.insertBackChunk(chunk2)
            assert x is None, ("secondaryQueue.insertBackChunk did not return "
                               "None")
        return ret

    def popChunk(self, nbItems=None):
        if nbItems is None:
            nbItems = self.primaryQueue.maxItems()
        # Pop the oldest items from the primary queue first, then complete
        # from the secondary queue if needed.
        ret = self.primaryQueue.popChunk(nbItems)
        if len(ret) < nbItems and self.secondaryQueue.nbItems():
            ret.extend(self.secondaryQueue.popChunk(nbItems - len(ret)))
        return ret

    def save(self):
        # Flush the in-memory items to the front of the on-disk queue; they
        # are older than anything already on disk.
        self.secondaryQueue.insertBackChunk(self.primaryQueue.popChunk())
        self.secondaryQueue.save()

    def items(self):
        return self.primaryQueue.items() + self.secondaryQueue.items()

    def nbItems(self):
        return self.primaryQueue.nbItems() + self.secondaryQueue.nbItems()

    def maxItems(self):
        return self.primaryQueue.maxItems() + self.secondaryQueue.maxItems()
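
# Illustrative sketch of the two-layer behavior (hypothetical values;
# '/tmp/pq' is only an example path):
#
#   q = PersistentQueue(primaryQueue=MemoryQueue(maxItems=2),
#                       secondaryQueue=DiskQueue('/tmp/pq'))
#   for i in range(5):
#       q.pushItem(i)          # 0 and 1 stay in memory, 2, 3 and 4 go to disk
#   q.items()                  # [0, 1, 2, 3, 4]
#   q.popChunk(3)              # [0, 1, 2]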
338 """Adds functionality to a IQueue object to track its usage.
339
340 Adds a new member function getIndex() and modify popChunk() and
341 insertBackChunk() to keep a virtual pointer to the queue's first entry
342 index."""
343 implements(IQueue)
344
346
347
348 assert IQueue.providedBy(queue)
349 def Filter(m):
350 return (m[0] != '_' and callable(getattr(queue, m))
351 and not hasattr(self, m))
352 for member in filter(Filter, dir(queue)):
353 setattr(self, member, getattr(queue, member))
354 self.queue = queue
355 self._index = 0
356
359
365
372
373
381
382
383
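
# Illustrative sketch of index tracking (hypothetical values):
#
#   q = IndexedQueue(MemoryQueue(maxItems=10))
#   q.pushItem('a')            # pushItem is copied from the wrapped queue
#   q.pushItem('b')
#   q.popChunk(1)              # ['a']
#   q.getIndex()               # 1: one item has been consumed so far
#   q.insertBackChunk(['a'])   # the index moves back to 0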