Package buildbot :: Package changes :: Module gerritchangesource
[frames] | no frames]

Source Code for Module buildbot.changes.gerritchangesource

  1  # This file is part of Buildbot.  Buildbot is free software: you can 
  2  # redistribute it and/or modify it under the terms of the GNU General Public 
  3  # License as published by the Free Software Foundation, version 2. 
  4  # 
  5  # This program is distributed in the hope that it will be useful, but WITHOUT 
  6  # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more 
  8  # details. 
  9  # 
 10  # You should have received a copy of the GNU General Public License along with 
 11  # this program; if not, write to the Free Software Foundation, Inc., 51 
 12  # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
 13  # 
 14  # Copyright Buildbot Team Members 
 15   
 16  from twisted.internet import reactor 
 17   
 18  from buildbot.changes import base 
 19  from buildbot.util import json 
 20  from buildbot import util 
 21  from twisted.python import log 
 22  from twisted.internet import defer 
 23  from twisted.internet.protocol import ProcessProtocol 
24 25 -class GerritChangeSource(base.ChangeSource):
26 """This source will maintain a connection to gerrit ssh server 27 that will provide us gerrit events in json format.""" 28 29 compare_attrs = ["gerritserver", "gerritport"] 30 31 STREAM_GOOD_CONNECTION_TIME = 120 32 "(seconds) connections longer than this are considered good, and reset the backoff timer" 33 34 STREAM_BACKOFF_MIN = 0.5 35 "(seconds) minimum, but nonzero, time to wait before retrying a failed connection" 36 37 STREAM_BACKOFF_EXPONENT = 1.5 38 "multiplier used to increase the backoff from MIN to MAX on repeated failures" 39 40 STREAM_BACKOFF_MAX = 60 41 "(seconds) maximum time to wait before retrying a failed connection" 42
43 - def __init__(self, gerritserver, username, gerritport=29418, identity_file=None):
44 """ 45 @type gerritserver: string 46 @param gerritserver: the dns or ip that host the gerrit ssh server, 47 48 @type gerritport: int 49 @param gerritport: the port of the gerrit ssh server, 50 51 @type username: string 52 @param username: the username to use to connect to gerrit, 53 54 @type identity_file: string 55 @param identity_file: identity file to for authentication (optional). 56 57 """ 58 # TODO: delete API comment when documented 59 60 self.gerritserver = gerritserver 61 self.gerritport = gerritport 62 self.username = username 63 self.identity_file = identity_file 64 self.process = None 65 self.wantProcess = False 66 self.streamProcessTimeout = self.STREAM_BACKOFF_MIN
67
68 - class LocalPP(ProcessProtocol):
69 - def __init__(self, change_source):
70 self.change_source = change_source 71 self.data = ""
72 73 @defer.deferredGenerator
74 - def outReceived(self, data):
75 """Do line buffering.""" 76 self.data += data 77 lines = self.data.split("\n") 78 self.data = lines.pop(-1) # last line is either empty or incomplete 79 for line in lines: 80 log.msg("gerrit: %s" % (line,)) 81 d = self.change_source.lineReceived(line) 82 wfd = defer.waitForDeferred(d) 83 yield wfd 84 wfd.getResult()
85
86 - def errReceived(self, data):
87 log.msg("gerrit stderr: %s" % (data,))
88
89 - def processEnded(self, status_object):
90 self.change_source.streamProcessStopped()
91
92 - def lineReceived(self, line):
93 try: 94 event = json.loads(line.decode('utf-8')) 95 except ValueError: 96 log.msg("bad json line: %s" % (line,)) 97 return defer.succeed(None) 98 99 if not(type(event) == type({}) and "type" in event): 100 log.msg("no type in event %s" % (line,)) 101 return defer.succeed(None) 102 func = getattr(self, "eventReceived_"+event["type"].replace("-","_"), None) 103 if func == None: 104 log.msg("unsupported event %s" % (event["type"],)) 105 return defer.succeed(None) 106 107 # flatten the event dictionary, for easy access with WithProperties 108 def flatten(event, base, d): 109 for k, v in d.items(): 110 if type(v) == dict: 111 flatten(event, base + "." + k, v) 112 else: # already there 113 event[base + "." + k] = v
114 115 properties = {} 116 flatten(properties, "event", event) 117 return func(properties,event)
118 - def addChange(self, chdict):
119 d = self.master.addChange(**chdict) 120 # eat failures.. 121 d.addErrback(log.err, 'error adding change from GerritChangeSource') 122 return d
123 - def eventReceived_patchset_created(self, properties, event):
124 change = event["change"] 125 return self.addChange(dict( 126 author="%s <%s>" % (change["owner"]["name"], change["owner"]["email"]), 127 project=change["project"], 128 repository="ssh://%s@%s:%s/%s" % ( 129 self.username, self.gerritserver, self.gerritport, change["project"]), 130 branch=change["branch"]+"/"+change["number"], 131 revision=event["patchSet"]["revision"], 132 revlink=change["url"], 133 comments=change["subject"], 134 files=["unknown"], 135 category=event["type"], 136 properties=properties))
137 - def eventReceived_ref_updated(self, properties, event):
138 ref = event["refUpdate"] 139 author = "gerrit" 140 141 if "submitter" in event: 142 author="%s <%s>" % (event["submitter"]["name"], event["submitter"]["email"]) 143 144 return self.addChange(dict( 145 author=author, 146 project=ref["project"], 147 repository="ssh://%s@%s:%s/%s" % ( 148 self.username, self.gerritserver, self.gerritport, ref["project"]), 149 branch=ref["refName"], 150 revision=ref["newRev"], 151 comments="Gerrit: patchset(s) merged.", 152 files=["unknown"], 153 category=event["type"], 154 properties=properties))
155
156 - def streamProcessStopped(self):
157 self.process = None 158 159 # if the service is stopped, don't try to restart the process 160 if not self.wantProcess: 161 log.msg("service is not running; not reconnecting") 162 return 163 164 now = util.now() 165 if now - self.lastStreamProcessStart < self.STREAM_GOOD_CONNECTION_TIME: 166 # bad startup; start the stream process again after a timeout, and then 167 # increase the timeout 168 log.msg("'gerrit stream-events' failed; restarting after %ds" % round(self.streamProcessTimeout)) 169 reactor.callLater(self.streamProcessTimeout, self.startStreamProcess) 170 self.streamProcessTimeout *= self.STREAM_BACKOFF_EXPONENT 171 if self.streamProcessTimeout > self.STREAM_BACKOFF_MAX: 172 self.streamProcessTimeout = self.STREAM_BACKOFF_MAX 173 else: 174 # good startup, but lost connection; restart immediately, and set the timeout 175 # to its minimum 176 self.startStreamProcess() 177 self.streamProcessTimeout = self.STREAM_BACKOFF_MIN
178
179 - def startStreamProcess(self):
180 log.msg("starting 'gerrit stream-events'") 181 self.lastStreamProcessStart = util.now() 182 args = [ self.username+"@"+self.gerritserver,"-p", str(self.gerritport)] 183 if self.identity_file is not None: 184 args = args + [ '-i', self.identity_file ] 185 self.process = reactor.spawnProcess(self.LocalPP(self), "ssh", 186 [ "ssh" ] + args + [ "gerrit", "stream-events" ])
187
188 - def startService(self):
189 self.wantProcess = True 190 self.startStreamProcess()
191
192 - def stopService(self):
193 self.wantProcess = False 194 if self.process: 195 self.process.signalProcess("KILL") 196 # TODO: if this occurs while the process is restarting, some exceptions may 197 # be logged, although things will settle down normally 198 return base.ChangeSource.stopService(self)
199
200 - def describe(self):
201 status = "" 202 if not self.process: 203 status = "[NOT CONNECTED - check log]" 204 str = ('GerritChangeSource watching the remote Gerrit repository %s@%s %s' % 205 (self.username, self.gerritserver, status)) 206 return str
207