1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 """
17 Support for running 'shell commands'
18 """
19
20 import os
21 import signal
22 import types
23 import re
24 import traceback
25 import stat
26 from collections import deque
27
28 from twisted.python import runtime, log
29 from twisted.internet import reactor, defer, protocol, task, error
30
31 from buildslave import util
32 from buildslave.exceptions import AbandonChain
33
35
36
37
38
39
40
41
42
43
44
45
46 if runtime.platformType == 'win32':
47 return " ".join([ `e` for e in cmd_list ])
48 else:
49 import pipes
50 def quote(e):
51 if not e:
52 return '""'
53 return pipes.quote(e)
54 return " ".join([ quote(e) for e in cmd_list ])
55
57 POLL_INTERVAL = 2
58
59 - def __init__(self, command, name, logfile, follow=False):
60 self.command = command
61 self.name = name
62 self.logfile = logfile
63
64 log.msg("LogFileWatcher created to watch %s" % logfile)
65
66
67
68 self.old_logfile_stats = self.statFile()
69 self.started = False
70
71
72
73 self.follow = follow
74
75
76 self.poller = task.LoopingCall(self.poll)
77
80
82 log.err(err, msg="Polling error")
83 self.poller = None
84
86 self.poll()
87 if self.poller is not None:
88 self.poller.stop()
89 if self.started:
90 self.f.close()
91
93 if os.path.exists(self.logfile):
94 s = os.stat(self.logfile)
95 return (s[stat.ST_CTIME], s[stat.ST_MTIME], s[stat.ST_SIZE])
96 return None
97
99 if not self.started:
100 s = self.statFile()
101 if s == self.old_logfile_stats:
102 return
103 if not s:
104
105
106
107 self.old_logfile_stats = None
108 return
109 self.f = open(self.logfile, "rb")
110
111
112
113 if self.follow:
114 self.f.seek(s[2], 0)
115 self.started = True
116 self.f.seek(self.f.tell(), 0)
117 while True:
118 data = self.f.read(10000)
119 if not data:
120 return
121 self.command.addLogfile(self.name, data)
122
123
125 debug = False
126
128 self.command = command
129 self.pending_stdin = ""
130 self.stdin_finished = False
131 self.killed = False
132
134 assert not self.stdin_finished
135 if self.connected:
136 self.transport.write(data)
137 else:
138 self.pending_stdin += data
139
145
147 if self.debug:
148 log.msg("RunProcessPP.connectionMade")
149 if not self.command.process:
150 if self.debug:
151 log.msg(" assigning self.command.process: %s" %
152 (self.transport,))
153 self.command.process = self.transport
154
155
156
157
158
159
160
161
162
163
164
165
166 if self.pending_stdin:
167 if self.debug: log.msg(" writing to stdin")
168 self.transport.write(self.pending_stdin)
169 if self.stdin_finished:
170 if self.debug: log.msg(" closing stdin")
171 self.transport.closeStdin()
172
177
182
184 if self.debug:
185 log.msg("RunProcessPP.processEnded", status_object)
186
187
188
189 sig = status_object.value.signal
190 rc = status_object.value.exitCode
191
192
193
194
195 if self.killed and rc == 0:
196 log.msg("process was killed, but exited with status 0; faking a failure")
197
198 if runtime.platformType == 'win32':
199 rc = 1
200 else:
201 rc = -1
202 self.command.finished(sig, rc)
203
205 """
206 This is a helper class, used by slave commands to run programs in a child
207 shell.
208 """
209
210 notreally = False
211 BACKUP_TIMEOUT = 5
212 KILL = "KILL"
213 CHUNK_LIMIT = 128*1024
214
215
216
217 BUFFER_SIZE = 64*1024
218 BUFFER_TIMEOUT = 5
219
220
221 startTime = None
222 elapsedTime = None
223
224
225 _reactor = reactor
226
227
228
229
230
231
232 - def __init__(self, builder, command,
233 workdir, environ=None,
234 sendStdout=True, sendStderr=True, sendRC=True,
235 timeout=None, maxTime=None, initialStdin=None,
236 keepStdinOpen=False, keepStdout=False, keepStderr=False,
237 logEnviron=True, logfiles={}, usePTY="slave-config"):
238 """
239
240 @param keepStdout: if True, we keep a copy of all the stdout text
241 that we've seen. This copy is available in
242 self.stdout, which can be read after the command
243 has finished.
244 @param keepStderr: same, for stderr
245
246 @param usePTY: "slave-config" -> use the SlaveBuilder's usePTY;
247 otherwise, true to use a PTY, false to not use a PTY.
248 """
249
250 self.builder = builder
251 self.command = util.Obfuscated.get_real(command)
252
253
254
255
256
257
258
259
260
261
262
263 if isinstance(self.command, (tuple, list)):
264 for i, a in enumerate(self.command):
265 if isinstance(a, unicode):
266 self.command[i] = a.encode(self.builder.unicode_encoding)
267 elif isinstance(self.command, unicode):
268 self.command = self.command.encode(self.builder.unicode_encoding)
269
270 self.fake_command = util.Obfuscated.get_fake(command)
271 self.sendStdout = sendStdout
272 self.sendStderr = sendStderr
273 self.sendRC = sendRC
274 self.logfiles = logfiles
275 self.workdir = workdir
276 if not os.path.exists(workdir):
277 os.makedirs(workdir)
278 if environ:
279 if environ.has_key('PYTHONPATH'):
280 ppath = environ['PYTHONPATH']
281
282
283
284 if not isinstance(ppath, str):
285
286
287 ppath = os.pathsep.join(ppath)
288
289 environ['PYTHONPATH'] = ppath + os.pathsep + "${PYTHONPATH}"
290
291
292 p = re.compile('\${([0-9a-zA-Z_]*)}')
293 def subst(match):
294 return os.environ.get(match.group(1), "")
295 newenv = {}
296 for key in os.environ.keys():
297
298 if key not in environ or environ[key] is not None:
299 newenv[key] = os.environ[key]
300 for key in environ.keys():
301 if environ[key] is not None:
302 newenv[key] = p.sub(subst, environ[key])
303
304 self.environ = newenv
305 else:
306 self.environ = os.environ.copy()
307 self.initialStdin = initialStdin
308 self.keepStdinOpen = keepStdinOpen
309 self.logEnviron = logEnviron
310 self.timeout = timeout
311 self.timer = None
312 self.maxTime = maxTime
313 self.maxTimer = None
314 self.keepStdout = keepStdout
315 self.keepStderr = keepStderr
316
317 self.buffered = deque()
318 self.buflen = 0
319 self.buftimer = None
320
321 if usePTY == "slave-config":
322 self.usePTY = self.builder.usePTY
323 else:
324 self.usePTY = usePTY
325
326
327
328
329
330 if runtime.platformType != "posix" or initialStdin is not None:
331 if self.usePTY and usePTY != "slave-config":
332 self.sendStatus({'header': "WARNING: disabling usePTY for this command"})
333 self.usePTY = False
334
335 self.logFileWatchers = []
336 for name,filevalue in self.logfiles.items():
337 filename = filevalue
338 follow = False
339
340
341
342 if type(filevalue) == dict:
343 filename = filevalue['filename']
344 follow = filevalue.get('follow', False)
345
346 w = LogFileWatcher(self, name,
347 os.path.join(self.workdir, filename),
348 follow=follow)
349 self.logFileWatchers.append(w)
350
352 return "<%s '%s'>" % (self.__class__.__name__, self.fake_command)
353
356
358
359
360 if self.keepStdout:
361 self.stdout = ""
362 if self.keepStderr:
363 self.stderr = ""
364 self.deferred = defer.Deferred()
365 try:
366 self._startCommand()
367 except:
368 log.msg("error in RunProcess._startCommand")
369 log.err()
370 self._addToBuffers('stderr', "error in RunProcess._startCommand\n")
371 self._addToBuffers('stderr', traceback.format_exc())
372 self._sendBuffers()
373
374 self.deferred.errback(AbandonChain(-1))
375 return self.deferred
376
378
379 if not os.path.isdir(self.workdir):
380 os.makedirs(self.workdir)
381 log.msg("RunProcess._startCommand")
382 if self.notreally:
383 self._addToBuffers('header', "command '%s' in dir %s" % \
384 (self.fake_command, self.workdir))
385 self._addToBuffers('header', "(not really)\n")
386 self.finished(None, 0)
387 return
388
389 self.pp = RunProcessPP(self)
390
391 if type(self.command) in types.StringTypes:
392 if runtime.platformType == 'win32':
393 argv = os.environ['COMSPEC'].split()
394 if '/c' not in argv: argv += ['/c']
395 argv += [self.command]
396 else:
397
398
399 argv = ['/bin/sh', '-c', self.command]
400 display = self.fake_command
401 else:
402
403
404
405
406
407 if runtime.platformType == 'win32' and not \
408 (self.command[0].lower().endswith(".exe") and os.path.isabs(self.command[0])):
409 argv = os.environ['COMSPEC'].split()
410 if '/c' not in argv: argv += ['/c']
411 argv += list(self.command)
412 else:
413 argv = self.command
414
415 display = shell_quote(self.fake_command)
416
417
418
419
420 if not self.environ.get('MACHTYPE', None) == 'i686-pc-msys':
421 self.environ['PWD'] = os.path.abspath(self.workdir)
422
423
424
425 log.msg(" " + display)
426 self._addToBuffers('header', display+"\n")
427
428
429 msg = " in dir %s" % (self.workdir,)
430 if self.timeout:
431 msg += " (timeout %d secs)" % (self.timeout,)
432 log.msg(" " + msg)
433 self._addToBuffers('header', msg+"\n")
434
435 msg = " watching logfiles %s" % (self.logfiles,)
436 log.msg(" " + msg)
437 self._addToBuffers('header', msg+"\n")
438
439
440 msg = " argv: %s" % (self.fake_command,)
441 log.msg(" " + msg)
442 self._addToBuffers('header', msg+"\n")
443
444
445 if self.logEnviron:
446 msg = " environment:\n"
447 env_names = self.environ.keys()
448 env_names.sort()
449 for name in env_names:
450 msg += " %s=%s\n" % (name, self.environ[name])
451 log.msg(" environment: %s" % (self.environ,))
452 self._addToBuffers('header', msg)
453
454 if self.initialStdin:
455 msg = " writing %d bytes to stdin" % len(self.initialStdin)
456 log.msg(" " + msg)
457 self._addToBuffers('header', msg+"\n")
458
459 if self.keepStdinOpen:
460 msg = " leaving stdin open"
461 else:
462 msg = " closing stdin"
463 log.msg(" " + msg)
464 self._addToBuffers('header', msg+"\n")
465
466 msg = " using PTY: %s" % bool(self.usePTY)
467 log.msg(" " + msg)
468 self._addToBuffers('header', msg+"\n")
469
470
471 if self.initialStdin:
472 self.pp.writeStdin(self.initialStdin)
473 if not self.keepStdinOpen:
474 self.pp.closeStdin()
475
476
477
478
479
480
481
482
483
484
485 self.process = None
486 self.startTime = util.now(self._reactor)
487
488 p = reactor.spawnProcess(self.pp, argv[0], argv,
489 self.environ,
490 self.workdir,
491 usePTY=self.usePTY)
492
493 if not self.process:
494 self.process = p
495
496
497
498
499
500
501
502 if self.timeout:
503 self.timer = self._reactor.callLater(self.timeout, self.doTimeout)
504
505 if self.maxTime:
506 self.maxTimer = self._reactor.callLater(self.maxTime, self.doMaxTimeout)
507
508 for w in self.logFileWatchers:
509 w.start()
510
511
513 """
514 limit the chunks that we send over PB to 128k, since it has a hardwired
515 string-size limit of 640k.
516 """
517 LIMIT = self.CHUNK_LIMIT
518 for i in range(0, len(data), LIMIT):
519 yield data[i:i+LIMIT]
520
522 """
523 Take msg, which is a dictionary of lists of output chunks, and
524 concatentate all the chunks into a single string
525 """
526 retval = {}
527 for log in msg:
528 data = "".join(msg[log])
529 if isinstance(log, tuple) and log[0] == 'log':
530 retval['log'] = (log[1], data)
531 else:
532 retval[log] = data
533 return retval
534
536 """
537 Collapse and send msg to the master
538 """
539 if not msg:
540 return
541 msg = self._collapseMsg(msg)
542 self.sendStatus(msg)
543
545 self.buftimer = None
546 self._sendBuffers()
547
549 """
550 Send all the content in our buffers.
551 """
552 msg = {}
553 msg_size = 0
554 lastlog = None
555 logdata = []
556 while self.buffered:
557
558 logname, data = self.buffered.popleft()
559
560
561
562
563
564
565
566
567
568 if lastlog is None:
569 lastlog = logname
570 elif logname != lastlog:
571 self._sendMessage(msg)
572 msg = {}
573 msg_size = 0
574 lastlog = logname
575
576 logdata = msg.setdefault(logname, [])
577
578
579
580 for chunk in self._chunkForSend(data):
581 if len(chunk) == 0: continue
582 logdata.append(chunk)
583 msg_size += len(chunk)
584 if msg_size >= self.CHUNK_LIMIT:
585
586
587
588 self._sendMessage(msg)
589 msg = {}
590 logdata = msg.setdefault(logname, [])
591 msg_size = 0
592 self.buflen = 0
593 if logdata:
594 self._sendMessage(msg)
595 if self.buftimer:
596 if self.buftimer.active():
597 self.buftimer.cancel()
598 self.buftimer = None
599
601 """
602 Add data to the buffer for logname
603 Start a timer to send the buffers if BUFFER_TIMEOUT elapses.
604 If adding data causes the buffer size to grow beyond BUFFER_SIZE, then
605 the buffers will be sent.
606 """
607 n = len(data)
608
609 self.buflen += n
610 self.buffered.append((logname, data))
611 if self.buflen > self.BUFFER_SIZE:
612 self._sendBuffers()
613 elif not self.buftimer:
614 self.buftimer = self._reactor.callLater(self.BUFFER_TIMEOUT, self._bufferTimeout)
615
617 if self.sendStdout:
618 self._addToBuffers('stdout', data)
619
620 if self.keepStdout:
621 self.stdout += data
622 if self.timer:
623 self.timer.reset(self.timeout)
624
626 if self.sendStderr:
627 self._addToBuffers('stderr', data)
628
629 if self.keepStderr:
630 self.stderr += data
631 if self.timer:
632 self.timer.reset(self.timeout)
633
639
641 self.elapsedTime = util.now(self._reactor) - self.startTime
642 log.msg("command finished with signal %s, exit code %s, elapsedTime: %0.6f" % (sig,rc,self.elapsedTime))
643 for w in self.logFileWatchers:
644
645 w.stop()
646 self._sendBuffers()
647 if sig is not None:
648 rc = -1
649 if self.sendRC:
650 if sig is not None:
651 self.sendStatus(
652 {'header': "process killed by signal %d\n" % sig})
653 self.sendStatus({'rc': rc})
654 self.sendStatus({'header': "elapsedTime=%0.6f\n" % self.elapsedTime})
655 if self.timer:
656 self.timer.cancel()
657 self.timer = None
658 if self.maxTimer:
659 self.maxTimer.cancel()
660 self.maxTimer = None
661 if self.buftimer:
662 self.buftimer.cancel()
663 self.buftimer = None
664 d = self.deferred
665 self.deferred = None
666 if d:
667 d.callback(rc)
668 else:
669 log.msg("Hey, command %s finished twice" % self)
670
672 self._sendBuffers()
673 log.msg("RunProcess.failed: command failed: %s" % (why,))
674 if self.timer:
675 self.timer.cancel()
676 self.timer = None
677 if self.maxTimer:
678 self.maxTimer.cancel()
679 self.maxTimer = None
680 if self.buftimer:
681 self.buftimer.cancel()
682 self.buftimer = None
683 d = self.deferred
684 self.deferred = None
685 if d:
686 d.errback(why)
687 else:
688 log.msg("Hey, command %s finished twice" % self)
689
691 self.timer = None
692 msg = "command timed out: %d seconds without output" % self.timeout
693 self.kill(msg)
694
696 self.maxTimer = None
697 msg = "command timed out: %d seconds elapsed" % self.maxTime
698 self.kill(msg)
699
700 - def kill(self, msg):
701
702
703 self._sendBuffers()
704 if self.timer:
705 self.timer.cancel()
706 self.timer = None
707 if self.maxTimer:
708 self.maxTimer.cancel()
709 self.maxTimer = None
710 if self.buftimer:
711 self.buftimer.cancel()
712 self.buftimer = None
713 if hasattr(self.process, "pid") and self.process.pid is not None:
714 msg += ", killing pid %s" % self.process.pid
715 log.msg(msg)
716 self.sendStatus({'header': "\n" + msg + "\n"})
717
718
719
720 self.pp.killed = True
721
722 hit = 0
723 if runtime.platformType == "posix":
724 try:
725
726
727
728
729
730
731
732
733
734 sig = None
735 if self.KILL is not None:
736 sig = getattr(signal, "SIG"+ self.KILL, None)
737
738 if self.KILL == None:
739 log.msg("self.KILL==None, only pretending to kill child")
740 elif sig is None:
741 log.msg("signal module is missing SIG%s" % self.KILL)
742 elif not hasattr(os, "kill"):
743 log.msg("os module is missing the 'kill' function")
744 elif not hasattr(self.process, "pid") or self.process.pid is None:
745 log.msg("self.process has no pid")
746 else:
747 log.msg("trying os.kill(-pid, %d)" % (sig,))
748
749 os.kill(-self.process.pid, sig)
750 log.msg(" signal %s sent successfully" % sig)
751 hit = 1
752 except OSError:
753
754
755 pass
756 if not hit:
757 try:
758 if self.KILL is None:
759 log.msg("self.KILL==None, only pretending to kill child")
760 else:
761 log.msg("trying process.signalProcess('KILL')")
762 self.process.signalProcess(self.KILL)
763 log.msg(" signal %s sent successfully" % (self.KILL,))
764 hit = 1
765 except OSError:
766
767 pass
768 except error.ProcessExitedAlready:
769
770 pass
771 if not hit:
772 log.msg("signalProcess/os.kill failed both times")
773
774 if runtime.platformType == "posix":
775
776
777
778 self.pp.transport.loseConnection()
779
780
781
782 self.timer = self._reactor.callLater(self.BACKUP_TIMEOUT,
783 self.doBackupTimeout)
784
786 log.msg("we tried to kill the process, and it wouldn't die.."
787 " finish anyway")
788 self.timer = None
789 self.sendStatus({'header': "SIGKILL failed to kill process\n"})
790 if self.sendRC:
791 self.sendStatus({'header': "using fake rc=-1\n"})
792 self.sendStatus({'rc': -1})
793 self.failed(RuntimeError("SIGKILL failed to kill process"))
794
795
798
801