
Source Code for Module buildbot.process.botmaster

# This file is part of Buildbot.  Buildbot is free software: you can
# redistribute it and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation, version 2.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright Buildbot Team Members


from twisted.python import log, reflect
from twisted.python.failure import Failure
from twisted.internet import defer, reactor
from twisted.spread import pb
from twisted.application import service

from buildbot.process.builder import Builder
from buildbot import interfaces, locks, config, util
from buildbot.process import metrics

class BotMaster(config.ReconfigurableServiceMixin, service.MultiService):

    """This is the master-side service which manages remote buildbot slaves.
    It provides them with BuildSlaves, and distributes build requests to
    them."""

    debug = 0

    def __init__(self, master):
        service.MultiService.__init__(self)
        self.setName("botmaster")
        self.master = master

        self.builders = {}
        self.builderNames = []
        # builders maps Builder names to instances of bb.p.builder.Builder,
        # which is the master-side object that defines and controls a build.

        # self.slaves contains a ready BuildSlave instance for each
        # potential buildslave, i.e. all the ones listed in the config file.
        # If the slave is connected, self.slaves[slavename].slave will
        # contain a RemoteReference to their Bot instance. If it is not
        # connected, that attribute will hold None.
        self.slaves = {} # maps slavename to BuildSlave
        self.watchers = {}

        # self.locks holds the real Lock instances
        self.locks = {}

        # self.mergeRequests is the callable override for merging build
        # requests
        self.mergeRequests = None

        self.shuttingDown = False

        self.lastSlavePortnum = None

        # subscription to new build requests
        self.buildrequest_sub = None

        # a distributor for incoming build requests; see below
        self.brd = BuildRequestDistributor(self)
        self.brd.setServiceParent(self)

    def cleanShutdown(self, _reactor=reactor):
        """Shut down the entire process, once all currently-running builds are
        complete."""
        if self.shuttingDown:
            return
        log.msg("Initiating clean shutdown")
        self.shuttingDown = True

        # first, stop the distributor; this will finish any ongoing scheduling
        # operations before firing
        d = self.brd.stopService()

        # then wait for all builds to finish
        def wait(_):
            l = []
            for builder in self.builders.values():
                for build in builder.builder_status.getCurrentBuilds():
                    l.append(build.waitUntilFinished())
            if len(l) == 0:
                log.msg("No running jobs, starting shutdown immediately")
            else:
                log.msg("Waiting for %i build(s) to finish" % len(l))
            return defer.DeferredList(l)
        d.addCallback(wait)

        # Finally, shut the whole process down
        def shutdown(ign):
            # Double check that we're still supposed to be shutting down
            # The shutdown may have been cancelled!
            if self.shuttingDown:
                # Check that there really aren't any running builds
                for builder in self.builders.values():
                    n = len(builder.builder_status.getCurrentBuilds())
                    if n > 0:
                        log.msg("Not shutting down, builder %s has %i builds running" % (builder, n))
                        log.msg("Trying shutdown sequence again")
                        self.shuttingDown = False
                        self.cleanShutdown()
                        return
                log.msg("Stopping reactor")
                _reactor.stop()
            else:
                self.brd.startService()
        d.addCallback(shutdown)
        d.addErrback(log.err, 'while processing cleanShutdown')

    def cancelCleanShutdown(self):
        """Cancel a clean shutdown that is already in progress, if any"""
        if not self.shuttingDown:
            return
        log.msg("Cancelling clean shutdown")
        self.shuttingDown = False

    @metrics.countMethod('BotMaster.slaveLost()')
    def slaveLost(self, bot):
        metrics.MetricCountEvent.log("BotMaster.attached_slaves", -1)
        for name, b in self.builders.items():
            if bot.slavename in b.config.slavenames:
                b.detached(bot)

    @metrics.countMethod('BotMaster.getBuildersForSlave()')
    def getBuildersForSlave(self, slavename):
        return [ b for b in self.builders.values()
                 if slavename in b.config.slavenames ]

    def getBuildernames(self):
        return self.builderNames

    def getBuilders(self):
        return self.builders.values()

    def startService(self):
        def buildRequestAdded(notif):
            self.maybeStartBuildsForBuilder(notif['buildername'])
        self.buildrequest_sub = \
            self.master.subscribeToBuildRequests(buildRequestAdded)
        service.MultiService.startService(self)

    @defer.inlineCallbacks
    def reconfigService(self, new_config):
        timer = metrics.Timer("BotMaster.reconfigService")
        timer.start()

        # reconfigure slaves
        yield self.reconfigServiceSlaves(new_config)

        # reconfigure builders
        yield self.reconfigServiceBuilders(new_config)

        # call up
        yield config.ReconfigurableServiceMixin.reconfigService(self,
                                                    new_config)

        # try to start a build for every builder; this is necessary at master
        # startup, and a good idea in any other case
        self.maybeStartBuildsForAllBuilders()

        timer.stop()

    @defer.inlineCallbacks
    def reconfigServiceSlaves(self, new_config):

        timer = metrics.Timer("BotMaster.reconfigServiceSlaves")
        timer.start()

        # arrange slaves by name
        old_by_name = dict([ (s.slavename, s)
                            for s in list(self)
                            if interfaces.IBuildSlave.providedBy(s) ])
        old_set = set(old_by_name.iterkeys())
        new_by_name = dict([ (s.slavename, s)
                            for s in new_config.slaves ])
        new_set = set(new_by_name.iterkeys())

        # calculate new slaves, by name, and removed slaves
        removed_names, added_names = util.diffSets(old_set, new_set)

        # find any slaves for which the fully qualified class name has
        # changed, and treat those as an add and remove
        for n in old_set & new_set:
            old = old_by_name[n]
            new = new_by_name[n]
            # detect changed class name
            if reflect.qual(old.__class__) != reflect.qual(new.__class__):
                removed_names.add(n)
                added_names.add(n)

        if removed_names or added_names:
            log.msg("adding %d new slaves, removing %d" %
                    (len(added_names), len(removed_names)))

            for n in removed_names:
                slave = old_by_name[n]

                del self.slaves[n]
                slave.master = None
                slave.botmaster = None

                yield defer.maybeDeferred(lambda :
                        slave.disownServiceParent())

            for n in added_names:
                slave = new_by_name[n]
                slave.setServiceParent(self)
                self.slaves[n] = slave

        metrics.MetricCountEvent.log("num_slaves",
                len(self.slaves), absolute=True)

        timer.stop()

    @defer.inlineCallbacks
    def reconfigServiceBuilders(self, new_config):

        timer = metrics.Timer("BotMaster.reconfigServiceBuilders")
        timer.start()

        # arrange builders by name
        old_by_name = dict([ (b.name, b)
                            for b in list(self)
                            if isinstance(b, Builder) ])
        old_set = set(old_by_name.iterkeys())
        new_by_name = dict([ (bc.name, bc)
                            for bc in new_config.builders ])
        new_set = set(new_by_name.iterkeys())

        # calculate new builders, by name, and removed builders
        removed_names, added_names = util.diffSets(old_set, new_set)

        if removed_names or added_names:
            log.msg("adding %d new builders, removing %d" %
                    (len(added_names), len(removed_names)))

            for n in removed_names:
                builder = old_by_name[n]

                del self.builders[n]
                builder.master = None
                builder.botmaster = None

                yield defer.maybeDeferred(lambda :
                        builder.disownServiceParent())

            for n in added_names:
                builder = Builder(n)
                self.builders[n] = builder

                builder.botmaster = self
                builder.master = self.master
                builder.setServiceParent(self)

        self.builderNames = self.builders.keys()

        metrics.MetricCountEvent.log("num_builders",
                len(self.builders), absolute=True)

        timer.stop()

    def stopService(self):
        if self.buildrequest_sub:
            self.buildrequest_sub.unsubscribe()
            self.buildrequest_sub = None
        for b in self.builders.values():
            b.builder_status.addPointEvent(["master", "shutdown"])
            b.builder_status.saveYourself()
        return service.MultiService.stopService(self)

    def getLockByID(self, lockid):
        """Convert a Lock identifier into an actual Lock instance.
        @param lockid: a locks.MasterLock or locks.SlaveLock instance
        @return: a locks.RealMasterLock or locks.RealSlaveLock instance
        """
        assert isinstance(lockid, (locks.MasterLock, locks.SlaveLock))
        if not lockid in self.locks:
            self.locks[lockid] = lockid.lockClass(lockid)
        # if the master.cfg file has changed maxCount= on the lock, the next
        # time a build is started, they'll get a new RealLock instance. Note
        # that this requires that MasterLock and SlaveLock (marker) instances
        # be hashable and that they should compare properly.
        return self.locks[lockid]

    def maybeStartBuildsForBuilder(self, buildername):
        """
        Call this when something suggests that a particular builder may now
        be available to start a build.

        @param buildername: the name of the builder
        """
        d = self.brd.maybeStartBuildsOn([buildername])
        d.addErrback(log.err)

    def maybeStartBuildsForSlave(self, slave_name):
        """
        Call this when something suggests that a particular slave may now be
        available to start a build.

        @param slave_name: the name of the slave
        """
        builders = self.getBuildersForSlave(slave_name)
        d = self.brd.maybeStartBuildsOn([ b.name for b in builders ])
        d.addErrback(log.err)

    def maybeStartBuildsForAllBuilders(self):
        """
        Call this when something suggests that this would be a good time to start some
        builds, but nothing more specific.
        """
        d = self.brd.maybeStartBuildsOn(self.builderNames)
        d.addErrback(log.err)
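The lock identifiers that getLockByID() accepts are the marker objects declared in the master configuration. A minimal sketch of how such a marker is created and attached to a builder follows; the lock, builder, and slave names are hypothetical, and this is illustrative only, not part of the module source:

# Illustrative master.cfg fragment (names are hypothetical).
from buildbot import locks

# MasterLock/SlaveLock instances are just hashable markers; BotMaster.getLockByID()
# turns them into RealMasterLock/RealSlaveLock instances when a build starts.
db_lock = locks.MasterLock("database", maxCount=1)

# the marker would then be attached to a builder (or a step), e.g.:
# c['builders'] = [
#     BuilderConfig(name="runtests", slavenames=["slave1"], factory=f,
#                   locks=[db_lock.access('exclusive')]),
# ]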
class BuildRequestDistributor(service.Service):
    """
    Special-purpose class to handle distributing build requests to builders by
    calling their C{maybeStartBuild} method.

    This takes account of the C{prioritizeBuilders} configuration, and is
    highly re-entrant; that is, if a new build request arrives while builders
    are still working on the previous build request, then this class will
    correctly re-prioritize invocations of builders' C{maybeStartBuild}
    methods.
    """

    def __init__(self, botmaster):
        self.botmaster = botmaster
        self.master = botmaster.master

        # lock to ensure builders are only sorted once at any time
        self.pending_builders_lock = defer.DeferredLock()

        # sorted list of names of builders that need their maybeStartBuild
        # method invoked.
        self._pending_builders = []
        self.activity_lock = defer.DeferredLock()
        self.active = False

    def stopService(self):
        # let the parent stopService succeed between activity; then the loop
        # will stop calling itself, since self.running is false
        d = self.activity_lock.acquire()
        d.addCallback(lambda _ : service.Service.stopService(self))
        d.addBoth(lambda _ : self.activity_lock.release())
        return d

    @defer.inlineCallbacks
    def maybeStartBuildsOn(self, new_builders):
        """
        Try to start any builds that can be started right now.  This function
        returns immediately, and promises to trigger those builders
        eventually.

        @param new_builders: names of new builders that should be given the
        opportunity to check for new requests.
        """
        new_builders = set(new_builders)
        existing_pending = set(self._pending_builders)

        # if we won't add any builders, there's nothing to do
        if new_builders < existing_pending:
            return

        # reset the list of pending builders; this is async, so begin
        # by grabbing a lock
        yield self.pending_builders_lock.acquire()

        try:
            # re-fetch existing_pending, in case it has changed while acquiring
            # the lock
            existing_pending = set(self._pending_builders)

            # then sort the new, expanded set of builders
            self._pending_builders = \
                yield self._sortBuilders(list(existing_pending | new_builders))

            # start the activity loop, if we aren't already working on that.
            if not self.active:
                self._activityLoop()
        except Exception:
            log.err(Failure(),
                    "while attempting to start builds on %s" % self.name)

        # release the lock unconditionally
        self.pending_builders_lock.release()

    @defer.inlineCallbacks
    def _defaultSorter(self, master, builders):
        timer = metrics.Timer("BuildRequestDistributor._defaultSorter()")
        timer.start()
        # perform an asynchronous schwarzian transform, transforming None
        # into sys.maxint so that it sorts to the end
        def xform(bldr):
            d = defer.maybeDeferred(lambda :
                    bldr.getOldestRequestTime())
            d.addCallback(lambda time :
                    (((time is None) and None or time),bldr))
            return d
        xformed = yield defer.gatherResults(
                [ xform(bldr) for bldr in builders ])

        # sort the transformed list synchronously, comparing None to the end of
        # the list
        def nonecmp(a,b):
            if a[0] is None: return 1
            if b[0] is None: return -1
            return cmp(a,b)
        xformed.sort(cmp=nonecmp)

        # and reverse the transform
        rv = [ xf[1] for xf in xformed ]
        timer.stop()
        defer.returnValue(rv)

    @defer.inlineCallbacks
    def _sortBuilders(self, buildernames):
        timer = metrics.Timer("BuildRequestDistributor._sortBuilders()")
        timer.start()
        # note that this takes and returns a list of builder names

        # convert builder names to builders
        builders_dict = self.botmaster.builders
        builders = [ builders_dict.get(n)
                     for n in buildernames
                     if n in builders_dict ]

        # find a sorting function
        sorter = self.master.config.prioritizeBuilders
        if not sorter:
            sorter = self._defaultSorter

        # run it
        try:
            builders = yield defer.maybeDeferred(lambda :
                    sorter(self.master, builders))
        except Exception:
            log.msg("Exception prioritizing builders; order unspecified")
            log.err(Failure())

        # and return the names
        rv = [ b.name for b in builders ]
        timer.stop()
        defer.returnValue(rv)

    @defer.inlineCallbacks
    def _activityLoop(self):
        self.active = True

        timer = metrics.Timer('BuildRequestDistributor._activityLoop()')
        timer.start()

        while 1:
            yield self.activity_lock.acquire()

            # lock pending_builders, pop an element from it, and release
            yield self.pending_builders_lock.acquire()

            # bail out if we shouldn't keep looping
            if not self.running or not self._pending_builders:
                self.pending_builders_lock.release()
                self.activity_lock.release()
                break

            bldr_name = self._pending_builders.pop(0)
            self.pending_builders_lock.release()

            try:
                yield self._callABuilder(bldr_name)
            except Exception:
                log.err(Failure(),
                        "from maybeStartBuild for builder '%s'" % (bldr_name,))

            self.activity_lock.release()

        timer.stop()

        self.active = False
        self._quiet()

    def _callABuilder(self, bldr_name):
        # get the actual builder object
        bldr = self.botmaster.builders.get(bldr_name)
        if not bldr:
            return defer.succeed(None)

        d = bldr.maybeStartBuild()
        d.addErrback(log.err, 'in maybeStartBuild for %r' % (bldr,))
        return d

    def _quiet(self):
        # shim for tests
        pass # pragma: no cover
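_sortBuilders() prefers a prioritizeBuilders callable from the master configuration over _defaultSorter. A hedged sketch of such a callable follows (the 'release' name prefix is purely illustrative); because the caller wraps it in defer.maybeDeferred, it may return the list directly or via a Deferred:

# Sketch of a prioritizeBuilders callable for master.cfg (not part of this module).
# It receives the BuildMaster and a list of Builder objects, and returns them in
# the order in which their maybeStartBuild() methods should be invoked.
def prioritizeBuilders(buildmaster, builders):
    # move hypothetical 'release*' builders to the front; the sort is stable,
    # so the relative order of the remaining builders is preserved
    builders.sort(key=lambda b: 0 if b.name.startswith('release') else 1)
    return builders

# then, in master.cfg:
# c['prioritizeBuilders'] = prioritizeBuilders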


class DuplicateSlaveArbitrator(object):
    """Utility class to arbitrate the situation when a new slave connects with
    the name of an existing, connected slave

    @ivar buildslave: L{buildbot.process.slavebuilder.AbstractBuildSlave}
    instance
    @ivar old_remote: L{RemoteReference} to the old slave
    @ivar new_remote: L{RemoteReference} to the new slave
    """
    _reactor = reactor # for testing

    # There are several likely duplicate slave scenarios in practice:
    #
    # 1. two slaves are configured with the same username/password
    #
    # 2. the same slave process believes it is disconnected (due to a network
    #    hiccup), and is trying to reconnect
    #
    # For the first case, we want to prevent the two slaves from repeatedly
    # superseding one another (which results in lots of failed builds), so we
    # will prefer the old slave.  However, for the second case we need to
    # detect situations where the old slave is "gone".  Sometimes "gone" means
    # that the TCP/IP connection to it is in a long timeout period (10-20m,
    # depending on the OS configuration), so this can take a while.

    PING_TIMEOUT = 10
    """Timeout for pinging the old slave.  Set this to something quite long, as
    a very busy slave (e.g., one sending a big log chunk) may take a while to
    return a ping.
    """

    def __init__(self, buildslave):
        self.buildslave = buildslave
        self.old_remote = self.buildslave.slave

    def getPerspective(self, mind, slavename):
        self.new_remote = mind
        self.ping_old_slave_done = False
        self.old_slave_connected = True
        self.ping_new_slave_done = False

        old_tport = self.old_remote.broker.transport
        new_tport = self.new_remote.broker.transport
        log.msg("duplicate slave %s; delaying new slave (%s) and pinging old "
                "(%s)" % (self.buildslave.slavename, new_tport.getPeer(),
                          old_tport.getPeer()))

        # delay the new slave until we decide what to do with it
        d = self.new_slave_d = defer.Deferred()

        # Ping the old slave.  If this kills it, then we can allow the new
        # slave to connect.  If this does not kill it, then we disconnect
        # the new slave.
        self.ping_old_slave(new_tport.getPeer())

        # Print a message on the new slave, if possible.
        self.ping_new_slave()

        return d

    def ping_new_slave(self):
        d = defer.maybeDeferred(lambda :
            self.new_remote.callRemote("print", "master already has a "
                        "connection named '%s' - checking its liveness"
                        % self.buildslave.slavename))
        def done(_):
            # failure or success, doesn't matter - the ping is done.
            self.ping_new_slave_done = True
            self.maybe_done()
        d.addBoth(done)

    def ping_old_slave(self, new_peer):
        # set a timer on this ping, in case the network is bad.  TODO: a
        # timeout on the ping itself is not quite what we want.  If there is
        # other data flowing over the PB connection, then we should keep
        # waiting.  Bug #1703
        def timeout():
            self.ping_old_slave_timeout = None
            self.ping_old_slave_timed_out = True
            self.old_slave_connected = False
            self.ping_old_slave_done = True
            self.maybe_done()
        self.ping_old_slave_timeout = self._reactor.callLater(
                self.PING_TIMEOUT, timeout)
        self.ping_old_slave_timed_out = False

        # call this in maybeDeferred because callRemote tends to raise
        # exceptions instead of returning Failures
        d = defer.maybeDeferred(lambda :
            self.old_remote.callRemote("print",
                "master got a duplicate connection from %s; keeping this one"
                % new_peer))

        def clear_timeout(r):
            if self.ping_old_slave_timeout:
                self.ping_old_slave_timeout.cancel()
                self.ping_old_slave_timeout = None
            return r
        d.addBoth(clear_timeout)

        def old_gone(f):
            if self.ping_old_slave_timed_out:
                return # ignore after timeout
            f.trap(pb.PBConnectionLost, pb.DeadReferenceError)
            log.msg(("connection lost while pinging old slave '%s' - " +
                     "keeping new slave") % self.buildslave.slavename)
            self.old_slave_connected = False
        d.addErrback(old_gone)

        def other_err(f):
            log.err(f, "unexpected error pinging old slave; disconnecting it")
            self.old_slave_connected = False
        d.addErrback(other_err)

        def done(_):
            if self.ping_old_slave_timed_out:
                return # ignore after timeout
            self.ping_old_slave_done = True
            self.maybe_done()
        d.addCallback(done)

    def maybe_done(self):
        if not self.ping_new_slave_done or not self.ping_old_slave_done:
            return

        # both pings are done, so sort out the results
        if self.old_slave_connected:
            self.disconnect_new_slave()
        else:
            self.start_new_slave()

    def start_new_slave(self):
        # just in case
        if not self.new_slave_d: # pragma: ignore
            return

        d = self.new_slave_d
        self.new_slave_d = None

        if self.buildslave.isConnected():
            # we need to wait until the old slave has fully detached, which can
            # take a little while as buffers drain, etc.
            def detached():
                d.callback(self.buildslave)
            self.buildslave.subscribeToDetach(detached)
            self.old_remote.broker.transport.loseConnection()
        else: # pragma: ignore
            # by some unusual timing, it's quite possible that the old slave
            # has disconnected while the arbitration was going on.  In that
            # case, we're already done!
            d.callback(self.buildslave)

    def disconnect_new_slave(self):
        # just in case
        if not self.new_slave_d: # pragma: ignore
            return

        d = self.new_slave_d
        self.new_slave_d = None
        log.msg("rejecting duplicate slave with exception")
        d.errback(Failure(RuntimeError("rejecting duplicate slave")))
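For illustration, the arbitration policy implemented by ping_old_slave() and maybe_done() above can be restated as a small pure function; this is a summary sketch, not part of the module source:

# Condensed restatement of the duplicate-slave decision: the old connection is
# kept unless its ping timed out or failed (lost PB connection or other error).
def arbitration_outcome(old_ping_timed_out, old_ping_failed):
    old_slave_connected = not (old_ping_timed_out or old_ping_failed)
    if old_slave_connected:
        return "disconnect_new_slave"   # keep the existing connection
    return "start_new_slave"            # let the new connection take over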