1   
  2   
  3   
  4   
  5   
  6   
  7   
  8   
  9   
 10   
 11   
 12   
 13   
 14   
 15   
 16  from twisted.internet import defer 
 17  from twisted.python import log 
 18  from buildbot import util, interfaces, config 
 19  from buildbot.status.results import SUCCESS, WARNINGS 
 20  from buildbot.schedulers import base 
 23   
 24      compare_attrs = base.BaseScheduler.compare_attrs + ('upstream_name',) 
 25   
 26 -    def __init__(self, name, upstream, builderNames, properties={}): 
  27          base.BaseScheduler.__init__(self, name, builderNames, properties) 
 28          if not interfaces.IScheduler.providedBy(upstream): 
 29              config.error( 
 30                  "upstream must be another Scheduler instance") 
 31          self.upstream_name = upstream.name 
 32          self._buildset_addition_subscr = None 
 33          self._buildset_completion_subscr = None 
 34          self._cached_upstream_bsids = None 
 35   
 36           
 37           
 38           
 39          self._subscription_lock = defer.DeferredLock() 
  40   
 50   
 52          if self._buildset_addition_subscr: 
 53              self._buildset_addition_subscr.unsubscribe() 
 54          if self._buildset_completion_subscr: 
 55              self._buildset_completion_subscr.unsubscribe() 
 56          self._cached_upstream_bsids = None 
 57          return defer.succeed(None) 
  58   
 59      @util.deferredLocked('_subscription_lock') 
 61           
 62           
 63          submitter = properties.get('scheduler', (None, None))[0] 
 64          if submitter != self.upstream_name: 
 65              return 
 66   
 67           
 68          d = self._addUpstreamBuildset(bsid) 
 69          d.addErrback(log.err, 'while subscribing to buildset %d' % bsid) 
  70   
 72          d = self._checkCompletedBuildsets(bsid, result) 
 73          d.addErrback(log.err, 'while checking for completed buildsets') 
  74   
 75      @util.deferredLocked('_subscription_lock') 
 76      @defer.deferredGenerator 
 78          wfd = defer.waitForDeferred( 
 79              self._getUpstreamBuildsets()) 
 80          yield wfd 
 81          subs = wfd.getResult() 
 82   
 83          sub_bsids = [] 
 84          for (sub_bsid, sub_sssetid, sub_complete, sub_results) in subs: 
 85               
 86               
 87              if not sub_complete and sub_bsid != bsid: 
 88                  continue 
 89   
 90               
 91              if sub_results in (SUCCESS, WARNINGS): 
 92                  wfd = defer.waitForDeferred( 
 93                      self.addBuildsetForSourceStamp(setid=sub_sssetid, 
 94                                                 reason='downstream')) 
 95                  yield wfd 
 96                  wfd.getResult() 
 97   
 98              sub_bsids.append(sub_bsid) 
 99   
100           
101          wfd = defer.waitForDeferred( 
102              self._removeUpstreamBuildsets(sub_bsids)) 
103          yield wfd 
104          wfd.getResult() 
 105   
106      @defer.deferredGenerator 
108          if self._cached_upstream_bsids is None: 
109              wfd = defer.waitForDeferred( 
110                  self.master.db.state.getState(self.objectid, 
111                                          'upstream_bsids', [])) 
112              yield wfd 
113              self._cached_upstream_bsids = wfd.getResult()[:] 
 114   
115      @defer.deferredGenerator 
117           
118           
119          wfd = defer.waitForDeferred( 
120              self._updateCachedUpstreamBuilds()) 
121          yield wfd 
122          wfd.getResult() 
123   
124          changed = False 
125          rv = [] 
126          for bsid in self._cached_upstream_bsids[:]: 
127              wfd = defer.waitForDeferred( 
128                  self.master.db.buildsets.getBuildset(bsid)) 
129              yield wfd 
130              bsdict = wfd.getResult() 
131              if not bsdict: 
132                  self._cached_upstream_bsids.remove(bsid) 
133                  changed = True 
134                  continue 
135   
136              rv.append((bsid, bsdict['sourcestampsetid'], bsdict['complete'], 
137                  bsdict['results'])) 
138   
139          if changed: 
140              wfd = defer.waitForDeferred( 
141                  self.master.db.state.setState(self.objectid, 
142                                  'upstream_bsids', self._cached_upstream_bsids)) 
143              yield wfd 
144              wfd.getResult() 
145   
146          yield rv 
 147   
148      @defer.deferredGenerator 
150          wfd = defer.waitForDeferred( 
151              self._updateCachedUpstreamBuilds()) 
152          yield wfd 
153          wfd.getResult() 
154   
155          if bsid not in self._cached_upstream_bsids: 
156              self._cached_upstream_bsids.append(bsid) 
157   
158              wfd = defer.waitForDeferred( 
159                  self.master.db.state.setState(self.objectid, 
160                                  'upstream_bsids', self._cached_upstream_bsids)) 
161              yield wfd 
162              wfd.getResult() 
 163   
164      @defer.deferredGenerator 
166          wfd = defer.waitForDeferred( 
167              self._updateCachedUpstreamBuilds()) 
168          yield wfd 
169          wfd.getResult() 
170   
171          old = set(self._cached_upstream_bsids) 
172          self._cached_upstream_bsids = list(old - set(bsids)) 
173   
174          wfd = defer.waitForDeferred( 
175              self.master.db.state.setState(self.objectid, 
176                              'upstream_bsids', self._cached_upstream_bsids)) 
177          yield wfd 
178          wfd.getResult() 
  179