1   
  2   
  3   
  4   
  5   
  6   
  7   
  8   
  9   
 10   
 11   
 12   
 13   
 14   
 15   
 16   
 17  from collections import deque 
 18  import os 
 19  import cPickle as pickle 
 20   
 21  from zope.interface import implements, Interface 
 22   
 23   
 25      f = open(path, 'rb') 
 26      try: 
 27          return f.read() 
 28      finally: 
 29          f.close() 
  30   
 31   
 33      f = open(path, 'wb') 
 34      try: 
 35          f.write(buf) 
 36      finally: 
 37          f.close() 
  38   
 39   
 41      """Abstraction of a queue.""" 
 43          """Adds an individual item to the end of the queue. 
 44   
 45          Returns an item if it was overflowed.""" 
  46   
 48          """Adds a list of items as the oldest entries. 
 49   
 50          Normally called in case of failure to process the data, queue the data 
 51          back so it can be retrieved at a later time. 
 52   
 53          Returns a list of items if it was overflowed.""" 
  54   
 56          """Pop many items at once. Defaults to self.maxItems().""" 
  57   
 59          """Save the queue to storage if implemented.""" 
  60   
 62          """Returns items in the queue. 
 63   
 64          Warning: Can be extremely slow for queue on disk.""" 
  65   
 67          """Returns the number of items in the queue.""" 
  68   
 70          """Returns the maximum number of items this queue can hold.""" 
   71   
 72   
 74      """Simple length bounded queue using deque. 
 75   
 76      list.pop(0) operation is O(n) so for a 10000 items list, it can start to 
 77      be real slow. On the contrary, deque.popleft() is O(1) most of the time. 
 78      See http://docs.python.org/library/collections.html for more 
 79      information. 
 80      """ 
 81      implements(IQueue) 
 82   
 84          self._maxItems = maxItems 
 85          if self._maxItems is None: 
 86              self._maxItems = 10000 
 87          self._items = deque() 
  88   
 90          ret = None 
 91          if len(self._items) == self._maxItems: 
 92              ret = self._items.popleft() 
 93          self._items.append(item) 
 94          return ret 
  95   
 97          ret = None 
 98          excess = len(self._items) + len(chunk) - self._maxItems 
 99          if excess > 0: 
100              ret = chunk[0:excess] 
101              chunk = chunk[excess:] 
102          self._items.extendleft(reversed(chunk)) 
103          return ret 
 104   
116   
119   
121          return list(self._items) 
 122   
124          return len(self._items) 
 125   
127          return self._maxItems 
  128   
129   
131      """Keeps a list of abstract items and serializes it to the disk. 
132   
133      Use pickle for serialization.""" 
134      implements(IQueue) 
135   
136 -    def __init__(self, path, maxItems=None, pickleFn=pickle.dumps, 
137                   unpickleFn=pickle.loads): 
 138          """ 
139          @path: directory to save the items. 
140          @maxItems: maximum number of items to keep on disk, flush the 
141          older ones. 
142          @pickleFn: function used to pack the items to disk. 
143          @unpickleFn: function used to unpack items from disk. 
144          """ 
145          self.path = path 
146          self._maxItems = maxItems 
147          if self._maxItems is None: 
148              self._maxItems = 100000 
149          if not os.path.isdir(self.path): 
150              os.mkdir(self.path) 
151          self.pickleFn = pickleFn 
152          self.unpickleFn = unpickleFn 
153   
154           
155          self._nbItems = 0 
156           
157          self.firstItemId = 0 
158          self.lastItemId = 0 
159          self._loadFromDisk() 
 160   
162          ret = None 
163          if self._nbItems == self._maxItems: 
164              id = self._findNext(self.firstItemId) 
165              path = os.path.join(self.path, str(id)) 
166              ret = self.unpickleFn(ReadFile(path)) 
167              os.remove(path) 
168              self.firstItemId = id + 1 
169          else: 
170              self._nbItems += 1 
171          self.lastItemId += 1 
172          path = os.path.join(self.path, str(self.lastItemId)) 
173          if os.path.exists(path): 
174              raise IOError('%s already exists.' % path) 
175          WriteFile(path, self.pickleFn(item)) 
176          return ret 
 177   
179          ret = None 
180          excess = self._nbItems + len(chunk) - self._maxItems 
181          if excess > 0: 
182              ret = chunk[0:excess] 
183              chunk = chunk[excess:] 
184          for i in reversed(chunk): 
185              self.firstItemId -= 1 
186              path = os.path.join(self.path, str(self.firstItemId)) 
187              if os.path.exists(path): 
188                  raise IOError('%s already exists.' % path) 
189              WriteFile(path, self.pickleFn(i)) 
190              self._nbItems += 1 
191          return ret 
 192   
194          if nbItems is None: 
195              nbItems = self._maxItems 
196          ret = [] 
197          for i in range(nbItems): 
198              if self._nbItems == 0: 
199                  break 
200              id = self._findNext(self.firstItemId) 
201              path = os.path.join(self.path, str(id)) 
202              ret.append(self.unpickleFn(ReadFile(path))) 
203              os.remove(path) 
204              self._nbItems -= 1 
205              self.firstItemId = id + 1 
206          return ret 
 207   
210   
212          """Warning, very slow.""" 
213          ret = [] 
214          for id in range(self.firstItemId, self.lastItemId + 1): 
215              path = os.path.join(self.path, str(id)) 
216              if os.path.exists(path): 
217                  ret.append(self.unpickleFn(ReadFile(path))) 
218          return ret 
 219   
222   
224          return self._maxItems 
 225   
226       
227   
229          while True: 
230              path = os.path.join(self.path, str(id)) 
231              if os.path.isfile(path): 
232                  return id 
233              id += 1 
234          return None 
 235   
237          """Loads the queue from disk upto self.maxMemoryItems items into 
238          self.items. 
239          """ 
240          def SafeInt(item): 
241              try: 
242                  return int(item) 
243              except ValueError: 
244                  return None 
 245   
246          files = filter(None, [SafeInt(x) for x in os.listdir(self.path)]) 
247          files.sort() 
248          self._nbItems = len(files) 
249          if self._nbItems: 
250              self.firstItemId = files[0] 
251              self.lastItemId = files[-1] 
 252   
253   
255      """Keeps a list of abstract items and serializes it to the disk. 
256   
257      It has 2 layers of queue, normally an in-memory queue and an on-disk queue. 
258      When the number of items in the primary queue gets too large, the new items 
259      are automatically saved to the secondary queue. The older items are kept in 
260      the primary queue. 
261      """ 
262      implements(IQueue) 
263   
264 -    def __init__(self, primaryQueue=None, secondaryQueue=None, path=None): 
 265          """ 
266          @primaryQueue: memory queue to use before buffering to disk. 
267          @secondaryQueue: disk queue to use as permanent buffer. 
268          @path: path is a shortcut when using default DiskQueue settings. 
269          """ 
270          self.primaryQueue = primaryQueue 
271          if self.primaryQueue is None: 
272              self.primaryQueue = MemoryQueue() 
273          self.secondaryQueue = secondaryQueue 
274          if self.secondaryQueue is None: 
275              self.secondaryQueue = DiskQueue(path) 
276           
277           
278          if self.secondaryQueue.nbItems() < self.primaryQueue.maxItems(): 
279              self.primaryQueue.insertBackChunk( 
280                  self.secondaryQueue.popChunk(self.primaryQueue.maxItems())) 
 281   
283           
284           
285           
286          if (self.secondaryQueue.nbItems() or 
287              self.primaryQueue.nbItems() == self.primaryQueue.maxItems()): 
288              item = self.secondaryQueue.pushItem(item) 
289              if item is None: 
290                  return item 
291               
292               
293           
294          return self.primaryQueue.pushItem(item) 
 295   
297          ret = None 
298           
299          excess = self.nbItems() + len(chunk) - self.maxItems() 
300          if excess > 0: 
301              ret = chunk[0:excess] 
302              chunk = chunk[excess:] 
303           
304          excess = (self.primaryQueue.nbItems() + len(chunk) - 
305                    self.primaryQueue.maxItems()) 
306          if excess > 0: 
307              chunk2 = [] 
308              for i in range(excess): 
309                  chunk2.append(self.primaryQueue.items().pop()) 
310              chunk2.reverse() 
311          x = self.primaryQueue.insertBackChunk(chunk) 
312          assert x is None, "primaryQueue.insertBackChunk did not return None" 
313          if excess > 0: 
314              x = self.secondaryQueue.insertBackChunk(chunk2) 
315              assert x is None, ("secondaryQueue.insertBackChunk did not return " 
316                                 " None") 
317          return ret 
 318   
327   
330   
332          return self.primaryQueue.items() + self.secondaryQueue.items() 
 333   
335          return self.primaryQueue.nbItems() + self.secondaryQueue.nbItems() 
 336   
 339   
340   
342      """Adds functionality to a IQueue object to track its usage. 
343   
344      Adds a new member function getIndex() and modify popChunk() and 
345      insertBackChunk() to keep a virtual pointer to the queue's first entry 
346      index.""" 
347      implements(IQueue) 
348   
350           
351           
352          assert IQueue.providedBy(queue) 
353          def Filter(m): 
354              return (m[0] != '_' and callable(getattr(queue, m)) 
355                      and not hasattr(self, m)) 
 356          for member in filter(Filter, dir(queue)): 
357              setattr(self, member, getattr(queue, member)) 
358          self.queue = queue 
359          self._index = 0 
 360   
363   
369   
376   
377   
385   
386   
387