from collections import deque
import os
import cPickle as pickle

from zope.interface import implements, Interface


def ReadFile(path):
    f = open(path, 'rb')
    try:
        return f.read()
    finally:
        f.close()


def WriteFile(path, buf):
    f = open(path, 'wb')
    try:
        f.write(buf)
    finally:
        f.close()


class IQueue(Interface):
    """Abstraction of a queue."""

    def pushItem(item):
        """Adds an individual item to the end of the queue.

        Returns an item if it was overflowed."""

    def insertBackChunk(items):
        """Adds a list of items as the oldest entries.

        Normally called in case of failure to process the data: queue the data
        back so it can be retrieved at a later time.

        Returns a list of items if it was overflowed."""

    def popChunk(nbItems=None):
        """Pop many items at once. Defaults to self.maxItems()."""

    def save():
        """Save the queue to storage if implemented."""

    def items():
        """Returns items in the queue.

        Warning: Can be extremely slow for a queue on disk."""

    def nbItems():
        """Returns the number of items in the queue."""

    def maxItems():
        """Returns the maximum number of items this queue can hold."""
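

# A minimal illustration (not part of the original module): the concrete
# queues below declare implements(IQueue), so callers can check the interface
# with zope.interface before relying on it:
#
#   from zope.interface import verify
#   verify.verifyObject(IQueue, MemoryQueue())  # raises if the API is missing
#   assert IQueue.providedBy(MemoryQueue())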


class MemoryQueue(object):
    """Simple length-bounded queue using a deque.

    list.pop(0) is O(n), so for a 10000-item list it can get really slow. By
    contrast, deque.popleft() is O(1) most of the time. See
    http://docs.python.org/library/collections.html for more information.
    """
    implements(IQueue)

    def __init__(self, maxItems=None):
        self._maxItems = maxItems
        if self._maxItems is None:
            self._maxItems = 10000
        self._items = deque()

    def pushItem(self, item):
        ret = None
        if len(self._items) == self._maxItems:
            ret = self._items.popleft()
        self._items.append(item)
        return ret

    def insertBackChunk(self, chunk):
        ret = None
        excess = len(self._items) + len(chunk) - self._maxItems
        if excess > 0:
            ret = chunk[0:excess]
            chunk = chunk[excess:]
        self._items.extendleft(reversed(chunk))
        return ret

    def popChunk(self, nbItems=None):
        # Pop up to nbItems entries from the head (oldest end) of the queue.
        if nbItems is None:
            nbItems = self._maxItems
        if nbItems >= len(self._items):
            items = list(self._items)
            self._items = deque()
        else:
            items = [self._items.popleft() for _ in range(nbItems)]
        return items

    def save(self):
        # In-memory only; there is nothing to persist.
        pass

    def items(self):
        return list(self._items)

    def nbItems(self):
        return len(self._items)

    def maxItems(self):
        return self._maxItems
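

# A minimal usage sketch (not part of the original module) showing the
# overflow behaviour of MemoryQueue; the values are illustrative only:
#
#   q = MemoryQueue(maxItems=3)
#   for i in range(3):
#       assert q.pushItem(i) is None      # fits, nothing is dropped
#   assert q.pushItem(3) == 0             # full: the oldest item is returned
#   assert q.items() == [1, 2, 3]
#   assert q.insertBackChunk([0]) == [0]  # still full: the chunk overflows back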


class DiskQueue(object):
    """Keeps a list of abstract items and serializes it to the disk.

    Use pickle for serialization."""
    implements(IQueue)

    def __init__(self, path, maxItems=None, pickleFn=pickle.dumps,
                 unpickleFn=pickle.loads):
        """
        @path: directory to save the items in.
        @maxItems: maximum number of items to keep on disk; older ones are
        flushed first.
        @pickleFn: function used to pack the items to disk.
        @unpickleFn: function used to unpack items from disk.
        """
        self.path = path
        self._maxItems = maxItems
        if self._maxItems is None:
            self._maxItems = 100000
        if not os.path.isdir(self.path):
            os.mkdir(self.path)
        self.pickleFn = pickleFn
        self.unpickleFn = unpickleFn

        # Total number of items currently stored on disk.
        self._nbItems = 0
        # Ids of the oldest and newest items; each item is stored in a file
        # named after its id.
        self.firstItemId = 0
        self.lastItemId = 0
        self._loadFromDisk()

    def pushItem(self, item):
        ret = None
        if self._nbItems == self._maxItems:
            # The queue is full: drop the oldest item and return it.
            id = self._findNext(self.firstItemId)
            path = os.path.join(self.path, str(id))
            ret = self.unpickleFn(ReadFile(path))
            os.remove(path)
            self.firstItemId = id + 1
        else:
            self._nbItems += 1
        self.lastItemId += 1
        path = os.path.join(self.path, str(self.lastItemId))
        if os.path.exists(path):
            raise IOError('%s already exists.' % path)
        WriteFile(path, self.pickleFn(item))
        return ret

    def insertBackChunk(self, chunk):
        ret = None
        excess = self._nbItems + len(chunk) - self._maxItems
        if excess > 0:
            ret = chunk[0:excess]
            chunk = chunk[excess:]
        for i in reversed(chunk):
            self.firstItemId -= 1
            path = os.path.join(self.path, str(self.firstItemId))
            if os.path.exists(path):
                raise IOError('%s already exists.' % path)
            WriteFile(path, self.pickleFn(i))
            self._nbItems += 1
        return ret

    def popChunk(self, nbItems=None):
        if nbItems is None:
            nbItems = self._maxItems
        ret = []
        for i in range(nbItems):
            if self._nbItems == 0:
                break
            id = self._findNext(self.firstItemId)
            path = os.path.join(self.path, str(id))
            ret.append(self.unpickleFn(ReadFile(path)))
            os.remove(path)
            self._nbItems -= 1
            self.firstItemId = id + 1
        return ret

    def save(self):
        # Items are written to disk as they are pushed, so there is nothing
        # left to do here.
        pass

    def items(self):
        """Warning, very slow."""
        ret = []
        for id in range(self.firstItemId, self.lastItemId + 1):
            path = os.path.join(self.path, str(id))
            if os.path.exists(path):
                ret.append(self.unpickleFn(ReadFile(path)))
        return ret

    def nbItems(self):
        return self._nbItems

    def maxItems(self):
        return self._maxItems

    def _findNext(self, id):
        # Returns the id of the next existing item file at or after |id|.
        while True:
            path = os.path.join(self.path, str(id))
            if os.path.isfile(path):
                return id
            id += 1
        return None

    def _loadFromDisk(self):
        """Initializes the queue from the items already on disk: counts them
        and finds the first and last item ids."""
        def SafeInt(item):
            try:
                return int(item)
            except ValueError:
                return None

        files = filter(None, [SafeInt(x) for x in os.listdir(self.path)])
        files.sort()
        self._nbItems = len(files)
        if self._nbItems:
            self.firstItemId = files[0]
            self.lastItemId = files[-1]
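

# A minimal usage sketch (not part of the original module): items pushed to a
# DiskQueue survive across instances since each one is pickled to its own file
# in the given directory. The path '/tmp/example-queue' is illustrative only.
#
#   q = DiskQueue('/tmp/example-queue', maxItems=5)
#   q.pushItem({'change': 42})
#   del q
#   q = DiskQueue('/tmp/example-queue')      # picks up the existing items
#   assert q.nbItems() == 1
#   assert q.popChunk(1) == [{'change': 42}]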


class PersistentQueue(object):
    """Keeps a list of abstract items and serializes it to the disk.

    It has two layers of queue, normally an in-memory queue and an on-disk
    queue. When the number of items in the primary queue gets too large, the
    new items are automatically saved to the secondary queue. The older items
    are kept in the primary queue.
    """
    implements(IQueue)

    def __init__(self, primaryQueue=None, secondaryQueue=None, path=None):
        """
        @primaryQueue: memory queue to use before buffering to disk.
        @secondaryQueue: disk queue to use as permanent buffer.
        @path: shortcut when using the default DiskQueue settings.
        """
        self.primaryQueue = primaryQueue
        if self.primaryQueue is None:
            self.primaryQueue = MemoryQueue()
        self.secondaryQueue = secondaryQueue
        if self.secondaryQueue is None:
            self.secondaryQueue = DiskQueue(path)
        # Preload items from the secondary queue into the primary queue, but
        # only if everything on disk fits in memory.
        if self.secondaryQueue.nbItems() < self.primaryQueue.maxItems():
            self.primaryQueue.insertBackChunk(
                self.secondaryQueue.popChunk(self.primaryQueue.maxItems()))

    def pushItem(self, item):
        # If there are already items in the secondary queue, or the primary
        # queue is full, push straight to the secondary queue to preserve
        # ordering.
        if (self.secondaryQueue.nbItems() or
            self.primaryQueue.nbItems() == self.primaryQueue.maxItems()):
            item = self.secondaryQueue.pushItem(item)
            if item is None:
                return item
            # The secondary queue overflowed and returned its oldest item:
            # fall through and push that item onto the primary queue, which
            # may in turn drop and return its own oldest entry.
        # Otherwise everything still fits in the primary queue.
        return self.primaryQueue.pushItem(item)

    def insertBackChunk(self, chunk):
        ret = None
        # First check for overall overflow against the cumulative capacity.
        excess = self.nbItems() + len(chunk) - self.maxItems()
        if excess > 0:
            ret = chunk[0:excess]
            chunk = chunk[excess:]
        # Then check whether the remaining chunk fits in the primary queue;
        # if not, copy the newest primary items into chunk2 so they can be
        # inserted back into the secondary queue.
        excess = (self.primaryQueue.nbItems() + len(chunk) -
                  self.primaryQueue.maxItems())
        if excess > 0:
            chunk2 = []
            for i in range(excess):
                chunk2.append(self.primaryQueue.items().pop())
            chunk2.reverse()
        x = self.primaryQueue.insertBackChunk(chunk)
        assert x is None, "primaryQueue.insertBackChunk did not return None"
        if excess > 0:
            x = self.secondaryQueue.insertBackChunk(chunk2)
            assert x is None, ("secondaryQueue.insertBackChunk did not return"
                               " None")
        return ret

    def popChunk(self, nbItems=None):
        # Pop from the primary queue first since it holds the oldest items,
        # then complete from the secondary queue if more items were requested.
        if nbItems is None:
            nbItems = self.primaryQueue.maxItems()
        items = self.primaryQueue.popChunk(nbItems)
        nbItems -= len(items)
        if nbItems > 0:
            items.extend(self.secondaryQueue.popChunk(nbItems))
        return items

    def save(self):
        # Flush the in-memory primary queue into the on-disk secondary queue.
        self.secondaryQueue.insertBackChunk(self.primaryQueue.popChunk())

    def items(self):
        return self.primaryQueue.items() + self.secondaryQueue.items()

    def nbItems(self):
        return self.primaryQueue.nbItems() + self.secondaryQueue.nbItems()

    def maxItems(self):
        return self.primaryQueue.maxItems() + self.secondaryQueue.maxItems()
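

# A minimal usage sketch (not part of the original module): a small in-memory
# queue backed by a disk queue. The path '/tmp/example-overflow' and the
# chosen sizes are illustrative only.
#
#   q = PersistentQueue(primaryQueue=MemoryQueue(maxItems=2),
#                       secondaryQueue=DiskQueue('/tmp/example-overflow'))
#   for i in range(5):
#       q.pushItem(i)
#   assert q.primaryQueue.items() == [0, 1]   # oldest items stay in memory
#   assert q.secondaryQueue.nbItems() == 3    # newer items spill to disk
#   assert q.nbItems() == 5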
342 """Adds functionality to a IQueue object to track its usage.
343
344 Adds a new member function getIndex() and modify popChunk() and
345 insertBackChunk() to keep a virtual pointer to the queue's first entry
346 index."""
347 implements(IQueue)
348
350
351
352 assert IQueue.providedBy(queue)
353 def Filter(m):
354 return (m[0] != '_' and callable(getattr(queue, m))
355 and not hasattr(self, m))
356 for member in filter(Filter, dir(queue)):
357 setattr(self, member, getattr(queue, member))
358 self.queue = queue
359 self._index = 0

    def getIndex(self):
        return self._index

    def popChunk(self, *args, **kwargs):
        items = self.queue.popChunk(*args, **kwargs)
        if items:
            # Popping items moves the virtual index of the first entry forward.
            self._index += len(items)
        return items

    def insertBackChunk(self, items):
        # Inserting items back at the front moves the virtual index backwards;
        # any overflowed items were not actually inserted, so compensate.
        self._index -= len(items)
        ret = self.queue.insertBackChunk(items)
        if ret:
            self._index += len(ret)
        return ret
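

# A minimal usage sketch (not part of the original module): IndexedQueue wraps
# any IQueue implementation and keeps a running index of its first entry, so a
# consumer can tell how many items have been consumed overall:
#
#   q = IndexedQueue(MemoryQueue(maxItems=10))
#   q.pushItem('a')        # pushItem is delegated to the wrapped queue
#   q.pushItem('b')
#   assert q.getIndex() == 0
#   q.popChunk(1)
#   assert q.getIndex() == 1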