1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 """
17 Support for running 'shell commands'
18 """
19
20 import sys
21 import os
22 import signal
23 import types
24 import re
25 import subprocess
26 import traceback
27 import stat
28 from collections import deque
29 from tempfile import NamedTemporaryFile
30
31 from twisted.python import runtime, log
32 from twisted.python.win32 import quoteArguments
33 from twisted.internet import reactor, defer, protocol, task, error
34
35 from buildslave import util
36 from buildslave.exceptions import AbandonChain
37
38 if runtime.platformType == 'posix':
39 from twisted.internet.process import Process
40
42
43
44
45
46
47
48
49
50
51
52
53 if runtime.platformType == 'win32':
54 return " ".join([ `e` for e in cmd_list ])
55 else:
56 import pipes
57 def quote(e):
58 if not e:
59 return '""'
60 return pipes.quote(e)
61 return " ".join([ quote(e) for e in cmd_list ])
62
64 POLL_INTERVAL = 2
65
66 - def __init__(self, command, name, logfile, follow=False):
84
87
91
98
100 if os.path.exists(self.logfile):
101 s = os.stat(self.logfile)
102 return (s[stat.ST_CTIME], s[stat.ST_MTIME], s[stat.ST_SIZE])
103 return None
104
106 if not self.started:
107 s = self.statFile()
108 if s == self.old_logfile_stats:
109 return
110 if not s:
111
112
113
114 self.old_logfile_stats = None
115 return
116 self.f = open(self.logfile, "rb")
117
118
119
120 if self.follow:
121 self.f.seek(s[2], 0)
122 self.started = True
123 self.f.seek(self.f.tell(), 0)
124 while True:
125 data = self.f.read(10000)
126 if not data:
127 return
128 self.command.addLogfile(self.name, data)
129
130
131 if runtime.platformType == 'posix':
133 """Simple subclass of Process to also make the spawned process a process
134 group leader, so we can kill all members of the process group."""
135
137 Process._setupChild(self, *args, **kwargs)
138
139
140
141
142 os.setpgid(0, 0)
143
144
146 debug = False
147
149 self.command = command
150 self.pending_stdin = ""
151 self.stdin_finished = False
152 self.killed = False
153
155 assert not self.connected
156 self.pending_stdin = data
157
159 if self.debug:
160 log.msg("RunProcessPP.connectionMade")
161
162 if self.command.useProcGroup:
163 if self.debug:
164 log.msg(" recording pid %d as subprocess pgid"
165 % (self.transport.pid,))
166 self.transport.pgid = self.transport.pid
167
168 if self.pending_stdin:
169 if self.debug: log.msg(" writing to stdin")
170 self.transport.write(self.pending_stdin)
171 if self.debug: log.msg(" closing stdin")
172 self.transport.closeStdin()
173
178
183
185 if self.debug:
186 log.msg("RunProcessPP.processEnded", status_object)
187
188
189
190 sig = status_object.value.signal
191 rc = status_object.value.exitCode
192
193
194
195
196 if self.killed and rc == 0:
197 log.msg("process was killed, but exited with status 0; faking a failure")
198
199 if runtime.platformType == 'win32':
200 rc = 1
201 else:
202 rc = -1
203 self.command.finished(sig, rc)
204
205
207 """
208 This is a helper class, used by slave commands to run programs in a child
209 shell.
210 """
211
212 notreally = False
213 BACKUP_TIMEOUT = 5
214 interruptSignal = "KILL"
215 CHUNK_LIMIT = 128*1024
216
217
218
219 BUFFER_SIZE = 64*1024
220 BUFFER_TIMEOUT = 5
221
222
223 startTime = None
224 elapsedTime = None
225
226
227 _reactor = reactor
228
229
230
231
232
233
def __init__(self, builder, command,
             workdir, environ=None,
             sendStdout=True, sendStderr=True, sendRC=True,
             timeout=None, maxTime=None, initialStdin=None,
             keepStdout=False, keepStderr=False,
             logEnviron=True, logfiles=None, usePTY="slave-config",
             useProcGroup=True):
    """
    Record how a command is to be run.  Nothing is spawned here, but
    C{workdir} is created if it does not exist yet.

    @param builder: object providing C{usePTY} (consulted when usePTY is
                    "slave-config") and C{unicode_encoding} (used to
                    encode unicode command text to byte strings)
    @param command: a string, or a list/tuple of arguments.  It is passed
                    through util.Obfuscated.get_real and get_fake to
                    obtain self.command and self.fake_command
                    (presumably the command to execute and a form that is
                    safe to display -- confirm against buildslave.util)
    @param workdir: directory the command is run in
    @param environ: optional dict of overrides applied on top of
                    os.environ.  List values are joined with os.pathsep,
                    a value of None drops the variable, and "${NAME}"
                    references are expanded from os.environ.  A string
                    PYTHONPATH gets "${PYTHONPATH}" appended before
                    expansion.  The dict passed in is not modified.
    @param sendStdout: stored as self.sendStdout
    @param sendStderr: stored as self.sendStderr
    @param sendRC: stored as self.sendRC
    @param timeout: stored as self.timeout
    @param maxTime: stored as self.maxTime
    @param initialStdin: stored as self.initialStdin; any value other than
                         None also disables the PTY
    @param keepStdout: if True, we keep a copy of all the stdout text
                       that we've seen. This copy is available in
                       self.stdout, which can be read after the command
                       has finished.
    @param keepStderr: same, for stderr
    @param logEnviron: stored as self.logEnviron
    @param logfiles: dict mapping a log name to either a filename
                     (relative to workdir) or a dict with a 'filename'
                     key and an optional 'follow' key; one
                     LogFileWatcher is created per entry

    @param usePTY: "slave-config" -> use the SlaveBuilder's usePTY;
        otherwise, true to use a PTY, false to not use a PTY.

    @param useProcGroup: (default True) use a process group for non-PTY
        process invocations

    @raise RuntimeError: if an environ value is neither a string, a list
                         nor None
    """

    self.builder = builder

    def encode(text):
        # The encoding is looked up lazily, so the builder only needs to
        # provide unicode_encoding when unicode text is really present.
        if isinstance(text, unicode):
            return text.encode(self.builder.unicode_encoding)
        return text

    def to_str(cmd):
        if isinstance(cmd, (tuple, list)):
            # Build a new sequence instead of assigning into cmd: tuples
            # do not support item assignment, and a list handed to us
            # should not be rewritten behind the caller's back.
            encoded = [encode(arg) for arg in cmd]
            if isinstance(cmd, tuple):
                return tuple(encoded)
            return encoded
        return encode(cmd)

    self.command = to_str(util.Obfuscated.get_real(command))
    self.fake_command = to_str(util.Obfuscated.get_fake(command))

    self.sendStdout = sendStdout
    self.sendStderr = sendStderr
    self.sendRC = sendRC
    # None stands for "no logfiles"; a fresh dict per instance avoids
    # sharing one default object between all instances.
    if logfiles is None:
        logfiles = {}
    self.logfiles = logfiles
    self.workdir = workdir
    self.process = None
    if not os.path.exists(workdir):
        os.makedirs(workdir)
    if environ:
        # Work on a private copy so the normalisation below never leaks
        # into the caller's dict (reusing that dict for a second command
        # would otherwise append "${PYTHONPATH}" a second time).
        environ = dict(environ)
        for key, v in list(environ.items()):
            if isinstance(v, list):
                # a list is treated as a path-style value
                environ[key] = os.pathsep.join(v)

        # Keep the inherited PYTHONPATH reachable after the override.
        # Non-string values are left alone so that the validation below
        # reports them (and None keeps meaning "remove the variable").
        pythonpath = environ.get('PYTHONPATH')
        if isinstance(pythonpath, basestring):
            environ['PYTHONPATH'] = (pythonpath + os.pathsep
                                     + "${PYTHONPATH}")

        # expand "${NAME}" references using this process' environment;
        # unknown names expand to the empty string
        p = re.compile(r'\${([0-9a-zA-Z_]*)}')

        def subst(match):
            return os.environ.get(match.group(1), "")

        newenv = {}
        for key in os.environ.keys():
            # inherit everything except variables explicitly set to None
            if key not in environ or environ[key] is not None:
                newenv[key] = os.environ[key]
        for key, v in environ.items():
            if v is not None:
                if not isinstance(v, basestring):
                    raise RuntimeError("'env' values must be strings or "
                                       "lists; key '%s' is incorrect" % (key,))
                newenv[key] = p.sub(subst, v)

        self.environ = newenv
    else:
        self.environ = os.environ.copy()
    self.initialStdin = initialStdin
    self.logEnviron = logEnviron
    self.timeout = timeout
    self.timer = None
    self.maxTime = maxTime
    self.maxTimer = None
    self.keepStdout = keepStdout
    self.keepStderr = keepStderr

    # output waiting to be sent: (logname, data) pairs, their total
    # length, and the timer that flushes them
    self.buffered = deque()
    self.buflen = 0
    self.buftimer = None

    if usePTY == "slave-config":
        self.usePTY = self.builder.usePTY
    else:
        self.usePTY = usePTY

    # A PTY is never used off posix or when stdin data is supplied; only
    # warn when the PTY was requested explicitly for this command.
    if runtime.platformType != "posix" or initialStdin is not None:
        if self.usePTY and usePTY != "slave-config":
            self.sendStatus({'header': "WARNING: disabling usePTY for this command"})
        self.usePTY = False

    # process groups are posix-only, and are always on when using a PTY
    if runtime.platformType != 'posix':
        useProcGroup = False
    elif self.usePTY:
        useProcGroup = True
    self.useProcGroup = useProcGroup

    self.logFileWatchers = []
    for name, filevalue in self.logfiles.items():
        filename = filevalue
        follow = False

        # the dict form carries the filename plus an optional follow flag
        if isinstance(filevalue, dict):
            filename = filevalue['filename']
            follow = filevalue.get('follow', False)

        w = LogFileWatcher(self, name,
                           os.path.join(self.workdir, filename),
                           follow=follow)
        self.logFileWatchers.append(w)
370
372 return "<%s '%s'>" % (self.__class__.__name__, self.fake_command)
373
376
378
379
380 if self.keepStdout:
381 self.stdout = ""
382 if self.keepStderr:
383 self.stderr = ""
384 self.deferred = defer.Deferred()
385 try:
386 self._startCommand()
387 except:
388 log.msg("error in RunProcess._startCommand")
389 log.err()
390 self._addToBuffers('stderr', "error in RunProcess._startCommand\n")
391 self._addToBuffers('stderr', traceback.format_exc())
392 self._sendBuffers()
393
394 self.deferred.errback(AbandonChain(-1))
395 return self.deferred
396
398
399 if not os.path.isdir(self.workdir):
400 os.makedirs(self.workdir)
401 log.msg("RunProcess._startCommand")
402 if self.notreally:
403 self._addToBuffers('header', "command '%s' in dir %s" % \
404 (self.fake_command, self.workdir))
405 self._addToBuffers('header', "(not really)\n")
406 self.finished(None, 0)
407 return
408
409 self.pp = RunProcessPP(self)
410
411 self.using_comspec = False
412 if type(self.command) in types.StringTypes:
413 if runtime.platformType == 'win32':
414 argv = os.environ['COMSPEC'].split()
415 if '/c' not in argv: argv += ['/c']
416 argv += [self.command]
417 self.using_comspec = True
418 else:
419
420
421 argv = ['/bin/sh', '-c', self.command]
422 display = self.fake_command
423 else:
424
425
426
427
428
429 if runtime.platformType == 'win32' and not \
430 (self.command[0].lower().endswith(".exe") and os.path.isabs(self.command[0])):
431 argv = os.environ['COMSPEC'].split()
432 if '/c' not in argv: argv += ['/c']
433 argv += list(self.command)
434 self.using_comspec = True
435 else:
436 argv = self.command
437
438 display = shell_quote(self.fake_command)
439
440
441
442
443 if not self.environ.get('MACHTYPE', None) == 'i686-pc-msys':
444 self.environ['PWD'] = os.path.abspath(self.workdir)
445
446
447
448 log.msg(" " + display)
449 self._addToBuffers('header', display+"\n")
450
451
452 msg = " in dir %s" % (self.workdir,)
453 if self.timeout:
454 if self.timeout == 1:
455 unit = "sec"
456 else:
457 unit = "secs"
458 msg += " (timeout %d %s)" % (self.timeout, unit)
459 if self.maxTime:
460 if self.maxTime == 1:
461 unit = "sec"
462 else:
463 unit = "secs"
464 msg += " (maxTime %d %s)" % (self.maxTime, unit)
465 log.msg(" " + msg)
466 self._addToBuffers('header', msg+"\n")
467
468 msg = " watching logfiles %s" % (self.logfiles,)
469 log.msg(" " + msg)
470 self._addToBuffers('header', msg+"\n")
471
472
473 msg = " argv: %s" % (self.fake_command,)
474 log.msg(" " + msg)
475 self._addToBuffers('header', msg+"\n")
476
477
478 if self.logEnviron:
479 msg = " environment:\n"
480 env_names = self.environ.keys()
481 env_names.sort()
482 for name in env_names:
483 msg += " %s=%s\n" % (name, self.environ[name])
484 log.msg(" environment: %s" % (self.environ,))
485 self._addToBuffers('header', msg)
486
487 if self.initialStdin:
488 msg = " writing %d bytes to stdin" % len(self.initialStdin)
489 log.msg(" " + msg)
490 self._addToBuffers('header', msg+"\n")
491
492 msg = " using PTY: %s" % bool(self.usePTY)
493 log.msg(" " + msg)
494 self._addToBuffers('header', msg+"\n")
495
496
497
498 if self.initialStdin:
499 self.pp.setStdin(self.initialStdin)
500
501 self.startTime = util.now(self._reactor)
502
503
504
505 self.process = self._spawnProcess(
506 self.pp, argv[0], argv,
507 self.environ,
508 self.workdir,
509 usePTY=self.usePTY)
510
511
512
513 if self.timeout:
514 self.timer = self._reactor.callLater(self.timeout, self.doTimeout)
515
516 if self.maxTime:
517 self.maxTimer = self._reactor.callLater(self.maxTime, self.doMaxTimeout)
518
519 for w in self.logFileWatchers:
520 w.start()
521
def _spawnProcess(self, processProtocol, executable, args=(), env={},
                  path=None, uid=None, gid=None, usePTY=False, childFDs=None):
    """Private stand-in for reactor.spawnProcess that can start the child
    through L{ProcGroupProcess}.

    On posix, when self.useProcGroup is set and no PTY is requested, the
    child is created with L{ProcGroupProcess}; this is the only path that
    receives uid, gid and childFDs.  Otherwise the child is started via
    self._spawnAsBatch when self.using_comspec is set, and via
    reactor.spawnProcess when it is not.

    @return: whatever the selected spawning call returns
    """
    on_posix = runtime.platformType == 'posix'
    if on_posix and self.useProcGroup and not usePTY:
        return ProcGroupProcess(reactor, executable, args, env, path,
                                processProtocol, uid, gid, childFDs)

    # both remaining spawners take the same arguments, so pick one and
    # make a single call
    if self.using_comspec:
        spawn = self._spawnAsBatch
    else:
        spawn = reactor.spawnProcess
    return spawn(processProtocol, executable, args, env,
                 path, usePTY=usePTY)
540
def _spawnAsBatch(self, processProtocol, executable, args, env,
                  path, usePTY):
    """A cheat that routes around the impedance mismatch between
    twisted and cmd.exe with respect to escaping quotes.

    self.command is written to a temporary .bat file in the current
    directory and COMSPEC is asked to run that file.  The file is removed
    when self.deferred fires.

    @param processProtocol: passed on to reactor.spawnProcess
    @param executable: passed on to reactor.spawnProcess
    @param args: not used; the argument vector is rebuilt from COMSPEC
                 and the name of the batch file
    @param env: passed on to reactor.spawnProcess
    @param path: passed on to reactor.spawnProcess
    @param usePTY: passed on to reactor.spawnProcess
    @return: the result of reactor.spawnProcess
    """

    tf = NamedTemporaryFile(dir='.', suffix=".bat", delete=False)
    try:
        try:
            # echo off hides the command lines from the output
            tf.write("@echo off\n")
            if isinstance(self.command, types.StringTypes):
                tf.write(self.command)
            else:
                def maybe_escape_pipes(arg):
                    # a lone '|' stays a real pipe; a '|' embedded in an
                    # argument is escaped with '^'
                    if arg == '|':
                        return '|'
                    return arg.replace('|', '^|')
                cmd = [maybe_escape_pipes(arg) for arg in self.command]
                tf.write(quoteArguments(cmd))
        finally:
            tf.close()
    except Exception:
        # The file was created with delete=False and no cleanup has been
        # registered yet, so remove it here rather than leaving it behind.
        try:
            os.unlink(tf.name)
        except OSError:
            pass
        raise

    argv = os.environ['COMSPEC'].split()
    if '/c' not in argv:
        argv += ['/c']
    argv += [tf.name]

    def unlink_temp(result):
        # Cleanup is best-effort: a failure to delete the batch file must
        # not replace the command's real result with an error.
        try:
            os.unlink(tf.name)
        except OSError:
            log.msg("could not remove temporary batch file %s: %s"
                    % (tf.name, sys.exc_info()[1]))
        return result
    self.deferred.addBoth(unlink_temp)

    return reactor.spawnProcess(processProtocol, executable, argv, env,
                                path, usePTY=usePTY)
572
574 """
575 limit the chunks that we send over PB to 128k, since it has a hardwired
576 string-size limit of 640k.
577 """
578 LIMIT = self.CHUNK_LIMIT
579 for i in range(0, len(data), LIMIT):
580 yield data[i:i+LIMIT]
581
583 """
584 Take msg, which is a dictionary of lists of output chunks, and
585 concatentate all the chunks into a single string
586 """
587 retval = {}
588 for log in msg:
589 data = "".join(msg[log])
590 if isinstance(log, tuple) and log[0] == 'log':
591 retval['log'] = (log[1], data)
592 else:
593 retval[log] = data
594 return retval
595
597 """
598 Collapse and send msg to the master
599 """
600 if not msg:
601 return
602 msg = self._collapseMsg(msg)
603 self.sendStatus(msg)
604
606 self.buftimer = None
607 self._sendBuffers()
608
610 """
611 Send all the content in our buffers.
612 """
613 msg = {}
614 msg_size = 0
615 lastlog = None
616 logdata = []
617 while self.buffered:
618
619 logname, data = self.buffered.popleft()
620
621
622
623
624
625
626
627
628
629 if lastlog is None:
630 lastlog = logname
631 elif logname != lastlog:
632 self._sendMessage(msg)
633 msg = {}
634 msg_size = 0
635 lastlog = logname
636
637 logdata = msg.setdefault(logname, [])
638
639
640
641 for chunk in self._chunkForSend(data):
642 if len(chunk) == 0: continue
643 logdata.append(chunk)
644 msg_size += len(chunk)
645 if msg_size >= self.CHUNK_LIMIT:
646
647
648
649 self._sendMessage(msg)
650 msg = {}
651 logdata = msg.setdefault(logname, [])
652 msg_size = 0
653 self.buflen = 0
654 if logdata:
655 self._sendMessage(msg)
656 if self.buftimer:
657 if self.buftimer.active():
658 self.buftimer.cancel()
659 self.buftimer = None
660
662 """
663 Add data to the buffer for logname
664 Start a timer to send the buffers if BUFFER_TIMEOUT elapses.
665 If adding data causes the buffer size to grow beyond BUFFER_SIZE, then
666 the buffers will be sent.
667 """
668 n = len(data)
669
670 self.buflen += n
671 self.buffered.append((logname, data))
672 if self.buflen > self.BUFFER_SIZE:
673 self._sendBuffers()
674 elif not self.buftimer:
675 self.buftimer = self._reactor.callLater(self.BUFFER_TIMEOUT, self._bufferTimeout)
676
678 if self.sendStdout:
679 self._addToBuffers('stdout', data)
680
681 if self.keepStdout:
682 self.stdout += data
683 if self.timer:
684 self.timer.reset(self.timeout)
685
687 if self.sendStderr:
688 self._addToBuffers('stderr', data)
689
690 if self.keepStderr:
691 self.stderr += data
692 if self.timer:
693 self.timer.reset(self.timeout)
694
700
702 self.elapsedTime = util.now(self._reactor) - self.startTime
703 log.msg("command finished with signal %s, exit code %s, elapsedTime: %0.6f" % (sig,rc,self.elapsedTime))
704 for w in self.logFileWatchers:
705
706 w.stop()
707 self._sendBuffers()
708 if sig is not None:
709 rc = -1
710 if self.sendRC:
711 if sig is not None:
712 self.sendStatus(
713 {'header': "process killed by signal %d\n" % sig})
714 self.sendStatus({'rc': rc})
715 self.sendStatus({'header': "elapsedTime=%0.6f\n" % self.elapsedTime})
716 if self.timer:
717 self.timer.cancel()
718 self.timer = None
719 if self.maxTimer:
720 self.maxTimer.cancel()
721 self.maxTimer = None
722 if self.buftimer:
723 self.buftimer.cancel()
724 self.buftimer = None
725 d = self.deferred
726 self.deferred = None
727 if d:
728 d.callback(rc)
729 else:
730 log.msg("Hey, command %s finished twice" % self)
731
733 self._sendBuffers()
734 log.msg("RunProcess.failed: command failed: %s" % (why,))
735 if self.timer:
736 self.timer.cancel()
737 self.timer = None
738 if self.maxTimer:
739 self.maxTimer.cancel()
740 self.maxTimer = None
741 if self.buftimer:
742 self.buftimer.cancel()
743 self.buftimer = None
744 d = self.deferred
745 self.deferred = None
746 if d:
747 d.errback(why)
748 else:
749 log.msg("Hey, command %s finished twice" % self)
750
752 self.timer = None
753 msg = "command timed out: %d seconds without output" % self.timeout
754 self.kill(msg)
755
757 self.maxTimer = None
758 msg = "command timed out: %d seconds elapsed" % self.maxTime
759 self.kill(msg)
760
def kill(self, msg):
    """
    Try to stop the running child process.

    Pending output is flushed, all timers are cancelled, and C{msg} (with
    ", attempting to kill" appended) is logged and sent as a 'header'
    status.  The attempts are then made in this order:

      1. posix with self.useProcGroup: signal the whole process group
      2. win32: run TASKKILL on the process tree
      3. if neither succeeded: self.process.signalProcess

    When self.interruptSignal is None no signal is sent at all.  If
    self.deferred is still set afterwards, doBackupTimeout is scheduled
    to run after BACKUP_TIMEOUT seconds.

    @param msg: text describing why the command is being killed
    """
    # flush what we have so it is sent before the kill notice
    self._sendBuffers()
    if self.timer:
        self.timer.cancel()
        self.timer = None
    if self.maxTimer:
        self.maxTimer.cancel()
        self.maxTimer = None
    if self.buftimer:
        self.buftimer.cancel()
        self.buftimer = None
    msg += ", attempting to kill"
    log.msg(msg)
    self.sendStatus({'header': "\n" + msg + "\n"})

    # tell the process protocol about the kill; it uses this flag to turn
    # an exit status of 0 into a failure
    self.pp.killed = True

    # whether we believe a signal was delivered
    hit = False
    # With no interrupt signal there is nothing to send.  Checking once
    # here keeps "SIG" + None from raising on the process-group path and
    # keeps None from being handed to signalProcess.
    pretend = self.interruptSignal is None

    if pretend:
        log.msg("self.interruptSignal==None, only pretending to kill child")

    elif self.useProcGroup and runtime.platformType == "posix":
        sig = getattr(signal, "SIG" + self.interruptSignal, None)

        if sig is None:
            log.msg("signal module is missing SIG%s" % self.interruptSignal)
        elif not hasattr(os, "kill"):
            log.msg("os module is missing the 'kill' function")
        elif self.process.pgid is None:
            log.msg("self.process has no pgid")
        else:
            log.msg("trying to kill process group %d" %
                    (self.process.pgid,))
            try:
                # a negative pid addresses the whole process group
                os.kill(-self.process.pgid, sig)
                log.msg(" signal %s sent successfully" % sig)
                self.process.pgid = None
                hit = True
            except OSError:
                # deliberately ignored; signalProcess is tried below
                log.msg('failed to kill process group (ignored): %s' %
                        (sys.exc_info()[1],))

    elif runtime.platformType == "win32":
        if self.process.pid is not None:
            log.msg("using TASKKILL /F PID /T to kill pid %s" % self.process.pid)
            try:
                subprocess.check_call("TASKKILL /F /PID %s /T" % self.process.pid)
            except (subprocess.CalledProcessError, OSError):
                # A failing or missing TASKKILL must not abort kill():
                # the fallback below and the backup timer still have to
                # be reached.
                log.msg("TASKKILL failed (ignored): %s" %
                        (sys.exc_info()[1],))
            else:
                log.msg("taskkill'd pid %s" % self.process.pid)
                hit = True

    # fall back to signalling the process itself
    if not hit and not pretend:
        try:
            log.msg("trying process.signalProcess('%s')" % (self.interruptSignal,))
            self.process.signalProcess(self.interruptSignal)
            log.msg(" signal %s sent successfully" % (self.interruptSignal,))
            hit = True
        except OSError:
            log.err("from process.signalProcess:")
        except error.ProcessExitedAlready:
            log.msg("Process exited already - can't kill")

        if not hit:
            log.msg("signalProcess/os.kill failed both times")

    if runtime.platformType == "posix":
        # only done on posix; drop our connection to the child
        self.pp.transport.loseConnection()

    if self.deferred:
        # The command has not been reported as finished yet; give the
        # process BACKUP_TIMEOUT seconds to die before doBackupTimeout
        # takes over.
        self.timer = self._reactor.callLater(self.BACKUP_TIMEOUT,
                                             self.doBackupTimeout)
850
852 log.msg("we tried to kill the process, and it wouldn't die.."
853 " finish anyway")
854 self.timer = None
855 self.sendStatus({'header': "SIGKILL failed to kill process\n"})
856 if self.sendRC:
857 self.sendStatus({'header': "using fake rc=-1\n"})
858 self.sendStatus({'rc': -1})
859 self.failed(RuntimeError("SIGKILL failed to kill process"))
860