# This file is part of Buildbot.  Buildbot is free software: you can
# redistribute it and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation, version 2.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright Buildbot Team Members

import os
from cStringIO import StringIO
from bz2 import BZ2File
from gzip import GzipFile

from zope.interface import implements
from twisted.python import log, runtime
from twisted.internet import defer, threads, reactor
from buildbot.util import netstrings
from buildbot.util.eventual import eventually
from buildbot import interfaces

STDOUT = interfaces.LOG_CHANNEL_STDOUT
STDERR = interfaces.LOG_CHANNEL_STDERR
HEADER = interfaces.LOG_CHANNEL_HEADER
ChunkTypes = ["stdout", "stderr", "header"]
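
# A log is stored on disk as a sequence of netstrings. Each netstring's
# payload is a single channel digit followed by the chunk's text, and the
# channel digits match the indices of ChunkTypes above. So, for
# illustration, the 6-byte stdout chunk "hello\n" is recorded as the
# netstring "7:0hello\n,". LogFileScanner below parses this encoding.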

class LogFileScanner(netstrings.NetstringParser):
    def __init__(self, chunk_cb, channels=[]):
        self.chunk_cb = chunk_cb
        self.channels = channels
        netstrings.NetstringParser.__init__(self)

    def stringReceived(self, line):
        channel = int(line[0])
        if not self.channels or (channel in self.channels):
            self.chunk_cb((channel, line[1:]))
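
# Illustrative use of the scanner (hypothetical data):
#   chunks = []
#   p = LogFileScanner(chunks.append, channels=[STDERR])
#   p.dataReceived("7:1oops!\n,")   # chunks is now [(1, "oops!\n")]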

class LogFileProducer:
    """What's the plan?

    the LogFile has just one FD, used for both reading and writing.
    Each time you add an entry, fd.seek to the end and then write.

    Each reader (i.e. Producer) keeps track of their own offset. The reader
    starts by seeking to the start of the logfile, and reading forwards.
    Between each hunk of file they yield chunks, so they must remember their
    offset before yielding and re-seek back to that offset before reading
    more data. When their read() returns EOF, they're finished with the first
    phase of the reading (everything that's already been written to disk).

    After EOF, the remaining data is entirely in the current entries list.
    These entries are all of the same channel, so we can do one "".join and
    obtain a single chunk to be sent to the listener. But since that involves
    a yield, and more data might arrive after we give up control, we have to
    subscribe them before yielding. We can't subscribe them any earlier,
    otherwise they'd get data out of order.

    We're using a generator in the first place so that the listener can
    throttle us, which means they're pulling. But the subscription means
    we're pushing. Really we're a Producer. In the first phase we can be
    either a PullProducer or a PushProducer. In the second phase we're only a
    PushProducer.

    So the client gives a LogFileConsumer to LogFile.subscribeConsumer. This
    Consumer must have registerProducer(), unregisterProducer(), and
    writeChunk(), and is just like a regular
    twisted.internet.interfaces.IConsumer, except that writeChunk() takes
    chunks (tuples of (channel,text)) instead of the normal write() which
    takes just text. The LogFileConsumer is allowed to call stopProducing,
    pauseProducing, and resumeProducing on the producer instance it is
    given. An example consumer is sketched in the comment below this class.
    """

    paused = False
    subscribed = False
    BUFFERSIZE = 2048

    def __init__(self, logfile, consumer):
        self.logfile = logfile
        self.consumer = consumer
        self.chunkGenerator = logfile.getChunks()
        consumer.registerProducer(self, True)

    def stopProducing(self):
        # this should make us stop sending chunks
        self.paused = True
        self.consumer = None
        self.done()

    def done(self):
        if self.chunkGenerator:
            self.chunkGenerator = None # stop making chunks
        if self.subscribed:
            self.logfile.unsubscribe(self)
            self.subscribed = False

    def pauseProducing(self):
        self.paused = True

    def resumeProducing(self):
        # Twisted-1.3.0 has a bug which causes hangs when resumeProducing
        # calls transport.write (there is a recursive loop, fortunately
        # not infinite). Calling through eventually() breaks the recursion
        # by bouncing the real work through the reactor.
        eventually(self._resumeProducing)

    def _resumeProducing(self):
        self.paused = False
        if not self.chunkGenerator:
            return
        try:
            while not self.paused:
                chunk = self.chunkGenerator.next()
                self.consumer.writeChunk(chunk)
                # we exit this loop when the consumer pauses us or we run
                # out of chunks
        except StopIteration:
            # the generator is exhausted: everything recorded so far has
            # been delivered
            self.chunkGenerator = None
        # from now on, new chunks arrive via logChunk()

    def logChunk(self, build, step, logfile, channel, chunk):
        if self.consumer:
            self.consumer.writeChunk((channel, chunk))

    def logFinished(self, build, step, logfile):
        self.done()
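
# Example consumer, as promised in the docstring above (illustrative only;
# the class and method bodies here are hypothetical). Any object with these
# three methods can be handed to LogFile.subscribeConsumer():
#
#   class ChunkPrinter:
#       def registerProducer(self, producer, streaming):
#           self.producer = producer
#       def unregisterProducer(self):
#           self.producer = None
#       def writeChunk(self, chunk):
#           channel, text = chunk
#           print "%s: %s" % (ChunkTypes[channel], text)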

class LogFile:
    """
    A LogFile keeps all of its contents on disk, in a non-pickle format to
    which new entries can easily be appended. The file on disk has a name like
    12-log-compile-output, under the Builder's directory. The actual filename
    is generated (before the LogFile is created) by
    L{BuildStatus.generateLogfileName}.

    @ivar length: length of the data in the logfile (sum of chunk sizes; not
    the length of the on-disk encoding)
    """

    implements(interfaces.IStatusLog, interfaces.ILogFile)

    finished = False
    length = 0
    nonHeaderLength = 0
    tailLength = 0
    chunkSize = 10*1000
    runLength = 0

    logMaxTailSize = None
    maxLengthExceeded = False
    runEntries = [] # provided so old pickled builds will getChunks() ok
    entries = None
    BUFFERSIZE = 2048
    filename = None # relative to the Builder's basedir
    openfile = None

    def __init__(self, parent, name, logfilename):
        """
        @type  parent: L{BuildStepStatus}
        @param parent: the Step that this log is a part of
        @type  name: string
        @param name: the name of this log, typically 'output'
        @type  logfilename: string
        @param logfilename: the Builder-relative pathname for the saved entries
        """
        self.step = parent
        self.master = parent.build.builder.master
        self.name = name
        self.filename = logfilename
        fn = self.getFilename()
        if os.path.exists(fn):
            # the buildmaster was probably stopped abruptly, before the
            # BuilderStatus could be saved, so BuilderStatus.nextBuildNumber
            # is out of date, and we're overlapping with earlier builds now.
            # Warn about it, but then overwrite the old file.
            log.msg("Warning: Overwriting old serialized Build at %s" % fn)
        dirname = os.path.dirname(fn)
        if not os.path.exists(dirname):
            os.makedirs(dirname)
        self.openfile = open(fn, "w+")
        self.runEntries = []
        self.watchers = []
        self.finishedWatchers = []
        self.tailBuffer = []

    def getFilename(self):
        """
        Get the base (uncompressed) filename for this log file.

        @returns: string
        """
        return os.path.join(self.step.build.builder.basedir, self.filename)

    def hasContents(self):
        """
        Return true if this logfile's contents are available. For a newly
        created logfile, this is always true, but for a L{LogFile} instance
        that has been persisted, the logfiles themselves may have been
        deleted, in which case this method will return False.

        @returns: boolean
        """
        return os.path.exists(self.getFilename() + '.bz2') or \
            os.path.exists(self.getFilename() + '.gz') or \
            os.path.exists(self.getFilename())

    def getName(self):
        """
        Get this logfile's name

        @returns: string
        """
        return self.name

    def getStep(self):
        """
        Get the L{BuildStepStatus} instance containing this logfile

        @returns: L{BuildStepStatus} instance
        """
        return self.step

    def isFinished(self):
        """
        Return true if this logfile is finished (that is, if it will not
        receive any additional data).

        @returns: boolean
        """
        return self.finished

    def waitUntilFinished(self):
        """
        Return a Deferred that will fire when this logfile is finished, or
        will fire immediately if the logfile is already finished.
        """
        if self.finished:
            d = defer.succeed(self)
        else:
            d = defer.Deferred()
            self.finishedWatchers.append(d)
        return d
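
    # e.g. (illustrative):
    #   d = logfile.waitUntilFinished()
    #   d.addCallback(lambda lf: lf.getText())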

    def getFile(self):
        """
        Get an open file object for this log. The file may also be in use for
        writing, so it should not be closed by the caller, and the caller
        should not rely on its file position remaining constant between
        asynchronous code segments.

        @returns: file object
        """
        if self.openfile:
            # this is the filehandle we're using to write to the log, so
            # don't close it!
            return self.openfile
        # otherwise, check for a compressed copy first, falling back to the
        # uncompressed file
        try:
            return BZ2File(self.getFilename() + ".bz2", "r")
        except IOError:
            pass
        try:
            return GzipFile(self.getFilename() + ".gz", "r")
        except IOError:
            pass
        return open(self.getFilename(), "r")

    def getText(self):
        # this produces one big string
        return "".join(self.getChunks([STDOUT, STDERR], onlyText=True))

    def getTextWithHeaders(self):
        return "".join(self.getChunks(onlyText=True))

    def getChunks(self, channels=[], onlyText=False):
        """
        Return an iterator over this log's chunks: first everything that has
        already been written to disk, then whatever is still buffered in
        C{self.runEntries}.

        @param channels: if non-empty, only return chunks from these
                         channels
        @param onlyText: if true, yield just the text of each chunk instead
                         of (channel, text) tuples
        """
        f = self.getFile()
        if not self.finished:
            offset = 0
            f.seek(0, 2)
            remaining = f.tell()
        else:
            offset = 0
            remaining = None

        leftover = None
        if self.runEntries and (not channels or
                                (self.runEntries[0][0] in channels)):
            leftover = (self.runEntries[0][0],
                        "".join([c[1] for c in self.runEntries]))

        # freeze the state of the LogFile by passing everything the
        # generator needs as arguments
        return self._generateChunks(f, offset, remaining, leftover,
                                    channels, onlyText)

    def _generateChunks(self, f, offset, remaining, leftover,
                        channels, onlyText):
        chunks = []
        p = LogFileScanner(chunks.append, channels)
        f.seek(offset)
        if remaining is not None:
            data = f.read(min(remaining, self.BUFFERSIZE))
            remaining -= len(data)
        else:
            data = f.read(self.BUFFERSIZE)

        offset = f.tell()
        while data:
            p.dataReceived(data)
            while chunks:
                channel, text = chunks.pop(0)
                if onlyText:
                    yield text
                else:
                    yield (channel, text)
            f.seek(offset)
            if remaining is not None:
                data = f.read(min(remaining, self.BUFFERSIZE))
                remaining -= len(data)
            else:
                data = f.read(self.BUFFERSIZE)
            offset = f.tell()
        del f

        if leftover:
            if onlyText:
                yield leftover[1]
            else:
                yield leftover
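
    # Read-side usage (illustrative; process() is hypothetical):
    #   for channel, text in logfile.getChunks([STDOUT, STDERR]):
    #       process(text)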

    def readlines(self):
        """Return an iterator that produces newline-terminated lines. Only
        stdout chunks are included; stderr and header chunks are excluded."""
        alltext = "".join(self.getChunks([STDOUT], onlyText=True))
        io = StringIO(alltext)
        return io.readlines()

    def subscribe(self, receiver, catchup):
        if self.finished:
            return
        self.watchers.append(receiver)
        if catchup:
            for channel, text in self.getChunks():
                # TODO: add logChunks(), to send over everything at once?
                receiver.logChunk(self.step.build, self.step, self,
                                  channel, text)

    def unsubscribe(self, receiver):
        if receiver in self.watchers:
            self.watchers.remove(receiver)

    def subscribeConsumer(self, consumer):
        p = LogFileProducer(self, consumer)
        p.resumeProducing()

    # interface used by the build steps to add things to the log

    def _merge(self):
        # merge all .runEntries (which must all have the same channel) into
        # a single chunk
        if not self.runEntries:
            return
        channel = self.runEntries[0][0]
        text = "".join([c[1] for c in self.runEntries])
        assert channel < 10, "channel number must be a single decimal digit"
        self.runEntries = []
        self.runLength = 0
        # add a new chunk to the file
        f = self.openfile
        f.seek(0, 2)
        offset = 0
        while offset < len(text):
            size = min(len(text) - offset, self.chunkSize)
            f.write("%d:%d" % (size + 1, channel))
            f.write(text[offset:offset + size])
            f.write(",")
            offset += size

    def addEntry(self, channel, text, _no_watchers=False):
        """
        Add an entry to the logfile. The C{channel} is one of L{STDOUT},
        L{STDERR}, or L{HEADER}. The C{text} is the text to add to the
        logfile, which can be a unicode string or a bytestring which is
        presumed to be encoded with utf-8.

        This method cannot be called after the logfile is finished.

        @param channel: channel to add a chunk for
        @param text: chunk of text
        @param _no_watchers: private
        """

        assert not self.finished, "logfile is already finished"

        if isinstance(text, unicode):
            text = text.encode('utf-8')

        # notify watchers first, before the chunk gets munged, so that they
        # see the full contents even if the log gets truncated below
        if not _no_watchers:
            for w in self.watchers:
                w.logChunk(self.step.build, self.step, self, channel, text)

        if channel != HEADER:
            # truncate the log if it's too big
            logMaxSize = self.master.config.logMaxSize
            logMaxTailSize = self.master.config.logMaxTailSize
            if logMaxSize:
                self.nonHeaderLength += len(text)
                if self.nonHeaderLength > logMaxSize:
                    # keep the first logMaxSize bytes, and note the
                    # truncation in a header chunk
                    if not self.maxLengthExceeded:
                        if self.runEntries and channel != self.runEntries[0][0]:
                            self._merge()
                        i = -(self.nonHeaderLength - logMaxSize)
                        trunc, text = text[:i], text[i:]
                        self.runEntries.append((channel, trunc))
                        self._merge()
                        msg = ("\nOutput exceeded %i bytes, remaining output "
                               "has been truncated\n" % logMaxSize)
                        self.runEntries.append((HEADER, msg))
                        self.maxLengthExceeded = True

                    # and track the tail of the truncated text
                    if logMaxTailSize and text:
                        self.tailBuffer.append((channel, text))
                        self.tailLength += len(text)
                        while self.tailLength > logMaxTailSize:
                            # drop text off the beginning of the buffer
                            c, t = self.tailBuffer.pop(0)
                            n = len(t)
                            self.tailLength -= n
                            assert self.tailLength >= 0
                    return

        # we only buffer into .runEntries here; _merge() is responsible for
        # writing merged chunks out to the disk file
        if self.runEntries and channel != self.runEntries[0][0]:
            self._merge()
        self.runEntries.append((channel, text))
        self.runLength += len(text)
        if self.runLength >= self.chunkSize:
            self._merge()

        self.length += len(text)

    def addStdout(self, text):
        """
        Shortcut to add stdout text to the logfile

        @param text: text to add to the logfile
        """
        self.addEntry(STDOUT, text)

    def addStderr(self, text):
        """
        Shortcut to add stderr text to the logfile

        @param text: text to add to the logfile
        """
        self.addEntry(STDERR, text)

    def addHeader(self, text):
        """
        Shortcut to add header text to the logfile

        @param text: text to add to the logfile
        """
        self.addEntry(HEADER, text)
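
    # Write-side usage (illustrative):
    #   logfile.addStdout("compiling foo.c\n")
    #   logfile.addHeader("elapsed: 12s\n")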

    def finish(self):
        self._merge()
        if self.openfile:
            # we don't do an explicit close, because there might be readers
            # shuffling around; flushing is enough to make the data visible
            self.openfile.flush()
        self.finished = True
        watchers = self.finishedWatchers
        self.finishedWatchers = []
        for w in watchers:
            w.callback(self)

    def compressLog(self):
        logCompressionMethod = self.master.config.logCompressionMethod
        # bail out if compression is not enabled
        if logCompressionMethod == "bz2":
            compressed = self.getFilename() + ".bz2.tmp"
        elif logCompressionMethod == "gz":
            compressed = self.getFilename() + ".gz.tmp"
        else:
            return defer.succeed(None)

        def _compressLog():
            infile = self.getFile()
            if logCompressionMethod == "bz2":
                cf = BZ2File(compressed, 'w')
            elif logCompressionMethod == "gz":
                cf = GzipFile(compressed, 'w')
            bufsize = 1024*1024
            while True:
                buf = infile.read(bufsize)
                cf.write(buf)
                if len(buf) < bufsize:
                    break
            cf.close()
        d = threads.deferToThread(_compressLog)

        def _renameCompressedLog(rv):
            if logCompressionMethod == "bz2":
                filename = self.getFilename() + '.bz2'
            else:
                filename = self.getFilename() + '.gz'
            if runtime.platformType == 'win32':
                # windows cannot rename a file on top of an existing one,
                # so fall back to delete-first. There are still edge cases
                # when someone is reading the file while we're renaming it,
                # but we hope we can live with those.
                if os.path.exists(filename):
                    os.unlink(filename)
            os.rename(compressed, filename)
            _tryremove(self.getFilename(), 1, 5)
        d.addCallback(_renameCompressedLog)

        def _cleanupFailedCompress(failure):
            log.msg("failed to compress %s" % self.getFilename())
            if os.path.exists(compressed):
                _tryremove(compressed, 1, 5)
            failure.trap() # reraise the failure
        d.addErrback(_cleanupFailedCompress)
        return d
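
    # Compressing into a .tmp file and renaming it into place afterwards
    # means readers never observe a partially-written compressed log.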

    def __getstate__(self):
        d = self.__dict__.copy()
        del d['step']
        del d['watchers']
        del d['finishedWatchers']
        del d['master']
        d['entries'] = []
        if 'finished' in d:
            del d['finished']
        if 'openfile' in d:
            del d['openfile']
        return d

    def __setstate__(self, d):
        self.__dict__ = d
        self.watchers = [] # probably not necessary
        self.finishedWatchers = [] # ditto
        # self.step must be filled in by our parent
        self.finished = True


class HTMLLogFile:
    implements(interfaces.IStatusLog)

    filename = None

    def __init__(self, parent, name, logfilename, html):
        self.step = parent
        self.name = name
        self.filename = logfilename
        self.html = html

    def getName(self):
        return self.name # set in BuildStepStatus.addLog
    def getStep(self):
        return self.step

    def isFinished(self):
        return True
    def waitUntilFinished(self):
        return defer.succeed(self)

    def hasContents(self):
        return True
    def getText(self):
        return self.html # looks kinda like text
    def getTextWithHeaders(self):
        return self.html
    def getChunks(self):
        return [(STDERR, self.html)]

    def finish(self):
        pass

    def __getstate__(self):
        d = self.__dict__.copy()
        del d['step']
        if 'master' in d:
            del d['master']
        return d


def _tryremove(filename, timeout, retries):
    """Try to remove a file; if that fails, try again in C{timeout} seconds.
    Each new attempt multiplies the timeout by 4, and we give up after
    C{retries} further attempts.
    """
    try:
        os.unlink(filename)
    except OSError:
        if retries > 0:
            reactor.callLater(timeout, _tryremove, filename, timeout * 4,
                              retries - 1)
        else:
            log.msg("giving up on removing %s after over %d seconds" %
                    (filename, timeout))