
Source Code for Module buildslave.commands.base

import os, signal, types, re, traceback
from stat import ST_CTIME, ST_MTIME, ST_SIZE
from collections import deque

from zope.interface import implements
from twisted.internet.protocol import ProcessProtocol
from twisted.internet import reactor, defer, task
from twisted.python import log, runtime

from buildslave.interfaces import ISlaveCommand
from buildslave.commands.registry import registerSlaveCommand
from buildslave import util

# this used to be a CVS $-style "Revision" auto-updated keyword, but since I
# moved to Darcs as the primary repository, this is updated manually each
# time this file is changed. The last cvs_ver that was here was 1.51 .
command_version = "2.9"

# version history:
#  >=1.17: commands are interruptable
#  >=1.28: Arch understands 'revision', added Bazaar
#  >=1.33: Source classes understand 'retry'
#  >=1.39: Source classes correctly handle changes in branch (except Git)
#          Darcs accepts 'revision' (now all do but Git) (well, and P4Sync)
#          Arch/Baz should accept 'build-config'
#  >=1.51: (release 0.7.3)
#  >= 2.1: SlaveShellCommand now accepts 'initial_stdin', 'keep_stdin_open',
#          and 'logfiles'. It now sends 'log' messages in addition to
#          stdout/stdin/header/rc. It acquired writeStdin/closeStdin methods,
#          but these are not remotely callable yet.
#          (not externally visible: ShellCommandPP has writeStdin/closeStdin.
#          ShellCommand accepts new arguments (logfiles=, initialStdin=,
#          keepStdinOpen=) and no longer accepts stdin=)
#          (release 0.7.4)
#  >= 2.2: added monotone, uploadFile, and downloadFile (release 0.7.5)
#  >= 2.3: added bzr (release 0.7.6)
#  >= 2.4: Git understands 'revision' and branches
#  >= 2.5: workaround added for remote 'hg clone --rev REV' when hg<0.9.2
#  >= 2.6: added uploadDirectory
#  >= 2.7: added usePTY option to SlaveShellCommand
#  >= 2.8: added username and password args to SVN class
#  >= 2.9: add depth arg to SVN class
#  >= 2.10: CVS can handle 'extra_options' and 'export_options'

class CommandInterrupted(Exception):
    pass
class TimeoutError(Exception):
    pass

class Obfuscated:
    """An obfuscated string in a command"""
    def __init__(self, real, fake):
        self.real = real
        self.fake = fake

    def __str__(self):
        return self.fake

    def __repr__(self):
        return `self.fake`

    @staticmethod
    def to_text(s):
        if isinstance(s, (str, unicode)):
            return s
        else:
            return str(s)

    @staticmethod
    def get_real(command):
        rv = command
        if type(command) == types.ListType:
            rv = []
            for elt in command:
                if isinstance(elt, Obfuscated):
                    rv.append(elt.real)
                else:
                    rv.append(Obfuscated.to_text(elt))
        return rv

    @staticmethod
    def get_fake(command):
        rv = command
        if type(command) == types.ListType:
            rv = []
            for elt in command:
                if isinstance(elt, Obfuscated):
                    rv.append(elt.fake)
                else:
                    rv.append(Obfuscated.to_text(elt))
        return rv

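# Illustrative sketch, not part of the original module: how Obfuscated is
# meant to be used when building a command line. The svn URL and password
# below are made-up placeholders.
def _example_obfuscated_usage():
    cmd = ["svn", "co", "--password", Obfuscated("s3cr3t", "XXXXXX"),
           "http://svn.example.org/repo"]
    # get_real() is what gets executed; get_fake() is what gets logged.
    assert Obfuscated.get_real(cmd)[3] == "s3cr3t"
    assert Obfuscated.get_fake(cmd)[3] == "XXXXXX"
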
class AbandonChain(Exception):
    """A series of chained steps can raise this exception to indicate that
    one of the intermediate ShellCommands has failed, such that there is no
    point in running the remainder. 'rc' should be the non-zero exit code of
    the failing ShellCommand."""

    def __repr__(self):
        return "<AbandonChain rc=%s>" % self.args[0]

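# Illustrative sketch, not part of the original module: the callback-chain
# pattern AbandonChain is designed for (Command._abandonOnFailure and
# Command._checkAbandoned below use the same idea). The step functions and
# the rc value here are made-up examples.
def _example_abandon_chain():
    def check_rc(rc):
        if rc != 0:
            raise AbandonChain(rc)
        return rc
    def trap_abandon(why):
        why.trap(AbandonChain)
        return why.value.args[0]       # report the failing step's rc
    d = defer.succeed(1)               # pretend a step exited with rc=1
    d.addCallback(check_rc)            # raises AbandonChain(1)
    d.addCallback(lambda _: "skipped") # never runs
    d.addErrback(trap_abandon)         # chain ends with rc=1
    return d
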
class ShellCommandPP(ProcessProtocol):
    debug = False

    def __init__(self, command):
        self.command = command
        self.pending_stdin = ""
        self.stdin_finished = False

    def writeStdin(self, data):
        assert not self.stdin_finished
        if self.connected:
            self.transport.write(data)
        else:
            self.pending_stdin += data

    def closeStdin(self):
        if self.connected:
            if self.debug: log.msg(" closing stdin")
            self.transport.closeStdin()
        self.stdin_finished = True

    def connectionMade(self):
        if self.debug:
            log.msg("ShellCommandPP.connectionMade")
        if not self.command.process:
            if self.debug:
                log.msg(" assigning self.command.process: %s" %
                        (self.transport,))
            self.command.process = self.transport

        # TODO: maybe we shouldn't close stdin when using a PTY. I can't test
        # this yet, recent debian glibc has a bug which causes thread-using
        # test cases to SIGHUP trial, and the workaround is to either run
        # the whole test with /bin/sh -c " ".join(argv) (way gross) or to
        # not use a PTY. Once the bug is fixed, I'll be able to test what
        # happens when you close stdin on a pty. My concern is that it will
        # SIGHUP the child (since we are, in a sense, hanging up on them).
        # But it may well be that keeping stdout open prevents the SIGHUP
        # from being sent.
        #if not self.command.usePTY:

        if self.pending_stdin:
            if self.debug: log.msg(" writing to stdin")
            self.transport.write(self.pending_stdin)
        if self.stdin_finished:
            if self.debug: log.msg(" closing stdin")
            self.transport.closeStdin()

    def outReceived(self, data):
        if self.debug:
            log.msg("ShellCommandPP.outReceived")
        self.command.addStdout(data)

    def errReceived(self, data):
        if self.debug:
            log.msg("ShellCommandPP.errReceived")
        self.command.addStderr(data)

    def processEnded(self, status_object):
        if self.debug:
            log.msg("ShellCommandPP.processEnded", status_object)
        # status_object is a Failure wrapped around an
        # error.ProcessTerminated or an error.ProcessDone.
        # requires twisted >= 1.0.4 to overcome a bug in process.py
        sig = status_object.value.signal
        rc = status_object.value.exitCode
        self.command.finished(sig, rc)

class LogFileWatcher:
    POLL_INTERVAL = 2

    def __init__(self, command, name, logfile, follow=False):
        self.command = command
        self.name = name
        self.logfile = logfile

        log.msg("LogFileWatcher created to watch %s" % logfile)
        # we are created before the ShellCommand starts. If the logfile we're
        # supposed to be watching already exists, record its size and
        # ctime/mtime so we can tell when it starts to change.
        self.old_logfile_stats = self.statFile()
        self.started = False

        # follow the file, only sending back lines
        # added since we started watching
        self.follow = follow

        # every 2 seconds we check on the file again
        self.poller = task.LoopingCall(self.poll)

    def start(self):
        self.poller.start(self.POLL_INTERVAL).addErrback(self._cleanupPoll)

    def _cleanupPoll(self, err):
        log.err(err, msg="Polling error")
        self.poller = None

    def stop(self):
        self.poll()
        if self.poller is not None:
            self.poller.stop()
        if self.started:
            self.f.close()

    def statFile(self):
        if os.path.exists(self.logfile):
            s = os.stat(self.logfile)
            return (s[ST_CTIME], s[ST_MTIME], s[ST_SIZE])
        return None

    def poll(self):
        if not self.started:
            s = self.statFile()
            if s == self.old_logfile_stats:
                return # not started yet
            if not s:
                # the file was there, but now it's deleted. Forget about the
                # initial state, clearly the process has deleted the logfile
                # in preparation for creating a new one.
                self.old_logfile_stats = None
                return # no file to work with
            self.f = open(self.logfile, "rb")
            # if we only want new lines, seek to
            # where we stat'd so we only find new
            # lines
            if self.follow:
                self.f.seek(s[2], 0)
            self.started = True
        self.f.seek(self.f.tell(), 0)
        while True:
            data = self.f.read(10000)
            if not data:
                return
            self.command.addLogfile(self.name, data)

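# Illustrative sketch, not part of the original module: the two accepted
# forms of a 'logfiles' entry that ShellCommand (below) turns into
# LogFileWatcher instances. The file names are made-up examples.
_EXAMPLE_LOGFILES = {
    'test.log': "_trial_temp/test.log",         # plain workdir-relative name
    'warnings': {'filename': "warnings.log",    # dict form: 'filename' is
                 'follow': True},               # required, 'follow' optional
}
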
class ShellCommand:
    # This is a helper class, used by SlaveCommands to run programs in a
    # child shell.

    notreally = False
    BACKUP_TIMEOUT = 5
    KILL = "KILL"
    CHUNK_LIMIT = 128*1024

    # Don't send any data until at least BUFFER_SIZE bytes have been collected
    # or BUFFER_TIMEOUT elapsed
    BUFFER_SIZE = 64*1024
    BUFFER_TIMEOUT = 5

    # For sending elapsed time:
    startTime = None
    elapsedTime = None

    # For scheduling future events
    _reactor = reactor

    # I wish we had easy access to CLOCK_MONOTONIC in Python:
    # http://www.opengroup.org/onlinepubs/000095399/functions/clock_getres.html
    # Then changes to the system clock during a run wouldn't affect the
    # "elapsed time" results.

    def __init__(self, builder, command,
                 workdir, environ=None,
                 sendStdout=True, sendStderr=True, sendRC=True,
                 timeout=None, maxTime=None, initialStdin=None,
                 keepStdinOpen=False, keepStdout=False, keepStderr=False,
                 logEnviron=True, logfiles={}, usePTY="slave-config"):
        """

        @param keepStdout: if True, we keep a copy of all the stdout text
                           that we've seen. This copy is available in
                           self.stdout, which can be read after the command
                           has finished.
        @param keepStderr: same, for stderr

        @param usePTY: "slave-config" -> use the SlaveBuilder's usePTY;
                       otherwise, true to use a PTY, false to not use a PTY.
        """

        self.builder = builder
        self.command = Obfuscated.get_real(command)

        # We need to take unicode commands and arguments and encode them using
        # the appropriate encoding for the slave. This is mostly platform
        # specific, but can be overridden in the slave's buildbot.tac file.
        #
        # Encoding the command line here ensures that the called executables
        # receive arguments as bytestrings encoded with an appropriate
        # platform-specific encoding. It also plays nicely with twisted's
        # spawnProcess which checks that arguments are regular strings or
        # unicode strings that can be encoded as ascii (which generates a
        # warning).
        if isinstance(self.command, (tuple, list)):
            for i, a in enumerate(self.command):
                if isinstance(a, unicode):
                    self.command[i] = a.encode(self.builder.unicode_encoding)
        elif isinstance(self.command, unicode):
            self.command = self.command.encode(self.builder.unicode_encoding)

        self.fake_command = Obfuscated.get_fake(command)
        self.sendStdout = sendStdout
        self.sendStderr = sendStderr
        self.sendRC = sendRC
        self.logfiles = logfiles
        self.workdir = workdir
        if not os.path.exists(workdir):
            os.makedirs(workdir)
        if environ:
            if environ.has_key('PYTHONPATH'):
                ppath = environ['PYTHONPATH']
                # Need to do os.pathsep translation. We could either do that
                # by replacing all incoming ':'s with os.pathsep, or by
                # accepting lists. I like lists better.
                if not isinstance(ppath, str):
                    # If it's not a string, treat it as a sequence to be
                    # turned into a string.
                    ppath = os.pathsep.join(ppath)

                environ['PYTHONPATH'] = ppath + os.pathsep + "${PYTHONPATH}"

            # do substitution on variable values matching pattern: ${name}
            p = re.compile('\${([0-9a-zA-Z_]*)}')
            def subst(match):
                return os.environ.get(match.group(1), "")
            newenv = {}
            for key in os.environ.keys():
                # setting a key to None will delete it from the slave environment
                if key not in environ or environ[key] is not None:
                    newenv[key] = os.environ[key]
            for key in environ.keys():
                if environ[key] is not None:
                    newenv[key] = p.sub(subst, environ[key])

            self.environ = newenv
        else: # not environ
            self.environ = os.environ.copy()
        self.initialStdin = initialStdin
        self.keepStdinOpen = keepStdinOpen
        self.logEnviron = logEnviron
        self.timeout = timeout
        self.timer = None
        self.maxTime = maxTime
        self.maxTimer = None
        self.keepStdout = keepStdout
        self.keepStderr = keepStderr

        self.buffered = deque()
        self.buflen = 0
        self.buftimer = None

        if usePTY == "slave-config":
            self.usePTY = self.builder.usePTY
        else:
            self.usePTY = usePTY

        # usePTY=True is a convenience for cleaning up all children and
        # grandchildren of a hung command. Fall back to usePTY=False on systems
        # and in situations where ptys cause problems. PTYs are posix-only,
        # and for .closeStdin to matter, we must use a pipe, not a PTY
        if runtime.platformType != "posix" or initialStdin is not None:
            if self.usePTY and usePTY != "slave-config":
                self.sendStatus({'header': "WARNING: disabling usePTY for this command"})
            self.usePTY = False

        self.logFileWatchers = []
        for name, filevalue in self.logfiles.items():
            filename = filevalue
            follow = False

            # check for a dictionary of options
            # filename is required, others are optional
            if type(filevalue) == dict:
                filename = filevalue['filename']
                follow = filevalue.get('follow', False)

            w = LogFileWatcher(self, name,
                               os.path.join(self.workdir, filename),
                               follow=follow)
            self.logFileWatchers.append(w)

    def __repr__(self):
        return "<slavecommand.ShellCommand '%s'>" % self.fake_command

    def sendStatus(self, status):
        self.builder.sendUpdate(status)

    def start(self):
        # return a Deferred which fires (with the exit code) when the command
        # completes
        if self.keepStdout:
            self.stdout = ""
        if self.keepStderr:
            self.stderr = ""
        self.deferred = defer.Deferred()
        try:
            self._startCommand()
        except:
            log.msg("error in ShellCommand._startCommand")
            log.err()
            self._addToBuffers('stderr', "error in ShellCommand._startCommand\n")
            self._addToBuffers('stderr', traceback.format_exc())
            self._sendBuffers()
            # pretend it was a shell error
            self.deferred.errback(AbandonChain(-1))
        return self.deferred

    def _startCommand(self):
        # ensure workdir exists
        if not os.path.isdir(self.workdir):
            os.makedirs(self.workdir)
        log.msg("ShellCommand._startCommand")
        if self.notreally:
            self._addToBuffers('header', "command '%s' in dir %s" % \
                               (self.fake_command, self.workdir))
            self._addToBuffers('header', "(not really)\n")
            self.finished(None, 0)
            return

        self.pp = ShellCommandPP(self)

        if type(self.command) in types.StringTypes:
            if runtime.platformType == 'win32':
                argv = os.environ['COMSPEC'].split() # allow %COMSPEC% to have args
                if '/c' not in argv: argv += ['/c']
                argv += [self.command]
            else:
                # for posix, use /bin/sh. for other non-posix, well, doesn't
                # hurt to try
                argv = ['/bin/sh', '-c', self.command]
            display = self.fake_command
        else:
            # On windows, CreateProcess requires an absolute path to the
            # executable. When we call spawnProcess below, we pass argv[0]
            # as the executable. So, for .exe's that we have absolute paths
            # to, we can call directly. Otherwise, we should run under
            # COMSPEC (usually cmd.exe) to handle path searching, etc.
            if runtime.platformType == 'win32' and not \
                    (self.command[0].lower().endswith(".exe") and os.path.isabs(self.command[0])):
                argv = os.environ['COMSPEC'].split() # allow %COMSPEC% to have args
                if '/c' not in argv: argv += ['/c']
                argv += list(self.command)
            else:
                argv = self.command
            display = " ".join(self.fake_command)

        # $PWD usually indicates the current directory; spawnProcess may not
        # update this value, though, so we set it explicitly here. This causes
        # weird problems (bug #456) on msys, though..
        if not self.environ.get('MACHTYPE', None) == 'i686-pc-msys':
            self.environ['PWD'] = os.path.abspath(self.workdir)

        # self.stdin is handled in ShellCommandPP.connectionMade

        # first header line is the command in plain text, argv joined with
        # spaces. You should be able to cut-and-paste this into a shell to
        # obtain the same results. If there are spaces in the arguments, too
        # bad.
        log.msg(" " + display)
        self._addToBuffers('header', display+"\n")

        # then comes the secondary information
        msg = " in dir %s" % (self.workdir,)
        if self.timeout:
            msg += " (timeout %d secs)" % (self.timeout,)
        log.msg(" " + msg)
        self._addToBuffers('header', msg+"\n")

        msg = " watching logfiles %s" % (self.logfiles,)
        log.msg(" " + msg)
        self._addToBuffers('header', msg+"\n")

        # then the obfuscated command array, to resolve any ambiguity in the
        # space-joined display line above
        msg = " argv: %s" % (self.fake_command,)
        log.msg(" " + msg)
        self._addToBuffers('header', msg+"\n")

        # then the environment, since it sometimes causes problems
        if self.logEnviron:
            msg = " environment:\n"
            env_names = self.environ.keys()
            env_names.sort()
            for name in env_names:
                msg += " %s=%s\n" % (name, self.environ[name])
            log.msg(" environment: %s" % (self.environ,))
            self._addToBuffers('header', msg)

        if self.initialStdin:
            msg = " writing %d bytes to stdin" % len(self.initialStdin)
            log.msg(" " + msg)
            self._addToBuffers('header', msg+"\n")

        if self.keepStdinOpen:
            msg = " leaving stdin open"
        else:
            msg = " closing stdin"
        log.msg(" " + msg)
        self._addToBuffers('header', msg+"\n")

        msg = " using PTY: %s" % bool(self.usePTY)
        log.msg(" " + msg)
        self._addToBuffers('header', msg+"\n")

        # this will be buffered until connectionMade is called
        if self.initialStdin:
            self.pp.writeStdin(self.initialStdin)
        if not self.keepStdinOpen:
            self.pp.closeStdin()

        # win32eventreactor's spawnProcess (under twisted <= 2.0.1) returns
        # None, as opposed to all the posixbase-derived reactors (which
        # return the new Process object). This is a nuisance. We can make up
        # for it by having the ProcessProtocol give us their .transport
        # attribute after they get one. I'd prefer to get it from
        # spawnProcess because I'm concerned about returning from this method
        # without having a valid self.process to work with. (if kill() were
        # called right after we return, but somehow before connectionMade
        # were called, then kill() would blow up).
        self.process = None
        self.startTime = util.now(self._reactor)

        p = reactor.spawnProcess(self.pp, argv[0], argv,
                                 self.environ,
                                 self.workdir,
                                 usePTY=self.usePTY)
        # connectionMade might have been called during spawnProcess
        if not self.process:
            self.process = p

        # connectionMade also closes stdin as long as we're not using a PTY.
        # This is intended to kill off inappropriately interactive commands
        # better than the (long) hung-command timeout. ProcessPTY should be
        # enhanced to allow the same childFDs argument that Process takes,
        # which would let us connect stdin to /dev/null .

        if self.timeout:
            self.timer = self._reactor.callLater(self.timeout, self.doTimeout)

        if self.maxTime:
            self.maxTimer = self._reactor.callLater(self.maxTime, self.doMaxTimeout)

        for w in self.logFileWatchers:
            w.start()

    def _chunkForSend(self, data):
        """
        limit the chunks that we send over PB to 128k, since it has a hardwired
        string-size limit of 640k.
        """
        LIMIT = self.CHUNK_LIMIT
        for i in range(0, len(data), LIMIT):
            yield data[i:i+LIMIT]

    def _collapseMsg(self, msg):
        """
        Take msg, which is a dictionary of lists of output chunks, and
        concatenate all the chunks into a single string
        """
        retval = {}
        for log in msg:
            data = "".join(msg[log])
            if isinstance(log, tuple) and log[0] == 'log':
                retval['log'] = (log[1], data)
            else:
                retval[log] = data
        return retval

    def _sendMessage(self, msg):
        """
        Collapse and send msg to the master
        """
        if not msg:
            return
        msg = self._collapseMsg(msg)
        self.sendStatus(msg)

    def _bufferTimeout(self):
        self.buftimer = None
        self._sendBuffers()

    def _sendBuffers(self):
        """
        Send all the content in our buffers.
        """
        msg = {}
        msg_size = 0
        lastlog = None
        logdata = []
        while self.buffered:
            # Grab the next bits from the buffer
            logname, data = self.buffered.popleft()

            # If this log is different than the last one, then we have to send
            # out the message so far. This is because the message is
            # transferred as a dictionary, which makes the ordering of keys
            # unspecified, and makes it impossible to interleave data from
            # different logs. A future enhancement could be to change the
            # master to support a list of (logname, data) tuples instead of a
            # dictionary.
            # On our first pass through this loop lastlog is None
            if lastlog is None:
                lastlog = logname
            elif logname != lastlog:
                self._sendMessage(msg)
                msg = {}
                msg_size = 0
            lastlog = logname

            logdata = msg.setdefault(logname, [])

            # Chunkify the log data to make sure we're not sending more than
            # CHUNK_LIMIT at a time
            for chunk in self._chunkForSend(data):
                if len(chunk) == 0: continue
                logdata.append(chunk)
                msg_size += len(chunk)
                if msg_size >= self.CHUNK_LIMIT:
                    # We've gone beyond the chunk limit, so send out our
                    # message. At worst this results in a message slightly
                    # larger than (2*CHUNK_LIMIT)-1
                    self._sendMessage(msg)
                    msg = {}
                    logdata = msg.setdefault(logname, [])
                    msg_size = 0
        self.buflen = 0
        if logdata:
            self._sendMessage(msg)
        if self.buftimer:
            if self.buftimer.active():
                self.buftimer.cancel()
            self.buftimer = None

    def _addToBuffers(self, logname, data):
        """
        Add data to the buffer for logname
        Start a timer to send the buffers if BUFFER_TIMEOUT elapses.
        If adding data causes the buffer size to grow beyond BUFFER_SIZE, then
        the buffers will be sent.
        """
        n = len(data)

        self.buflen += n
        self.buffered.append((logname, data))
        if self.buflen > self.BUFFER_SIZE:
            self._sendBuffers()
        elif not self.buftimer:
            self.buftimer = self._reactor.callLater(self.BUFFER_TIMEOUT, self._bufferTimeout)

    def addStdout(self, data):
        if self.sendStdout:
            self._addToBuffers('stdout', data)

        if self.keepStdout:
            self.stdout += data
        if self.timer:
            self.timer.reset(self.timeout)

    def addStderr(self, data):
        if self.sendStderr:
            self._addToBuffers('stderr', data)

        if self.keepStderr:
            self.stderr += data
        if self.timer:
            self.timer.reset(self.timeout)

    def addLogfile(self, name, data):
        self._addToBuffers( ('log', name), data)

        if self.timer:
            self.timer.reset(self.timeout)

    def finished(self, sig, rc):
        self.elapsedTime = util.now(self._reactor) - self.startTime
        log.msg("command finished with signal %s, exit code %s, elapsedTime: %0.6f" % (sig,rc,self.elapsedTime))
        for w in self.logFileWatchers:
            # this will send the final updates
            w.stop()
        self._sendBuffers()
        if sig is not None:
            rc = -1
        if self.sendRC:
            if sig is not None:
                self.sendStatus(
                    {'header': "process killed by signal %d\n" % sig})
            self.sendStatus({'rc': rc})
        self.sendStatus({'header': "elapsedTime=%0.6f\n" % self.elapsedTime})
        if self.timer:
            self.timer.cancel()
            self.timer = None
        if self.maxTimer:
            self.maxTimer.cancel()
            self.maxTimer = None
        if self.buftimer:
            self.buftimer.cancel()
            self.buftimer = None
        d = self.deferred
        self.deferred = None
        if d:
            d.callback(rc)
        else:
            log.msg("Hey, command %s finished twice" % self)

    def failed(self, why):
        self._sendBuffers()
        log.msg("ShellCommand.failed: command failed: %s" % (why,))
        if self.timer:
            self.timer.cancel()
            self.timer = None
        if self.maxTimer:
            self.maxTimer.cancel()
            self.maxTimer = None
        if self.buftimer:
            self.buftimer.cancel()
            self.buftimer = None
        d = self.deferred
        self.deferred = None
        if d:
            d.errback(why)
        else:
            log.msg("Hey, command %s finished twice" % self)

    def doTimeout(self):
        self.timer = None
        msg = "command timed out: %d seconds without output" % self.timeout
        self.kill(msg)

    def doMaxTimeout(self):
        self.maxTimer = None
        msg = "command timed out: %d seconds elapsed" % self.maxTime
        self.kill(msg)

    def kill(self, msg):
        # This may be called by the timeout, or when the user has decided to
        # abort this build.
        self._sendBuffers()
        if self.timer:
            self.timer.cancel()
            self.timer = None
        if self.maxTimer:
            self.maxTimer.cancel()
            self.maxTimer = None
        if self.buftimer:
            self.buftimer.cancel()
            self.buftimer = None
        if hasattr(self.process, "pid") and self.process.pid is not None:
            msg += ", killing pid %s" % self.process.pid
        log.msg(msg)
        self.sendStatus({'header': "\n" + msg + "\n"})

        hit = 0
        if runtime.platformType == "posix":
            try:
                # really want to kill off all child processes too. Process
                # Groups are ideal for this, but that requires
                # spawnProcess(usePTY=1). Try both ways in case process was
                # not started that way.

                # the test suite sets self.KILL=None to tell us we should
                # only pretend to kill the child. This lets us test the
                # backup timer.

                sig = None
                if self.KILL is not None:
                    sig = getattr(signal, "SIG"+ self.KILL, None)

                if self.KILL == None:
                    log.msg("self.KILL==None, only pretending to kill child")
                elif sig is None:
                    log.msg("signal module is missing SIG%s" % self.KILL)
                elif not hasattr(os, "kill"):
                    log.msg("os module is missing the 'kill' function")
                elif not hasattr(self.process, "pid") or self.process.pid is None:
                    log.msg("self.process has no pid")
                else:
                    log.msg("trying os.kill(-pid, %d)" % (sig,))
                    # TODO: maybe use os.killpg instead of a negative pid?
                    os.kill(-self.process.pid, sig)
                    log.msg(" signal %s sent successfully" % sig)
                    hit = 1
            except OSError:
                # probably no-such-process, maybe because there is no process
                # group
                pass
        if not hit:
            try:
                if self.KILL is None:
                    log.msg("self.KILL==None, only pretending to kill child")
                else:
                    log.msg("trying process.signalProcess('KILL')")
                    self.process.signalProcess(self.KILL)
                    log.msg(" signal %s sent successfully" % (self.KILL,))
                    hit = 1
            except OSError:
                # could be no-such-process, because they finished very recently
                pass
        if not hit:
            log.msg("signalProcess/os.kill failed both times")

        if runtime.platformType == "posix":
            # we only do this under posix because the win32eventreactor
            # blocks here until the process has terminated, while closing
            # stderr. This is weird.
            self.pp.transport.loseConnection()

        # finished ought to be called momentarily. Just in case it doesn't,
        # set a timer which will abandon the command.
        self.timer = self._reactor.callLater(self.BACKUP_TIMEOUT,
                                             self.doBackupTimeout)

    def doBackupTimeout(self):
        log.msg("we tried to kill the process, and it wouldn't die.."
                " finish anyway")
        self.timer = None
        self.sendStatus({'header': "SIGKILL failed to kill process\n"})
        if self.sendRC:
            self.sendStatus({'header': "using fake rc=-1\n"})
            self.sendStatus({'rc': -1})
        self.failed(TimeoutError("SIGKILL failed to kill process"))

    def writeStdin(self, data):
        self.pp.writeStdin(data)

    def closeStdin(self):
        self.pp.closeStdin()

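# Illustrative sketch, not part of the original module: the ${name}
# substitution that ShellCommand.__init__ applies to 'environ' values,
# pulled out as a standalone helper. The argument names and the example
# values in the comment below are made up.
def _example_expand_env_value(value, source_env=None):
    source_env = os.environ if source_env is None else source_env
    p = re.compile(r'\${([0-9a-zA-Z_]*)}')
    # unknown names expand to "", just as in ShellCommand.__init__
    return p.sub(lambda match: source_env.get(match.group(1), ""), value)

# e.g. _example_expand_env_value("lib:${PATH}", {"PATH": "/usr/bin"})
#      returns "lib:/usr/bin"
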
class Command:
    implements(ISlaveCommand)

    """This class defines one command that can be invoked by the build master.
    The command is executed on the slave side, and always sends back a
    completion message when it finishes. It may also send intermediate status
    as it runs (by calling builder.sendStatus). Some commands can be
    interrupted (either by the build master or a local timeout), in which
    case the step is expected to complete normally with a status message that
    indicates an error occurred.

    These commands are used by BuildSteps on the master side. Each kind of
    BuildStep uses a single Command. The slave must implement all the
    Commands required by the set of BuildSteps used for any given build:
    this is checked at startup time.

    All Commands are constructed with the same signature:
     c = CommandClass(builder, args)
    where 'builder' is the parent SlaveBuilder object, and 'args' is a
    dict that is interpreted per-command.

    The setup(args) method is available for setup, and is run from __init__.

    The Command is started with start(). This method must be implemented in a
    subclass, and it should return a Deferred. When your step is done, you
    should fire the Deferred (the results are not used). If the command is
    interrupted, it should fire the Deferred anyway.

    While the command runs, it may send status messages back to the
    buildmaster by calling self.sendStatus(statusdict). The statusdict is
    interpreted by the master-side BuildStep however it likes.

    A separate completion message is sent when the deferred fires, which
    indicates that the Command has finished, but does not carry any status
    data. If the Command needs to return an exit code of some sort, that
    should be sent as a regular status message before the deferred is fired.
    Once builder.commandComplete has been run, no more status messages may be
    sent.

    If interrupt() is called, the Command should attempt to shut down as
    quickly as possible. Child processes should be killed, new ones should
    not be started. The Command should send some kind of error status update,
    then complete as usual by firing the Deferred.

    .interrupted should be set by interrupt(), and can be tested to avoid
    sending multiple error status messages.

    If .running is False, the bot is shutting down (or has otherwise lost the
    connection to the master), and should not send any status messages. This
    is checked in Command.sendStatus .

    """

    # builder methods:
    #  sendStatus(dict) (zero or more)
    #  commandComplete() or commandInterrupted() (one, at end)

    debug = False
    interrupted = False
    running = False # set by Builder, cleared on shutdown or when the
                    # Deferred fires

    _reactor = reactor

    def __init__(self, builder, stepId, args):
        self.builder = builder
        self.stepId = stepId # just for logging
        self.args = args
        self.setup(args)

    def setup(self, args):
        """Override this in a subclass to extract items from the args dict."""
        pass

    def doStart(self):
        self.running = True
        d = defer.maybeDeferred(self.start)
        d.addBoth(self.commandComplete)
        return d

    def start(self):
        """Start the command. This method should return a Deferred that will
        fire when the command has completed. The Deferred's argument will be
        ignored.

        This method should be overridden by subclasses."""
        raise NotImplementedError, "You must implement this in a subclass"

    def sendStatus(self, status):
        """Send a status update to the master."""
        if self.debug:
            log.msg("sendStatus", status)
        if not self.running:
            log.msg("would sendStatus but not .running")
            return
        self.builder.sendUpdate(status)

    def doInterrupt(self):
        self.running = False
        self.interrupt()

    def interrupt(self):
        """Override this in a subclass to allow commands to be interrupted.
        May be called multiple times, test and set self.interrupted=True if
        this matters."""
        pass

    def commandComplete(self, res):
        self.running = False
        return res

    # utility methods, mostly used by SlaveShellCommand and the like

    def _abandonOnFailure(self, rc):
        if type(rc) is not int:
            log.msg("weird, _abandonOnFailure was given rc=%s (%s)" % \
                    (rc, type(rc)))
        assert isinstance(rc, int)
        if rc != 0:
            raise AbandonChain(rc)
        return rc

    def _sendRC(self, res):
        self.sendStatus({'rc': 0})

    def _checkAbandoned(self, why):
        log.msg("_checkAbandoned", why)
        why.trap(AbandonChain)
        log.msg(" abandoning chain", why.value)
        self.sendStatus({'rc': why.value.args[0]})
        return None

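# Illustrative sketch, not part of the original module: a minimal Command
# subclass following the lifecycle described in the docstring above. The
# class name, the 'message' arg, and the "echo" command name are made-up
# examples; nothing here is registered.
class _ExampleEchoCommand(Command):
    def setup(self, args):
        self.message = args.get('message', "hello")

    def start(self):
        # send the output and an exit code as regular status messages,
        # then fire the Deferred (its result is ignored)
        self.sendStatus({'stdout': self.message + "\n"})
        self.sendStatus({'rc': 0})
        return defer.succeed(None)

# A real command would be exposed to the master with something like:
#   registerSlaveCommand("echo", _ExampleEchoCommand, command_version)
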
class SlaveShellCommand(Command):
    """This is a Command which runs a shell command. The args dict contains
    the following keys:

        - ['command'] (required): a shell command to run. If this is a string,
                                  it will be run with /bin/sh (['/bin/sh',
                                  '-c', command]). If it is a list
                                  (preferred), it will be used directly.
        - ['workdir'] (required): subdirectory in which the command will be
                                  run, relative to the builder dir
        - ['env']: a dict of environment variables to augment/replace
                   os.environ . PYTHONPATH is treated specially, and
                   should be a list of path components to be prepended to
                   any existing PYTHONPATH environment variable.
        - ['initial_stdin']: a string which will be written to the command's
                             stdin as soon as it starts
        - ['keep_stdin_open']: unless True, the command's stdin will be
                               closed as soon as initial_stdin has been
                               written. Set this to True if you plan to write
                               to stdin after the command has been started.
        - ['want_stdout']: 0 if stdout should be thrown away
        - ['want_stderr']: 0 if stderr should be thrown away
        - ['usePTY']: True or False if the command should use a PTY (defaults to
                      configuration of the slave)
        - ['not_really']: 1 to skip execution and return rc=0
        - ['timeout']: seconds of silence to tolerate before killing command
        - ['maxTime']: seconds before killing command
        - ['logfiles']: dict mapping LogFile name to the workdir-relative
                        filename of a local log file. This local file will be
                        watched just like 'tail -f', and all changes will be
                        written to 'log' status updates.
        - ['logEnviron']: False to not log the environment variables on the slave

    ShellCommand creates the following status messages:
        - {'stdout': data} : when stdout data is available
        - {'stderr': data} : when stderr data is available
        - {'header': data} : when headers (command start/stop) are available
        - {'log': (logfile_name, data)} : when log files have new contents
        - {'rc': rc} : when the process has terminated
    """

    def start(self):
        args = self.args
        # args['workdir'] is relative to Builder directory, and is required.
        assert args['workdir'] is not None
        workdir = os.path.join(self.builder.basedir, args['workdir'])

        c = ShellCommand(self.builder, args['command'],
                         workdir, environ=args.get('env'),
                         timeout=args.get('timeout', None),
                         maxTime=args.get('maxTime', None),
                         sendStdout=args.get('want_stdout', True),
                         sendStderr=args.get('want_stderr', True),
                         sendRC=True,
                         initialStdin=args.get('initial_stdin'),
                         keepStdinOpen=args.get('keep_stdin_open'),
                         logfiles=args.get('logfiles', {}),
                         usePTY=args.get('usePTY', "slave-config"),
                         logEnviron=args.get('logEnviron', True),
                         )
        c._reactor = self._reactor
        self.command = c
        d = self.command.start()
        return d

    def interrupt(self):
        self.interrupted = True
        self.command.kill("command interrupted")

    def writeStdin(self, data):
        self.command.writeStdin(data)

    def closeStdin(self):
        self.command.closeStdin()

registerSlaveCommand("shell", SlaveShellCommand, command_version)

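# Illustrative sketch, not part of the original module: the kind of args
# dict a master-side shell step would send for the "shell" command. Every
# value below is a made-up example.
def _example_shell_args():
    return {
        'command': ["make", "test"],       # a list is preferred over a string
        'workdir': "build",                # relative to the builder directory
        'env': {'PYTHONPATH': ["lib"]},    # list entries are joined with os.pathsep
        'timeout': 1200,                   # kill after 20 minutes of silence
        'want_stdout': 1,
        'want_stderr': 1,
        'logfiles': {'test.log': {'filename': "_trial_temp/test.log",
                                  'follow': True}},
        'usePTY': "slave-config",
    }
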
class DummyCommand(Command):
    """
    I am a dummy no-op command that by default takes 5 seconds to complete.
    See L{buildbot.steps.dummy.RemoteDummy}
    """

    def start(self):
        self.d = defer.Deferred()
        log.msg(" starting dummy command [%s]" % self.stepId)
        self.timer = self._reactor.callLater(1, self.doStatus)
        return self.d

    def interrupt(self):
        if self.interrupted:
            return
        self.timer.cancel()
        self.timer = None
        self.interrupted = True
        self.finished()

    def doStatus(self):
        log.msg(" sending intermediate status")
        self.sendStatus({'stdout': 'data'})
        timeout = self.args.get('timeout', 5) + 1
        self.timer = self._reactor.callLater(timeout - 1, self.finished)

    def finished(self):
        log.msg(" dummy command finished [%s]" % self.stepId)
        if self.interrupted:
            self.sendStatus({'rc': 1})
        else:
            self.sendStatus({'rc': 0})
        self.d.callback(0)

registerSlaveCommand("dummy", DummyCommand, command_version)


# this maps handle names to a callable. When the WaitCommand starts, this
# callable is invoked with no arguments. It should return a Deferred. When
# that Deferred fires, our WaitCommand will finish.
waitCommandRegistry = {}

class WaitCommand(Command):
    """
    I am a dummy command used by the buildbot unit test suite. I wait for the
    unit test to tell us to finish. See L{buildbot.steps.dummy.Wait}
    """

    def start(self):
        self.d = defer.Deferred()
        log.msg(" starting wait command [%s]" % self.stepId)
        handle = self.args['handle']
        cb = waitCommandRegistry[handle]
        del waitCommandRegistry[handle]
        def _called():
            log.msg(" wait-%s starting" % (handle,))
            d = cb()
            def _done(res):
                log.msg(" wait-%s finishing: %s" % (handle, res))
                return res
            d.addBoth(_done)
            d.addCallbacks(self.finished, self.failed)
        self._reactor.callLater(0, _called)
        return self.d

    def interrupt(self):
        log.msg(" wait command interrupted")
        if self.interrupted:
            return
        self.interrupted = True
        self.finished("interrupted")

    def finished(self, res):
        log.msg(" wait command finished [%s]" % self.stepId)
        if self.interrupted:
            self.sendStatus({'rc': 2})
        else:
            self.sendStatus({'rc': 0})
        self.d.callback(0)

    def failed(self, why):
        log.msg(" wait command failed [%s]" % self.stepId)
        self.sendStatus({'rc': 1})
        self.d.callback(0)

registerSlaveCommand("dummy.wait", WaitCommand, command_version)
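
# Illustrative sketch, not part of the original module: how a unit test is
# expected to feed waitCommandRegistry before starting a "dummy.wait"
# command. The 'handle1' key is a made-up example.
def _example_register_wait(handle='handle1'):
    d = defer.Deferred()
    waitCommandRegistry[handle] = lambda: d
    # a WaitCommand started with args={'handle': handle} will now finish
    # only after the test fires this Deferred, e.g. d.callback(None)
    return d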