Package buildbot :: Package util :: Module loop
[frames] | no frames]

Source Code for Module buildbot.util.loop

  1  # ***** BEGIN LICENSE BLOCK ***** 
  2  # Version: MPL 1.1/GPL 2.0/LGPL 2.1 
  3  # 
  4  # The contents of this file are subject to the Mozilla Public License Version 
  5  # 1.1 (the "License"); you may not use this file except in compliance with 
  6  # the License. You may obtain a copy of the License at 
  7  # http://www.mozilla.org/MPL/ 
  8  # 
  9  # Software distributed under the License is distributed on an "AS IS" basis, 
 10  # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License 
 11  # for the specific language governing rights and limitations under the 
 12  # License. 
 13  # 
 14  # The Original Code is Mozilla-specific Buildbot steps. 
 15  # 
 16  # The Initial Developer of the Original Code is 
 17  # Mozilla Foundation. 
 18  # Portions created by the Initial Developer are Copyright (C) 2009 
 19  # the Initial Developer. All Rights Reserved. 
 20  # 
 21  # Contributor(s): 
 22  #   Brian Warner <warner@lothar.com> 
 23  # 
 24  # Alternatively, the contents of this file may be used under the terms of 
 25  # either the GNU General Public License Version 2 or later (the "GPL"), or 
 26  # the GNU Lesser General Public License Version 2.1 or later (the "LGPL"), 
 27  # in which case the provisions of the GPL or the LGPL are applicable instead 
 28  # of those above. If you wish to allow use of your version of this file only 
 29  # under the terms of either the GPL or the LGPL, and not to allow others to 
 30  # use your version of this file under the terms of the MPL, indicate your 
 31  # decision by deleting the provisions above and replace them with the notice 
 32  # and other provisions required by the GPL or the LGPL. If you do not delete 
 33  # the provisions above, a recipient may use your version of this file under 
 34  # the terms of any one of the MPL, the GPL or the LGPL. 
 35  # 
 36  # ***** END LICENSE BLOCK ***** 
 37   
 38  """ 
 39  One-at-a-time notification-triggered Deferred event loop. Each such loop has a 
 40  'doorbell' named trigger() and a set of processing functions.  The processing 
 41  functions are expected to be callables like Scheduler methods, which examine a 
 42  database for work to do. The doorbell will be rung by other code that writes 
 43  into the database (possibly in a separate process). 
 44   
 45  At some point after the doorbell is rung, each function will be run in turn, 
 46  one at a time.  Each function can return a Deferred, and the next function will 
 47  not be run until the previous one's Deferred has fired.  That is, at all times, 
 48  at most one processing function will be active.  
 49   
 50  If the doorbell is rung during a run, the loop will be run again later. 
 51  Multiple rings may be handled by a single run, but the class guarantees that 
 52  there will be at least one full run that begins after the last ring.  The 
 53  relative order of processing functions within a run is not preserved.  If a 
 54  processing function is added to the loop more than once, it will still only be 
 55  called once per run. 
 56   
 57  If the Deferred returned by the processing function fires with a number, the 
 58  event loop will call that function again at or after the given time 
 59  (expressed as seconds since epoch). This can be used by processing functions 
 60  when they want to 'sleep' until some amount of time has passed, such as for a 
 61  Scheduler that is waiting for a tree-stable-timer to expire, or a Periodic 
 62  scheduler that wants to fire once every six hours. This delayed call will 
 63  obey the same one-at-a-time behavior as the run-everything trigger. 
 64   
 65  Each function's return-value-timer value will replace the previous timer. Any 
 66  outstanding timer will be cancelled just before invoking a processing 
 67  function. As a result, these functions should basically be idempotent: if the 
 68  database says that the Scheduler needs to wake up at 5pm, it should keep 
 69  returning '5pm' until it gets called after 5pm, at which point it should 
 70  start returning None. 
 71   
 72  The functions should also add an epsilon (perhaps one second) to their 
 73  desired wakeup time, so that rounding errors or low-resolution system timers 
 74  don't cause 'OCD Alarm Clock Syndrome' (in which they get woken up a moment 
 75  too early and then try to sleep repeatedly for zero seconds). The event loop 
 76  will silently impose a 5-second minimum delay time to avoid this. 
 77   
 78  Any errors in the processing functions are written to log.err and then 
 79  ignored. 
 80  """ 
 81   
 82  import time 
 83  from twisted.internet import reactor, defer 
 84  from twisted.application import service 
 85  from twisted.python import log 
 86   
 87  from buildbot.util.eventual import eventually 
 88  from buildbot import util 
 89   
90 -class LoopBase(service.MultiService):
91 OCD_MINIMUM_DELAY = 5.0 92
93 - def __init__(self):
94 service.MultiService.__init__(self) 95 self._loop_running = False 96 self._everything_needs_to_run = False 97 self._wakeup_timer = None 98 self._timers = {} 99 self._when_quiet_waiters = set() 100 self._start_timer = None 101 self._reactor = reactor # seam for tests to use t.i.t.Clock
102
103 - def stopService(self):
104 if self._start_timer and self._start_timer.active(): 105 self._start_timer.cancel() 106 if self._wakeup_timer and self._wakeup_timer.active(): 107 self._wakeup_timer.cancel() 108 return service.MultiService.stopService(self)
109
110 - def is_quiet(self):
111 return not self._loop_running
112
113 - def when_quiet(self):
114 d = defer.Deferred() 115 self._when_quiet_waiters.add(d) 116 return d
117
118 - def trigger(self):
119 # if we're triggered while not running, ignore it. We'll automatically 120 # trigger when the service starts 121 if not self.running: 122 print "loop triggered while service disabled; ignoring trigger" 123 return 124 self._mark_runnable(run_everything=True)
125
126 - def _mark_runnable(self, run_everything):
127 if run_everything: 128 self._everything_needs_to_run = True 129 # timers are now redundant, so cancel any existing ones 130 self._timers.clear() ; self._set_wakeup_timer() 131 if self._loop_running: 132 return 133 self._loop_running = True 134 self._start_timer = self._reactor.callLater(0, self._loop_start)
135 136 # subclasses must implement get_processors() 137
138 - def _loop_start(self):
139 if self._everything_needs_to_run: 140 self._everything_needs_to_run = False 141 self._timers.clear() ; self._set_wakeup_timer() 142 self._remaining = list(self.get_processors()) 143 else: 144 self._remaining = [] 145 now = util.now(self._reactor) 146 all_processors = self.get_processors() 147 for p in list(self._timers.keys()): 148 if self._timers[p] <= now: 149 del self._timers[p] 150 # don't run a processor that was removed while it still 151 # had a timer running 152 if p in all_processors: 153 self._remaining.append(p) 154 # consider sorting by 'when' 155 self._loop_next()
156
157 - def _loop_next(self):
158 if not self._remaining: 159 return self._loop_done() 160 p = self._remaining.pop(0) 161 self._timers.pop(p, None) 162 now = util.now(self._reactor) 163 d = defer.maybeDeferred(p) 164 d.addCallback(self._set_timer, now, p) 165 d.addErrback(log.err) 166 d.addBoth(self._one_done) 167 return None # no long Deferred chains
168
169 - def _one_done(self, ignored):
170 eventually(self._loop_next)
171
172 - def _loop_done(self):
173 if self._everything_needs_to_run: 174 self._loop_start() 175 return 176 self._loop_running = False 177 self._set_wakeup_timer() 178 if not self._timers: 179 # we're really idle, so notify waiters (used by unit tests) 180 while self._when_quiet_waiters: 181 d = self._when_quiet_waiters.pop() 182 self._reactor.callLater(0, d.callback, None) 183 self.loop_done()
184
185 - def loop_done(self):
186 # this can be overridden by subclasses to do more work when we've 187 # finished a pass through the loop and don't need to immediately 188 # start a new one 189 pass
190
191 - def _set_timer(self, res, now, p):
192 if isinstance(res, (int, float)): 193 assert res > now # give me absolute time, not an interval 194 # don't wake up right away. By doing this here instead of in 195 # _set_wakeup_timer, we avoid penalizing unrelated jobs which 196 # want to wake up a few seconds apart 197 when = max(res, now+self.OCD_MINIMUM_DELAY) 198 self._timers[p] = when
199
200 - def _set_wakeup_timer(self):
201 if not self._timers: 202 if self._wakeup_timer: 203 self._wakeup_timer.cancel() 204 self._wakeup_timer = None 205 return 206 when = min(self._timers.values()) 207 # to avoid waking too frequently, this could be: 208 # delay=max(when-now,OCD_MINIMUM_DELAY) 209 # but that delays unrelated jobs that want to wake few seconds apart 210 delay = max(0, when - util.now(self._reactor)) 211 if self._wakeup_timer: 212 self._wakeup_timer.reset(delay) 213 else: 214 self._wakeup_timer = self._reactor.callLater(delay, self._wakeup)
215
216 - def _wakeup(self):
217 self._wakeup_timer = None 218 self._mark_runnable(run_everything=False)
219
220 -class Loop(LoopBase):
221 - def __init__(self):
222 LoopBase.__init__(self) 223 self.processors = set()
224
225 - def add(self, processor):
226 self.processors.add(processor)
227
228 - def remove(self, processor):
229 self.processors.remove(processor)
230
231 - def get_processors(self):
232 return self.processors.copy()
233
234 -class DelegateLoop(LoopBase):
235 - def __init__(self, get_processors_function):
236 LoopBase.__init__(self) 237 self.get_processors = get_processors_function
238
239 -class MultiServiceLoop(LoopBase):
240 """I am a Loop which gets my processors from my service children. When I 241 run, I iterate over each of them, invoking their 'run' method.""" 242
243 - def get_processors(self):
244 return [child.run for child in self]
245