Package buildbot :: Package util :: Module loop
[frames] | no frames]

Source Code for Module buildbot.util.loop

  1  # This file is part of Buildbot.  Buildbot is free software: you can 
  2  # redistribute it and/or modify it under the terms of the GNU General Public 
  3  # License as published by the Free Software Foundation, version 2. 
  4  # 
  5  # This program is distributed in the hope that it will be useful, but WITHOUT 
  6  # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more 
  8  # details. 
  9  # 
 10  # You should have received a copy of the GNU General Public License along with 
 11  # this program; if not, write to the Free Software Foundation, Inc., 51 
 12  # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
 13  # 
 14  # Copyright Buildbot Team Members 
 15   
 16  """ 
 17  One-at-a-time notification-triggered Deferred event loop. Each such loop has a 
 18  'doorbell' named trigger() and a set of processing functions.  The processing 
 19  functions are expected to be callables like Scheduler methods, which examine a 
 20  database for work to do. The doorbell will be rung by other code that writes 
 21  into the database (possibly in a separate process). 
 22   
 23  At some point after the doorbell is rung, each function will be run in turn, 
 24  one at a time.  Each function can return a Deferred, and the next function will 
 25  not be run until the previous one's Deferred has fired.  That is, at all times, 
 26  at most one processing function will be active.  
 27   
 28  If the doorbell is rung during a run, the loop will be run again later. 
 29  Multiple rings may be handled by a single run, but the class guarantees that 
 30  there will be at least one full run that begins after the last ring.  The 
 31  relative order of processing functions within a run is not preserved.  If a 
 32  processing function is added to the loop more than once, it will still only be 
 33  called once per run. 
 34   
 35  If the Deferred returned by the processing function fires with a number, the 
 36  event loop will call that function again at or after the given time 
 37  (expressed as seconds since epoch). This can be used by processing functions 
 38  when they want to 'sleep' until some amount of time has passed, such as for a 
 39  Scheduler that is waiting for a tree-stable-timer to expire, or a Periodic 
 40  scheduler that wants to fire once every six hours. This delayed call will 
 41  obey the same one-at-a-time behavior as the run-everything trigger. 
 42   
 43  Each function's return-value-timer value will replace the previous timer. Any 
 44  outstanding timer will be cancelled just before invoking a processing 
 45  function. As a result, these functions should basically be idempotent: if the 
 46  database says that the Scheduler needs to wake up at 5pm, it should keep 
 47  returning '5pm' until it gets called after 5pm, at which point it should 
 48  start returning None. 
 49   
 50  The functions should also add an epsilon (perhaps one second) to their 
 51  desired wakeup time, so that rounding errors or low-resolution system timers 
 52  don't cause 'OCD Alarm Clock Syndrome' (in which they get woken up a moment 
 53  too early and then try to sleep repeatedly for zero seconds). The event loop 
 54  will silently impose a 5-second minimum delay time to avoid this. 
 55   
 56  Any errors in the processing functions are written to log.err and then 
 57  ignored. 
 58  """ 
 59   
 60   
 61  from twisted.internet import reactor, defer 
 62  from twisted.application import service 
 63  from twisted.python import log 
 64   
 65  from buildbot.util.eventual import eventually 
 66  from buildbot import util 
 67  from buildbot.process.metrics import Timer, countMethod 
68 69 -class LoopBase(service.MultiService):
70 OCD_MINIMUM_DELAY = 5.0 71
72 - def __init__(self):
73 service.MultiService.__init__(self) 74 self._loop_running = False 75 self._everything_needs_to_run = False 76 self._wakeup_timer = None 77 self._timers = {} 78 self._when_quiet_waiters = set() 79 self._start_timer = None 80 self._reactor = reactor # seam for tests to use t.i.t.Clock 81 self._remaining = []
82
83 - def stopService(self):
84 if self._start_timer and self._start_timer.active(): 85 self._start_timer.cancel() 86 if self._wakeup_timer and self._wakeup_timer.active(): 87 self._wakeup_timer.cancel() 88 return service.MultiService.stopService(self)
89
90 - def is_quiet(self):
91 return not self._loop_running
92
93 - def when_quiet(self):
94 d = defer.Deferred() 95 self._when_quiet_waiters.add(d) 96 return d
97
98 - def trigger(self):
99 # if we're triggered while not running, ignore it. We'll automatically 100 # trigger when the service starts 101 if not self.running: 102 log.msg("loop triggered while service disabled; ignoring trigger") 103 return 104 self._mark_runnable(run_everything=True)
105
106 - def _mark_runnable(self, run_everything):
107 if run_everything: 108 self._everything_needs_to_run = True 109 # timers are now redundant, so cancel any existing ones 110 self._timers.clear() 111 self._set_wakeup_timer() 112 if self._loop_running: 113 return 114 self._loop_running = True 115 self._start_timer = self._reactor.callLater(0, self._loop_start)
116
117 - def get_processors(self):
118 raise Exception('subclasses must implement get_processors()')
119 120 _loop_timer = Timer('Loop.run') 121 @_loop_timer.startTimer 122 @countMethod('Loop._loop_start()')
123 - def _loop_start(self):
124 if self._everything_needs_to_run: 125 self._everything_needs_to_run = False 126 self._timers.clear() 127 self._set_wakeup_timer() 128 self._remaining = list(self.get_processors()) 129 else: 130 self._remaining = [] 131 now = util.now(self._reactor) 132 all_processors = self.get_processors() 133 for p in list(self._timers.keys()): 134 if self._timers[p] <= now: 135 del self._timers[p] 136 # don't run a processor that was removed while it still 137 # had a timer running 138 if p in all_processors: 139 self._remaining.append(p) 140 # consider sorting by 'when' 141 self._loop_next()
142 143 @countMethod('Loop._loop_next()')
144 - def _loop_next(self):
145 if not self._remaining: 146 return self._loop_done() 147 p = self._remaining.pop(0) 148 self._timers.pop(p, None) 149 now = util.now(self._reactor) 150 d = defer.maybeDeferred(p) 151 d.addCallback(self._set_timer, now, p) 152 d.addErrback(log.err) 153 d.addBoth(self._one_done) 154 return None # no long Deferred chains
155
156 - def _one_done(self, ignored):
157 eventually(self._loop_next)
158 159 @_loop_timer.stopTimer
160 - def _loop_done(self):
161 if self._everything_needs_to_run: 162 self._loop_start() 163 return 164 self._loop_running = False 165 self._set_wakeup_timer() 166 if not self._timers: 167 # we're really idle, so notify waiters (used by unit tests) 168 while self._when_quiet_waiters: 169 d = self._when_quiet_waiters.pop() 170 self._reactor.callLater(0, d.callback, None) 171 self.loop_done()
172
173 - def loop_done(self):
174 # this can be overridden by subclasses to do more work when we've 175 # finished a pass through the loop and don't need to immediately 176 # start a new one 177 pass
178
179 - def _set_timer(self, res, now, p):
180 if isinstance(res, (int, float)): 181 assert res > now # give me absolute time, not an interval 182 # don't wake up right away. By doing this here instead of in 183 # _set_wakeup_timer, we avoid penalizing unrelated jobs which 184 # want to wake up a few seconds apart 185 when = max(res, now+self.OCD_MINIMUM_DELAY) 186 self._timers[p] = when
187
188 - def _set_wakeup_timer(self):
189 if not self._timers: 190 if self._wakeup_timer: 191 self._wakeup_timer.cancel() 192 self._wakeup_timer = None 193 return 194 when = min(self._timers.values()) 195 # to avoid waking too frequently, this could be: 196 # delay=max(when-now,OCD_MINIMUM_DELAY) 197 # but that delays unrelated jobs that want to wake few seconds apart 198 delay = max(0, when - util.now(self._reactor)) 199 if self._wakeup_timer: 200 self._wakeup_timer.reset(delay) 201 else: 202 self._wakeup_timer = self._reactor.callLater(delay, self._wakeup)
203
204 - def _wakeup(self):
205 self._wakeup_timer = None 206 self._mark_runnable(run_everything=False)
207
208 -class Loop(LoopBase):
209 - def __init__(self):
210 LoopBase.__init__(self) 211 self.processors = set()
212
213 - def add(self, processor):
214 self.processors.add(processor)
215
216 - def remove(self, processor):
217 self.processors.remove(processor)
218
219 - def get_processors(self):
220 return self.processors.copy()
221
222 -class DelegateLoop(LoopBase):
223 - def __init__(self, get_processors_function):
224 LoopBase.__init__(self) 225 self.get_processors = get_processors_function
226
227 -class MultiServiceLoop(LoopBase):
228 """I am a Loop which gets my processors from my service children. When I 229 run, I iterate over each of them, invoking their 'run' method.""" 230
231 - def get_processors(self):
232 return [child.run for child in self]
233