Package buildbot :: Package status :: Module logfile
[frames | no frames]

Source Code for Module buildbot.status.logfile

  1  # This file is part of Buildbot.  Buildbot is free software: you can 
  2  # redistribute it and/or modify it under the terms of the GNU General Public 
  3  # License as published by the Free Software Foundation, version 2. 
  4  # 
  5  # This program is distributed in the hope that it will be useful, but WITHOUT 
  6  # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more 
  8  # details. 
  9  # 
 10  # You should have received a copy of the GNU General Public License along with 
 11  # this program; if not, write to the Free Software Foundation, Inc., 51 
 12  # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
 13  # 
 14  # Copyright Buildbot Team Members 
 15   
 16  import os 
 17  from cStringIO import StringIO 
 18  from bz2 import BZ2File 
 19  from gzip import GzipFile 
 20   
 21  from zope.interface import implements 
 22  from twisted.python import log, runtime 
 23  from twisted.internet import defer, threads, reactor 
 24  from buildbot.util import netstrings 
 25  from buildbot.util.eventual import eventually 
 26  from buildbot import interfaces 
 27   
# Short module-level aliases for the log channel identifiers defined in
# buildbot.interfaces. LogFile.addStdout/addStderr/addHeader pass these to
# addEntry(), and LogFile.merge() writes the channel as a single leading
# digit of each on-disk netstring.
STDOUT = interfaces.LOG_CHANNEL_STDOUT
STDERR = interfaces.LOG_CHANNEL_STDERR
HEADER = interfaces.LOG_CHANNEL_HEADER
# Human-readable channel names. Presumably indexed by the channel numbers
# above -- TODO confirm; nothing in the code visible here reads this list.
ChunkTypes = ["stdout", "stderr", "header"]
 32   
class LogFileScanner(netstrings.NetstringParser):
    """Netstring parser that turns each decoded string into a log chunk.

    Every string is expected to start with a single decimal digit naming
    the channel, followed by the chunk text (this is the layout written by
    L{LogFile.merge}). Accepted chunks are handed to C{chunk_cb} as
    C{(channel, text)} tuples.
    """

    def __init__(self, chunk_cb, channels=None):
        """
        @param chunk_cb: callable invoked with one C{(channel, text)} tuple
                         per accepted chunk
        @type  channels: list of int, or None
        @param channels: channels to deliver. An empty list (or None, the
                         default) means every channel is delivered.
        """
        self.chunk_cb = chunk_cb
        # a None sentinel replaces the old shared mutable default ([]), so
        # that default-constructed scanners never share one list object
        if channels is None:
            channels = []
        self.channels = channels
        netstrings.NetstringParser.__init__(self)

    def stringReceived(self, line):
        """Split C{line} into channel digit and text, and deliver it if the
        channel passes the filter. Presumably invoked by NetstringParser
        once per complete netstring -- the base class is not visible here.
        """
        channel = int(line[0])
        if not self.channels or (channel in self.channels):
            self.chunk_cb((channel, line[1:]))
43
class LogFileProducer:
    """Feed the contents of a L{LogFile} to a consumer, then keep it
    up to date until the log finishes.

    What's the plan?

    the LogFile has just one FD, used for both reading and writing.
    Each time you add an entry, fd.seek to the end and then write.

    Each reader (i.e. Producer) keeps track of their own offset. The reader
    starts by seeking to the start of the logfile, and reading forwards.
    Between each hunk of file they yield chunks, so they must remember their
    offset before yielding and re-seek back to that offset before reading
    more data. When their read() returns EOF, they're finished with the first
    phase of the reading (everything that's already been written to disk).

    After EOF, the remaining data is entirely in the current entries list.
    These entries are all of the same channel, so we can do one "".join and
    obtain a single chunk to be sent to the listener. But since that involves
    a yield, and more data might arrive after we give up control, we have to
    subscribe them before yielding. We can't subscribe them any earlier,
    otherwise they'd get data out of order.

    We're using a generator in the first place so that the listener can
    throttle us, which means they're pulling. But the subscription means
    we're pushing. Really we're a Producer. In the first phase we can be
    either a PullProducer or a PushProducer. In the second phase we're only a
    PushProducer.

    So the client gives a LogFileConsumer to File.subscribeConsumer . This
    Consumer must have registerProducer(), unregisterProducer(), and
    writeChunk(), and is just like a regular twisted.interfaces.IConsumer,
    except that writeChunk() takes chunks (tuples of (channel,text)) instead
    of the normal write() which takes just text. The LogFileConsumer is
    allowed to call stopProducing, pauseProducing, and resumeProducing on the
    producer instance it is given. """

    paused = False
    subscribed = False
    BUFFERSIZE = 2048

    def __init__(self, logfile, consumer):
        """
        @param logfile: the L{LogFile} to read from
        @param consumer: object providing registerProducer(),
                         unregisterProducer(), writeChunk() and finish()
        """
        self.logfile = logfile
        self.consumer = consumer
        self.chunkGenerator = self.getChunks()
        # second argument True: register as a streaming (push) producer
        consumer.registerProducer(self, True)

    def getChunks(self):
        """Generator yielding C{(channel, text)} chunks: first everything
        already on disk, then the not-yet-merged runEntries. As a side
        effect it subscribes this producer to the logfile once the on-disk
        data has been exhausted."""
        f = self.logfile.getFile()
        offset = 0
        chunks = []
        p = LogFileScanner(chunks.append)
        f.seek(offset)
        data = f.read(self.BUFFERSIZE)
        offset = f.tell()
        while data:
            p.dataReceived(data)
            while chunks:
                c = chunks.pop(0)
                yield c
            # the handle may be shared with the writer (and other readers),
            # so re-seek to our own offset after every yield
            f.seek(offset)
            data = f.read(self.BUFFERSIZE)
            offset = f.tell()
        del f

        # now subscribe them to receive new entries
        self.subscribed = True
        self.logfile.watchers.append(self)
        d = self.logfile.waitUntilFinished()

        # then give them the not-yet-merged data
        if self.logfile.runEntries:
            channel = self.logfile.runEntries[0][0]
            text = "".join([c[1] for c in self.logfile.runEntries])
            yield (channel, text)

        # now we've caught up to the present. Anything further will come from
        # the logfile subscription. We add the callback *after* yielding the
        # data from runEntries, because the logfile might have finished
        # during the yield.
        d.addCallback(self.logfileFinished)

    def stopProducing(self):
        """Consumer request: stop for good and drop the consumer."""
        # TODO: should we still call consumer.finish? probably not.
        self.paused = True
        self.consumer = None
        self.done()

    def done(self):
        """Stop generating chunks and drop our logfile subscription."""
        if self.chunkGenerator:
            self.chunkGenerator = None # stop making chunks
        if self.subscribed:
            # LogFile.finish() replaces its watchers list with a fresh one,
            # so by the time we get here we may no longer be in it. An
            # unconditional remove() would raise ValueError and prevent
            # logfileFinished() from notifying the consumer.
            if self in self.logfile.watchers:
                self.logfile.watchers.remove(self)
            self.subscribed = False

    def pauseProducing(self):
        """Consumer request: stop writing chunks until resumed."""
        self.paused = True

    def resumeProducing(self):
        """Consumer request: start (or restart) writing chunks. The real
        work is deferred to a later reactor turn."""
        # Twisted-1.3.0 has a bug which causes hangs when resumeProducing
        # calls transport.write (there is a recursive loop, fixed in 2.0 in
        # t.i.abstract.FileDescriptor.doWrite by setting the producerPaused
        # flag *before* calling resumeProducing). To work around this, we
        # just put off the real resumeProducing for a moment. This probably
        # has a performance hit, but I'm going to assume that the log files
        # are not retrieved frequently enough for it to be an issue.

        eventually(self._resumeProducing)

    def _resumeProducing(self):
        """Pump chunks from the generator into the consumer until it pauses
        us or the generator is exhausted."""
        self.paused = False
        if not self.chunkGenerator:
            return
        try:
            while not self.paused:
                chunk = self.chunkGenerator.next()
                self.consumer.writeChunk(chunk)
                # we exit this when the consumer says to stop, or we run out
                # of chunks
        except StopIteration:
            # the generator is exhausted; it has dropped its file reference
            self.chunkGenerator = None
        # now everything goes through the subscription, and they don't get to
        # pause anymore

    def logChunk(self, build, step, logfile, channel, chunk):
        """Watcher callback from L{LogFile.addEntry}: forward a new chunk."""
        if self.consumer:
            self.consumer.writeChunk((channel, chunk))

    def logfileFinished(self, logfile):
        """Callback for the logfile's waitUntilFinished() Deferred: clean up
        and tell the consumer that no more data will arrive."""
        self.done()
        if self.consumer:
            self.consumer.unregisterProducer()
            self.consumer.finish()
            self.consumer = None
176
class LogFile:
    """A LogFile keeps all of its contents on disk, in a non-pickle format to
    which new entries can easily be appended. The file on disk has a name
    like 12-log-compile-output, under the Builder's directory. The actual
    filename is generated (before the LogFile is created) by
    L{BuildStatus.generateLogfileName}.

    The on-disk format is a sequence of netstrings; the first byte of each
    string is the channel digit and the rest is the chunk text (see
    L{merge}).

    Old LogFile pickles (which kept their contents in .entries) must be
    upgraded. The L{BuilderStatus} is responsible for doing this, when it
    loads the L{BuildStatus} into memory. The Build pickle is not modified,
    so users who go from 0.6.5 back to 0.6.4 don't have to lose their
    logs."""

    implements(interfaces.IStatusLog, interfaces.ILogFile)

    finished = False
    length = 0
    nonHeaderLength = 0
    tailLength = 0
    chunkSize = 10*1000
    runLength = 0
    # No max size by default
    logMaxSize = None
    # Don't keep a tail buffer by default
    logMaxTailSize = None
    maxLengthExceeded = False
    # class-level fallback, provided so old pickled builds will getChunks()
    # ok. It is shared by every instance, so it must never be mutated:
    # instances get their own list in __init__() / upgrade().
    runEntries = []
    entries = None
    BUFFERSIZE = 2048
    filename = None # relative to the Builder's basedir
    openfile = None
    compressMethod = "bz2"

    def __init__(self, parent, name, logfilename):
        """
        @type  parent: L{BuildStepStatus}
        @param parent: the Step that this log is a part of
        @type  name: string
        @param name: the name of this log, typically 'output'
        @type  logfilename: string
        @param logfilename: the Builder-relative pathname for the saved entries
        """
        self.step = parent
        self.name = name
        self.filename = logfilename
        fn = self.getFilename()
        if os.path.exists(fn):
            # the buildmaster was probably stopped abruptly, before the
            # BuilderStatus could be saved, so BuilderStatus.nextBuildNumber
            # is out of date, and we're overlapping with earlier builds now.
            # Warn about it, but then overwrite the old pickle file
            log.msg("Warning: Overwriting old serialized Build at %s" % fn)
        dirname = os.path.dirname(fn)
        if not os.path.exists(dirname):
            os.makedirs(dirname)
        self.openfile = open(fn, "w+")
        self.runEntries = []
        self.watchers = []
        self.finishedWatchers = []
        self.tailBuffer = []

    def getFilename(self):
        """Return the path of the (uncompressed) log file, built from the
        builder's basedir and our Builder-relative filename."""
        return os.path.join(self.step.build.builder.basedir, self.filename)

    def hasContents(self):
        """Return True if the log exists on disk in bz2, gzip or plain
        form."""
        return os.path.exists(self.getFilename() + '.bz2') or \
            os.path.exists(self.getFilename() + '.gz') or \
            os.path.exists(self.getFilename())

    def getName(self):
        return self.name

    def getStep(self):
        return self.step

    def isFinished(self):
        return self.finished

    def waitUntilFinished(self):
        """Return a Deferred that fires with this LogFile once finish() has
        been called (immediately, if it already has)."""
        if self.finished:
            d = defer.succeed(self)
        else:
            d = defer.Deferred()
            self.finishedWatchers.append(d)
        return d

    def getFile(self):
        """Return a file object for reading the log.

        While the log is still being written this is the shared read/write
        handle, which callers must not close. Otherwise the caller gets a
        private read-only handle onto the bz2, gzip or plain file, tried in
        that order.
        """
        if self.openfile:
            # this is the filehandle we're using to write to the log, so
            # don't close it!
            return self.openfile
        # otherwise they get their own read-only handle
        # try a compressed log first
        try:
            return BZ2File(self.getFilename() + ".bz2", "r")
        except IOError:
            pass
        try:
            return GzipFile(self.getFilename() + ".gz", "r")
        except IOError:
            pass
        return open(self.getFilename(), "r")

    def getText(self):
        """Return the stdout and stderr text as a single string."""
        # this produces one ginormous string
        return "".join(self.getChunks([STDOUT, STDERR], onlyText=True))

    def getTextWithHeaders(self):
        """Return the text of every channel as a single string."""
        return "".join(self.getChunks(onlyText=True))

    def getChunks(self, channels=None, onlyText=False):
        """Return an iterator over the log as it exists right now.

        @param channels: list of channels to include; empty or None (the
                         default) means all channels
        @param onlyText: if true, yield bare text strings instead of
                         C{(channel, text)} tuples
        """
        # generate chunks for everything that was logged at the time we were
        # first called, so remember how long the file was when we started.
        # Don't read beyond that point. The current contents of
        # self.runEntries will follow.

        # this returns an iterator, which means arbitrary things could happen
        # while we're yielding. This will faithfully deliver the log as it
        # existed when it was started, and not return anything after that
        # point. To use this in subscribe(catchup=True) without missing any
        # data, you must insure that nothing will be added to the log during
        # yield() calls.

        # a None sentinel replaces the old mutable default argument
        if channels is None:
            channels = []

        f = self.getFile()
        if not self.finished:
            offset = 0
            f.seek(0, 2)
            remaining = f.tell()
        else:
            offset = 0
            remaining = None

        leftover = None
        if self.runEntries and (not channels or
                                (self.runEntries[0][0] in channels)):
            leftover = (self.runEntries[0][0],
                        "".join([c[1] for c in self.runEntries]))

        # freeze the state of the LogFile by passing a lot of parameters into
        # a generator
        return self._generateChunks(f, offset, remaining, leftover,
                                    channels, onlyText)

    def _generateChunks(self, f, offset, remaining, leftover,
                        channels, onlyText):
        """Generator behind getChunks().

        @param f: file object to read netstrings from
        @param offset: position in C{f} to start reading at
        @param remaining: number of bytes to read at most, or None to read
                          up to EOF
        @param leftover: C{(channel, text)} tuple of unmerged data to yield
                         after the file contents, or None
        @param channels: channel filter handed to L{LogFileScanner}
        @param onlyText: yield bare text instead of tuples
        """
        chunks = []
        p = LogFileScanner(chunks.append, channels)
        f.seek(offset)
        if remaining is not None:
            data = f.read(min(remaining, self.BUFFERSIZE))
            remaining -= len(data)
        else:
            data = f.read(self.BUFFERSIZE)

        offset = f.tell()
        while data:
            p.dataReceived(data)
            while chunks:
                channel, text = chunks.pop(0)
                if onlyText:
                    yield text
                else:
                    yield (channel, text)
            # f may be the shared handle: restore our own position after
            # having yielded control
            f.seek(offset)
            if remaining is not None:
                data = f.read(min(remaining, self.BUFFERSIZE))
                remaining -= len(data)
            else:
                data = f.read(self.BUFFERSIZE)
            offset = f.tell()
        del f

        if leftover:
            if onlyText:
                yield leftover[1]
            else:
                yield leftover

    def readlines(self, channel=STDOUT):
        """Return the newline-terminated lines of one channel (stdout by
        default), as produced by readlines() on the joined text."""
        # TODO: make this memory-efficient, by turning it into a generator
        # that retrieves chunks as necessary, like a pull-driven version of
        # twisted.protocols.basic.LineReceiver
        alltext = "".join(self.getChunks([channel], onlyText=True))
        io = StringIO(alltext)
        return io.readlines()

    def subscribe(self, receiver, catchup):
        """Register C{receiver} to get logChunk() calls for new entries.
        Does nothing if the log is already finished.

        @param catchup: if true, first replay everything logged so far
        """
        if self.finished:
            return
        self.watchers.append(receiver)
        if catchup:
            for channel, text in self.getChunks():
                # TODO: add logChunks(), to send over everything at once?
                receiver.logChunk(self.step.build, self.step, self,
                                  channel, text)

    def unsubscribe(self, receiver):
        if receiver in self.watchers:
            self.watchers.remove(receiver)

    def subscribeConsumer(self, consumer):
        """Attach a L{LogFileProducer} to C{consumer} and start it."""
        p = LogFileProducer(self, consumer)
        p.resumeProducing()

    # interface used by the build steps to add things to the log

    def merge(self):
        """Append the pending run to the log file and clear it.

        All of .runEntries is written under the channel of its first entry,
        split into netstrings of at most chunkSize bytes of text each.
        """
        # merge all .runEntries (which are all of the same type) into a
        # single chunk for .entries
        if not self.runEntries:
            return
        channel = self.runEntries[0][0]
        text = "".join([c[1] for c in self.runEntries])
        # the channel is stored as exactly one digit
        assert channel < 10
        f = self.openfile
        f.seek(0, 2)
        offset = 0
        while offset < len(text):
            size = min(len(text)-offset, self.chunkSize)
            # netstring length covers the channel digit plus the text
            f.write("%d:%d" % (1 + size, channel))
            f.write(text[offset:offset+size])
            f.write(",")
            offset += size
        self.runEntries = []
        self.runLength = 0

    def addEntry(self, channel, text):
        """Add C{text} to the log on the given channel and notify watchers.

        Once more than logMaxSize bytes of non-header output have been
        logged, further non-header text is not written (nor sent to
        watchers); if logMaxTailSize is set, the most recent such text is
        kept in the tail buffer instead, to be written by finish().
        """
        assert not self.finished

        if isinstance(text, unicode):
            text = text.encode('utf-8')
        if channel != HEADER:
            # Truncate the log if it's more than logMaxSize bytes
            if self.logMaxSize and self.nonHeaderLength > self.logMaxSize:
                # Add a message about what's going on
                if not self.maxLengthExceeded:
                    msg = "\nOutput exceeded %i bytes, remaining output has been truncated\n" % self.logMaxSize
                    self.addEntry(HEADER, msg)
                    self.merge()
                    self.maxLengthExceeded = True

                if self.logMaxTailSize:
                    # Update the tail buffer
                    self.tailBuffer.append((channel, text))
                    self.tailLength += len(text)
                    while self.tailLength > self.logMaxTailSize:
                        # Drop some stuff off the beginning of the buffer
                        c,t = self.tailBuffer.pop(0)
                        n = len(t)
                        self.tailLength -= n
                        assert self.tailLength >= 0
                return

            self.nonHeaderLength += len(text)

        # we only add to .runEntries here. merge() is responsible for adding
        # merged chunks to .entries
        if self.runEntries and channel != self.runEntries[0][0]:
            self.merge()
        self.runEntries.append((channel, text))
        self.runLength += len(text)
        if self.runLength >= self.chunkSize:
            self.merge()

        for w in self.watchers:
            w.logChunk(self.step.build, self.step, self, channel, text)
        self.length += len(text)

    def addStdout(self, text):
        self.addEntry(STDOUT, text)

    def addStderr(self, text):
        self.addEntry(STDERR, text)

    def addHeader(self, text):
        self.addEntry(HEADER, text)

    def finish(self):
        """Flush everything to disk, mark the log finished, fire the
        waitUntilFinished() Deferreds and drop all watchers."""
        if self.tailBuffer:
            msg = "\nFinal %i bytes follow below:\n" % self.tailLength
            tmp = self.runEntries
            self.runEntries = [(HEADER, msg)]
            self.merge()
            # The tail buffer may interleave channels, but merge() labels a
            # whole run with the channel of its first entry. Flush one run
            # per stretch of same-channel entries so that each chunk keeps
            # its own channel.
            for entry in self.tailBuffer:
                if self.runEntries and entry[0] != self.runEntries[0][0]:
                    self.merge()
                self.runEntries.append(entry)
            self.merge()
            self.runEntries = tmp
            self.merge()
            self.tailBuffer = []
        else:
            self.merge()

        if self.openfile:
            # we don't do an explicit close, because there might be readers
            # sharing the filehandle. As soon as they stop reading, the
            # filehandle will be released and automatically closed.
            self.openfile.flush()
            del self.openfile
        self.finished = True
        watchers = self.finishedWatchers
        self.finishedWatchers = []
        for w in watchers:
            w.callback(self)
        self.watchers = []

    def compressLog(self):
        """Compress the log file in a thread, using compressMethod ("bz2"
        or "gz"), then replace the plain file with the compressed one.

        @return: a Deferred that fires when the work is done. For any other
                 compressMethod nothing is done and the Deferred has
                 already fired with None.
        """
        # bail out if there's no compression support
        if self.compressMethod == "bz2":
            compressed = self.getFilename() + ".bz2.tmp"
        elif self.compressMethod == "gz":
            compressed = self.getFilename() + ".gz.tmp"
        else:
            # previously this fell through and raised UnboundLocalError
            return defer.succeed(None)
        d = threads.deferToThread(self._compressLog, compressed)
        d.addCallback(self._renameCompressedLog, compressed)
        d.addErrback(self._cleanupFailedCompress, compressed)
        return d

    def _compressLog(self, compressed):
        """Thread body for compressLog(): copy the log into the temporary
        compressed file C{compressed}."""
        shared = self.openfile
        infile = self.getFile()
        # only a private read-only handle is ours to close; the shared
        # read/write handle must be left alone
        owned = infile is not shared
        try:
            if self.compressMethod == "bz2":
                cf = BZ2File(compressed, 'w')
            elif self.compressMethod == "gz":
                cf = GzipFile(compressed, 'w')
            try:
                bufsize = 1024*1024
                while True:
                    buf = infile.read(bufsize)
                    cf.write(buf)
                    if len(buf) < bufsize:
                        break
            finally:
                # close (and thereby flush) the output even on error
                cf.close()
        finally:
            if owned:
                infile.close()

    def _renameCompressedLog(self, rv, compressed):
        """Callback: move the temporary compressed file into place and
        remove the uncompressed log."""
        if self.compressMethod == "bz2":
            filename = self.getFilename() + '.bz2'
        else:
            filename = self.getFilename() + '.gz'
        if runtime.platformType == 'win32':
            # windows cannot rename a file on top of an existing one, so
            # fall back to delete-first. There are ways this can fail and
            # lose the builder's history, so we avoid using it in the
            # general (non-windows) case
            if os.path.exists(filename):
                os.unlink(filename)
        os.rename(compressed, filename)
        _tryremove(self.getFilename(), 1, 5)

    def _cleanupFailedCompress(self, failure, compressed):
        """Errback: log the problem, remove the temporary file and pass the
        failure on."""
        log.msg("failed to compress %s" % self.getFilename())
        if os.path.exists(compressed):
            _tryremove(compressed, 1, 5)
        failure.trap() # reraise the failure

    # persistence stuff
    def __getstate__(self):
        """Return the picklable state: everything except the parent step,
        the watcher lists, the open file and the finished flag."""
        d = self.__dict__.copy()
        del d['step'] # filled in upon unpickling
        del d['watchers']
        del d['finishedWatchers']
        d['entries'] = [] # let 0.6.4 tolerate the saved log. TODO: really?
        # pop() instead of the Python-2-only has_key()/del pair
        d.pop('finished', None)
        d.pop('openfile', None)
        return d

    def __setstate__(self, d):
        """Restore pickled state; an unpickled log is always finished."""
        self.__dict__ = d
        self.watchers = [] # probably not necessary
        self.finishedWatchers = [] # same
        # self.step must be filled in by our parent
        self.finished = True

    def upgrade(self, logfilename):
        """Save our .entries to a new-style offline log file (if necessary),
        and modify our in-memory representation to use it. The original
        pickled LogFile (inside the pickled Build) won't be modified."""
        self.filename = logfilename
        if not os.path.exists(self.getFilename()):
            self.openfile = open(self.getFilename(), "w")
            self.finished = False
            # Old pickles may lack these attributes. Without a private
            # runEntries, addEntry() would append to the class-level list
            # shared by all LogFiles; without tailBuffer, finish() would
            # raise AttributeError.
            self.__dict__.setdefault('runEntries', [])
            self.__dict__.setdefault('tailBuffer', [])
            for channel,text in (self.entries or []):
                self.addEntry(channel, text)
            self.finish() # releases self.openfile, which will be closed
        # drop the instance's old-style entries, if it has any
        self.__dict__.pop('entries', None)
557
class HTMLLogFile:
    """A status log whose entire content is one HTML string held in memory.

    It is complete from the moment it is created: it reports itself as
    finished, and subscribing, unsubscribing, finishing and upgrading are
    all no-ops.
    """

    implements(interfaces.IStatusLog)

    filename = None

    def __init__(self, parent, name, logfilename, html):
        """
        @param parent: the step this log belongs to
        @param name: the name of this log
        @param logfilename: stored as .filename; not otherwise used here
        @param html: the HTML text that makes up the log
        """
        self.step = parent
        self.name = name
        self.filename = logfilename
        self.html = html

    # -- identity ---------------------------------------------------------

    def getName(self):
        # set in BuildStepStatus.addLog
        return self.name

    def getStep(self):
        return self.step

    # -- completion: always finished ---------------------------------------

    def isFinished(self):
        return True

    def waitUntilFinished(self):
        """Return a Deferred that has already fired with this log."""
        return defer.succeed(self)

    # -- content -----------------------------------------------------------

    def hasContents(self):
        return True

    def getText(self):
        # looks kinda like text
        return self.html

    def getTextWithHeaders(self):
        return self.html

    def getChunks(self):
        """Return the whole HTML body as a single stderr-channel chunk."""
        only_chunk = (STDERR, self.html)
        return [only_chunk]

    # -- no-ops: nothing is ever added to this log -------------------------

    def subscribe(self, receiver, catchup):
        pass

    def unsubscribe(self, receiver):
        pass

    def finish(self):
        pass

    # -- persistence -------------------------------------------------------

    def __getstate__(self):
        """Return the picklable state: every attribute except the parent
        step."""
        state = dict(self.__dict__)
        del state['step']
        return state

    def upgrade(self, logfilename):
        pass
603 604
605 -def _tryremove(filename, timeout, retries):
606 """Try to remove a file, and if failed, try again in timeout. 607 Increases the timeout by a factor of 4, and only keeps trying for 608 another retries-amount of times. 609 610 """ 611 try: 612 os.unlink(filename) 613 except OSError: 614 if retries > 0: 615 reactor.callLater(timeout, _tryremove, filename, timeout * 4, 616 retries - 1) 617 else: 618 log.msg("giving up on removing %s after over %d seconds" % 619 (filename, timeout))
620