Package buildbot :: Package changes :: Module monotone
[frames] | [no frames]

Source Code for Module buildbot.changes.monotone

  1   
  2  import tempfile 
  3  import os 
  4  from cStringIO import StringIO 
  5   
  6  from twisted.python import log 
  7  from twisted.application import service 
  8  from twisted.internet import defer, protocol, error, reactor 
  9  from twisted.internet.task import LoopingCall 
 10   
 11  from buildbot import util 
 12  from buildbot.interfaces import IChangeSource 
 13  from buildbot.changes.changes import Change 
 14   
class _MTProtocol(protocol.ProcessProtocol):
    """Buffers a monotone subprocess's stdout and, when the process exits,
    fires the supplied Deferred with the collected output on success or
    with the failure reason otherwise.  Stderr is only logged."""

    def __init__(self, deferred, cmdline):
        # cmdline is retained purely for log messages.
        self.cmdline = cmdline
        self.deferred = deferred
        self.stdout_buf = StringIO()

    def errReceived(self, text):
        log.msg("stderr: %s" % text)

    def outReceived(self, text):
        log.msg("stdout: %s" % text)
        self.stdout_buf.write(text)

    def processEnded(self, reason):
        log.msg("Command %r exited with value %s" % (self.cmdline, reason))
        if not isinstance(reason.value, error.ProcessDone):
            # Non-zero exit or signal: propagate the Failure.
            self.deferred.errback(reason)
        else:
            self.deferred.callback(self.stdout_buf.getvalue())
class Monotone:
    """Thin asynchronous wrapper around the monotone command-line client.

    All methods of this class return a Deferred."""

    def __init__(self, bin, db):
        self.bin = bin
        self.db = db

    def _run_monotone(self, args):
        # Spawn 'self.bin --db=<db> <args...>' and return a Deferred that
        # fires with the subprocess's accumulated stdout.
        finished = defer.Deferred()
        cmdline = (self.bin, "--db=" + self.db) + tuple(args)
        proto = _MTProtocol(finished, cmdline)
        log.msg("Running command: %r" % (cmdline,))
        log.msg("wd: %s" % os.getcwd())
        reactor.spawnProcess(proto, self.bin, cmdline)
        return finished

    def _process_revision_list(self, output):
        # One revision id per line; empty output means no revisions.
        if not output:
            return []
        return output.strip().split("\n")

    def get_interface_version(self):
        d = self._run_monotone(["automate", "interface_version"])
        d.addCallback(self._process_interface_version)
        return d

    def _process_interface_version(self, output):
        # e.g. "4.1\n" -> (4, 1)
        return tuple([int(piece) for piece in output.strip().split(".")])

    def db_init(self):
        return self._run_monotone(["db", "init"])

    def db_migrate(self):
        return self._run_monotone(["db", "migrate"])

    def pull(self, server, pattern):
        return self._run_monotone(["pull", server, pattern])

    def get_revision(self, rid):
        return self._run_monotone(["cat", "revision", rid])

    def get_heads(self, branch, rcfile=""):
        cmd = ["automate", "heads", branch]
        if rcfile:
            cmd.append("--rcfile=" + rcfile)
        d = self._run_monotone(cmd)
        d.addCallback(self._process_revision_list)
        return d

    def erase_ancestors(self, revs):
        d = self._run_monotone(["automate", "erase_ancestors"] + revs)
        d.addCallback(self._process_revision_list)
        return d

    def ancestry_difference(self, new_rev, old_revs):
        cmd = ["automate", "ancestry_difference", new_rev] + old_revs
        d = self._run_monotone(cmd)
        d.addCallback(self._process_revision_list)
        return d

    def descendents(self, rev):
        d = self._run_monotone(["automate", "descendents", rev])
        d.addCallback(self._process_revision_list)
        return d

    def log(self, rev, depth=None):
        depth_arg = []
        if depth is not None:
            depth_arg = ["--last=%i" % (depth,)]
        return self._run_monotone(["log", "-r", rev] + depth_arg)
class MonotoneSource(service.Service, util.ComparableMixin):
    """This source will poll a monotone server for changes and submit them to
    the change master.

    @param server_addr: monotone server specification (host:portno)

    @param branch: monotone branch to watch

    @param trusted_keys: list of keys whose code you trust

    @param db_path: path to monotone database to pull into

    @param pollinterval: interval in seconds between polls, defaults to
                         10 minutes
    @param monotone_exec: path to monotone executable, defaults to
                          "monotone"
    """

    __implements__ = IChangeSource, service.Service.__implements__
    compare_attrs = ["server_addr", "trusted_keys", "db_path",
                     "pollinterval", "branch", "monotone_exec"]

    # Class-level values serve as defaults (e.g. for instances restored
    # from older pickles that predate some of these attributes).
    parent = None # filled in when we're added
    done_revisions = []
    last_revision = None
    loop = None
    d = None
    tmpfile = None
    monotone = None
    volatile = ["loop", "d", "tmpfile", "monotone"]

    def __init__(self, server_addr, branch, trusted_keys, db_path,
                 pollinterval=60 * 10, monotone_exec="monotone"):
        self.server_addr = server_addr
        self.branch = branch
        self.trusted_keys = trusted_keys
        self.db_path = db_path
        self.pollinterval = pollinterval
        self.monotone_exec = monotone_exec
        # Bug fix: give each instance its own list.  _finish_changes()
        # appends to self.done_revisions, and appending to the class-level
        # default would share processed-revision state between every
        # MonotoneSource instance.
        self.done_revisions = []
        self.monotone = Monotone(self.monotone_exec, self.db_path)

    def startService(self):
        # Kick off a poll immediately and then every self.pollinterval
        # seconds thereafter.
        self.loop = LoopingCall(self.start_poll)
        self.loop.start(self.pollinterval)
        service.Service.startService(self)

    def stopService(self):
        self.loop.stop()
        return service.Service.stopService(self)

    def describe(self):
        return "monotone_source %s %s" % (self.server_addr,
                                          self.branch)

    def start_poll(self):
        # self.d is non-None while a poll is in flight; never overlap polls.
        if self.d is not None:
            log.msg("last poll still in progress, skipping next poll")
            return
        log.msg("starting poll")
        self.d = self._maybe_init_db()
        self.d.addCallback(self._do_netsync)
        self.d.addCallback(self._get_changes)
        self.d.addErrback(self._handle_error)

    def _handle_error(self, failure):
        log.err(failure)
        # Clear the in-flight marker so the next scheduled poll can run.
        self.d = None

    def _maybe_init_db(self):
        # Create the database on first run, otherwise upgrade its schema
        # if a newer monotone requires it.
        if not os.path.exists(self.db_path):
            log.msg("init'ing db")
            return self.monotone.db_init()
        else:
            log.msg("db already exists, migrating")
            return self.monotone.db_migrate()

    def _do_netsync(self, output):
        return self.monotone.pull(self.server_addr, self.branch)

    def _get_changes(self, output):
        d = self._get_new_head()
        d.addCallback(self._process_new_head)
        return d

    def _get_new_head(self):
        # This function returns a deferred that resolves to a good pick of new
        # head (or None if there is no good new head.)

        # First need to get all new heads...  The lua hook below makes
        # monotone trust only revisions signed by one of self.trusted_keys.
        rcfile = """function get_revision_cert_trust(signers, id, name, val)
                      local trusted_signers = { %s }
                      local ts_table = {}
                      for k, v in pairs(trusted_signers) do ts_table[v] = 1 end
                      for k, v in pairs(signers) do
                        if ts_table[v] then
                          return true
                        end
                      end
                      return false
                    end
                 """
        trusted_list = ", ".join(['"' + key + '"' for key in self.trusted_keys])
        # mktemp is unsafe (the name can be raced), but mkstemp is not 2.2
        # compatible.
        tmpfile_name = tempfile.mktemp()
        f = open(tmpfile_name, "w")
        f.write(rcfile % trusted_list)
        f.close()
        d = self.monotone.get_heads(self.branch, tmpfile_name)
        d.addCallback(self._find_new_head, tmpfile_name)
        return d

    def _find_new_head(self, new_heads, tmpfile_name):
        os.unlink(tmpfile_name)
        # Now get the old head's descendents...
        if self.last_revision is not None:
            d = self.monotone.descendents(self.last_revision)
        else:
            # No previous head; any new head is acceptable.
            d = defer.succeed(new_heads)
        d.addCallback(self._pick_new_head, new_heads)
        return d

    def _pick_new_head(self, old_head_descendents, new_heads):
        # Choose a head that actually descends from the last head we
        # processed, so history stays connected; None if there is none.
        for r in new_heads:
            if r in old_head_descendents:
                return r
        return None

    def _process_new_head(self, new_head):
        if new_head is None:
            log.msg("No new head")
            self.d = None
            return None
        # Okay, we have a new head; we need to get all the revisions since
        # then and create change objects for them.
        # Step 1: simplify set of processed revisions.
        d = self._simplify_revisions()
        # Step 2: get the list of new revisions
        d.addCallback(self._get_new_revisions, new_head)
        # Step 3: add a change for each
        d.addCallback(self._add_changes_for_revisions)
        # Step 4: all done
        d.addCallback(self._finish_changes, new_head)
        return d

    def _simplify_revisions(self):
        # Drop revisions that are ancestors of other done revisions; only
        # the frontier matters for ancestry_difference below.
        d = self.monotone.erase_ancestors(self.done_revisions)
        d.addCallback(self._reset_done_revisions)
        return d

    def _reset_done_revisions(self, new_done_revisions):
        self.done_revisions = new_done_revisions
        return None

    def _get_new_revisions(self, blah, new_head):
        if self.done_revisions:
            return self.monotone.ancestry_difference(new_head,
                                                     self.done_revisions)
        else:
            # Don't force feed the builder with every change since the
            # beginning of time when it's first started up.
            return defer.succeed([new_head])

    def _add_changes_for_revisions(self, revs):
        # Chain one _add_change_for_revision per revision so they are
        # submitted strictly in order.
        d = defer.succeed(None)
        for rid in revs:
            d.addCallback(self._add_change_for_revision, rid)
        return d

    def _add_change_for_revision(self, blah, rid):
        d = self.monotone.log(rid, 1)
        d.addCallback(self._add_change_from_log, rid)
        return d

    def _add_change_from_log(self, log_output, rid):
        # Parameter renamed from 'log' to avoid shadowing the twisted log
        # module (same in _add_change_from_log_and_revision below).
        d = self.monotone.get_revision(rid)
        d.addCallback(self._add_change_from_log_and_revision, log_output, rid)
        return d

    def _add_change_from_log_and_revision(self, revision, log_output, rid):
        # Stupid way to pull out everything inside quotes (which currently
        # uniquely identifies filenames inside a changeset): the
        # odd-indexed pieces of a split on '"' are the quoted spans.
        pieces = revision.split('"')
        files = pieces[1::2]
        # Also pull out author key and date
        author = "unknown author"
        for p in log_output.split('\n'):
            if p.startswith("Author:"):
                fields = p.split()
                # Guard against a bare "Author:" line with no key after it.
                if len(fields) > 1:
                    author = fields[1]
        self.parent.addChange(Change(author, files, log_output, revision=rid))

    def _finish_changes(self, blah, new_head):
        # Record the head we just processed and clear the in-flight marker.
        self.done_revisions.append(new_head)
        self.last_revision = new_head
        self.d = None