import os
from cStringIO import StringIO
from bz2 import BZ2File
from gzip import GzipFile

from zope.interface import implements
from twisted.python import log, runtime
from twisted.internet import defer, threads, reactor
from buildbot.util import netstrings
from buildbot.util.eventual import eventually
from buildbot import interfaces

STDOUT = interfaces.LOG_CHANNEL_STDOUT
STDERR = interfaces.LOG_CHANNEL_STDERR
HEADER = interfaces.LOG_CHANNEL_HEADER
ChunkTypes = ["stdout", "stderr", "header"]

class LogFileScanner(netstrings.NetstringParser):
    def __init__(self, chunk_cb, channels=[]):
        self.chunk_cb = chunk_cb
        self.channels = channels
        netstrings.NetstringParser.__init__(self)

    def stringReceived(self, line):
        # each netstring payload is a single channel digit followed by the
        # text; deliver it as a (channel, text) tuple, optionally filtered
        # down to the requested channels
        channel = int(line[0])
        if not self.channels or (channel in self.channels):
            self.chunk_cb((channel, line[1:]))

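# A hedged sketch, not part of the original module: the inverse of
# LogFileScanner, for illustration only. Judging from stringReceived() above,
# each log entry is a netstring ("<length>:<payload>,") whose payload is one
# channel digit followed by the chunk text.
def _example_encode_entry(channel, text):
    payload = "%d%s" % (channel, text)
    return "%d:%s," % (len(payload), payload)

# e.g., assuming LOG_CHANNEL_STDOUT is 0 (as the ChunkTypes indexing
# suggests): _example_encode_entry(STDOUT, "hi\n") == "4:0hi\n,"
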
45 """What's the plan?
46
47 the LogFile has just one FD, used for both reading and writing.
48 Each time you add an entry, fd.seek to the end and then write.
49
50 Each reader (i.e. Producer) keeps track of their own offset. The reader
51 starts by seeking to the start of the logfile, and reading forwards.
52 Between each hunk of file they yield chunks, so they must remember their
53 offset before yielding and re-seek back to that offset before reading
54 more data. When their read() returns EOF, they're finished with the first
55 phase of the reading (everything that's already been written to disk).
56
57 After EOF, the remaining data is entirely in the current entries list.
58 These entries are all of the same channel, so we can do one "".join and
59 obtain a single chunk to be sent to the listener. But since that involves
60 a yield, and more data might arrive after we give up control, we have to
61 subscribe them before yielding. We can't subscribe them any earlier,
62 otherwise they'd get data out of order.
63
64 We're using a generator in the first place so that the listener can
65 throttle us, which means they're pulling. But the subscription means
66 we're pushing. Really we're a Producer. In the first phase we can be
67 either a PullProducer or a PushProducer. In the second phase we're only a
68 PushProducer.
69
70 So the client gives a LogFileConsumer to File.subscribeConsumer . This
71 Consumer must have registerProducer(), unregisterProducer(), and
72 writeChunk(), and is just like a regular twisted.interfaces.IConsumer,
73 except that writeChunk() takes chunks (tuples of (channel,text)) instead
74 of the normal write() which takes just text. The LogFileConsumer is
75 allowed to call stopProducing, pauseProducing, and resumeProducing on the
76 producer instance it is given. """
77
    paused = False
    subscribed = False
    BUFFERSIZE = 2048

    def __init__(self, logfile, consumer):
        self.logfile = logfile
        self.consumer = consumer
        # getChunks() is a generator that replays the on-disk log and then
        # subscribes us for anything added later
        self.chunkGenerator = self.getChunks()
        consumer.registerProducer(self, True)

    def stopProducing(self):
        # make sure we write nothing more to the departing consumer
        self.paused = True
        self.consumer = None
        self.done()

    def done(self):
        if self.chunkGenerator:
            self.chunkGenerator = None # stop making chunks
        if self.subscribed:
            self.logfile.unsubscribe(self)
            self.subscribed = False

    def pauseProducing(self):
        self.paused = True

    def resumeProducing(self):
        # calling transport.write() synchronously from within
        # resumeProducing could recurse in older Twisted releases, so queue
        # the real work for the next reactor turn instead
        eventually(self._resumeProducing)

    def _resumeProducing(self):
        self.paused = False
        if not self.chunkGenerator:
            return
        try:
            while not self.paused:
                chunk = self.chunkGenerator.next()
                self.consumer.writeChunk(chunk)
                # we exit this loop when the consumer pauses us or we run
                # out of chunks
        except StopIteration:
            # the generator is finished; from now on chunks arrive through
            # the subscription, and the consumer no longer gets to pause
            # them
            self.chunkGenerator = None

    def logChunk(self, build, step, logfile, channel, chunk):
        if self.consumer:
            self.consumer.writeChunk((channel, chunk))

    def logFinished(self, build, step, logfile):
        self.done()

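# A minimal consumer sketch (illustration only, not part of the original
# module): something like this could be handed to LogFile.subscribeConsumer.
# It provides the three methods the docstring above requires; the class name
# is invented for this example.
class _ExamplePrintingConsumer:
    def registerProducer(self, producer, streaming):
        # we may call pauseProducing/resumeProducing/stopProducing on this
        self.producer = producer

    def unregisterProducer(self):
        self.producer = None

    def writeChunk(self, chunk):
        # chunks are (channel, text) tuples, not plain strings
        channel, text = chunk
        print "%s: %r" % (ChunkTypes[channel], text)
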
178 """A LogFile keeps all of its contents on disk, in a non-pickle format to
179 which new entries can easily be appended. The file on disk has a name
180 like 12-log-compile-output, under the Builder's directory. The actual
181 filename is generated (before the LogFile is created) by
182 L{BuildStatus.generateLogfileName}.
183
184 Old LogFile pickles (which kept their contents in .entries) must be
185 upgraded. The L{BuilderStatus} is responsible for doing this, when it
186 loads the L{BuildStatus} into memory. The Build pickle is not modified,
187 so users who go from 0.6.5 back to 0.6.4 don't have to lose their
188 logs."""
189
190 implements(interfaces.IStatusLog, interfaces.ILogFile)
191
    finished = False
    length = 0
    nonHeaderLength = 0
    tailLength = 0
    chunkSize = 10*1000
    runLength = 0
    # set logMaxSize to a number of bytes to truncate overly long logs
    logMaxSize = None
    # if truncating, also keep the final logMaxTailSize bytes of output
    logMaxTailSize = None
    maxLengthExceeded = False
    runEntries = [] # the current run of same-channel entries, not yet merged
    entries = None # only old-style (pickled) logs keep their contents here
    BUFFERSIZE = 2048
    filename = None # relative to the Builder's basedir
    openfile = None # the write-side filehandle, while the log is open
    compressMethod = "bz2"

    def __init__(self, parent, name, logfilename):
        """
        @type  parent: L{BuildStepStatus}
        @param parent: the Step that this log is a part of
        @type  name: string
        @param name: the name of this log, typically 'output'
        @type  logfilename: string
        @param logfilename: the Builder-relative pathname for the saved entries
        """
        self.step = parent
        self.name = name
        self.filename = logfilename
        fn = self.getFilename()
        if os.path.exists(fn):
            # a file with this name is probably left over from a previous
            # run of the buildmaster; overwrite it
            log.msg("Warning: Overwriting old serialized Build at %s" % fn)
        dirname = os.path.dirname(fn)
        if not os.path.exists(dirname):
            os.makedirs(dirname)
        self.openfile = open(fn, "w+")
        self.runEntries = []
        self.watchers = []
        self.finishedWatchers = []
        self.tailBuffer = []

    def getFilename(self):
        # the absolute path: the Builder's basedir plus our relative filename
        return os.path.join(self.step.build.builder.basedir, self.filename)

    def hasContents(self):
        return os.path.exists(self.getFilename() + '.bz2') or \
            os.path.exists(self.getFilename() + '.gz') or \
            os.path.exists(self.getFilename())

    def getName(self):
        return self.name

    def getStep(self):
        return self.step

    def isFinished(self):
        return self.finished

    def waitUntilFinished(self):
        if self.finished:
            d = defer.succeed(self)
        else:
            d = defer.Deferred()
            self.finishedWatchers.append(d)
        return d

    def getFile(self):
        if self.openfile:
            # this is the filehandle we're using to write to the log, so
            # don't close it!
            return self.openfile
        # otherwise open the file read-only; a finished log may have been
        # compressed in the meantime
        try:
            return BZ2File(self.getFilename() + ".bz2", "r")
        except IOError:
            pass
        try:
            return GzipFile(self.getFilename() + ".gz", "r")
        except IOError:
            pass
        return open(self.getFilename(), "r")

    def getText(self):
        # one big string, with stdout and stderr but without headers
        return "".join(self.getChunks([STDOUT, STDERR], onlyText=True))

    def getTextWithHeaders(self):
        return "".join(self.getChunks(onlyText=True))

    def getChunks(self, channels=[], onlyText=False):
        # generate chunks for everything that was logged at the time we
        # were first called, so remember how long the file was when we
        # started, and don't read beyond that point. The current contents
        # of self.runEntries will follow.

        # this returns an iterator, which means arbitrary things could
        # happen while we're yielding. This will faithfully deliver the log
        # as it existed when it was started, and not return anything after
        # that point. To use this in subscribe(catchup=True) without
        # missing any data, you must ensure that nothing will be added to
        # the log during the yields.

        f = self.getFile()
        if not self.finished:
            # stop at the current end of file; more may be written later
            offset = 0
            f.seek(0, 2)
            remaining = f.tell()
        else:
            offset = 0
            remaining = None

        leftover = None
        if self.runEntries and (not channels or
                                (self.runEntries[0][0] in channels)):
            leftover = (self.runEntries[0][0],
                        "".join([c[1] for c in self.runEntries]))

        # freeze the state of the LogFile by passing everything the
        # generator needs as arguments
        return self._generateChunks(f, offset, remaining, leftover,
                                    channels, onlyText)

    def _generateChunks(self, f, offset, remaining, leftover,
                        channels, onlyText):
        chunks = []
        p = LogFileScanner(chunks.append, channels)
        f.seek(offset)
        if remaining is not None:
            data = f.read(min(remaining, self.BUFFERSIZE))
            remaining -= len(data)
        else:
            data = f.read(self.BUFFERSIZE)

        offset = f.tell()
        while data:
            p.dataReceived(data)
            while chunks:
                channel, text = chunks.pop(0)
                if onlyText:
                    yield text
                else:
                    yield (channel, text)
            # another reader may have moved the shared file position while
            # we were yielding, so re-seek before reading more
            f.seek(offset)
            if remaining is not None:
                data = f.read(min(remaining, self.BUFFERSIZE))
                remaining -= len(data)
            else:
                data = f.read(self.BUFFERSIZE)
            offset = f.tell()
        del f

        if leftover:
            if onlyText:
                yield leftover[1]
            else:
                yield leftover

355 """Return an iterator that produces newline-terminated lines,
356 excluding header chunks."""
357
358
359
360 alltext = "".join(self.getChunks([channel], onlyText=True))
361 io = StringIO(alltext)
362 return io.readlines()
363
    def subscribe(self, receiver, catchup):
        if self.finished:
            return
        self.watchers.append(receiver)
        if catchup:
            for channel, text in self.getChunks():
                receiver.logChunk(self.step.build, self.step, self,
                                  channel, text)

    def unsubscribe(self, receiver):
        if not self.finished:
            self.watchers.remove(receiver)

    def subscribeConsumer(self, consumer):
        p = LogFileProducer(self, consumer)
        p.resumeProducing()

    # interface used by the build steps to add things to the log

    def merge(self):
        # merge the current run of same-channel entries into a single chunk
        # and append it to the on-disk file as one or more netstrings, each
        # carrying a channel digit plus text (see LogFileScanner)
        if not self.runEntries:
            return
        channel = self.runEntries[0][0]
        text = "".join([c[1] for c in self.runEntries])
        f = self.openfile
        f.seek(0, 2)
        offset = 0
        while offset < len(text):
            size = min(len(text) - offset, self.chunkSize)
            f.write("%d:%d" % (1 + size, channel))
            f.write(text[offset:offset+size])
            f.write(",")
            offset += size
        self.runEntries = []
        self.runLength = 0

    def addEntry(self, channel, text):
        assert not self.finished

        if isinstance(text, unicode):
            text = text.encode('utf-8')
        if channel != HEADER:
            # truncate non-header output once it has grown too large
            if self.logMaxSize and self.nonHeaderLength > self.logMaxSize:
                # add a truncation notice, exactly once
                if not self.maxLengthExceeded:
                    msg = "\nOutput exceeded %i bytes, remaining output has been truncated\n" % self.logMaxSize
                    self.addEntry(HEADER, msg)
                    self.merge()
                    self.maxLengthExceeded = True

                if self.logMaxTailSize:
                    # update the tail buffer
                    self.tailBuffer.append((channel, text))
                    self.tailLength += len(text)
                    while self.tailLength > self.logMaxTailSize:
                        # drop entries from the front of the buffer until
                        # it fits again
                        c, t = self.tailBuffer.pop(0)
                        self.tailLength -= len(t)
                        assert self.tailLength >= 0
                return

            self.nonHeaderLength += len(text)

        # we only add to .runEntries here; merge() is responsible for
        # flushing completed runs to disk
        if self.runEntries and channel != self.runEntries[0][0]:
            self.merge()
        self.runEntries.append((channel, text))
        self.runLength += len(text)
        if self.runLength >= self.chunkSize:
            self.merge()

        for w in self.watchers:
            w.logChunk(self.step.build, self.step, self, channel, text)
        self.length += len(text)

    def addStdout(self, text):
        self.addEntry(STDOUT, text)

    def addStderr(self, text):
        self.addEntry(STDERR, text)

    def addHeader(self, text):
        self.addEntry(HEADER, text)

    def finish(self):
        # flush any pending runEntries, mark us finished, and fire the
        # waitUntilFinished() deferreds
        if self.tailBuffer:
            # flush the truncation tail too, preceded by a header noting
            # how much was kept
            msg = "\nFinal %i bytes follow below:\n" % self.tailLength
            tmp = self.runEntries
            self.runEntries = [(HEADER, msg)]
            self.merge()
            self.runEntries = self.tailBuffer
            self.merge()
            self.runEntries = tmp
            self.merge()
            self.tailBuffer = []
        else:
            self.merge()
        if self.openfile:
            # don't close the file: other getFile() readers may share the
            # filehandle; dropping our reference lets it close when the
            # last reader is done
            self.openfile.flush()
            del self.openfile
        self.finished = True
        watchers = self.finishedWatchers
        self.finishedWatchers = []
        for w in watchers:
            w.callback(self)

    def compressLog(self):
        # compress the finished log in a thread, then swap the compressed
        # version into place
        if self.compressMethod == "bz2":
            compressed = self.getFilename() + ".bz2.tmp"
        elif self.compressMethod == "gz":
            compressed = self.getFilename() + ".gz.tmp"
        d = threads.deferToThread(self._compressLog, compressed)
        d.addCallback(self._renameCompressedLog, compressed)
        d.addErrback(self._cleanupFailedCompress, compressed)
        return d

    def _compressLog(self, compressed):
        # runs in a thread, via deferToThread
        infile = self.getFile()
        if self.compressMethod == "bz2":
            cf = BZ2File(compressed, 'w')
        elif self.compressMethod == "gz":
            cf = GzipFile(compressed, 'w')
        bufsize = 1024*1024
        while True:
            buf = infile.read(bufsize)
            cf.write(buf)
            if len(buf) < bufsize:
                break
        cf.close()

    def _renameCompressedLog(self, rv, compressed):
        if self.compressMethod == "bz2":
            filename = self.getFilename() + '.bz2'
        else:
            filename = self.getFilename() + '.gz'
        if runtime.platformType == 'win32':
            # windows cannot rename a file on top of an existing one, so
            # delete the old one first
            if os.path.exists(filename):
                os.unlink(filename)
        os.rename(compressed, filename)
        _tryremove(self.getFilename(), 1, 5)

    def _cleanupFailedCompress(self, failure, compressed):
        log.msg("failed to compress %s" % self.getFilename())
        if os.path.exists(compressed):
            _tryremove(compressed, 1, 5)
        failure.trap() # reraise the failure

    # persistence
    def __getstate__(self):
        d = self.__dict__.copy()
        del d['step'] # filled back in by our parent upon unpickling
        del d['watchers']
        del d['finishedWatchers']
        d['entries'] = [] # keep the attribute so 0.6.4 tolerates the pickle
        if d.has_key('finished'):
            del d['finished'] # a class attribute
        if d.has_key('openfile'):
            del d['openfile'] # a class attribute
        return d

    def __setstate__(self, d):
        self.__dict__ = d
        self.watchers = []
        self.finishedWatchers = []
        # self.step must be filled in by our parent
        self.finished = True

    def upgrade(self, logfilename):
        """Save our .entries to a new-style offline log file (if necessary),
        and modify our in-memory representation to use it. The original
        pickled LogFile (inside the pickled Build) won't be modified."""
        self.filename = logfilename
        if not os.path.exists(self.getFilename()):
            self.openfile = open(self.getFilename(), "w")
            self.finished = False
            for channel, text in self.entries:
                self.addEntry(channel, text)
            self.finish() # this releases self.openfile
        del self.entries

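# A hedged usage sketch, not part of the original module: reading a LogFile
# back through its public API. getChunks() yields (channel, text) tuples, so
# a transcript can be reassembled without going through subscribeConsumer.
def _example_dump_log(logfile):
    import sys
    for channel, text in logfile.getChunks():
        sys.stdout.write("<%s> %s" % (ChunkTypes[channel], text))
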
class HTMLLogFile:
    implements(interfaces.IStatusLog)

    filename = None

    def __init__(self, parent, name, logfilename, html):
        self.step = parent
        self.name = name
        self.filename = logfilename
        self.html = html

    def getName(self):
        return self.name

    def getStep(self):
        return self.step

    def isFinished(self):
        return True

    def waitUntilFinished(self):
        return defer.succeed(self)

    def hasContents(self):
        return True

    def getText(self):
        return self.html # looks kinda like text

    def getTextWithHeaders(self):
        return self.html

    def getChunks(self):
        return [(STDERR, self.html)]

    def subscribe(self, receiver, catchup):
        pass

    def unsubscribe(self, receiver):
        pass

    def finish(self):
        pass

    def __getstate__(self):
        d = self.__dict__.copy()
        del d['step']
        return d

    def upgrade(self, logfilename):
        pass


606 """Try to remove a file, and if failed, try again in timeout.
607 Increases the timeout by a factor of 4, and only keeps trying for
608 another retries-amount of times.
609
610 """
611 try:
612 os.unlink(filename)
613 except OSError:
614 if retries > 0:
615 reactor.callLater(timeout, _tryremove, filename, timeout * 4,
616 retries - 1)
617 else:
618 log.msg("giving up on removing %s after over %d seconds" %
619 (filename, timeout))
620
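
# Hedged illustration, not part of the original module: the retry schedule
# produced by the _tryremove(fn, 1, 5) calls above is six unlink attempts,
# with waits of 1, 4, 16, 64, and 256 seconds between them:
#   >>> [1 * 4 ** i for i in range(5)]
#   [1, 4, 16, 64, 256]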