1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 from twisted.internet import defer
17 from twisted.python import log
18 from buildbot import util, interfaces, config
19 from buildbot.status.results import SUCCESS, WARNINGS
20 from buildbot.schedulers import base
23
24 compare_attrs = base.BaseScheduler.compare_attrs + ('upstream_name',)
25
26 - def __init__(self, name, upstream, builderNames, properties={}, **kwargs):
27 base.BaseScheduler.__init__(self, name, builderNames, properties,
28 **kwargs)
29 if not interfaces.IScheduler.providedBy(upstream):
30 config.error(
31 "upstream must be another Scheduler instance")
32 self.upstream_name = upstream.name
33 self._buildset_addition_subscr = None
34 self._buildset_completion_subscr = None
35 self._cached_upstream_bsids = None
36
37
38
39
40 self._subscription_lock = defer.DeferredLock()
41
51
53 if self._buildset_addition_subscr:
54 self._buildset_addition_subscr.unsubscribe()
55 if self._buildset_completion_subscr:
56 self._buildset_completion_subscr.unsubscribe()
57 self._cached_upstream_bsids = None
58 return defer.succeed(None)
59
60 @util.deferredLocked('_subscription_lock')
62
63
64 submitter = properties.get('scheduler', (None, None))[0]
65 if submitter != self.upstream_name:
66 return
67
68
69 d = self._addUpstreamBuildset(bsid)
70 d.addErrback(log.err, 'while subscribing to buildset %d' % bsid)
71
73 d = self._checkCompletedBuildsets(bsid, result)
74 d.addErrback(log.err, 'while checking for completed buildsets')
75
76 @util.deferredLocked('_subscription_lock')
77 @defer.inlineCallbacks
79 subs = yield self._getUpstreamBuildsets()
80
81 sub_bsids = []
82 for (sub_bsid, sub_sssetid, sub_complete, sub_results) in subs:
83
84
85 if not sub_complete and sub_bsid != bsid:
86 continue
87
88
89 if sub_results in (SUCCESS, WARNINGS):
90 yield self.addBuildsetForSourceStamp(setid=sub_sssetid,
91 reason='downstream')
92
93 sub_bsids.append(sub_bsid)
94
95
96 yield self._removeUpstreamBuildsets(sub_bsids)
97
98 @defer.inlineCallbacks
100 if self._cached_upstream_bsids is None:
101 bsids = yield self.master.db.state.getState(self.objectid,
102 'upstream_bsids', [])
103 self._cached_upstream_bsids = bsids
104
105 @defer.inlineCallbacks
107
108
109 yield self._updateCachedUpstreamBuilds()
110
111 changed = False
112 rv = []
113 for bsid in self._cached_upstream_bsids[:]:
114 bsdict = yield self.master.db.buildsets.getBuildset(bsid)
115 if not bsdict:
116 self._cached_upstream_bsids.remove(bsid)
117 changed = True
118 continue
119
120 rv.append((bsid, bsdict['sourcestampsetid'], bsdict['complete'],
121 bsdict['results']))
122
123 if changed:
124 yield self.master.db.state.setState(self.objectid,
125 'upstream_bsids', self._cached_upstream_bsids)
126
127 defer.returnValue(rv)
128
129 @defer.inlineCallbacks
131 yield self._updateCachedUpstreamBuilds()
132
133 if bsid not in self._cached_upstream_bsids:
134 self._cached_upstream_bsids.append(bsid)
135
136 yield self.master.db.state.setState(self.objectid,
137 'upstream_bsids', self._cached_upstream_bsids)
138
139 @defer.inlineCallbacks
141 yield self._updateCachedUpstreamBuilds()
142
143 old = set(self._cached_upstream_bsids)
144 self._cached_upstream_bsids = list(old - set(bsids))
145
146 yield self.master.db.state.setState(self.objectid,
147 'upstream_bsids', self._cached_upstream_bsids)
148