import os
from cStringIO import StringIO
from bz2 import BZ2File
from gzip import GzipFile

from zope.interface import implements
from twisted.python import log, runtime
from twisted.internet import defer, threads, reactor
from buildbot.util import netstrings
from buildbot.util.eventual import eventually
from buildbot import interfaces

STDOUT = interfaces.LOG_CHANNEL_STDOUT
STDERR = interfaces.LOG_CHANNEL_STDERR
HEADER = interfaces.LOG_CHANNEL_HEADER
ChunkTypes = ["stdout", "stderr", "header"]

class LogFileScanner(netstrings.NetstringParser):
    def __init__(self, chunk_cb, channels=[]):
        self.chunk_cb = chunk_cb
        self.channels = channels
        netstrings.NetstringParser.__init__(self)

    def stringReceived(self, line):
        channel = int(line[0])
        if not self.channels or (channel in self.channels):
            self.chunk_cb((channel, line[1:]))
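
# A purely illustrative sketch of how the scanner is driven (hypothetical
# values; _generateChunks below is the real call site). Assuming stdout is
# channel 0, as the ChunkTypes ordering suggests:
#
#   chunks = []
#   p = LogFileScanner(chunks.append)
#   p.dataReceived("6:0hello,")  # one netstring: channel 0 + text "hello"
#   assert chunks == [(0, "hello")]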

class LogFileProducer:
    """What's the plan?

    The LogFile has just one FD, used for both reading and writing.
    Each time you add an entry, fd.seek to the end and then write.

    Each reader (i.e. Producer) keeps track of their own offset. The reader
    starts by seeking to the start of the logfile, and reading forwards.
    Between each hunk of file they yield chunks, so they must remember their
    offset before yielding and re-seek back to that offset before reading
    more data. When their read() returns EOF, they're finished with the first
    phase of the reading (everything that's already been written to disk).

    After EOF, the remaining data is entirely in the current entries list.
    These entries are all of the same channel, so we can do one "".join and
    obtain a single chunk to be sent to the listener. But since that involves
    a yield, and more data might arrive after we give up control, we have to
    subscribe them before yielding. We can't subscribe them any earlier,
    otherwise they'd get data out of order.

    We're using a generator in the first place so that the listener can
    throttle us, which means they're pulling. But the subscription means
    we're pushing. Really we're a Producer. In the first phase we can be
    either a PullProducer or a PushProducer. In the second phase we're only a
    PushProducer.

    So the client gives a LogFileConsumer to LogFile.subscribeConsumer. This
    Consumer must have registerProducer(), unregisterProducer(), and
    writeChunk(), and is just like a regular twisted.interfaces.IConsumer,
    except that writeChunk() takes chunks (tuples of (channel,text)) instead
    of the normal write() which takes just text. The LogFileConsumer is
    allowed to call stopProducing, pauseProducing, and resumeProducing on the
    producer instance it is given."""
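
    # A minimal consumer sketch (hypothetical, not part of this module),
    # showing the three methods described above:
    #
    #   class PrintingConsumer:
    #       def registerProducer(self, producer, streaming):
    #           self.producer = producer
    #       def unregisterProducer(self):
    #           self.producer = None
    #       def writeChunk(self, chunk):
    #           channel, text = chunk
    #           print "%s: %s" % (ChunkTypes[channel], text)
    #
    #   logfile.subscribeConsumer(PrintingConsumer())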

    paused = False
    subscribed = False
    BUFFERSIZE = 2048

    def __init__(self, logfile, consumer):
        self.logfile = logfile
        self.consumer = consumer
        self.chunkGenerator = self.getChunks()
        consumer.registerProducer(self, True)

    def getChunks(self):
        f = self.logfile.getFile()
        offset = 0
        chunks = []
        p = LogFileScanner(chunks.append)
        while True:
            f.seek(offset)
            data = f.read(self.BUFFERSIZE)
            offset = f.tell()
            if not data:
                break
            p.dataReceived(data)
            while chunks:
                yield chunks.pop(0)
        del f

        # now subscribe them to receive new entries
        self.subscribed = True
        self.logfile.watchers.append(self)
        d = self.logfile.waitUntilFinished()

        # then give them the not-yet-merged data
        if self.logfile.runEntries:
            channel = self.logfile.runEntries[0][0]
            text = "".join([c[1] for c in self.logfile.runEntries])
            yield (channel, text)

        # now we're done
        d.addCallback(self._logFinished)

    def stopProducing(self):
        # TODO: should we still call consumer.finish? probably not.
        self.paused = True
        self.consumer = None
        self.done()

    def done(self):
        if self.chunkGenerator:
            self.chunkGenerator = None # stop making chunks
        if self.subscribed:
            self.logfile.unsubscribe(self)
            self.subscribed = False

    def pauseProducing(self):
        self.paused = True

    def resumeProducing(self):
        # Twisted-1.3.0 has a bug which causes hangs when resumeProducing
        # calls transport.write (there is a recursive loop, fixed in 2.0 in
        # t.i.abstract.FileDescriptor.doWrite by setting the producerPaused
        # flag *before* calling resumeProducing). To work around this, we
        # just put off the real resumeProducing for a moment. This probably
        # has a performance hit, but I'm going to assume that the log files
        # are not retrieved frequently enough to hurt.
        eventually(self._resumeProducing)

    def _resumeProducing(self):
        self.paused = False
        if not self.chunkGenerator:
            return
        try:
            while not self.paused:
                chunk = self.chunkGenerator.next()
                self.consumer.writeChunk(chunk)
                # we exit this loop when the consumer says to stop, or we
                # run out of chunks
        except StopIteration:
            # the generator is finished
            self.chunkGenerator = None
        # now everything goes through the subscription, and they don't get
        # to pause anymore

    def logChunk(self, build, step, logfile, channel, chunk):
        if self.consumer:
            self.consumer.writeChunk((channel, chunk))

    def _logFinished(self, logfile):
        self.done()

class LogFile:
    """A LogFile keeps all of its contents on disk, in a non-pickle format to
    which new entries can easily be appended. The file on disk has a name
    like 12-log-compile-output, under the Builder's directory. The actual
    filename is generated (before the LogFile is created) by
    L{BuildStatus.generateLogfileName}.

    Old LogFile pickles (which kept their contents in .entries) must be
    upgraded. The L{BuilderStatus} is responsible for doing this when it
    loads the L{BuildStatus} into memory. The Build pickle is not modified,
    so users who go from 0.6.5 back to 0.6.4 don't have to lose their
    logs."""
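
    # On disk, each chunk is a netstring whose payload is the one-digit
    # channel number followed by the chunk's text (see merge() below).
    # Purely illustrative, assuming stdout is channel 0: addStdout("hello")
    # is stored as "6:0hello," -- a 6-byte payload holding channel "0" plus
    # the text "hello".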

    implements(interfaces.IStatusLog, interfaces.ILogFile)

    finished = False
    length = 0
    nonHeaderLength = 0
    tailLength = 0
    chunkSize = 10*1000
    runLength = 0
    # no maximum size by default
    logMaxSize = None
    # don't buffer a tail of the log by default
    logMaxTailSize = None
    maxLengthExceeded = False
    runEntries = [] # provided so old pickled builds will getChunks() ok
    entries = None
    BUFFERSIZE = 2048
    filename = None # relative to the Builder's basedir
    openfile = None
    compressMethod = "bz2"

    def __init__(self, parent, name, logfilename):
        """
        @type  parent: L{BuildStepStatus}
        @param parent: the Step that this log is a part of
        @type  name: string
        @param name: the name of this log, typically 'output'
        @type  logfilename: string
        @param logfilename: the Builder-relative pathname for the saved entries
        """
        self.step = parent
        self.name = name
        self.filename = logfilename
        fn = self.getFilename()
        if os.path.exists(fn):
            # the buildmaster was probably stopped abruptly, before the
            # BuilderStatus could be saved, so BuilderStatus.nextBuildNumber
            # is out of date, and we're overlapping with earlier builds now.
            # Warn about it, but then overwrite the old pickle file
            log.msg("Warning: Overwriting old serialized Build at %s" % fn)
        dirname = os.path.dirname(fn)
        if not os.path.exists(dirname):
            os.makedirs(dirname)
        self.openfile = open(fn, "w+")
        self.runEntries = []
        self.watchers = []
        self.finishedWatchers = []
        self.tailBuffer = []

    def getFilename(self):
        return os.path.join(self.step.build.builder.basedir, self.filename)

    def hasContents(self):
        return os.path.exists(self.getFilename() + '.bz2') or \
            os.path.exists(self.getFilename() + '.gz') or \
            os.path.exists(self.getFilename())

    def getName(self):
        return self.name # set in BuildStepStatus.addLog

    def getStep(self):
        return self.step

    def isFinished(self):
        return self.finished
    def waitUntilFinished(self):
        if self.finished:
            d = defer.succeed(self)
        else:
            d = defer.Deferred()
            self.finishedWatchers.append(d)
        return d

    def getFile(self):
        if self.openfile:
            # this is the filehandle we're using to write to the log, so
            # don't close it!
            return self.openfile
        # otherwise, hand out a read-only handle. Try a compressed copy of
        # the log first, then fall back to the plain file.
        try:
            return BZ2File(self.getFilename() + ".bz2", "r")
        except IOError:
            pass
        try:
            return GzipFile(self.getFilename() + ".gz", "r")
        except IOError:
            pass
        return open(self.getFilename(), "r")

    def getText(self):
        # this produces one ginormous string
        return "".join(self.getChunks([STDOUT, STDERR], onlyText=True))

    def getTextWithHeaders(self):
        return "".join(self.getChunks(onlyText=True))

    def getChunks(self, channels=[], onlyText=False):
        # generate chunks for everything that was logged at the time we were
        # first called, so remember how long the file was when we started.
        # Don't read beyond that point. The current contents of
        # self.runEntries will follow.

        # this returns an iterator, which means arbitrary things could
        # happen while we're yielding. This will faithfully deliver the log
        # as it existed when it was started, and not return anything after
        # that point. To use this in subscribe(catchup=True) without missing
        # any data, you must insure that nothing will be added to the log
        # during yield() calls.

        f = self.getFile()
        if not self.finished:
            offset = 0
            f.seek(0, 2)
            remaining = f.tell()
        else:
            offset = 0
            remaining = None

        leftover = None
        if self.runEntries and (not channels or
                                (self.runEntries[0][0] in channels)):
            leftover = (self.runEntries[0][0],
                        "".join([c[1] for c in self.runEntries]))

        # freeze the state of the LogFile by passing a lot of parameters
        # into a generator
        return self._generateChunks(f, offset, remaining, leftover,
                                    channels, onlyText)
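
    # A short usage sketch (hypothetical): iterate over the stdout/stderr
    # chunks of a log.
    #
    #   for channel, text in logfile.getChunks([STDOUT, STDERR]):
    #       print "%s: %d bytes" % (ChunkTypes[channel], len(text))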

    def _generateChunks(self, f, offset, remaining, leftover,
                        channels, onlyText):
        chunks = []
        p = LogFileScanner(chunks.append, channels)
        f.seek(offset)
        if remaining is not None:
            data = f.read(min(remaining, self.BUFFERSIZE))
            remaining -= len(data)
        else:
            data = f.read(self.BUFFERSIZE)

        offset = f.tell()
        while data:
            p.dataReceived(data)
            while chunks:
                channel, text = chunks.pop(0)
                if onlyText:
                    yield text
                else:
                    yield (channel, text)
            # re-seek, since someone else may have moved the shared file
            # position while we were yielding
            f.seek(offset)
            if remaining is not None:
                data = f.read(min(remaining, self.BUFFERSIZE))
                remaining -= len(data)
            else:
                data = f.read(self.BUFFERSIZE)
            offset = f.tell()
        del f

        if leftover:
            if onlyText:
                yield leftover[1]
            else:
                yield leftover

    def readlines(self, channel=STDOUT):
        """Return an iterator that produces newline-terminated lines,
        excluding header chunks."""
        # TODO: make this memory-efficient, by turning it into a generator
        # that retrieves chunks as necessary, like a pull-driven version of
        # twisted.protocols.basic.LineReceiver
        alltext = "".join(self.getChunks([channel], onlyText=True))
        io = StringIO(alltext)
        return io.readlines()
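
    # For example (hypothetical): collecting warning lines from a log.
    #
    #   warnings = [l for l in logfile.readlines() if "warning" in l]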

    def subscribe(self, receiver, catchup):
        if self.finished:
            return
        self.watchers.append(receiver)
        if catchup:
            for channel, text in self.getChunks():
                # TODO: add logChunks(), to send over everything at once?
                receiver.logChunk(self.step.build, self.step, self,
                                  channel, text)

    def unsubscribe(self, receiver):
        if receiver in self.watchers:
            self.watchers.remove(receiver)

    def subscribeConsumer(self, consumer):
        p = LogFileProducer(self, consumer)
        p.resumeProducing()

    # interface used by the build steps to add things to the log

    def merge(self):
        # merge all .runEntries (which are all of the same channel) into a
        # single chunk, and append it to the on-disk file as one netstring
        if not self.runEntries:
            return
        channel = self.runEntries[0][0]
        text = "".join([c[1] for c in self.runEntries])
        assert channel < 10 # the channel prefix must be a single digit
        self.openfile.seek(0, 2)
        self.openfile.write("%d:%d" % (1 + len(text), channel))
        self.openfile.write(text)
        self.openfile.write(",")
        self.runEntries = []
        self.runLength = 0

    def addEntry(self, channel, text):
        assert not self.finished

        if isinstance(text, unicode):
            text = text.encode('utf-8')
        if channel != HEADER:
            # truncate the log if it's more than logMaxSize bytes
            if self.logMaxSize and self.nonHeaderLength > self.logMaxSize:
                # only add the truncation header once
                if not self.maxLengthExceeded:
                    msg = "\nOutput exceeded %i bytes, remaining output has been truncated\n" % self.logMaxSize
                    self.addEntry(HEADER, msg)
                    self.merge()
                    self.maxLengthExceeded = True

                if self.logMaxTailSize:
                    # update the tail buffer
                    self.tailBuffer.append((channel, text))
                    self.tailLength += len(text)
                    while self.tailLength > self.logMaxTailSize:
                        # drop some stuff off the beginning of the buffer
                        c, t = self.tailBuffer.pop(0)
                        n = len(t)
                        self.tailLength -= n
                        assert self.tailLength >= 0
                return

            self.nonHeaderLength += len(text)

        # we only add to .runEntries here; merge() is responsible for
        # flushing the merged chunk to disk
        if self.runEntries and channel != self.runEntries[0][0]:
            self.merge()
        self.runEntries.append((channel, text))
        self.runLength += len(text)
        if self.runLength >= self.chunkSize:
            self.merge()

        for w in self.watchers:
            w.logChunk(self.step.build, self.step, self, channel, text)
        self.length += len(text)
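
    # To illustrate the truncation logic above (hypothetical values): with
    # logMaxSize=1000 and logMaxTailSize=100, roughly the first 1000 bytes
    # of stdout/stderr are recorded normally, a single truncation header is
    # added, and only the most recent ~100 bytes of the remainder are kept
    # in tailBuffer, to be flushed when the log is finished.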

    def addStdout(self, text):
        self.addEntry(STDOUT, text)
    def addStderr(self, text):
        self.addEntry(STDERR, text)
    def addHeader(self, text):
        self.addEntry(HEADER, text)

    def finish(self):
        if self.tailBuffer:
            msg = "\nFinal %i bytes follow below:\n" % self.tailLength
            tmp = self.runEntries
            self.runEntries = [(HEADER, msg)]
            self.merge()
            self.runEntries = self.tailBuffer
            self.merge()
            self.runEntries = tmp
            self.merge()
            self.tailBuffer = []
        else:
            self.merge()

        if self.openfile:
            # we don't do an explicit close, because there might be readers
            # sharing the filehandle. As soon as they stop reading, the
            # filehandle will be released and automatically closed.
            self.openfile.flush()
            del self.openfile
        self.finished = True
        watchers = self.finishedWatchers
        self.finishedWatchers = []
        for w in watchers:
            w.callback(self)
        self.watchers = []

    def compressLog(self):
        # pick a temporary filename matching the compression method
        if self.compressMethod == "bz2":
            compressed = self.getFilename() + ".bz2.tmp"
        elif self.compressMethod == "gz":
            compressed = self.getFilename() + ".gz.tmp"
        d = threads.deferToThread(self._compressLog, compressed)
        d.addCallback(self._renameCompressedLog, compressed)
        d.addErrback(self._cleanupFailedCompress, compressed)
        return d

    def _compressLog(self, compressed):
        infile = self.getFile()
        if self.compressMethod == "bz2":
            cf = BZ2File(compressed, 'w')
        elif self.compressMethod == "gz":
            cf = GzipFile(compressed, 'w')
        bufsize = 1024*1024
        while True:
            buf = infile.read(bufsize)
            cf.write(buf)
            if len(buf) < bufsize:
                break
        cf.close()

    def _renameCompressedLog(self, rv, compressed):
        if self.compressMethod == "bz2":
            filename = self.getFilename() + '.bz2'
        else:
            filename = self.getFilename() + '.gz'
        if runtime.platformType == 'win32':
            # windows cannot rename a file on top of an existing one, so
            # fall back to delete-first. There are ways this can fail and
            # lose the builder's history, so we avoid using it in the
            # general (non-windows) case
            if os.path.exists(filename):
                os.unlink(filename)
        os.rename(compressed, filename)
        _tryremove(self.getFilename(), 1, 5)

    def _cleanupFailedCompress(self, failure, compressed):
        log.msg("failed to compress %s" % self.getFilename())
        if os.path.exists(compressed):
            _tryremove(compressed, 1, 5)
        failure.trap() # reraise the failure

    # persistence stuff
    def __getstate__(self):
        d = self.__dict__.copy()
        del d['step'] # filled in upon unpickling
        del d['watchers']
        del d['finishedWatchers']
        d['entries'] = [] # let 0.6.4 tolerate the saved log. TODO: really?
        if 'finished' in d:
            del d['finished']
        if 'openfile' in d:
            del d['openfile']
        return d

    def __setstate__(self, d):
        self.__dict__ = d
        self.watchers = [] # probably not necessary
        self.finishedWatchers = [] # same
        # self.step must be filled in by our parent
        self.finished = True

    def upgrade(self, logfilename):
        """Save our .entries to a new-style offline log file (if necessary),
        and modify our in-memory representation to use it. The original
        pickled LogFile (inside the pickled Build) won't be modified."""
        self.filename = logfilename
        if not os.path.exists(self.getFilename()):
            self.openfile = open(self.getFilename(), "w")
            self.finished = False
            for channel, text in self.entries:
                self.addEntry(channel, text)
            self.finish() # releases self.openfile, which will be closed
        del self.entries

class HTMLLogFile:
    implements(interfaces.IStatusLog)

    filename = None

    def __init__(self, parent, name, logfilename, html):
        self.step = parent
        self.name = name
        self.filename = logfilename
        self.html = html

    def getName(self):
        return self.name
    def getStep(self):
        return self.step

    def isFinished(self):
        return True
    def waitUntilFinished(self):
        return defer.succeed(self)

    def hasContents(self):
        return True
    def getText(self):
        return self.html # looks kinda like text
    def getTextWithHeaders(self):
        return self.html
    def getChunks(self):
        return [(STDERR, self.html)]

    def subscribe(self, receiver, catchup):
        pass
    def unsubscribe(self, receiver):
        pass

    def finish(self):
        pass

    def __getstate__(self):
        d = self.__dict__.copy()
        del d['step']
        return d

    def upgrade(self, logfilename):
        pass

def _tryremove(filename, timeout, retries):
    """Try to remove a file; if that fails, try again in 'timeout' seconds.
    Each retry multiplies the timeout by four, and gives up after 'retries'
    further attempts."""
    try:
        os.unlink(filename)
    except OSError:
        if retries > 0:
            reactor.callLater(timeout, _tryremove, filename, timeout * 4,
                              retries - 1)
        else:
            log.msg("giving up on removing %s after over %d seconds" %
                    (filename, timeout))