Package buildbot :: Package changes :: Module gerritchangesource
[frames] | no frames]

Source Code for Module buildbot.changes.gerritchangesource

  1  # This file is part of Buildbot.  Buildbot is free software: you can 
  2  # redistribute it and/or modify it under the terms of the GNU General Public 
  3  # License as published by the Free Software Foundation, version 2. 
  4  # 
  5  # This program is distributed in the hope that it will be useful, but WITHOUT 
  6  # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more 
  8  # details. 
  9  # 
 10  # You should have received a copy of the GNU General Public License along with 
 11  # this program; if not, write to the Free Software Foundation, Inc., 51 
 12  # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
 13  # 
 14  # Copyright Buildbot Team Members 
 15   
 16  from twisted.internet import reactor 
 17   
 18  from buildbot.changes import base 
 19  from buildbot.util import json 
 20  from buildbot import util 
 21  from twisted.python import log 
 22  from twisted.internet import defer 
 23  from twisted.internet.protocol import ProcessProtocol 
24 25 -class GerritChangeSource(base.ChangeSource):
26 """This source will maintain a connection to gerrit ssh server 27 that will provide us gerrit events in json format.""" 28 29 compare_attrs = ["gerritserver", "gerritport"] 30 31 STREAM_GOOD_CONNECTION_TIME = 120 32 "(seconds) connections longer than this are considered good, and reset the backoff timer" 33 34 STREAM_BACKOFF_MIN = 0.5 35 "(seconds) minimum, but nonzero, time to wait before retrying a failed connection" 36 37 STREAM_BACKOFF_EXPONENT = 1.5 38 "multiplier used to increase the backoff from MIN to MAX on repeated failures" 39 40 STREAM_BACKOFF_MAX = 60 41 "(seconds) maximum time to wait before retrying a failed connection" 42
43 - def __init__(self, gerritserver, username, gerritport=29418, identity_file=None):
44 """ 45 @type gerritserver: string 46 @param gerritserver: the dns or ip that host the gerrit ssh server, 47 48 @type gerritport: int 49 @param gerritport: the port of the gerrit ssh server, 50 51 @type username: string 52 @param username: the username to use to connect to gerrit, 53 54 @type identity_file: string 55 @param identity_file: identity file to for authentication (optional). 56 57 """ 58 # TODO: delete API comment when documented 59 60 self.gerritserver = gerritserver 61 self.gerritport = gerritport 62 self.username = username 63 self.identity_file = identity_file 64 self.process = None 65 self.wantProcess = False 66 self.streamProcessTimeout = self.STREAM_BACKOFF_MIN
67
68 - class LocalPP(ProcessProtocol):
69 - def __init__(self, change_source):
70 self.change_source = change_source 71 self.data = ""
72 73 @defer.inlineCallbacks
74 - def outReceived(self, data):
75 """Do line buffering.""" 76 self.data += data 77 lines = self.data.split("\n") 78 self.data = lines.pop(-1) # last line is either empty or incomplete 79 for line in lines: 80 log.msg("gerrit: %s" % (line,)) 81 yield self.change_source.lineReceived(line)
82
83 - def errReceived(self, data):
84 log.msg("gerrit stderr: %s" % (data,))
85
86 - def processEnded(self, status_object):
87 self.change_source.streamProcessStopped()
88
89 - def lineReceived(self, line):
90 try: 91 event = json.loads(line.decode('utf-8')) 92 except ValueError: 93 log.msg("bad json line: %s" % (line,)) 94 return defer.succeed(None) 95 96 if not(type(event) == type({}) and "type" in event): 97 log.msg("no type in event %s" % (line,)) 98 return defer.succeed(None) 99 func = getattr(self, "eventReceived_"+event["type"].replace("-","_"), None) 100 if func == None: 101 log.msg("unsupported event %s" % (event["type"],)) 102 return defer.succeed(None) 103 104 # flatten the event dictionary, for easy access with WithProperties 105 def flatten(event, base, d): 106 for k, v in d.items(): 107 if type(v) == dict: 108 flatten(event, base + "." + k, v) 109 else: # already there 110 event[base + "." + k] = v
111 112 properties = {} 113 flatten(properties, "event", event) 114 return func(properties,event)
115 - def addChange(self, chdict):
116 d = self.master.addChange(**chdict) 117 # eat failures.. 118 d.addErrback(log.err, 'error adding change from GerritChangeSource') 119 return d
120 - def eventReceived_patchset_created(self, properties, event):
121 change = event["change"] 122 return self.addChange(dict( 123 author="%s <%s>" % (change["owner"]["name"], change["owner"]["email"]), 124 project=change["project"], 125 repository="ssh://%s@%s:%s/%s" % ( 126 self.username, self.gerritserver, self.gerritport, change["project"]), 127 branch=change["branch"]+"/"+change["number"], 128 revision=event["patchSet"]["revision"], 129 revlink=change["url"], 130 comments=change["subject"], 131 files=["unknown"], 132 category=event["type"], 133 properties=properties))
134 - def eventReceived_ref_updated(self, properties, event):
135 ref = event["refUpdate"] 136 author = "gerrit" 137 138 if "submitter" in event: 139 author="%s <%s>" % (event["submitter"]["name"], event["submitter"]["email"]) 140 141 return self.addChange(dict( 142 author=author, 143 project=ref["project"], 144 repository="ssh://%s@%s:%s/%s" % ( 145 self.username, self.gerritserver, self.gerritport, ref["project"]), 146 branch=ref["refName"], 147 revision=ref["newRev"], 148 comments="Gerrit: patchset(s) merged.", 149 files=["unknown"], 150 category=event["type"], 151 properties=properties))
152
153 - def streamProcessStopped(self):
154 self.process = None 155 156 # if the service is stopped, don't try to restart the process 157 if not self.wantProcess: 158 log.msg("service is not running; not reconnecting") 159 return 160 161 now = util.now() 162 if now - self.lastStreamProcessStart < self.STREAM_GOOD_CONNECTION_TIME: 163 # bad startup; start the stream process again after a timeout, and then 164 # increase the timeout 165 log.msg("'gerrit stream-events' failed; restarting after %ds" % round(self.streamProcessTimeout)) 166 reactor.callLater(self.streamProcessTimeout, self.startStreamProcess) 167 self.streamProcessTimeout *= self.STREAM_BACKOFF_EXPONENT 168 if self.streamProcessTimeout > self.STREAM_BACKOFF_MAX: 169 self.streamProcessTimeout = self.STREAM_BACKOFF_MAX 170 else: 171 # good startup, but lost connection; restart immediately, and set the timeout 172 # to its minimum 173 self.startStreamProcess() 174 self.streamProcessTimeout = self.STREAM_BACKOFF_MIN
175
176 - def startStreamProcess(self):
177 log.msg("starting 'gerrit stream-events'") 178 self.lastStreamProcessStart = util.now() 179 args = [ self.username+"@"+self.gerritserver,"-p", str(self.gerritport)] 180 if self.identity_file is not None: 181 args = args + [ '-i', self.identity_file ] 182 self.process = reactor.spawnProcess(self.LocalPP(self), "ssh", 183 [ "ssh" ] + args + [ "gerrit", "stream-events" ])
184
185 - def startService(self):
186 self.wantProcess = True 187 self.startStreamProcess()
188
189 - def stopService(self):
190 self.wantProcess = False 191 if self.process: 192 self.process.signalProcess("KILL") 193 # TODO: if this occurs while the process is restarting, some exceptions may 194 # be logged, although things will settle down normally 195 return base.ChangeSource.stopService(self)
196
197 - def describe(self):
198 status = "" 199 if not self.process: 200 status = "[NOT CONNECTED - check log]" 201 str = ('GerritChangeSource watching the remote Gerrit repository %s@%s %s' % 202 (self.username, self.gerritserver, status)) 203 return str
204