Package buildbot :: Package slave :: Package commands :: Module base
[frames | no frames]

Source Code for Module buildbot.slave.commands.base

   1  # -*- test-case-name: buildbot.test.test_slavecommand -*- 
   2   
   3  import os, signal, types, time, re, traceback 
   4  from stat import ST_CTIME, ST_MTIME, ST_SIZE 
   5  from collections import deque 
   6   
   7  from zope.interface import implements 
   8  from twisted.internet.protocol import ProcessProtocol 
   9  from twisted.internet import reactor, defer, task 
  10  from twisted.python import log, runtime 
  11   
  12  from buildbot.slave.interfaces import ISlaveCommand 
  13  from buildbot.slave.commands.registry import registerSlaveCommand 
  14  from buildbot import util 

# This used to be a CVS $-style "Revision" auto-updated keyword, but since I
# moved to Darcs as the primary repository, this is updated manually each
# time this file is changed. The last cvs_ver that was here was 1.51 .
# NOTE(review): the version history below already lists a ">= 2.10" entry
# while the value here is still "2.9" -- confirm which one is intended.
command_version = "2.9"

# version history:
#  >=1.17: commands are interruptible
#  >=1.28: Arch understands 'revision', added Bazaar
#  >=1.33: Source classes understand 'retry'
#  >=1.39: Source classes correctly handle changes in branch (except Git)
#          Darcs accepts 'revision' (now all do but Git) (well, and P4Sync)
#          Arch/Baz should accept 'build-config'
#  >=1.51: (release 0.7.3)
#  >= 2.1: SlaveShellCommand now accepts 'initial_stdin', 'keep_stdin_open',
#          and 'logfiles'. It now sends 'log' messages in addition to
#          stdout/stdin/header/rc. It acquired writeStdin/closeStdin methods,
#          but these are not remotely callable yet.
#          (not externally visible: ShellCommandPP has writeStdin/closeStdin.
#          ShellCommand accepts new arguments (logfiles=, initialStdin=,
#          keepStdinOpen=) and no longer accepts stdin=)
#          (release 0.7.4)
#  >= 2.2: added monotone, uploadFile, and downloadFile (release 0.7.5)
#  >= 2.3: added bzr (release 0.7.6)
#  >= 2.4: Git understands 'revision' and branches
#  >= 2.5: workaround added for remote 'hg clone --rev REV' when hg<0.9.2
#  >= 2.6: added uploadDirectory
#  >= 2.7: added usePTY option to SlaveShellCommand
#  >= 2.8: added username and password args to SVN class
#  >= 2.9: add depth arg to SVN class
#  >= 2.10: CVS can handle 'extra_options' and 'export_options'

class CommandInterrupted(Exception):
    """Exception type for an interrupted command.

    It carries no extra state; nothing in this part of the file raises it.
    """
    pass
class TimeoutError(Exception):
    """Error handed to ShellCommand.failed() when a killed process would not
    go away before the backup timer expired (see
    ShellCommand.doBackupTimeout)."""
51
class Obfuscated:
    """An obfuscated string in a command.

    An instance pairs the text that must really be used ('real') with the
    text that may be shown in logs and status output ('fake'). str() and
    repr() only ever expose the fake text.
    """

    def __init__(self, real, fake):
        self.real = real
        self.fake = fake

    def __str__(self):
        return self.fake

    def __repr__(self):
        return repr(self.fake)

    @staticmethod
    def to_text(s):
        """Return s unchanged if it is already a string, else str(s)."""
        try:
            text_types = (str, unicode)
        except NameError:
            # no separate 'unicode' type on this interpreter
            text_types = (str,)
        if isinstance(s, text_types):
            return s
        return str(s)

    @staticmethod
    def _translate(command, attr):
        """Shared implementation of get_real/get_fake.

        A list or tuple is converted into a new list in which every
        Obfuscated element is replaced by its 'attr' attribute and every
        other element is passed through to_text(). Anything else (e.g. a
        plain command string) is returned untouched.
        """
        # ShellCommand treats tuples and lists alike, so both must be
        # translated here; otherwise Obfuscated objects would leak into argv.
        if not isinstance(command, (list, tuple)):
            return command
        rv = []
        for elt in command:
            if isinstance(elt, Obfuscated):
                rv.append(getattr(elt, attr))
            else:
                rv.append(Obfuscated.to_text(elt))
        return rv

    @staticmethod
    def get_real(command):
        """Return the command with every Obfuscated element replaced by its
        real text (this is the form that gets executed)."""
        return Obfuscated._translate(command, 'real')

    @staticmethod
    def get_fake(command):
        """Return the command with every Obfuscated element replaced by its
        fake text (this is the form that is safe to display)."""
        return Obfuscated._translate(command, 'fake')
94
class AbandonChain(Exception):
    """A series of chained steps can raise this exception to indicate that
    one of the intermediate ShellCommands has failed, such that there is no
    point in running the remainder. 'rc' should be the non-zero exit code of
    the failing ShellCommand."""

    def __repr__(self):
        # Tolerate an instance built without arguments instead of raising
        # IndexError from inside repr().
        if self.args:
            rc = self.args[0]
        else:
            rc = None
        # wrap in a tuple so that an rc which is itself a tuple still formats
        return "<AbandonChain rc=%s>" % (rc,)
103
class ShellCommandPP(ProcessProtocol):
    """ProcessProtocol that relays a child process's output and exit status
    to the ShellCommand which owns it, and buffers stdin data until the
    process is connected."""

    debug = False

    def __init__(self, command):
        self.command = command
        self.pending_stdin = ""
        self.stdin_finished = False

    def _trace(self, *args):
        # debug-only logging helper
        if self.debug:
            log.msg(*args)

    def writeStdin(self, data):
        assert not self.stdin_finished
        if not self.connected:
            # not connected yet: hold the data until connectionMade
            self.pending_stdin += data
            return
        self.transport.write(data)

    def closeStdin(self):
        if self.connected:
            self._trace(" closing stdin")
            self.transport.closeStdin()
        # remembered so that connectionMade closes stdin if we were not
        # connected yet
        self.stdin_finished = True

    def connectionMade(self):
        self._trace("ShellCommandPP.connectionMade")
        if not self.command.process:
            if self.debug:
                log.msg(" assigning self.command.process: %s" %
                        (self.transport,))
            self.command.process = self.transport

        # TODO: maybe we shouldn't close stdin when using a PTY. I can't test
        # this yet, recent debian glibc has a bug which causes thread-using
        # test cases to SIGHUP trial, and the workaround is to either run
        # the whole test with /bin/sh -c " ".join(argv) (way gross) or to
        # not use a PTY. Once the bug is fixed, I'll be able to test what
        # happens when you close stdin on a pty. My concern is that it will
        # SIGHUP the child (since we are, in a sense, hanging up on them).
        # But it may well be that keeping stdout open prevents the SIGHUP
        # from being sent.
        #if not self.command.usePTY:

        # flush whatever was queued before the connection existed
        if self.pending_stdin:
            self._trace(" writing to stdin")
            self.transport.write(self.pending_stdin)
        if self.stdin_finished:
            self._trace(" closing stdin")
            self.transport.closeStdin()

    def outReceived(self, data):
        self._trace("ShellCommandPP.outReceived")
        self.command.addStdout(data)

    def errReceived(self, data):
        self._trace("ShellCommandPP.errReceived")
        self.command.addStderr(data)

    def processEnded(self, status_object):
        self._trace("ShellCommandPP.processEnded", status_object)
        # status_object is a Failure wrapped around an
        # error.ProcessTerminated or an error.ProcessDone.
        # requires twisted >= 1.0.4 to overcome a bug in process.py
        reason = status_object.value
        self.command.finished(reason.signal, reason.exitCode)
171
class LogFileWatcher:
    """Poll a logfile and forward whatever is appended to it to the owning
    command via command.addLogfile(name, data)."""

    POLL_INTERVAL = 2

    def __init__(self, command, name, logfile, follow=False):
        """
        @param command: object whose addLogfile(name, data) receives the data
        @param name: log name passed back to addLogfile
        @param logfile: path of the file to watch
        @param follow: if True, seek to the end of the file when it is first
                       opened instead of reading it from the beginning
        """
        self.command = command
        self.name = name
        self.logfile = logfile

        log.msg("LogFileWatcher created to watch %s" % logfile)
        # we are created before the ShellCommand starts. If the logfile we're
        # supposed to be watching already exists, record its size and
        # ctime/mtime so we can tell when it starts to change.
        self.old_logfile_stats = self.statFile()
        self.started = False

        # follow the file, only sending back lines
        # added since we started watching
        self.follow = follow

        # every 2 seconds we check on the file again
        self.poller = task.LoopingCall(self.poll)

    def start(self):
        """Begin polling every POLL_INTERVAL seconds."""
        self.poller.start(self.POLL_INTERVAL).addErrback(self._cleanupPoll)

    def _cleanupPoll(self, err):
        # the polling loop died with an error; forget it so stop() does not
        # try to stop it again
        log.err(err, msg="Polling error")
        self.poller = None

    def stop(self):
        """Do one final poll, stop the polling loop and close the file."""
        self.poll()
        # LoopingCall.stop() asserts that the loop is running, so only call
        # it when start() really got the loop going (stop() may be reached
        # without start(), e.g. when the owning command never spawned).
        if self.poller is not None and self.poller.running:
            self.poller.stop()
        if self.started:
            self.f.close()

    def statFile(self):
        """Return (ctime, mtime, size) of the logfile, or None if it cannot
        be stat'ed (typically because it does not exist)."""
        # stat directly rather than exists()-then-stat(): the file may be
        # removed between the two calls.
        try:
            s = os.stat(self.logfile)
        except OSError:
            return None
        return (s[ST_CTIME], s[ST_MTIME], s[ST_SIZE])

    def poll(self):
        """Open the file once it has changed, then forward any new data."""
        if not self.started:
            s = self.statFile()
            if s == self.old_logfile_stats:
                return # not started yet
            if not s:
                # the file was there, but now it's deleted. Forget about the
                # initial state, clearly the process has deleted the logfile
                # in preparation for creating a new one.
                self.old_logfile_stats = None
                return # no file to work with
            self.f = open(self.logfile, "rb")
            # if we only want new lines, seek to
            # where we stat'd so we only find new
            # lines
            # NOTE(review): this seeks to the size seen by the stat that
            # detected the change, so data written before that poll is
            # skipped -- confirm this is the intended 'follow' semantics.
            if self.follow:
                self.f.seek(s[2], 0)
            self.started = True
        # seek to the current position before reading again
        self.f.seek(self.f.tell(), 0)
        while True:
            data = self.f.read(10000)
            if not data:
                return
            self.command.addLogfile(self.name, data)
238
class ShellCommand:
    """Helper class, used by SlaveCommands to run programs in a child shell.

    Output from the child is buffered and sent to the builder through
    builder.sendUpdate(); start() returns a Deferred that fires with the
    exit code when the command completes.
    """

    notreally = False
    BACKUP_TIMEOUT = 5
    KILL = "KILL"
    CHUNK_LIMIT = 128*1024

    # Don't send any data until at least BUFFER_SIZE bytes have been collected
    # or BUFFER_TIMEOUT elapsed
    BUFFER_SIZE = 64*1024
    BUFFER_TIMEOUT = 5

    # For sending elapsed time:
    startTime = None
    elapsedTime = None

    # For scheduling future events
    _reactor = reactor

    # I wish we had easy access to CLOCK_MONOTONIC in Python:
    # http://www.opengroup.org/onlinepubs/000095399/functions/clock_getres.html
    # Then changes to the system clock during a run wouldn't affect the
    # "elapsed time" results.

    def __init__(self, builder, command,
                 workdir, environ=None,
                 sendStdout=True, sendStderr=True, sendRC=True,
                 timeout=None, maxTime=None, initialStdin=None,
                 keepStdinOpen=False, keepStdout=False, keepStderr=False,
                 logEnviron=True, logfiles=None, usePTY="slave-config"):
        """

        @param builder: object providing sendUpdate(), usePTY and
                        unicode_encoding
        @param command: a command string, or a list of arguments which may
                        contain Obfuscated elements
        @param workdir: directory to run in; created if it does not exist
        @param environ: dict of environment overrides. A value of None
                        removes the variable; '${name}' in a value is
                        replaced from os.environ; PYTHONPATH may be a
                        sequence and is prepended to the existing value.
                        The dict passed in is not modified.
        @param timeout: seconds without output before the command is killed
        @param maxTime: total seconds before the command is killed
        @param logfiles: dict mapping a log name to a workdir-relative
                         filename, or to a dict with a 'filename' key and an
                         optional 'follow' key

        @param keepStdout: if True, we keep a copy of all the stdout text
                           that we've seen. This copy is available in
                           self.stdout, which can be read after the command
                           has finished.
        @param keepStderr: same, for stderr

        @param usePTY: "slave-config" -> use the SlaveBuilder's usePTY;
            otherwise, true to use a PTY, false to not use a PTY.
        """

        self.builder = builder
        self.command = Obfuscated.get_real(command)

        # We need to take unicode commands and arguments and encode them using
        # the appropriate encoding for the slave. This is mostly platform
        # specific, but can be overridden in the slave's buildbot.tac file.
        #
        # Encoding the command line here ensures that the called executables
        # receive arguments as bytestrings encoded with an appropriate
        # platform-specific encoding. It also plays nicely with twisted's
        # spawnProcess which checks that arguments are regular strings or
        # unicode strings that can be encoded as ascii (which generates a
        # warning).
        if isinstance(self.command, (tuple, list)):
            # build a new list: a tuple cannot be updated in place
            self.command = [self._encodeArg(a) for a in self.command]
        elif isinstance(self.command, unicode):
            self.command = self.command.encode(self.builder.unicode_encoding)

        self.fake_command = Obfuscated.get_fake(command)
        self.sendStdout = sendStdout
        self.sendStderr = sendStderr
        self.sendRC = sendRC
        # a fresh dict per instance instead of a shared mutable default
        if logfiles is None:
            logfiles = {}
        self.logfiles = logfiles
        self.workdir = workdir
        if not os.path.exists(workdir):
            os.makedirs(workdir)
        if environ:
            # Work on a copy: callers reuse the same dict for several
            # commands, and rewriting PYTHONPATH in their dict would append
            # "${PYTHONPATH}" once more on every reuse.
            environ = dict(environ)
            if 'PYTHONPATH' in environ:
                ppath = environ['PYTHONPATH']
                # Need to do os.pathsep translation.  We could either do that
                # by replacing all incoming ':'s with os.pathsep, or by
                # accepting lists.  I like lists better.
                if not isinstance(ppath, types.StringTypes):
                    # If it's not a string (str or unicode), treat it as a
                    # sequence to be turned in to a string.
                    ppath = os.pathsep.join(ppath)

                environ['PYTHONPATH'] = ppath + os.pathsep + "${PYTHONPATH}"

            # do substitution on variable values matching pattern: ${name}
            p = re.compile(r'\${([0-9a-zA-Z_]*)}')
            def subst(match):
                return os.environ.get(match.group(1), "")
            newenv = {}
            for key in os.environ.keys():
                # setting a key to None will delete it from the slave environment
                if key not in environ or environ[key] is not None:
                    newenv[key] = os.environ[key]
            for key in environ.keys():
                if environ[key] is not None:
                    newenv[key] = p.sub(subst, environ[key])

            self.environ = newenv
        else: # not environ
            self.environ = os.environ.copy()
        self.initialStdin = initialStdin
        self.keepStdinOpen = keepStdinOpen
        self.logEnviron = logEnviron
        self.timeout = timeout
        self.timer = None
        self.maxTime = maxTime
        self.maxTimer = None
        self.keepStdout = keepStdout
        self.keepStderr = keepStderr

        self.buffered = deque()
        self.buflen = 0
        self.buftimer = None

        if usePTY == "slave-config":
            self.usePTY = self.builder.usePTY
        else:
            self.usePTY = usePTY

        # usePTY=True is a convenience for cleaning up all children and
        # grandchildren of a hung command.  Fall back to usePTY=False on systems
        # and in situations where ptys cause problems.  PTYs are posix-only,
        # and for .closeStdin to matter, we must use a pipe, not a PTY
        if runtime.platformType != "posix" or initialStdin is not None:
            if self.usePTY and usePTY != "slave-config":
                self.sendStatus({'header': "WARNING: disabling usePTY for this command"})
            self.usePTY = False

        self.logFileWatchers = []
        for name, filevalue in self.logfiles.items():
            filename = filevalue
            follow = False

            # check for a dictionary of options
            # filename is required, others are optional
            if isinstance(filevalue, dict):
                filename = filevalue['filename']
                follow = filevalue.get('follow', False)

            w = LogFileWatcher(self, name,
                               os.path.join(self.workdir, filename),
                               follow=follow)
            self.logFileWatchers.append(w)

    def _encodeArg(self, arg):
        """Encode one unicode argument with the builder's encoding; return
        anything else unchanged."""
        if isinstance(arg, unicode):
            return arg.encode(self.builder.unicode_encoding)
        return arg

    def __repr__(self):
        # wrap in a tuple so a tuple-valued fake_command cannot be mistaken
        # for the format arguments
        return "<slavecommand.ShellCommand '%s'>" % (self.fake_command,)

    def sendStatus(self, status):
        self.builder.sendUpdate(status)

    def start(self):
        """Start the command.

        @return: a Deferred which fires (with the exit code) when the
                 command completes
        """
        if self.keepStdout:
            self.stdout = ""
        if self.keepStderr:
            self.stderr = ""
        # Keep a local reference: finished() clears self.deferred, and it may
        # already have run by the time _startCommand returns ('notreally').
        d = self.deferred = defer.Deferred()
        try:
            self._startCommand()
        except:
            # deliberately broad: any failure to start is reported to the
            # master as if the shell command itself had failed
            log.msg("error in ShellCommand._startCommand")
            log.err()
            self._addToBuffers('stderr', "error in ShellCommand._startCommand\n")
            self._addToBuffers('stderr', traceback.format_exc())
            self._sendBuffers()
            # pretend it was a shell error
            if not d.called:
                d.errback(AbandonChain(-1))
        return d

    def _startCommand(self):
        """Spawn the child process and arm the timers and logfile watchers."""
        # ensure workdir exists
        if not os.path.isdir(self.workdir):
            os.makedirs(self.workdir)
        log.msg("ShellCommand._startCommand")
        if self.notreally:
            self._addToBuffers('header', "command '%s' in dir %s" % \
                               (self.fake_command, self.workdir))
            self._addToBuffers('header', "(not really)\n")
            # finished() computes the elapsed time from startTime, which
            # would otherwise still be None here
            self.startTime = util.now(self._reactor)
            self.finished(None, 0)
            return

        self.pp = ShellCommandPP(self)

        argv, display = self._buildArgv()

        # $PWD usually indicates the current directory; spawnProcess may not
        # update this value, though, so we set it explicitly here.  This causes
        # weird problems (bug #456) on msys, though..
        if not self.environ.get('MACHTYPE', None) == 'i686-pc-msys':
            self.environ['PWD'] = os.path.abspath(self.workdir)

        # self.stdin is handled in ShellCommandPP.connectionMade

        self._sendStartHeaders(display)

        # this will be buffered until connectionMade is called
        if self.initialStdin:
            self.pp.writeStdin(self.initialStdin)
        if not self.keepStdinOpen:
            self.pp.closeStdin()

        # win32eventreactor's spawnProcess (under twisted <= 2.0.1) returns
        # None, as opposed to all the posixbase-derived reactors (which
        # return the new Process object). This is a nuisance. We can make up
        # for it by having the ProcessProtocol give us their .transport
        # attribute after they get one. I'd prefer to get it from
        # spawnProcess because I'm concerned about returning from this method
        # without having a valid self.process to work with. (if kill() were
        # called right after we return, but somehow before connectionMade
        # were called, then kill() would blow up).
        self.process = None
        self.startTime = util.now(self._reactor)

        p = reactor.spawnProcess(self.pp, argv[0], argv,
                                 self.environ,
                                 self.workdir,
                                 usePTY=self.usePTY)
        # connectionMade might have been called during spawnProcess
        if not self.process:
            self.process = p

        # connectionMade also closes stdin as long as we're not using a PTY.
        # This is intended to kill off inappropriately interactive commands
        # better than the (long) hung-command timeout. ProcessPTY should be
        # enhanced to allow the same childFDs argument that Process takes,
        # which would let us connect stdin to /dev/null .

        if self.timeout:
            self.timer = self._reactor.callLater(self.timeout, self.doTimeout)

        if self.maxTime:
            self.maxTimer = self._reactor.callLater(self.maxTime, self.doMaxTimeout)

        for w in self.logFileWatchers:
            w.start()

    def _buildArgv(self):
        """Return (argv, display): the argument vector to spawn and the
        obfuscated command line to show in the header."""
        if isinstance(self.command, types.StringTypes):
            if runtime.platformType == 'win32':
                argv = os.environ['COMSPEC'].split() # allow %COMSPEC% to have args
                if '/c' not in argv:
                    argv += ['/c']
                argv += [self.command]
            else:
                # for posix, use /bin/sh. for other non-posix, well, doesn't
                # hurt to try
                argv = ['/bin/sh', '-c', self.command]
            display = self.fake_command
        else:
            # On windows, CreateProcess requires an absolute path to the executable.
            # When we call spawnProcess below, we pass argv[0] as the executable.
            # So, for .exe's that we have absolute paths to, we can call directly
            # Otherwise, we should run under COMSPEC (usually cmd.exe) to
            # handle path searching, etc.
            if runtime.platformType == 'win32' and not \
                    (self.command[0].lower().endswith(".exe") and os.path.isabs(self.command[0])):
                argv = os.environ['COMSPEC'].split() # allow %COMSPEC% to have args
                if '/c' not in argv:
                    argv += ['/c']
                argv += list(self.command)
            else:
                argv = self.command
            display = " ".join(self.fake_command)
        return argv, display

    def _sendStartHeaders(self, display):
        """Log, and queue as 'header' output, the description of the command
        that is about to be run."""
        def emit(text):
            log.msg(" " + text)
            self._addToBuffers('header', text+"\n")

        # first header line is the command in plain text, argv joined with
        # spaces. You should be able to cut-and-paste this into a shell to
        # obtain the same results. If there are spaces in the arguments, too
        # bad.
        emit(display)

        # then comes the secondary information
        msg = " in dir %s" % (self.workdir,)
        if self.timeout:
            msg += " (timeout %d secs)" % (self.timeout,)
        emit(msg)

        emit(" watching logfiles %s" % (self.logfiles,))

        # then the obfuscated command array for resolving ambiguity
        emit(" argv: %s" % (self.fake_command,))

        # then the environment, since it sometimes causes problems
        if self.logEnviron:
            lines = [" %s=%s\n" % (name, self.environ[name])
                     for name in sorted(self.environ)]
            log.msg(" environment: %s" % (self.environ,))
            self._addToBuffers('header', " environment:\n" + "".join(lines))

        if self.initialStdin:
            emit(" writing %d bytes to stdin" % len(self.initialStdin))

        if self.keepStdinOpen:
            emit(" leaving stdin open")
        else:
            emit(" closing stdin")

        emit(" using PTY: %s" % bool(self.usePTY))

    def _chunkForSend(self, data):
        """
        limit the chunks that we send over PB to 128k, since it has a hardwired
        string-size limit of 640k.
        """
        LIMIT = self.CHUNK_LIMIT
        for i in range(0, len(data), LIMIT):
            yield data[i:i+LIMIT]

    def _collapseMsg(self, msg):
        """
        Take msg, which is a dictionary of lists of output chunks, and
        concatenate all the chunks into a single string
        """
        retval = {}
        for logname in msg:
            data = "".join(msg[logname])
            if isinstance(logname, tuple) and logname[0] == 'log':
                # ('log', name) keys become a single 'log': (name, data) entry
                retval['log'] = (logname[1], data)
            else:
                retval[logname] = data
        return retval

    def _sendMessage(self, msg):
        """
        Collapse and send msg to the master
        """
        if not msg:
            return
        msg = self._collapseMsg(msg)
        self.sendStatus(msg)

    def _bufferTimeout(self):
        self.buftimer = None
        self._sendBuffers()

    def _sendBuffers(self):
        """
        Send all the content in our buffers.
        """
        msg = {}
        msg_size = 0
        lastlog = None
        while self.buffered:
            # Grab the next bits from the buffer
            logname, data = self.buffered.popleft()

            # If this log is different than the last one, then we have to send
            # out the message so far.  This is because the message is
            # transferred as a dictionary, which makes the ordering of keys
            # unspecified, and makes it impossible to interleave data from
            # different logs.  A future enhancement could be to change the
            # master to support a list of (logname, data) tuples instead of a
            # dictionary.  TODO: In 0.8.0?
            # On our first pass through this loop lastlog is None
            if lastlog is None:
                lastlog = logname
            elif logname != lastlog:
                self._sendMessage(msg)
                msg = {}
                msg_size = 0
            lastlog = logname

            logdata = msg.setdefault(logname, [])

            # Chunkify the log data to make sure we're not sending more than
            # CHUNK_LIMIT at a time
            for chunk in self._chunkForSend(data):
                logdata.append(chunk)
                msg_size += len(chunk)
                if msg_size > self.CHUNK_LIMIT:
                    # We've gone beyond the chunk limit, so send out our
                    # message.  At worst this results in a message slightly
                    # larger than (2*CHUNK_LIMIT)-1
                    self._sendMessage(msg)
                    msg = {}
                    msg_size = 0
        self.buflen = 0
        self._sendMessage(msg)
        if self.buftimer:
            if self.buftimer.active():
                self.buftimer.cancel()
            self.buftimer = None

    def _addToBuffers(self, logname, data):
        """
        Add data to the buffer for logname
        Start a timer to send the buffers if BUFFER_TIMEOUT elapses.
        If adding data causes the buffer size to grow beyond BUFFER_SIZE, then
        the buffers will be sent.
        """
        n = len(data)

        self.buflen += n
        self.buffered.append((logname, data))
        if self.buflen > self.BUFFER_SIZE:
            self._sendBuffers()
        elif not self.buftimer:
            self.buftimer = self._reactor.callLater(self.BUFFER_TIMEOUT, self._bufferTimeout)

    def addStdout(self, data):
        if self.sendStdout:
            self._addToBuffers('stdout', data)

        if self.keepStdout:
            self.stdout += data
        # any output restarts the no-output timeout
        if self.timer:
            self.timer.reset(self.timeout)

    def addStderr(self, data):
        if self.sendStderr:
            self._addToBuffers('stderr', data)

        if self.keepStderr:
            self.stderr += data
        if self.timer:
            self.timer.reset(self.timeout)

    def addLogfile(self, name, data):
        self._addToBuffers( ('log', name), data)

        if self.timer:
            self.timer.reset(self.timeout)

    def _cancelTimers(self):
        """Cancel and forget the timeout, maxTime and buffer timers."""
        for attr in ('timer', 'maxTimer', 'buftimer'):
            delayed = getattr(self, attr)
            if delayed:
                if delayed.active():
                    delayed.cancel()
                setattr(self, attr, None)

    def finished(self, sig, rc):
        """Called when the process has ended: flush the output, report the
        result and fire the Deferred returned by start() with rc (-1 if the
        process was killed by a signal)."""
        self.elapsedTime = util.now(self._reactor) - self.startTime
        log.msg("command finished with signal %s, exit code %s, elapsedTime: %0.6f" % (sig,rc,self.elapsedTime))
        for w in self.logFileWatchers:
            # this will send the final updates
            w.stop()
        self._sendBuffers()
        if sig is not None:
            rc = -1
        if self.sendRC:
            if sig is not None:
                self.sendStatus(
                    {'header': "process killed by signal %d\n" % sig})
            self.sendStatus({'rc': rc})
        self.sendStatus({'header': "elapsedTime=%0.6f\n" % self.elapsedTime})
        self._cancelTimers()
        d = self.deferred
        self.deferred = None
        if d:
            d.callback(rc)
        else:
            log.msg("Hey, command %s finished twice" % self)

    def failed(self, why):
        """Flush the output and errback the Deferred returned by start()."""
        self._sendBuffers()
        log.msg("ShellCommand.failed: command failed: %s" % (why,))
        self._cancelTimers()
        d = self.deferred
        self.deferred = None
        if d:
            d.errback(why)
        else:
            log.msg("Hey, command %s finished twice" % self)

    def doTimeout(self):
        self.timer = None
        msg = "command timed out: %d seconds without output" % self.timeout
        self.kill(msg)

    def doMaxTimeout(self):
        self.maxTimer = None
        msg = "command timed out: %d seconds elapsed" % self.maxTime
        self.kill(msg)

    def kill(self, msg):
        """Try to kill the child process, then arm a backup timer that
        abandons the command if it does not end."""
        # This may be called by the timeout, or when the user has decided to
        # abort this build.
        self._sendBuffers()
        self._cancelTimers()
        if hasattr(self.process, "pid") and self.process.pid is not None:
            msg += ", killing pid %s" % self.process.pid
        log.msg(msg)
        self.sendStatus({'header': "\n" + msg + "\n"})

        hit = 0
        if runtime.platformType == "posix":
            try:
                # really want to kill off all child processes too. Process
                # Groups are ideal for this, but that requires
                # spawnProcess(usePTY=1). Try both ways in case process was
                # not started that way.

                # the test suite sets self.KILL=None to tell us we should
                # only pretend to kill the child. This lets us test the
                # backup timer.

                sig = None
                if self.KILL is not None:
                    sig = getattr(signal, "SIG"+ self.KILL, None)

                if self.KILL is None:
                    log.msg("self.KILL==None, only pretending to kill child")
                elif sig is None:
                    log.msg("signal module is missing SIG%s" % self.KILL)
                elif not hasattr(os, "kill"):
                    log.msg("os module is missing the 'kill' function")
                elif not hasattr(self.process, "pid") or self.process.pid is None:
                    log.msg("self.process has no pid")
                else:
                    log.msg("trying os.kill(-pid, %d)" % (sig,))
                    # TODO: maybe use os.killpg instead of a negative pid?
                    os.kill(-self.process.pid, sig)
                    log.msg(" signal %s sent successfully" % sig)
                    hit = 1
            except OSError:
                # probably no-such-process, maybe because there is no process
                # group
                pass
        if not hit:
            try:
                if self.KILL is None:
                    log.msg("self.KILL==None, only pretending to kill child")
                else:
                    log.msg("trying process.signalProcess('KILL')")
                    self.process.signalProcess(self.KILL)
                    log.msg(" signal %s sent successfully" % (self.KILL,))
                    hit = 1
            except OSError:
                # could be no-such-process, because they finished very recently
                pass
        if not hit:
            log.msg("signalProcess/os.kill failed both times")

        if runtime.platformType == "posix":
            # we only do this under posix because the win32eventreactor
            # blocks here until the process has terminated, while closing
            # stderr. This is weird.
            self.pp.transport.loseConnection()

        # finished ought to be called momentarily. Just in case it doesn't,
        # set a timer which will abandon the command.
        self.timer = self._reactor.callLater(self.BACKUP_TIMEOUT,
                                             self.doBackupTimeout)

    def doBackupTimeout(self):
        log.msg("we tried to kill the process, and it wouldn't die.."
                " finish anyway")
        self.timer = None
        self.sendStatus({'header': "SIGKILL failed to kill process\n"})
        if self.sendRC:
            self.sendStatus({'header': "using fake rc=-1\n"})
            self.sendStatus({'rc': -1})
        self.failed(TimeoutError("SIGKILL failed to kill process"))

    def writeStdin(self, data):
        self.pp.writeStdin(data)

    def closeStdin(self):
        self.pp.closeStdin()
827
828 829 -class Command:
830 implements(ISlaveCommand) 831 832 """This class defines one command that can be invoked by the build master. 833 The command is executed on the slave side, and always sends back a 834 completion message when it finishes. It may also send intermediate status 835 as it runs (by calling builder.sendStatus). Some commands can be 836 interrupted (either by the build master or a local timeout), in which 837 case the step is expected to complete normally with a status message that 838 indicates an error occurred. 839 840 These commands are used by BuildSteps on the master side. Each kind of 841 BuildStep uses a single Command. The slave must implement all the 842 Commands required by the set of BuildSteps used for any given build: 843 this is checked at startup time. 844 845 All Commands are constructed with the same signature: 846 c = CommandClass(builder, args) 847 where 'builder' is the parent SlaveBuilder object, and 'args' is a 848 dict that is interpreted per-command. 849 850 The setup(args) method is available for setup, and is run from __init__. 851 852 The Command is started with start(). This method must be implemented in a 853 subclass, and it should return a Deferred. When your step is done, you 854 should fire the Deferred (the results are not used). If the command is 855 interrupted, it should fire the Deferred anyway. 856 857 While the command runs. it may send status messages back to the 858 buildmaster by calling self.sendStatus(statusdict). The statusdict is 859 interpreted by the master-side BuildStep however it likes. 860 861 A separate completion message is sent when the deferred fires, which 862 indicates that the Command has finished, but does not carry any status 863 data. If the Command needs to return an exit code of some sort, that 864 should be sent as a regular status message before the deferred is fired . 865 Once builder.commandComplete has been run, no more status messages may be 866 sent. 
867 868 If interrupt() is called, the Command should attempt to shut down as 869 quickly as possible. Child processes should be killed, new ones should 870 not be started. The Command should send some kind of error status update, 871 then complete as usual by firing the Deferred. 872 873 .interrupted should be set by interrupt(), and can be tested to avoid 874 sending multiple error status messages. 875 876 If .running is False, the bot is shutting down (or has otherwise lost the 877 connection to the master), and should not send any status messages. This 878 is checked in Command.sendStatus . 879 880 """ 881 882 # builder methods: 883 # sendStatus(dict) (zero or more) 884 # commandComplete() or commandInterrupted() (one, at end) 885 886 debug = False 887 interrupted = False 888 running = False # set by Builder, cleared on shutdown or when the 889 # Deferred fires 890 891 _reactor = reactor 892
893 - def __init__(self, builder, stepId, args):
894 self.builder = builder 895 self.stepId = stepId # just for logging 896 self.args = args 897 self.setup(args)
898
899 - def setup(self, args):
900 """Override this in a subclass to extract items from the args dict.""" 901 pass
902
903 - def doStart(self):
904 self.running = True 905 d = defer.maybeDeferred(self.start) 906 d.addBoth(self.commandComplete) 907 return d
908
909 - def start(self):
910 """Start the command. This method should return a Deferred that will 911 fire when the command has completed. The Deferred's argument will be 912 ignored. 913 914 This method should be overridden by subclasses.""" 915 raise NotImplementedError, "You must implement this in a subclass"
916
917 - def sendStatus(self, status):
918 """Send a status update to the master.""" 919 if self.debug: 920 log.msg("sendStatus", status) 921 if not self.running: 922 log.msg("would sendStatus but not .running") 923 return 924 self.builder.sendUpdate(status)
925
926 - def doInterrupt(self):
927 self.running = False 928 self.interrupt()
929
930 - def interrupt(self):
931 """Override this in a subclass to allow commands to be interrupted. 932 May be called multiple times, test and set self.interrupted=True if 933 this matters.""" 934 pass
935
936 - def commandComplete(self, res):
937 self.running = False 938 return res
939 940 # utility methods, mostly used by SlaveShellCommand and the like 941
def _abandonOnFailure(self, rc):
    """Pass a zero return code through; abort the chain on anything else.

    @param rc: return code; expected to be an int
    @return: rc, when it is 0
    @raise AbandonChain: carrying rc, when rc is non-zero
    """
    kind = type(rc)
    if kind is not int:
        # leave a trace in the log before the assertion below fires
        complaint = "weird, _abandonOnFailure was given rc=%s (%s)" % \
                    (rc, kind)
        log.msg(complaint)
    assert isinstance(rc, int)
    if rc != 0:
        raise AbandonChain(rc)
    return rc
950
def _sendRC(self, res):
    """Send an {'rc': 0} status update. The `res` argument is ignored."""
    success = {'rc': 0}
    self.sendStatus(success)
953
def _checkAbandoned(self, why):
    """Errback that turns an AbandonChain failure into an 'rc' update.

    @param why: the failure being handled; anything other than
                AbandonChain is not handled here (why.trap() re-raises it)
    @return: None
    """
    log.msg("_checkAbandoned", why)
    why.trap(AbandonChain)
    abandoned = why.value
    log.msg(" abandoning chain", abandoned)
    # the first argument of the AbandonChain is reported as the rc
    rc = abandoned.args[0]
    self.sendStatus({'rc': rc})
960
class SlaveShellCommand(Command):
    """This is a Command which runs a shell command. The args dict contains
    the following keys:

        - ['command'] (required): a shell command to run. If this is a
                                  string, it will be run with /bin/sh
                                  (['/bin/sh', '-c', command]). If it is a
                                  list (preferred), it will be used directly.
        - ['workdir'] (required): subdirectory in which the command will be
                                  run, relative to the builder dir
        - ['env']: a dict of environment variables to augment/replace
                   os.environ . PYTHONPATH is treated specially, and
                   should be a list of path components to be prepended to
                   any existing PYTHONPATH environment variable.
        - ['initial_stdin']: a string which will be written to the command's
                             stdin as soon as it starts
        - ['keep_stdin_open']: unless True, the command's stdin will be
                               closed as soon as initial_stdin has been
                               written. Set this to True if you plan to write
                               to stdin after the command has been started.
        - ['want_stdout']: 0 if stdout should be thrown away
        - ['want_stderr']: 0 if stderr should be thrown away
        - ['usePTY']: True or False if the command should use a PTY (defaults
                      to configuration of the slave)
        - ['not_really']: 1 to skip execution and return rc=0
        - ['timeout']: seconds of silence to tolerate before killing command
        - ['maxTime']: seconds before killing command
        - ['logfiles']: dict mapping LogFile name to the workdir-relative
                        filename of a local log file. This local file will be
                        watched just like 'tail -f', and all changes will be
                        written to 'log' status updates.
        - ['logEnviron']: False to not log the environment variables on the
                          slave

    ShellCommand creates the following status messages:
        - {'stdout': data} : when stdout data is available
        - {'stderr': data} : when stderr data is available
        - {'header': data} : when headers (command start/stop) are available
        - {'log': (logfile_name, data)} : when log files have new contents
        - {'rc': rc} : when the process has terminated
    """

    def start(self):
        """Build a ShellCommand from self.args, start it, and return the
        result of its start() call."""
        opts = self.args
        fetch = opts.get

        # the workdir is required, and is relative to the Builder directory
        relative_dir = opts['workdir']
        assert relative_dir is not None
        run_dir = os.path.join(self.builder.basedir, relative_dir)

        runner = ShellCommand(
            self.builder, opts['command'], run_dir,
            environ=fetch('env'),
            timeout=fetch('timeout', None),
            maxTime=fetch('maxTime', None),
            sendStdout=fetch('want_stdout', True),
            sendStderr=fetch('want_stderr', True),
            sendRC=True,
            initialStdin=fetch('initial_stdin'),
            keepStdinOpen=fetch('keep_stdin_open'),
            logfiles=fetch('logfiles', {}),
            usePTY=fetch('usePTY', "slave-config"),
            logEnviron=fetch('logEnviron', True),
        )
        # the ShellCommand shares whatever reactor this command uses
        runner._reactor = self._reactor
        self.command = runner
        return runner.start()

    def interrupt(self):
        """Record the interruption and kill the running ShellCommand."""
        self.interrupted = True
        self.command.kill("command interrupted")

    def writeStdin(self, data):
        """Pass `data` on to the ShellCommand's writeStdin()."""
        self.command.writeStdin(data)

    def closeStdin(self):
        """Ask the ShellCommand to close the command's stdin."""
        self.command.closeStdin()


registerSlaveCommand("shell", SlaveShellCommand, command_version)
class DummyCommand(Command):
    """
    I am a dummy no-op command. One second after I start I send a single
    {'stdout': 'data'} status update; args['timeout'] seconds after that
    (5 by default) I finish with {'rc': 0}. If I am interrupted first, I
    finish right away with {'rc': 1} instead.
    See L{buildbot.steps.dummy.RemoteDummy}
    """

    def start(self):
        """Schedule the intermediate status update and return the Deferred
        that is fired by finished()."""
        self.d = defer.Deferred()
        log.msg(" starting dummy command [%s]" % self.stepId)
        self.timer = self._reactor.callLater(1, self.doStatus)
        return self.d

    def interrupt(self):
        """Cancel the pending timer and finish at once. Only the first call
        has any effect."""
        if not self.interrupted:
            self.timer.cancel()
            self.timer = None
            self.interrupted = True
            self.finished()

    def doStatus(self):
        """Send the intermediate update, then schedule finished()."""
        log.msg(" sending intermediate status")
        self.sendStatus({'stdout': 'data'})
        # the +1 here and the -1 below cancel each other out
        padded = self.args.get('timeout', 5) + 1
        self.timer = self._reactor.callLater(padded - 1, self.finished)

    def finished(self):
        """Send the final rc (1 if interrupted, else 0) and fire self.d."""
        log.msg(" dummy command finished [%s]" % self.stepId)
        rc = 0
        if self.interrupted:
            rc = 1
        self.sendStatus({'rc': rc})
        self.d.callback(0)


registerSlaveCommand("dummy", DummyCommand, command_version)
# this maps handle names to a callable. When the WaitCommand starts, this
# callable is invoked with no arguments. It should return a Deferred. When
# that Deferred fires, our WaitCommand will finish.
waitCommandRegistry = {}


class WaitCommand(Command):
    """
    I am a dummy command used by the buildbot unit test suite. I wait for the
    unit test to tell us to finish. See L{buildbot.steps.dummy.Wait}

    args['handle'] names the entry in waitCommandRegistry to use; the entry
    is removed from the registry when I start.

    I finish with one of these status messages:
        - {'rc': 0} : the registered callable's Deferred fired successfully
        - {'rc': 1} : that Deferred failed
        - {'rc': 2} : I was interrupted
    """

    def start(self):
        """Look up (and remove) the registered callable, arrange for it to
        be invoked on the next reactor turn, and return the Deferred that
        fires when this command completes.

        Raises KeyError if args has no 'handle' or the handle is not
        registered."""
        self.d = defer.Deferred()
        log.msg(" starting wait command [%s]" % self.stepId)
        handle = self.args['handle']
        cb = waitCommandRegistry.pop(handle)

        def _called():
            log.msg(" wait-%s starting" % (handle,))
            d = cb()

            def _done(res):
                log.msg(" wait-%s finishing: %s" % (handle, res))
                return res
            d.addBoth(_done)
            d.addCallbacks(self.finished, self.failed)
        self._reactor.callLater(0, _called)
        return self.d

    def interrupt(self):
        """Finish immediately with rc=2. Only the first call has any
        effect."""
        log.msg(" wait command interrupted")
        if self.interrupted:
            return
        self.interrupted = True
        self.finished("interrupted")

    def finished(self, res):
        """Complete with rc=2 if interrupted, otherwise rc=0. `res` is
        ignored."""
        log.msg(" wait command finished [%s]" % self.stepId)
        if self.interrupted:
            self._complete(2)
        else:
            self._complete(0)

    def failed(self, why):
        """Complete with rc=1. `why` is ignored."""
        log.msg(" wait command failed [%s]" % self.stepId)
        self._complete(1)

    def _complete(self, rc):
        """Send the final rc and fire self.d, but only the first time."""
        # interrupt() may already have fired self.d; when the unit test's
        # Deferred fires afterwards, finished()/failed() run a second time
        # and a second callback() would raise AlreadyCalledError.
        if self.d.called:
            return
        self.sendStatus({'rc': rc})
        self.d.callback(0)


registerSlaveCommand("dummy.wait", WaitCommand, command_version)