import os
from cStringIO import StringIO
from bz2 import BZ2File
from gzip import GzipFile

from zope.interface import implements
from twisted.python import log, runtime
from twisted.internet import defer, threads, reactor
from buildbot.util import netstrings
from buildbot.util.eventual import eventually
from buildbot import interfaces

STDOUT = interfaces.LOG_CHANNEL_STDOUT
STDERR = interfaces.LOG_CHANNEL_STDERR
HEADER = interfaces.LOG_CHANNEL_HEADER
ChunkTypes = ["stdout", "stderr", "header"]

class LogFileScanner(netstrings.NetstringParser):
    # Parses the netstring-encoded on-disk log and hands each decoded
    # (channel, text) chunk to chunk_cb, optionally filtered by channel.
    def __init__(self, chunk_cb, channels=[]):
        self.chunk_cb = chunk_cb
        self.channels = channels
        netstrings.NetstringParser.__init__(self)

    def stringReceived(self, line):
        channel = int(line[0])
        if not self.channels or (channel in self.channels):
            self.chunk_cb((channel, line[1:]))
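
    # On disk, each chunk is a netstring whose payload starts with the
    # channel digit (per stringReceived above; the writer side is not shown
    # in this excerpt). For example, the stdout chunk "hello\n" would be
    # encoded as:
    #
    #   7:0hello\n,
    #
    # i.e. len("0hello\n") == 7, channel 0 (STDOUT), then the text itself.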

class LogFileProducer:
    """What's the plan?

    The LogFile has just one FD, used for both reading and writing.
    Each time you add an entry, fd.seek to the end and then write.

    Each reader (i.e. Producer) keeps track of its own offset. The reader
    starts by seeking to the start of the logfile and reading forwards.
    Between each hunk of file it yields chunks, so it must remember its
    offset before yielding and re-seek back to that offset before reading
    more data. When its read() returns EOF, it's finished with the first
    phase of the reading (everything that's already been written to disk).

    After EOF, the remaining data is entirely in the current entries list.
    These entries are all of the same channel, so we can do one "".join and
    obtain a single chunk to be sent to the listener. But since that involves
    a yield, and more data might arrive after we give up control, we have to
    subscribe them before yielding. We can't subscribe them any earlier,
    otherwise they'd get data out of order.

    We're using a generator in the first place so that the listener can
    throttle us, which means they're pulling. But the subscription means
    we're pushing. Really we're a Producer. In the first phase we can be
    either a PullProducer or a PushProducer. In the second phase we're only
    a PushProducer.

    So the client gives a LogFileConsumer to LogFile.subscribeConsumer. This
    Consumer must have registerProducer(), unregisterProducer(), and
    writeChunk(), and is just like a regular twisted.interfaces.IConsumer,
    except that writeChunk() takes chunks (tuples of (channel,text)) instead
    of the normal write() which takes just text. The LogFileConsumer is
    allowed to call stopProducing, pauseProducing, and resumeProducing on the
    producer instance it is given.
    """
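
    # A minimal consumer sketch (illustrative only, not part of this module),
    # assuming it is handed to LogFile.subscribeConsumer and driven by a
    # LogFileProducer:
    #
    #   class PrintingLogConsumer:
    #       def registerProducer(self, producer, streaming):
    #           self.producer = producer
    #       def unregisterProducer(self):
    #           self.producer = None
    #       def writeChunk(self, chunk):
    #           channel, text = chunk
    #           print ChunkTypes[channel], text
    #
    # The consumer may throttle delivery by calling pauseProducing() and
    # resumeProducing() on the producer, or abort with stopProducing().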

    paused = False
    subscribed = False
    BUFFERSIZE = 2048


    def stopProducing(self):
        # this should make us not be active
        self.paused = True
        self.consumer = None
        self.done()

    def pauseProducing(self):
        self.paused = True

    def resumeProducing(self):
        # Twisted-1.3.0 has a bug which causes hangs when resumeProducing
        # calls transport.write (there is a recursive loop, fixed in 2.0 in
        # t.i.abstract.FileDescriptor.doWrite by setting the producerPaused
        # flag *before* calling resumeProducing). To work around this, we
        # just put off the real resumeProducing for a moment. This probably
        # has a performance hit, but I'm going to assume that the log files
        # are not retrieved frequently enough for it to be an issue.
        eventually(self._resumeProducing)

    def _resumeProducing(self):
        self.paused = False
        if not self.chunkGenerator:
            return
        try:
            while not self.paused:
                chunk = self.chunkGenerator.next()
                self.consumer.writeChunk(chunk)
                # we exit this loop when the consumer says to stop, or when
                # we run out of chunks
        except StopIteration:
            # the generator is finished: everything that was on disk has
            # been delivered
            self.chunkGenerator = None
        # from here on, new chunks arrive through the subscription, which
        # was established before the leftover chunk was yielded

    def logChunk(self, build, step, logfile, channel, chunk):
        if self.consumer:
            self.consumer.writeChunk((channel, chunk))

178 """
179 A LogFile keeps all of its contents on disk, in a non-pickle format to
180 which new entries can easily be appended. The file on disk has a name like
181 12-log-compile-output, under the Builder's directory. The actual filename
182 is generated (before the LogFile is created) by
183 L{BuildStatus.generateLogfileName}.
184
185 @ivar length: length of the data in the logfile (sum of chunk sizes; not
186 the length of the on-disk encoding)
187 """
188
    implements(interfaces.IStatusLog, interfaces.ILogFile)

    finished = False
    length = 0
    nonHeaderLength = 0
    tailLength = 0
    chunkSize = 10*1000
    runLength = 0

    logMaxTailSize = None
    maxLengthExceeded = False
    runEntries = [] # provided so old pickled builds will getChunks() ok
    entries = None
    BUFFERSIZE = 2048
    filename = None # relative to the Builder's basedir
    openfile = None

    def __init__(self, parent, name, logfilename):
        """
        @type parent: L{BuildStepStatus}
        @param parent: the Step that this log is a part of
        @type name: string
        @param name: the name of this log, typically 'output'
        @type logfilename: string
        @param logfilename: the Builder-relative pathname for the saved entries
        """
        self.step = parent
        self.master = parent.build.builder.master
        self.name = name
        self.filename = logfilename
        fn = self.getFilename()
        if os.path.exists(fn):
            # the buildmaster was probably stopped and restarted, so the
            # same log number is being re-used; overwrite the old file
            log.msg("Warning: Overwriting old serialized Build at %s" % fn)
        dirname = os.path.dirname(fn)
        if not os.path.exists(dirname):
            os.makedirs(dirname)
        self.openfile = open(fn, "w+")
        self.runEntries = []
        self.watchers = []
        self.finishedWatchers = []
        self.tailBuffer = []

    def getFilename(self):
        """
        Get the base (uncompressed) filename for this log file.

        @returns: filename
        """
        return os.path.join(self.step.build.builder.basedir, self.filename)

    def hasContents(self):
        """
        Return true if this logfile's contents are available. For a newly
        created logfile, this is always true, but for a L{LogFile} instance
        that has been persisted, the logfiles themselves may have been
        deleted, in which case this method will return False.

        @returns: boolean
        """
        return os.path.exists(self.getFilename() + '.bz2') or \
            os.path.exists(self.getFilename() + '.gz') or \
            os.path.exists(self.getFilename())

    def getName(self):
        """
        Get this logfile's name

        @returns: string
        """
        return self.name

    def getStep(self):
        """
        Get the L{BuildStepStatus} instance containing this logfile

        @returns: L{BuildStepStatus} instance
        """
        return self.step

    def isFinished(self):
        """
        Return true if this logfile is finished (that is, if it will not
        receive any additional data).

        @returns: boolean
        """
        return self.finished

    def waitUntilFinished(self):
        """
        Return a Deferred that will fire when this logfile is finished, or
        will fire immediately if the logfile is already finished.
        """
        if self.finished:
            d = defer.succeed(self)
        else:
            d = defer.Deferred()
            self.finishedWatchers.append(d)
        return d
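
    # Example usage (sketch; `logfile` stands for an existing LogFile
    # instance): wait for the log to complete, then read it back:
    #
    #   d = logfile.waitUntilFinished()
    #   d.addCallback(lambda l: l.getText())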

    def getFile(self):
        """
        Get an open file object for this log. The file may also be in use for
        writing, so it should not be closed by the caller, and the caller
        should not rely on its file position remaining constant between
        asynchronous code segments.

        @returns: file object
        """
        if self.openfile:
            # this is the filehandle we're using to write to the log, so
            # don't close it!
            return self.openfile
        # If the log was compressed after it finished, open the compressed
        # copy; otherwise every caller gets its own read-only handle.
        try:
            return BZ2File(self.getFilename() + ".bz2", "r")
        except IOError:
            pass
        try:
            return GzipFile(self.getFilename() + ".gz", "r")
        except IOError:
            pass
        return open(self.getFilename(), "r")

    def getText(self):
        # produce one big string, with header chunks filtered out
        return "".join(self.getChunks([STDOUT, STDERR], onlyText=True))

    def getTextWithHeaders(self):
        return "".join(self.getChunks(onlyText=True))

    def getChunks(self, channels=[], onlyText=False):
        # Generate chunks for everything that was logged at the time we were
        # first called, so remember how long the file was when we started,
        # and don't read beyond that point. The current contents of
        # self.runEntries will follow.
        #
        # This returns an iterator, which means arbitrary things could happen
        # while we're yielding. It will faithfully deliver the log as it
        # existed when it was started, and not return anything after that
        # point. To use this without missing any data, ensure that nothing
        # is added to the log during the yield() calls.

        f = self.getFile()
        if not self.finished:
            offset = 0
            f.seek(0, 2)
            remaining = f.tell()
        else:
            offset = 0
            remaining = None

        leftover = None
        if self.runEntries and (not channels or
                                (self.runEntries[0][0] in channels)):
            leftover = (self.runEntries[0][0],
                        "".join([c[1] for c in self.runEntries]))

        # freeze the state of the LogFile by passing everything the generator
        # needs in as parameters
        return self._generateChunks(f, offset, remaining, leftover,
                                    channels, onlyText)

    def _generateChunks(self, f, offset, remaining, leftover,
                        channels, onlyText):
        chunks = []
        p = LogFileScanner(chunks.append, channels)
        f.seek(offset)
        if remaining is not None:
            data = f.read(min(remaining, self.BUFFERSIZE))
            remaining -= len(data)
        else:
            data = f.read(self.BUFFERSIZE)

        offset = f.tell()
        while data:
            p.dataReceived(data)
            while chunks:
                channel, text = chunks.pop(0)
                if onlyText:
                    yield text
                else:
                    yield (channel, text)
            f.seek(offset)
            if remaining is not None:
                data = f.read(min(remaining, self.BUFFERSIZE))
                remaining -= len(data)
            else:
                data = f.read(self.BUFFERSIZE)
            offset = f.tell()
        del f

        if leftover:
            if onlyText:
                yield leftover[1]
            else:
                yield leftover

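    # Example usage (sketch): replaying just stderr from an existing LogFile
    # instance `logfile`:
    #
    #   import sys
    #   for channel, text in logfile.getChunks([STDERR]):
    #       sys.stderr.write(text)
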
396 """Return an iterator that produces newline-terminated lines,
397 excluding header chunks."""
398
399
400
401 alltext = "".join(self.getChunks([channel], onlyText=True))
402 io = StringIO(alltext)
403 return io.readlines()
404
414
418
422
423
424
444
    def addEntry(self, channel, text, _no_watchers=False):
        """
        Add an entry to the logfile. The C{channel} is one of L{STDOUT},
        L{STDERR}, or L{HEADER}. The C{text} is the text to add to the
        logfile, which can be a unicode string or a bytestring which is
        presumed to be encoded with utf-8.

        This method cannot be called after the logfile is finished.

        @param channel: channel to add a chunk for
        @param text: chunk of text
        @param _no_watchers: private
        """

        assert not self.finished, "logfile is already finished"

        if isinstance(text, unicode):
            text = text.encode('utf-8')

        # notify watchers first, before the chunk gets truncated, so that
        # they get a complete picture of the actual log contents
        if not _no_watchers:
            for w in self.watchers:
                w.logChunk(self.step.build, self.step, self, channel, text)

        if channel != HEADER:
            # truncate the log if it's grown beyond logMaxSize bytes
            logMaxSize = self.master.config.logMaxSize
            logMaxTailSize = self.master.config.logMaxTailSize
            if logMaxSize:
                self.nonHeaderLength += len(text)
                if self.nonHeaderLength > logMaxSize:
                    # add a message about what's going on, and truncate this
                    # chunk if necessary
                    if not self.maxLengthExceeded:
                        if self.runEntries and channel != self.runEntries[0][0]:
                            self._merge()
                        i = -(self.nonHeaderLength - logMaxSize)
                        trunc, text = text[:i], text[i:]
                        self.runEntries.append((channel, trunc))
                        self._merge()
                        msg = ("\nOutput exceeded %i bytes, remaining output "
                               "has been truncated\n" % logMaxSize)
                        self.runEntries.append((HEADER, msg))
                        self.maxLengthExceeded = True

                    # and track the tail of the text
                    if logMaxTailSize and text:
                        # update the tail buffer
                        self.tailBuffer.append((channel, text))
                        self.tailLength += len(text)
                        while self.tailLength > logMaxTailSize:
                            # drop chunks off the front of the buffer
                            c, t = self.tailBuffer.pop(0)
                            n = len(t)
                            self.tailLength -= n
                            assert self.tailLength >= 0
                    return

        # we only add to .runEntries here; _merge() is responsible for
        # flushing merged chunks out
        if self.runEntries and channel != self.runEntries[0][0]:
            self._merge()
        self.runEntries.append((channel, text))
        self.runLength += len(text)
        if self.runLength >= self.chunkSize:
            self._merge()

        self.length += len(text)

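    # The truncation above is controlled by master-side configuration; in
    # master.cfg this might look like (values illustrative):
    #
    #   c['logMaxSize'] = 1024*1024   # keep at most ~1MiB of stdout/stderr
    #   c['logMaxTailSize'] = 32768   # plus the final 32KiB, as a tail
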
    def addStdout(self, text):
        """
        Shortcut to add stdout text to the logfile

        @param text: text to add to the logfile
        """
        self.addEntry(STDOUT, text)

    def addStderr(self, text):
        """
        Shortcut to add stderr text to the logfile

        @param text: text to add to the logfile
        """
        self.addEntry(STDERR, text)

    def addHeader(self, text):
        """
        Shortcut to add header text to the logfile

        @param text: text to add to the logfile
        """
        self.addEntry(HEADER, text)


    def compressLog(self):
        logCompressionMethod = self.master.config.logCompressionMethod
        # bail out if there's no compression support
        if logCompressionMethod == "bz2":
            compressed = self.getFilename() + ".bz2.tmp"
        elif logCompressionMethod == "gz":
            compressed = self.getFilename() + ".gz.tmp"
        else:
            return defer.succeed(None)

        def _compressLog():
            infile = self.getFile()
            if logCompressionMethod == "bz2":
                cf = BZ2File(compressed, 'w')
            elif logCompressionMethod == "gz":
                cf = GzipFile(compressed, 'w')
            bufsize = 1024*1024
            while True:
                buf = infile.read(bufsize)
                cf.write(buf)
                if len(buf) < bufsize:
                    break
            cf.close()
        d = threads.deferToThread(_compressLog)

        def _renameCompressedLog(rv):
            if logCompressionMethod == "bz2":
                filename = self.getFilename() + '.bz2'
            else:
                filename = self.getFilename() + '.gz'
            if runtime.platformType == 'win32':
                # windows cannot rename a file on top of an existing one, so
                # fall back to delete-then-rename; there are ways this can
                # fail and lose the log, so it is only used where the plain
                # rename would not work
                if os.path.exists(filename):
                    os.unlink(filename)
            os.rename(compressed, filename)
            _tryremove(self.getFilename(), 1, 5)
        d.addCallback(_renameCompressedLog)

        def _cleanupFailedCompress(failure):
            log.msg("failed to compress %s" % self.getFilename())
            if os.path.exists(compressed):
                _tryremove(compressed, 1, 5)
            failure.trap() # reraise the failure
        d.addErrback(_cleanupFailedCompress)
        return d
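
    # The compression method is likewise a master-side setting; in master.cfg
    # (sketch; 'gz' and 'bz2' are the two values this method understands):
    #
    #   c['logCompressionMethod'] = 'gz'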

    def __getstate__(self):
        d = self.__dict__.copy()
        del d['step']
        del d['watchers']
        del d['finishedWatchers']
        del d['master']
        d['entries'] = []
        if 'finished' in d:
            del d['finished']
        if 'openfile' in d:
            del d['openfile']
        return d

class HTMLLogFile:
    implements(interfaces.IStatusLog)

    filename = None

    def __init__(self, parent, name, logfilename, html):
        self.step = parent
        self.name = name
        self.filename = logfilename
        self.html = html

    def waitUntilFinished(self):
        return defer.succeed(self)

    def hasContents(self):
        return True

    def __getstate__(self):
        d = self.__dict__.copy()
        del d['step']
        return d


def _tryremove(filename, timeout, retries):
    """Try to remove a file, and if that fails, try again in C{timeout}
    seconds. Each retry multiplies the timeout by 4, and removal is attempted
    at most C{retries} more times before giving up.
    """
    try:
        os.unlink(filename)
    except OSError:
        if retries > 0:
            reactor.callLater(timeout, _tryremove, filename, timeout * 4,
                              retries - 1)
        else:
            log.msg("giving up on removing %s after over %d seconds" %
                    (filename, timeout))