
Source Code for Module buildbot.process.botmaster

# This file is part of Buildbot.  Buildbot is free software: you can
# redistribute it and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation, version 2.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright Buildbot Team Members


from twisted.python import log, reflect
from twisted.python.failure import Failure
from twisted.internet import defer, reactor
from twisted.spread import pb
from twisted.application import service

from buildbot.process.builder import Builder
from buildbot import interfaces, locks, config, util
from buildbot.process import metrics


class BotMaster(config.ReconfigurableServiceMixin, service.MultiService):

    """This is the master-side service which manages remote buildbot slaves.
    It provides them with BuildSlaves, and distributes build requests to
    them."""

    debug = 0

    def __init__(self, master):
        service.MultiService.__init__(self)
        self.setName("botmaster")
        self.master = master

        self.builders = {}
        self.builderNames = []
        # builders maps Builder names to instances of bb.p.builder.Builder,
        # which is the master-side object that defines and controls a build.

        # self.slaves contains a ready BuildSlave instance for each
        # potential buildslave, i.e. all the ones listed in the config file.
        # If the slave is connected, self.slaves[slavename].slave will
        # contain a RemoteReference to their Bot instance.  If it is not
        # connected, that attribute will hold None.
        self.slaves = {}  # maps slavename to BuildSlave
        self.watchers = {}

        # self.locks holds the real Lock instances
        self.locks = {}

        # self.mergeRequests is the callable override for merging build
        # requests
        self.mergeRequests = None

        self.shuttingDown = False

        self.lastSlavePortnum = None

        # subscription to new build requests
        self.buildrequest_sub = None

        # a distributor for incoming build requests; see below
        self.brd = BuildRequestDistributor(self)
        self.brd.setServiceParent(self)

    def cleanShutdown(self, _reactor=reactor):
        """Shut down the entire process, once all currently-running builds are
        complete."""
        if self.shuttingDown:
            return
        log.msg("Initiating clean shutdown")
        self.shuttingDown = True

        # first, stop the distributor; this will finish any ongoing scheduling
        # operations before firing
        d = self.brd.stopService()

        # then wait for all builds to finish
        def wait(_):
            l = []
            for builder in self.builders.values():
                for build in builder.builder_status.getCurrentBuilds():
                    l.append(build.waitUntilFinished())
            if len(l) == 0:
                log.msg("No running jobs, starting shutdown immediately")
            else:
                log.msg("Waiting for %i build(s) to finish" % len(l))
            return defer.DeferredList(l)
        d.addCallback(wait)

        # Finally, shut the whole process down
        def shutdown(ign):
            # Double check that we're still supposed to be shutting down
            # The shutdown may have been cancelled!
            if self.shuttingDown:
                # Check that there really aren't any running builds
                for builder in self.builders.values():
                    n = len(builder.builder_status.getCurrentBuilds())
                    if n > 0:
                        log.msg("Not shutting down, builder %s has %i builds running" % (builder, n))
                        log.msg("Trying shutdown sequence again")
                        self.shuttingDown = False
                        self.cleanShutdown()
                        return
                log.msg("Stopping reactor")
                _reactor.stop()
            else:
                self.brd.startService()
        d.addCallback(shutdown)
        d.addErrback(log.err, 'while processing cleanShutdown')
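
    # Illustrative usage (hypothetical caller; the actual trigger, such as a
    # signal handler or a status UI, lives outside this module):
    #
    #   master.botmaster.cleanShutdown()        # drain running builds, then stop
    #   master.botmaster.cancelCleanShutdown()  # abort the shutdown and resume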

    def cancelCleanShutdown(self):
        """Cancel a clean shutdown that is already in progress, if any"""
        if not self.shuttingDown:
            return
        log.msg("Cancelling clean shutdown")
        self.shuttingDown = False

    @metrics.countMethod('BotMaster.slaveLost()')
    def slaveLost(self, bot):
        metrics.MetricCountEvent.log("BotMaster.attached_slaves", -1)
        for name, b in self.builders.items():
            if bot.slavename in b.config.slavenames:
                b.detached(bot)

    @metrics.countMethod('BotMaster.getBuildersForSlave()')
    def getBuildersForSlave(self, slavename):
        return [ b for b in self.builders.values()
                 if slavename in b.config.slavenames ]

    def getBuildernames(self):
        return self.builderNames

    def getBuilders(self):
        return self.builders.values()

    def startService(self):
        def buildRequestAdded(notif):
            self.maybeStartBuildsForBuilder(notif['buildername'])
        self.buildrequest_sub = \
            self.master.subscribeToBuildRequests(buildRequestAdded)
        service.MultiService.startService(self)

    @defer.deferredGenerator
    def reconfigService(self, new_config):
        timer = metrics.Timer("BotMaster.reconfigService")
        timer.start()

        # reconfigure slaves
        wfd = defer.waitForDeferred(
            self.reconfigServiceSlaves(new_config))
        yield wfd
        wfd.getResult()

        # reconfigure builders
        wfd = defer.waitForDeferred(
            self.reconfigServiceBuilders(new_config))
        yield wfd
        wfd.getResult()

        wfd = defer.waitForDeferred(
            config.ReconfigurableServiceMixin.reconfigService(self,
                                                              new_config))
        yield wfd
        wfd.getResult()

        # try to start a build for every builder; this is necessary at master
        # startup, and a good idea in any other case
        self.maybeStartBuildsForAllBuilders()

        timer.stop()

    @defer.deferredGenerator
    def reconfigServiceSlaves(self, new_config):

        timer = metrics.Timer("BotMaster.reconfigServiceSlaves")
        timer.start()

        # arrange slaves by name
        old_by_name = dict([ (s.slavename, s)
                             for s in list(self)
                             if interfaces.IBuildSlave.providedBy(s) ])
        old_set = set(old_by_name.iterkeys())
        new_by_name = dict([ (s.slavename, s)
                             for s in new_config.slaves ])
        new_set = set(new_by_name.iterkeys())

        # calculate new slaves, by name, and removed slaves
        removed_names, added_names = util.diffSets(old_set, new_set)

        # find any slaves for which the fully qualified class name has
        # changed, and treat those as an add and remove
        for n in old_set & new_set:
            old = old_by_name[n]
            new = new_by_name[n]
            # detect changed class name
            if reflect.qual(old.__class__) != reflect.qual(new.__class__):
                removed_names.add(n)
                added_names.add(n)

        if removed_names or added_names:
            log.msg("adding %d new slaves, removing %d" %
                    (len(added_names), len(removed_names)))

            for n in removed_names:
                slave = old_by_name[n]

                del self.slaves[n]
                slave.master = None
                slave.botmaster = None

                wfd = defer.waitForDeferred(
                    defer.maybeDeferred(lambda :
                        slave.disownServiceParent()))
                yield wfd
                wfd.getResult()

            for n in added_names:
                slave = new_by_name[n]
                slave.setServiceParent(self)

                slave.botmaster = self
                slave.master = self.master
                self.slaves[n] = slave

        metrics.MetricCountEvent.log("num_slaves",
                len(self.slaves), absolute=True)

        timer.stop()
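
    # For illustration (slave names are hypothetical): util.diffSets(old, new)
    # splits the configured names into (removed, added), e.g. old = {'sl1',
    # 'sl2'} and new = {'sl2', 'sl3'} give removed_names = {'sl1'} and
    # added_names = {'sl3'}; slaves whose class changed are then folded into
    # both sets above so that they are torn down and re-created.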

    @defer.deferredGenerator
    def reconfigServiceBuilders(self, new_config):

        timer = metrics.Timer("BotMaster.reconfigServiceBuilders")
        timer.start()

        # arrange builders by name
        old_by_name = dict([ (b.name, b)
                             for b in list(self)
                             if isinstance(b, Builder) ])
        old_set = set(old_by_name.iterkeys())
        new_by_name = dict([ (bc.name, bc)
                             for bc in new_config.builders ])
        new_set = set(new_by_name.iterkeys())

        # calculate new builders, by name, and removed builders
        removed_names, added_names = util.diffSets(old_set, new_set)

        if removed_names or added_names:
            log.msg("adding %d new builders, removing %d" %
                    (len(added_names), len(removed_names)))

            for n in removed_names:
                builder = old_by_name[n]

                del self.builders[n]
                builder.master = None
                builder.botmaster = None

                wfd = defer.waitForDeferred(
                    defer.maybeDeferred(lambda :
                        builder.disownServiceParent()))
                yield wfd
                wfd.getResult()

            for n in added_names:
                builder = Builder(n)
                self.builders[n] = builder

                builder.botmaster = self
                builder.master = self.master
                builder.setServiceParent(self)

        self.builderNames = self.builders.keys()

        metrics.MetricCountEvent.log("num_builders",
                len(self.builders), absolute=True)

        timer.stop()

    def stopService(self):
        if self.buildrequest_sub:
            self.buildrequest_sub.unsubscribe()
            self.buildrequest_sub = None
        for b in self.builders.values():
            b.builder_status.addPointEvent(["master", "shutdown"])
            b.builder_status.saveYourself()
        return service.MultiService.stopService(self)

    def getLockByID(self, lockid):
        """Convert a Lock identifier into an actual Lock instance.
        @param lockid: a locks.MasterLock or locks.SlaveLock instance
        @return: a locks.RealMasterLock or locks.RealSlaveLock instance
        """
        assert isinstance(lockid, (locks.MasterLock, locks.SlaveLock))
        if not lockid in self.locks:
            self.locks[lockid] = lockid.lockClass(lockid)
        # if the master.cfg file has changed maxCount= on the lock, the next
        # time a build is started, they'll get a new RealLock instance.  Note
        # that this requires that MasterLock and SlaveLock (marker) instances
        # be hashable and that they should compare properly.
        return self.locks[lockid]
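
    # A sketch of the caching behaviour above (the lock name is hypothetical):
    # two builders that both declare locks.MasterLock('compile') end up sharing
    # one RealMasterLock, because equal marker instances map to the same cache
    # entry; changing maxCount= in master.cfg produces a marker that no longer
    # compares equal, and therefore a fresh RealLock on the next build.
    #
    #   lockid = locks.MasterLock('compile', maxCount=1)
    #   real = botmaster.getLockByID(lockid)   # same instance on every call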

    def maybeStartBuildsForBuilder(self, buildername):
        """
        Call this when something suggests that a particular builder may now
        be available to start a build.

        @param buildername: the name of the builder
        """
        self.brd.maybeStartBuildsOn([buildername])

    def maybeStartBuildsForSlave(self, slave_name):
        """
        Call this when something suggests that a particular slave may now be
        available to start a build.

        @param slave_name: the name of the slave
        """
        builders = self.getBuildersForSlave(slave_name)
        self.brd.maybeStartBuildsOn([ b.name for b in builders ])

    def maybeStartBuildsForAllBuilders(self):
        """
        Call this when something suggests that this would be a good time to
        start some builds, but nothing more specific.
        """
        self.brd.maybeStartBuildsOn(self.builderNames)


class BuildRequestDistributor(service.Service):
    """
    Special-purpose class to handle distributing build requests to builders by
    calling their C{maybeStartBuild} method.

    This takes account of the C{prioritizeBuilders} configuration, and is
    highly re-entrant; that is, if a new build request arrives while builders
    are still working on the previous build request, then this class will
    correctly re-prioritize invocations of builders' C{maybeStartBuild}
    methods.
    """

    def __init__(self, botmaster):
        self.botmaster = botmaster
        self.master = botmaster.master

        # lock to ensure builders are only sorted once at any time
        self.pending_builders_lock = defer.DeferredLock()

        # sorted list of names of builders that need their maybeStartBuild
        # method invoked.
        self._pending_builders = []
        self.activity_lock = defer.DeferredLock()
        self.active = False

    def stopService(self):
        # let the parent stopService succeed between activity; then the loop
        # will stop calling itself, since self.running is false
        d = self.activity_lock.acquire()
        d.addCallback(lambda _ : service.Service.stopService(self))
        d.addBoth(lambda _ : self.activity_lock.release())
        return d

    @defer.deferredGenerator
    def maybeStartBuildsOn(self, new_builders):
        """
        Try to start any builds that can be started right now.  This function
        returns immediately, and promises to trigger those builders
        eventually.

        @param new_builders: names of new builders that should be given the
        opportunity to check for new requests.
        """
        new_builders = set(new_builders)
        existing_pending = set(self._pending_builders)

        # if we won't add any builders, there's nothing to do
        if new_builders < existing_pending:
            return

        # reset the list of pending builders; this is async, so begin
        # by grabbing a lock
        wfd = defer.waitForDeferred(
            self.pending_builders_lock.acquire())
        yield wfd
        wfd.getResult()

        try:
            # re-fetch existing_pending, in case it has changed while acquiring
            # the lock
            existing_pending = set(self._pending_builders)

            # then sort the new, expanded set of builders
            wfd = defer.waitForDeferred(
                self._sortBuilders(list(existing_pending | new_builders)))
            yield wfd
            self._pending_builders = wfd.getResult()

            # start the activity loop, if we aren't already working on that.
            if not self.active:
                self._activityLoop()
        except:
            log.err(Failure(),
                    "while attempting to start builds on %s" % self.name)

        # release the lock unconditionally
        self.pending_builders_lock.release()
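
    # Worked example of the subset check above (builder names are made up): if
    # _pending_builders is ['fast', 'slow'], then maybeStartBuildsOn(['slow'])
    # is a no-op, since {'slow'} is a strict subset of the pending set, while
    # maybeStartBuildsOn(['slow', 'extra']) merges to {'fast', 'slow', 'extra'},
    # re-sorts the names, and kicks the activity loop if it is not running.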

    @defer.deferredGenerator
    def _defaultSorter(self, master, builders):
        timer = metrics.Timer("BuildRequestDistributor._defaultSorter()")
        timer.start()
        # perform an asynchronous Schwartzian transform; builders with no
        # oldest-request time (None) are pushed to the end by nonecmp below
        def xform(bldr):
            d = defer.maybeDeferred(lambda :
                    bldr.getOldestRequestTime())
            d.addCallback(lambda time :
                (((time is None) and None or time), bldr))
            return d
        wfd = defer.waitForDeferred(
            defer.gatherResults(
                [ xform(bldr) for bldr in builders ]))
        yield wfd
        xformed = wfd.getResult()

        # sort the transformed list synchronously, pushing None to the end of
        # the list
        def nonecmp(a, b):
            if a[0] is None: return 1
            if b[0] is None: return -1
            return cmp(a, b)
        xformed.sort(cmp=nonecmp)

        # and reverse the transform
        yield [ xf[1] for xf in xformed ]
        timer.stop()
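
    # Illustration of the default ordering (timestamps are hypothetical):
    # builders whose oldest pending request is oldest go first, and builders
    # with no pending requests (getOldestRequestTime() returned None) sort to
    # the end; oldest-request times of 1300000000, None, and 1200000000 yield
    # the order 1200000000, 1300000000, None.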

    @defer.deferredGenerator
    def _sortBuilders(self, buildernames):
        timer = metrics.Timer("BuildRequestDistributor._sortBuilders()")
        timer.start()
        # note that this takes and returns a list of builder names

        # convert builder names to builders
        builders_dict = self.botmaster.builders
        builders = [ builders_dict.get(n)
                     for n in buildernames
                     if n in builders_dict ]

        # find a sorting function
        sorter = self.master.config.prioritizeBuilders
        if not sorter:
            sorter = self._defaultSorter

        # run it
        try:
            wfd = defer.waitForDeferred(
                defer.maybeDeferred(lambda :
                    sorter(self.master, builders)))
            yield wfd
            builders = wfd.getResult()
        except:
            log.msg("Exception prioritizing builders; order unspecified")
            log.err(Failure())

        # and return the names
        yield [ b.name for b in builders ]
        timer.stop()

    @defer.deferredGenerator
    def _activityLoop(self):
        self.active = True

        timer = metrics.Timer('BuildRequestDistributor._activityLoop()')
        timer.start()

        while 1:
            wfd = defer.waitForDeferred(
                self.activity_lock.acquire())
            yield wfd
            wfd.getResult()

            # lock pending_builders, pop an element from it, and release
            wfd = defer.waitForDeferred(
                self.pending_builders_lock.acquire())
            yield wfd
            wfd.getResult()

            # bail out if we shouldn't keep looping
            if not self.running or not self._pending_builders:
                self.pending_builders_lock.release()
                self.activity_lock.release()
                break

            bldr_name = self._pending_builders.pop(0)
            self.pending_builders_lock.release()

            try:
                wfd = defer.waitForDeferred(
                    self._callABuilder(bldr_name))
                yield wfd
                wfd.getResult()
            except:
                log.err(Failure(),
                        "from maybeStartBuild for builder '%s'" % (bldr_name,))

            self.activity_lock.release()

        timer.stop()

        self.active = False
        self._quiet()
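
    # Note on the locking above: activity_lock is released at the end of every
    # iteration, which is what lets stopService() (which also acquires
    # activity_lock) run in between builders; once self.running is false, the
    # next iteration bails out and the loop ends.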

    def _callABuilder(self, bldr_name):
        # get the actual builder object
        bldr = self.botmaster.builders.get(bldr_name)
        if not bldr:
            return defer.succeed(None)

        d = bldr.maybeStartBuild()
        d.addErrback(log.err, 'in maybeStartBuild for %r' % (bldr,))
        return d

    def _quiet(self):
        # shim for tests
        pass  # pragma: no cover


class DuplicateSlaveArbitrator(object):
    """Utility class to arbitrate the situation when a new slave connects with
    the name of an existing, connected slave

    @ivar buildslave: L{buildbot.process.slavebuilder.AbstractBuildSlave}
    instance
    @ivar old_remote: L{RemoteReference} to the old slave
    @ivar new_remote: L{RemoteReference} to the new slave
    """
    _reactor = reactor  # for testing

    # There are several likely duplicate slave scenarios in practice:
    #
    # 1. two slaves are configured with the same username/password
    #
    # 2. the same slave process believes it is disconnected (due to a network
    #    hiccup), and is trying to reconnect
    #
    # For the first case, we want to prevent the two slaves from repeatedly
    # superseding one another (which results in lots of failed builds), so we
    # will prefer the old slave.  However, for the second case we need to
    # detect situations where the old slave is "gone".  Sometimes "gone" means
    # that the TCP/IP connection to it is in a long timeout period (10-20m,
    # depending on the OS configuration), so this can take a while.

    PING_TIMEOUT = 10
    """Timeout for pinging the old slave.  Set this to something quite long, as
    a very busy slave (e.g., one sending a big log chunk) may take a while to
    return a ping.
    """

    def __init__(self, buildslave):
        self.buildslave = buildslave
        self.old_remote = self.buildslave.slave

    def getPerspective(self, mind, slavename):
        self.new_remote = mind
        self.ping_old_slave_done = False
        self.old_slave_connected = True
        self.ping_new_slave_done = False

        old_tport = self.old_remote.broker.transport
        new_tport = self.new_remote.broker.transport
        log.msg("duplicate slave %s; delaying new slave (%s) and pinging old "
                "(%s)" % (self.buildslave.slavename, new_tport.getPeer(),
                          old_tport.getPeer()))

        # delay the new slave until we decide what to do with it
        d = self.new_slave_d = defer.Deferred()

        # Ping the old slave.  If this kills it, then we can allow the new
        # slave to connect.  If this does not kill it, then we disconnect
        # the new slave.
        self.ping_old_slave(new_tport.getPeer())

        # Print a message on the new slave, if possible.
        self.ping_new_slave()

        return d
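
    # Summary of the arbitration outcome (see maybe_done below): if the old
    # slave answers the ping, the existing connection is kept and the new one
    # is rejected; if the ping fails or times out after PING_TIMEOUT seconds,
    # the old connection is treated as gone and the new slave is attached.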

    def ping_new_slave(self):
        d = defer.maybeDeferred(lambda :
            self.new_remote.callRemote("print", "master already has a "
                        "connection named '%s' - checking its liveness"
                        % self.buildslave.slavename))
        def done(_):
            # failure or success, doesn't matter - the ping is done.
            self.ping_new_slave_done = True
            self.maybe_done()
        d.addBoth(done)

    def ping_old_slave(self, new_peer):
        # set a timer on this ping, in case the network is bad.  TODO: a
        # timeout on the ping itself is not quite what we want.  If there is
        # other data flowing over the PB connection, then we should keep
        # waiting.  Bug #1703
        def timeout():
            self.ping_old_slave_timeout = None
            self.ping_old_slave_timed_out = True
            self.old_slave_connected = False
            self.ping_old_slave_done = True
            self.maybe_done()
        self.ping_old_slave_timeout = self._reactor.callLater(
                self.PING_TIMEOUT, timeout)
        self.ping_old_slave_timed_out = False

        # call this in maybeDeferred because callRemote tends to raise
        # exceptions instead of returning Failures
        d = defer.maybeDeferred(lambda :
            self.old_remote.callRemote("print",
                "master got a duplicate connection from %s; keeping this one"
                % new_peer))

        def clear_timeout(r):
            if self.ping_old_slave_timeout:
                self.ping_old_slave_timeout.cancel()
                self.ping_old_slave_timeout = None
            return r
        d.addBoth(clear_timeout)

        def old_gone(f):
            if self.ping_old_slave_timed_out:
                return  # ignore after timeout
            f.trap(pb.PBConnectionLost, pb.DeadReferenceError)
            log.msg(("connection lost while pinging old slave '%s' - " +
                     "keeping new slave") % self.buildslave.slavename)
            self.old_slave_connected = False
        d.addErrback(old_gone)

        def other_err(f):
            log.err(f, "unexpected error pinging old slave; disconnecting it")
            self.old_slave_connected = False
        d.addErrback(other_err)

        def done(_):
            if self.ping_old_slave_timed_out:
                return  # ignore after timeout
            self.ping_old_slave_done = True
            self.maybe_done()
        d.addCallback(done)

    def maybe_done(self):
        if not self.ping_new_slave_done or not self.ping_old_slave_done:
            return

        # both pings are done, so sort out the results
        if self.old_slave_connected:
            self.disconnect_new_slave()
        else:
            self.start_new_slave()

    def start_new_slave(self):
        # just in case
        if not self.new_slave_d:  # pragma: ignore
            return

        d = self.new_slave_d
        self.new_slave_d = None

        if self.buildslave.isConnected():
            # we need to wait until the old slave has fully detached, which can
            # take a little while as buffers drain, etc.
            def detached():
                d.callback(self.buildslave)
            self.buildslave.subscribeToDetach(detached)
            self.old_remote.broker.transport.loseConnection()
        else:  # pragma: ignore
            # by some unusual timing, it's quite possible that the old slave
            # has disconnected while the arbitration was going on.  In that
            # case, we're already done!
            d.callback(self.buildslave)

    def disconnect_new_slave(self):
        # just in case
        if not self.new_slave_d:  # pragma: ignore
            return

        d = self.new_slave_d
        self.new_slave_d = None
        log.msg("rejecting duplicate slave with exception")
        d.errback(Failure(RuntimeError("rejecting duplicate slave")))