1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 """
17 One-at-a-time notification-triggered Deferred event loop. Each such loop has a
18 'doorbell' named trigger() and a set of processing functions. The processing
19 functions are expected to be callables like Scheduler methods, which examine a
20 database for work to do. The doorbell will be rung by other code that writes
21 into the database (possibly in a separate process).
22
23 At some point after the doorbell is rung, each function will be run in turn,
24 one at a time. Each function can return a Deferred, and the next function will
25 not be run until the previous one's Deferred has fired. That is, at all times,
26 at most one processing function will be active.
27
28 If the doorbell is rung during a run, the loop will be run again later.
29 Multiple rings may be handled by a single run, but the class guarantees that
30 there will be at least one full run that begins after the last ring. The
31 relative order of processing functions within a run is not preserved. If a
32 processing function is added to the loop more than once, it will still only be
33 called once per run.
34
35 If the Deferred returned by the processing function fires with a number, the
36 event loop will call that function again at or after the given time
37 (expressed as seconds since epoch). This can be used by processing functions
38 when they want to 'sleep' until some amount of time has passed, such as for a
39 Scheduler that is waiting for a tree-stable-timer to expire, or a Periodic
40 scheduler that wants to fire once every six hours. This delayed call will
41 obey the same one-at-a-time behavior as the run-everything trigger.
42
43 Each function's return-value-timer value will replace the previous timer. Any
44 outstanding timer will be cancelled just before invoking a processing
45 function. As a result, these functions should basically be idempotent: if the
46 database says that the Scheduler needs to wake up at 5pm, it should keep
47 returning '5pm' until it gets called after 5pm, at which point it should
48 start returning None.
49
50 The functions should also add an epsilon (perhaps one second) to their
51 desired wakeup time, so that rounding errors or low-resolution system timers
52 don't cause 'OCD Alarm Clock Syndrome' (in which they get woken up a moment
53 too early and then try to sleep repeatedly for zero seconds). The event loop
54 will silently impose a 5-second minimum delay time to avoid this.
55
56 Any errors in the processing functions are written to log.err and then
57 ignored.
58 """
59
60
61 from twisted.internet import reactor, defer
62 from twisted.application import service
63 from twisted.python import log
64
65 from buildbot.util.eventual import eventually
66 from buildbot import util
67
69 OCD_MINIMUM_DELAY = 5.0
70
72 service.MultiService.__init__(self)
73 self._loop_running = False
74 self._everything_needs_to_run = False
75 self._wakeup_timer = None
76 self._timers = {}
77 self._when_quiet_waiters = set()
78 self._start_timer = None
79 self._reactor = reactor
80 self._remaining = []
81
83 if self._start_timer and self._start_timer.active():
84 self._start_timer.cancel()
85 if self._wakeup_timer and self._wakeup_timer.active():
86 self._wakeup_timer.cancel()
87 return service.MultiService.stopService(self)
88
90 return not self._loop_running
91
93 d = defer.Deferred()
94 self._when_quiet_waiters.add(d)
95 return d
96
98
99
100 if not self.running:
101 print "loop triggered while service disabled; ignoring trigger"
102 return
103 self._mark_runnable(run_everything=True)
104
106 if run_everything:
107 self._everything_needs_to_run = True
108
109 self._timers.clear()
110 self._set_wakeup_timer()
111 if self._loop_running:
112 return
113 self._loop_running = True
114 self._start_timer = self._reactor.callLater(0, self._loop_start)
115
117 raise Exception('subclasses must implement get_processors()')
118
120 if self._everything_needs_to_run:
121 self._everything_needs_to_run = False
122 self._timers.clear()
123 self._set_wakeup_timer()
124 self._remaining = list(self.get_processors())
125 else:
126 self._remaining = []
127 now = util.now(self._reactor)
128 all_processors = self.get_processors()
129 for p in list(self._timers.keys()):
130 if self._timers[p] <= now:
131 del self._timers[p]
132
133
134 if p in all_processors:
135 self._remaining.append(p)
136
137 self._loop_next()
138
140 if not self._remaining:
141 return self._loop_done()
142 p = self._remaining.pop(0)
143 self._timers.pop(p, None)
144 now = util.now(self._reactor)
145 d = defer.maybeDeferred(p)
146 d.addCallback(self._set_timer, now, p)
147 d.addErrback(log.err)
148 d.addBoth(self._one_done)
149 return None
150
153
155 if self._everything_needs_to_run:
156 self._loop_start()
157 return
158 self._loop_running = False
159 self._set_wakeup_timer()
160 if not self._timers:
161
162 while self._when_quiet_waiters:
163 d = self._when_quiet_waiters.pop()
164 self._reactor.callLater(0, d.callback, None)
165 self.loop_done()
166
172
174 if isinstance(res, (int, float)):
175 assert res > now
176
177
178
179 when = max(res, now+self.OCD_MINIMUM_DELAY)
180 self._timers[p] = when
181
183 if not self._timers:
184 if self._wakeup_timer:
185 self._wakeup_timer.cancel()
186 self._wakeup_timer = None
187 return
188 when = min(self._timers.values())
189
190
191
192 delay = max(0, when - util.now(self._reactor))
193 if self._wakeup_timer:
194 self._wakeup_timer.reset(delay)
195 else:
196 self._wakeup_timer = self._reactor.callLater(delay, self._wakeup)
197
199 self._wakeup_timer = None
200 self._mark_runnable(run_everything=False)
201
202 -class Loop(LoopBase):
206
207 - def add(self, processor):
208 self.processors.add(processor)
209
211 self.processors.remove(processor)
212
214 return self.processors.copy()
215
217 - def __init__(self, get_processors_function):
220
222 """I am a Loop which gets my processors from my service children. When I
223 run, I iterate over each of them, invoking their 'run' method."""
224
226 return [child.run for child in self]
227