Package buildbot :: Package util :: Module loop
[frames] | no frames]

Source Code for Module buildbot.util.loop

  1  # ***** BEGIN LICENSE BLOCK ***** 
  2  # Version: MPL 1.1/GPL 2.0/LGPL 2.1 
  3  # 
  4  # The contents of this file are subject to the Mozilla Public License Version 
  5  # 1.1 (the "License"); you may not use this file except in compliance with 
  6  # the License. You may obtain a copy of the License at 
  7  # http://www.mozilla.org/MPL/ 
  8  # 
  9  # Software distributed under the License is distributed on an "AS IS" basis, 
 10  # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License 
 11  # for the specific language governing rights and limitations under the 
 12  # License. 
 13  # 
 14  # The Original Code is Mozilla-specific Buildbot steps. 
 15  # 
 16  # The Initial Developer of the Original Code is 
 17  # Mozilla Foundation. 
 18  # Portions created by the Initial Developer are Copyright (C) 2009 
 19  # the Initial Developer. All Rights Reserved. 
 20  # 
 21  # Contributor(s): 
 22  #   Brian Warner <warner@lothar.com> 
 23  # 
 24  # Alternatively, the contents of this file may be used under the terms of 
 25  # either the GNU General Public License Version 2 or later (the "GPL"), or 
 26  # the GNU Lesser General Public License Version 2.1 or later (the "LGPL"), 
 27  # in which case the provisions of the GPL or the LGPL are applicable instead 
 28  # of those above. If you wish to allow use of your version of this file only 
 29  # under the terms of either the GPL or the LGPL, and not to allow others to 
 30  # use your version of this file under the terms of the MPL, indicate your 
 31  # decision by deleting the provisions above and replace them with the notice 
 32  # and other provisions required by the GPL or the LGPL. If you do not delete 
 33  # the provisions above, a recipient may use your version of this file under 
 34  # the terms of any one of the MPL, the GPL or the LGPL. 
 35  # 
 36  # ***** END LICENSE BLOCK ***** 
 37   
 38  """ 
 39  One-at-a-time notification-triggered Deferred event loop. Each such loop has a 
 40  'doorbell' named trigger() and a set of processing functions.  The processing 
 41  functions are expected to be callables like Scheduler methods, which examine a 
 42  database for work to do. The doorbell will be rung by other code that writes 
 43  into the database (possibly in a separate process). 
 44   
 45  At some point after the doorbell is rung, each function will be run in turn, 
 46  one at a time.  Each function can return a Deferred, and the next function will 
 47  not be run until the previous one's Deferred has fired.  That is, at all times, 
 48  at most one processing function will be active.  
 49   
 50  If the doorbell is rung during a run, the loop will be run again later. 
 51  Multiple rings may be handled by a single run, but the class guarantees that 
 52  there will be at least one full run that begins after the last ring.  The 
 53  relative order of processing functions within a run is not preserved.  If a 
 54  processing function is added to the loop more than once, it will still only be 
 55  called once per run. 
 56   
 57  If the Deferred returned by the processing function fires with a number, the 
 58  event loop will call that function again at or after the given time 
 59  (expressed as seconds since epoch). This can be used by processing functions 
 60  when they want to 'sleep' until some amount of time has passed, such as for a 
 61  Scheduler that is waiting for a tree-stable-timer to expire, or a Periodic 
 62  scheduler that wants to fire once every six hours. This delayed call will 
 63  obey the same one-at-a-time behavior as the run-everything trigger. 
 64   
 65  Each function's return-value-timer value will replace the previous timer. Any 
 66  outstanding timer will be cancelled just before invoking a processing 
 67  function. As a result, these functions should basically be idempotent: if the 
 68  database says that the Scheduler needs to wake up at 5pm, it should keep 
 69  returning '5pm' until it gets called after 5pm, at which point it should 
 70  start returning None. 
 71   
 72  The functions should also add an epsilon (perhaps one second) to their 
 73  desired wakeup time, so that rounding errors or low-resolution system timers 
 74  don't cause 'OCD Alarm Clock Syndrome' (in which they get woken up a moment 
 75  too early and then try to sleep repeatedly for zero seconds). The event loop 
 76  will silently impose a 5-second minimum delay time to avoid this. 
 77   
 78  Any errors in the processing functions are written to log.err and then 
 79  ignored. 
 80  """ 
 81   
 82   
 83  from twisted.internet import reactor, defer 
 84  from twisted.application import service 
 85  from twisted.python import log 
 86   
 87  from buildbot.util.eventual import eventually 
 88  from buildbot import util 
 89   
90 -class LoopBase(service.MultiService):
91 OCD_MINIMUM_DELAY = 5.0 92
93 - def __init__(self):
94 service.MultiService.__init__(self) 95 self._loop_running = False 96 self._everything_needs_to_run = False 97 self._wakeup_timer = None 98 self._timers = {} 99 self._when_quiet_waiters = set() 100 self._start_timer = None 101 self._reactor = reactor # seam for tests to use t.i.t.Clock 102 self._remaining = []
103
104 - def stopService(self):
105 if self._start_timer and self._start_timer.active(): 106 self._start_timer.cancel() 107 if self._wakeup_timer and self._wakeup_timer.active(): 108 self._wakeup_timer.cancel() 109 return service.MultiService.stopService(self)
110
111 - def is_quiet(self):
112 return not self._loop_running
113
114 - def when_quiet(self):
115 d = defer.Deferred() 116 self._when_quiet_waiters.add(d) 117 return d
118
119 - def trigger(self):
120 # if we're triggered while not running, ignore it. We'll automatically 121 # trigger when the service starts 122 if not self.running: 123 print "loop triggered while service disabled; ignoring trigger" 124 return 125 self._mark_runnable(run_everything=True)
126
127 - def _mark_runnable(self, run_everything):
128 if run_everything: 129 self._everything_needs_to_run = True 130 # timers are now redundant, so cancel any existing ones 131 self._timers.clear() 132 self._set_wakeup_timer() 133 if self._loop_running: 134 return 135 self._loop_running = True 136 self._start_timer = self._reactor.callLater(0, self._loop_start)
137
138 - def get_processors(self):
139 raise Exception('subclasses must implement get_processors()')
140
141 - def _loop_start(self):
142 if self._everything_needs_to_run: 143 self._everything_needs_to_run = False 144 self._timers.clear() 145 self._set_wakeup_timer() 146 self._remaining = list(self.get_processors()) 147 else: 148 self._remaining = [] 149 now = util.now(self._reactor) 150 all_processors = self.get_processors() 151 for p in list(self._timers.keys()): 152 if self._timers[p] <= now: 153 del self._timers[p] 154 # don't run a processor that was removed while it still 155 # had a timer running 156 if p in all_processors: 157 self._remaining.append(p) 158 # consider sorting by 'when' 159 self._loop_next()
160
161 - def _loop_next(self):
162 if not self._remaining: 163 return self._loop_done() 164 p = self._remaining.pop(0) 165 self._timers.pop(p, None) 166 now = util.now(self._reactor) 167 d = defer.maybeDeferred(p) 168 d.addCallback(self._set_timer, now, p) 169 d.addErrback(log.err) 170 d.addBoth(self._one_done) 171 return None # no long Deferred chains
172
173 - def _one_done(self, ignored):
174 eventually(self._loop_next)
175
176 - def _loop_done(self):
177 if self._everything_needs_to_run: 178 self._loop_start() 179 return 180 self._loop_running = False 181 self._set_wakeup_timer() 182 if not self._timers: 183 # we're really idle, so notify waiters (used by unit tests) 184 while self._when_quiet_waiters: 185 d = self._when_quiet_waiters.pop() 186 self._reactor.callLater(0, d.callback, None) 187 self.loop_done()
188
189 - def loop_done(self):
190 # this can be overridden by subclasses to do more work when we've 191 # finished a pass through the loop and don't need to immediately 192 # start a new one 193 pass
194
195 - def _set_timer(self, res, now, p):
196 if isinstance(res, (int, float)): 197 assert res > now # give me absolute time, not an interval 198 # don't wake up right away. By doing this here instead of in 199 # _set_wakeup_timer, we avoid penalizing unrelated jobs which 200 # want to wake up a few seconds apart 201 when = max(res, now+self.OCD_MINIMUM_DELAY) 202 self._timers[p] = when
203
204 - def _set_wakeup_timer(self):
205 if not self._timers: 206 if self._wakeup_timer: 207 self._wakeup_timer.cancel() 208 self._wakeup_timer = None 209 return 210 when = min(self._timers.values()) 211 # to avoid waking too frequently, this could be: 212 # delay=max(when-now,OCD_MINIMUM_DELAY) 213 # but that delays unrelated jobs that want to wake few seconds apart 214 delay = max(0, when - util.now(self._reactor)) 215 if self._wakeup_timer: 216 self._wakeup_timer.reset(delay) 217 else: 218 self._wakeup_timer = self._reactor.callLater(delay, self._wakeup)
219
220 - def _wakeup(self):
221 self._wakeup_timer = None 222 self._mark_runnable(run_everything=False)
223
224 -class Loop(LoopBase):
225 - def __init__(self):
226 LoopBase.__init__(self) 227 self.processors = set()
228
229 - def add(self, processor):
230 self.processors.add(processor)
231
232 - def remove(self, processor):
233 self.processors.remove(processor)
234
235 - def get_processors(self):
236 return self.processors.copy()
237
238 -class DelegateLoop(LoopBase):
239 - def __init__(self, get_processors_function):
240 LoopBase.__init__(self) 241 self.get_processors = get_processors_function
242
243 -class MultiServiceLoop(LoopBase):
244 """I am a Loop which gets my processors from my service children. When I 245 run, I iterate over each of them, invoking their 'run' method.""" 246
247 - def get_processors(self):
248 return [child.run for child in self]
249