1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 from twisted.internet import reactor
17
18 from buildbot.changes import base
19 from buildbot.util import json
20 from buildbot import util
21 from twisted.python import log
22 from twisted.internet import defer
23 from twisted.internet.protocol import ProcessProtocol
26 """This source will maintain a connection to gerrit ssh server
27 that will provide us gerrit events in json format."""
28
29 compare_attrs = ["gerritserver", "gerritport"]
30
31 STREAM_GOOD_CONNECTION_TIME = 120
32 "(seconds) connections longer than this are considered good, and reset the backoff timer"
33
34 STREAM_BACKOFF_MIN = 0.5
35 "(seconds) minimum, but nonzero, time to wait before retrying a failed connection"
36
37 STREAM_BACKOFF_EXPONENT = 1.5
38 "multiplier used to increase the backoff from MIN to MAX on repeated failures"
39
40 STREAM_BACKOFF_MAX = 60
41 "(seconds) maximum time to wait before retrying a failed connection"
42
43 - def __init__(self, gerritserver, username, gerritport=29418, identity_file=None):
44 """
45 @type gerritserver: string
46 @param gerritserver: the dns or ip that host the gerrit ssh server,
47
48 @type gerritport: int
49 @param gerritport: the port of the gerrit ssh server,
50
51 @type username: string
52 @param username: the username to use to connect to gerrit,
53
54 @type identity_file: string
55 @param identity_file: identity file to for authentication (optional).
56
57 """
58
59
60 self.gerritserver = gerritserver
61 self.gerritport = gerritport
62 self.username = username
63 self.identity_file = identity_file
64 self.process = None
65 self.wantProcess = False
66 self.streamProcessTimeout = self.STREAM_BACKOFF_MIN
67
70 self.change_source = change_source
71 self.data = ""
72
73 @defer.inlineCallbacks
75 """Do line buffering."""
76 self.data += data
77 lines = self.data.split("\n")
78 self.data = lines.pop(-1)
79 for line in lines:
80 log.msg("gerrit: %s" % (line,))
81 yield self.change_source.lineReceived(line)
82
84 log.msg("gerrit stderr: %s" % (data,))
85
88
90 try:
91 event = json.loads(line.decode('utf-8'))
92 except ValueError:
93 log.msg("bad json line: %s" % (line,))
94 return defer.succeed(None)
95
96 if not(type(event) == type({}) and "type" in event):
97 log.msg("no type in event %s" % (line,))
98 return defer.succeed(None)
99 func = getattr(self, "eventReceived_"+event["type"].replace("-","_"), None)
100 if func == None:
101 log.msg("unsupported event %s" % (event["type"],))
102 return defer.succeed(None)
103
104
105 def flatten(event, base, d):
106 for k, v in d.items():
107 if type(v) == dict:
108 flatten(event, base + "." + k, v)
109 else:
110 event[base + "." + k] = v
111
112 properties = {}
113 flatten(properties, "event", event)
114 return func(properties,event)
116 d = self.master.addChange(**chdict)
117
118 d.addErrback(log.err, 'error adding change from GerritChangeSource')
119 return d
121 change = event["change"]
122 return self.addChange(dict(
123 author="%s <%s>" % (change["owner"]["name"], change["owner"]["email"]),
124 project=change["project"],
125 repository="ssh://%s@%s:%s/%s" % (
126 self.username, self.gerritserver, self.gerritport, change["project"]),
127 branch=change["branch"]+"/"+change["number"],
128 revision=event["patchSet"]["revision"],
129 revlink=change["url"],
130 comments=change["subject"],
131 files=["unknown"],
132 category=event["type"],
133 properties=properties))
135 ref = event["refUpdate"]
136 author = "gerrit"
137
138 if "submitter" in event:
139 author="%s <%s>" % (event["submitter"]["name"], event["submitter"]["email"])
140
141 return self.addChange(dict(
142 author=author,
143 project=ref["project"],
144 repository="ssh://%s@%s:%s/%s" % (
145 self.username, self.gerritserver, self.gerritport, ref["project"]),
146 branch=ref["refName"],
147 revision=ref["newRev"],
148 comments="Gerrit: patchset(s) merged.",
149 files=["unknown"],
150 category=event["type"],
151 properties=properties))
152
175
177 log.msg("starting 'gerrit stream-events'")
178 self.lastStreamProcessStart = util.now()
179 args = [ self.username+"@"+self.gerritserver,"-p", str(self.gerritport)]
180 if self.identity_file is not None:
181 args = args + [ '-i', self.identity_file ]
182 self.process = reactor.spawnProcess(self.LocalPP(self), "ssh",
183 [ "ssh" ] + args + [ "gerrit", "stream-events" ])
184
188
196
198 status = ""
199 if not self.process:
200 status = "[NOT CONNECTED - check log]"
201 str = ('GerritChangeSource watching the remote Gerrit repository %s@%s %s' %
202 (self.username, self.gerritserver, status))
203 return str
204