Package buildbot :: Package util :: Module loop
[frames] | no frames]

Source Code for Module buildbot.util.loop

  1  # This file is part of Buildbot.  Buildbot is free software: you can 
  2  # redistribute it and/or modify it under the terms of the GNU General Public 
  3  # License as published by the Free Software Foundation, version 2. 
  4  # 
  5  # This program is distributed in the hope that it will be useful, but WITHOUT 
  6  # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more 
  8  # details. 
  9  # 
 10  # You should have received a copy of the GNU General Public License along with 
 11  # this program; if not, write to the Free Software Foundation, Inc., 51 
 12  # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
 13  # 
 14  # Copyright Buildbot Team Members 
 15   
 16  """ 
 17  One-at-a-time notification-triggered Deferred event loop. Each such loop has a 
 18  'doorbell' named trigger() and a set of processing functions.  The processing 
 19  functions are expected to be callables like Scheduler methods, which examine a 
 20  database for work to do. The doorbell will be rung by other code that writes 
 21  into the database (possibly in a separate process). 
 22   
 23  At some point after the doorbell is rung, each function will be run in turn, 
 24  one at a time.  Each function can return a Deferred, and the next function will 
 25  not be run until the previous one's Deferred has fired.  That is, at all times, 
 26  at most one processing function will be active.  
 27   
 28  If the doorbell is rung during a run, the loop will be run again later. 
 29  Multiple rings may be handled by a single run, but the class guarantees that 
 30  there will be at least one full run that begins after the last ring.  The 
 31  relative order of processing functions within a run is not preserved.  If a 
 32  processing function is added to the loop more than once, it will still only be 
 33  called once per run. 
 34   
 35  If the Deferred returned by the processing function fires with a number, the 
 36  event loop will call that function again at or after the given time 
 37  (expressed as seconds since epoch). This can be used by processing functions 
 38  when they want to 'sleep' until some amount of time has passed, such as for a 
 39  Scheduler that is waiting for a tree-stable-timer to expire, or a Periodic 
 40  scheduler that wants to fire once every six hours. This delayed call will 
 41  obey the same one-at-a-time behavior as the run-everything trigger. 
 42   
 43  Each function's return-value-timer value will replace the previous timer. Any 
 44  outstanding timer will be cancelled just before invoking a processing 
 45  function. As a result, these functions should basically be idempotent: if the 
 46  database says that the Scheduler needs to wake up at 5pm, it should keep 
 47  returning '5pm' until it gets called after 5pm, at which point it should 
 48  start returning None. 
 49   
 50  The functions should also add an epsilon (perhaps one second) to their 
 51  desired wakeup time, so that rounding errors or low-resolution system timers 
 52  don't cause 'OCD Alarm Clock Syndrome' (in which they get woken up a moment 
 53  too early and then try to sleep repeatedly for zero seconds). The event loop 
 54  will silently impose a 5-second minimum delay time to avoid this. 
 55   
 56  Any errors in the processing functions are written to log.err and then 
 57  ignored. 
 58  """ 
 59   
 60   
 61  from twisted.internet import reactor, defer 
 62  from twisted.application import service 
 63  from twisted.python import log 
 64   
 65  from buildbot.util.eventual import eventually 
 66  from buildbot import util 
 67   
68 -class LoopBase(service.MultiService):
69 OCD_MINIMUM_DELAY = 5.0 70
71 - def __init__(self):
72 service.MultiService.__init__(self) 73 self._loop_running = False 74 self._everything_needs_to_run = False 75 self._wakeup_timer = None 76 self._timers = {} 77 self._when_quiet_waiters = set() 78 self._start_timer = None 79 self._reactor = reactor # seam for tests to use t.i.t.Clock 80 self._remaining = []
81
82 - def stopService(self):
83 if self._start_timer and self._start_timer.active(): 84 self._start_timer.cancel() 85 if self._wakeup_timer and self._wakeup_timer.active(): 86 self._wakeup_timer.cancel() 87 return service.MultiService.stopService(self)
88
89 - def is_quiet(self):
90 return not self._loop_running
91
92 - def when_quiet(self):
93 d = defer.Deferred() 94 self._when_quiet_waiters.add(d) 95 return d
96
97 - def trigger(self):
98 # if we're triggered while not running, ignore it. We'll automatically 99 # trigger when the service starts 100 if not self.running: 101 print "loop triggered while service disabled; ignoring trigger" 102 return 103 self._mark_runnable(run_everything=True)
104
105 - def _mark_runnable(self, run_everything):
106 if run_everything: 107 self._everything_needs_to_run = True 108 # timers are now redundant, so cancel any existing ones 109 self._timers.clear() 110 self._set_wakeup_timer() 111 if self._loop_running: 112 return 113 self._loop_running = True 114 self._start_timer = self._reactor.callLater(0, self._loop_start)
115
116 - def get_processors(self):
117 raise Exception('subclasses must implement get_processors()')
118
119 - def _loop_start(self):
120 if self._everything_needs_to_run: 121 self._everything_needs_to_run = False 122 self._timers.clear() 123 self._set_wakeup_timer() 124 self._remaining = list(self.get_processors()) 125 else: 126 self._remaining = [] 127 now = util.now(self._reactor) 128 all_processors = self.get_processors() 129 for p in list(self._timers.keys()): 130 if self._timers[p] <= now: 131 del self._timers[p] 132 # don't run a processor that was removed while it still 133 # had a timer running 134 if p in all_processors: 135 self._remaining.append(p) 136 # consider sorting by 'when' 137 self._loop_next()
138
139 - def _loop_next(self):
140 if not self._remaining: 141 return self._loop_done() 142 p = self._remaining.pop(0) 143 self._timers.pop(p, None) 144 now = util.now(self._reactor) 145 d = defer.maybeDeferred(p) 146 d.addCallback(self._set_timer, now, p) 147 d.addErrback(log.err) 148 d.addBoth(self._one_done) 149 return None # no long Deferred chains
150
151 - def _one_done(self, ignored):
152 eventually(self._loop_next)
153
154 - def _loop_done(self):
155 if self._everything_needs_to_run: 156 self._loop_start() 157 return 158 self._loop_running = False 159 self._set_wakeup_timer() 160 if not self._timers: 161 # we're really idle, so notify waiters (used by unit tests) 162 while self._when_quiet_waiters: 163 d = self._when_quiet_waiters.pop() 164 self._reactor.callLater(0, d.callback, None) 165 self.loop_done()
166
167 - def loop_done(self):
168 # this can be overridden by subclasses to do more work when we've 169 # finished a pass through the loop and don't need to immediately 170 # start a new one 171 pass
172
173 - def _set_timer(self, res, now, p):
174 if isinstance(res, (int, float)): 175 assert res > now # give me absolute time, not an interval 176 # don't wake up right away. By doing this here instead of in 177 # _set_wakeup_timer, we avoid penalizing unrelated jobs which 178 # want to wake up a few seconds apart 179 when = max(res, now+self.OCD_MINIMUM_DELAY) 180 self._timers[p] = when
181
182 - def _set_wakeup_timer(self):
183 if not self._timers: 184 if self._wakeup_timer: 185 self._wakeup_timer.cancel() 186 self._wakeup_timer = None 187 return 188 when = min(self._timers.values()) 189 # to avoid waking too frequently, this could be: 190 # delay=max(when-now,OCD_MINIMUM_DELAY) 191 # but that delays unrelated jobs that want to wake few seconds apart 192 delay = max(0, when - util.now(self._reactor)) 193 if self._wakeup_timer: 194 self._wakeup_timer.reset(delay) 195 else: 196 self._wakeup_timer = self._reactor.callLater(delay, self._wakeup)
197
198 - def _wakeup(self):
199 self._wakeup_timer = None 200 self._mark_runnable(run_everything=False)
201
202 -class Loop(LoopBase):
203 - def __init__(self):
204 LoopBase.__init__(self) 205 self.processors = set()
206
207 - def add(self, processor):
208 self.processors.add(processor)
209
210 - def remove(self, processor):
211 self.processors.remove(processor)
212
213 - def get_processors(self):
214 return self.processors.copy()
215
216 -class DelegateLoop(LoopBase):
217 - def __init__(self, get_processors_function):
218 LoopBase.__init__(self) 219 self.get_processors = get_processors_function
220
221 -class MultiServiceLoop(LoopBase):
222 """I am a Loop which gets my processors from my service children. When I 223 run, I iterate over each of them, invoking their 'run' method.""" 224
225 - def get_processors(self):
226 return [child.run for child in self]
227