1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38 """
39 One-at-a-time notification-triggered Deferred event loop. Each such loop has a
40 'doorbell' named trigger() and a set of processing functions. The processing
41 functions are expected to be callables like Scheduler methods, which examine a
42 database for work to do. The doorbell will be rung by other code that writes
43 into the database (possibly in a separate process).
44
45 At some point after the doorbell is rung, each function will be run in turn,
46 one at a time. Each function can return a Deferred, and the next function will
47 not be run until the previous one's Deferred has fired. That is, at all times,
48 at most one processing function will be active.
49
50 If the doorbell is rung during a run, the loop will be run again later.
51 Multiple rings may be handled by a single run, but the class guarantees that
52 there will be at least one full run that begins after the last ring. The
53 relative order of processing functions within a run is not preserved. If a
54 processing function is added to the loop more than once, it will still only be
55 called once per run.
56
57 If the Deferred returned by the processing function fires with a number, the
58 event loop will call that function again at or after the given time
59 (expressed as seconds since epoch). This can be used by processing functions
60 when they want to 'sleep' until some amount of time has passed, such as for a
61 Scheduler that is waiting for a tree-stable-timer to expire, or a Periodic
62 scheduler that wants to fire once every six hours. This delayed call will
63 obey the same one-at-a-time behavior as the run-everything trigger.
64
65 The time returned by a function replaces that function's previous timer. Any
66 outstanding timer for a function is discarded just before that function is
67 invoked. As a result, these functions should basically be idempotent: if the
68 database says that the Scheduler needs to wake up at 5pm, it should keep
69 returning '5pm' until it gets called after 5pm, at which point it should
70 start returning None.
71
72 The functions should also add an epsilon (perhaps one second) to their
73 desired wakeup time, so that rounding errors or low-resolution system timers
74 don't cause 'OCD Alarm Clock Syndrome' (in which they get woken up a moment
75 too early and then try to sleep repeatedly for zero seconds). The event loop
76 silently imposes a minimum delay of OCD_MINIMUM_DELAY (5 seconds) to avoid this.
77
78 Any errors in the processing functions are written to log.err and then
79 ignored.
80 """
81
82
83 from twisted.internet import reactor, defer
84 from twisted.application import service
85 from twisted.python import log
86
87 from buildbot.util.eventual import eventually
88 from buildbot import util
89
91 OCD_MINIMUM_DELAY = 5.0
92
94 service.MultiService.__init__(self)
95 self._loop_running = False
96 self._everything_needs_to_run = False
97 self._wakeup_timer = None
98 self._timers = {}
99 self._when_quiet_waiters = set()
100 self._start_timer = None
101 self._reactor = reactor
102 self._remaining = []
103
105 if self._start_timer and self._start_timer.active():
106 self._start_timer.cancel()
107 if self._wakeup_timer and self._wakeup_timer.active():
108 self._wakeup_timer.cancel()
109 return service.MultiService.stopService(self)
110
112 return not self._loop_running
113
115 d = defer.Deferred()
116 self._when_quiet_waiters.add(d)
117 return d
118
120
121
122 if not self.running:
123 print "loop triggered while service disabled; ignoring trigger"
124 return
125 self._mark_runnable(run_everything=True)
126
128 if run_everything:
129 self._everything_needs_to_run = True
130
131 self._timers.clear()
132 self._set_wakeup_timer()
133 if self._loop_running:
134 return
135 self._loop_running = True
136 self._start_timer = self._reactor.callLater(0, self._loop_start)
137
139 raise Exception('subclasses must implement get_processors()')
140
142 if self._everything_needs_to_run:
143 self._everything_needs_to_run = False
144 self._timers.clear()
145 self._set_wakeup_timer()
146 self._remaining = list(self.get_processors())
147 else:
148 self._remaining = []
149 now = util.now(self._reactor)
150 all_processors = self.get_processors()
151 for p in list(self._timers.keys()):
152 if self._timers[p] <= now:
153 del self._timers[p]
154
155
156 if p in all_processors:
157 self._remaining.append(p)
158
159 self._loop_next()
160
162 if not self._remaining:
163 return self._loop_done()
164 p = self._remaining.pop(0)
165 self._timers.pop(p, None)
166 now = util.now(self._reactor)
167 d = defer.maybeDeferred(p)
168 d.addCallback(self._set_timer, now, p)
169 d.addErrback(log.err)
170 d.addBoth(self._one_done)
171 return None
172
175
177 if self._everything_needs_to_run:
178 self._loop_start()
179 return
180 self._loop_running = False
181 self._set_wakeup_timer()
182 if not self._timers:
183
184 while self._when_quiet_waiters:
185 d = self._when_quiet_waiters.pop()
186 self._reactor.callLater(0, d.callback, None)
187 self.loop_done()
188
194
196 if isinstance(res, (int, float)):
197 assert res > now
198
199
200
201 when = max(res, now+self.OCD_MINIMUM_DELAY)
202 self._timers[p] = when
203
205 if not self._timers:
206 if self._wakeup_timer:
207 self._wakeup_timer.cancel()
208 self._wakeup_timer = None
209 return
210 when = min(self._timers.values())
211
212
213
214 delay = max(0, when - util.now(self._reactor))
215 if self._wakeup_timer:
216 self._wakeup_timer.reset(delay)
217 else:
218 self._wakeup_timer = self._reactor.callLater(delay, self._wakeup)
219
221 self._wakeup_timer = None
222 self._mark_runnable(run_everything=False)
223
224 -class Loop(LoopBase):
228
229 - def add(self, processor):
230 self.processors.add(processor)
231
233 self.processors.remove(processor)
234
236 return self.processors.copy()
237
239 - def __init__(self, get_processors_function):
242
244 """I am a Loop which gets my processors from my service children. When I
245 run, I iterate over each of them, invoking their 'run' method."""
246
248 return [child.run for child in self]
249