1   
  2   
  3   
  4   
  5   
  6   
  7   
  8   
  9   
 10   
 11   
 12   
 13   
 14   
 15   
 16  """ 
 17  Support for running 'shell commands' 
 18  """ 
 19   
 20  import sys 
 21  import os 
 22  import signal 
 23  import types 
 24  import re 
 25  import subprocess 
 26  import traceback 
 27  import stat 
 28  from collections import deque 
 29   
 30  from twisted.python import runtime, log 
 31  from twisted.internet import reactor, defer, protocol, task, error 
 32   
 33  from buildslave import util 
 34  from buildslave.exceptions import AbandonChain 
 35   
 36  if runtime.platformType == 'posix': 
 37      from twisted.internet.process import Process 
 38   
 40       
 41       
 42       
 43       
 44       
 45       
 46       
 47       
 48       
 49       
 50       
 51      if runtime.platformType == 'win32': 
 52          return " ".join([ `e` for e in cmd_list ]) 
 53      else: 
 54          import pipes 
 55          def quote(e): 
 56              if not e: 
 57                  return '""' 
 58              return pipes.quote(e) 
  59          return " ".join([ quote(e) for e in cmd_list ]) 
 60   
 62      POLL_INTERVAL = 2 
 63   
 64 -    def __init__(self, command, name, logfile, follow=False): 
  82   
 85   
 87          log.err(err, msg="Polling error") 
 88          self.poller = None 
  89   
 91          self.poll() 
 92          if self.poller is not None: 
 93              self.poller.stop() 
 94          if self.started: 
 95              self.f.close() 
  96   
 98          if os.path.exists(self.logfile): 
 99              s = os.stat(self.logfile) 
100              return (s[stat.ST_CTIME], s[stat.ST_MTIME], s[stat.ST_SIZE]) 
101          return None 
 102   
104          if not self.started: 
105              s = self.statFile() 
106              if s == self.old_logfile_stats: 
107                  return  
108              if not s: 
109                   
110                   
111                   
112                  self.old_logfile_stats = None 
113                  return  
114              self.f = open(self.logfile, "rb") 
115               
116               
117               
118              if self.follow: 
119                  self.f.seek(s[2], 0) 
120              self.started = True 
121          self.f.seek(self.f.tell(), 0) 
122          while True: 
123              data = self.f.read(10000) 
124              if not data: 
125                  return 
126              self.command.addLogfile(self.name, data) 
  127   
128   
129  if runtime.platformType == 'posix': 
 131          """Simple subclass of Process to also make the spawned process a process 
132          group leader, so we can kill all members of the process group.""" 
133   
135              Process._setupChild(self, *args, **kwargs) 
136   
137               
138               
139               
140              os.setpgid(0, 0) 
  141   
142   
144      debug = False 
145   
147          self.command = command 
148          self.pending_stdin = "" 
149          self.stdin_finished = False 
150          self.killed = False 
 151   
153          assert not self.connected 
154          self.pending_stdin = data 
 155   
157          if self.debug: 
158              log.msg("RunProcessPP.connectionMade") 
159   
160          if self.command.useProcGroup: 
161              if self.debug: 
162                  log.msg(" recording pid %d as subprocess pgid" 
163                          % (self.transport.pid,)) 
164              self.transport.pgid = self.transport.pid 
165   
166          if self.pending_stdin: 
167              if self.debug: log.msg(" writing to stdin") 
168              self.transport.write(self.pending_stdin) 
169          if self.debug: log.msg(" closing stdin") 
170          self.transport.closeStdin() 
 171   
176   
181   
183          if self.debug: 
184              log.msg("RunProcessPP.processEnded", status_object) 
185           
186           
187           
188          sig = status_object.value.signal 
189          rc = status_object.value.exitCode 
190   
191           
192           
193           
194          if self.killed and rc == 0: 
195              log.msg("process was killed, but exited with status 0; faking a failure") 
196               
197              if runtime.platformType == 'win32': 
198                  rc = 1 
199              else: 
200                  rc = -1 
201          self.command.finished(sig, rc) 
  202   
203   
205      """ 
206      This is a helper class, used by slave commands to run programs in a child 
207      shell. 
208      """ 
209   
210      notreally = False 
211      BACKUP_TIMEOUT = 5 
212      KILL = "KILL" 
213      CHUNK_LIMIT = 128*1024 
214   
215       
216       
217      BUFFER_SIZE = 64*1024 
218      BUFFER_TIMEOUT = 5 
219   
220       
221      startTime = None 
222      elapsedTime = None 
223   
224       
225      _reactor = reactor 
226   
227       
228       
229       
230       
231   
232 -    def __init__(self, builder, command, 
233                   workdir, environ=None, 
234                   sendStdout=True, sendStderr=True, sendRC=True, 
235                   timeout=None, maxTime=None, initialStdin=None, 
236                   keepStdout=False, keepStderr=False, 
237                   logEnviron=True, logfiles={}, usePTY="slave-config", 
238                   useProcGroup=True): 
 239          """ 
240   
241          @param keepStdout: if True, we keep a copy of all the stdout text 
242                             that we've seen. This copy is available in 
243                             self.stdout, which can be read after the command 
244                             has finished. 
245          @param keepStderr: same, for stderr 
246   
247          @param usePTY: "slave-config" -> use the SlaveBuilder's usePTY; 
248              otherwise, true to use a PTY, false to not use a PTY. 
249   
250          @param useProcGroup: (default True) use a process group for non-PTY 
251              process invocations 
252          """ 
253   
254          self.builder = builder 
255          self.command = util.Obfuscated.get_real(command) 
256   
257           
258           
259           
260           
261           
262           
263           
264           
265           
266           
267          if isinstance(self.command, (tuple, list)): 
268              for i, a in enumerate(self.command): 
269                  if isinstance(a, unicode): 
270                      self.command[i] = a.encode(self.builder.unicode_encoding) 
271          elif isinstance(self.command, unicode): 
272              self.command = self.command.encode(self.builder.unicode_encoding) 
273   
274          self.fake_command = util.Obfuscated.get_fake(command) 
275          self.sendStdout = sendStdout 
276          self.sendStderr = sendStderr 
277          self.sendRC = sendRC 
278          self.logfiles = logfiles 
279          self.workdir = workdir 
280          self.process = None 
281          if not os.path.exists(workdir): 
282              os.makedirs(workdir) 
283          if environ: 
284              if environ.has_key('PYTHONPATH'): 
285                  ppath = environ['PYTHONPATH'] 
286                   
287                   
288                   
289                  if not isinstance(ppath, str): 
290                       
291                       
292                      ppath = os.pathsep.join(ppath) 
293   
294                  environ['PYTHONPATH'] = ppath + os.pathsep + "${PYTHONPATH}" 
295   
296               
297              p = re.compile('\${([0-9a-zA-Z_]*)}') 
298              def subst(match): 
299                  return os.environ.get(match.group(1), "") 
 300              newenv = {} 
301              for key in os.environ.keys(): 
302                   
303                  if key not in environ or environ[key] is not None: 
304                      newenv[key] = os.environ[key] 
305              for key in environ.keys(): 
306                  if environ[key] is not None: 
307                      newenv[key] = p.sub(subst, environ[key]) 
308   
309              self.environ = newenv 
310          else:  
311              self.environ = os.environ.copy() 
312          self.initialStdin = initialStdin 
313          self.logEnviron = logEnviron 
314          self.timeout = timeout 
315          self.timer = None 
316          self.maxTime = maxTime 
317          self.maxTimer = None 
318          self.keepStdout = keepStdout 
319          self.keepStderr = keepStderr 
320   
321          self.buffered = deque() 
322          self.buflen = 0 
323          self.buftimer = None 
324   
325          if usePTY == "slave-config": 
326              self.usePTY = self.builder.usePTY 
327          else: 
328              self.usePTY = usePTY 
329   
330           
331           
332           
333           
334          if runtime.platformType != "posix" or initialStdin is not None: 
335              if self.usePTY and usePTY != "slave-config": 
336                  self.sendStatus({'header': "WARNING: disabling usePTY for this command"}) 
337              self.usePTY = False 
338   
339           
340           
341          if runtime.platformType != 'posix': 
342              useProcGroup = False 
343          elif self.usePTY: 
344              useProcGroup = True 
345          self.useProcGroup = useProcGroup 
346   
347          self.logFileWatchers = [] 
348          for name,filevalue in self.logfiles.items(): 
349              filename = filevalue 
350              follow = False 
351   
352               
353               
354              if type(filevalue) == dict: 
355                  filename = filevalue['filename'] 
356                  follow = filevalue.get('follow', False) 
357   
358              w = LogFileWatcher(self, name, 
359                                 os.path.join(self.workdir, filename), 
360                                 follow=follow) 
361              self.logFileWatchers.append(w) 
 362   
364          return "<%s '%s'>" % (self.__class__.__name__, self.fake_command) 
 365   
368   
370           
371           
372          if self.keepStdout: 
373              self.stdout = "" 
374          if self.keepStderr: 
375              self.stderr = "" 
376          self.deferred = defer.Deferred() 
377          try: 
378              self._startCommand() 
379          except: 
380              log.msg("error in RunProcess._startCommand") 
381              log.err() 
382              self._addToBuffers('stderr', "error in RunProcess._startCommand\n") 
383              self._addToBuffers('stderr', traceback.format_exc()) 
384              self._sendBuffers() 
385               
386              self.deferred.errback(AbandonChain(-1)) 
387          return self.deferred 
 388   
390           
391          if not os.path.isdir(self.workdir): 
392              os.makedirs(self.workdir) 
393          log.msg("RunProcess._startCommand") 
394          if self.notreally: 
395              self._addToBuffers('header', "command '%s' in dir %s" % \ 
396                               (self.fake_command, self.workdir)) 
397              self._addToBuffers('header', "(not really)\n") 
398              self.finished(None, 0) 
399              return 
400   
401          self.pp = RunProcessPP(self) 
402   
403          if type(self.command) in types.StringTypes: 
404              if runtime.platformType  == 'win32': 
405                  argv = os.environ['COMSPEC'].split()  
406                  if '/c' not in argv: argv += ['/c'] 
407                  argv += [self.command] 
408              else: 
409                   
410                   
411                  argv = ['/bin/sh', '-c', self.command] 
412              display = self.fake_command 
413          else: 
414               
415               
416               
417               
418               
419              if runtime.platformType == 'win32' and not \ 
420                      (self.command[0].lower().endswith(".exe") and os.path.isabs(self.command[0])): 
421                  argv = os.environ['COMSPEC'].split()  
422                  if '/c' not in argv: argv += ['/c'] 
423                  argv += list(self.command) 
424              else: 
425                  argv = self.command 
426               
427              display = shell_quote(self.fake_command) 
428   
429           
430           
431           
432          if not self.environ.get('MACHTYPE', None) == 'i686-pc-msys': 
433              self.environ['PWD'] = os.path.abspath(self.workdir) 
434   
435           
436   
437          log.msg(" " + display) 
438          self._addToBuffers('header', display+"\n") 
439   
440           
441          msg = " in dir %s" % (self.workdir,) 
442          if self.timeout: 
443              if self.timeout == 1: 
444                  unit = "sec" 
445              else: 
446                  unit = "secs" 
447              msg += " (timeout %d %s)" % (self.timeout, unit) 
448          if self.maxTime: 
449              if self.maxTime == 1: 
450                  unit = "sec" 
451              else: 
452                  unit = "secs" 
453              msg += " (maxTime %d %s)" % (self.maxTime, unit) 
454          log.msg(" " + msg) 
455          self._addToBuffers('header', msg+"\n") 
456   
457          msg = " watching logfiles %s" % (self.logfiles,) 
458          log.msg(" " + msg) 
459          self._addToBuffers('header', msg+"\n") 
460   
461           
462          msg = " argv: %s" % (self.fake_command,) 
463          log.msg(" " + msg) 
464          self._addToBuffers('header', msg+"\n") 
465   
466           
467          if self.logEnviron: 
468              msg = " environment:\n" 
469              env_names = self.environ.keys() 
470              env_names.sort() 
471              for name in env_names: 
472                  msg += "  %s=%s\n" % (name, self.environ[name]) 
473              log.msg(" environment: %s" % (self.environ,)) 
474              self._addToBuffers('header', msg) 
475   
476          if self.initialStdin: 
477              msg = " writing %d bytes to stdin" % len(self.initialStdin) 
478              log.msg(" " + msg) 
479              self._addToBuffers('header', msg+"\n") 
480   
481          msg = " using PTY: %s" % bool(self.usePTY) 
482          log.msg(" " + msg) 
483          self._addToBuffers('header', msg+"\n") 
484   
485           
486           
487          if self.initialStdin: 
488              self.pp.setStdin(self.initialStdin) 
489   
490          self.startTime = util.now(self._reactor) 
491   
492           
493   
494          self.process = self._spawnProcess( 
495                                   self.pp, argv[0], argv, 
496                                   self.environ, 
497                                   self.workdir, 
498                                   usePTY=self.usePTY) 
499   
500           
501   
502          if self.timeout: 
503              self.timer = self._reactor.callLater(self.timeout, self.doTimeout) 
504   
505          if self.maxTime: 
506              self.maxTimer = self._reactor.callLater(self.maxTime, self.doMaxTimeout) 
507   
508          for w in self.logFileWatchers: 
509              w.start() 
 510   
511 -    def _spawnProcess(self, processProtocol, executable, args=(), env={}, 
512              path=None, uid=None, gid=None, usePTY=False, childFDs=None): 
 513          """private implementation of reactor.spawnProcess, to allow use of 
514          L{ProcGroupProcess}""" 
515   
516           
517          if runtime.platformType == 'posix': 
518              if self.useProcGroup and not usePTY: 
519                  return ProcGroupProcess(reactor, executable, args, env, path, 
520                                      processProtocol, uid, gid, childFDs) 
521   
522           
523          return reactor.spawnProcess(processProtocol, executable, args, env, 
524                                          path, usePTY=usePTY) 
 525   
527          """ 
528          limit the chunks that we send over PB to 128k, since it has a hardwired 
529          string-size limit of 640k. 
530          """ 
531          LIMIT = self.CHUNK_LIMIT 
532          for i in range(0, len(data), LIMIT): 
533              yield data[i:i+LIMIT] 
 534   
536          """ 
537          Take msg, which is a dictionary of lists of output chunks, and 
 538          concatenate all the chunks into a single string 
539          """ 
540          retval = {} 
541          for log in msg: 
542              data = "".join(msg[log]) 
543              if isinstance(log, tuple) and log[0] == 'log': 
544                  retval['log'] = (log[1], data) 
545              else: 
546                  retval[log] = data 
547          return retval 
 548   
550          """ 
551          Collapse and send msg to the master 
552          """ 
553          if not msg: 
554              return 
555          msg = self._collapseMsg(msg) 
556          self.sendStatus(msg) 
 557   
559          self.buftimer = None 
560          self._sendBuffers() 
 561   
563          """ 
564          Send all the content in our buffers. 
565          """ 
566          msg = {} 
567          msg_size = 0 
568          lastlog = None 
569          logdata = [] 
570          while self.buffered: 
571               
572              logname, data = self.buffered.popleft() 
573   
574               
575               
576               
577               
578               
579               
580               
581               
582              if lastlog is None: 
583                  lastlog = logname 
584              elif logname != lastlog: 
585                  self._sendMessage(msg) 
586                  msg = {} 
587                  msg_size = 0 
588              lastlog = logname 
589   
590              logdata = msg.setdefault(logname, []) 
591   
592               
593               
594              for chunk in self._chunkForSend(data): 
595                  if len(chunk) == 0: continue 
596                  logdata.append(chunk) 
597                  msg_size += len(chunk) 
598                  if msg_size >= self.CHUNK_LIMIT: 
599                       
600                       
601                       
602                      self._sendMessage(msg) 
603                      msg = {} 
604                      logdata = msg.setdefault(logname, []) 
605                      msg_size = 0 
606          self.buflen = 0 
607          if logdata: 
608              self._sendMessage(msg) 
609          if self.buftimer: 
610              if self.buftimer.active(): 
611                  self.buftimer.cancel() 
612              self.buftimer = None 
 613   
615          """ 
616          Add data to the buffer for logname 
617          Start a timer to send the buffers if BUFFER_TIMEOUT elapses. 
618          If adding data causes the buffer size to grow beyond BUFFER_SIZE, then 
619          the buffers will be sent. 
620          """ 
621          n = len(data) 
622   
623          self.buflen += n 
624          self.buffered.append((logname, data)) 
625          if self.buflen > self.BUFFER_SIZE: 
626              self._sendBuffers() 
627          elif not self.buftimer: 
628              self.buftimer = self._reactor.callLater(self.BUFFER_TIMEOUT, self._bufferTimeout) 
 629   
631          if self.sendStdout: 
632              self._addToBuffers('stdout', data) 
633   
634          if self.keepStdout: 
635              self.stdout += data 
636          if self.timer: 
637              self.timer.reset(self.timeout) 
 638   
640          if self.sendStderr: 
641              self._addToBuffers('stderr', data) 
642   
643          if self.keepStderr: 
644              self.stderr += data 
645          if self.timer: 
646              self.timer.reset(self.timeout) 
 647   
653   
655          self.elapsedTime = util.now(self._reactor) - self.startTime 
656          log.msg("command finished with signal %s, exit code %s, elapsedTime: %0.6f" % (sig,rc,self.elapsedTime)) 
657          for w in self.logFileWatchers: 
658               
659              w.stop() 
660          self._sendBuffers() 
661          if sig is not None: 
662              rc = -1 
663          if self.sendRC: 
664              if sig is not None: 
665                  self.sendStatus( 
666                      {'header': "process killed by signal %d\n" % sig}) 
667              self.sendStatus({'rc': rc}) 
668          self.sendStatus({'header': "elapsedTime=%0.6f\n" % self.elapsedTime}) 
669          if self.timer: 
670              self.timer.cancel() 
671              self.timer = None 
672          if self.maxTimer: 
673              self.maxTimer.cancel() 
674              self.maxTimer = None 
675          if self.buftimer: 
676              self.buftimer.cancel() 
677              self.buftimer = None 
678          d = self.deferred 
679          self.deferred = None 
680          if d: 
681              d.callback(rc) 
682          else: 
683              log.msg("Hey, command %s finished twice" % self) 
 684   
686          self._sendBuffers() 
687          log.msg("RunProcess.failed: command failed: %s" % (why,)) 
688          if self.timer: 
689              self.timer.cancel() 
690              self.timer = None 
691          if self.maxTimer: 
692              self.maxTimer.cancel() 
693              self.maxTimer = None 
694          if self.buftimer: 
695              self.buftimer.cancel() 
696              self.buftimer = None 
697          d = self.deferred 
698          self.deferred = None 
699          if d: 
700              d.errback(why) 
701          else: 
702              log.msg("Hey, command %s finished twice" % self) 
 703   
705          self.timer = None 
706          msg = "command timed out: %d seconds without output" % self.timeout 
707          self.kill(msg) 
 708   
710          self.maxTimer = None 
711          msg = "command timed out: %d seconds elapsed" % self.maxTime 
712          self.kill(msg) 
 713   
714 -    def kill(self, msg): 
 715           
716           
717          self._sendBuffers() 
718          if self.timer: 
719              self.timer.cancel() 
720              self.timer = None 
721          if self.maxTimer: 
722              self.maxTimer.cancel() 
723              self.maxTimer = None 
724          if self.buftimer: 
725              self.buftimer.cancel() 
726              self.buftimer = None 
727          msg += ", attempting to kill" 
728          log.msg(msg) 
729          self.sendStatus({'header': "\n" + msg + "\n"}) 
730   
731           
732           
733          self.pp.killed = True 
734   
735           
736          hit = 0 
737   
738           
739          if not hit and self.useProcGroup and runtime.platformType == "posix": 
740              sig = getattr(signal, "SIG"+ self.KILL, None) 
741   
742              if sig is None: 
743                  log.msg("signal module is missing SIG%s" % self.KILL) 
744              elif not hasattr(os, "kill"): 
745                  log.msg("os module is missing the 'kill' function") 
746              elif self.process.pgid is None: 
747                  log.msg("self.process has no pgid") 
748              else: 
749                  log.msg("trying to kill process group %d" % 
750                                                  (self.process.pgid,)) 
751                  try: 
752                      os.kill(-self.process.pgid, sig) 
753                      log.msg(" signal %s sent successfully" % sig) 
754                      self.process.pgid = None 
755                      hit = 1 
756                  except OSError: 
757                      log.msg('failed to kill process group (ignored): %s' % 
758                              (sys.exc_info()[1],)) 
759                       
760                       
761                      pass 
762   
763          elif runtime.platformType == "win32": 
764              if self.KILL == None: 
765                  log.msg("self.KILL==None, only pretending to kill child") 
766              else: 
767                  log.msg("using TASKKILL /F PID /T to kill pid %s" % self.process.pid) 
768                  subprocess.check_call("TASKKILL /F /PID %s /T" % self.process.pid) 
769                  log.msg("taskkill'd pid %s" % self.process.pid) 
770                  hit = 1 
771   
772           
773          if not hit: 
774              try: 
775                  log.msg("trying process.signalProcess('%s')" % (self.KILL,)) 
776                  self.process.signalProcess(self.KILL) 
777                  log.msg(" signal %s sent successfully" % (self.KILL,)) 
778                  hit = 1 
779              except OSError: 
780                  log.err("from process.signalProcess:") 
781                   
782                  pass 
783              except error.ProcessExitedAlready: 
784                  log.msg("Process exited already - can't kill") 
785                   
786                   
787                  pass 
788   
789          if not hit: 
790              log.msg("signalProcess/os.kill failed both times") 
791   
792          if runtime.platformType == "posix": 
793               
794               
795               
796              self.pp.transport.loseConnection() 
797   
798          if self.deferred: 
799               
800               
801              self.timer = self._reactor.callLater(self.BACKUP_TIMEOUT, 
802                                         self.doBackupTimeout) 
 803   
805          log.msg("we tried to kill the process, and it wouldn't die.." 
806                  " finish anyway") 
807          self.timer = None 
808          self.sendStatus({'header': "SIGKILL failed to kill process\n"}) 
809          if self.sendRC: 
810              self.sendStatus({'header': "using fake rc=-1\n"}) 
811              self.sendStatus({'rc': -1}) 
812          self.failed(RuntimeError("SIGKILL failed to kill process")) 
 813