Package buildbot :: Package changes :: Module gerritchangesource
[frames] | no frames]

Source Code for Module buildbot.changes.gerritchangesource

  1  # This file is part of Buildbot.  Buildbot is free software: you can 
  2  # redistribute it and/or modify it under the terms of the GNU General Public 
  3  # License as published by the Free Software Foundation, version 2. 
  4  # 
  5  # This program is distributed in the hope that it will be useful, but WITHOUT 
  6  # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more 
  8  # details. 
  9  # 
 10  # You should have received a copy of the GNU General Public License along with 
 11  # this program; if not, write to the Free Software Foundation, Inc., 51 
 12  # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
 13  # 
 14  # Copyright Buildbot Team Members 
 15   
 16  from twisted.internet import reactor 
 17   
 18  from buildbot.changes import base 
 19  from buildbot.util import json 
 20  from buildbot import util 
 21  from twisted.python import log 
 22  from twisted.internet import defer 
 23  from twisted.internet.protocol import ProcessProtocol 
class GerritChangeSource(base.ChangeSource):
    """This source will maintain a connection to gerrit ssh server
    that will provide us gerrit events in json format.

    An ``ssh ... gerrit stream-events`` child process is spawned when the
    service starts.  Every line the child writes to stdout is parsed as a
    JSON event and dispatched to the matching ``eventReceived_<type>``
    method (dashes in the event type become underscores).  When the child
    exits it is restarted, immediately if the connection lasted at least
    STREAM_GOOD_CONNECTION_TIME seconds, otherwise after an exponentially
    growing delay bounded by STREAM_BACKOFF_MIN and STREAM_BACKOFF_MAX.
    """

    compare_attrs = ["gerritserver", "gerritport"]

    STREAM_GOOD_CONNECTION_TIME = 120
    "(seconds) connections longer than this are considered good, and reset the backoff timer"

    STREAM_BACKOFF_MIN = 0.5
    "(seconds) minimum, but nonzero, time to wait before retrying a failed connection"

    STREAM_BACKOFF_EXPONENT = 1.5
    "multiplier used to increase the backoff from MIN to MAX on repeated failures"

    STREAM_BACKOFF_MAX = 60
    "(seconds) maximum time to wait before retrying a failed connection"

    # True once stopService() has run; prevents the stream process from being
    # respawned while the service is shutting down.  Kept as a class-level
    # default so instances built without __init__ still behave sanely.
    _stopped = False

    # Handle of the pending reactor.callLater() reconnect, if any, so that
    # stopService() can cancel it.
    _reconnectCall = None

    def __init__(self, gerritserver, username, gerritport=29418, identity_file=None):
        """
        @type  gerritserver: string
        @param gerritserver: the dns or ip that host the gerrit ssh server,

        @type  gerritport: int
        @param gerritport: the port of the gerrit ssh server,

        @type  username: string
        @param username: the username to use to connect to gerrit,

        @type  identity_file: string
        @param identity_file: identity file to use for authentication (optional).

        """
        # TODO: delete API comment when documented

        self.gerritserver = gerritserver
        self.gerritport = gerritport
        self.username = username
        self.identity_file = identity_file
        self.process = None
        self.streamProcessTimeout = self.STREAM_BACKOFF_MIN

    class LocalPP(ProcessProtocol):
        """Process protocol that feeds the child's stdout, line by line, to
        the owning change source."""

        def __init__(self, change_source):
            """
            @param change_source: object whose C{lineReceived} and
                C{streamProcessStopped} methods are called by this protocol
            """
            self.change_source = change_source
            self.data = ""

        @defer.deferredGenerator
        def outReceived(self, data):
            """Do line buffering.

            Complete lines are handed to the change source one at a time,
            waiting for each line's Deferred before moving on to the next.
            """
            self.data += data
            lines = self.data.split("\n")
            self.data = lines.pop(-1)  # last line is either empty or incomplete
            for line in lines:
                log.msg("gerrit: %s" % (line,))
                # maybeDeferred turns a synchronous exception from an event
                # handler into a failure; logging it here keeps one bad event
                # from discarding the remaining lines of this chunk.
                d = defer.maybeDeferred(self.change_source.lineReceived, line)
                d.addErrback(log.err, 'error processing gerrit event')
                wfd = defer.waitForDeferred(d)
                yield wfd
                wfd.getResult()

        def errReceived(self, data):
            """Log whatever the child process writes to stderr."""
            log.msg("gerrit stderr: %s" % (data,))

        def processEnded(self, status_object):
            """Tell the change source that the child process has exited."""
            self.change_source.streamProcessStopped()

    def lineReceived(self, line):
        """Parse one line of stream output and dispatch it to a handler.

        Lines that are not valid JSON, that do not decode to a dictionary
        with a "type" key, or whose type has no C{eventReceived_*} method
        are logged and ignored.

        @param line: one line of output, without the trailing newline
        @returns: a Deferred (already fired for ignored lines, otherwise
            whatever the event handler returns)
        """
        try:
            event = json.loads(line.decode('utf-8'))
        except ValueError:
            log.msg("bad json line: %s" % (line,))
            return defer.succeed(None)

        if not (isinstance(event, dict) and "type" in event):
            log.msg("no type in event %s" % (line,))
            return defer.succeed(None)
        func = getattr(self, "eventReceived_" + event["type"].replace("-", "_"), None)
        if func is None:
            log.msg("unsupported event %s" % (event["type"],))
            return defer.succeed(None)

        # flatten the event dictionary, for easy access with WithProperties
        def flatten(target, prefix, d):
            for k, v in d.items():
                if isinstance(v, dict):
                    flatten(target, prefix + "." + k, v)
                else:
                    target[prefix + "." + k] = v

        properties = {}
        flatten(properties, "event", event)
        return func(properties, event)

    def addChange(self, chdict):
        """Submit a change to the master, logging (and swallowing) failures.

        @param chdict: keyword arguments for C{self.master.addChange}
        @returns: the Deferred returned by C{self.master.addChange}
        """
        d = self.master.addChange(**chdict)
        # eat failures..
        d.addErrback(log.err, 'error adding change from GerritChangeSource')
        return d

    @staticmethod
    def _formatAuthor(account):
        """Render a gerrit account dictionary as C{"name <email>"}.

        Falls back to the account's "username", then to "unknown", when no
        "name" is present, and omits the address part when there is no
        "email", instead of raising KeyError.
        """
        name = account.get("name", account.get("username", "unknown"))
        if "email" in account:
            return "%s <%s>" % (name, account["email"])
        return name

    def eventReceived_patchset_created(self, properties, event):
        """Add a change for a "patchset-created" event.

        The change's branch is "<branch>/<change number>".

        @param properties: flattened event, used as the change's properties
        @param event: the decoded event dictionary
        @returns: Deferred from L{addChange}
        """
        change = event["change"]
        return self.addChange(dict(
            author=self._formatAuthor(change["owner"]),
            project=change["project"],
            # %-formatting also copes with a non-string change number
            branch="%s/%s" % (change["branch"], change["number"]),
            revision=event["patchSet"]["revision"],
            revlink=change["url"],
            comments=change["subject"],
            files=["unknown"],
            category=event["type"],
            properties=properties))

    def eventReceived_ref_updated(self, properties, event):
        """Add a change for a "ref-updated" event.

        The author is the event's submitter when one is given, otherwise
        the literal "gerrit".

        @param properties: flattened event, used as the change's properties
        @param event: the decoded event dictionary
        @returns: Deferred from L{addChange}
        """
        ref = event["refUpdate"]
        author = "gerrit"

        if "submitter" in event:
            author = self._formatAuthor(event["submitter"])

        return self.addChange(dict(
            author=author,
            project=ref["project"],
            branch=ref["refName"],
            revision=ref["newRev"],
            comments="Gerrit: patchset(s) merged.",
            files=["unknown"],
            category=event["type"],
            properties=properties))

    def streamProcessStopped(self):
        """Handle the exit of the stream process, reconnecting if appropriate."""
        self.process = None

        # if the service is stopped, don't try to restart.  self.parent alone
        # is not enough: it is still set when the parent service is merely
        # stopped rather than this service being removed from it.
        if self._stopped or not self.parent:
            log.msg("service is not running; not reconnecting")
            return

        now = util.now()
        if now - self.lastStreamProcessStart < self.STREAM_GOOD_CONNECTION_TIME:
            # bad startup; start the stream process again after a timeout, and then
            # increase the timeout
            log.msg("'gerrit stream-events' failed; restarting after %ds" % round(self.streamProcessTimeout))
            # keep the handle so stopService can cancel the pending restart
            self._reconnectCall = reactor.callLater(self.streamProcessTimeout, self.startStreamProcess)
            self.streamProcessTimeout *= self.STREAM_BACKOFF_EXPONENT
            if self.streamProcessTimeout > self.STREAM_BACKOFF_MAX:
                self.streamProcessTimeout = self.STREAM_BACKOFF_MAX
        else:
            # good startup, but lost connection; restart immediately, and set the timeout
            # to its minimum
            self.startStreamProcess()
            self.streamProcessTimeout = self.STREAM_BACKOFF_MIN

    def startStreamProcess(self):
        """Spawn C{ssh user@server -p port [-i identity_file] gerrit stream-events}."""
        log.msg("starting 'gerrit stream-events'")
        self.lastStreamProcessStart = util.now()
        args = [self.username + "@" + self.gerritserver, "-p", str(self.gerritport)]
        if self.identity_file is not None:
            args = args + ['-i', self.identity_file]
        self.process = reactor.spawnProcess(self.LocalPP(self), "ssh",
                                            ["ssh"] + args + ["gerrit", "stream-events"])

    def startService(self):
        """Start streaming events from gerrit."""
        # NOTE(review): as in the original, the base class startService is
        # not invoked here -- confirm whether that is intentional.
        self._stopped = False
        self.startStreamProcess()

    def stopService(self):
        """Stop streaming: cancel any pending reconnect and kill the process.

        @returns: result of C{base.ChangeSource.stopService}
        """
        self._stopped = True

        # a reconnect scheduled by streamProcessStopped must not fire later
        pending = self._reconnectCall
        self._reconnectCall = None
        if pending is not None and pending.active():
            pending.cancel()

        if self.process:
            self.process.signalProcess("KILL")
        # TODO: if this occurs while the process is restarting, some exceptions may
        # be logged, although things will settle down normally
        return base.ChangeSource.stopService(self)

    def describe(self):
        """Return a one-line, human-readable description of this source."""
        status = ""
        if not self.process:
            status = "[NOT CONNECTED - check log]"
        description = ('GerritChangeSource watching the remote Gerrit repository %s@%s %s' %
                       (self.username, self.gerritserver, status))
        return description
200