1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 """
17 Support for running 'shell commands'
18 """
19
20 import sys
21 import os
22 import signal
23 import types
24 import re
25 import subprocess
26 import traceback
27 import stat
28 from collections import deque
29
30 from twisted.python import runtime, log
31 from twisted.internet import reactor, defer, protocol, task, error
32
33 from buildslave import util
34 from buildslave.exceptions import AbandonChain
35
36 if runtime.platformType == 'posix':
37 from twisted.internet.process import Process
38
40
41
42
43
44
45
46
47
48
49
50
51 if runtime.platformType == 'win32':
52 return " ".join([ `e` for e in cmd_list ])
53 else:
54 import pipes
55 def quote(e):
56 if not e:
57 return '""'
58 return pipes.quote(e)
59 return " ".join([ quote(e) for e in cmd_list ])
60
62 POLL_INTERVAL = 2
63
64 - def __init__(self, command, name, logfile, follow=False):
82
85
87 log.err(err, msg="Polling error")
88 self.poller = None
89
91 self.poll()
92 if self.poller is not None:
93 self.poller.stop()
94 if self.started:
95 self.f.close()
96
98 if os.path.exists(self.logfile):
99 s = os.stat(self.logfile)
100 return (s[stat.ST_CTIME], s[stat.ST_MTIME], s[stat.ST_SIZE])
101 return None
102
104 if not self.started:
105 s = self.statFile()
106 if s == self.old_logfile_stats:
107 return
108 if not s:
109
110
111
112 self.old_logfile_stats = None
113 return
114 self.f = open(self.logfile, "rb")
115
116
117
118 if self.follow:
119 self.f.seek(s[2], 0)
120 self.started = True
121 self.f.seek(self.f.tell(), 0)
122 while True:
123 data = self.f.read(10000)
124 if not data:
125 return
126 self.command.addLogfile(self.name, data)
127
128
129 if runtime.platformType == 'posix':
131 """Sumple subclass of Process to also make the spawned process a process
132 group leader, so we can kill all members of the process group."""
133
135 Process._setupChild(self, *args, **kwargs)
136
137
138
139
140 os.setpgid(0, 0)
141
142
144 debug = False
145
147 self.command = command
148 self.pending_stdin = ""
149 self.stdin_finished = False
150 self.killed = False
151
153 assert not self.connected
154 self.pending_stdin = data
155
157 if self.debug:
158 log.msg("RunProcessPP.connectionMade")
159
160 if self.command.useProcGroup:
161 if self.debug:
162 log.msg(" recording pid %d as subprocess pgid"
163 % (self.transport.pid,))
164 self.transport.pgid = self.transport.pid
165
166 if self.pending_stdin:
167 if self.debug: log.msg(" writing to stdin")
168 self.transport.write(self.pending_stdin)
169 if self.debug: log.msg(" closing stdin")
170 self.transport.closeStdin()
171
176
181
183 if self.debug:
184 log.msg("RunProcessPP.processEnded", status_object)
185
186
187
188 sig = status_object.value.signal
189 rc = status_object.value.exitCode
190
191
192
193
194 if self.killed and rc == 0:
195 log.msg("process was killed, but exited with status 0; faking a failure")
196
197 if runtime.platformType == 'win32':
198 rc = 1
199 else:
200 rc = -1
201 self.command.finished(sig, rc)
202
203
205 """
206 This is a helper class, used by slave commands to run programs in a child
207 shell.
208 """
209
210 notreally = False
211 BACKUP_TIMEOUT = 5
212 KILL = "KILL"
213 CHUNK_LIMIT = 128*1024
214
215
216
217 BUFFER_SIZE = 64*1024
218 BUFFER_TIMEOUT = 5
219
220
221 startTime = None
222 elapsedTime = None
223
224
225 _reactor = reactor
226
227
228
229
230
231
232 - def __init__(self, builder, command,
233 workdir, environ=None,
234 sendStdout=True, sendStderr=True, sendRC=True,
235 timeout=None, maxTime=None, initialStdin=None,
236 keepStdout=False, keepStderr=False,
237 logEnviron=True, logfiles={}, usePTY="slave-config",
238 useProcGroup=True):
239 """
240
241 @param keepStdout: if True, we keep a copy of all the stdout text
242 that we've seen. This copy is available in
243 self.stdout, which can be read after the command
244 has finished.
245 @param keepStderr: same, for stderr
246
247 @param usePTY: "slave-config" -> use the SlaveBuilder's usePTY;
248 otherwise, true to use a PTY, false to not use a PTY.
249
250 @param useProcGroup: (default True) use a process group for non-PTY
251 process invocations
252 """
253
254 self.builder = builder
255 self.command = util.Obfuscated.get_real(command)
256
257
258
259
260
261
262
263
264
265
266
267 if isinstance(self.command, (tuple, list)):
268 for i, a in enumerate(self.command):
269 if isinstance(a, unicode):
270 self.command[i] = a.encode(self.builder.unicode_encoding)
271 elif isinstance(self.command, unicode):
272 self.command = self.command.encode(self.builder.unicode_encoding)
273
274 self.fake_command = util.Obfuscated.get_fake(command)
275 self.sendStdout = sendStdout
276 self.sendStderr = sendStderr
277 self.sendRC = sendRC
278 self.logfiles = logfiles
279 self.workdir = workdir
280 self.process = None
281 if not os.path.exists(workdir):
282 os.makedirs(workdir)
283 if environ:
284 if environ.has_key('PYTHONPATH'):
285 ppath = environ['PYTHONPATH']
286
287
288
289 if not isinstance(ppath, str):
290
291
292 ppath = os.pathsep.join(ppath)
293
294 environ['PYTHONPATH'] = ppath + os.pathsep + "${PYTHONPATH}"
295
296
297 p = re.compile('\${([0-9a-zA-Z_]*)}')
298 def subst(match):
299 return os.environ.get(match.group(1), "")
300 newenv = {}
301 for key in os.environ.keys():
302
303 if key not in environ or environ[key] is not None:
304 newenv[key] = os.environ[key]
305 for key in environ.keys():
306 if environ[key] is not None:
307 newenv[key] = p.sub(subst, environ[key])
308
309 self.environ = newenv
310 else:
311 self.environ = os.environ.copy()
312 self.initialStdin = initialStdin
313 self.logEnviron = logEnviron
314 self.timeout = timeout
315 self.timer = None
316 self.maxTime = maxTime
317 self.maxTimer = None
318 self.keepStdout = keepStdout
319 self.keepStderr = keepStderr
320
321 self.buffered = deque()
322 self.buflen = 0
323 self.buftimer = None
324
325 if usePTY == "slave-config":
326 self.usePTY = self.builder.usePTY
327 else:
328 self.usePTY = usePTY
329
330
331
332
333
334 if runtime.platformType != "posix" or initialStdin is not None:
335 if self.usePTY and usePTY != "slave-config":
336 self.sendStatus({'header': "WARNING: disabling usePTY for this command"})
337 self.usePTY = False
338
339
340
341 if runtime.platformType != 'posix':
342 useProcGroup = False
343 elif self.usePTY:
344 useProcGroup = True
345 self.useProcGroup = useProcGroup
346
347 self.logFileWatchers = []
348 for name,filevalue in self.logfiles.items():
349 filename = filevalue
350 follow = False
351
352
353
354 if type(filevalue) == dict:
355 filename = filevalue['filename']
356 follow = filevalue.get('follow', False)
357
358 w = LogFileWatcher(self, name,
359 os.path.join(self.workdir, filename),
360 follow=follow)
361 self.logFileWatchers.append(w)
362
364 return "<%s '%s'>" % (self.__class__.__name__, self.fake_command)
365
368
370
371
372 if self.keepStdout:
373 self.stdout = ""
374 if self.keepStderr:
375 self.stderr = ""
376 self.deferred = defer.Deferred()
377 try:
378 self._startCommand()
379 except:
380 log.msg("error in RunProcess._startCommand")
381 log.err()
382 self._addToBuffers('stderr', "error in RunProcess._startCommand\n")
383 self._addToBuffers('stderr', traceback.format_exc())
384 self._sendBuffers()
385
386 self.deferred.errback(AbandonChain(-1))
387 return self.deferred
388
390
391 if not os.path.isdir(self.workdir):
392 os.makedirs(self.workdir)
393 log.msg("RunProcess._startCommand")
394 if self.notreally:
395 self._addToBuffers('header', "command '%s' in dir %s" % \
396 (self.fake_command, self.workdir))
397 self._addToBuffers('header', "(not really)\n")
398 self.finished(None, 0)
399 return
400
401 self.pp = RunProcessPP(self)
402
403 if type(self.command) in types.StringTypes:
404 if runtime.platformType == 'win32':
405 argv = os.environ['COMSPEC'].split()
406 if '/c' not in argv: argv += ['/c']
407 argv += [self.command]
408 else:
409
410
411 argv = ['/bin/sh', '-c', self.command]
412 display = self.fake_command
413 else:
414
415
416
417
418
419 if runtime.platformType == 'win32' and not \
420 (self.command[0].lower().endswith(".exe") and os.path.isabs(self.command[0])):
421 argv = os.environ['COMSPEC'].split()
422 if '/c' not in argv: argv += ['/c']
423 argv += list(self.command)
424 else:
425 argv = self.command
426
427 display = shell_quote(self.fake_command)
428
429
430
431
432 if not self.environ.get('MACHTYPE', None) == 'i686-pc-msys':
433 self.environ['PWD'] = os.path.abspath(self.workdir)
434
435
436
437 log.msg(" " + display)
438 self._addToBuffers('header', display+"\n")
439
440
441 msg = " in dir %s" % (self.workdir,)
442 if self.timeout:
443 if self.timeout == 1:
444 unit = "sec"
445 else:
446 unit = "secs"
447 msg += " (timeout %d %s)" % (self.timeout, unit)
448 if self.maxTime:
449 if self.maxTime == 1:
450 unit = "sec"
451 else:
452 unit = "secs"
453 msg += " (maxTime %d %s)" % (self.maxTime, unit)
454 log.msg(" " + msg)
455 self._addToBuffers('header', msg+"\n")
456
457 msg = " watching logfiles %s" % (self.logfiles,)
458 log.msg(" " + msg)
459 self._addToBuffers('header', msg+"\n")
460
461
462 msg = " argv: %s" % (self.fake_command,)
463 log.msg(" " + msg)
464 self._addToBuffers('header', msg+"\n")
465
466
467 if self.logEnviron:
468 msg = " environment:\n"
469 env_names = self.environ.keys()
470 env_names.sort()
471 for name in env_names:
472 msg += " %s=%s\n" % (name, self.environ[name])
473 log.msg(" environment: %s" % (self.environ,))
474 self._addToBuffers('header', msg)
475
476 if self.initialStdin:
477 msg = " writing %d bytes to stdin" % len(self.initialStdin)
478 log.msg(" " + msg)
479 self._addToBuffers('header', msg+"\n")
480
481 msg = " using PTY: %s" % bool(self.usePTY)
482 log.msg(" " + msg)
483 self._addToBuffers('header', msg+"\n")
484
485
486
487 if self.initialStdin:
488 self.pp.setStdin(self.initialStdin)
489
490 self.startTime = util.now(self._reactor)
491
492
493
494 self.process = self._spawnProcess(
495 self.pp, argv[0], argv,
496 self.environ,
497 self.workdir,
498 usePTY=self.usePTY)
499
500
501
502 if self.timeout:
503 self.timer = self._reactor.callLater(self.timeout, self.doTimeout)
504
505 if self.maxTime:
506 self.maxTimer = self._reactor.callLater(self.maxTime, self.doMaxTimeout)
507
508 for w in self.logFileWatchers:
509 w.start()
510
511 - def _spawnProcess(self, processProtocol, executable, args=(), env={},
512 path=None, uid=None, gid=None, usePTY=False, childFDs=None):
513 """private implementation of reactor.spawnProcess, to allow use of
514 L{ProcGroupProcess}"""
515
516
517 if runtime.platformType == 'posix':
518 if self.useProcGroup and not usePTY:
519 return ProcGroupProcess(reactor, executable, args, env, path,
520 processProtocol, uid, gid, childFDs)
521
522
523 return reactor.spawnProcess(processProtocol, executable, args, env,
524 path, usePTY=usePTY)
525
527 """
528 limit the chunks that we send over PB to 128k, since it has a hardwired
529 string-size limit of 640k.
530 """
531 LIMIT = self.CHUNK_LIMIT
532 for i in range(0, len(data), LIMIT):
533 yield data[i:i+LIMIT]
534
536 """
537 Take msg, which is a dictionary of lists of output chunks, and
538 concatentate all the chunks into a single string
539 """
540 retval = {}
541 for log in msg:
542 data = "".join(msg[log])
543 if isinstance(log, tuple) and log[0] == 'log':
544 retval['log'] = (log[1], data)
545 else:
546 retval[log] = data
547 return retval
548
550 """
551 Collapse and send msg to the master
552 """
553 if not msg:
554 return
555 msg = self._collapseMsg(msg)
556 self.sendStatus(msg)
557
559 self.buftimer = None
560 self._sendBuffers()
561
563 """
564 Send all the content in our buffers.
565 """
566 msg = {}
567 msg_size = 0
568 lastlog = None
569 logdata = []
570 while self.buffered:
571
572 logname, data = self.buffered.popleft()
573
574
575
576
577
578
579
580
581
582 if lastlog is None:
583 lastlog = logname
584 elif logname != lastlog:
585 self._sendMessage(msg)
586 msg = {}
587 msg_size = 0
588 lastlog = logname
589
590 logdata = msg.setdefault(logname, [])
591
592
593
594 for chunk in self._chunkForSend(data):
595 if len(chunk) == 0: continue
596 logdata.append(chunk)
597 msg_size += len(chunk)
598 if msg_size >= self.CHUNK_LIMIT:
599
600
601
602 self._sendMessage(msg)
603 msg = {}
604 logdata = msg.setdefault(logname, [])
605 msg_size = 0
606 self.buflen = 0
607 if logdata:
608 self._sendMessage(msg)
609 if self.buftimer:
610 if self.buftimer.active():
611 self.buftimer.cancel()
612 self.buftimer = None
613
615 """
616 Add data to the buffer for logname
617 Start a timer to send the buffers if BUFFER_TIMEOUT elapses.
618 If adding data causes the buffer size to grow beyond BUFFER_SIZE, then
619 the buffers will be sent.
620 """
621 n = len(data)
622
623 self.buflen += n
624 self.buffered.append((logname, data))
625 if self.buflen > self.BUFFER_SIZE:
626 self._sendBuffers()
627 elif not self.buftimer:
628 self.buftimer = self._reactor.callLater(self.BUFFER_TIMEOUT, self._bufferTimeout)
629
631 if self.sendStdout:
632 self._addToBuffers('stdout', data)
633
634 if self.keepStdout:
635 self.stdout += data
636 if self.timer:
637 self.timer.reset(self.timeout)
638
640 if self.sendStderr:
641 self._addToBuffers('stderr', data)
642
643 if self.keepStderr:
644 self.stderr += data
645 if self.timer:
646 self.timer.reset(self.timeout)
647
653
655 self.elapsedTime = util.now(self._reactor) - self.startTime
656 log.msg("command finished with signal %s, exit code %s, elapsedTime: %0.6f" % (sig,rc,self.elapsedTime))
657 for w in self.logFileWatchers:
658
659 w.stop()
660 self._sendBuffers()
661 if sig is not None:
662 rc = -1
663 if self.sendRC:
664 if sig is not None:
665 self.sendStatus(
666 {'header': "process killed by signal %d\n" % sig})
667 self.sendStatus({'rc': rc})
668 self.sendStatus({'header': "elapsedTime=%0.6f\n" % self.elapsedTime})
669 if self.timer:
670 self.timer.cancel()
671 self.timer = None
672 if self.maxTimer:
673 self.maxTimer.cancel()
674 self.maxTimer = None
675 if self.buftimer:
676 self.buftimer.cancel()
677 self.buftimer = None
678 d = self.deferred
679 self.deferred = None
680 if d:
681 d.callback(rc)
682 else:
683 log.msg("Hey, command %s finished twice" % self)
684
686 self._sendBuffers()
687 log.msg("RunProcess.failed: command failed: %s" % (why,))
688 if self.timer:
689 self.timer.cancel()
690 self.timer = None
691 if self.maxTimer:
692 self.maxTimer.cancel()
693 self.maxTimer = None
694 if self.buftimer:
695 self.buftimer.cancel()
696 self.buftimer = None
697 d = self.deferred
698 self.deferred = None
699 if d:
700 d.errback(why)
701 else:
702 log.msg("Hey, command %s finished twice" % self)
703
705 self.timer = None
706 msg = "command timed out: %d seconds without output" % self.timeout
707 self.kill(msg)
708
710 self.maxTimer = None
711 msg = "command timed out: %d seconds elapsed" % self.maxTime
712 self.kill(msg)
713
714 - def kill(self, msg):
715
716
717 self._sendBuffers()
718 if self.timer:
719 self.timer.cancel()
720 self.timer = None
721 if self.maxTimer:
722 self.maxTimer.cancel()
723 self.maxTimer = None
724 if self.buftimer:
725 self.buftimer.cancel()
726 self.buftimer = None
727 msg += ", attempting to kill"
728 log.msg(msg)
729 self.sendStatus({'header': "\n" + msg + "\n"})
730
731
732
733 self.pp.killed = True
734
735
736 hit = 0
737
738
739 if not hit and self.useProcGroup and runtime.platformType == "posix":
740 sig = getattr(signal, "SIG"+ self.KILL, None)
741
742 if sig is None:
743 log.msg("signal module is missing SIG%s" % self.KILL)
744 elif not hasattr(os, "kill"):
745 log.msg("os module is missing the 'kill' function")
746 elif self.process.pgid is None:
747 log.msg("self.process has no pgid")
748 else:
749 log.msg("trying to kill process group %d" %
750 (self.process.pgid,))
751 try:
752 os.kill(-self.process.pgid, sig)
753 log.msg(" signal %s sent successfully" % sig)
754 self.process.pgid = None
755 hit = 1
756 except OSError:
757 log.msg('failed to kill process group (ignored): %s' %
758 (sys.exc_info()[1],))
759
760
761 pass
762
763 elif runtime.platformType == "win32":
764 if self.KILL == None:
765 log.msg("self.KILL==None, only pretending to kill child")
766 else:
767 log.msg("using TASKKILL /F PID /T to kill pid %s" % self.process.pid)
768 subprocess.check_call("TASKKILL /F /PID %s /T" % self.process.pid)
769 log.msg("taskkill'd pid %s" % self.process.pid)
770 hit = 1
771
772
773 if not hit:
774 try:
775 log.msg("trying process.signalProcess('%s')" % (self.KILL,))
776 self.process.signalProcess(self.KILL)
777 log.msg(" signal %s sent successfully" % (self.KILL,))
778 hit = 1
779 except OSError:
780 log.err("from process.signalProcess:")
781
782 pass
783 except error.ProcessExitedAlready:
784 log.msg("Process exited already - can't kill")
785
786
787 pass
788
789 if not hit:
790 log.msg("signalProcess/os.kill failed both times")
791
792 if runtime.platformType == "posix":
793
794
795
796 self.pp.transport.loseConnection()
797
798 if self.deferred:
799
800
801 self.timer = self._reactor.callLater(self.BACKUP_TIMEOUT,
802 self.doBackupTimeout)
803
805 log.msg("we tried to kill the process, and it wouldn't die.."
806 " finish anyway")
807 self.timer = None
808 self.sendStatus({'header': "SIGKILL failed to kill process\n"})
809 if self.sendRC:
810 self.sendStatus({'header': "using fake rc=-1\n"})
811 self.sendStatus({'rc': -1})
812 self.failed(RuntimeError("SIGKILL failed to kill process"))
813