1   
  2   
  3   
  4   
  5   
  6   
  7   
  8   
  9   
 10   
 11   
 12   
 13   
 14   
 15   
 16  """ 
 17  One-at-a-time notification-triggered Deferred event loop. Each such loop has a 
 18  'doorbell' named trigger() and a set of processing functions.  The processing 
 19  functions are expected to be callables like Scheduler methods, which examine a 
 20  database for work to do. The doorbell will be rung by other code that writes 
 21  into the database (possibly in a separate process). 
 22   
 23  At some point after the doorbell is rung, each function will be run in turn, 
 24  one at a time.  Each function can return a Deferred, and the next function will 
 25  not be run until the previous one's Deferred has fired.  That is, at all times, 
 26  at most one processing function will be active.  
 27   
 28  If the doorbell is rung during a run, the loop will be run again later. 
 29  Multiple rings may be handled by a single run, but the class guarantees that 
 30  there will be at least one full run that begins after the last ring.  The 
 31  relative order of processing functions within a run is not preserved.  If a 
 32  processing function is added to the loop more than once, it will still only be 
 33  called once per run. 
 34   
 35  If the Deferred returned by the processing function fires with a number, the 
 36  event loop will call that function again at or after the given time 
 37  (expressed as seconds since epoch). This can be used by processing functions 
 38  when they want to 'sleep' until some amount of time has passed, such as for a 
 39  Scheduler that is waiting for a tree-stable-timer to expire, or a Periodic 
 40  scheduler that wants to fire once every six hours. This delayed call will 
 41  obey the same one-at-a-time behavior as the run-everything trigger. 
 42   
 43  The time returned by a function replaces that function's previous timer: any 
 44  outstanding timer for a function is cancelled just before that function is 
 45  invoked. As a result, these functions should basically be idempotent: if the 
 46  database says that the Scheduler needs to wake up at 5pm, it should keep 
 47  returning '5pm' until it gets called after 5pm, at which point it should 
 48  start returning None. 
 49   
 50  The functions should also add an epsilon (perhaps one second) to their 
 51  desired wakeup time, so that rounding errors or low-resolution system timers 
 52  don't cause 'OCD Alarm Clock Syndrome' (in which they get woken up a moment 
 53  too early and then try to sleep repeatedly for zero seconds). The event loop 
 54  will silently impose a 5-second minimum delay time to avoid this. 
 55   
 56  Any errors in the processing functions are written to log.err and then 
 57  ignored. 
 58  """ 
 59   
 60   
 61  from twisted.internet import reactor, defer 
 62  from twisted.application import service 
 63  from twisted.python import log 
 64   
 65  from buildbot.util.eventual import eventually 
 66  from buildbot import util 
 67  from buildbot.process.metrics import Timer, countMethod 
 70      OCD_MINIMUM_DELAY = 5.0 
 71   
 73          service.MultiService.__init__(self) 
 74          self._loop_running = False 
 75          self._everything_needs_to_run = False 
 76          self._wakeup_timer = None 
 77          self._timers = {} 
 78          self._when_quiet_waiters = set() 
 79          self._start_timer = None 
 80          self._reactor = reactor  
 81          self._remaining = [] 
  82   
 84          if self._start_timer and self._start_timer.active(): 
 85              self._start_timer.cancel() 
 86          if self._wakeup_timer and self._wakeup_timer.active(): 
 87              self._wakeup_timer.cancel() 
 88          return service.MultiService.stopService(self) 
  89   
 91          return not self._loop_running 
  92   
 94          d = defer.Deferred() 
 95          self._when_quiet_waiters.add(d) 
 96          return d 
  97   
 99           
100           
101          if not self.running: 
102              log.msg("loop triggered while service disabled; ignoring trigger") 
103              return 
104          self._mark_runnable(run_everything=True) 
 105   
107          if run_everything: 
108              self._everything_needs_to_run = True 
109               
110              self._timers.clear() 
111              self._set_wakeup_timer() 
112          if self._loop_running: 
113              return 
114          self._loop_running = True 
115          self._start_timer = self._reactor.callLater(0, self._loop_start) 
 116   
118          raise Exception('subclasses must implement get_processors()') 
 119   
120      _loop_timer = Timer('Loop.run') 
121      @_loop_timer.startTimer 
122      @countMethod('Loop._loop_start()') 
124          if self._everything_needs_to_run: 
125              self._everything_needs_to_run = False 
126              self._timers.clear() 
127              self._set_wakeup_timer() 
128              self._remaining = list(self.get_processors()) 
129          else: 
130              self._remaining = [] 
131              now = util.now(self._reactor) 
132              all_processors = self.get_processors() 
133              for p in list(self._timers.keys()): 
134                  if self._timers[p] <= now: 
135                      del self._timers[p] 
136                       
137                       
138                      if p in all_processors: 
139                          self._remaining.append(p) 
140                   
141          self._loop_next() 
 142   
143      @countMethod('Loop._loop_next()') 
145          if not self._remaining: 
146              return self._loop_done() 
147          p = self._remaining.pop(0) 
148          self._timers.pop(p, None) 
149          now = util.now(self._reactor) 
150          d = defer.maybeDeferred(p) 
151          d.addCallback(self._set_timer, now, p) 
152          d.addErrback(log.err) 
153          d.addBoth(self._one_done) 
154          return None  
 155   
158   
159      @_loop_timer.stopTimer 
161          if self._everything_needs_to_run: 
162              self._loop_start() 
163              return 
164          self._loop_running = False 
165          self._set_wakeup_timer() 
166          if not self._timers: 
167               
168              while self._when_quiet_waiters: 
169                  d = self._when_quiet_waiters.pop() 
170                  self._reactor.callLater(0, d.callback, None) 
171          self.loop_done() 
 172   
178   
187   
189          if not self._timers: 
190              if self._wakeup_timer: 
191                  self._wakeup_timer.cancel() 
192                  self._wakeup_timer = None 
193              return 
194          when = min(self._timers.values()) 
195           
196           
197           
198          delay = max(0, when - util.now(self._reactor)) 
199          if self._wakeup_timer: 
200              self._wakeup_timer.reset(delay) 
201          else: 
202              self._wakeup_timer = self._reactor.callLater(delay, self._wakeup) 
 203   
205          self._wakeup_timer = None 
206          self._mark_runnable(run_everything=False) 
  207   
208 -class Loop(LoopBase): 
 212   
213 -    def add(self, processor): 
              """Add `processor` to this loop's `processors` collection."""
 214          self.processors.add(processor) 
 215   
217          self.processors.remove(processor) 
 218   
220          return self.processors.copy() 
  221   
223 -    def __init__(self, get_processors_function): 
  226   
228      """I am a Loop which gets my processors from my service children. When I 
229      run, I iterate over each of them, invoking their 'run' method.""" 
230   
232          return [child.run for child in self] 
  233