1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 from twisted.internet import reactor
17
18 from buildbot.changes import base, changes
19 from buildbot.util import json
20 from buildbot import util
21 from twisted.python import log
22 from twisted.internet.protocol import ProcessProtocol
23
25 """This source will maintain a connection to gerrit ssh server
26 that will provide us gerrit events in json format."""
27
28 compare_attrs = ["gerritserver", "gerritport"]
29
30 parent = None
31
32 STREAM_GOOD_CONNECTION_TIME = 120
33 "(seconds) connections longer than this are considered good, and reset the backoff timer"
34
35 STREAM_BACKOFF_MIN = 0.5
36 "(seconds) minimum, but nonzero, time to wait before retrying a failed connection"
37
38 STREAM_BACKOFF_EXPONENT = 1.5
39 "multiplier used to increase the backoff from MIN to MAX on repeated failures"
40
41 STREAM_BACKOFF_MAX = 60
42 "(seconds) maximum time to wait before retrying a failed connection"
43
44 - def __init__(self, gerritserver, username, gerritport=29418):
45 """
46 @type gerritserver: string
47 @param gerritserver: the dns or ip that host the gerrit ssh server,
48
49 @type gerritport: int
50 @param gerritport: the port of the gerrit ssh server,
51
52 @type username: string
53 @param username: the username to use to connect to gerrit
54
55 """
56
57
58 self.gerritserver = gerritserver
59 self.gerritport = gerritport
60 self.username = username
61 self.process = None
62 self.streamProcessTimeout = self.STREAM_BACKOFF_MIN
63
66 self.change_source = change_source
67 self.data = ""
68
70 """Do line buffering."""
71 self.data += data
72 lines = self.data.split("\n")
73 self.data = lines.pop(-1)
74 for line in lines:
75 log.msg("gerrit: %s" % (line,))
76 self.change_source.lineReceived(line)
77
79 log.msg("gerrit stderr: %s" % (data,))
80
83
85 try:
86 event = json.loads(line)
87 except ValueError:
88 log.msg("bad json line: %s" % (line,))
89 return
90
91 if type(event) == type({}) and "type" in event and event["type"] in ["patchset-created", "ref-updated"]:
92
93 def flatten(event, base, d):
94 for k, v in d.items():
95 if type(v) == dict:
96 flatten(event, base + "." + k, v)
97 else:
98 event[base + "." + k] = v
99
100 properties = {}
101 flatten(properties, "event", event)
102
103 if event["type"] == "patchset-created":
104 change = event["change"]
105 c = changes.Change(who="%s <%s>" % (change["owner"]["name"], change["owner"]["email"]),
106 project=change["project"],
107 branch=change["branch"],
108 revision=event["patchSet"]["revision"],
109 revlink=change["url"],
110 comments=change["subject"],
111 files=["unknown"],
112 category=event["type"],
113 properties=properties)
114 elif event["type"] == "ref-updated":
115 ref = event["refUpdate"]
116 c = changes.Change(who="%s <%s>" % (event["submitter"]["name"], event["submitter"]["email"]),
117 project=ref["project"],
118 branch=ref["refName"],
119 revision=ref["newRev"],
120 comments="Gerrit: patchset(s) merged.",
121 files=["unknown"],
122 category=event["type"],
123 properties=properties)
124 else:
125 return
126
127 self.parent.addChange(c)
128
151
153 log.msg("starting 'gerrit stream-events'")
154 self.lastStreamProcessStart = util.now()
155 self.process = reactor.spawnProcess(self.LocalPP(self), "ssh", ["ssh", self.username+"@"+self.gerritserver,"-p", str(self.gerritport), "gerrit","stream-events"])
156
159
166
168 status = ""
169 if not self.process:
170 status = "[NOT CONNECTED - check log]"
171 str = ('GerritChangeSource watching the remote Gerrit repository %s@%s %s' %
172 (self.username, self.gerritserver, status))
173 return str
174