1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38 """
39 One-at-a-time notification-triggered Deferred event loop. Each such loop has a
40 'doorbell' named trigger() and a set of processing functions. The processing
41 functions are expected to be callables like Scheduler methods, which examine a
42 database for work to do. The doorbell will be rung by other code that writes
43 into the database (possibly in a separate process).
44
45 At some point after the doorbell is rung, each function will be run in turn,
46 one at a time. Each function can return a Deferred, and the next function will
47 not be run until the previous one's Deferred has fired. That is, at all times,
48 at most one processing function will be active.
49
50 If the doorbell is rung during a run, the loop will be run again later.
51 Multiple rings may be handled by a single run, but the class guarantees that
52 there will be at least one full run that begins after the last ring. The
53 relative order of processing functions within a run is not preserved. If a
54 processing function is added to the loop more than once, it will still only be
55 called once per run.
56
57 If the Deferred returned by the processing function fires with a number, the
58 event loop will call that function again at or after the given time
59 (expressed as seconds since epoch). This can be used by processing functions
60 when they want to 'sleep' until some amount of time has passed, such as for a
61 Scheduler that is waiting for a tree-stable-timer to expire, or a Periodic
62 scheduler that wants to fire once every six hours. This delayed call will
63 obey the same one-at-a-time behavior as the run-everything trigger.
64
65 Each function's return-value-timer value will replace the previous timer. Any
66 outstanding timer will be cancelled just before invoking a processing
67 function. As a result, these functions should basically be idempotent: if the
68 database says that the Scheduler needs to wake up at 5pm, it should keep
69 returning '5pm' until it gets called after 5pm, at which point it should
70 start returning None.
71
72 The functions should also add an epsilon (perhaps one second) to their
73 desired wakeup time, so that rounding errors or low-resolution system timers
74 don't cause 'OCD Alarm Clock Syndrome' (in which they get woken up a moment
75 too early and then try to sleep repeatedly for zero seconds). The event loop
76 will silently impose a 5-second minimum delay time to avoid this.
77
78 Any errors in the processing functions are written to log.err and then
79 ignored.
80 """
81
82 import time
83 from twisted.internet import reactor, defer
84 from twisted.application import service
85 from twisted.python import log
86
87 from buildbot.util.eventual import eventually
88 from buildbot import util
89
91 OCD_MINIMUM_DELAY = 5.0
92
94 service.MultiService.__init__(self)
95 self._loop_running = False
96 self._everything_needs_to_run = False
97 self._wakeup_timer = None
98 self._timers = {}
99 self._when_quiet_waiters = set()
100 self._start_timer = None
101 self._reactor = reactor
102
104 if self._start_timer and self._start_timer.active():
105 self._start_timer.cancel()
106 if self._wakeup_timer and self._wakeup_timer.active():
107 self._wakeup_timer.cancel()
108 return service.MultiService.stopService(self)
109
111 return not self._loop_running
112
114 d = defer.Deferred()
115 self._when_quiet_waiters.add(d)
116 return d
117
119
120
121 if not self.running:
122 print "loop triggered while service disabled; ignoring trigger"
123 return
124 self._mark_runnable(run_everything=True)
125
127 if run_everything:
128 self._everything_needs_to_run = True
129
130 self._timers.clear() ; self._set_wakeup_timer()
131 if self._loop_running:
132 return
133 self._loop_running = True
134 self._start_timer = self._reactor.callLater(0, self._loop_start)
135
136
137
139 if self._everything_needs_to_run:
140 self._everything_needs_to_run = False
141 self._timers.clear() ; self._set_wakeup_timer()
142 self._remaining = list(self.get_processors())
143 else:
144 self._remaining = []
145 now = util.now(self._reactor)
146 all_processors = self.get_processors()
147 for p in list(self._timers.keys()):
148 if self._timers[p] <= now:
149 del self._timers[p]
150
151
152 if p in all_processors:
153 self._remaining.append(p)
154
155 self._loop_next()
156
158 if not self._remaining:
159 return self._loop_done()
160 p = self._remaining.pop(0)
161 self._timers.pop(p, None)
162 now = util.now(self._reactor)
163 d = defer.maybeDeferred(p)
164 d.addCallback(self._set_timer, now, p)
165 d.addErrback(log.err)
166 d.addBoth(self._one_done)
167 return None
168
171
173 if self._everything_needs_to_run:
174 self._loop_start()
175 return
176 self._loop_running = False
177 self._set_wakeup_timer()
178 if not self._timers:
179
180 while self._when_quiet_waiters:
181 d = self._when_quiet_waiters.pop()
182 self._reactor.callLater(0, d.callback, None)
183 self.loop_done()
184
190
192 if isinstance(res, (int, float)):
193 assert res > now
194
195
196
197 when = max(res, now+self.OCD_MINIMUM_DELAY)
198 self._timers[p] = when
199
201 if not self._timers:
202 if self._wakeup_timer:
203 self._wakeup_timer.cancel()
204 self._wakeup_timer = None
205 return
206 when = min(self._timers.values())
207
208
209
210 delay = max(0, when - util.now(self._reactor))
211 if self._wakeup_timer:
212 self._wakeup_timer.reset(delay)
213 else:
214 self._wakeup_timer = self._reactor.callLater(delay, self._wakeup)
215
217 self._wakeup_timer = None
218 self._mark_runnable(run_everything=False)
219
220 -class Loop(LoopBase):
224
225 - def add(self, processor):
226 self.processors.add(processor)
227
229 self.processors.remove(processor)
230
232 return self.processors.copy()
233
235 - def __init__(self, get_processors_function):
238
240 """I am a Loop which gets my processors from my service children. When I
241 run, I iterate over each of them, invoking their 'run' method."""
242
244 return [child.run for child in self]
245