1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 from twisted.internet import defer
17 from twisted.python import log
18 from buildbot import util, interfaces, config
19 from buildbot.status.results import SUCCESS, WARNINGS
20 from buildbot.schedulers import base
23
24 compare_attrs = base.BaseScheduler.compare_attrs + ('upstream_name',)
25
def __init__(self, name, upstream, builderNames, properties=None):
    """Initialize the scheduler and record the name of its upstream scheduler.

    :param name: forwarded unchanged to ``base.BaseScheduler.__init__``.
    :param upstream: object that must provide ``interfaces.IScheduler``;
        only its ``name`` attribute is retained (as ``upstream_name``).
    :param builderNames: forwarded unchanged to
        ``base.BaseScheduler.__init__``.
    :param properties: forwarded to ``base.BaseScheduler.__init__``; when
        omitted (or ``None``) a fresh empty dict is passed instead.
    """
    # A literal ``{}`` default is created once and shared by every call that
    # omits the argument; build a new dict per call so instances can never
    # observe each other's properties through the default.
    if properties is None:
        properties = {}

    base.BaseScheduler.__init__(self, name, builderNames, properties)

    if not interfaces.IScheduler.providedBy(upstream):
        config.error(
            "upstream must be another Scheduler instance")
    # NOTE(review): if config.error() can return instead of raising, the
    # attribute access below still runs on the rejected object -- confirm.
    self.upstream_name = upstream.name

    # Subscription handles; they stay None until a subscription is made and
    # are unsubscribed (if set) when the service stops.
    self._buildset_addition_subscr = None
    self._buildset_completion_subscr = None

    # Lazily loaded copy of the 'upstream_bsids' state list; None means
    # "not fetched from the database yet".
    self._cached_upstream_bsids = None

    # Lock named by the util.deferredLocked('_subscription_lock') decorators
    # on other methods of this class.
    self._subscription_lock = defer.DeferredLock()
40
50
52 if self._buildset_addition_subscr:
53 self._buildset_addition_subscr.unsubscribe()
54 if self._buildset_completion_subscr:
55 self._buildset_completion_subscr.unsubscribe()
56 self._cached_upstream_bsids = None
57 return defer.succeed(None)
58
59 @util.deferredLocked('_subscription_lock')
61
62
63 submitter = properties.get('scheduler', (None, None))[0]
64 if submitter != self.upstream_name:
65 return
66
67
68 d = self._addUpstreamBuildset(bsid)
69 d.addErrback(log.err, 'while subscribing to buildset %d' % bsid)
70
72 d = self._checkCompletedBuildsets(bsid, result)
73 d.addErrback(log.err, 'while checking for completed buildsets')
74
75 @util.deferredLocked('_subscription_lock')
76 @defer.deferredGenerator
78 wfd = defer.waitForDeferred(
79 self._getUpstreamBuildsets())
80 yield wfd
81 subs = wfd.getResult()
82
83 sub_bsids = []
84 for (sub_bsid, sub_sssetid, sub_complete, sub_results) in subs:
85
86
87 if not sub_complete and sub_bsid != bsid:
88 continue
89
90
91 if sub_results in (SUCCESS, WARNINGS):
92 wfd = defer.waitForDeferred(
93 self.addBuildsetForSourceStamp(setid=sub_sssetid,
94 reason='downstream'))
95 yield wfd
96 wfd.getResult()
97
98 sub_bsids.append(sub_bsid)
99
100
101 wfd = defer.waitForDeferred(
102 self._removeUpstreamBuildsets(sub_bsids))
103 yield wfd
104 wfd.getResult()
105
106 @defer.deferredGenerator
108 if self._cached_upstream_bsids is None:
109 wfd = defer.waitForDeferred(
110 self.master.db.state.getState(self.objectid,
111 'upstream_bsids', []))
112 yield wfd
113 self._cached_upstream_bsids = wfd.getResult()[:]
114
115 @defer.deferredGenerator
117
118
119 wfd = defer.waitForDeferred(
120 self._updateCachedUpstreamBuilds())
121 yield wfd
122 wfd.getResult()
123
124 changed = False
125 rv = []
126 for bsid in self._cached_upstream_bsids[:]:
127 wfd = defer.waitForDeferred(
128 self.master.db.buildsets.getBuildset(bsid))
129 yield wfd
130 bsdict = wfd.getResult()
131 if not bsdict:
132 self._cached_upstream_bsids.remove(bsid)
133 changed = True
134 continue
135
136 rv.append((bsid, bsdict['sourcestampsetid'], bsdict['complete'],
137 bsdict['results']))
138
139 if changed:
140 wfd = defer.waitForDeferred(
141 self.master.db.state.setState(self.objectid,
142 'upstream_bsids', self._cached_upstream_bsids))
143 yield wfd
144 wfd.getResult()
145
146 yield rv
147
148 @defer.deferredGenerator
150 wfd = defer.waitForDeferred(
151 self._updateCachedUpstreamBuilds())
152 yield wfd
153 wfd.getResult()
154
155 if bsid not in self._cached_upstream_bsids:
156 self._cached_upstream_bsids.append(bsid)
157
158 wfd = defer.waitForDeferred(
159 self.master.db.state.setState(self.objectid,
160 'upstream_bsids', self._cached_upstream_bsids))
161 yield wfd
162 wfd.getResult()
163
164 @defer.deferredGenerator
166 wfd = defer.waitForDeferred(
167 self._updateCachedUpstreamBuilds())
168 yield wfd
169 wfd.getResult()
170
171 old = set(self._cached_upstream_bsids)
172 self._cached_upstream_bsids = list(old - set(bsids))
173
174 wfd = defer.waitForDeferred(
175 self.master.db.state.setState(self.objectid,
176 'upstream_bsids', self._cached_upstream_bsids))
177 yield wfd
178 wfd.getResult()
179