1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 from twisted.python import log, reflect
18 from twisted.python.failure import Failure
19 from twisted.internet import defer, reactor
20 from twisted.spread import pb
21 from twisted.application import service
22
23 from buildbot.process.builder import Builder
24 from buildbot import interfaces, locks, config, util
25 from buildbot.process import metrics
26
27 -class BotMaster(config.ReconfigurableServiceMixin, service.MultiService):
28
29 """This is the master-side service which manages remote buildbot slaves.
30 It provides them with BuildSlaves, and distributes build requests to
31 them."""
32
33 debug = 0
34
70
72 """Shut down the entire process, once all currently-running builds are
73 complete."""
74 if self.shuttingDown:
75 return
76 log.msg("Initiating clean shutdown")
77 self.shuttingDown = True
78
79
80
81 d = self.brd.stopService()
82
83
def wait(_):
    """Gather a finish notification for every build that is still running.

    Runs as a callback on the Deferred returned by
    ``self.brd.stopService()``; the callback argument is ignored.

    @return: a C{DeferredList} wrapping the ``waitUntilFinished()`` result
        of each current build of each builder in ``self.builders``
    """
    pending = [
        build.waitUntilFinished()
        for builder in self.builders.values()
        for build in builder.builder_status.getCurrentBuilds()
    ]
    if pending:
        log.msg("Waiting for %i build(s) to finish" % len(pending))
    else:
        log.msg("No running jobs, starting shutdown immediately")
    return defer.DeferredList(pending)
94 d.addCallback(wait)
95
96
def shutdown(ign):
    """Stop the reactor, unless the shutdown was cancelled or builds remain.

    Runs as a callback after ``wait``; the callback argument is ignored.
    ``_reactor`` is a free name taken from the enclosing scope.
    """
    if not self.shuttingDown:
        # The flag was cleared while we were waiting, so bring the
        # distributor (stopped at the start of the sequence) back up.
        self.brd.startService()
        return

    for builder in self.builders.values():
        running = len(builder.builder_status.getCurrentBuilds())
        if running > 0:
            # Something is still building: reset the flag and run the
            # whole shutdown sequence again from the top.
            log.msg("Not shutting down, builder %s has %i builds running" % (builder, running))
            log.msg("Trying shutdown sequence again")
            self.shuttingDown = False
            self.cleanShutdown()
            return

    log.msg("Stopping reactor")
    _reactor.stop()
114 d.addCallback(shutdown)
115 d.addErrback(log.err, 'while processing cleanShutdown')
116
118 """Cancel a clean shutdown that is already in progress, if any"""
119 if not self.shuttingDown:
120 return
121 log.msg("Cancelling clean shutdown")
122 self.shuttingDown = False
123
124 @metrics.countMethod('BotMaster.slaveLost()')
130
131 @metrics.countMethod('BotMaster.getBuildersForSlave()')
133 return [ b for b in self.builders.values()
134 if slavename in b.config.slavenames ]
135
137 return self.builderNames
138
140 return self.builders.values()
141
145 self.buildrequest_sub = \
146 self.master.subscribeToBuildRequests(buildRequestAdded)
147 service.MultiService.startService(self)
148
149 @defer.deferredGenerator
177
178
179 @defer.deferredGenerator
181
182 timer = metrics.Timer("BotMaster.reconfigServiceSlaves")
183 timer.start()
184
185
186 old_by_name = dict([ (s.slavename, s)
187 for s in list(self)
188 if interfaces.IBuildSlave.providedBy(s) ])
189 old_set = set(old_by_name.iterkeys())
190 new_by_name = dict([ (s.slavename, s)
191 for s in new_config.slaves ])
192 new_set = set(new_by_name.iterkeys())
193
194
195 removed_names, added_names = util.diffSets(old_set, new_set)
196
197
198
199 for n in old_set & new_set:
200 old = old_by_name[n]
201 new = new_by_name[n]
202
203 if reflect.qual(old.__class__) != reflect.qual(new.__class__):
204 removed_names.add(n)
205 added_names.add(n)
206
207 if removed_names or added_names:
208 log.msg("adding %d new slaves, removing %d" %
209 (len(added_names), len(removed_names)))
210
211 for n in removed_names:
212 slave = old_by_name[n]
213
214 del self.slaves[n]
215 slave.master = None
216 slave.botmaster = None
217
218 wfd = defer.waitForDeferred(
219 defer.maybeDeferred(lambda :
220 slave.disownServiceParent()))
221 yield wfd
222 wfd.getResult()
223
224 for n in added_names:
225 slave = new_by_name[n]
226 slave.setServiceParent(self)
227
228 slave.botmaster = self
229 slave.master = self.master
230 self.slaves[n] = slave
231
232 metrics.MetricCountEvent.log("num_slaves",
233 len(self.slaves), absolute=True)
234
235 timer.stop()
236
237
238 @defer.deferredGenerator
240
241 timer = metrics.Timer("BotMaster.reconfigServiceBuilders")
242 timer.start()
243
244
245 old_by_name = dict([ (b.name, b)
246 for b in list(self)
247 if isinstance(b, Builder) ])
248 old_set = set(old_by_name.iterkeys())
249 new_by_name = dict([ (bc.name, bc)
250 for bc in new_config.builders ])
251 new_set = set(new_by_name.iterkeys())
252
253
254 removed_names, added_names = util.diffSets(old_set, new_set)
255
256 if removed_names or added_names:
257 log.msg("adding %d new builders, removing %d" %
258 (len(added_names), len(removed_names)))
259
260 for n in removed_names:
261 builder = old_by_name[n]
262
263 del self.builders[n]
264 builder.master = None
265 builder.botmaster = None
266
267 wfd = defer.waitForDeferred(
268 defer.maybeDeferred(lambda :
269 builder.disownServiceParent()))
270 yield wfd
271 wfd.getResult()
272
273 for n in added_names:
274 builder = Builder(n)
275 self.builders[n] = builder
276
277 builder.botmaster = self
278 builder.master = self.master
279 builder.setServiceParent(self)
280
281 self.builderNames = self.builders.keys()
282
283 metrics.MetricCountEvent.log("num_builders",
284 len(self.builders), absolute=True)
285
286 timer.stop()
287
288
290 if self.buildrequest_sub:
291 self.buildrequest_sub.unsubscribe()
292 self.buildrequest_sub = None
293 for b in self.builders.values():
294 b.builder_status.addPointEvent(["master", "shutdown"])
295 b.builder_status.saveYourself()
296 return service.MultiService.stopService(self)
297
299 """Convert a Lock identifier into an actual Lock instance.
300 @param lockid: a locks.MasterLock or locks.SlaveLock instance
301 @return: a locks.RealMasterLock or locks.RealSlaveLock instance
302 """
303 assert isinstance(lockid, (locks.MasterLock, locks.SlaveLock))
304 if not lockid in self.locks:
305 self.locks[lockid] = lockid.lockClass(lockid)
306
307
308
309
310 return self.locks[lockid]
311
313 """
314 Call this when something suggests that a particular builder may now
315 be available to start a build.
316
317 @param buildername: the name of the builder
318 """
319 self.brd.maybeStartBuildsOn([buildername])
320
322 """
323 Call this when something suggests that a particular slave may now be
324 available to start a build.
325
326 @param slave_name: the name of the slave
327 """
328 builders = self.getBuildersForSlave(slave_name)
329 self.brd.maybeStartBuildsOn([ b.name for b in builders ])
330
332 """
333 Call this when something suggests that this would be a good time to start some
334 builds, but nothing more specific.
335 """
336 self.brd.maybeStartBuildsOn(self.builderNames)
337
339 """
340 Special-purpose class to handle distributing build requests to builders by
341 calling their C{maybeStartBuild} method.
342
343 This takes account of the C{prioritizeBuilders} configuration, and is
344 highly re-entrant; that is, if a new build request arrives while builders
345 are still working on the previous build request, then this class will
346 correctly re-prioritize invocations of builders' C{maybeStartBuild}
347 methods.
348 """
349
351 self.botmaster = botmaster
352 self.master = botmaster.master
353
354
355 self.pending_builders_lock = defer.DeferredLock()
356
357
358
359 self._pending_builders = []
360 self.activity_lock = defer.DeferredLock()
361 self.active = False
362
364
365
366 d = self.activity_lock.acquire()
367 d.addCallback(lambda _ : service.Service.stopService(self))
368 d.addBoth(lambda _ : self.activity_lock.release())
369 return d
370
371 @defer.deferredGenerator
373 """
374 Try to start any builds that can be started right now. This function
375 returns immediately, and promises to trigger those builders
376 eventually.
377
378 @param new_builders: names of new builders that should be given the
379 opportunity to check for new requests.
380 """
381 new_builders = set(new_builders)
382 existing_pending = set(self._pending_builders)
383
384
385 if new_builders < existing_pending:
386 return
387
388
389
390 wfd = defer.waitForDeferred(
391 self.pending_builders_lock.acquire())
392 yield wfd
393 wfd.getResult()
394
395 try:
396
397
398 existing_pending = set(self._pending_builders)
399
400
401 wfd = defer.waitForDeferred(
402 self._sortBuilders(list(existing_pending | new_builders)))
403 yield wfd
404 self._pending_builders = wfd.getResult()
405
406
407 if not self.active:
408 self._activityLoop()
409 except:
410 log.err(Failure(),
411 "while attempting to start builds on %s" % self.name)
412
413
414 self.pending_builders_lock.release()
415
416 @defer.deferredGenerator
418 timer = metrics.Timer("BuildRequestDistributor._defaultSorter()")
419 timer.start()
420
421
def xform(bldr):
    """Pair a builder with the result of its ``getOldestRequestTime()``.

    @param bldr: the builder to query
    @return: a Deferred firing with the tuple ``(time, bldr)``, where
        ``time`` is whatever ``getOldestRequestTime()`` produced (it may be
        C{None})
    """
    def pair_with_builder(oldest):
        return (oldest, bldr)

    # maybeDeferred turns a synchronous result or exception into a Deferred.
    d = defer.maybeDeferred(lambda: bldr.getOldestRequestTime())
    return d.addCallback(pair_with_builder)
428 wfd = defer.waitForDeferred(
429 defer.gatherResults(
430 [ xform(bldr) for bldr in builders ]))
431 yield wfd
432 xformed = wfd.getResult()
433
434
435
def nonecmp(a, b):
    """Old-style comparison function that sorts C{None} keys last.

    @param a: a sequence whose first element is the sort key or C{None}
    @param b: a sequence whose first element is the sort key or C{None}
    @return: 1 if the key of C{a} is None, otherwise -1 if the key of C{b}
        is None, otherwise ``cmp(a, b)``
    """
    if a[0] is None:
        return 1
    elif b[0] is None:
        return -1
    else:
        return cmp(a, b)
440 xformed.sort(cmp=nonecmp)
441
442
443 yield [ xf[1] for xf in xformed ]
444 timer.stop()
445
446 @defer.deferredGenerator
448 timer = metrics.Timer("BuildRequestDistributor._sortBuilders()")
449 timer.start()
450
451
452
453 builders_dict = self.botmaster.builders
454 builders = [ builders_dict.get(n)
455 for n in buildernames
456 if n in builders_dict ]
457
458
459 sorter = self.master.config.prioritizeBuilders
460 if not sorter:
461 sorter = self._defaultSorter
462
463
464 try:
465 wfd = defer.waitForDeferred(
466 defer.maybeDeferred(lambda :
467 sorter(self.master, builders)))
468 yield wfd
469 builders = wfd.getResult()
470 except:
471 log.msg("Exception prioritizing builders; order unspecified")
472 log.err(Failure())
473
474
475 yield [ b.name for b in builders ]
476 timer.stop()
477
478 @defer.deferredGenerator
480 self.active = True
481
482 timer = metrics.Timer('BuildRequestDistributor._activityLoop()')
483 timer.start()
484
485 while 1:
486 wfd = defer.waitForDeferred(
487 self.activity_lock.acquire())
488 yield wfd
489 wfd.getResult()
490
491
492 wfd = defer.waitForDeferred(
493 self.pending_builders_lock.acquire())
494 yield wfd
495 wfd.getResult()
496
497
498 if not self.running or not self._pending_builders:
499 self.pending_builders_lock.release()
500 self.activity_lock.release()
501 break
502
503 bldr_name = self._pending_builders.pop(0)
504 self.pending_builders_lock.release()
505
506 try:
507 wfd = defer.waitForDeferred(
508 self._callABuilder(bldr_name))
509 yield wfd
510 wfd.getResult()
511 except:
512 log.err(Failure(),
513 "from maybeStartBuild for builder '%s'" % (bldr_name,))
514
515 self.activity_lock.release()
516
517 timer.stop()
518
519 self.active = False
520 self._quiet()
521
523
524 bldr = self.botmaster.builders.get(bldr_name)
525 if not bldr:
526 return defer.succeed(None)
527
528 d = bldr.maybeStartBuild()
529 d.addErrback(log.err, 'in maybeStartBuild for %r' % (bldr,))
530 return d
531
535
538 """Utility class to arbitrate the situation when a new slave connects with
539 the name of an existing, connected slave
540
541 @ivar buildslave: L{buildbot.process.slavebuilder.AbstractBuildSlave}
542 instance
543 @ivar old_remote: L{RemoteReference} to the old slave
544 @ivar new_remote: L{RemoteReference} to the new slave
545 """
546 _reactor = reactor
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562 PING_TIMEOUT = 10
563 """Timeout for pinging the old slave. Set this to something quite long, as
564 a very busy slave (e.g., one sending a big log chunk) may take a while to
565 return a ping.
566 """
567
571
573 self.new_remote = mind
574 self.ping_old_slave_done = False
575 self.old_slave_connected = True
576 self.ping_new_slave_done = False
577
578 old_tport = self.old_remote.broker.transport
579 new_tport = self.new_remote.broker.transport
580 log.msg("duplicate slave %s; delaying new slave (%s) and pinging old "
581 "(%s)" % (self.buildslave.slavename, new_tport.getPeer(),
582 old_tport.getPeer()))
583
584
585 d = self.new_slave_d = defer.Deferred()
586
587
588
589
590 self.ping_old_slave(new_tport.getPeer())
591
592
593 self.ping_new_slave()
594
595 return d
596
598 d = defer.maybeDeferred(lambda :
599 self.new_remote.callRemote("print", "master already has a "
600 "connection named '%s' - checking its liveness"
601 % self.buildslave.slavename))
def done(_result):
    """Record that the message to the new slave has been dealt with.

    Attached with ``addBoth``, so it runs whether the remote call succeeded
    or failed; the result or failure is ignored.
    """
    self.ping_new_slave_done = True
    # See whether the other ping has finished as well.
    self.maybe_done()
606 d.addBoth(done)
607
609
610
611
612
def timeout():
    """Give up on the old slave once the ping timer fires.

    Scheduled through ``self._reactor.callLater(self.PING_TIMEOUT, ...)``.
    """
    # Timer bookkeeping: the delayed call has fired, so drop the handle and
    # flag the timeout for the ping's own callbacks.
    self.ping_old_slave_timed_out = True
    self.ping_old_slave_timeout = None
    # Verdict: treat the old slave as gone and the ping as finished.
    self.ping_old_slave_done = True
    self.old_slave_connected = False
    self.maybe_done()
619 self.ping_old_slave_timeout = self._reactor.callLater(
620 self.PING_TIMEOUT, timeout)
621 self.ping_old_slave_timed_out = False
622
623
624
625 d = defer.maybeDeferred(lambda :
626 self.old_remote.callRemote("print",
627 "master got a duplicate connection from %s; keeping this one"
628 % new_peer))
629
def clear_timeout(r):
    """Cancel the pending ping timer, if any, and pass the result through.

    Attached with ``addBoth``.

    @param r: the result or failure from the previous step
    @return: C{r}, unchanged
    """
    delayed_call = self.ping_old_slave_timeout
    if delayed_call:
        delayed_call.cancel()
        self.ping_old_slave_timeout = None
    return r
635 d.addBoth(clear_timeout)
636
def old_gone(f):
    """Errback: the connection to the old slave dropped during the ping.

    Does nothing if the ping has already timed out. Otherwise only
    C{pb.PBConnectionLost} and C{pb.DeadReferenceError} are handled here;
    ``f.trap`` re-raises any other failure so it reaches the next errback.

    @param f: the L{Failure} from pinging the old slave
    """
    if not self.ping_old_slave_timed_out:
        f.trap(pb.PBConnectionLost, pb.DeadReferenceError)
        slavename = self.buildslave.slavename
        log.msg(("connection lost while pinging old slave '%s' - " +
                 "keeping new slave") % slavename)
        self.old_slave_connected = False
644 d.addErrback(old_gone)
645
def other_err(failure):
    """Errback of last resort for the old-slave ping.

    Logs the failure and marks the old slave as not connected. Returns
    C{None}, so the failure is consumed here.

    @param failure: the unexpected L{Failure}
    """
    log.err(failure, "unexpected error pinging old slave; disconnecting it")
    self.old_slave_connected = False
649 d.addErrback(other_err)
650
def done(_result):
    """Record that the old-slave ping has finished, unless it timed out.

    If the timeout already fired, this does nothing.
    """
    if not self.ping_old_slave_timed_out:
        self.ping_old_slave_done = True
        self.maybe_done()
656 d.addCallback(done)
657
659 if not self.ping_new_slave_done or not self.ping_old_slave_done:
660 return
661
662
663 if self.old_slave_connected:
664 self.disconnect_new_slave()
665 else:
666 self.start_new_slave()
667
669
670 if not self.new_slave_d:
671 return
672
673 d = self.new_slave_d
674 self.new_slave_d = None
675
676 if self.buildslave.isConnected():
677
678
def detached():
    """Fire the waiting Deferred with the buildslave.

    Registered through ``self.buildslave.subscribeToDetach``. ``d`` is a
    free name taken from the enclosing scope.
    """
    buildslave = self.buildslave
    d.callback(buildslave)
681 self.buildslave.subscribeToDetach(detached)
682 self.old_remote.broker.transport.loseConnection()
683 else:
684
685
686
687 d.callback(self.buildslave)
688
690
691 if not self.new_slave_d:
692 return
693
694 d = self.new_slave_d
695 self.new_slave_d = None
696 log.msg("rejecting duplicate slave with exception")
697 d.errback(Failure(RuntimeError("rejecting duplicate slave")))
698