1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 from twisted.internet import reactor
17
18 from buildbot.changes import base
19 from buildbot.util import json
20 from buildbot import util
21 from twisted.python import log
22 from twisted.internet import defer
23 from twisted.internet.protocol import ProcessProtocol
26 """This source will maintain a connection to gerrit ssh server
27 that will provide us gerrit events in json format."""
28
29 compare_attrs = ["gerritserver", "gerritport"]
30
31 STREAM_GOOD_CONNECTION_TIME = 120
32 "(seconds) connections longer than this are considered good, and reset the backoff timer"
33
34 STREAM_BACKOFF_MIN = 0.5
35 "(seconds) minimum, but nonzero, time to wait before retrying a failed connection"
36
37 STREAM_BACKOFF_EXPONENT = 1.5
38 "multiplier used to increase the backoff from MIN to MAX on repeated failures"
39
40 STREAM_BACKOFF_MAX = 60
41 "(seconds) maximum time to wait before retrying a failed connection"
42
43 - def __init__(self, gerritserver, username, gerritport=29418, identity_file=None):
44 """
45 @type gerritserver: string
46 @param gerritserver: the dns or ip that host the gerrit ssh server,
47
48 @type gerritport: int
49 @param gerritport: the port of the gerrit ssh server,
50
51 @type username: string
52 @param username: the username to use to connect to gerrit,
53
54 @type identity_file: string
55 @param identity_file: identity file to for authentication (optional).
56
57 """
58
59
60 self.gerritserver = gerritserver
61 self.gerritport = gerritport
62 self.username = username
63 self.identity_file = identity_file
64 self.process = None
65 self.streamProcessTimeout = self.STREAM_BACKOFF_MIN
66
69 self.change_source = change_source
70 self.data = ""
71
72 @defer.deferredGenerator
74 """Do line buffering."""
75 self.data += data
76 lines = self.data.split("\n")
77 self.data = lines.pop(-1)
78 for line in lines:
79 log.msg("gerrit: %s" % (line,))
80 d = self.change_source.lineReceived(line)
81 wfd = defer.waitForDeferred(d)
82 yield wfd
83 wfd.getResult()
84
86 log.msg("gerrit stderr: %s" % (data,))
87
90
92 try:
93 event = json.loads(line.decode('utf-8'))
94 except ValueError:
95 log.msg("bad json line: %s" % (line,))
96 return defer.succeed(None)
97
98 if not(type(event) == type({}) and "type" in event):
99 log.msg("no type in event %s" % (line,))
100 return defer.succeed(None)
101 func = getattr(self, "eventReceived_"+event["type"].replace("-","_"), None)
102 if func == None:
103 log.msg("unsupported event %s" % (event["type"],))
104 return defer.succeed(None)
105
106
107 def flatten(event, base, d):
108 for k, v in d.items():
109 if type(v) == dict:
110 flatten(event, base + "." + k, v)
111 else:
112 event[base + "." + k] = v
113
114 properties = {}
115 flatten(properties, "event", event)
116 return func(properties,event)
118 d = self.master.addChange(**chdict)
119
120 d.addErrback(log.err, 'error adding change from GerritChangeSource')
121 return d
123 change = event["change"]
124 return self.addChange(dict(
125 author="%s <%s>" % (change["owner"]["name"], change["owner"]["email"]),
126 project=change["project"],
127 branch=change["branch"]+"/"+change["number"],
128 revision=event["patchSet"]["revision"],
129 revlink=change["url"],
130 comments=change["subject"],
131 files=["unknown"],
132 category=event["type"],
133 properties=properties))
135 ref = event["refUpdate"]
136 author = "gerrit"
137
138 if "submitter" in event:
139 author="%s <%s>" % (event["submitter"]["name"], event["submitter"]["email"])
140
141 return self.addChange(dict(
142 author=author,
143 project=ref["project"],
144 branch=ref["refName"],
145 revision=ref["newRev"],
146 comments="Gerrit: patchset(s) merged.",
147 files=["unknown"],
148 category=event["type"],
149 properties=properties))
150
173
175 log.msg("starting 'gerrit stream-events'")
176 self.lastStreamProcessStart = util.now()
177 args = [ self.username+"@"+self.gerritserver,"-p", str(self.gerritport)]
178 if self.identity_file is not None:
179 args = args + [ '-i', self.identity_file ]
180 self.process = reactor.spawnProcess(self.LocalPP(self), "ssh",
181 [ "ssh" ] + args + [ "gerrit", "stream-events" ])
182
185
192
194 status = ""
195 if not self.process:
196 status = "[NOT CONNECTED - check log]"
197 str = ('GerritChangeSource watching the remote Gerrit repository %s@%s %s' %
198 (self.username, self.gerritserver, status))
199 return str
200