1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 """
17 Support for running 'shell commands'
18 """
19
20 import sys
21 import os
22 import signal
23 import types
24 import re
25 import subprocess
26 import traceback
27 import stat
28 from collections import deque
29
30 from twisted.python import runtime, log
31 from twisted.internet import reactor, defer, protocol, task, error
32
33 from buildslave import util
34 from buildslave.exceptions import AbandonChain
35
36 if runtime.platformType == 'posix':
37 from twisted.internet.process import Process
38
40
41
42
43
44
45
46
47
48
49
50
51 if runtime.platformType == 'win32':
52 return " ".join([ `e` for e in cmd_list ])
53 else:
54 import pipes
55 def quote(e):
56 if not e:
57 return '""'
58 return pipes.quote(e)
59 return " ".join([ quote(e) for e in cmd_list ])
60
62 POLL_INTERVAL = 2
63
64 - def __init__(self, command, name, logfile, follow=False):
82
85
87 log.err(err, msg="Polling error")
88 self.poller = None
89
91 self.poll()
92 if self.poller is not None:
93 self.poller.stop()
94 if self.started:
95 self.f.close()
96
98 if os.path.exists(self.logfile):
99 s = os.stat(self.logfile)
100 return (s[stat.ST_CTIME], s[stat.ST_MTIME], s[stat.ST_SIZE])
101 return None
102
104 if not self.started:
105 s = self.statFile()
106 if s == self.old_logfile_stats:
107 return
108 if not s:
109
110
111
112 self.old_logfile_stats = None
113 return
114 self.f = open(self.logfile, "rb")
115
116
117
118 if self.follow:
119 self.f.seek(s[2], 0)
120 self.started = True
121 self.f.seek(self.f.tell(), 0)
122 while True:
123 data = self.f.read(10000)
124 if not data:
125 return
126 self.command.addLogfile(self.name, data)
127
128
129 if runtime.platformType == 'posix':
131 """Simple subclass of Process to also make the spawned process a process
132 group leader, so we can kill all members of the process group."""
133
135 Process._setupChild(self, *args, **kwargs)
136
137
138
139
140 os.setpgid(0, 0)
141
142
144 debug = False
145
147 self.command = command
148 self.pending_stdin = ""
149 self.stdin_finished = False
150 self.killed = False
151
153 assert not self.connected
154 self.pending_stdin = data
155
157 if self.debug:
158 log.msg("RunProcessPP.connectionMade")
159
160 if self.command.useProcGroup:
161 if self.debug:
162 log.msg(" recording pid %d as subprocess pgid"
163 % (self.transport.pid,))
164 self.transport.pgid = self.transport.pid
165
166 if self.pending_stdin:
167 if self.debug: log.msg(" writing to stdin")
168 self.transport.write(self.pending_stdin)
169 if self.debug: log.msg(" closing stdin")
170 self.transport.closeStdin()
171
176
181
183 if self.debug:
184 log.msg("RunProcessPP.processEnded", status_object)
185
186
187
188 sig = status_object.value.signal
189 rc = status_object.value.exitCode
190
191
192
193
194 if self.killed and rc == 0:
195 log.msg("process was killed, but exited with status 0; faking a failure")
196
197 if runtime.platformType == 'win32':
198 rc = 1
199 else:
200 rc = -1
201 self.command.finished(sig, rc)
202
203
205 """
206 This is a helper class, used by slave commands to run programs in a child
207 shell.
208 """
209
210 notreally = False
211 BACKUP_TIMEOUT = 5
212 interruptSignal = "KILL"
213 CHUNK_LIMIT = 128*1024
214
215
216
217 BUFFER_SIZE = 64*1024
218 BUFFER_TIMEOUT = 5
219
220
221 startTime = None
222 elapsedTime = None
223
224
225 _reactor = reactor
226
227
228
229
230
231
def __init__(self, builder, command,
             workdir, environ=None,
             sendStdout=True, sendStderr=True, sendRC=True,
             timeout=None, maxTime=None, initialStdin=None,
             keepStdout=False, keepStderr=False,
             logEnviron=True, logfiles=None, usePTY="slave-config",
             useProcGroup=True):
    """
    Record everything needed to run one command. No process is spawned
    here; this only stores settings, builds the child environment and
    creates the logfile watchers.

    @param builder: object supplying C{unicode_encoding} (used to encode
                    unicode command text) and C{usePTY} (used when
                    usePTY is "slave-config")
    @param command: a command string, or a list/tuple of arguments. It is
                    passed through util.Obfuscated.get_real/get_fake to
                    obtain the text to run and the text to display.
    @param workdir: directory for the command; created if it is missing
    @param environ: optional dict of overrides layered on top of
                    os.environ. A list value is joined with os.pathsep, a
                    None value removes the variable, and C{${NAME}} in a
                    value is replaced by this process's own value of NAME
                    (or the empty string). The caller's dict is never
                    modified.
    @param sendStdout: if True, stdout data is buffered for sending
    @param sendStderr: if True, stderr data is buffered for sending
    @param sendRC: if True, the exit code is sent as an 'rc' status
    @param timeout: seconds without output before the command is killed
    @param maxTime: total seconds before the command is killed
    @param initialStdin: data written to the child's stdin; supplying it
                         disables the PTY

    @param keepStdout: if True, we keep a copy of all the stdout text
                       that we've seen. This copy is available in
                       self.stdout, which can be read after the command
                       has finished.
    @param keepStderr: same, for stderr

    @param logEnviron: if True, the environment is included in the header
    @param logfiles: dict mapping a log name to either a filename or a
                     dict with a 'filename' key and an optional 'follow'
                     key; filenames are taken relative to workdir

    @param usePTY: "slave-config" -> use the SlaveBuilder's usePTY;
                   otherwise, true to use a PTY, false to not use a PTY.

    @param useProcGroup: (default True) use a process group for non-PTY
                         process invocations

    @raise RuntimeError: if an environ value is neither a string, a list
                         nor None
    """

    self.builder = builder

    def to_str(cmd):
        # Encode unicode command text into byte strings. A new sequence
        # is built instead of assigning into the one we were given: the
        # list may belong to the caller, and a tuple cannot be assigned
        # into at all.
        def encode(text):
            if isinstance(text, unicode):
                return text.encode(self.builder.unicode_encoding)
            return text

        if isinstance(cmd, (tuple, list)):
            encoded = [encode(a) for a in cmd]
            if isinstance(cmd, tuple):
                return tuple(encoded)
            return encoded
        return encode(cmd)

    self.command = to_str(util.Obfuscated.get_real(command))
    self.fake_command = to_str(util.Obfuscated.get_fake(command))

    self.sendStdout = sendStdout
    self.sendStderr = sendStderr
    self.sendRC = sendRC
    if logfiles is None:
        logfiles = {}
    self.logfiles = logfiles
    self.workdir = workdir
    self.process = None
    if not os.path.exists(workdir):
        os.makedirs(workdir)
    if environ:
        # Work on a private copy so the caller's dict is left untouched;
        # otherwise reusing it would append ${PYTHONPATH} once more on
        # every construction.
        environ = dict(environ)
        for key, value in list(environ.items()):
            if isinstance(value, list):
                # list values are path lists: join with the platform's
                # path separator
                environ[key] = os.pathsep.join(value)

        # Keep the slave's own PYTHONPATH reachable behind the override.
        # A None value means "unset", so there is nothing to extend; any
        # other non-string is rejected by the type check further down.
        if isinstance(environ.get('PYTHONPATH'), basestring):
            environ['PYTHONPATH'] += os.pathsep + "${PYTHONPATH}"

        # do substitution on variable values matching pattern: ${name}
        p = re.compile(r'\${([0-9a-zA-Z_]*)}')
        def subst(match):
            return os.environ.get(match.group(1), "")
        newenv = {}
        for key in os.environ.keys():
            # inherit everything except variables overridden with None
            if key not in environ or environ[key] is not None:
                newenv[key] = os.environ[key]
        for key, value in environ.items():
            if value is not None:
                if not isinstance(value, basestring):
                    raise RuntimeError("'env' values must be strings or "
                            "lists; key '%s' is incorrect" % (key,))
                newenv[key] = p.sub(subst, value)

        self.environ = newenv
    else:
        self.environ = os.environ.copy()
    self.initialStdin = initialStdin
    self.logEnviron = logEnviron
    self.timeout = timeout
    self.timer = None
    self.maxTime = maxTime
    self.maxTimer = None
    self.keepStdout = keepStdout
    self.keepStderr = keepStderr

    # pending (logname, data) pairs, their total size, and the flush timer
    self.buffered = deque()
    self.buflen = 0
    self.buftimer = None

    if usePTY == "slave-config":
        self.usePTY = self.builder.usePTY
    else:
        self.usePTY = usePTY

    # A PTY is only used on posix, and never when stdin data is supplied.
    # Warn only when the PTY was requested explicitly for this command.
    if runtime.platformType != "posix" or initialStdin is not None:
        if self.usePTY and usePTY != "slave-config":
            self.sendStatus({'header': "WARNING: disabling usePTY for this command"})
        self.usePTY = False

    # Process groups exist only on posix, and a PTY invocation always
    # uses one regardless of what was asked for.
    if runtime.platformType != 'posix':
        useProcGroup = False
    elif self.usePTY:
        useProcGroup = True
    self.useProcGroup = useProcGroup

    self.logFileWatchers = []
    for name, filevalue in self.logfiles.items():
        filename = filevalue
        follow = False

        # the dict form carries the filename plus optional settings
        if isinstance(filevalue, dict):
            filename = filevalue['filename']
            follow = filevalue.get('follow', False)

        w = LogFileWatcher(self, name,
                           os.path.join(self.workdir, filename),
                           follow=follow)
        self.logFileWatchers.append(w)
368
370 return "<%s '%s'>" % (self.__class__.__name__, self.fake_command)
371
374
376
377
378 if self.keepStdout:
379 self.stdout = ""
380 if self.keepStderr:
381 self.stderr = ""
382 self.deferred = defer.Deferred()
383 try:
384 self._startCommand()
385 except:
386 log.msg("error in RunProcess._startCommand")
387 log.err()
388 self._addToBuffers('stderr', "error in RunProcess._startCommand\n")
389 self._addToBuffers('stderr', traceback.format_exc())
390 self._sendBuffers()
391
392 self.deferred.errback(AbandonChain(-1))
393 return self.deferred
394
396
397 if not os.path.isdir(self.workdir):
398 os.makedirs(self.workdir)
399 log.msg("RunProcess._startCommand")
400 if self.notreally:
401 self._addToBuffers('header', "command '%s' in dir %s" % \
402 (self.fake_command, self.workdir))
403 self._addToBuffers('header', "(not really)\n")
404 self.finished(None, 0)
405 return
406
407 self.pp = RunProcessPP(self)
408
409 if type(self.command) in types.StringTypes:
410 if runtime.platformType == 'win32':
411 argv = os.environ['COMSPEC'].split()
412 if '/c' not in argv: argv += ['/c']
413 argv += [self.command]
414 else:
415
416
417 argv = ['/bin/sh', '-c', self.command]
418 display = self.fake_command
419 else:
420
421
422
423
424
425 if runtime.platformType == 'win32' and not \
426 (self.command[0].lower().endswith(".exe") and os.path.isabs(self.command[0])):
427 argv = os.environ['COMSPEC'].split()
428 if '/c' not in argv: argv += ['/c']
429 argv += list(self.command)
430 else:
431 argv = self.command
432
433 display = shell_quote(self.fake_command)
434
435
436
437
438 if not self.environ.get('MACHTYPE', None) == 'i686-pc-msys':
439 self.environ['PWD'] = os.path.abspath(self.workdir)
440
441
442
443 log.msg(" " + display)
444 self._addToBuffers('header', display+"\n")
445
446
447 msg = " in dir %s" % (self.workdir,)
448 if self.timeout:
449 if self.timeout == 1:
450 unit = "sec"
451 else:
452 unit = "secs"
453 msg += " (timeout %d %s)" % (self.timeout, unit)
454 if self.maxTime:
455 if self.maxTime == 1:
456 unit = "sec"
457 else:
458 unit = "secs"
459 msg += " (maxTime %d %s)" % (self.maxTime, unit)
460 log.msg(" " + msg)
461 self._addToBuffers('header', msg+"\n")
462
463 msg = " watching logfiles %s" % (self.logfiles,)
464 log.msg(" " + msg)
465 self._addToBuffers('header', msg+"\n")
466
467
468 msg = " argv: %s" % (self.fake_command,)
469 log.msg(" " + msg)
470 self._addToBuffers('header', msg+"\n")
471
472
473 if self.logEnviron:
474 msg = " environment:\n"
475 env_names = self.environ.keys()
476 env_names.sort()
477 for name in env_names:
478 msg += " %s=%s\n" % (name, self.environ[name])
479 log.msg(" environment: %s" % (self.environ,))
480 self._addToBuffers('header', msg)
481
482 if self.initialStdin:
483 msg = " writing %d bytes to stdin" % len(self.initialStdin)
484 log.msg(" " + msg)
485 self._addToBuffers('header', msg+"\n")
486
487 msg = " using PTY: %s" % bool(self.usePTY)
488 log.msg(" " + msg)
489 self._addToBuffers('header', msg+"\n")
490
491
492
493 if self.initialStdin:
494 self.pp.setStdin(self.initialStdin)
495
496 self.startTime = util.now(self._reactor)
497
498
499
500 self.process = self._spawnProcess(
501 self.pp, argv[0], argv,
502 self.environ,
503 self.workdir,
504 usePTY=self.usePTY)
505
506
507
508 if self.timeout:
509 self.timer = self._reactor.callLater(self.timeout, self.doTimeout)
510
511 if self.maxTime:
512 self.maxTimer = self._reactor.callLater(self.maxTime, self.doMaxTimeout)
513
514 for w in self.logFileWatchers:
515 w.start()
516
def _spawnProcess(self, processProtocol, executable, args=(), env={},
                  path=None, uid=None, gid=None, usePTY=False, childFDs=None):
    """private implementation of reactor.spawnProcess, to allow use of
    L{ProcGroupProcess}

    On posix, when a process group is wanted and no PTY is used, the
    child is created directly as a L{ProcGroupProcess}. In every other
    case the call is handed to reactor.spawnProcess.

    @param processProtocol: protocol instance that receives the child's
                            events
    @param executable: program to run
    @param args: argument vector for the child
    @param env: environment mapping for the child; only passed through,
                never modified here
    @param path: working directory for the child
    @param uid: forwarded unchanged to the spawning call
    @param gid: forwarded unchanged to the spawning call
    @param usePTY: true to run the child on a PTY
    @param childFDs: forwarded unchanged to the spawning call
    @return: whatever the spawning call returns (the process object)
    """
    if (runtime.platformType == 'posix'
            and self.useProcGroup and not usePTY):
        return ProcGroupProcess(reactor, executable, args, env, path,
                                processProtocol, uid, gid, childFDs)

    # Forward uid/gid/childFDs here too, so both branches honour the same
    # arguments instead of silently dropping them on this one.
    return reactor.spawnProcess(processProtocol, executable, args, env,
                                path, uid=uid, gid=gid, usePTY=usePTY,
                                childFDs=childFDs)
531
533 """
534 limit the chunks that we send over PB to 128k, since it has a hardwired
535 string-size limit of 640k.
536 """
537 LIMIT = self.CHUNK_LIMIT
538 for i in range(0, len(data), LIMIT):
539 yield data[i:i+LIMIT]
540
542 """
543 Take msg, which is a dictionary of lists of output chunks, and
544 concatentate all the chunks into a single string
545 """
546 retval = {}
547 for log in msg:
548 data = "".join(msg[log])
549 if isinstance(log, tuple) and log[0] == 'log':
550 retval['log'] = (log[1], data)
551 else:
552 retval[log] = data
553 return retval
554
556 """
557 Collapse and send msg to the master
558 """
559 if not msg:
560 return
561 msg = self._collapseMsg(msg)
562 self.sendStatus(msg)
563
565 self.buftimer = None
566 self._sendBuffers()
567
569 """
570 Send all the content in our buffers.
571 """
572 msg = {}
573 msg_size = 0
574 lastlog = None
575 logdata = []
576 while self.buffered:
577
578 logname, data = self.buffered.popleft()
579
580
581
582
583
584
585
586
587
588 if lastlog is None:
589 lastlog = logname
590 elif logname != lastlog:
591 self._sendMessage(msg)
592 msg = {}
593 msg_size = 0
594 lastlog = logname
595
596 logdata = msg.setdefault(logname, [])
597
598
599
600 for chunk in self._chunkForSend(data):
601 if len(chunk) == 0: continue
602 logdata.append(chunk)
603 msg_size += len(chunk)
604 if msg_size >= self.CHUNK_LIMIT:
605
606
607
608 self._sendMessage(msg)
609 msg = {}
610 logdata = msg.setdefault(logname, [])
611 msg_size = 0
612 self.buflen = 0
613 if logdata:
614 self._sendMessage(msg)
615 if self.buftimer:
616 if self.buftimer.active():
617 self.buftimer.cancel()
618 self.buftimer = None
619
621 """
622 Add data to the buffer for logname
623 Start a timer to send the buffers if BUFFER_TIMEOUT elapses.
624 If adding data causes the buffer size to grow beyond BUFFER_SIZE, then
625 the buffers will be sent.
626 """
627 n = len(data)
628
629 self.buflen += n
630 self.buffered.append((logname, data))
631 if self.buflen > self.BUFFER_SIZE:
632 self._sendBuffers()
633 elif not self.buftimer:
634 self.buftimer = self._reactor.callLater(self.BUFFER_TIMEOUT, self._bufferTimeout)
635
637 if self.sendStdout:
638 self._addToBuffers('stdout', data)
639
640 if self.keepStdout:
641 self.stdout += data
642 if self.timer:
643 self.timer.reset(self.timeout)
644
646 if self.sendStderr:
647 self._addToBuffers('stderr', data)
648
649 if self.keepStderr:
650 self.stderr += data
651 if self.timer:
652 self.timer.reset(self.timeout)
653
659
661 self.elapsedTime = util.now(self._reactor) - self.startTime
662 log.msg("command finished with signal %s, exit code %s, elapsedTime: %0.6f" % (sig,rc,self.elapsedTime))
663 for w in self.logFileWatchers:
664
665 w.stop()
666 self._sendBuffers()
667 if sig is not None:
668 rc = -1
669 if self.sendRC:
670 if sig is not None:
671 self.sendStatus(
672 {'header': "process killed by signal %d\n" % sig})
673 self.sendStatus({'rc': rc})
674 self.sendStatus({'header': "elapsedTime=%0.6f\n" % self.elapsedTime})
675 if self.timer:
676 self.timer.cancel()
677 self.timer = None
678 if self.maxTimer:
679 self.maxTimer.cancel()
680 self.maxTimer = None
681 if self.buftimer:
682 self.buftimer.cancel()
683 self.buftimer = None
684 d = self.deferred
685 self.deferred = None
686 if d:
687 d.callback(rc)
688 else:
689 log.msg("Hey, command %s finished twice" % self)
690
692 self._sendBuffers()
693 log.msg("RunProcess.failed: command failed: %s" % (why,))
694 if self.timer:
695 self.timer.cancel()
696 self.timer = None
697 if self.maxTimer:
698 self.maxTimer.cancel()
699 self.maxTimer = None
700 if self.buftimer:
701 self.buftimer.cancel()
702 self.buftimer = None
703 d = self.deferred
704 self.deferred = None
705 if d:
706 d.errback(why)
707 else:
708 log.msg("Hey, command %s finished twice" % self)
709
711 self.timer = None
712 msg = "command timed out: %d seconds without output" % self.timeout
713 self.kill(msg)
714
716 self.maxTimer = None
717 msg = "command timed out: %d seconds elapsed" % self.maxTime
718 self.kill(msg)
719
def kill(self, msg):
    """
    Try to terminate the running command, reporting C{msg} as the reason.

    Pending output is flushed and all timers are cancelled first. Then,
    in order: the process group is signalled (posix with useProcGroup),
    or TASKKILL is run (win32), and if neither succeeded the signal is
    delivered through self.process.signalProcess. If interruptSignal is
    None no signal is sent on any platform. Finally, while the command's
    deferred is still pending, a backup timer is armed so that
    doBackupTimeout runs after BACKUP_TIMEOUT seconds.

    @param msg: human-readable reason; ", attempting to kill" is appended
                and the result is logged and sent as a 'header' status
    """
    # Flush buffered output so it reaches the master ahead of the notice.
    self._sendBuffers()
    if self.timer:
        self.timer.cancel()
        self.timer = None
    if self.maxTimer:
        self.maxTimer.cancel()
        self.maxTimer = None
    if self.buftimer:
        self.buftimer.cancel()
        self.buftimer = None
    msg += ", attempting to kill"
    log.msg(msg)
    self.sendStatus({'header': "\n" + msg + "\n"})

    # Tell the process protocol that the exit was forced.
    self.pp.killed = True

    # A None interruptSignal means no signal is to be sent at all. Check
    # it once up front so every platform behaves the same, rather than
    # failing on "SIG" + None in the posix branch.
    pretend = self.interruptSignal is None

    # set to 1 once some mechanism reports that it delivered the signal
    hit = 0

    if pretend:
        log.msg("self.interruptSignal==None, only pretending to kill child")

    # try signalling the whole process group
    elif self.useProcGroup and runtime.platformType == "posix":
        sig = getattr(signal, "SIG" + self.interruptSignal, None)

        if sig is None:
            log.msg("signal module is missing SIG%s" % self.interruptSignal)
        elif not hasattr(os, "kill"):
            log.msg("os module is missing the 'kill' function")
        elif self.process.pgid is None:
            log.msg("self.process has no pgid")
        else:
            log.msg("trying to kill process group %d" %
                    (self.process.pgid,))
            try:
                # a negative pid addresses the process group
                os.kill(-self.process.pgid, sig)
                log.msg(" signal %s sent successfully" % sig)
                self.process.pgid = None
                hit = 1
            except OSError:
                # best effort only; the fallback below is still tried
                log.msg('failed to kill process group (ignored): %s' %
                        (sys.exc_info()[1],))

    elif runtime.platformType == "win32":
        log.msg("using TASKKILL /F PID /T to kill pid %s" % self.process.pid)
        try:
            subprocess.check_call("TASKKILL /F /PID %s /T" % self.process.pid)
        except (OSError, subprocess.CalledProcessError):
            # A failing TASKKILL must not escape from kill(): the backup
            # timer below would never be armed and the command would
            # never finish. Log it and fall through to signalProcess.
            log.msg("TASKKILL failed (ignored): %s" % (sys.exc_info()[1],))
        else:
            log.msg("taskkill'd pid %s" % self.process.pid)
            hit = 1

    # fall back to signalling just the process itself
    if not hit and not pretend:
        try:
            log.msg("trying process.signalProcess('%s')" % (self.interruptSignal,))
            self.process.signalProcess(self.interruptSignal)
            log.msg(" signal %s sent successfully" % (self.interruptSignal,))
            hit = 1
        except OSError:
            log.err("from process.signalProcess:")
        except error.ProcessExitedAlready:
            log.msg("Process exited already - can't kill")

    if not hit and not pretend:
        log.msg("signalProcess/os.kill failed both times")

    if runtime.platformType == "posix":
        # drop our end of the child's pipes
        self.pp.transport.loseConnection()

    if self.deferred:
        # The command has not finished yet: arm the backup timer so that
        # doBackupTimeout runs if it still has not after BACKUP_TIMEOUT.
        self.timer = self._reactor.callLater(self.BACKUP_TIMEOUT,
                                             self.doBackupTimeout)
809
811 log.msg("we tried to kill the process, and it wouldn't die.."
812 " finish anyway")
813 self.timer = None
814 self.sendStatus({'header': "SIGKILL failed to kill process\n"})
815 if self.sendRC:
816 self.sendStatus({'header': "using fake rc=-1\n"})
817 self.sendStatus({'rc': -1})
818 self.failed(RuntimeError("SIGKILL failed to kill process"))
819