1   
  2   
  3   
  4   
  5   
  6   
  7   
  8   
  9   
 10   
 11   
 12   
 13   
 14   
 15   
 16   
 17  from twisted.python import log, reflect 
 18  from twisted.python.failure import Failure 
 19  from twisted.internet import defer, reactor 
 20  from twisted.spread import pb 
 21  from twisted.application import service 
 22   
 23  from buildbot.process.builder import Builder 
 24  from buildbot import interfaces, locks, config, util 
 25  from buildbot.process import metrics 
 26   
 27 -class BotMaster(config.ReconfigurableServiceMixin, service.MultiService): 
  28   
 29      """This is the master-side service which manages remote buildbot slaves. 
 30      It provides them with BuildSlaves, and distributes build requests to 
 31      them.""" 
 32   
 33      debug = 0 
 34   
 70   
 72          """Shut down the entire process, once all currently-running builds are 
 73          complete.""" 
 74          if self.shuttingDown: 
 75              return 
 76          log.msg("Initiating clean shutdown") 
 77          self.shuttingDown = True 
 78   
 79           
 80           
 81          d = self.brd.stopService() 
 82   
 83           
 84          def wait(_): 
 85              l = [] 
 86              for builder in self.builders.values(): 
 87                  for build in builder.builder_status.getCurrentBuilds(): 
 88                      l.append(build.waitUntilFinished()) 
 89              if len(l) == 0: 
 90                  log.msg("No running jobs, starting shutdown immediately") 
 91              else: 
 92                  log.msg("Waiting for %i build(s) to finish" % len(l)) 
 93                  return defer.DeferredList(l) 
  94          d.addCallback(wait) 
 95   
 96           
 97          def shutdown(ign): 
 98               
 99               
100              if self.shuttingDown: 
101                   
102                  for builder in self.builders.values(): 
103                      n = len(builder.builder_status.getCurrentBuilds()) 
104                      if n > 0: 
105                          log.msg("Not shutting down, builder %s has %i builds running" % (builder, n)) 
106                          log.msg("Trying shutdown sequence again") 
107                          self.shuttingDown = False 
108                          self.cleanShutdown() 
109                          return 
110                  log.msg("Stopping reactor") 
111                  _reactor.stop() 
112              else: 
113                  self.brd.startService() 
 114          d.addCallback(shutdown) 
115          d.addErrback(log.err, 'while processing cleanShutdown') 
116   
118          """Cancel a clean shutdown that is already in progress, if any""" 
119          if not self.shuttingDown: 
120              return 
121          log.msg("Cancelling clean shutdown") 
122          self.shuttingDown = False 
 123   
124      @metrics.countMethod('BotMaster.slaveLost()') 
130   
131      @metrics.countMethod('BotMaster.getBuildersForSlave()') 
133          return [ b for b in self.builders.values() 
134                   if slavename in b.config.slavenames ] 
 135   
137          return self.builderNames 
 138   
140          return self.builders.values() 
 141   
145          self.buildrequest_sub = \ 
146              self.master.subscribeToBuildRequests(buildRequestAdded) 
147          service.MultiService.startService(self) 
148   
149      @defer.deferredGenerator 
177   
178   
179      @defer.deferredGenerator 
181   
182          timer = metrics.Timer("BotMaster.reconfigServiceSlaves") 
183          timer.start() 
184   
185           
186          old_by_name = dict([ (s.slavename, s) 
187                              for s in list(self) 
188                              if interfaces.IBuildSlave.providedBy(s) ]) 
189          old_set = set(old_by_name.iterkeys()) 
190          new_by_name = dict([ (s.slavename, s) 
191                              for s in new_config.slaves ]) 
192          new_set = set(new_by_name.iterkeys()) 
193   
194           
195          removed_names, added_names = util.diffSets(old_set, new_set) 
196   
197           
198           
199          for n in old_set & new_set: 
200              old = old_by_name[n] 
201              new = new_by_name[n] 
202               
203              if reflect.qual(old.__class__) != reflect.qual(new.__class__): 
204                  removed_names.add(n) 
205                  added_names.add(n) 
206   
207          if removed_names or added_names: 
208              log.msg("adding %d new slaves, removing %d" % 
209                      (len(added_names), len(removed_names))) 
210   
211              for n in removed_names: 
212                  slave = old_by_name[n] 
213   
214                  del self.slaves[n] 
215                  slave.master = None 
216                  slave.botmaster = None 
217   
218                  wfd = defer.waitForDeferred( 
219                      defer.maybeDeferred(lambda : 
220                          slave.disownServiceParent())) 
221                  yield wfd 
222                  wfd.getResult() 
223   
224              for n in added_names: 
225                  slave = new_by_name[n] 
226                  slave.setServiceParent(self) 
227   
228                  slave.botmaster = self 
229                  slave.master = self.master 
230                  self.slaves[n] = slave 
231   
232          metrics.MetricCountEvent.log("num_slaves", 
233                  len(self.slaves), absolute=True) 
234   
235          timer.stop() 
 236   
237   
238      @defer.deferredGenerator 
240   
241          timer = metrics.Timer("BotMaster.reconfigServiceBuilders") 
242          timer.start() 
243   
244           
245          old_by_name = dict([ (b.name, b) 
246                              for b in list(self) 
247                              if isinstance(b, Builder) ]) 
248          old_set = set(old_by_name.iterkeys()) 
249          new_by_name = dict([ (bc.name, bc) 
250                              for bc in new_config.builders ]) 
251          new_set = set(new_by_name.iterkeys()) 
252   
253           
254          removed_names, added_names = util.diffSets(old_set, new_set) 
255   
256          if removed_names or added_names: 
257              log.msg("adding %d new builders, removing %d" % 
258                      (len(added_names), len(removed_names))) 
259   
260              for n in removed_names: 
261                  builder = old_by_name[n] 
262   
263                  del self.builders[n] 
264                  builder.master = None 
265                  builder.botmaster = None 
266   
267                  wfd = defer.waitForDeferred( 
268                      defer.maybeDeferred(lambda : 
269                          builder.disownServiceParent())) 
270                  yield wfd 
271                  wfd.getResult() 
272   
273              for n in added_names: 
274                  builder = Builder(n) 
275                  self.builders[n] = builder 
276   
277                  builder.botmaster = self 
278                  builder.master = self.master 
279                  builder.setServiceParent(self) 
280   
281          self.builderNames = self.builders.keys() 
282   
283          metrics.MetricCountEvent.log("num_builders", 
284                  len(self.builders), absolute=True) 
285   
286          timer.stop() 
 287   
288   
290          if self.buildrequest_sub: 
291              self.buildrequest_sub.unsubscribe() 
292              self.buildrequest_sub = None 
293          for b in self.builders.values(): 
294              b.builder_status.addPointEvent(["master", "shutdown"]) 
295              b.builder_status.saveYourself() 
296          return service.MultiService.stopService(self) 
 297   
299          """Convert a Lock identifier into an actual Lock instance. 
300          @param lockid: a locks.MasterLock or locks.SlaveLock instance 
301          @return: a locks.RealMasterLock or locks.RealSlaveLock instance 
302          """ 
303          assert isinstance(lockid, (locks.MasterLock, locks.SlaveLock)) 
304          if not lockid in self.locks: 
305              self.locks[lockid] = lockid.lockClass(lockid) 
306           
307           
308           
309           
310          return self.locks[lockid] 
 311   
313          """ 
314          Call this when something suggests that a particular builder may now 
315          be available to start a build. 
316   
317          @param buildername: the name of the builder 
318          """ 
319          self.brd.maybeStartBuildsOn([buildername]) 
 320   
322          """ 
323          Call this when something suggests that a particular slave may now be 
324          available to start a build. 
325   
326          @param slave_name: the name of the slave 
327          """ 
328          builders = self.getBuildersForSlave(slave_name) 
329          self.brd.maybeStartBuildsOn([ b.name for b in builders ]) 
 330   
332          """ 
333          Call this when something suggests that this would be a good time to start some 
334          builds, but nothing more specific. 
335          """ 
336          self.brd.maybeStartBuildsOn(self.builderNames) 
 337   
339      """ 
340      Special-purpose class to handle distributing build requests to builders by 
341      calling their C{maybeStartBuild} method. 
342   
343      This takes account of the C{prioritizeBuilders} configuration, and is 
344      highly re-entrant; that is, if a new build request arrives while builders 
345      are still working on the previous build request, then this class will 
346      correctly re-prioritize invocations of builders' C{maybeStartBuild} 
347      methods. 
348      """ 
349   
351          self.botmaster = botmaster 
352          self.master = botmaster.master 
353   
354           
355          self.pending_builders_lock = defer.DeferredLock() 
356   
357           
358           
359          self._pending_builders = [] 
360          self.activity_lock = defer.DeferredLock() 
361          self.active = False 
 362   
364           
365           
366          d = self.activity_lock.acquire() 
367          d.addCallback(lambda _ : service.Service.stopService(self)) 
368          d.addBoth(lambda _ : self.activity_lock.release()) 
369          return d 
 370   
371      @defer.deferredGenerator 
373          """ 
374          Try to start any builds that can be started right now.  This function 
375          returns immediately, and promises to trigger those builders 
376          eventually. 
377   
378          @param new_builders: names of new builders that should be given the 
379          opportunity to check for new requests. 
380          """ 
381          new_builders = set(new_builders) 
382          existing_pending = set(self._pending_builders) 
383   
384           
385          if new_builders < existing_pending: 
386              return 
387   
388           
389           
390          wfd = defer.waitForDeferred( 
391              self.pending_builders_lock.acquire()) 
392          yield wfd 
393          wfd.getResult() 
394   
395          try: 
396               
397               
398              existing_pending = set(self._pending_builders) 
399   
400               
401              wfd = defer.waitForDeferred( 
402                  self._sortBuilders(list(existing_pending | new_builders))) 
403              yield wfd 
404              self._pending_builders = wfd.getResult() 
405   
406               
407              if not self.active: 
408                  self._activityLoop() 
409          except: 
410              log.err(Failure(), 
411                      "while attempting to start builds on %s" % self.name) 
412   
413           
414          self.pending_builders_lock.release() 
 415   
416      @defer.deferredGenerator 
418          timer = metrics.Timer("BuildRequestDistributor._defaultSorter()") 
419          timer.start() 
420           
421           
422          def xform(bldr): 
423              d = defer.maybeDeferred(lambda : 
424                      bldr.getOldestRequestTime()) 
425              d.addCallback(lambda time : 
426                  (((time is None) and None or time),bldr)) 
427              return d 
 428          wfd = defer.waitForDeferred( 
429              defer.gatherResults( 
430                  [ xform(bldr) for bldr in builders ])) 
431          yield wfd 
432          xformed = wfd.getResult() 
433   
434           
435           
436          def nonecmp(a,b): 
437              if a[0] is None: return 1 
438              if b[0] is None: return -1 
439              return cmp(a,b) 
 440          xformed.sort(cmp=nonecmp) 
441   
442           
443          yield [ xf[1] for xf in xformed ] 
444          timer.stop() 
445   
446      @defer.deferredGenerator 
448          timer = metrics.Timer("BuildRequestDistributor._sortBuilders()") 
449          timer.start() 
450           
451   
452           
453          builders_dict = self.botmaster.builders 
454          builders = [ builders_dict.get(n) 
455                       for n in buildernames 
456                       if n in builders_dict ] 
457   
458           
459          sorter = self.master.config.prioritizeBuilders 
460          if not sorter: 
461              sorter = self._defaultSorter 
462   
463           
464          try: 
465              wfd = defer.waitForDeferred( 
466                  defer.maybeDeferred(lambda : 
467                      sorter(self.master, builders))) 
468              yield wfd 
469              builders = wfd.getResult() 
470          except: 
471              log.msg("Exception prioritizing builders; order unspecified") 
472              log.err(Failure()) 
473   
474           
475          yield [ b.name for b in builders ] 
476          timer.stop() 
 477   
478      @defer.deferredGenerator 
480          self.active = True 
481   
482          timer = metrics.Timer('BuildRequestDistributor._activityLoop()') 
483          timer.start() 
484   
485          while 1: 
486              wfd = defer.waitForDeferred( 
487                  self.activity_lock.acquire()) 
488              yield wfd 
489              wfd.getResult() 
490   
491               
492              wfd = defer.waitForDeferred( 
493                  self.pending_builders_lock.acquire()) 
494              yield wfd 
495              wfd.getResult() 
496   
497               
498              if not self.running or not self._pending_builders: 
499                  self.pending_builders_lock.release() 
500                  self.activity_lock.release() 
501                  break 
502   
503              bldr_name = self._pending_builders.pop(0) 
504              self.pending_builders_lock.release() 
505   
506              try: 
507                  wfd = defer.waitForDeferred( 
508                      self._callABuilder(bldr_name)) 
509                  yield wfd 
510                  wfd.getResult() 
511              except: 
512                  log.err(Failure(), 
513                          "from maybeStartBuild for builder '%s'" % (bldr_name,)) 
514   
515              self.activity_lock.release() 
516   
517          timer.stop() 
518   
519          self.active = False 
520          self._quiet() 
 521   
523           
524          bldr = self.botmaster.builders.get(bldr_name) 
525          if not bldr: 
526              return defer.succeed(None) 
527   
528          d = bldr.maybeStartBuild() 
529          d.addErrback(log.err, 'in maybeStartBuild for %r' % (bldr,)) 
530          return d 
 531   
535   
538      """Utility class to arbitrate the situation when a new slave connects with 
539      the name of an existing, connected slave 
540   
541      @ivar buildslave: L{buildbot.process.slavebuilder.AbstractBuildSlave} 
542      instance 
543      @ivar old_remote: L{RemoteReference} to the old slave 
544      @ivar new_remote: L{RemoteReference} to the new slave 
545      """ 
546      _reactor = reactor  
547   
548       
549       
550       
551       
552       
553       
554       
555       
556       
557       
558       
559       
560       
561   
562      PING_TIMEOUT = 10 
563      """Timeout for pinging the old slave.  Set this to something quite long, as 
564      a very busy slave (e.g., one sending a big log chunk) may take a while to 
565      return a ping. 
566      """ 
567   
571   
573          self.new_remote = mind 
574          self.ping_old_slave_done = False 
575          self.old_slave_connected = True 
576          self.ping_new_slave_done = False 
577   
578          old_tport = self.old_remote.broker.transport 
579          new_tport = self.new_remote.broker.transport 
580          log.msg("duplicate slave %s; delaying new slave (%s) and pinging old " 
581                  "(%s)" % (self.buildslave.slavename, new_tport.getPeer(), 
582                            old_tport.getPeer())) 
583   
584           
585          d = self.new_slave_d = defer.Deferred() 
586   
587           
588           
589           
590          self.ping_old_slave(new_tport.getPeer()) 
591   
592           
593          self.ping_new_slave() 
594   
595          return d 
 596   
598          d = defer.maybeDeferred(lambda : 
599              self.new_remote.callRemote("print", "master already has a " 
600                          "connection named '%s' - checking its liveness" 
601                          % self.buildslave.slavename)) 
602          def done(_): 
603               
604              self.ping_new_slave_done = True 
605              self.maybe_done() 
 606          d.addBoth(done) 
 607   
609           
610           
611           
612           
613          def timeout(): 
614              self.ping_old_slave_timeout = None 
615              self.ping_old_slave_timed_out = True 
616              self.old_slave_connected = False 
617              self.ping_old_slave_done = True 
618              self.maybe_done() 
 619          self.ping_old_slave_timeout = self._reactor.callLater( 
620                                      self.PING_TIMEOUT, timeout) 
621          self.ping_old_slave_timed_out = False 
622   
623           
624           
625          d = defer.maybeDeferred(lambda : 
626              self.old_remote.callRemote("print", 
627                  "master got a duplicate connection from %s; keeping this one" 
628                                          % new_peer)) 
629   
630          def clear_timeout(r): 
631              if self.ping_old_slave_timeout: 
632                  self.ping_old_slave_timeout.cancel() 
633                  self.ping_old_slave_timeout = None 
634              return r 
635          d.addBoth(clear_timeout) 
636   
637          def old_gone(f): 
638              if self.ping_old_slave_timed_out: 
639                  return  
640              f.trap(pb.PBConnectionLost, pb.DeadReferenceError) 
641              log.msg(("connection lost while pinging old slave '%s' - " + 
642                       "keeping new slave") % self.buildslave.slavename) 
643              self.old_slave_connected = False 
644          d.addErrback(old_gone) 
645   
646          def other_err(f): 
647              log.err(f, "unexpected error pinging old slave; disconnecting it") 
648              self.old_slave_connected = False 
649          d.addErrback(other_err) 
650   
651          def done(_): 
652              if self.ping_old_slave_timed_out: 
653                  return  
654              self.ping_old_slave_done = True 
655              self.maybe_done() 
656          d.addCallback(done) 
657   
659          if not self.ping_new_slave_done or not self.ping_old_slave_done: 
660              return 
661   
662           
663          if self.old_slave_connected: 
664              self.disconnect_new_slave() 
665          else: 
666              self.start_new_slave() 
 667   
669           
670          if not self.new_slave_d:  
671              return 
672   
673          d = self.new_slave_d 
674          self.new_slave_d = None 
675   
676          if self.buildslave.isConnected(): 
677               
678               
679              def detached(): 
680                  d.callback(self.buildslave) 
 681              self.buildslave.subscribeToDetach(detached) 
682              self.old_remote.broker.transport.loseConnection() 
683          else:  
684               
685               
686               
687              d.callback(self.buildslave) 
688   
690           
691          if not self.new_slave_d:  
692              return 
693   
694          d = self.new_slave_d 
695          self.new_slave_d = None 
696          log.msg("rejecting duplicate slave with exception") 
697          d.errback(Failure(RuntimeError("rejecting duplicate slave"))) 
 698