
Source Code for Module buildbot.process.botmaster

# This file is part of Buildbot.  Buildbot is free software: you can
# redistribute it and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation, version 2.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright Buildbot Team Members


from twisted.python import log
from twisted.python.failure import Failure
from twisted.internet import defer, reactor
from twisted.spread import pb
from twisted.application import service

from buildbot.process.builder import Builder
from buildbot import interfaces, locks
from buildbot.process import metrics

class BotMaster(service.MultiService):

    """This is the master-side service which manages remote buildbot slaves.
    It provides them with BuildSlaves, and distributes build requests to
    them."""

    debug = 0
    reactor = reactor

    def __init__(self, master):
        service.MultiService.__init__(self)
        self.master = master

        self.builders = {}
        self.builderNames = []
        # builders maps Builder names to instances of bb.p.builder.Builder,
        # which is the master-side object that defines and controls a build.

        # self.slaves contains a ready BuildSlave instance for each
        # potential buildslave, i.e. all the ones listed in the config file.
        # If the slave is connected, self.slaves[slavename].slave will
        # contain a RemoteReference to their Bot instance. If it is not
        # connected, that attribute will hold None.
        self.slaves = {} # maps slavename to BuildSlave
        self.watchers = {}

        # self.locks holds the real Lock instances
        self.locks = {}

        # self.mergeRequests is the callable override for merging build
        # requests
        self.mergeRequests = None

        # self.prioritizeBuilders is the callable override for builder order
        # traversal
        self.prioritizeBuilders = None

        self.shuttingDown = False

        self.lastSlavePortnum = None

        # subscription to new build requests
        self.buildrequest_sub = None

        # a distributor for incoming build requests; see below
        self.brd = BuildRequestDistributor(self)
        self.brd.setServiceParent(self)

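    # Illustrative sketch (not part of this module): the BuildMaster is
    # assumed to create exactly one BotMaster and attach it to itself,
    # roughly
    #
    #     botmaster = BotMaster(master)
    #     botmaster.setServiceParent(master)
    #
    # after which reconfiguration drives loadConfig_Slaves() and
    # setBuilders() below.
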
    def cleanShutdown(self, _reactor=reactor):
        """Shut down the entire process, once all currently-running builds are
        complete."""
        if self.shuttingDown:
            return
        log.msg("Initiating clean shutdown")
        self.shuttingDown = True

        # first, stop the distributor; this will finish any ongoing scheduling
        # operations before firing
        d = self.brd.stopService()

        # then wait for all builds to finish
        def wait(_):
            l = []
            for builder in self.builders.values():
                for build in builder.builder_status.getCurrentBuilds():
                    l.append(build.waitUntilFinished())
            if len(l) == 0:
                log.msg("No running jobs, starting shutdown immediately")
            else:
                log.msg("Waiting for %i build(s) to finish" % len(l))
            return defer.DeferredList(l)
        d.addCallback(wait)

        # Finally, shut the whole process down
        def shutdown(ign):
            # Double check that we're still supposed to be shutting down
            # The shutdown may have been cancelled!
            if self.shuttingDown:
                # Check that there really aren't any running builds
                for builder in self.builders.values():
                    n = len(builder.builder_status.getCurrentBuilds())
                    if n > 0:
                        log.msg("Not shutting down, builder %s has %i builds running" % (builder, n))
                        log.msg("Trying shutdown sequence again")
                        self.shuttingDown = False
                        self.cleanShutdown()
                        return
                log.msg("Stopping reactor")
                _reactor.stop()
            else:
                self.brd.startService()
        d.addCallback(shutdown)
        d.addErrback(log.err, 'while processing cleanShutdown')

    def cancelCleanShutdown(self):
        """Cancel a clean shutdown that is already in progress, if any"""
        if not self.shuttingDown:
            return
        log.msg("Cancelling clean shutdown")
        self.shuttingDown = False

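    # Illustrative sketch (not part of this module): code holding the running
    # master, which is assumed to expose this service as master.botmaster,
    # could drive a graceful shutdown and later change its mind, e.g.
    #
    #     master.botmaster.cleanShutdown()        # let running builds finish, then stop
    #     ...
    #     master.botmaster.cancelCleanShutdown()  # abort the pending shutdown
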
    def loadConfig_Slaves(self, new_slaves):
        timer = metrics.Timer("BotMaster.loadConfig_Slaves()")
        timer.start()
        new_portnum = (self.lastSlavePortnum is not None
                   and self.lastSlavePortnum != self.master.slavePortnum)
        if new_portnum:
            # it turns out this is pretty hard..
            raise ValueError("changing slavePortnum in reconfig is not supported")
        self.lastSlavePortnum = self.master.slavePortnum

        old_slaves = [c for c in list(self)
                      if interfaces.IBuildSlave.providedBy(c)]

        # identify added/removed slaves. For each slave we construct a tuple
        # of (name, password, class), and we consider the slave to be already
        # present if the tuples match. (we include the class to make sure
        # that BuildSlave(name,pw) is different than
        # SubclassOfBuildSlave(name,pw) ). If the password or class has
        # changed, we will remove the old version of the slave and replace it
        # with a new one. If anything else has changed, we just update the
        # old BuildSlave instance in place. If the name has changed, of
        # course, it looks exactly the same as deleting one slave and adding
        # an unrelated one.

        old_t = {}
        for s in old_slaves:
            old_t[s.identity()] = s
        new_t = {}
        for s in new_slaves:
            new_t[s.identity()] = s
        removed = [old_t[t]
                   for t in old_t
                   if t not in new_t]
        added = [new_t[t]
                 for t in new_t
                 if t not in old_t]
        remaining_t = [t
                       for t in new_t
                       if t in old_t]

        # removeSlave will hang up on the old bot
        dl = []
        for s in removed:
            dl.append(self.removeSlave(s))
        d = defer.DeferredList(dl, fireOnOneErrback=True)

        def add_new(res):
            for s in added:
                self.addSlave(s)
        d.addCallback(add_new)

        def update_remaining(_):
            for t in remaining_t:
                old_t[t].update(new_t[t])

        d.addCallback(update_remaining)

        def stop(_):
            metrics.MetricCountEvent.log("num_slaves",
                len(self.slaves), absolute=True)
            timer.stop()
            return _
        d.addBoth(stop)

        return d

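    # For illustration: the identity() tuples compared above are assumed to
    # be of the form (name, password, class), e.g.
    # ('slave1', 'secret', <class 'buildbot.buildslave.BuildSlave'>), so
    # changing a slave's password or class removes and re-adds it, while
    # other attribute changes update the existing instance in place.
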
    def addSlave(self, s):
        s.setServiceParent(self)
        s.setBotmaster(self)
        self.slaves[s.slavename] = s
        s.pb_registration = self.master.pbmanager.register(
                self.master.slavePortnum, s.slavename,
                s.password, self.getPerspective)
        # do not call maybeStartBuildsForSlave here, as the slave has not
        # necessarily attached yet

    @metrics.countMethod('BotMaster.removeSlave()')
    def removeSlave(self, s):
        d = s.disownServiceParent()
        d.addCallback(lambda _ : s.pb_registration.unregister())
        d.addCallback(lambda _ : self.slaves[s.slavename].disconnect())
        def delslave(_):
            del self.slaves[s.slavename]
        d.addCallback(delslave)
        return d

    @metrics.countMethod('BotMaster.slaveLost()')
    def slaveLost(self, bot):
        metrics.MetricCountEvent.log("BotMaster.attached_slaves", -1)
        for name, b in self.builders.items():
            if bot.slavename in b.slavenames:
                b.detached(bot)

    @metrics.countMethod('BotMaster.getBuildersForSlave()')
    def getBuildersForSlave(self, slavename):
        return [b
                for b in self.builders.values()
                if slavename in b.slavenames]

    def getBuildernames(self):
        return self.builderNames

    def getBuilders(self):
        allBuilders = [self.builders[name] for name in self.builderNames]
        return allBuilders

    def setBuilders(self, builders):
        # TODO: diff against previous list of builders instead of replacing
        # wholesale?
        self.builders = {}
        self.builderNames = []
        d = defer.DeferredList([b.disownServiceParent() for b in list(self)
                                if isinstance(b, Builder)],
                               fireOnOneErrback=True)
        def _add(ign):
            log.msg("setBuilders._add: %s %s" % (list(self), [b.name for b in builders]))
            for b in builders:
                for slavename in b.slavenames:
                    # this is actually validated earlier
                    assert slavename in self.slaves
                self.builders[b.name] = b
                self.builderNames.append(b.name)
                b.setBotmaster(self)
                b.setServiceParent(self)
        d.addCallback(_add)
        d.addCallback(lambda ign: self._updateAllSlaves())
        # N.B. this takes care of starting all builders at master startup
        d.addCallback(lambda _ :
                self.maybeStartBuildsForAllBuilders())
        return d

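    # Illustrative sketch (not part of this module): during (re)configuration
    # the master is assumed to construct the new Builder instances from the
    # config and hand the whole list over at once, e.g.
    #
    #     d = botmaster.setBuilders(new_builders)   # new_builders: [Builder, ...]
    #     d.addErrback(log.err, "while setting builders")
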
    def _updateAllSlaves(self):
        """Notify all buildslaves about changes in their Builders."""
        timer = metrics.Timer("BotMaster._updateAllSlaves()")
        timer.start()
        dl = []
        for s in self.slaves.values():
            d = s.updateSlave()
            d.addErrback(log.err)
            dl.append(d)
        d = defer.DeferredList(dl)
        def stop(_):
            timer.stop()
            return _
        d.addBoth(stop)
        return d

    def getPerspective(self, mind, slavename):
        sl = self.slaves[slavename]
        if not sl:
            return None
        metrics.MetricCountEvent.log("BotMaster.attached_slaves", 1)

        # record when this connection attempt occurred
        sl.recordConnectTime()

        if sl.isConnected():
            # duplicate slave - send it to arbitration
            arb = DuplicateSlaveArbitrator(sl)
            return arb.getPerspective(mind, slavename)
        else:
            log.msg("slave '%s' attaching from %s" % (slavename, mind.broker.transport.getPeer()))
            return sl

    def startService(self):
        def buildRequestAdded(notif):
            self.maybeStartBuildsForBuilder(notif['buildername'])
        self.buildrequest_sub = \
            self.master.subscribeToBuildRequests(buildRequestAdded)
        service.MultiService.startService(self)

    def stopService(self):
        if self.buildrequest_sub:
            self.buildrequest_sub.unsubscribe()
            self.buildrequest_sub = None
        for b in self.builders.values():
            b.builder_status.addPointEvent(["master", "shutdown"])
            b.builder_status.saveYourself()
        return service.MultiService.stopService(self)

    def getLockByID(self, lockid):
        """Convert a Lock identifier into an actual Lock instance.
        @param lockid: a locks.MasterLock or locks.SlaveLock instance
        @return: a locks.RealMasterLock or locks.RealSlaveLock instance
        """
        assert isinstance(lockid, (locks.MasterLock, locks.SlaveLock))
        if not lockid in self.locks:
            self.locks[lockid] = lockid.lockClass(lockid)
        # if the master.cfg file has changed maxCount= on the lock, the next
        # time a build is started, they'll get a new RealLock instance. Note
        # that this requires that MasterLock and SlaveLock (marker) instances
        # be hashable and that they should compare properly.
        return self.locks[lockid]

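    # Illustrative sketch (lock name hypothetical): because the marker Lock
    # instances are hashable and compare by their definition, asking twice
    # for the same lock identifier yields the same real lock, so builds
    # actually contend for it.
    #
    #     lock_id = locks.MasterLock("example-lock", maxCount=1)
    #     real_a = botmaster.getLockByID(lock_id)
    #     real_b = botmaster.getLockByID(lock_id)
    #     assert real_a is real_b
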
    def maybeStartBuildsForBuilder(self, buildername):
        """
        Call this when something suggests that a particular builder may now
        be available to start a build.

        @param buildername: the name of the builder
        """
        self.brd.maybeStartBuildsOn([buildername])

    def maybeStartBuildsForSlave(self, slave_name):
        """
        Call this when something suggests that a particular slave may now be
        available to start a build.

        @param slave_name: the name of the slave
        """
        builders = self.getBuildersForSlave(slave_name)
        self.brd.maybeStartBuildsOn([ b.name for b in builders ])

    def maybeStartBuildsForAllBuilders(self):
        """
        Call this when something suggests that this would be a good time to
        start some builds, but nothing more specific.
        """
        self.brd.maybeStartBuildsOn(self.builderNames)

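    # Illustrative sketch (builder/slave names hypothetical): the three entry
    # points above all funnel into the BuildRequestDistributor defined below.
    #
    #     botmaster.maybeStartBuildsForBuilder('runtests')   # one builder
    #     botmaster.maybeStartBuildsForSlave('slave1')       # every builder on that slave
    #     botmaster.maybeStartBuildsForAllBuilders()         # everything
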
class BuildRequestDistributor(service.Service):
    """
    Special-purpose class to handle distributing build requests to builders by
    calling their C{maybeStartBuild} method.

    This takes account of the C{prioritizeBuilders} configuration, and is
    highly re-entrant; that is, if a new build request arrives while builders
    are still working on the previous build request, then this class will
    correctly re-prioritize invocations of builders' C{maybeStartBuild}
    methods.
    """

    def __init__(self, botmaster):
        self.botmaster = botmaster
        self.master = botmaster.master

        # lock to ensure builders are only sorted once at any time
        self.pending_builders_lock = defer.DeferredLock()

        # sorted list of names of builders that need their maybeStartBuild
        # method invoked.
        self._pending_builders = []
        self.activity_lock = defer.DeferredLock()
        self.active = False

    def stopService(self):
        # let the parent stopService succeed between activity; then the loop
        # will stop calling itself, since self.running is false
        d = self.activity_lock.acquire()
        d.addCallback(lambda _ : service.Service.stopService(self))
        d.addBoth(lambda _ : self.activity_lock.release())
        return d

    @defer.deferredGenerator
    def maybeStartBuildsOn(self, new_builders):
        """
        Try to start any builds that can be started right now.  This function
        returns immediately, and promises to trigger those builders
        eventually.

        @param new_builders: names of new builders that should be given the
        opportunity to check for new requests.
        """
        new_builders = set(new_builders)
        existing_pending = set(self._pending_builders)

        # if we won't add any builders, there's nothing to do
        if new_builders < existing_pending:
            return

        # reset the list of pending builders; this is async, so begin
        # by grabbing a lock
        wfd = defer.waitForDeferred(
            self.pending_builders_lock.acquire())
        yield wfd
        wfd.getResult()

        try:
            # re-fetch existing_pending, in case it has changed while acquiring
            # the lock
            existing_pending = set(self._pending_builders)

            # then sort the new, expanded set of builders
            wfd = defer.waitForDeferred(
                self._sortBuilders(list(existing_pending | new_builders)))
            yield wfd
            self._pending_builders = wfd.getResult()

            # start the activity loop, if we aren't already working on that.
            if not self.active:
                self._activityLoop()
        except:
            log.err(Failure(),
                    "while attempting to start builds on %s" % self.name)

        # release the lock unconditionally
        self.pending_builders_lock.release()

    @defer.deferredGenerator
    def _defaultSorter(self, master, builders):
        timer = metrics.Timer("BuildRequestDistributor._defaultSorter()")
        timer.start()
        # perform an asynchronous Schwartzian transform; builders with no
        # oldest-request time (None) are sorted to the end by nonecmp() below
        def xform(bldr):
            d = defer.maybeDeferred(lambda :
                    bldr.getOldestRequestTime())
            d.addCallback(lambda time :
                (((time is None) and None or time),bldr))
            return d
        wfd = defer.waitForDeferred(
            defer.gatherResults(
                [ xform(bldr) for bldr in builders ]))
        yield wfd
        xformed = wfd.getResult()

        # sort the transformed list synchronously, comparing None to the end
        # of the list
        def nonecmp(a,b):
            if a[0] is None: return 1
            if b[0] is None: return -1
            return cmp(a,b)
        xformed.sort(cmp=nonecmp)

        # and reverse the transform
        yield [ xf[1] for xf in xformed ]
        timer.stop()

    @defer.deferredGenerator
    def _sortBuilders(self, buildernames):
        timer = metrics.Timer("BuildRequestDistributor._sortBuilders()")
        timer.start()
        # note that this takes and returns a list of builder names

        # convert builder names to builders
        builders_dict = self.botmaster.builders
        builders = [ builders_dict.get(n)
                     for n in buildernames
                     if n in builders_dict ]

        # find a sorting function
        sorter = self.botmaster.prioritizeBuilders
        if not sorter:
            sorter = self._defaultSorter

        # run it
        try:
            wfd = defer.waitForDeferred(
                defer.maybeDeferred(lambda :
                    sorter(self.master, builders)))
            yield wfd
            builders = wfd.getResult()
        except:
            log.msg("Exception prioritizing builders; order unspecified")
            log.err(Failure())

        # and return the names
        yield [ b.name for b in builders ]
        timer.stop()

    @defer.deferredGenerator
    def _activityLoop(self):
        self.active = True

        timer = metrics.Timer('BuildRequestDistributor._activityLoop()')
        timer.start()

        while 1:
            wfd = defer.waitForDeferred(
                self.activity_lock.acquire())
            yield wfd
            wfd.getResult()

            # lock pending_builders, pop an element from it, and release
            wfd = defer.waitForDeferred(
                self.pending_builders_lock.acquire())
            yield wfd
            wfd.getResult()

            # bail out if we shouldn't keep looping
            if not self.running or not self._pending_builders:
                self.pending_builders_lock.release()
                self.activity_lock.release()
                break

            bldr_name = self._pending_builders.pop(0)
            self.pending_builders_lock.release()

            try:
                wfd = defer.waitForDeferred(
                    self._callABuilder(bldr_name))
                yield wfd
                wfd.getResult()
            except:
                log.err(Failure(),
                        "from maybeStartBuild for builder '%s'" % (bldr_name,))

            self.activity_lock.release()

        timer.stop()

        self.active = False
        self._quiet()

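    # The loop above holds activity_lock across each _callABuilder()
    # invocation, so stopService() (which also acquires activity_lock) only
    # completes at a quiescent point; once self.running goes false the loop
    # exits on its next pass.
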
    def _callABuilder(self, bldr_name):
        # get the actual builder object
        bldr = self.botmaster.builders.get(bldr_name)
        if not bldr:
            return defer.succeed(None)

        d = bldr.maybeStartBuild()
        d.addErrback(log.err, 'in maybeStartBuild for %r' % (bldr,))
        return d

    def _quiet(self):
        # shim for tests
        pass # pragma: no cover


class DuplicateSlaveArbitrator(object):
    """Utility class to arbitrate the situation when a new slave connects with
    the name of an existing, connected slave"""
    # There are several likely duplicate slave scenarios in practice:
    #
    # 1. two slaves are configured with the same username/password
    #
    # 2. the same slave process believes it is disconnected (due to a network
    # hiccup), and is trying to reconnect
    #
    # For the first case, we want to prevent the two slaves from repeatedly
    # superseding one another (which results in lots of failed builds), so we
    # will prefer the old slave.  However, for the second case we need to
    # detect situations where the old slave is "gone".  Sometimes "gone" means
    # that the TCP/IP connection to it is in a long timeout period (10-20m,
    # depending on the OS configuration), so this can take a while.

    PING_TIMEOUT = 10
    """Timeout for pinging the old slave.  Set this to something quite long, as
    a very busy slave (e.g., one sending a big log chunk) may take a while to
    return a ping.

    @ivar old_slave: L{buildbot.process.slavebuilder.AbstractSlaveBuilder}
    instance
    """

    def __init__(self, slave):
        self.old_slave = slave

    def getPerspective(self, mind, slavename):
        self.new_slave_mind = mind

        old_tport = self.old_slave.slave.broker.transport
        new_tport = mind.broker.transport
        log.msg("duplicate slave %s; delaying new slave (%s) and pinging old (%s)" %
                (self.old_slave.slavename, new_tport.getPeer(), old_tport.getPeer()))

        # delay the new slave until we decide what to do with it
        self.new_slave_d = defer.Deferred()

        # Ping the old slave.  If this kills it, then we can allow the new
        # slave to connect.  If this does not kill it, then we disconnect
        # the new slave.
        self.ping_old_slave_done = False
        self.old_slave_connected = True
        self.ping_old_slave(new_tport.getPeer())

        # Print a message on the new slave, if possible.
        self.ping_new_slave_done = False
        self.ping_new_slave()

        return self.new_slave_d
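
    # The Deferred returned above stays unfired until arbitration completes:
    # maybe_done() below either fires it with the (re-used) old_slave object
    # via start_new_slave(), or errbacks it via disconnect_new_slave(), which
    # rejects the duplicate connection.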

    def ping_new_slave(self):
        d = self.new_slave_mind.callRemote("print",
            "master already has a connection named '%s' - checking its liveness"
            % self.old_slave.slavename)
        def done(_):
            # failure or success, doesn't matter
            self.ping_new_slave_done = True
            self.maybe_done()
        d.addBoth(done)

    def ping_old_slave(self, new_peer):
        # set a timer on this ping, in case the network is bad.  TODO: a timeout
        # on the ping itself is not quite what we want.  If there is other data
        # flowing over the PB connection, then we should keep waiting.  Bug #1703
        def timeout():
            self.ping_old_slave_timeout = None
            self.ping_old_slave_timed_out = True
            self.old_slave_connected = False
            self.ping_old_slave_done = True
            self.maybe_done()
        self.ping_old_slave_timeout = reactor.callLater(self.PING_TIMEOUT, timeout)
        self.ping_old_slave_timed_out = False

        d = self.old_slave.slave.callRemote("print",
            "master got a duplicate connection from %s; keeping this one" % new_peer)

        def clear_timeout(r):
            if self.ping_old_slave_timeout:
                self.ping_old_slave_timeout.cancel()
                self.ping_old_slave_timeout = None
            return r
        d.addBoth(clear_timeout)

        def old_gone(f):
            if self.ping_old_slave_timed_out:
                return # ignore after timeout
            f.trap(pb.PBConnectionLost)
            log.msg(("connection lost while pinging old slave '%s' - " +
                     "keeping new slave") % self.old_slave.slavename)
            self.old_slave_connected = False
        d.addErrback(old_gone)

        def other_err(f):
            if self.ping_old_slave_timed_out:
                return # ignore after timeout
            log.msg("unexpected error while pinging old slave; disconnecting it")
            log.err(f)
            self.old_slave_connected = False
        d.addErrback(other_err)

        def done(_):
            if self.ping_old_slave_timed_out:
                return # ignore after timeout
            self.ping_old_slave_done = True
            self.maybe_done()
        d.addCallback(done)

    def maybe_done(self):
        if not self.ping_new_slave_done or not self.ping_old_slave_done:
            return

        # both pings are done, so sort out the results
        if self.old_slave_connected:
            self.disconnect_new_slave()
        else:
            self.start_new_slave()

    def start_new_slave(self, count=20):
        if not self.new_slave_d:
            return

        # we need to wait until the old slave has actually disconnected, which
        # can take a little while -- but don't wait forever!
        if self.old_slave.isConnected():
            if self.old_slave.slave:
                self.old_slave.slave.broker.transport.loseConnection()
            if count < 0:
                log.msg("WEIRD: want to start new slave, but the old slave will not disconnect")
                self.disconnect_new_slave()
            else:
                reactor.callLater(0.1, self.start_new_slave, count-1)
            return

        d = self.new_slave_d
        self.new_slave_d = None
        d.callback(self.old_slave)

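    # Note: with count=20 and the 0.1s reactor.callLater interval above, the
    # wait for the old connection to drop is capped at roughly two seconds
    # before the new slave is rejected instead.
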
    def disconnect_new_slave(self):
        if not self.new_slave_d:
            return
        d = self.new_slave_d
        self.new_slave_d = None
        log.msg("rejecting duplicate slave with exception")
        d.errback(Failure(RuntimeError("rejecting duplicate slave")))