Package buildbot :: Package changes :: Module gerritchangesource
[frames | no frames]

Source Code for Module buildbot.changes.gerritchangesource

  1  # This file is part of Buildbot.  Buildbot is free software: you can 
  2  # redistribute it and/or modify it under the terms of the GNU General Public 
  3  # License as published by the Free Software Foundation, version 2. 
  4  # 
  5  # This program is distributed in the hope that it will be useful, but WITHOUT 
  6  # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more 
  8  # details. 
  9  # 
 10  # You should have received a copy of the GNU General Public License along with 
 11  # this program; if not, write to the Free Software Foundation, Inc., 51 
 12  # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
 13  # 
 14  # Copyright Buildbot Team Members 
 15   
 16  from twisted.internet import reactor 
 17   
 18  from buildbot.changes import base, changes 
 19  from buildbot.util import json 
 20  from buildbot import util 
 21  from twisted.python import log 
 22  from twisted.internet.protocol import ProcessProtocol 
 23   
class GerritChangeSource(base.ChangeSource):
    """This source will maintain a connection to gerrit ssh server
    that will provide us gerrit events in json format.

    The connection is an C{ssh ... gerrit stream-events} child process.  Each
    line it prints is decoded as one JSON event; C{patchset-created} and
    C{ref-updated} events are turned into changes and handed to the parent.
    When the child process exits it is restarted, using an exponential
    backoff if the connection was short-lived.
    """

    compare_attrs = ["gerritserver", "gerritport"]

    parent = None # filled in when we're added

    STREAM_GOOD_CONNECTION_TIME = 120
    "(seconds) connections longer than this are considered good, and reset the backoff timer"

    STREAM_BACKOFF_MIN = 0.5
    "(seconds) minimum, but nonzero, time to wait before retrying a failed connection"

    STREAM_BACKOFF_EXPONENT = 1.5
    "multiplier used to increase the backoff from MIN to MAX on repeated failures"

    STREAM_BACKOFF_MAX = 60
    "(seconds) maximum time to wait before retrying a failed connection"

    # event types that are converted into changes; everything else is ignored
    HANDLED_EVENT_TYPES = ("patchset-created", "ref-updated")

    # True between startService and stopService; consulted before restarting
    # the stream process so that a deliberate stop is not undone
    wantProcess = False

    # pending reactor.callLater() handle for a delayed restart, if any
    _restartCall = None

    def __init__(self, gerritserver, username, gerritport=29418):
        """
        @type  gerritserver: string
        @param gerritserver: the dns or ip that host the gerrit ssh server,

        @type  gerritport: int
        @param gerritport: the port of the gerrit ssh server,

        @type  username: string
        @param username: the username to use to connect to gerrit

        """
        # TODO: delete API comment when documented

        self.gerritserver = gerritserver
        self.gerritport = gerritport
        self.username = username
        self.process = None
        self.wantProcess = False
        self._restartCall = None
        # time of the most recent spawn; None until the first spawn
        self.lastStreamProcessStart = None
        self.streamProcessTimeout = self.STREAM_BACKOFF_MIN

    class LocalPP(ProcessProtocol):
        """Process protocol that splits the child's stdout into lines and
        forwards each complete line to the owning change source."""

        def __init__(self, change_source):
            self.change_source = change_source
            self.data = ""

        def outReceived(self, data):
            """Do line buffering."""
            self.data += data
            lines = self.data.split("\n")
            self.data = lines.pop(-1) # last line is either empty or incomplete
            for line in lines:
                log.msg("gerrit: %s" % (line,))
                self.change_source.lineReceived(line)

        def errReceived(self, data):
            """Log whatever the child process writes to stderr."""
            log.msg("gerrit stderr: %s" % (data,))

        def processEnded(self, status_object):
            """Tell the change source that the stream process is gone."""
            self.change_source.streamProcessStopped()

    def lineReceived(self, line):
        """Decode one line of C{gerrit stream-events} output and, if it is a
        handled event, submit the corresponding change to the parent.

        Lines that are not valid JSON are logged and dropped; JSON values that
        are not objects, or whose C{type} is not handled, are ignored.
        """
        try:
            event = json.loads(line)
        except ValueError:
            log.msg("bad json line: %s" % (line,))
            return

        if not isinstance(event, dict):
            return
        event_type = event.get("type")
        if event_type not in self.HANDLED_EVENT_TYPES:
            return

        # flatten the event dictionary, for easy access with WithProperties
        def flatten(target, prefix, source):
            for key, value in source.items():
                if isinstance(value, dict):
                    flatten(target, prefix + "." + key, value)
                else: # leaf value
                    target[prefix + "." + key] = value

        properties = {}
        flatten(properties, "event", event)

        if event_type == "patchset-created":
            change = event["change"]
            owner = change["owner"]
            c = changes.Change(who="%s <%s>" % (owner["name"], owner["email"]),
                               project=change["project"],
                               branch=change["branch"],
                               revision=event["patchSet"]["revision"],
                               revlink=change["url"],
                               comments=change["subject"],
                               files=["unknown"],
                               category=event_type,
                               properties=properties)
        else: # "ref-updated"
            ref = event["refUpdate"]
            submitter = event["submitter"]
            c = changes.Change(who="%s <%s>" % (submitter["name"], submitter["email"]),
                               project=ref["project"],
                               branch=ref["refName"],
                               revision=ref["newRev"],
                               comments="Gerrit: patchset(s) merged.",
                               files=["unknown"],
                               category=event_type,
                               properties=properties)

        self.parent.addChange(c)

    def _cancelRestart(self):
        """Cancel a delayed restart that has been scheduled but not yet run."""
        pending = self._restartCall
        self._restartCall = None
        if pending is not None and pending.active():
            pending.cancel()

    def streamProcessStopped(self):
        """Called when the stream process has exited; restart it unless the
        service has been stopped or detached from its parent."""
        self.process = None

        # if the service is stopped, don't try to restart
        if not self.wantProcess or not self.parent:
            log.msg("service is not running; not reconnecting")
            return

        now = util.now()
        started = self.lastStreamProcessStart
        if started is None or now - started < self.STREAM_GOOD_CONNECTION_TIME:
            # bad startup; start the stream process again after a timeout, and then
            # increase the timeout
            log.msg("'gerrit stream-events' failed; restarting after %ds" % round(self.streamProcessTimeout))
            self._cancelRestart()
            self._restartCall = reactor.callLater(self.streamProcessTimeout, self.startStreamProcess)
            self.streamProcessTimeout = min(
                self.streamProcessTimeout * self.STREAM_BACKOFF_EXPONENT,
                self.STREAM_BACKOFF_MAX)
        else:
            # good startup, but lost connection; restart immediately, and set the timeout
            # to its minimum
            self.startStreamProcess()
            self.streamProcessTimeout = self.STREAM_BACKOFF_MIN

    def startStreamProcess(self):
        """Spawn C{ssh <username>@<gerritserver> -p <gerritport> gerrit stream-events}
        and remember when it was started."""
        # a direct call supersedes any restart that is still waiting to fire
        self._cancelRestart()
        log.msg("starting 'gerrit stream-events'")
        self.lastStreamProcessStart = util.now()
        ssh_args = ["ssh", self.username + "@" + self.gerritserver,
                    "-p", str(self.gerritport),
                    "gerrit", "stream-events"]
        self.process = reactor.spawnProcess(self.LocalPP(self), "ssh", ssh_args)

    def startService(self):
        """Start streaming events from gerrit."""
        self.wantProcess = True
        self.startStreamProcess()

    def stopService(self):
        """Stop streaming: drop any scheduled restart and kill the running
        stream process, then stop the base service."""
        # record the intent first so that the processEnded notification
        # triggered by the kill below does not restart the stream
        self.wantProcess = False
        self._cancelRestart()
        if self.process:
            from twisted.internet import error
            try:
                self.process.signalProcess("KILL")
            except error.ProcessExitedAlready:
                # the child died on its own and processEnded has not been
                # delivered yet; there is nothing left to kill
                pass
        return base.ChangeSource.stopService(self)

    def describe(self):
        """Return a one-line human-readable description of this source,
        flagged when no stream process is currently running."""
        status = ""
        if not self.process:
            status = "[NOT CONNECTED - check log]"
        return ('GerritChangeSource watching the remote Gerrit repository %s@%s %s' %
                (self.username, self.gerritserver, status))
