Package buildbot :: Package status :: Module logfile
[frames] | no frames]

Source Code for Module buildbot.status.logfile

  1  # This file is part of Buildbot.  Buildbot is free software: you can 
  2  # redistribute it and/or modify it under the terms of the GNU General Public 
  3  # License as published by the Free Software Foundation, version 2. 
  4  # 
  5  # This program is distributed in the hope that it will be useful, but WITHOUT 
  6  # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more 
  8  # details. 
  9  # 
 10  # You should have received a copy of the GNU General Public License along with 
 11  # this program; if not, write to the Free Software Foundation, Inc., 51 
 12  # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
 13  # 
 14  # Copyright Buildbot Team Members 
 15   
 16  import os 
 17  from cStringIO import StringIO 
 18  from bz2 import BZ2File 
 19  from gzip import GzipFile 
 20   
 21  from zope.interface import implements 
 22  from twisted.python import log, runtime 
 23  from twisted.internet import defer, threads, reactor 
 24  from buildbot.util import netstrings 
 25  from buildbot.util.eventual import eventually 
 26  from buildbot import interfaces 
 27   
 28  STDOUT = interfaces.LOG_CHANNEL_STDOUT 
 29  STDERR = interfaces.LOG_CHANNEL_STDERR 
 30  HEADER = interfaces.LOG_CHANNEL_HEADER 
 31  ChunkTypes = ["stdout", "stderr", "header"] 
 32   
33 -class LogFileScanner(netstrings.NetstringParser):
34 - def __init__(self, chunk_cb, channels=[]):
35 self.chunk_cb = chunk_cb 36 self.channels = channels 37 netstrings.NetstringParser.__init__(self)
38
39 - def stringReceived(self, line):
40 channel = int(line[0]) 41 if not self.channels or (channel in self.channels): 42 self.chunk_cb((channel, line[1:]))
43
44 -class LogFileProducer:
45 """What's the plan? 46 47 the LogFile has just one FD, used for both reading and writing. 48 Each time you add an entry, fd.seek to the end and then write. 49 50 Each reader (i.e. Producer) keeps track of their own offset. The reader 51 starts by seeking to the start of the logfile, and reading forwards. 52 Between each hunk of file they yield chunks, so they must remember their 53 offset before yielding and re-seek back to that offset before reading 54 more data. When their read() returns EOF, they're finished with the first 55 phase of the reading (everything that's already been written to disk). 56 57 After EOF, the remaining data is entirely in the current entries list. 58 These entries are all of the same channel, so we can do one "".join and 59 obtain a single chunk to be sent to the listener. But since that involves 60 a yield, and more data might arrive after we give up control, we have to 61 subscribe them before yielding. We can't subscribe them any earlier, 62 otherwise they'd get data out of order. 63 64 We're using a generator in the first place so that the listener can 65 throttle us, which means they're pulling. But the subscription means 66 we're pushing. Really we're a Producer. In the first phase we can be 67 either a PullProducer or a PushProducer. In the second phase we're only a 68 PushProducer. 69 70 So the client gives a LogFileConsumer to File.subscribeConsumer . This 71 Consumer must have registerProducer(), unregisterProducer(), and 72 writeChunk(), and is just like a regular twisted.interfaces.IConsumer, 73 except that writeChunk() takes chunks (tuples of (channel,text)) instead 74 of the normal write() which takes just text. The LogFileConsumer is 75 allowed to call stopProducing, pauseProducing, and resumeProducing on the 76 producer instance it is given. """ 77 78 paused = False 79 subscribed = False 80 BUFFERSIZE = 2048 81
82 - def __init__(self, logfile, consumer):
83 self.logfile = logfile 84 self.consumer = consumer 85 self.chunkGenerator = self.getChunks() 86 consumer.registerProducer(self, True)
87
88 - def getChunks(self):
89 f = self.logfile.getFile() 90 offset = 0 91 chunks = [] 92 p = LogFileScanner(chunks.append) 93 f.seek(offset) 94 data = f.read(self.BUFFERSIZE) 95 offset = f.tell() 96 while data: 97 p.dataReceived(data) 98 while chunks: 99 c = chunks.pop(0) 100 yield c 101 f.seek(offset) 102 data = f.read(self.BUFFERSIZE) 103 offset = f.tell() 104 del f 105 106 # now subscribe them to receive new entries 107 self.subscribed = True 108 self.logfile.watchers.append(self) 109 d = self.logfile.waitUntilFinished() 110 111 # then give them the not-yet-merged data 112 if self.logfile.runEntries: 113 channel = self.logfile.runEntries[0][0] 114 text = "".join([c[1] for c in self.logfile.runEntries]) 115 yield (channel, text) 116 117 # now we've caught up to the present. Anything further will come from 118 # the logfile subscription. We add the callback *after* yielding the 119 # data from runEntries, because the logfile might have finished 120 # during the yield. 121 d.addCallback(self.logfileFinished)
122
123 - def stopProducing(self):
124 # TODO: should we still call consumer.finish? probably not. 125 self.paused = True 126 self.consumer = None 127 self.done()
128
129 - def done(self):
130 if self.chunkGenerator: 131 self.chunkGenerator = None # stop making chunks 132 if self.subscribed: 133 self.logfile.watchers.remove(self) 134 self.subscribed = False
135
136 - def pauseProducing(self):
137 self.paused = True
138
139 - def resumeProducing(self):
140 # Twisted-1.3.0 has a bug which causes hangs when resumeProducing 141 # calls transport.write (there is a recursive loop, fixed in 2.0 in 142 # t.i.abstract.FileDescriptor.doWrite by setting the producerPaused 143 # flag *before* calling resumeProducing). To work around this, we 144 # just put off the real resumeProducing for a moment. This probably 145 # has a performance hit, but I'm going to assume that the log files 146 # are not retrieved frequently enough for it to be an issue. 147 148 eventually(self._resumeProducing)
149
150 - def _resumeProducing(self):
151 self.paused = False 152 if not self.chunkGenerator: 153 return 154 try: 155 while not self.paused: 156 chunk = self.chunkGenerator.next() 157 self.consumer.writeChunk(chunk) 158 # we exit this when the consumer says to stop, or we run out 159 # of chunks 160 except StopIteration: 161 # if the generator finished, it will have done releaseFile 162 self.chunkGenerator = None
163 # now everything goes through the subscription, and they don't get to 164 # pause anymore 165
166 - def logChunk(self, build, step, logfile, channel, chunk):
167 if self.consumer: 168 self.consumer.writeChunk((channel, chunk))
169
170 - def logfileFinished(self, logfile):
171 self.done() 172 if self.consumer: 173 self.consumer.unregisterProducer() 174 self.consumer.finish() 175 self.consumer = None
176
177 -class LogFile:
178 """ 179 A LogFile keeps all of its contents on disk, in a non-pickle format to 180 which new entries can easily be appended. The file on disk has a name like 181 12-log-compile-output, under the Builder's directory. The actual filename 182 is generated (before the LogFile is created) by 183 L{BuildStatus.generateLogfileName}. 184 185 @ivar length: length of the data in the logfile (sum of chunk sizes; not 186 the length of the on-disk encoding) 187 """ 188 189 implements(interfaces.IStatusLog, interfaces.ILogFile) 190 191 finished = False 192 length = 0 193 nonHeaderLength = 0 194 tailLength = 0 195 chunkSize = 10*1000 196 runLength = 0 197 # No max size by default 198 # Don't keep a tail buffer by default 199 logMaxTailSize = None 200 maxLengthExceeded = False 201 runEntries = [] # provided so old pickled builds will getChunks() ok 202 entries = None 203 BUFFERSIZE = 2048 204 filename = None # relative to the Builder's basedir 205 openfile = None 206
207 - def __init__(self, parent, name, logfilename):
208 """ 209 @type parent: L{BuildStepStatus} 210 @param parent: the Step that this log is a part of 211 @type name: string 212 @param name: the name of this log, typically 'output' 213 @type logfilename: string 214 @param logfilename: the Builder-relative pathname for the saved entries 215 """ 216 self.step = parent 217 self.master = parent.build.builder.master 218 self.name = name 219 self.filename = logfilename 220 fn = self.getFilename() 221 if os.path.exists(fn): 222 # the buildmaster was probably stopped abruptly, before the 223 # BuilderStatus could be saved, so BuilderStatus.nextBuildNumber 224 # is out of date, and we're overlapping with earlier builds now. 225 # Warn about it, but then overwrite the old pickle file 226 log.msg("Warning: Overwriting old serialized Build at %s" % fn) 227 dirname = os.path.dirname(fn) 228 if not os.path.exists(dirname): 229 os.makedirs(dirname) 230 self.openfile = open(fn, "w+") 231 self.runEntries = [] 232 self.watchers = [] 233 self.finishedWatchers = [] 234 self.tailBuffer = []
235
236 - def getFilename(self):
237 """ 238 Get the base (uncompressed) filename for this log file. 239 240 @returns: filename 241 """ 242 return os.path.join(self.step.build.builder.basedir, self.filename)
243
244 - def hasContents(self):
245 """ 246 Return true if this logfile's contents are available. For a newly 247 created logfile, this is always true, but for a L{LogFile} instance 248 that has been persisted, the logfiles themselves may have been deleted, 249 in which case this method will return False. 250 251 @returns: boolean 252 """ 253 return os.path.exists(self.getFilename() + '.bz2') or \ 254 os.path.exists(self.getFilename() + '.gz') or \ 255 os.path.exists(self.getFilename())
256
257 - def getName(self):
258 """ 259 Get this logfile's name 260 261 @returns: string 262 """ 263 return self.name
264
265 - def getStep(self):
266 """ 267 Get the L{BuildStepStatus} instance containing this logfile 268 269 @returns: L{BuildStepStatus} instance 270 """ 271 return self.step
272
273 - def isFinished(self):
274 """ 275 Return true if this logfile is finished (that is, if it will not 276 receive any additional data 277 278 @returns: boolean 279 """ 280 281 return self.finished
282
283 - def waitUntilFinished(self):
284 """ 285 Return a Deferred that will fire when this logfile is finished, or will 286 fire immediately if the logfile is already finished. 287 """ 288 if self.finished: 289 d = defer.succeed(self) 290 else: 291 d = defer.Deferred() 292 self.finishedWatchers.append(d) 293 return d
294
295 - def getFile(self):
296 """ 297 Get an open file object for this log. The file may also be in use for 298 writing, so it should not be closed by the caller, and the caller 299 should not rely on its file position remaining constant between 300 asynchronous code segments. 301 302 @returns: file object 303 """ 304 if self.openfile: 305 # this is the filehandle we're using to write to the log, so 306 # don't close it! 307 return self.openfile 308 # otherwise they get their own read-only handle 309 # try a compressed log first 310 try: 311 return BZ2File(self.getFilename() + ".bz2", "r") 312 except IOError: 313 pass 314 try: 315 return GzipFile(self.getFilename() + ".gz", "r") 316 except IOError: 317 pass 318 return open(self.getFilename(), "r")
319
320 - def getText(self):
321 # this produces one ginormous string 322 return "".join(self.getChunks([STDOUT, STDERR], onlyText=True))
323
324 - def getTextWithHeaders(self):
325 return "".join(self.getChunks(onlyText=True))
326
327 - def getChunks(self, channels=[], onlyText=False):
328 # generate chunks for everything that was logged at the time we were 329 # first called, so remember how long the file was when we started. 330 # Don't read beyond that point. The current contents of 331 # self.runEntries will follow. 332 333 # this returns an iterator, which means arbitrary things could happen 334 # while we're yielding. This will faithfully deliver the log as it 335 # existed when it was started, and not return anything after that 336 # point. To use this in subscribe(catchup=True) without missing any 337 # data, you must insure that nothing will be added to the log during 338 # yield() calls. 339 340 f = self.getFile() 341 if not self.finished: 342 offset = 0 343 f.seek(0, 2) 344 remaining = f.tell() 345 else: 346 offset = 0 347 remaining = None 348 349 leftover = None 350 if self.runEntries and (not channels or 351 (self.runEntries[0][0] in channels)): 352 leftover = (self.runEntries[0][0], 353 "".join([c[1] for c in self.runEntries])) 354 355 # freeze the state of the LogFile by passing a lot of parameters into 356 # a generator 357 return self._generateChunks(f, offset, remaining, leftover, 358 channels, onlyText)
359
360 - def _generateChunks(self, f, offset, remaining, leftover, 361 channels, onlyText):
362 chunks = [] 363 p = LogFileScanner(chunks.append, channels) 364 f.seek(offset) 365 if remaining is not None: 366 data = f.read(min(remaining, self.BUFFERSIZE)) 367 remaining -= len(data) 368 else: 369 data = f.read(self.BUFFERSIZE) 370 371 offset = f.tell() 372 while data: 373 p.dataReceived(data) 374 while chunks: 375 channel, text = chunks.pop(0) 376 if onlyText: 377 yield text 378 else: 379 yield (channel, text) 380 f.seek(offset) 381 if remaining is not None: 382 data = f.read(min(remaining, self.BUFFERSIZE)) 383 remaining -= len(data) 384 else: 385 data = f.read(self.BUFFERSIZE) 386 offset = f.tell() 387 del f 388 389 if leftover: 390 if onlyText: 391 yield leftover[1] 392 else: 393 yield leftover
394
395 - def readlines(self, channel=STDOUT):
396 """Return an iterator that produces newline-terminated lines, 397 excluding header chunks.""" 398 # TODO: make this memory-efficient, by turning it into a generator 399 # that retrieves chunks as necessary, like a pull-driven version of 400 # twisted.protocols.basic.LineReceiver 401 alltext = "".join(self.getChunks([channel], onlyText=True)) 402 io = StringIO(alltext) 403 return io.readlines()
404
405 - def subscribe(self, receiver, catchup):
406 if self.finished: 407 return 408 self.watchers.append(receiver) 409 if catchup: 410 for channel, text in self.getChunks(): 411 # TODO: add logChunks(), to send over everything at once? 412 receiver.logChunk(self.step.build, self.step, self, 413 channel, text)
414
415 - def unsubscribe(self, receiver):
416 if receiver in self.watchers: 417 self.watchers.remove(receiver)
418
419 - def subscribeConsumer(self, consumer):
420 p = LogFileProducer(self, consumer) 421 p.resumeProducing()
422 423 # interface used by the build steps to add things to the log 424
425 - def _merge(self):
426 # merge all .runEntries (which are all of the same type) into a 427 # single chunk for .entries 428 if not self.runEntries: 429 return 430 channel = self.runEntries[0][0] 431 text = "".join([c[1] for c in self.runEntries]) 432 assert channel < 10, "channel number must be a single decimal digit" 433 f = self.openfile 434 f.seek(0, 2) 435 offset = 0 436 while offset < len(text): 437 size = min(len(text)-offset, self.chunkSize) 438 f.write("%d:%d" % (1 + size, channel)) 439 f.write(text[offset:offset+size]) 440 f.write(",") 441 offset += size 442 self.runEntries = [] 443 self.runLength = 0
444
445 - def addEntry(self, channel, text, _no_watchers=False):
446 """ 447 Add an entry to the logfile. The C{channel} is one of L{STDOUT}, 448 L{STDERR}, or L{HEADER}. The C{text} is the text to add to the 449 logfile, which can be a unicode string or a bytestring which is 450 presumed to be encoded with utf-8. 451 452 This method cannot be called after the logfile is finished. 453 454 @param channel: channel to add a chunk for 455 @param text: chunk of text 456 @param _no_watchers: private 457 """ 458 459 assert not self.finished, "logfile is already finished" 460 461 if isinstance(text, unicode): 462 text = text.encode('utf-8') 463 464 # notify watchers first, before the chunk gets munged, so that they get 465 # a complete picture of the actual log output 466 # TODO: is this right, or should the watchers get a picture of the chunks? 467 if not _no_watchers: 468 for w in self.watchers: 469 w.logChunk(self.step.build, self.step, self, channel, text) 470 471 if channel != HEADER: 472 # Truncate the log if it's more than logMaxSize bytes 473 logMaxSize = self.master.config.logMaxSize 474 logMaxTailSize = self.master.config.logMaxTailSize 475 if logMaxSize: 476 self.nonHeaderLength += len(text) 477 if self.nonHeaderLength > logMaxSize: 478 # Add a message about what's going on and truncate this 479 # chunk if necessary 480 if not self.maxLengthExceeded: 481 if self.runEntries and channel != self.runEntries[0][0]: 482 self._merge() 483 i = -(self.nonHeaderLength - logMaxSize) 484 trunc, text = text[:i], text[i:] 485 self.runEntries.append((channel, trunc)) 486 self._merge() 487 msg = ("\nOutput exceeded %i bytes, remaining output " 488 "has been truncated\n" % logMaxSize) 489 self.runEntries.append((HEADER, msg)) 490 self.maxLengthExceeded = True 491 492 # and track the tail of the text 493 if logMaxTailSize and text: 494 # Update the tail buffer 495 self.tailBuffer.append((channel, text)) 496 self.tailLength += len(text) 497 while self.tailLength > logMaxTailSize: 498 # Drop some stuff off the beginning of the buffer 499 c,t = self.tailBuffer.pop(0) 500 n = len(t) 501 self.tailLength -= n 502 assert self.tailLength >= 0 503 return 504 505 # we only add to .runEntries here. _merge() is responsible for adding 506 # merged chunks to .entries 507 if self.runEntries and channel != self.runEntries[0][0]: 508 self._merge() 509 self.runEntries.append((channel, text)) 510 self.runLength += len(text) 511 if self.runLength >= self.chunkSize: 512 self._merge() 513 514 self.length += len(text)
515
516 - def addStdout(self, text):
517 """ 518 Shortcut to add stdout text to the logfile 519 520 @param text: text to add to the logfile 521 """ 522 self.addEntry(STDOUT, text)
523
524 - def addStderr(self, text):
525 """ 526 Shortcut to add stderr text to the logfile 527 528 @param text: text to add to the logfile 529 """ 530 self.addEntry(STDERR, text)
531
532 - def addHeader(self, text):
533 """ 534 Shortcut to add header text to the logfile 535 536 @param text: text to add to the logfile 537 """ 538 self.addEntry(HEADER, text)
539
540 - def finish(self):
541 """ 542 Finish the logfile, flushing any buffers and preventing any further 543 writes to the log. 544 """ 545 self._merge() 546 if self.tailBuffer: 547 msg = "\nFinal %i bytes follow below:\n" % self.tailLength 548 tmp = self.runEntries 549 self.runEntries = [(HEADER, msg)] 550 self._merge() 551 self.runEntries = self.tailBuffer 552 self._merge() 553 self.runEntries = tmp 554 self._merge() 555 self.tailBuffer = [] 556 557 if self.openfile: 558 # we don't do an explicit close, because there might be readers 559 # shareing the filehandle. As soon as they stop reading, the 560 # filehandle will be released and automatically closed. 561 self.openfile.flush() 562 self.openfile = None 563 self.finished = True 564 watchers = self.finishedWatchers 565 self.finishedWatchers = [] 566 for w in watchers: 567 w.callback(self) 568 self.watchers = []
569 570
571 - def compressLog(self):
572 logCompressionMethod = self.master.config.logCompressionMethod 573 # bail out if there's no compression support 574 if logCompressionMethod == "bz2": 575 compressed = self.getFilename() + ".bz2.tmp" 576 elif logCompressionMethod == "gz": 577 compressed = self.getFilename() + ".gz.tmp" 578 else: 579 return defer.succeed(None) 580 581 def _compressLog(): 582 infile = self.getFile() 583 if logCompressionMethod == "bz2": 584 cf = BZ2File(compressed, 'w') 585 elif logCompressionMethod == "gz": 586 cf = GzipFile(compressed, 'w') 587 bufsize = 1024*1024 588 while True: 589 buf = infile.read(bufsize) 590 cf.write(buf) 591 if len(buf) < bufsize: 592 break 593 cf.close()
594 d = threads.deferToThread(_compressLog) 595 596 def _renameCompressedLog(rv): 597 if logCompressionMethod == "bz2": 598 filename = self.getFilename() + '.bz2' 599 else: 600 filename = self.getFilename() + '.gz' 601 if runtime.platformType == 'win32': 602 # windows cannot rename a file on top of an existing one, so 603 # fall back to delete-first. There are ways this can fail and 604 # lose the builder's history, so we avoid using it in the 605 # general (non-windows) case 606 if os.path.exists(filename): 607 os.unlink(filename) 608 os.rename(compressed, filename) 609 _tryremove(self.getFilename(), 1, 5)
610 d.addCallback(_renameCompressedLog) 611 612 def _cleanupFailedCompress(failure): 613 log.msg("failed to compress %s" % self.getFilename()) 614 if os.path.exists(compressed): 615 _tryremove(compressed, 1, 5) 616 failure.trap() # reraise the failure 617 d.addErrback(_cleanupFailedCompress) 618 return d 619 620 621 # persistence stuff
622 - def __getstate__(self):
623 d = self.__dict__.copy() 624 del d['step'] # filled in upon unpickling 625 del d['watchers'] 626 del d['finishedWatchers'] 627 del d['master'] 628 d['entries'] = [] # let 0.6.4 tolerate the saved log. TODO: really? 629 if d.has_key('finished'): 630 del d['finished'] 631 if d.has_key('openfile'): 632 del d['openfile'] 633 return d
634
635 - def __setstate__(self, d):
636 self.__dict__ = d 637 self.watchers = [] # probably not necessary 638 self.finishedWatchers = [] # same 639 # self.step must be filled in by our parent 640 self.finished = True
641
642 -class HTMLLogFile:
643 implements(interfaces.IStatusLog) 644 645 filename = None 646
647 - def __init__(self, parent, name, logfilename, html):
648 self.step = parent 649 self.name = name 650 self.filename = logfilename 651 self.html = html
652
653 - def getName(self):
654 return self.name # set in BuildStepStatus.addLog
655 - def getStep(self):
656 return self.step
657
658 - def isFinished(self):
659 return True
660 - def waitUntilFinished(self):
661 return defer.succeed(self)
662
663 - def hasContents(self):
664 return True
665 - def getText(self):
666 return self.html # looks kinda like text
667 - def getTextWithHeaders(self):
668 return self.html
669 - def getChunks(self):
670 return [(STDERR, self.html)]
671
672 - def subscribe(self, receiver, catchup):
673 pass
674 - def unsubscribe(self, receiver):
675 pass
676
677 - def finish(self):
678 pass
679
680 - def __getstate__(self):
681 d = self.__dict__.copy() 682 del d['step'] 683 return d
684 685
686 -def _tryremove(filename, timeout, retries):
687 """Try to remove a file, and if failed, try again in timeout. 688 Increases the timeout by a factor of 4, and only keeps trying for 689 another retries-amount of times. 690 691 """ 692 try: 693 os.unlink(filename) 694 except OSError: 695 if retries > 0: 696 reactor.callLater(timeout, _tryremove, filename, timeout * 4, 697 retries - 1) 698 else: 699 log.msg("giving up on removing %s after over %d seconds" % 700 (filename, timeout))
701