1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 from twisted.python import log, reflect
18 from twisted.python.failure import Failure
19 from twisted.internet import defer, reactor
20 from twisted.spread import pb
21 from twisted.application import service
22
23 from buildbot.process.builder import Builder
24 from buildbot import interfaces, locks, config, util
25 from buildbot.process import metrics
26
27 -class BotMaster(config.ReconfigurableServiceMixin, service.MultiService):
28
29 """This is the master-side service which manages remote buildbot slaves.
30 It provides them with BuildSlaves, and distributes build requests to
31 them."""
32
33 debug = 0
34
70
def cleanShutdown(self, _reactor=reactor):
    """Shut down the entire process, once all currently-running builds are
    complete.

    @param _reactor: reactor whose C{stop} is called once all builds are
        done (parameterized so tests can substitute a fake reactor)
    """
    # NOTE(review): the def line was missing from this listing; signature
    # reconstructed from the body's use of self and _reactor -- confirm
    # against upstream.
    if self.shuttingDown:
        return
    log.msg("Initiating clean shutdown")
    self.shuttingDown = True

    # first, stop the distributor so no new builds get started while we
    # wait for the running ones
    d = self.brd.stopService()

    # then wait for all currently-running builds to complete
    def wait(_):
        l = []
        for builder in self.builders.values():
            for build in builder.builder_status.getCurrentBuilds():
                l.append(build.waitUntilFinished())
        if len(l) == 0:
            log.msg("No running jobs, starting shutdown immediately")
        else:
            log.msg("Waiting for %i build(s) to finish" % len(l))
        return defer.DeferredList(l)
    d.addCallback(wait)

    # finally, stop the reactor -- unless the shutdown was cancelled in
    # the meantime, in which case the distributor is restarted instead
    def shutdown(ign):
        # double-check that no builds snuck in while we were waiting; if
        # any did, restart the whole shutdown sequence
        if self.shuttingDown:
            for builder in self.builders.values():
                n = len(builder.builder_status.getCurrentBuilds())
                if n > 0:
                    log.msg("Not shutting down, builder %s has %i builds running" % (builder, n))
                    log.msg("Trying shutdown sequence again")
                    self.shuttingDown = False
                    self.cleanShutdown()
                    return
            log.msg("Stopping reactor")
            _reactor.stop()
        else:
            # shutdown was cancelled; resume distributing build requests
            self.brd.startService()
    d.addCallback(shutdown)
    d.addErrback(log.err, 'while processing cleanShutdown')
116
def cancelCleanShutdown(self):
    """Cancel a clean shutdown that is already in progress, if any"""
    # NOTE(review): method name reconstructed from the docstring and the
    # cleanShutdown counterpart above -- confirm against upstream.
    if not self.shuttingDown:
        return
    log.msg("Cancelling clean shutdown")
    # cleanShutdown's final callback checks this flag and will restart the
    # distributor instead of stopping the reactor
    self.shuttingDown = False
124 @metrics.countMethod('BotMaster.slaveLost()')
130
@metrics.countMethod('BotMaster.getBuildersForSlave()')
def getBuildersForSlave(self, slavename):
    """Return the list of Builder instances whose configuration names
    C{slavename} as one of their slaves."""
    return [ b for b in self.builders.values()
             if slavename in b.config.slavenames ]
135
def getBuilderNames(self):
    """Return the cached list of configured builder names."""
    # NOTE(review): def line missing from this listing; name inferred from
    # the returned attribute -- confirm against upstream.
    return self.builderNames
138
def getBuilders(self):
    """Return the Builder instances, in no particular order."""
    # NOTE(review): def line missing from this listing; name inferred from
    # the returned attribute -- confirm against upstream.
    return self.builders.values()
141
145 self.buildrequest_sub = \
146 self.master.subscribeToBuildRequests(buildRequestAdded)
147 service.MultiService.startService(self)
148
149 @defer.inlineCallbacks
169
170
@defer.inlineCallbacks
def reconfigServiceSlaves(self, new_config):
    """Add, remove, and replace child slave services so that they match
    C{new_config.slaves}.

    @param new_config: the new master configuration object
    """
    timer = metrics.Timer("BotMaster.reconfigServiceSlaves")
    timer.start()

    # arrange the old and new slaves by name
    old_by_name = dict([ (s.slavename, s)
                         for s in list(self)
                         if interfaces.IBuildSlave.providedBy(s) ])
    old_set = set(old_by_name.iterkeys())
    new_by_name = dict([ (s.slavename, s)
                         for s in new_config.slaves ])
    new_set = set(new_by_name.iterkeys())

    # calculate new slaves, by name, and removed slaves
    removed_names, added_names = util.diffSets(old_set, new_set)

    # a slave whose class changed between configs cannot be updated in
    # place, so treat it as removed + re-added
    for n in old_set & new_set:
        old = old_by_name[n]
        new = new_by_name[n]

        if reflect.qual(old.__class__) != reflect.qual(new.__class__):
            removed_names.add(n)
            added_names.add(n)

    if removed_names or added_names:
        log.msg("adding %d new slaves, removing %d" %
                (len(added_names), len(removed_names)))

        for n in removed_names:
            slave = old_by_name[n]

            # sever the back-references before detaching the service
            del self.slaves[n]
            slave.master = None
            slave.botmaster = None

            yield defer.maybeDeferred(lambda :
                    slave.disownServiceParent())

        for n in added_names:
            slave = new_by_name[n]
            slave.setServiceParent(self)
            self.slaves[n] = slave

    metrics.MetricCountEvent.log("num_slaves",
            len(self.slaves), absolute=True)

    timer.stop()
222
223
224 @defer.inlineCallbacks
270
271
def stopService(self):
    """Stop distributing build requests, record a shutdown event on every
    builder's status, and stop the child services."""
    # stop listening for new build requests first, so nothing new arrives
    # while the children shut down
    if self.buildrequest_sub:
        self.buildrequest_sub.unsubscribe()
        self.buildrequest_sub = None
    for b in self.builders.values():
        b.builder_status.addPointEvent(["master", "shutdown"])
        b.builder_status.saveYourself()
    return service.MultiService.stopService(self)
280
def getLockByID(self, lockid):
    """Convert a Lock identifier into an actual Lock instance.
    @param lockid: a locks.MasterLock or locks.SlaveLock instance
    @return: a locks.RealMasterLock or locks.RealSlaveLock instance
    """
    assert isinstance(lockid, (locks.MasterLock, locks.SlaveLock))
    # lockid is hashable, so it serves as the cache key; every caller
    # using the same identifier shares one real lock instance
    if lockid not in self.locks:
        self.locks[lockid] = lockid.lockClass(lockid)
    return self.locks[lockid]
294
def maybeStartBuildsForBuilder(self, buildername):
    """
    Call this when something suggests that a particular builder may now
    be available to start a build.

    @param buildername: the name of the builder
    """
    d = self.brd.maybeStartBuildsOn([buildername])
    d.addErrback(log.err)
304
def maybeStartBuildsForSlave(self, slave_name):
    """
    Call this when something suggests that a particular slave may now be
    available to start a build.

    @param slave_name: the name of the slave
    """
    # every builder that can use this slave is a candidate
    builders = self.getBuildersForSlave(slave_name)
    d = self.brd.maybeStartBuildsOn([ b.name for b in builders ])
    d.addErrback(log.err)
315
def maybeStartBuildsForAllBuilders(self):
    """
    Call this when something suggests that this would be a good time to start some
    builds, but nothing more specific.
    """
    # NOTE(review): def line missing from this listing; name inferred from
    # the siblings above -- confirm against upstream.
    d = self.brd.maybeStartBuildsOn(self.builderNames)
    d.addErrback(log.err)
323
325 """
326 Special-purpose class to handle distributing build requests to builders by
327 calling their C{maybeStartBuild} method.
328
329 This takes account of the C{prioritizeBuilders} configuration, and is
330 highly re-entrant; that is, if a new build request arrives while builders
331 are still working on the previous build request, then this class will
332 correctly re-prioritize invocations of builders' C{maybeStartBuild}
333 methods.
334 """
335
def __init__(self, botmaster):
    """
    @param botmaster: the parent BotMaster; its C{builders} dict and
        C{master} reference are consulted when distributing requests
    """
    self.botmaster = botmaster
    self.master = botmaster.master

    # lock guarding refetch/update of self._pending_builders
    self.pending_builders_lock = defer.DeferredLock()

    # sorted list of names of builders whose maybeStartBuild method still
    # needs to be invoked
    self._pending_builders = []
    # held by the activity loop while it is working; stopService acquires
    # it to wait the loop out
    self.activity_lock = defer.DeferredLock()
    self.active = False
348
350
351
def stopService(self):
    """Stop the service only once the activity loop is idle, so the loop
    observes C{self.running} turning false and exits cleanly."""
    d = self.activity_lock.acquire()
    d.addCallback(lambda _ : service.Service.stopService(self))
    d.addBoth(lambda _ : self.activity_lock.release())
    return d
356
@defer.inlineCallbacks
def maybeStartBuildsOn(self, new_builders):
    """
    Try to start any builds that can be started right now. This function
    returns immediately, and promises to trigger those builders
    eventually.

    @param new_builders: names of new builders that should be given the
    opportunity to check for new requests.
    """
    new_builders = set(new_builders)
    existing_pending = set(self._pending_builders)

    # if no new builders would be added, there is nothing to do
    if new_builders < existing_pending:
        return

    # the pending list is shared with the activity loop, so take the lock
    # before rewriting it
    yield self.pending_builders_lock.acquire()

    try:
        # re-fetch the pending set; it may have changed while we were
        # waiting for the lock
        existing_pending = set(self._pending_builders)

        # merge and re-prioritize the full set of pending builders
        self._pending_builders = \
            yield self._sortBuilders(list(existing_pending | new_builders))

        # start the activity loop if it is not already running
        if not self.active:
            self._activityLoop()
    except Exception:
        log.err(Failure(),
                "while attempting to start builds on %s" % self.name)

    # release the lock unconditionally
    self.pending_builders_lock.release()
396
@defer.inlineCallbacks
def _defaultSorter(self, master, builders):
    """Default prioritization: oldest pending request first; builders with
    no pending request sort last.

    @param master: the BuildMaster (unused here, but part of the sorter
        interface -- see the C{sorter(self.master, builders)} call site)
    @param builders: list of Builder instances to order
    @return: (via Deferred) the re-ordered list of builders
    """
    timer = metrics.Timer("BuildRequestDistributor._defaultSorter()")
    timer.start()

    # pair each builder with its oldest request time
    def xform(bldr):
        d = defer.maybeDeferred(lambda :
                bldr.getOldestRequestTime())
        d.addCallback(lambda time :
                (((time is None) and None or time),bldr))
        return d
    xformed = yield defer.gatherResults(
            [ xform(bldr) for bldr in builders ])

    # sort by time, pushing builders with no request time (None) to the
    # end of the list
    def nonecmp(a,b):
        if a[0] is None: return 1
        if b[0] is None: return -1
        return cmp(a,b)
    xformed.sort(cmp=nonecmp)

    # and discard the sort keys
    rv = [ xf[1] for xf in xformed ]
    timer.stop()
    defer.returnValue(rv)
424
@defer.inlineCallbacks
def _sortBuilders(self, buildernames):
    """Sort the given builder names using the configured
    C{prioritizeBuilders} function (or the default sorter).

    @param buildernames: list of builder names to order
    @return: (via Deferred) the list of names, prioritized
    """
    timer = metrics.Timer("BuildRequestDistributor._sortBuilders()")
    timer.start()

    # convert names to Builder objects, silently dropping names that no
    # longer exist (the config may have changed underneath us)
    builders_dict = self.botmaster.builders
    builders = [ builders_dict.get(n)
                 for n in buildernames
                 if n in builders_dict ]

    # find a sorting function
    sorter = self.master.config.prioritizeBuilders
    if not sorter:
        sorter = self._defaultSorter

    # run the sorter, deliberately surviving a broken user-supplied
    # function -- builds still start, just in unspecified order
    try:
        builders = yield defer.maybeDeferred(lambda :
                sorter(self.master, builders))
    except Exception:
        log.msg("Exception prioritizing builders; order unspecified")
        log.err(Failure())

    # and return the names
    rv = [ b.name for b in builders ]
    timer.stop()
    defer.returnValue(rv)
454
@defer.inlineCallbacks
def _activityLoop(self):
    """Process pending builders one at a time until the list is empty or
    the service stops; only one instance of this loop runs at a time
    (guarded by C{self.active})."""
    self.active = True

    timer = metrics.Timer('BuildRequestDistributor._activityLoop()')
    timer.start()

    while 1:
        yield self.activity_lock.acquire()

        # bail out if we shouldn't keep looping
        yield self.pending_builders_lock.acquire()

        # stop when the service is shutting down or there is no work left
        if not self.running or not self._pending_builders:
            self.pending_builders_lock.release()
            self.activity_lock.release()
            break

        bldr_name = self._pending_builders.pop(0)
        self.pending_builders_lock.release()

        try:
            yield self._callABuilder(bldr_name)
        except Exception:
            log.err(Failure(),
                    "from maybeStartBuild for builder '%s'" % (bldr_name,))

        self.activity_lock.release()

    timer.stop()

    self.active = False
    self._quiet()
489
491
def _callABuilder(self, bldr_name):
    """Invoke C{maybeStartBuild} on the named builder, tolerating a
    builder that has since been removed from the configuration.

    @param bldr_name: name of the builder to poke
    @return: Deferred fired when the attempt completes
    """
    # get the actual builder object; it may be gone if the config changed
    bldr = self.botmaster.builders.get(bldr_name)
    if not bldr:
        return defer.succeed(None)

    d = bldr.maybeStartBuild()
    d.addErrback(log.err, 'in maybeStartBuild for %r' % (bldr,))
    return d
499
503
506 """Utility class to arbitrate the situation when a new slave connects with
507 the name of an existing, connected slave
508
509 @ivar buildslave: L{buildbot.process.slavebuilder.AbstractBuildSlave}
510 instance
511 @ivar old_remote: L{RemoteReference} to the old slave
512 @ivar new_remote: L{RemoteReference} to the new slave
513 """
514 _reactor = reactor
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530 PING_TIMEOUT = 10
531 """Timeout for pinging the old slave. Set this to something quite long, as
532 a very busy slave (e.g., one sending a big log chunk) may take a while to
533 return a ping.
534 """
535
539
def getPerspective(self, mind, slavename):
    """Begin arbitration: ping both sides, and return a Deferred that will
    eventually fire with the buildslave (keep the new connection) or fail
    (reject the duplicate).

    @param mind: L{RemoteReference} for the newly-connecting slave
    @param slavename: name of the slave (presumably matches
        C{self.buildslave.slavename} -- confirm against upstream; the
        def line was missing from this listing and the signature is
        reconstructed)
    """
    self.new_remote = mind
    self.ping_old_slave_done = False
    # assume the old slave is alive until a ping proves otherwise
    self.old_slave_connected = True
    self.ping_new_slave_done = False

    old_tport = self.old_remote.broker.transport
    new_tport = self.new_remote.broker.transport
    log.msg("duplicate slave %s; delaying new slave (%s) and pinging old "
            "(%s)" % (self.buildslave.slavename, new_tport.getPeer(),
                      old_tport.getPeer()))

    # the new slave waits on this deferred while arbitration runs
    d = self.new_slave_d = defer.Deferred()

    # ping the old slave; if it does not respond within PING_TIMEOUT it is
    # considered gone and the new connection wins
    self.ping_old_slave(new_tport.getPeer())

    # also print a notice on the new slave while it waits
    self.ping_new_slave()

    return d
564
def ping_new_slave(self):
    """Send an informational 'print' to the new slave; success or failure,
    mark the new-slave ping finished and check for completion."""
    d = defer.maybeDeferred(lambda :
        self.new_remote.callRemote("print", "master already has a "
            "connection named '%s' - checking its liveness"
            % self.buildslave.slavename))
    def done(_):
        # failure or success, the ping is done
        self.ping_new_slave_done = True
        self.maybe_done()
    d.addBoth(done)
575
577
578
579
580
def ping_old_slave(self, new_peer):
    """Ping the old slave connection with a bounded timeout; decide from
    the outcome whether the old connection is still alive.

    @param new_peer: address of the new connection, included in the
        message printed on the old slave
    """
    # a very busy old slave may be slow to answer, so bound the wait with
    # a timer; on timeout, declare it dead and move on
    def timeout():
        self.ping_old_slave_timeout = None
        self.ping_old_slave_timed_out = True
        self.old_slave_connected = False
        self.ping_old_slave_done = True
        self.maybe_done()
    self.ping_old_slave_timeout = self._reactor.callLater(
            self.PING_TIMEOUT, timeout)
    self.ping_old_slave_timed_out = False

    # ping the old slave by printing a message on it
    d = defer.maybeDeferred(lambda :
        self.old_remote.callRemote("print",
            "master got a duplicate connection from %s; keeping this one"
            % new_peer))

    def clear_timeout(r):
        # the ping answered (or failed) before the timer fired: cancel it
        if self.ping_old_slave_timeout:
            self.ping_old_slave_timeout.cancel()
            self.ping_old_slave_timeout = None
        return r
    d.addBoth(clear_timeout)

    def old_gone(f):
        # a lost connection means the old slave is gone; keep the new one
        if self.ping_old_slave_timed_out:
            return # timeout already made the decision
        f.trap(pb.PBConnectionLost, pb.DeadReferenceError)
        log.msg(("connection lost while pinging old slave '%s' - " +
                 "keeping new slave") % self.buildslave.slavename)
        self.old_slave_connected = False
    d.addErrback(old_gone)

    def other_err(f):
        # anything else is unexpected; err on the side of dropping the old
        # connection
        log.err(f, "unexpected error pinging old slave; disconnecting it")
        self.old_slave_connected = False
    d.addErrback(other_err)

    def done(_):
        if self.ping_old_slave_timed_out:
            return # timeout already made the decision
        self.ping_old_slave_done = True
        self.maybe_done()
    d.addCallback(done)
625
def maybe_done(self):
    """Once both pings have finished, act on the result: keep the old
    slave (dropping the new one) if it is still connected, otherwise
    promote the new connection."""
    if not self.ping_new_slave_done or not self.ping_old_slave_done:
        return

    # both pings are done, so resolve the arbitration
    if self.old_slave_connected:
        self.disconnect_new_slave()
    else:
        self.start_new_slave()
635
637
def start_new_slave(self):
    """Promote the new connection: fire the pending deferred with the
    buildslave, waiting for the old connection to detach first if it is
    still attached."""
    # no-op if arbitration has already been resolved
    if not self.new_slave_d:
        return

    d = self.new_slave_d
    self.new_slave_d = None

    if self.buildslave.isConnected():
        # the old slave is still attached; drop its connection and fire
        # the deferred only once the detach completes
        def detached():
            d.callback(self.buildslave)
        self.buildslave.subscribeToDetach(detached)
        self.old_remote.broker.transport.loseConnection()
    else:
        # old slave is already gone; fire immediately
        d.callback(self.buildslave)
656
658
def disconnect_new_slave(self):
    """Reject the duplicate connection: errback the pending deferred so
    the new slave is refused."""
    # no-op if arbitration has already been resolved
    if not self.new_slave_d:
        return

    d = self.new_slave_d
    self.new_slave_d = None
    log.msg("rejecting duplicate slave with exception")
    d.errback(Failure(RuntimeError("rejecting duplicate slave")))
666