1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 from twisted.python import log
18 from twisted.python.failure import Failure
19 from twisted.internet import defer, reactor
20 from twisted.spread import pb
21 from twisted.application import service
22
23 from buildbot.process.builder import Builder
24 from buildbot import interfaces, locks
25 from buildbot.process import metrics
28
29 """This is the master-side service which manages remote buildbot slaves.
30 It provides them with BuildSlaves, and distributes build requests to
31 them."""
32
33 debug = 0
34 reactor = reactor
35
def __init__(self, master):
    """Set up the master-side slave/builder bookkeeping service.

    @param master: the BuildMaster that owns this service
    """
    # NOTE(review): the `def` header was stripped in the damaged source;
    # reconstructed from `self.master = master` below -- confirm upstream.
    service.MultiService.__init__(self)
    self.master = master

    # builder name -> Builder instance, and the configured name ordering
    self.builders = {}
    self.builderNames = []

    # slavename -> BuildSlave instance; `watchers` purpose is not evident
    # from this view -- presumably per-name observers; TODO confirm
    self.slaves = {}
    self.watchers = {}

    # lock identifier -> real Lock instance (populated by getLockByID)
    self.locks = {}

    # optional user callable overriding build-request merging (None = default)
    self.mergeRequests = None

    # optional user callable overriding builder prioritization; when None,
    # BuildRequestDistributor._defaultSorter is used
    self.prioritizeBuilders = None

    # True while a clean shutdown is in progress (see cleanShutdown)
    self.shuttingDown = False

    # last configured slave port; used to reject port changes on reconfig
    self.lastSlavePortnum = None

    # subscription handle for new-build-request notifications
    self.buildrequest_sub = None

    # the distributor that invokes builders' maybeStartBuild methods
    self.brd = BuildRequestDistributor(self)
    self.brd.setServiceParent(self)
74
def cleanShutdown(self, _reactor=reactor):
    """Shut down the entire process, once all currently-running builds are
    complete.

    @param _reactor: the reactor whose C{stop} is called once quiescent
        (parameterized so tests can substitute a fake)
    """
    # NOTE(review): header reconstructed; `_reactor` inferred from the
    # `_reactor.stop()` call below -- confirm the default against upstream.
    if self.shuttingDown:
        return
    log.msg("Initiating clean shutdown")
    self.shuttingDown = True

    # first stop the request distributor, so no new builds begin
    d = self.brd.stopService()

    # then wait for every build currently in progress to finish
    def wait(_):
        l = []
        for builder in self.builders.values():
            for build in builder.builder_status.getCurrentBuilds():
                l.append(build.waitUntilFinished())
        if len(l) == 0:
            log.msg("No running jobs, starting shutdown immediately")
        else:
            log.msg("Waiting for %i build(s) to finish" % len(l))
        return defer.DeferredList(l)
    d.addCallback(wait)

    # finally stop the reactor -- unless the shutdown was cancelled in the
    # meantime, in which case restart the distributor instead
    def shutdown(ign):
        if self.shuttingDown:
            # double-check: builds may have started while we waited; if so,
            # go around the whole sequence again
            for builder in self.builders.values():
                n = len(builder.builder_status.getCurrentBuilds())
                if n > 0:
                    log.msg("Not shutting down, builder %s has %i builds running" % (builder, n))
                    log.msg("Trying shutdown sequence again")
                    self.shuttingDown = False
                    self.cleanShutdown()
                    return
            log.msg("Stopping reactor")
            _reactor.stop()
        else:
            self.brd.startService()
    d.addCallback(shutdown)
    d.addErrback(log.err, 'while processing cleanShutdown')
120
def cancelCleanShutdown(self):
    """Cancel a clean shutdown that is already in progress, if any."""
    # NOTE(review): `def` header reconstructed -- confirm name upstream.
    if not self.shuttingDown:
        return
    log.msg("Cancelling clean shutdown")
    self.shuttingDown = False
127
def loadConfig_Slaves(self, new_slaves):
    """Reconcile the set of attached slaves with a new configuration.

    Removes slaves no longer configured, adds newly configured ones, and
    updates those that remain.  Returns a Deferred that fires when the
    removals/additions are complete.

    @param new_slaves: the newly configured BuildSlave-like objects
    """
    # NOTE(review): `def` header reconstructed from the timer string below
    # and the use of `new_slaves` -- confirm upstream.
    timer = metrics.Timer("BotMaster.loadConfig_Slaves()")
    timer.start()
    # changing the slave port at reconfig time is unsupported
    new_portnum = (self.lastSlavePortnum is not None
               and self.lastSlavePortnum != self.master.slavePortnum)
    if new_portnum:
        raise ValueError("changing slavePortnum in reconfig is not supported")
    self.lastSlavePortnum = self.master.slavePortnum

    # current slave services are our children providing IBuildSlave
    old_slaves = [c for c in list(self)
                  if interfaces.IBuildSlave.providedBy(c)]

    # key slaves by identity() so we can diff old vs. new configurations
    old_t = {}
    for s in old_slaves:
        old_t[s.identity()] = s
    new_t = {}
    for s in new_slaves:
        new_t[s.identity()] = s
    removed = [old_t[t]
               for t in old_t
               if t not in new_t]
    added = [new_t[t]
             for t in new_t
             if t not in old_t]
    remaining_t = [t
                   for t in new_t
                   if t in old_t]

    # remove departed slaves first; removal is asynchronous
    dl = []
    for s in removed:
        dl.append(self.removeSlave(s))
    d = defer.DeferredList(dl, fireOnOneErrback=True)

    def add_new(res):
        for s in added:
            self.addSlave(s)
    d.addCallback(add_new)

    def update_remaining(_):
        # slaves present in both configs get updated in place
        for t in remaining_t:
            old_t[t].update(new_t[t])
    d.addCallback(update_remaining)

    def stop(_):
        metrics.MetricCountEvent.log("num_slaves",
            len(self.slaves), absolute=True)
        timer.stop()
        return _
    d.addBoth(stop)

    return d
193
201
202
203
204 @metrics.countMethod('BotMaster.removeSlave()')
211 d.addCallback(delslave)
212 return d
213
214 @metrics.countMethod('BotMaster.slaveLost()')
220
@metrics.countMethod('BotMaster.getBuildersForSlave()')
def getBuildersForSlave(self, slavename):
    """Return the Builder objects that can run on the named slave."""
    # NOTE(review): `def` header reconstructed from the decorator string
    # and the use of `slavename` -- confirm upstream.
    return [b
            for b in self.builders.values()
            if slavename in b.slavenames]
226
def getBuildernames(self):
    """Return the list of configured builder names, in config order."""
    # NOTE(review): `def` header reconstructed -- confirm name upstream.
    return self.builderNames
229
def getBuilders(self):
    """Return the Builder objects, in configured name order."""
    # NOTE(review): `def` header reconstructed -- confirm name upstream.
    allBuilders = [self.builders[name] for name in self.builderNames]
    return allBuilders
233
def setBuilders(self, builders):
    """Replace the current set of builders with C{builders}.

    Disowns the existing Builder services, installs the new ones, pushes
    the new configuration to all slaves, and kicks the distributor so new
    builders get a chance to start builds.  Returns a Deferred.

    @param builders: the new Builder instances
    """
    # NOTE(review): `def` header reconstructed from the uses of `builders`
    # in the body -- confirm upstream.
    self.builders = {}
    self.builderNames = []
    d = defer.DeferredList([b.disownServiceParent() for b in list(self)
                            if isinstance(b, Builder)],
                           fireOnOneErrback=True)
    def _add(ign):
        log.msg("setBuilders._add: %s %s" % (list(self), [b.name for b in builders]))
        for b in builders:
            for slavename in b.slavenames:
                # every referenced slave must already be known
                assert slavename in self.slaves
            self.builders[b.name] = b
            self.builderNames.append(b.name)
            b.setBotmaster(self)
            b.setServiceParent(self)
    d.addCallback(_add)
    d.addCallback(lambda ign: self._updateAllSlaves())
    # give the new builders a chance to start pending builds
    d.addCallback(lambda _ :
            self.maybeStartBuildsForAllBuilders())
    return d
258
272 d.addBoth(stop)
273 return d
274
291
295 self.buildrequest_sub = \
296 self.master.subscribeToBuildRequests(buildRequestAdded)
297 service.MultiService.startService(self)
298
def stopService(self):
    """Unsubscribe from build requests, note the shutdown on every
    builder's status, and stop the child services."""
    # NOTE(review): `def` header reconstructed (standard Twisted service
    # hook; matches the MultiService call below) -- confirm upstream.
    if self.buildrequest_sub:
        self.buildrequest_sub.unsubscribe()
        self.buildrequest_sub = None
    for b in self.builders.values():
        b.builder_status.addPointEvent(["master", "shutdown"])
        b.builder_status.saveYourself()
    return service.MultiService.stopService(self)
307
def getLockByID(self, lockid):
    """Convert a Lock identifier into an actual Lock instance.
    @param lockid: a locks.MasterLock or locks.SlaveLock instance
    @return: a locks.RealMasterLock or locks.RealSlaveLock instance
    """
    # NOTE(review): `def` header reconstructed from the docstring's
    # `lockid` parameter -- confirm name upstream.
    assert isinstance(lockid, (locks.MasterLock, locks.SlaveLock))
    if not lockid in self.locks:
        self.locks[lockid] = lockid.lockClass(lockid)
    # identical lockids share one real lock object
    return self.locks[lockid]
321
def maybeStartBuildsForBuilder(self, buildername):
    """
    Call this when something suggests that a particular builder may now
    be available to start a build.

    @param buildername: the name of the builder
    """
    # NOTE(review): `def` header reconstructed from the docstring param.
    self.brd.maybeStartBuildsOn([buildername])
330
def maybeStartBuildsForSlave(self, slave_name):
    """
    Call this when something suggests that a particular slave may now be
    available to start a build.

    @param slave_name: the name of the slave
    """
    # NOTE(review): `def` header reconstructed from the docstring param.
    builders = self.getBuildersForSlave(slave_name)
    self.brd.maybeStartBuildsOn([ b.name for b in builders ])
340
def maybeStartBuildsForAllBuilders(self):
    """
    Call this when something suggests that this would be a good time to start some
    builds, but nothing more specific.
    """
    # Name is grounded: referenced by setBuilders' final callback.
    self.brd.maybeStartBuildsOn(self.builderNames)
347
349 """
350 Special-purpose class to handle distributing build requests to builders by
351 calling their C{maybeStartBuild} method.
352
353 This takes account of the C{prioritizeBuilders} configuration, and is
354 highly re-entrant; that is, if a new build request arrives while builders
355 are still working on the previous build request, then this class will
356 correctly re-prioritize invocations of builders' C{maybeStartBuild}
357 methods.
358 """
359
def __init__(self, botmaster):
    """Set up the distributor.

    @param botmaster: the parent BotMaster (provides builders and the
        prioritizeBuilders override)
    """
    # NOTE(review): `def` header reconstructed from the assignments below.
    self.botmaster = botmaster
    self.master = botmaster.master

    # guards _pending_builders against concurrent re-sorting
    self.pending_builders_lock = defer.DeferredLock()

    # sorted list of builder names awaiting a maybeStartBuild call
    self._pending_builders = []
    # serializes the activity loop against stopService
    self.activity_lock = defer.DeferredLock()
    # True while _activityLoop is running
    self.active = False
372
def stopService(self):
    """Stop the service once the activity loop pauses; the loop then exits
    because self.running is false."""
    # NOTE(review): `def` header reconstructed (Twisted service hook,
    # called as self.brd.stopService() in cleanShutdown) -- confirm.
    d = self.activity_lock.acquire()
    d.addCallback(lambda _ : service.Service.stopService(self))
    d.addBoth(lambda _ : self.activity_lock.release())
    return d
380
@defer.deferredGenerator
def maybeStartBuildsOn(self, new_builders):
    """
    Try to start any builds that can be started right now. This function
    returns immediately, and promises to trigger those builders
    eventually.

    @param new_builders: names of new builders that should be given the
        opportunity to check for new requests.
    """
    # NOTE(review): `def` header reconstructed from the docstring param and
    # the call sites in BotMaster -- confirm upstream.
    new_builders = set(new_builders)
    existing_pending = set(self._pending_builders)

    # if no genuinely new builders were added, there's nothing to do
    if new_builders < existing_pending:
        return

    # update the pending list under the lock, since sorting is async
    wfd = defer.waitForDeferred(
        self.pending_builders_lock.acquire())
    yield wfd
    wfd.getResult()

    try:
        # re-read the pending set; it may have changed while we waited
        # for the lock
        existing_pending = set(self._pending_builders)

        # sort the combined set of pending and new builder names
        wfd = defer.waitForDeferred(
            self._sortBuilders(list(existing_pending | new_builders)))
        yield wfd
        self._pending_builders = wfd.getResult()

        # kick off the consumer loop if it isn't already running
        if not self.active:
            self._activityLoop()
    except:
        log.err(Failure(),
                "while attempting to start builds on %s" % self.name)

    # release the lock unconditionally
    self.pending_builders_lock.release()
425
@defer.deferredGenerator
def _defaultSorter(self, master, builders):
    """Default builder prioritization: oldest pending request first,
    builders with no pending request (None) last.

    Yields (deferredGenerator-style) the sorted list of builders.
    """
    # NOTE(review): `def` header reconstructed; the (master, builders)
    # signature matches the sorter(self.master, builders) call in
    # _sortBuilders -- confirm upstream.
    timer = metrics.Timer("BuildRequestDistributor._defaultSorter()")
    timer.start()
    # decorate each builder with its oldest request time (may be None)
    def xform(bldr):
        d = defer.maybeDeferred(lambda :
                bldr.getOldestRequestTime())
        d.addCallback(lambda time :
                (((time is None) and None or time),bldr))
        return d
    wfd = defer.waitForDeferred(
        defer.gatherResults(
            [ xform(bldr) for bldr in builders ]))
    yield wfd
    xformed = wfd.getResult()

    # sort, pushing None times to the end of the list
    def nonecmp(a,b):
        if a[0] is None: return 1
        if b[0] is None: return -1
        return cmp(a,b)
    xformed.sort(cmp=nonecmp)

    # undecorate
    yield [ xf[1] for xf in xformed ]
    timer.stop()
455
@defer.deferredGenerator
def _sortBuilders(self, buildernames):
    """Sort builder names by priority, via the configured
    prioritizeBuilders override or the default sorter.

    Takes and yields (deferredGenerator-style) a list of builder names.
    """
    # NOTE(review): `def` header reconstructed from the timer string and
    # the use of `buildernames` -- confirm upstream.
    timer = metrics.Timer("BuildRequestDistributor._sortBuilders()")
    timer.start()

    # resolve names to Builder objects, dropping unknown names
    builders_dict = self.botmaster.builders
    builders = [ builders_dict.get(n)
                 for n in buildernames
                 if n in builders_dict ]

    # pick the sorting function
    sorter = self.botmaster.prioritizeBuilders
    if not sorter:
        sorter = self._defaultSorter

    # run it; on failure, fall through with the order unspecified
    try:
        wfd = defer.waitForDeferred(
            defer.maybeDeferred(lambda :
                sorter(self.master, builders)))
        yield wfd
        builders = wfd.getResult()
    except:
        log.msg("Exception prioritizing builders; order unspecified")
        log.err(Failure())

    # convert back to names
    yield [ b.name for b in builders ]
    timer.stop()
487
@defer.deferredGenerator
def _activityLoop(self):
    """Consume _pending_builders one name at a time, invoking each
    builder's maybeStartBuild, until the list is empty or the service
    stops."""
    # NOTE(review): `def` header reconstructed from the timer string and
    # the call in maybeStartBuildsOn -- confirm upstream.
    self.active = True

    timer = metrics.Timer('BuildRequestDistributor._activityLoop()')
    timer.start()

    while 1:
        wfd = defer.waitForDeferred(
            self.activity_lock.acquire())
        yield wfd
        wfd.getResult()

        # pop the next builder name under the pending-builders lock
        wfd = defer.waitForDeferred(
            self.pending_builders_lock.acquire())
        yield wfd
        wfd.getResult()

        # bail out when stopped or when there is nothing left to do
        if not self.running or not self._pending_builders:
            self.pending_builders_lock.release()
            self.activity_lock.release()
            break

        bldr_name = self._pending_builders.pop(0)
        self.pending_builders_lock.release()

        # one builder failing must not kill the loop
        try:
            wfd = defer.waitForDeferred(
                self._callABuilder(bldr_name))
            yield wfd
            wfd.getResult()
        except:
            log.err(Failure(),
                    "from maybeStartBuild for builder '%s'" % (bldr_name,))

        self.activity_lock.release()

    timer.stop()

    self.active = False
    self._quiet()
531
def _callABuilder(self, bldr_name):
    """Invoke maybeStartBuild on the named builder; errors are logged,
    not propagated.  Returns a Deferred."""
    # NOTE(review): `def` header reconstructed from the call in
    # _activityLoop -- confirm upstream.
    bldr = self.botmaster.builders.get(bldr_name)
    if not bldr:
        # builder was removed while queued; nothing to do
        return defer.succeed(None)

    d = bldr.maybeStartBuild()
    d.addErrback(log.err, 'in maybeStartBuild for %r' % (bldr,))
    return d
541
545
548 """Utility class to arbitrate the situation when a new slave connects with
549 the name of an existing, connected slave"""
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564 PING_TIMEOUT = 10
565 """Timeout for pinging the old slave. Set this to something quite long, as
566 a very busy slave (e.g., one sending a big log chunk) may take a while to
567 return a ping.
568
569 @ivar old_slave: L{buildbot.process.slavebuilder.AbstractSlaveBuilder}
570 instance
571 """
572
def __init__(self, slave):
    """@param slave: the already-connected (old) slave whose name the new
    connection duplicates"""
    # NOTE(review): `def` header reconstructed from the assignment below.
    self.old_slave = slave
575
def getPerspective(self, mind, slavename):
    """Arbitrate between the old connection and the new C{mind}: ping
    both sides, and fire the returned Deferred with the slave to keep
    (or an error for the rejected duplicate).

    @param mind: the PB mind of the newly connecting slave
    @param slavename: the duplicated slave name (unused in this body --
        signature reconstructed; TODO confirm against upstream)
    """
    self.new_slave_mind = mind

    old_tport = self.old_slave.slave.broker.transport
    new_tport = mind.broker.transport
    log.msg("duplicate slave %s; delaying new slave (%s) and pinging old (%s)" %
            (self.old_slave.slavename, new_tport.getPeer(), old_tport.getPeer()))

    # hold the new slave's attach until arbitration finishes
    self.new_slave_d = defer.Deferred()

    # ping the old slave; if it is dead, the new one wins
    self.ping_old_slave_done = False
    self.old_slave_connected = True
    self.ping_old_slave(new_tport.getPeer())

    # tell the new slave what is going on, best-effort
    self.ping_new_slave_done = False
    self.ping_new_slave()

    return self.new_slave_d
599
def ping_new_slave(self):
    """Send an informational message to the new slave; success or failure
    both count as 'done' for arbitration purposes."""
    # NOTE(review): `def` header reconstructed from the call in
    # getPerspective -- confirm upstream.
    d = self.new_slave_mind.callRemote("print",
        "master already has a connection named '%s' - checking its liveness"
        % self.old_slave.slavename)
    def done(_):
        # fired on either success or failure -- the outcome doesn't matter
        self.ping_new_slave_done = True
        self.maybe_done()
    d.addBoth(done)
609
def ping_old_slave(self, new_peer):
    """Ping the old slave to see whether it is still alive, guarded by a
    PING_TIMEOUT timer.

    @param new_peer: address of the new connection, used in the message
        sent to the old slave
    """
    # NOTE(review): `def` header reconstructed from the call in
    # getPerspective and the use of `new_peer` -- confirm upstream.
    # if the ping doesn't return within PING_TIMEOUT, assume the old
    # slave is gone
    def timeout():
        self.ping_old_slave_timeout = None
        self.ping_old_slave_timed_out = True
        self.old_slave_connected = False
        self.ping_old_slave_done = True
        self.maybe_done()
    self.ping_old_slave_timeout = reactor.callLater(self.PING_TIMEOUT, timeout)
    self.ping_old_slave_timed_out = False

    d = self.old_slave.slave.callRemote("print",
        "master got a duplicate connection from %s; keeping this one" % new_peer)

    def clear_timeout(r):
        if self.ping_old_slave_timeout:
            self.ping_old_slave_timeout.cancel()
            self.ping_old_slave_timeout = None
        return r
    d.addBoth(clear_timeout)

    def old_gone(f):
        if self.ping_old_slave_timed_out:
            return  # timeout already decided the outcome
        f.trap(pb.PBConnectionLost)
        log.msg(("connection lost while pinging old slave '%s' - " +
                 "keeping new slave") % self.old_slave.slavename)
        self.old_slave_connected = False
    d.addErrback(old_gone)

    def other_err(f):
        if self.ping_old_slave_timed_out:
            return  # timeout already decided the outcome
        log.msg("unexpected error while pinging old slave; disconnecting it")
        log.err(f)
        self.old_slave_connected = False
    d.addErrback(other_err)

    def done(_):
        if self.ping_old_slave_timed_out:
            return  # timeout already decided the outcome
        self.ping_old_slave_done = True
        self.maybe_done()
    d.addCallback(done)
656
def maybe_done(self):
    """Once both pings have completed, keep the old slave (disconnect the
    new one) if it is still connected, otherwise promote the new one."""
    # NOTE(review): `def` header reconstructed from the calls in
    # ping_new_slave / ping_old_slave -- confirm upstream.
    if not self.ping_new_slave_done or not self.ping_old_slave_done:
        return

    # both pings are done; decide the new connection's fate
    if self.old_slave_connected:
        self.disconnect_new_slave()
    else:
        self.start_new_slave()
666
def start_new_slave(self, count=20):
    """Promote the new slave, waiting (by polling) for the old one to
    actually disconnect first.

    @param count: remaining polling retries before giving up and
        rejecting the new slave (default reconstructed -- TODO confirm)
    """
    # NOTE(review): `def` header reconstructed from the recursive
    # reactor.callLater(0.1, self.start_new_slave, count-1) call below.
    if not self.new_slave_d:
        return

    # the old slave must be fully disconnected before the new one can
    # take its name; that can take a little while
    if self.old_slave.isConnected():
        if self.old_slave.slave:
            self.old_slave.slave.broker.transport.loseConnection()
        if count < 0:
            log.msg("WEIRD: want to start new slave, but the old slave will not disconnect")
            self.disconnect_new_slave()
        else:
            reactor.callLater(0.1, self.start_new_slave, count-1)
        return

    d = self.new_slave_d
    self.new_slave_d = None
    d.callback(self.old_slave)
686
def disconnect_new_slave(self):
    """Reject the duplicate (new) slave by firing its pending Deferred
    with an error; no-op if arbitration already concluded."""
    # NOTE(review): `def` header reconstructed from the calls in
    # maybe_done / start_new_slave -- confirm upstream.
    if not self.new_slave_d:
        return
    d = self.new_slave_d
    self.new_slave_d = None
    log.msg("rejecting duplicate slave with exception")
    d.errback(Failure(RuntimeError("rejecting duplicate slave")))
694