Package buildbot :: Package changes :: Module hgpoller
[frames] | no frames]

Source Code for Module buildbot.changes.hgpoller

  1  # This file is part of Buildbot.  Buildbot is free software: you can 
  2  # redistribute it and/or modify it under the terms of the GNU General Public 
  3  # License as published by the Free Software Foundation, version 2. 
  4  # 
  5  # This program is distributed in the hope that it will be useful, but WITHOUT 
  6  # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more 
  8  # details. 
  9  # 
 10  # You should have received a copy of the GNU General Public License along with 
 11  # this program; if not, write to the Free Software Foundation, Inc., 51 
 12  # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
 13  # 
 14  # Copyright Buildbot Team Members 
 15   
 16  import time 
 17  import os 
 18  from twisted.python import log 
 19  from twisted.internet import defer, utils 
 20   
 21  from buildbot import config 
 22  from buildbot.util import deferredLocked 
 23  from buildbot.changes import base 
 24  from buildbot.util import epoch2datetime 
25 26 -class HgPoller(base.PollingChangeSource):
27 """This source will poll a remote hg repo for changes and submit 28 them to the change master.""" 29 30 compare_attrs = ["repourl", "branch", "workdir", 31 "pollInterval", "hgpoller", "usetimestamps", 32 "category", "project"] 33 34 db_class_name = 'HgPoller' 35
36 - def __init__(self, repourl, branch='default', 37 workdir=None, pollInterval=10*60, 38 hgbin='hg', usetimestamps=True, 39 category=None, project='', 40 encoding='utf-8'):
41 42 self.repourl = repourl 43 self.branch = branch 44 base.PollingChangeSource.__init__( 45 self, name=repourl, pollInterval=pollInterval) 46 self.encoding = encoding 47 self.lastChange = time.time() 48 self.lastPoll = time.time() 49 self.hgbin = hgbin 50 self.workdir = workdir 51 self.usetimestamps = usetimestamps 52 self.category = category 53 self.project = project 54 self.commitInfo = {} 55 self.initLock = defer.DeferredLock() 56 57 if self.workdir == None: 58 config.error("workdir is mandatory for now in HgPoller")
59
60 - def describe(self):
61 status = "" 62 if not self.master: 63 status = "[STOPPED - check log]" 64 return ("HgPoller watching the remote Mercurial repository %r, " 65 "branch: %r, in workdir %r %s") % (self.repourl, self.branch, 66 self.workdir, status)
67 68 @deferredLocked('initLock')
69 - def poll(self):
70 d = self._getChanges() 71 d.addCallback(self._processChanges) 72 d.addErrback(self._processChangesFailure) 73 return d
74
75 - def _absWorkdir(self):
76 workdir = self.workdir 77 if os.path.isabs(workdir): 78 return workdir 79 return os.path.join(self.master.basedir, workdir)
80
81 - def _getRevDetails(self, rev):
82 """Return a deferred for (date, author, files, comments) of given rev. 83 84 Deferred will be in error if rev is unknown. 85 """ 86 args = ['log', '-r', rev, os.linesep.join(( 87 '--template={date|hgdate}', 88 '{author}', 89 '{files}', 90 '{desc|strip}'))] 91 # Mercurial fails with status 255 if rev is unknown 92 d = utils.getProcessOutput(self.hgbin, args, path=self._absWorkdir(), 93 env=os.environ, errortoo=False ) 94 def process(output): 95 # fortunately, Mercurial issues all filenames one one line 96 date, author, files, comments = output.decode(self.encoding, "replace").split( 97 os.linesep, 3) 98 99 if not self.usetimestamps: 100 stamp = None 101 else: 102 try: 103 stamp = float(date.split()[0]) 104 except: 105 log.msg('hgpoller: caught exception converting output %r ' 106 'to timestamp' % date) 107 raise 108 return stamp, author.strip(), files.split(), comments.strip()
109 110 d.addCallback(process) 111 return d
112
113 - def _isRepositoryReady(self):
114 """Easy to patch in tests.""" 115 return os.path.exists(os.path.join(self._absWorkdir(), '.hg'))
116
117 - def _initRepository(self):
118 """Have mercurial init the workdir as a repository (hg init) if needed. 119 120 hg init will also create all needed intermediate directories. 121 """ 122 if self._isRepositoryReady(): 123 return defer.succeed(None) 124 log.msg('hgpoller: initializing working dir from %s' % self.repourl) 125 d = utils.getProcessOutputAndValue(self.hgbin, 126 ['init', self._absWorkdir()], 127 env=os.environ) 128 d.addCallback(self._convertNonZeroToFailure) 129 d.addErrback(self._stopOnFailure) 130 d.addCallback(lambda _ : log.msg( 131 "hgpoller: finished initializing working dir %r" % self.workdir)) 132 return d
133
134 - def _getChanges(self):
135 self.lastPoll = time.time() 136 137 d = self._initRepository() 138 d.addCallback(lambda _ : log.msg( 139 "hgpoller: polling hg repo at %s" % self.repourl)) 140 141 # get a deferred object that performs the fetch 142 args = ['pull', '-b', self.branch, self.repourl] 143 144 # This command always produces data on stderr, but we actually do not 145 # care about the stderr or stdout from this command. 146 # We set errortoo=True to avoid an errback from the deferred. 147 # The callback which will be added to this 148 # deferred will not use the response. 149 d.addCallback(lambda _: utils.getProcessOutput( 150 self.hgbin, args, path=self._absWorkdir(), 151 env=os.environ, errortoo=True)) 152 153 return d
154
155 - def _getStateObjectId(self):
156 """Return a deferred for object id in state db. 157 158 Being unique among pollers, workdir is used with branch as instance 159 name for db. 160 """ 161 return self.master.db.state.getObjectId( 162 '#'.join((self.workdir, self.branch)), self.db_class_name)
163
164 - def _getCurrentRev(self):
165 """Return a deferred for object id in state db and current numeric rev. 166 167 If never has been set, current rev is None. 168 """ 169 d = self._getStateObjectId() 170 def oid_cb(oid): 171 current = self.master.db.state.getState(oid, 'current_rev', None) 172 def to_int(cur): 173 return oid, cur and int(cur) or None
174 current.addCallback(to_int) 175 return current 176 d.addCallback(oid_cb) 177 return d 178
179 - def _setCurrentRev(self, rev, oid=None):
180 """Return a deferred to set current revision in persistent state. 181 182 oid is self's id for state db. It can be passed to avoid a db lookup.""" 183 if oid is None: 184 d = self._getStateObjectId() 185 else: 186 d = defer.succeed(oid) 187 188 def set_in_state(obj_id): 189 return self.master.db.state.setState(obj_id, 'current_rev', rev)
190 d.addCallback(set_in_state) 191 192 return d 193
194 - def _getHead(self):
195 """Return a deferred for branch head revision or None. 196 197 We'll get an error if there is no head for this branch, which is 198 proabably a good thing, since it's probably a mispelling 199 (if really buildbotting a branch that does not have any changeset 200 yet, one shouldn't be surprised to get errors) 201 """ 202 d = utils.getProcessOutput(self.hgbin, 203 ['heads', self.branch, '--template={rev}' + os.linesep], 204 path=self._absWorkdir(), env=os.environ, errortoo=False) 205 206 def no_head_err(exc): 207 log.err("hgpoller: could not find branch %r in repository %r" % ( 208 self.branch, self.repourl))
209 d.addErrback(no_head_err) 210 211 def results(heads): 212 if not heads: 213 return 214 215 if len(heads.split()) > 1: 216 log.err(("hgpoller: caught several heads in branch %r " 217 "from repository %r. Staying at previous revision" 218 "You should wait until the situation is normal again " 219 "due to a merge or directly strip if remote repo " 220 "gets stripped later.") % (self.branch, self.repourl)) 221 return 222 223 # in case of whole reconstruction, are we sure that we'll get the 224 # same node -> rev assignations ? 225 return int(heads.strip()) 226 227 d.addCallback(results) 228 return d 229 230 @defer.inlineCallbacks
231 - def _processChanges(self, unused_output):
232 """Send info about pulled changes to the master and record current. 233 234 GitPoller does the recording by moving the working dir to the head 235 of the branch. 236 We don't update the tree (unnecessary treatment and waste of space) 237 instead, we simply store the current rev number in a file. 238 Recall that hg rev numbers are local and incremental. 239 """ 240 oid, current = yield self._getCurrentRev() 241 # hg log on a range of revisions is never empty 242 # also, if a numeric revision does not exist, a node may match. 243 # Therefore, we have to check explicitely that branch head > current. 244 head = yield self._getHead() 245 if head <= current: 246 return 247 if current is None: 248 # we could have used current = -1 convention as well (as hg does) 249 revrange = '0:%d' % head 250 else: 251 revrange = '%d:%s' % (current + 1, head) 252 253 # two passes for hg log makes parsing simpler (comments is multi-lines) 254 revListArgs = ['log', '-b', self.branch, '-r', revrange, 255 r'--template={rev}:{node}\n'] 256 results = yield utils.getProcessOutput(self.hgbin, revListArgs, 257 path=self._absWorkdir(), env=os.environ, errortoo=False ) 258 259 revNodeList = [rn.split(':', 1) for rn in results.strip().split()] 260 261 log.msg('hgpoller: processing %d changes: %r in %r' 262 % (len(revNodeList), revNodeList, self._absWorkdir())) 263 for rev, node in revNodeList: 264 timestamp, author, files, comments = yield self._getRevDetails( 265 node) 266 yield self.master.addChange( 267 author=author, 268 revision=node, 269 files=files, 270 comments=comments, 271 when_timestamp=epoch2datetime(timestamp), 272 branch=self.branch, 273 category=self.category, 274 project=self.project, 275 repository=self.repourl, 276 src='hg') 277 # writing after addChange so that a rev is never missed, 278 # but at once to avoid impact from later errors 279 yield self._setCurrentRev(rev, oid=oid)
280
281 - def _processChangesFailure(self, f):
282 log.msg('hgpoller: repo poll failed') 283 log.err(f) 284 # eat the failure to continue along the defered chain - we still want to catch up 285 return None
286
287 - def _convertNonZeroToFailure(self, res):
288 "utility method to handle the result of getProcessOutputAndValue" 289 (stdout, stderr, code) = res 290 if code != 0: 291 raise EnvironmentError('command failed with exit code %d: %s' % (code, stderr)) 292 return (stdout, stderr, code)
293
294 - def _stopOnFailure(self, f):
295 "utility method to stop the service when a failure occurs" 296 if self.running: 297 d = defer.maybeDeferred(lambda : self.stopService()) 298 d.addErrback(log.err, 'while stopping broken HgPoller service') 299 return f
300