1 """
2 Support for running 'shell commands'
3 """
4
5 import os
6 import sys
7 import signal
8 import types
9 import re
10 import traceback
11 import stat
12 from collections import deque
13
14 from twisted.python import runtime, log
15 from twisted.internet import reactor, defer, protocol, task
16
17 from buildslave import util
18 from buildslave.exceptions import AbandonChain
19
21 POLL_INTERVAL = 2
22
23 - def __init__(self, command, name, logfile, follow=False):
24 self.command = command
25 self.name = name
26 self.logfile = logfile
27
28 log.msg("LogFileWatcher created to watch %s" % logfile)
29
30
31
32 self.old_logfile_stats = self.statFile()
33 self.started = False
34
35
36
37 self.follow = follow
38
39
40 self.poller = task.LoopingCall(self.poll)
41
44
46 log.err(err, msg="Polling error")
47 self.poller = None
48
50 self.poll()
51 if self.poller is not None:
52 self.poller.stop()
53 if self.started:
54 self.f.close()
55
57 if os.path.exists(self.logfile):
58 s = os.stat(self.logfile)
59 return (s[stat.ST_CTIME], s[stat.ST_MTIME], s[stat.ST_SIZE])
60 return None
61
63 if not self.started:
64 s = self.statFile()
65 if s == self.old_logfile_stats:
66 return
67 if not s:
68
69
70
71 self.old_logfile_stats = None
72 return
73 self.f = open(self.logfile, "rb")
74
75
76
77 if self.follow:
78 self.f.seek(s[2], 0)
79 self.started = True
80 self.f.seek(self.f.tell(), 0)
81 while True:
82 data = self.f.read(10000)
83 if not data:
84 return
85 self.command.addLogfile(self.name, data)
86
87
89 debug = False
90
92 self.command = command
93 self.pending_stdin = ""
94 self.stdin_finished = False
95 self.killed = False
96
98 assert not self.stdin_finished
99 if self.connected:
100 self.transport.write(data)
101 else:
102 self.pending_stdin += data
103
109
111 if self.debug:
112 log.msg("RunProcessPP.connectionMade")
113 if not self.command.process:
114 if self.debug:
115 log.msg(" assigning self.command.process: %s" %
116 (self.transport,))
117 self.command.process = self.transport
118
119
120
121
122
123
124
125
126
127
128
129
130 if self.pending_stdin:
131 if self.debug: log.msg(" writing to stdin")
132 self.transport.write(self.pending_stdin)
133 if self.stdin_finished:
134 if self.debug: log.msg(" closing stdin")
135 self.transport.closeStdin()
136
141
146
148 if self.debug:
149 log.msg("RunProcessPP.processEnded", status_object)
150
151
152
153 sig = status_object.value.signal
154 rc = status_object.value.exitCode
155
156
157
158
159 if self.killed and rc == 0:
160 log.msg("process was killed, but exited with status 0; faking a failure")
161
162 if runtime.platformType == 'win32':
163 rc = 1
164 else:
165 rc = -1
166 self.command.finished(sig, rc)
167
169 """
170 This is a helper class, used by slave commands to run programs in a child
171 shell.
172 """
173
174 notreally = False
175 BACKUP_TIMEOUT = 5
176 KILL = "KILL"
177 CHUNK_LIMIT = 128*1024
178
179
180
181 BUFFER_SIZE = 64*1024
182 BUFFER_TIMEOUT = 5
183
184
185 startTime = None
186 elapsedTime = None
187
188
189 _reactor = reactor
190
191
192
193
194
195
196 - def __init__(self, builder, command,
197 workdir, environ=None,
198 sendStdout=True, sendStderr=True, sendRC=True,
199 timeout=None, maxTime=None, initialStdin=None,
200 keepStdinOpen=False, keepStdout=False, keepStderr=False,
201 logEnviron=True, logfiles={}, usePTY="slave-config"):
202 """
203
204 @param keepStdout: if True, we keep a copy of all the stdout text
205 that we've seen. This copy is available in
206 self.stdout, which can be read after the command
207 has finished.
208 @param keepStderr: same, for stderr
209
210 @param usePTY: "slave-config" -> use the SlaveBuilder's usePTY;
211 otherwise, true to use a PTY, false to not use a PTY.
212 """
213
214 self.builder = builder
215 self.command = util.Obfuscated.get_real(command)
216
217
218
219
220
221
222
223
224
225
226
227 if isinstance(self.command, (tuple, list)):
228 for i, a in enumerate(self.command):
229 if isinstance(a, unicode):
230 self.command[i] = a.encode(self.builder.unicode_encoding)
231 elif isinstance(self.command, unicode):
232 self.command = self.command.encode(self.builder.unicode_encoding)
233
234 self.fake_command = util.Obfuscated.get_fake(command)
235 self.sendStdout = sendStdout
236 self.sendStderr = sendStderr
237 self.sendRC = sendRC
238 self.logfiles = logfiles
239 self.workdir = workdir
240 if not os.path.exists(workdir):
241 os.makedirs(workdir)
242 if environ:
243 if environ.has_key('PYTHONPATH'):
244 ppath = environ['PYTHONPATH']
245
246
247
248 if not isinstance(ppath, str):
249
250
251 ppath = os.pathsep.join(ppath)
252
253 environ['PYTHONPATH'] = ppath + os.pathsep + "${PYTHONPATH}"
254
255
256 p = re.compile('\${([0-9a-zA-Z_]*)}')
257 def subst(match):
258 return os.environ.get(match.group(1), "")
259 newenv = {}
260 for key in os.environ.keys():
261
262 if key not in environ or environ[key] is not None:
263 newenv[key] = os.environ[key]
264 for key in environ.keys():
265 if environ[key] is not None:
266 newenv[key] = p.sub(subst, environ[key])
267
268 self.environ = newenv
269 else:
270 self.environ = os.environ.copy()
271 self.initialStdin = initialStdin
272 self.keepStdinOpen = keepStdinOpen
273 self.logEnviron = logEnviron
274 self.timeout = timeout
275 self.timer = None
276 self.maxTime = maxTime
277 self.maxTimer = None
278 self.keepStdout = keepStdout
279 self.keepStderr = keepStderr
280
281 self.buffered = deque()
282 self.buflen = 0
283 self.buftimer = None
284
285 if usePTY == "slave-config":
286 self.usePTY = self.builder.usePTY
287 else:
288 self.usePTY = usePTY
289
290
291
292
293
294 if runtime.platformType != "posix" or initialStdin is not None:
295 if self.usePTY and usePTY != "slave-config":
296 self.sendStatus({'header': "WARNING: disabling usePTY for this command"})
297 self.usePTY = False
298
299 self.logFileWatchers = []
300 for name,filevalue in self.logfiles.items():
301 filename = filevalue
302 follow = False
303
304
305
306 if type(filevalue) == dict:
307 filename = filevalue['filename']
308 follow = filevalue.get('follow', False)
309
310 w = LogFileWatcher(self, name,
311 os.path.join(self.workdir, filename),
312 follow=follow)
313 self.logFileWatchers.append(w)
314
316 return "<%s '%s'>" % (self.__class__.__name__, self.fake_command)
317
320
322
323
324 if self.keepStdout:
325 self.stdout = ""
326 if self.keepStderr:
327 self.stderr = ""
328 self.deferred = defer.Deferred()
329 try:
330 self._startCommand()
331 except:
332 log.msg("error in RunProcess._startCommand")
333 log.err()
334 self._addToBuffers('stderr', "error in RunProcess._startCommand\n")
335 self._addToBuffers('stderr', traceback.format_exc())
336 self._sendBuffers()
337
338 self.deferred.errback(AbandonChain(-1))
339 return self.deferred
340
342
343 if not os.path.isdir(self.workdir):
344 os.makedirs(self.workdir)
345 log.msg("RunProcess._startCommand")
346 if self.notreally:
347 self._addToBuffers('header', "command '%s' in dir %s" % \
348 (self.fake_command, self.workdir))
349 self._addToBuffers('header', "(not really)\n")
350 self.finished(None, 0)
351 return
352
353 self.pp = RunProcessPP(self)
354
355 if type(self.command) in types.StringTypes:
356 if runtime.platformType == 'win32':
357 argv = os.environ['COMSPEC'].split()
358 if '/c' not in argv: argv += ['/c']
359 argv += [self.command]
360 else:
361
362
363 argv = ['/bin/sh', '-c', self.command]
364 display = self.fake_command
365 else:
366
367
368
369
370
371 if runtime.platformType == 'win32' and not \
372 (self.command[0].lower().endswith(".exe") and os.path.isabs(self.command[0])):
373 argv = os.environ['COMSPEC'].split()
374 if '/c' not in argv: argv += ['/c']
375 argv += list(self.command)
376 else:
377 argv = self.command
378 display = " ".join(self.fake_command)
379
380
381
382
383 if not self.environ.get('MACHTYPE', None) == 'i686-pc-msys':
384 self.environ['PWD'] = os.path.abspath(self.workdir)
385
386
387
388
389
390
391
392 log.msg(" " + display)
393 self._addToBuffers('header', display+"\n")
394
395
396 msg = " in dir %s" % (self.workdir,)
397 if self.timeout:
398 msg += " (timeout %d secs)" % (self.timeout,)
399 log.msg(" " + msg)
400 self._addToBuffers('header', msg+"\n")
401
402 msg = " watching logfiles %s" % (self.logfiles,)
403 log.msg(" " + msg)
404 self._addToBuffers('header', msg+"\n")
405
406
407 msg = " argv: %s" % (self.fake_command,)
408 log.msg(" " + msg)
409 self._addToBuffers('header', msg+"\n")
410
411
412 if self.logEnviron:
413 msg = " environment:\n"
414 env_names = self.environ.keys()
415 env_names.sort()
416 for name in env_names:
417 msg += " %s=%s\n" % (name, self.environ[name])
418 log.msg(" environment: %s" % (self.environ,))
419 self._addToBuffers('header', msg)
420
421 if self.initialStdin:
422 msg = " writing %d bytes to stdin" % len(self.initialStdin)
423 log.msg(" " + msg)
424 self._addToBuffers('header', msg+"\n")
425
426 if self.keepStdinOpen:
427 msg = " leaving stdin open"
428 else:
429 msg = " closing stdin"
430 log.msg(" " + msg)
431 self._addToBuffers('header', msg+"\n")
432
433 msg = " using PTY: %s" % bool(self.usePTY)
434 log.msg(" " + msg)
435 self._addToBuffers('header', msg+"\n")
436
437
438 if self.initialStdin:
439 self.pp.writeStdin(self.initialStdin)
440 if not self.keepStdinOpen:
441 self.pp.closeStdin()
442
443
444
445
446
447
448
449
450
451
452 self.process = None
453 self.startTime = util.now(self._reactor)
454
455 p = reactor.spawnProcess(self.pp, argv[0], argv,
456 self.environ,
457 self.workdir,
458 usePTY=self.usePTY)
459
460 if not self.process:
461 self.process = p
462
463
464
465
466
467
468
469 if self.timeout:
470 self.timer = self._reactor.callLater(self.timeout, self.doTimeout)
471
472 if self.maxTime:
473 self.maxTimer = self._reactor.callLater(self.maxTime, self.doMaxTimeout)
474
475 for w in self.logFileWatchers:
476 w.start()
477
478
480 """
481 limit the chunks that we send over PB to 128k, since it has a hardwired
482 string-size limit of 640k.
483 """
484 LIMIT = self.CHUNK_LIMIT
485 for i in range(0, len(data), LIMIT):
486 yield data[i:i+LIMIT]
487
489 """
490 Take msg, which is a dictionary of lists of output chunks, and
491 concatentate all the chunks into a single string
492 """
493 retval = {}
494 for log in msg:
495 data = "".join(msg[log])
496 if isinstance(log, tuple) and log[0] == 'log':
497 retval['log'] = (log[1], data)
498 else:
499 retval[log] = data
500 return retval
501
503 """
504 Collapse and send msg to the master
505 """
506 if not msg:
507 return
508 msg = self._collapseMsg(msg)
509 self.sendStatus(msg)
510
512 self.buftimer = None
513 self._sendBuffers()
514
516 """
517 Send all the content in our buffers.
518 """
519 msg = {}
520 msg_size = 0
521 lastlog = None
522 logdata = []
523 while self.buffered:
524
525 logname, data = self.buffered.popleft()
526
527
528
529
530
531
532
533
534
535 if lastlog is None:
536 lastlog = logname
537 elif logname != lastlog:
538 self._sendMessage(msg)
539 msg = {}
540 msg_size = 0
541 lastlog = logname
542
543 logdata = msg.setdefault(logname, [])
544
545
546
547 for chunk in self._chunkForSend(data):
548 if len(chunk) == 0: continue
549 logdata.append(chunk)
550 msg_size += len(chunk)
551 if msg_size >= self.CHUNK_LIMIT:
552
553
554
555 self._sendMessage(msg)
556 msg = {}
557 logdata = msg.setdefault(logname, [])
558 msg_size = 0
559 self.buflen = 0
560 if logdata:
561 self._sendMessage(msg)
562 if self.buftimer:
563 if self.buftimer.active():
564 self.buftimer.cancel()
565 self.buftimer = None
566
568 """
569 Add data to the buffer for logname
570 Start a timer to send the buffers if BUFFER_TIMEOUT elapses.
571 If adding data causes the buffer size to grow beyond BUFFER_SIZE, then
572 the buffers will be sent.
573 """
574 n = len(data)
575
576 self.buflen += n
577 self.buffered.append((logname, data))
578 if self.buflen > self.BUFFER_SIZE:
579 self._sendBuffers()
580 elif not self.buftimer:
581 self.buftimer = self._reactor.callLater(self.BUFFER_TIMEOUT, self._bufferTimeout)
582
584 if self.sendStdout:
585 self._addToBuffers('stdout', data)
586
587 if self.keepStdout:
588 self.stdout += data
589 if self.timer:
590 self.timer.reset(self.timeout)
591
593 if self.sendStderr:
594 self._addToBuffers('stderr', data)
595
596 if self.keepStderr:
597 self.stderr += data
598 if self.timer:
599 self.timer.reset(self.timeout)
600
606
608 self.elapsedTime = util.now(self._reactor) - self.startTime
609 log.msg("command finished with signal %s, exit code %s, elapsedTime: %0.6f" % (sig,rc,self.elapsedTime))
610 for w in self.logFileWatchers:
611
612 w.stop()
613 self._sendBuffers()
614 if sig is not None:
615 rc = -1
616 if self.sendRC:
617 if sig is not None:
618 self.sendStatus(
619 {'header': "process killed by signal %d\n" % sig})
620 self.sendStatus({'rc': rc})
621 self.sendStatus({'header': "elapsedTime=%0.6f\n" % self.elapsedTime})
622 if self.timer:
623 self.timer.cancel()
624 self.timer = None
625 if self.maxTimer:
626 self.maxTimer.cancel()
627 self.maxTimer = None
628 if self.buftimer:
629 self.buftimer.cancel()
630 self.buftimer = None
631 d = self.deferred
632 self.deferred = None
633 if d:
634 d.callback(rc)
635 else:
636 log.msg("Hey, command %s finished twice" % self)
637
639 self._sendBuffers()
640 log.msg("RunProcess.failed: command failed: %s" % (why,))
641 if self.timer:
642 self.timer.cancel()
643 self.timer = None
644 if self.maxTimer:
645 self.maxTimer.cancel()
646 self.maxTimer = None
647 if self.buftimer:
648 self.buftimer.cancel()
649 self.buftimer = None
650 d = self.deferred
651 self.deferred = None
652 if d:
653 d.errback(why)
654 else:
655 log.msg("Hey, command %s finished twice" % self)
656
658 self.timer = None
659 msg = "command timed out: %d seconds without output" % self.timeout
660 self.kill(msg)
661
663 self.maxTimer = None
664 msg = "command timed out: %d seconds elapsed" % self.maxTime
665 self.kill(msg)
666
667 - def kill(self, msg):
668
669
670 self._sendBuffers()
671 if self.timer:
672 self.timer.cancel()
673 self.timer = None
674 if self.maxTimer:
675 self.maxTimer.cancel()
676 self.maxTimer = None
677 if self.buftimer:
678 self.buftimer.cancel()
679 self.buftimer = None
680 if hasattr(self.process, "pid") and self.process.pid is not None:
681 msg += ", killing pid %s" % self.process.pid
682 log.msg(msg)
683 self.sendStatus({'header': "\n" + msg + "\n"})
684
685
686
687 self.pp.killed = True
688
689 hit = 0
690 if runtime.platformType == "posix":
691 try:
692
693
694
695
696
697
698
699
700
701 sig = None
702 if self.KILL is not None:
703 sig = getattr(signal, "SIG"+ self.KILL, None)
704
705 if self.KILL == None:
706 log.msg("self.KILL==None, only pretending to kill child")
707 elif sig is None:
708 log.msg("signal module is missing SIG%s" % self.KILL)
709 elif not hasattr(os, "kill"):
710 log.msg("os module is missing the 'kill' function")
711 elif not hasattr(self.process, "pid") or self.process.pid is None:
712 log.msg("self.process has no pid")
713 else:
714 log.msg("trying os.kill(-pid, %d)" % (sig,))
715
716 os.kill(-self.process.pid, sig)
717 log.msg(" signal %s sent successfully" % sig)
718 hit = 1
719 except OSError:
720
721
722 pass
723 if not hit:
724 try:
725 if self.KILL is None:
726 log.msg("self.KILL==None, only pretending to kill child")
727 else:
728 log.msg("trying process.signalProcess('KILL')")
729 self.process.signalProcess(self.KILL)
730 log.msg(" signal %s sent successfully" % (self.KILL,))
731 hit = 1
732 except OSError:
733
734 pass
735 if not hit:
736 log.msg("signalProcess/os.kill failed both times")
737
738 if runtime.platformType == "posix":
739
740
741
742 self.pp.transport.loseConnection()
743
744
745
746 self.timer = self._reactor.callLater(self.BACKUP_TIMEOUT,
747 self.doBackupTimeout)
748
750 log.msg("we tried to kill the process, and it wouldn't die.."
751 " finish anyway")
752 self.timer = None
753 self.sendStatus({'header': "SIGKILL failed to kill process\n"})
754 if self.sendRC:
755 self.sendStatus({'header': "using fake rc=-1\n"})
756 self.sendStatus({'rc': -1})
757 self.failed(RuntimeError("SIGKILL failed to kill process"))
758
759
762
765