1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 from twisted.internet import reactor
17
18 from buildbot.changes import base
19 from buildbot.util import json
20 from buildbot import util
21 from twisted.python import log
22 from twisted.internet import defer
23 from twisted.internet.protocol import ProcessProtocol
26 """This source will maintain a connection to gerrit ssh server
27 that will provide us gerrit events in json format."""
28
# Class-level settings (the `class` line itself is not part of this listing).
# compare_attrs names two of the attributes set by the constructor; presumably
# it is read by a comparison helper in a base class -- TODO confirm.
29 compare_attrs = ["gerritserver", "gerritport"]
30
# Reconnect/backoff tuning. The bare string after each constant describes it.
# Of these, only STREAM_BACKOFF_MIN is read in the code visible here (it is
# the initial value of streamProcessTimeout).
31 STREAM_GOOD_CONNECTION_TIME = 120
32 "(seconds) connections longer than this are considered good, and reset the backoff timer"
33
34 STREAM_BACKOFF_MIN = 0.5
35 "(seconds) minimum, but nonzero, time to wait before retrying a failed connection"
36
37 STREAM_BACKOFF_EXPONENT = 1.5
38 "multiplier used to increase the backoff from MIN to MAX on repeated failures"
39
40 STREAM_BACKOFF_MAX = 60
41 "(seconds) maximum time to wait before retrying a failed connection"
42
def __init__(self, gerritserver, username, gerritport=29418, identity_file=None):
    """Store the Gerrit connection settings and reset the stream state.

    @type  gerritserver: string
    @param gerritserver: DNS name or IP address of the host running the
                         Gerrit SSH server

    @type  username: string
    @param username: user name used to connect to Gerrit

    @type  gerritport: int
    @param gerritport: port of the Gerrit SSH server

    @type  identity_file: string
    @param identity_file: identity file used for authentication (optional)
    """
    # Connection settings, kept exactly as supplied by the caller.
    self.gerritserver, self.gerritport = gerritserver, gerritport
    self.username = username
    self.identity_file = identity_file

    # No stream process exists yet and wantProcess starts out False.
    self.process = None
    self.wantProcess = False

    # The retry delay starts at the class-level minimum backoff.
    self.streamProcessTimeout = self.STREAM_BACKOFF_MIN
67
# NOTE(review): the `def` line for these statements is not visible in this listing.
# Keeps a reference to the owning change source and starts with an empty
# receive buffer in self.data.
70 self.change_source = change_source
71 self.data = ""
72
73 @defer.deferredGenerator
# NOTE(review): the decorated `def` line is not visible in this listing; the
# body reads `self` and `data`.
75 """Do line buffering."""
# Append the new chunk and split on newlines. The last element is the
# unterminated remainder, kept in self.data until more data arrives.
76 self.data += data
77 lines = self.data.split("\n")
78 self.data = lines.pop(-1)
79 for line in lines:
80 log.msg("gerrit: %s" % (line,))
# Hand each complete line to the change source and wait for the returned
# Deferred before moving on, so lines are processed one at a time, in order.
81 d = self.change_source.lineReceived(line)
82 wfd = defer.waitForDeferred(d)
83 yield wfd
# Collect the outcome of the Deferred waited on above.
84 wfd.getResult()
85
# NOTE(review): the `def` line is not visible in this listing.
# Writes the received data to the log with a "gerrit stderr" prefix.
87 log.msg("gerrit stderr: %s" % (data,))
88
91
# NOTE(review): the `def` line is not visible in this listing; the body reads
# `self` and `line`.
# Decode one line as UTF-8 JSON and look up the handler for its event type.
# Every early exit returns an already-fired Deferred with result None.
93 try:
94 event = json.loads(line.decode('utf-8'))
95 except ValueError:
# Input that cannot be parsed is logged and dropped rather than raised.
96 log.msg("bad json line: %s" % (line,))
97 return defer.succeed(None)
98
# Only dict payloads that carry a "type" key are dispatched.
99 if not(type(event) == type({}) and "type" in event):
100 log.msg("no type in event %s" % (line,))
101 return defer.succeed(None)
# Handler lookup by name: "eventReceived_" + the type with "-" replaced by "_".
102 func = getattr(self, "eventReceived_"+event["type"].replace("-","_"), None)
103 if func == None:
104 log.msg("unsupported event %s" % (event["type"],))
105 return defer.succeed(None)
106
107
def flatten(event, base, d):
    """Copy the leaves of the nested dict ``d`` into ``event`` under dotted keys.

    Every key is prefixed with ``base`` and a ".", and nested dictionaries
    are walked recursively, so ``{"a": {"b": 1}}`` with base ``"event"``
    stores ``event["event.a.b"] = 1``.

    @param event: dictionary that receives the flattened entries; it is
                  modified in place
    @param base:  string prefix for the generated keys
    @param d:     (possibly nested) dictionary to flatten

    Nothing is returned.
    """
    for k, v in d.items():
        key = base + "." + k
        # isinstance, unlike an exact type comparison, also descends into
        # dict subclasses such as OrderedDict instead of storing them whole.
        if isinstance(v, dict):
            flatten(event, key, v)
        else:
            event[key] = v
114
# Flatten the whole event into dotted "event.*" keys, then call the handler
# with both the flat properties and the original event; its result is returned.
115 properties = {}
116 flatten(properties, "event", event)
117 return func(properties,event)
# NOTE(review): the `def` line is not visible in this listing; the body reads
# `self` and `chdict`.
# Passes the entries of chdict as keyword arguments to self.master.addChange.
119 d = self.master.addChange(**chdict)
120
# Errors are reported through log.err with the message below; the object
# returned by addChange is handed back to the caller.
121 d.addErrback(log.err, 'error adding change from GerritChangeSource')
122 return d
# NOTE(review): the `def` line is not visible in this listing; the body reads
# `self`, `properties` and `event`.
# Builds a change dictionary from event["change"] and event["patchSet"] and
# submits it through self.addChange, returning that call's result.
124 change = event["change"]
125 return self.addChange(dict(
# NOTE(review): "name" and "email" are indexed directly, so an owner record
# without either key raises KeyError.
126 author="%s <%s>" % (change["owner"]["name"], change["owner"]["email"]),
127 project=change["project"],
128 repository="ssh://%s@%s:%s/%s" % (
129 self.username, self.gerritserver, self.gerritport, change["project"]),
# NOTE(review): plain string concatenation -- assumes change["number"] is a
# string; an integer here would raise TypeError. TODO confirm against the
# Gerrit version in use.
130 branch=change["branch"]+"/"+change["number"],
131 revision=event["patchSet"]["revision"],
132 revlink=change["url"],
133 comments=change["subject"],
# The file list is a fixed placeholder value.
134 files=["unknown"],
135 category=event["type"],
136 properties=properties))
# NOTE(review): the `def` line is not visible in this listing; the body reads
# `self`, `properties` and `event`.
# Builds a change dictionary from event["refUpdate"] and submits it through
# self.addChange, returning that call's result.
138 ref = event["refUpdate"]
# Default author, used when the event carries no "submitter" entry.
139 author = "gerrit"
140
141 if "submitter" in event:
# NOTE(review): "name" and "email" are indexed directly, so a submitter record
# without either key raises KeyError.
142 author="%s <%s>" % (event["submitter"]["name"], event["submitter"]["email"])
143
144 return self.addChange(dict(
145 author=author,
146 project=ref["project"],
147 repository="ssh://%s@%s:%s/%s" % (
148 self.username, self.gerritserver, self.gerritport, ref["project"]),
149 branch=ref["refName"],
150 revision=ref["newRev"],
151 comments="Gerrit: patchset(s) merged.",
# The file list is a fixed placeholder value.
152 files=["unknown"],
153 category=event["type"],
154 properties=properties))
155
178
# NOTE(review): the `def` line is not visible in this listing.
# Spawns `ssh user@host -p port [-i identity_file] gerrit stream-events` with
# a LocalPP instance as its process protocol and keeps the result in
# self.process.
180 log.msg("starting 'gerrit stream-events'")
# Remember when this attempt started (the reader of this value is not visible
# in this listing).
181 self.lastStreamProcessStart = util.now()
182 args = [ self.username+"@"+self.gerritserver,"-p", str(self.gerritport)]
# The identity file option is added only when one was configured.
183 if self.identity_file is not None:
184 args = args + [ '-i', self.identity_file ]
185 self.process = reactor.spawnProcess(self.LocalPP(self), "ssh",
186 [ "ssh" ] + args + [ "gerrit", "stream-events" ])
187
191
199
# NOTE(review): the `def` line is not visible in this listing.
# Returns a one-line description naming the user and server; a warning is
# appended when self.process is falsy.
201 status = ""
202 if not self.process:
203 status = "[NOT CONNECTED - check log]"
# NOTE(review): the local name `str` shadows the builtin for the rest of this
# function.
204 str = ('GerritChangeSource watching the remote Gerrit repository %s@%s %s' %
205 (self.username, self.gerritserver, status))
206 return str
207