1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 """
17 One-at-a-time notification-triggered Deferred event loop. Each such loop has a
18 'doorbell' named trigger() and a set of processing functions. The processing
19 functions are expected to be callables like Scheduler methods, which examine a
20 database for work to do. The doorbell will be rung by other code that writes
21 into the database (possibly in a separate process).
22
23 At some point after the doorbell is rung, each function will be run in turn,
24 one at a time. Each function can return a Deferred, and the next function will
25 not be run until the previous one's Deferred has fired. That is, at all times,
26 at most one processing function will be active.
27
28 If the doorbell is rung during a run, the loop will be run again later.
29 Multiple rings may be handled by a single run, but the class guarantees that
30 there will be at least one full run that begins after the last ring. The
31 relative order of processing functions within a run is not preserved. If a
32 processing function is added to the loop more than once, it will still only be
33 called once per run.
34
35 If the Deferred returned by the processing function fires with a number, the
36 event loop will call that function again at or after the given time
37 (expressed as seconds since epoch). This can be used by processing functions
38 when they want to 'sleep' until some amount of time has passed, such as for a
39 Scheduler that is waiting for a tree-stable-timer to expire, or a Periodic
40 scheduler that wants to fire once every six hours. This delayed call will
41 obey the same one-at-a-time behavior as the run-everything trigger.
42
43 Each function's return-value-timer value will replace the previous timer. Any
44 outstanding timer will be cancelled just before invoking a processing
45 function. As a result, these functions should basically be idempotent: if the
46 database says that the Scheduler needs to wake up at 5pm, it should keep
47 returning '5pm' until it gets called after 5pm, at which point it should
48 start returning None.
49
50 The functions should also add an epsilon (perhaps one second) to their
51 desired wakeup time, so that rounding errors or low-resolution system timers
52 don't cause 'OCD Alarm Clock Syndrome' (in which they get woken up a moment
53 too early and then try to sleep repeatedly for zero seconds). The event loop
54 will silently impose a 5-second minimum delay time to avoid this.
55
56 Any errors in the processing functions are written to log.err and then
57 ignored.
58 """
59
60
61 from twisted.internet import reactor, defer
62 from twisted.application import service
63 from twisted.python import log
64
65 from buildbot.util.eventual import eventually
66 from buildbot import util
67 from buildbot.process.metrics import Timer, countMethod
70 OCD_MINIMUM_DELAY = 5.0
71
73 service.MultiService.__init__(self)
74 self._loop_running = False
75 self._everything_needs_to_run = False
76 self._wakeup_timer = None
77 self._timers = {}
78 self._when_quiet_waiters = set()
79 self._start_timer = None
80 self._reactor = reactor
81 self._remaining = []
82
84 if self._start_timer and self._start_timer.active():
85 self._start_timer.cancel()
86 if self._wakeup_timer and self._wakeup_timer.active():
87 self._wakeup_timer.cancel()
88 return service.MultiService.stopService(self)
89
91 return not self._loop_running
92
94 d = defer.Deferred()
95 self._when_quiet_waiters.add(d)
96 return d
97
99
100
101 if not self.running:
102 log.msg("loop triggered while service disabled; ignoring trigger")
103 return
104 self._mark_runnable(run_everything=True)
105
107 if run_everything:
108 self._everything_needs_to_run = True
109
110 self._timers.clear()
111 self._set_wakeup_timer()
112 if self._loop_running:
113 return
114 self._loop_running = True
115 self._start_timer = self._reactor.callLater(0, self._loop_start)
116
118 raise Exception('subclasses must implement get_processors()')
119
120 _loop_timer = Timer('Loop.run')
121 @_loop_timer.startTimer
122 @countMethod('Loop._loop_start()')
124 if self._everything_needs_to_run:
125 self._everything_needs_to_run = False
126 self._timers.clear()
127 self._set_wakeup_timer()
128 self._remaining = list(self.get_processors())
129 else:
130 self._remaining = []
131 now = util.now(self._reactor)
132 all_processors = self.get_processors()
133 for p in list(self._timers.keys()):
134 if self._timers[p] <= now:
135 del self._timers[p]
136
137
138 if p in all_processors:
139 self._remaining.append(p)
140
141 self._loop_next()
142
143 @countMethod('Loop._loop_next()')
145 if not self._remaining:
146 return self._loop_done()
147 p = self._remaining.pop(0)
148 self._timers.pop(p, None)
149 now = util.now(self._reactor)
150 d = defer.maybeDeferred(p)
151 d.addCallback(self._set_timer, now, p)
152 d.addErrback(log.err)
153 d.addBoth(self._one_done)
154 return None
155
158
159 @_loop_timer.stopTimer
161 if self._everything_needs_to_run:
162 self._loop_start()
163 return
164 self._loop_running = False
165 self._set_wakeup_timer()
166 if not self._timers:
167
168 while self._when_quiet_waiters:
169 d = self._when_quiet_waiters.pop()
170 self._reactor.callLater(0, d.callback, None)
171 self.loop_done()
172
178
187
189 if not self._timers:
190 if self._wakeup_timer:
191 self._wakeup_timer.cancel()
192 self._wakeup_timer = None
193 return
194 when = min(self._timers.values())
195
196
197
198 delay = max(0, when - util.now(self._reactor))
199 if self._wakeup_timer:
200 self._wakeup_timer.reset(delay)
201 else:
202 self._wakeup_timer = self._reactor.callLater(delay, self._wakeup)
203
205 self._wakeup_timer = None
206 self._mark_runnable(run_everything=False)
207
208 -class Loop(LoopBase):
212
213 - def add(self, processor):
214 self.processors.add(processor)
215
217 self.processors.remove(processor)
218
220 return self.processors.copy()
221
223 - def __init__(self, get_processors_function):
226
228 """I am a Loop which gets my processors from my service children. When I
229 run, I iterate over each of them, invoking their 'run' method."""
230
232 return [child.run for child in self]
233