Package buildbot :: Package db :: Module connector
[frames] | no frames]

Source Code for Module buildbot.db.connector

   1  # ***** BEGIN LICENSE BLOCK ***** 
   2  # Version: MPL 1.1/GPL 2.0/LGPL 2.1 
   3  # 
   4  # The contents of this file are subject to the Mozilla Public License Version 
   5  # 1.1 (the "License"); you may not use this file except in compliance with 
   6  # the License. You may obtain a copy of the License at 
   7  # http://www.mozilla.org/MPL/ 
   8  # 
   9  # Software distributed under the License is distributed on an "AS IS" basis, 
  10  # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License 
  11  # for the specific language governing rights and limitations under the 
  12  # License. 
  13  # 
  14  # The Original Code is Mozilla-specific Buildbot steps. 
  15  # 
  16  # The Initial Developer of the Original Code is 
  17  # Mozilla Foundation. 
  18  # Portions created by the Initial Developer are Copyright (C) 2009 
  19  # the Initial Developer. All Rights Reserved. 
  20  # 
  21  # Contributor(s): 
  22  #   Brian Warner <warner@lothar.com> 
  23  #   Chris AtLee <catlee@mozilla.com> 
  24  #   Dustin Mitchell <dustin@zmanda.com> 
  25  # 
  26  # Alternatively, the contents of this file may be used under the terms of 
  27  # either the GNU General Public License Version 2 or later (the "GPL"), or 
  28  # the GNU Lesser General Public License Version 2.1 or later (the "LGPL"), 
  29  # in which case the provisions of the GPL or the LGPL are applicable instead 
  30  # of those above. If you wish to allow use of your version of this file only 
  31  # under the terms of either the GPL or the LGPL, and not to allow others to 
  32  # use your version of this file under the terms of the MPL, indicate your 
  33  # decision by deleting the provisions above and replace them with the notice 
  34  # and other provisions required by the GPL or the LGPL. If you do not delete 
  35  # the provisions above, a recipient may use your version of this file under 
  36  # the terms of any one of the MPL, the GPL or the LGPL. 
  37  # 
  38  # ***** END LICENSE BLOCK ***** 
  39   
  40  import sys, collections, base64 
  41   
  42  from twisted.python import log, threadable 
  43  from twisted.internet import defer 
  44  from twisted.enterprise import adbapi 
  45  from buildbot import util 
  46  from buildbot.util import collections as bbcollections 
  47  from buildbot.changes.changes import Change 
  48  from buildbot.sourcestamp import SourceStamp 
  49  from buildbot.buildrequest import BuildRequest 
  50  from buildbot.process.properties import Properties 
  51  from buildbot.status.builder import SUCCESS, WARNINGS, FAILURE 
  52  from buildbot.util.eventual import eventually 
  53  from buildbot.util import json 
  54   
  55  # Don't auto-resubmit queries that encounter a broken connection: let them 
  56  # fail. Use the "notification doorbell" thing to provide the retry. Set 
  57  # cp_reconnect=True, so that a connection failure will prepare the 
  58  # ConnectionPool to reconnect next time. 
  59   
60 -class MyTransaction(adbapi.Transaction):
61 - def execute(self, *args, **kwargs):
62 #print "Q", args, kwargs 63 return self._cursor.execute(*args, **kwargs)
64 - def fetchall(self):
65 rc = self._cursor.fetchall() 66 #print " F", rc 67 return rc
68
69 -def _one_or_else(res, default=None, process_f=lambda x: x):
70 if not res: 71 return default 72 return process_f(res[0][0])
73
74 -def str_or_none(s):
75 if s is None: 76 return None 77 return str(s)
78
79 -class Token: # used for _start_operation/_end_operation
80 pass 81
82 -class DBConnector(util.ComparableMixin):
83 # this will refuse to create the database: use 'create-master' for that 84 compare_attrs = ["args", "kwargs"] 85 synchronized = ["notify", "_end_operation"] 86 MAX_QUERY_TIMES = 1000 87
88 - def __init__(self, spec):
89 # typical args = (dbmodule, dbname, username, password) 90 self._query_times = collections.deque() 91 self._spec = spec 92 93 # this is for synchronous calls: runQueryNow, runInteractionNow 94 self._dbapi = spec.get_dbapi() 95 self._nonpool = None 96 self._nonpool_lastused = None 97 self._nonpool_max_idle = spec.get_maxidle() 98 99 # pass queries in with "?" placeholders. If the backend uses a 100 # different style, we'll replace them. 101 self.paramstyle = self._dbapi.paramstyle 102 103 self._pool = spec.get_async_connection_pool() 104 self._pool.transactionFactory = MyTransaction 105 # the pool must be started before it can be used. The real 106 # buildmaster process will do this at reactor start. CLI tools (like 107 # "buildbot upgrade-master") must do it manually. Unit tests are run 108 # in an environment in which it is already started. 109 110 self._change_cache = util.LRUCache() 111 self._sourcestamp_cache = util.LRUCache() 112 self._active_operations = set() # protected by synchronized= 113 self._pending_notifications = [] 114 self._subscribers = bbcollections.defaultdict(set) 115 116 self._pending_operation_count = 0 117 118 self._started = False
119
120 - def _getCurrentTime(self):
121 # this is a seam for use in testing 122 return util.now()
123
124 - def start(self):
125 # this only *needs* to be called in reactorless environments (which 126 # should be eliminated anyway). but it doesn't hurt anyway 127 self._pool.start() 128 self._started = True
129
130 - def stop(self):
131 """Call this when you're done with me""" 132 133 # Close our synchronous connection if we've got one 134 if self._nonpool: 135 self._nonpool.close() 136 self._nonpool = None 137 self._nonpool_lastused = None 138 139 if not self._started: 140 return 141 self._pool.close() 142 self._started = False 143 del self._pool
144
145 - def quoteq(self, query):
146 """ 147 Given a query that contains qmark-style placeholders, like:: 148 INSERT INTO foo (col1, col2) VALUES (?,?) 149 replace the '?' with '%s' if the backend uses format-style 150 placeholders, like:: 151 INSERT INTO foo (col1, col2) VALUES (%s,%s) 152 """ 153 if self.paramstyle == "format": 154 return query.replace("?","%s") 155 assert self.paramstyle == "qmark" 156 return query
157
158 - def parmlist(self, count):
159 """ 160 When passing long lists of values to e.g., an INSERT query, it is 161 tedious to pass long strings of ? placeholders. This function will 162 create a parenthesis-enclosed list of COUNT placeholders. Note that 163 the placeholders have already had quoteq() applied. 164 """ 165 p = self.quoteq("?") 166 return "(" + ",".join([p]*count) + ")"
167
168 - def get_version(self):
169 """Returns None for an empty database, or a number (probably 1) for 170 the database's version""" 171 try: 172 res = self.runQueryNow("SELECT version FROM version") 173 except (self._dbapi.OperationalError, self._dbapi.ProgrammingError): 174 # this means the version table is missing: the db is empty 175 return None 176 assert len(res) == 1 177 return res[0][0]
178
179 - def runQueryNow(self, *args, **kwargs):
180 # synchronous+blocking version of runQuery() 181 assert self._started 182 return self.runInteractionNow(self._runQuery, *args, **kwargs)
183
184 - def _runQuery(self, c, *args, **kwargs):
185 c.execute(*args, **kwargs) 186 return c.fetchall()
187
188 - def _start_operation(self):
189 t = Token() 190 self._active_operations.add(t) 191 return t
192 - def _end_operation(self, t):
193 # this is always invoked from the main thread, but is wrapped by 194 # synchronized= and threadable.synchronous(), since it touches 195 # self._pending_notifications, which is also touched by 196 # runInteraction threads 197 self._active_operations.discard(t) 198 if self._active_operations: 199 return 200 for (category, args) in self._pending_notifications: 201 # in the distributed system, this will be a 202 # transport.write(" ".join([category] + [str(a) for a in args])) 203 eventually(self.send_notification, category, args) 204 self._pending_notifications = []
205
206 - def runInteractionNow(self, interaction, *args, **kwargs):
207 # synchronous+blocking version of runInteraction() 208 assert self._started 209 start = self._getCurrentTime() 210 t = self._start_operation() 211 try: 212 return self._runInteractionNow(interaction, *args, **kwargs) 213 finally: 214 self._end_operation(t) 215 self._add_query_time(start)
216
217 - def get_sync_connection(self):
218 # This is a wrapper around spec.get_sync_connection that maintains a 219 # single connection to the database for synchronous usage. It will get 220 # a new connection if the existing one has been idle for more than 221 # max_idle seconds. 222 if self._nonpool_max_idle is not None: 223 now = util.now() 224 if self._nonpool_lastused and self._nonpool_lastused + self._nonpool_max_idle < now: 225 self._nonpool = None 226 227 if not self._nonpool: 228 self._nonpool = self._spec.get_sync_connection() 229 230 self._nonpool_lastused = util.now() 231 return self._nonpool
232
233 - def _runInteractionNow(self, interaction, *args, **kwargs):
234 conn = self.get_sync_connection() 235 c = conn.cursor() 236 try: 237 result = interaction(c, *args, **kwargs) 238 c.close() 239 conn.commit() 240 return result 241 except: 242 excType, excValue, excTraceback = sys.exc_info() 243 try: 244 conn.rollback() 245 c2 = conn.cursor() 246 c2.execute(self._pool.good_sql) 247 c2.close() 248 conn.commit() 249 except: 250 log.msg("rollback failed, will reconnect next query") 251 log.err() 252 # and the connection is probably dead: clear the reference, 253 # so we'll establish a new connection next time 254 self._nonpool = None 255 raise excType, excValue, excTraceback
256
257 - def notify(self, category, *args):
258 # this is wrapped by synchronized= and threadable.synchronous(), 259 # since it will be invoked from runInteraction threads 260 self._pending_notifications.append( (category,args) )
261
262 - def send_notification(self, category, args):
263 # in the distributed system, this will be invoked by lineReceived() 264 #print "SEND", category, args 265 for observer in self._subscribers[category]: 266 eventually(observer, category, *args)
267
268 - def subscribe_to(self, category, observer):
269 self._subscribers[category].add(observer)
270
271 - def runQuery(self, *args, **kwargs):
272 assert self._started 273 self._pending_operation_count += 1 274 start = self._getCurrentTime() 275 #t = self._start_operation() # why is this commented out? -warner 276 d = self._pool.runQuery(*args, **kwargs) 277 #d.addBoth(self._runQuery_done, start, t) 278 return d
279 - def _runQuery_done(self, res, start, t):
280 self._end_operation(t) 281 self._add_query_time(start) 282 self._pending_operation_count -= 1 283 return res
284
285 - def _add_query_time(self, start):
286 elapsed = self._getCurrentTime() - start 287 self._query_times.append(elapsed) 288 if len(self._query_times) > self.MAX_QUERY_TIMES: 289 self._query_times.popleft()
290
291 - def runInteraction(self, *args, **kwargs):
292 assert self._started 293 self._pending_operation_count += 1 294 start = self._getCurrentTime() 295 t = self._start_operation() 296 d = self._pool.runInteraction(*args, **kwargs) 297 d.addBoth(self._runInteraction_done, start, t) 298 return d
299 - def _runInteraction_done(self, res, start, t):
300 self._end_operation(t) 301 self._add_query_time(start) 302 self._pending_operation_count -= 1 303 return res
304 305 # ChangeManager methods 306
307 - def addChangeToDatabase(self, change):
308 self.runInteractionNow(self._txn_addChangeToDatabase, change) 309 self._change_cache.add(change.number, change)
310
311 - def _txn_addChangeToDatabase(self, t, change):
312 q = self.quoteq("INSERT INTO changes" 313 " (author," 314 " comments, is_dir," 315 " branch, revision, revlink," 316 " when_timestamp, category," 317 " repository, project)" 318 " VALUES (?, ?,?, ?,?,?, ?,?, ?,?)") 319 # TODO: map None to.. empty string? 320 321 values = (change.who, 322 change.comments, change.isdir, 323 change.branch, change.revision, change.revlink, 324 change.when, change.category, change.repository, 325 change.project) 326 t.execute(q, values) 327 change.number = t.lastrowid 328 329 for link in change.links: 330 t.execute(self.quoteq("INSERT INTO change_links (changeid, link) " 331 "VALUES (?,?)"), 332 (change.number, link)) 333 for filename in change.files: 334 t.execute(self.quoteq("INSERT INTO change_files (changeid,filename)" 335 " VALUES (?,?)"), 336 (change.number, filename)) 337 for propname,propvalue in change.properties.properties.items(): 338 encoded_value = json.dumps(propvalue) 339 t.execute(self.quoteq("INSERT INTO change_properties" 340 " (changeid, property_name, property_value)" 341 " VALUES (?,?,?)"), 342 (change.number, propname, encoded_value)) 343 self.notify("add-change", change.number)
344
345 - def changeEventGenerator(self, branches=[], categories=[], committers=[], minTime=0):
346 q = "SELECT changeid FROM changes" 347 args = [] 348 if branches or categories or committers: 349 q += " WHERE " 350 pieces = [] 351 if branches: 352 pieces.append("branch IN %s" % self.parmlist(len(branches))) 353 args.extend(list(branches)) 354 if categories: 355 pieces.append("category IN %s" % self.parmlist(len(categories))) 356 args.extend(list(categories)) 357 if committers: 358 pieces.append("author IN %s" % self.parmlist(len(committers))) 359 args.extend(list(committers)) 360 if minTime: 361 pieces.append("when_timestamp > %d" % minTime) 362 q += " AND ".join(pieces) 363 q += " ORDER BY changeid DESC" 364 rows = self.runQueryNow(q, tuple(args)) 365 for (changeid,) in rows: 366 yield self.getChangeNumberedNow(changeid)
367
368 - def getLatestChangeNumberNow(self, t=None):
369 if t: 370 return self._txn_getLatestChangeNumber(t) 371 else: 372 return self.runInteractionNow(self._txn_getLatestChangeNumber)
373 - def _txn_getLatestChangeNumber(self, t):
374 q = self.quoteq("SELECT max(changeid) from changes") 375 t.execute(q) 376 row = t.fetchone() 377 if not row: 378 return 0 379 return row[0]
380
381 - def getChangeNumberedNow(self, changeid, t=None):
382 # this is a synchronous/blocking version of getChangeByNumber 383 assert changeid >= 0 384 c = self._change_cache.get(changeid) 385 if c: 386 return c 387 if t: 388 c = self._txn_getChangeNumberedNow(t, changeid) 389 else: 390 c = self.runInteractionNow(self._txn_getChangeNumberedNow, changeid) 391 self._change_cache.add(changeid, c) 392 return c
393 - def _txn_getChangeNumberedNow(self, t, changeid):
394 q = self.quoteq("SELECT author, comments," 395 " is_dir, branch, revision, revlink," 396 " when_timestamp, category," 397 " repository, project" 398 " FROM changes WHERE changeid = ?") 399 t.execute(q, (changeid,)) 400 rows = t.fetchall() 401 if not rows: 402 return None 403 (who, comments, 404 isdir, branch, revision, revlink, 405 when, category, repository, project) = rows[0] 406 branch = str_or_none(branch) 407 revision = str_or_none(revision) 408 q = self.quoteq("SELECT link FROM change_links WHERE changeid=?") 409 t.execute(q, (changeid,)) 410 rows = t.fetchall() 411 links = [row[0] for row in rows] 412 links.sort() 413 414 q = self.quoteq("SELECT filename FROM change_files WHERE changeid=?") 415 t.execute(q, (changeid,)) 416 rows = t.fetchall() 417 files = [row[0] for row in rows] 418 files.sort() 419 420 p = self.get_properties_from_db("change_properties", "changeid", 421 changeid, t) 422 c = Change(who=who, files=files, comments=comments, isdir=isdir, 423 links=links, revision=revision, when=when, 424 branch=branch, category=category, revlink=revlink, 425 repository=repository, project=project) 426 c.properties.updateFromProperties(p) 427 c.number = changeid 428 return c
429
430 - def getChangeByNumber(self, changeid):
431 # return a Deferred that fires with a Change instance, or None if 432 # there is no Change with that number 433 assert changeid >= 0 434 c = self._change_cache.get(changeid) 435 if c: 436 return defer.succeed(c) 437 d1 = self.runQuery(self.quoteq("SELECT author, comments," 438 " is_dir, branch, revision, revlink," 439 " when_timestamp, category," 440 " repository, project" 441 " FROM changes WHERE changeid = ?"), 442 (changeid,)) 443 d2 = self.runQuery(self.quoteq("SELECT link FROM change_links" 444 " WHERE changeid=?"), 445 (changeid,)) 446 d3 = self.runQuery(self.quoteq("SELECT filename FROM change_files" 447 " WHERE changeid=?"), 448 (changeid,)) 449 d4 = self.runInteraction(self._txn_get_properties_from_db, 450 "change_properties", "changeid", changeid) 451 d = defer.gatherResults([d1,d2,d3,d4]) 452 d.addCallback(self._getChangeByNumber_query_done, changeid) 453 return d
454
455 - def _getChangeByNumber_query_done(self, res, changeid):
456 (rows, link_rows, file_rows, properties) = res 457 if not rows: 458 return None 459 (who, comments, 460 isdir, branch, revision, revlink, 461 when, category, repository, project) = rows[0] 462 branch = str_or_none(branch) 463 revision = str_or_none(revision) 464 links = [row[0] for row in link_rows] 465 links.sort() 466 files = [row[0] for row in file_rows] 467 files.sort() 468 469 c = Change(who=who, files=files, comments=comments, isdir=isdir, 470 links=links, revision=revision, when=when, 471 branch=branch, category=category, revlink=revlink, 472 repository=repository, project=project) 473 c.properties.updateFromProperties(properties) 474 c.number = changeid 475 self._change_cache.add(changeid, c) 476 return c
477
478 - def getChangesGreaterThan(self, last_changeid, t=None):
479 """Return a Deferred that fires with a list of all Change instances 480 with numbers greater than the given value, sorted by number. This is 481 useful for catching up with everything that's happened since you last 482 called this function.""" 483 assert last_changeid >= 0 484 if t: 485 return self._txn_getChangesGreaterThan(t, last_changeid) 486 else: 487 return self.runInteractionNow(self._txn_getChangesGreaterThan, 488 last_changeid)
489 - def _txn_getChangesGreaterThan(self, t, last_changeid):
490 q = self.quoteq("SELECT changeid FROM changes WHERE changeid > ?") 491 t.execute(q, (last_changeid,)) 492 changes = [self.getChangeNumberedNow(changeid, t) 493 for (changeid,) in t.fetchall()] 494 changes.sort(key=lambda c: c.number) 495 return changes
496
497 - def getChangesByNumber(self, changeids):
498 return defer.gatherResults([self.getChangeByNumber(changeid) 499 for changeid in changeids])
500 501 # SourceStamp-manipulating methods 502
503 - def getSourceStampNumberedNow(self, ssid, t=None):
504 assert isinstance(ssid, (int, long)) 505 ss = self._sourcestamp_cache.get(ssid) 506 if ss: 507 return ss 508 if t: 509 ss = self._txn_getSourceStampNumbered(t, ssid) 510 else: 511 ss = self.runInteractionNow(self._txn_getSourceStampNumbered, 512 ssid) 513 self._sourcestamp_cache.add(ssid, ss) 514 return ss
515
516 - def _txn_getSourceStampNumbered(self, t, ssid):
517 assert isinstance(ssid, (int, long)) 518 t.execute(self.quoteq("SELECT branch,revision,patchid,project,repository" 519 " FROM sourcestamps WHERE id=?"), 520 (ssid,)) 521 r = t.fetchall() 522 if not r: 523 return None 524 (branch_u, revision_u, patchid, project, repository) = r[0] 525 branch = str_or_none(branch_u) 526 revision = str_or_none(revision_u) 527 528 patch = None 529 if patchid is not None: 530 t.execute(self.quoteq("SELECT patchlevel,patch_base64,subdir" 531 " FROM patches WHERE id=?"), 532 (patchid,)) 533 r = t.fetchall() 534 assert len(r) == 1 535 (patch_level, patch_text_base64, subdir_u) = r[0] 536 patch_text = base64.b64decode(patch_text_base64) 537 if subdir_u: 538 patch = (patch_level, patch_text, str(subdir_u)) 539 else: 540 patch = (patch_level, patch_text) 541 542 t.execute(self.quoteq("SELECT changeid FROM sourcestamp_changes" 543 " WHERE sourcestampid=?" 544 " ORDER BY changeid ASC"), 545 (ssid,)) 546 r = t.fetchall() 547 changes = None 548 if r: 549 changes = [self.getChangeNumberedNow(changeid, t) 550 for (changeid,) in r] 551 ss = SourceStamp(branch, revision, patch, changes, project=project, repository=repository) 552 ss.ssid = ssid 553 return ss
554 555 # Properties methods 556
557 - def get_properties_from_db(self, tablename, idname, id, t=None):
558 if t: 559 return self._txn_get_properties_from_db(t, tablename, idname, id) 560 else: 561 return self.runInteractionNow(self._txn_get_properties_from_db, 562 tablename, idname, id)
563
564 - def _txn_get_properties_from_db(self, t, tablename, idname, id):
565 # apparently you can't use argument placeholders for table names. Don't 566 # call this with a weird-looking tablename. 567 q = self.quoteq("SELECT property_name,property_value FROM %s WHERE %s=?" 568 % (tablename, idname)) 569 t.execute(q, (id,)) 570 retval = Properties() 571 for key, valuepair in t.fetchall(): 572 value, source = json.loads(valuepair) 573 retval.setProperty(str(key), value, source) 574 return retval
575 576 # Scheduler manipulation methods 577
578 - def addSchedulers(self, added):
579 return self.runInteraction(self._addSchedulers, added)
580 - def _addSchedulers(self, t, added):
581 for scheduler in added: 582 name = scheduler.name 583 assert name 584 class_name = "%s.%s" % (scheduler.__class__.__module__, 585 scheduler.__class__.__name__) 586 q = self.quoteq(""" 587 SELECT schedulerid, class_name FROM schedulers WHERE 588 name=? AND 589 (class_name=? OR class_name='') 590 """) 591 t.execute(q, (name, class_name)) 592 row = t.fetchone() 593 if row: 594 sid, db_class_name = row 595 if db_class_name == '': 596 # We're updating from an old schema where the class name 597 # wasn't stored. 598 # Update this row's class name and move on 599 q = self.quoteq("""UPDATE schedulers SET class_name=? 600 WHERE schedulerid=?""") 601 t.execute(q, (class_name, sid)) 602 elif db_class_name != class_name: 603 # A different scheduler is being used with this name. 604 # Ignore the old scheduler and create a new one 605 sid = None 606 else: 607 sid = None 608 609 if sid is None: 610 # create a new row, with the latest changeid (so it won't try 611 # to process all of the old changes) new Schedulers are 612 # supposed to ignore pre-existing Changes 613 q = ("SELECT changeid FROM changes" 614 " ORDER BY changeid DESC LIMIT 1") 615 t.execute(q) 616 max_changeid = _one_or_else(t.fetchall(), 0) 617 state = scheduler.get_initial_state(max_changeid) 618 state_json = json.dumps(state) 619 q = self.quoteq("INSERT INTO schedulers" 620 " (name, class_name, state)" 621 " VALUES (?,?,?)") 622 t.execute(q, (name, class_name, state_json)) 623 sid = t.lastrowid 624 log.msg("scheduler '%s' got id %d" % (scheduler.name, sid)) 625 scheduler.schedulerid = sid
626
627 - def scheduler_get_state(self, schedulerid, t):
628 q = self.quoteq("SELECT state FROM schedulers WHERE schedulerid=?") 629 t.execute(q, (schedulerid,)) 630 state_json = _one_or_else(t.fetchall()) 631 assert state_json is not None 632 return json.loads(state_json)
633
634 - def scheduler_set_state(self, schedulerid, t, state):
635 state_json = json.dumps(state) 636 q = self.quoteq("UPDATE schedulers SET state=? WHERE schedulerid=?") 637 t.execute(q, (state_json, schedulerid))
638
639 - def get_sourcestampid(self, ss, t):
640 """Given a SourceStamp (which may or may not have an ssid), make sure 641 the contents are in the database, and return the ssid. If the 642 SourceStamp originally came from the DB (and thus already has an 643 ssid), just return the ssid. If not, create a new row for it.""" 644 if ss.ssid is not None: 645 return ss.ssid 646 patchid = None 647 if ss.patch: 648 patchlevel = ss.patch[0] 649 diff = ss.patch[1] 650 subdir = None 651 if len(ss.patch) > 2: 652 subdir = ss.patch[2] 653 q = self.quoteq("INSERT INTO patches" 654 " (patchlevel, patch_base64, subdir)" 655 " VALUES (?,?,?)") 656 t.execute(q, (patchlevel, base64.b64encode(diff), subdir)) 657 patchid = t.lastrowid 658 t.execute(self.quoteq("INSERT INTO sourcestamps" 659 " (branch, revision, patchid, project, repository)" 660 " VALUES (?,?,?,?,?)"), 661 (ss.branch, ss.revision, patchid, ss.project, ss.repository)) 662 ss.ssid = t.lastrowid 663 q2 = self.quoteq("INSERT INTO sourcestamp_changes" 664 " (sourcestampid, changeid) VALUES (?,?)") 665 for c in ss.changes: 666 t.execute(q2, (ss.ssid, c.number)) 667 return ss.ssid
668
669 - def create_buildset(self, ssid, reason, properties, builderNames, t, 670 external_idstring=None):
671 # this creates both the BuildSet and the associated BuildRequests 672 now = self._getCurrentTime() 673 t.execute(self.quoteq("INSERT INTO buildsets" 674 " (external_idstring, reason," 675 " sourcestampid, submitted_at)" 676 " VALUES (?,?,?,?)"), 677 (external_idstring, reason, ssid, now)) 678 bsid = t.lastrowid 679 for propname, propvalue in properties.properties.items(): 680 encoded_value = json.dumps(propvalue) 681 t.execute(self.quoteq("INSERT INTO buildset_properties" 682 " (buildsetid, property_name, property_value)" 683 " VALUES (?,?,?)"), 684 (bsid, propname, encoded_value)) 685 brids = [] 686 for bn in builderNames: 687 t.execute(self.quoteq("INSERT INTO buildrequests" 688 " (buildsetid, buildername, submitted_at)" 689 " VALUES (?,?,?)"), 690 (bsid, bn, now)) 691 brid = t.lastrowid 692 brids.append(brid) 693 self.notify("add-buildset", bsid) 694 self.notify("add-buildrequest", *brids) 695 return bsid
696
697 - def scheduler_classify_change(self, schedulerid, number, important, t):
698 q = self.quoteq("INSERT INTO scheduler_changes" 699 " (schedulerid, changeid, important)" 700 " VALUES (?,?,?)") 701 t.execute(q, (schedulerid, number, bool(important)))
702
703 - def scheduler_get_classified_changes(self, schedulerid, t):
704 q = self.quoteq("SELECT changeid, important" 705 " FROM scheduler_changes" 706 " WHERE schedulerid=?") 707 t.execute(q, (schedulerid,)) 708 important = [] 709 unimportant = [] 710 for (changeid, is_important) in t.fetchall(): 711 c = self.getChangeNumberedNow(changeid, t) 712 if is_important: 713 important.append(c) 714 else: 715 unimportant.append(c) 716 return (important, unimportant)
717
718 - def scheduler_retire_changes(self, schedulerid, changeids, t):
719 t.execute(self.quoteq("DELETE FROM scheduler_changes" 720 " WHERE schedulerid=? AND changeid IN ") 721 + self.parmlist(len(changeids)), 722 (schedulerid,) + tuple(changeids))
723
724 - def scheduler_subscribe_to_buildset(self, schedulerid, bsid, t):
725 # scheduler_get_subscribed_buildsets(schedulerid) will return 726 # information about all buildsets that were subscribed this way 727 t.execute(self.quoteq("INSERT INTO scheduler_upstream_buildsets" 728 " (buildsetid, schedulerid, active)" 729 " VALUES (?,?,?)"), 730 (bsid, schedulerid, 1))
731
732 - def scheduler_get_subscribed_buildsets(self, schedulerid, t):
733 # returns list of (bsid, ssid, complete, results) pairs 734 t.execute(self.quoteq("SELECT bs.id, " 735 " bs.sourcestampid, bs.complete, bs.results" 736 " FROM scheduler_upstream_buildsets AS s," 737 " buildsets AS bs" 738 " WHERE s.buildsetid=bs.id" 739 " AND s.schedulerid=?" 740 " AND s.active=1"), 741 (schedulerid,)) 742 return t.fetchall()
743
744 - def scheduler_unsubscribe_buildset(self, schedulerid, buildsetid, t):
745 t.execute(self.quoteq("UPDATE scheduler_upstream_buildsets" 746 " SET active=0" 747 " WHERE buildsetid=? AND schedulerid=?"), 748 (buildsetid, schedulerid))
749 750 # BuildRequest-manipulation methods 751
752 - def getBuildRequestWithNumber(self, brid, t=None):
753 assert isinstance(brid, (int, long)) 754 if t: 755 br = self._txn_getBuildRequestWithNumber(t, brid) 756 else: 757 br = self.runInteractionNow(self._txn_getBuildRequestWithNumber, 758 brid) 759 return br
760 - def _txn_getBuildRequestWithNumber(self, t, brid):
761 assert isinstance(brid, (int, long)) 762 t.execute(self.quoteq("SELECT br.buildsetid, bs.reason," 763 " bs.sourcestampid, br.buildername," 764 " bs.submitted_at, br.priority" 765 " FROM buildrequests AS br, buildsets AS bs" 766 " WHERE br.id=? AND br.buildsetid=bs.id"), 767 (brid,)) 768 r = t.fetchall() 769 if not r: 770 return None 771 (bsid, reason, ssid, builder_name, submitted_at, priority) = r[0] 772 ss = self.getSourceStampNumberedNow(ssid, t) 773 properties = self.get_properties_from_db("buildset_properties", 774 "buildsetid", bsid, t) 775 br = BuildRequest(reason, ss, builder_name, properties) 776 br.submittedAt = submitted_at 777 br.priority = priority 778 br.id = brid 779 br.bsid = bsid 780 return br
781
782 - def get_buildername_for_brid(self, brid):
783 assert isinstance(brid, (int, long)) 784 return self.runInteractionNow(self._txn_get_buildername_for_brid, brid)
785 - def _txn_get_buildername_for_brid(self, t, brid):
786 assert isinstance(brid, (int, long)) 787 t.execute(self.quoteq("SELECT buildername FROM buildrequests" 788 " WHERE id=?"), 789 (brid,)) 790 r = t.fetchall() 791 if not r: 792 return None 793 return r[0][0]
794
795 - def get_unclaimed_buildrequests(self, buildername, old, master_name, 796 master_incarnation, t, limit=None):
797 q = ("SELECT br.id" 798 " FROM buildrequests AS br, buildsets AS bs" 799 " WHERE br.buildername=? AND br.complete=0" 800 " AND br.buildsetid=bs.id" 801 " AND (br.claimed_at<?" 802 " OR (br.claimed_by_name=?" 803 " AND br.claimed_by_incarnation!=?))" 804 " ORDER BY br.priority DESC,bs.submitted_at ASC") 805 if limit: 806 q += " LIMIT %s" % limit 807 t.execute(self.quoteq(q), 808 (buildername, old, master_name, master_incarnation)) 809 requests = [self.getBuildRequestWithNumber(brid, t) 810 for (brid,) in t.fetchall()] 811 return requests
812
813 - def claim_buildrequests(self, now, master_name, master_incarnation, brids, 814 t=None):
815 if not brids: 816 return 817 if t: 818 self._txn_claim_buildrequests(t, now, master_name, 819 master_incarnation, brids) 820 else: 821 self.runInteractionNow(self._txn_claim_buildrequests, 822 now, master_name, master_incarnation, brids)
823 - def _txn_claim_buildrequests(self, t, now, master_name, master_incarnation, 824 brids):
825 q = self.quoteq("UPDATE buildrequests" 826 " SET claimed_at = ?," 827 " claimed_by_name = ?, claimed_by_incarnation = ?" 828 " WHERE id IN " + self.parmlist(len(brids))) 829 qargs = [now, master_name, master_incarnation] + list(brids) 830 t.execute(q, qargs)
831
832 - def build_started(self, brid, buildnumber):
833 return self.runInteractionNow(self._txn_build_started, brid, buildnumber)
834 - def _txn_build_started(self, t, brid, buildnumber):
835 now = self._getCurrentTime() 836 t.execute(self.quoteq("INSERT INTO builds (number, brid, start_time)" 837 " VALUES (?,?,?)"), 838 (buildnumber, brid, now)) 839 bid = t.lastrowid 840 self.notify("add-build", bid) 841 return bid
842
843 - def builds_finished(self, bids):
844 return self.runInteractionNow(self._txn_build_finished, bids)
845 - def _txn_build_finished(self, t, bids):
846 now = self._getCurrentTime() 847 q = self.quoteq("UPDATE builds SET finish_time = ?" 848 " WHERE id IN " + self.parmlist(len(bids))) 849 qargs = [now] + list(bids) 850 t.execute(q, qargs)
851
852 - def get_build_info(self, bid):
853 return self.runInteractionNow(self._txn_get_build_info, bid)
854 - def _txn_get_build_info(self, t, bid):
855 # brid, buildername, buildnum 856 t.execute(self.quoteq("SELECT b.brid,br.buildername,b.number" 857 " FROM builds AS b, buildrequests AS br" 858 " WHERE b.id=? AND b.brid=br.id"), 859 (bid,)) 860 res = t.fetchall() 861 if res: 862 return res[0] 863 return (None,None,None)
864
865 - def get_buildnums_for_brid(self, brid):
866 return self.runInteractionNow(self._txn_get_buildnums_for_brid, brid)
867 - def _txn_get_buildnums_for_brid(self, t, brid):
868 t.execute(self.quoteq("SELECT number FROM builds WHERE brid=?"), 869 (brid,)) 870 return [number for (number,) in t.fetchall()]
871
872 - def resubmit_buildrequests(self, brids):
873 return self.runInteraction(self._txn_resubmit_buildreqs, brids)
874 - def _txn_resubmit_buildreqs(self, t, brids):
875 # the interrupted build that gets resubmitted will still have the 876 # same submitted_at value, so it should be re-started first 877 q = self.quoteq("UPDATE buildrequests" 878 " SET claimed_at=0," 879 " claimed_by_name=NULL, claimed_by_incarnation=NULL" 880 " WHERE id IN " + self.parmlist(len(brids))) 881 t.execute(q, brids) 882 self.notify("add-buildrequest", *brids)
883
884 - def retire_buildrequests(self, brids, results):
885 return self.runInteractionNow(self._txn_retire_buildreqs, brids,results)
886 - def _txn_retire_buildreqs(self, t, brids, results):
887 now = self._getCurrentTime() 888 #q = self.db.quoteq("DELETE FROM buildrequests WHERE id IN " 889 # + self.db.parmlist(len(brids))) 890 q = self.quoteq("UPDATE buildrequests" 891 " SET complete=1, results=?, complete_at=?" 892 " WHERE id IN " + self.parmlist(len(brids))) 893 t.execute(q, [results, now]+brids) 894 # now, does this cause any buildsets to complete? 895 q = self.quoteq("SELECT bs.id" 896 " FROM buildsets AS bs, buildrequests AS br" 897 " WHERE br.buildsetid=bs.id AND bs.complete=0" 898 " AND br.id in " 899 + self.parmlist(len(brids))) 900 t.execute(q, brids) 901 bsids = [bsid for (bsid,) in t.fetchall()] 902 for bsid in bsids: 903 self._check_buildset(t, bsid, now) 904 self.notify("retire-buildrequest", *brids) 905 self.notify("modify-buildset", *bsids)
906
907 - def cancel_buildrequests(self, brids):
908 return self.runInteractionNow(self._txn_cancel_buildrequest, brids)
909 - def _txn_cancel_buildrequest(self, t, brids):
910 # TODO: we aren't entirely sure if it'd be safe to just delete the 911 # buildrequest: what else might be waiting on it that would then just 912 # hang forever?. _check_buildset() should handle it well (an empty 913 # buildset will appear complete and SUCCESS-ful). But we haven't 914 # thought it through enough to be sure. So for now, "cancel" means 915 # "mark as complete and FAILURE". 916 if True: 917 now = self._getCurrentTime() 918 q = self.quoteq("UPDATE buildrequests" 919 " SET complete=1, results=?, complete_at=?" 920 " WHERE id IN " + self.parmlist(len(brids))) 921 t.execute(q, [FAILURE, now]+brids) 922 else: 923 q = self.quoteq("DELETE FROM buildrequests" 924 " WHERE id IN " + self.parmlist(len(brids))) 925 t.execute(q, brids) 926 927 # now, does this cause any buildsets to complete? 928 q = self.quoteq("SELECT bs.id" 929 " FROM buildsets AS bs, buildrequests AS br" 930 " WHERE br.buildsetid=bs.id AND bs.complete=0" 931 " AND br.id in " 932 + self.parmlist(len(brids))) 933 t.execute(q, brids) 934 bsids = [bsid for (bsid,) in t.fetchall()] 935 for bsid in bsids: 936 self._check_buildset(t, bsid, now) 937 938 self.notify("cancel-buildrequest", *brids) 939 self.notify("modify-buildset", *bsids)
940
941 - def _check_buildset(self, t, bsid, now):
942 q = self.quoteq("SELECT br.complete,br.results" 943 " FROM buildsets AS bs, buildrequests AS br" 944 " WHERE bs.complete=0" 945 " AND br.buildsetid=bs.id AND bs.id=?") 946 t.execute(q, (bsid,)) 947 results = t.fetchall() 948 is_complete = True 949 bs_results = SUCCESS 950 for (complete, r) in results: 951 if not complete: 952 # still waiting 953 is_complete = False 954 if r == FAILURE: 955 bs_results = r 956 if is_complete: 957 # they were all successful 958 q = self.quoteq("UPDATE buildsets" 959 " SET complete=1, complete_at=?, results=?" 960 " WHERE id=?") 961 t.execute(q, (now, bs_results, bsid))
962
963 - def get_buildrequestids_for_buildset(self, bsid):
964 return self.runInteractionNow(self._txn_get_buildrequestids_for_buildset, 965 bsid)
966 - def _txn_get_buildrequestids_for_buildset(self, t, bsid):
967 t.execute(self.quoteq("SELECT buildername,id FROM buildrequests" 968 " WHERE buildsetid=?"), 969 (bsid,)) 970 return dict(t.fetchall())
971
972 - def examine_buildset(self, bsid):
973 return self.runInteractionNow(self._txn_examine_buildset, bsid)
974 - def _txn_examine_buildset(self, t, bsid):
975 # "finished" means complete=1 for all builds. Return False until 976 # all builds are complete, then True. 977 # "successful" means complete=1 and results!=FAILURE for all builds. 978 # Returns None until the last success or the first failure. Returns 979 # False if there is at least one failure. Returns True if all are 980 # successful. 981 q = self.quoteq("SELECT br.complete,br.results" 982 " FROM buildsets AS bs, buildrequests AS br" 983 " WHERE br.buildsetid=bs.id AND bs.id=?") 984 t.execute(q, (bsid,)) 985 results = t.fetchall() 986 finished = True 987 successful = None 988 for (c,r) in results: 989 if not c: 990 finished = False 991 if c and r not in (SUCCESS, WARNINGS): 992 successful = False 993 if finished and successful is None: 994 successful = True 995 return (successful, finished)
996
997 - def get_active_buildset_ids(self):
998 return self.runInteractionNow(self._txn_get_active_buildset_ids)
999 - def _txn_get_active_buildset_ids(self, t):
1000 t.execute("SELECT id FROM buildsets WHERE complete=0") 1001 return [bsid for (bsid,) in t.fetchall()]
1002 - def get_buildset_info(self, bsid):
1003 return self.runInteractionNow(self._txn_get_buildset_info, bsid)
1004 - def _txn_get_buildset_info(self, t, bsid):
1005 q = self.quoteq("SELECT external_idstring, reason, sourcestampid," 1006 " complete, results" 1007 " FROM buildsets WHERE id=?") 1008 t.execute(q, (bsid,)) 1009 res = t.fetchall() 1010 if res: 1011 (external, reason, ssid, complete, results) = res[0] 1012 external_idstring = str_or_none(external) 1013 reason = str_or_none(reason) 1014 complete = bool(complete) 1015 return (external_idstring, reason, ssid, complete, results) 1016 return None # shouldn't happen
1017
1018 - def get_pending_brids_for_builder(self, buildername):
1019 return self.runInteractionNow(self._txn_get_pending_brids_for_builder, 1020 buildername)
1021 - def _txn_get_pending_brids_for_builder(self, t, buildername):
1022 # "pending" means unclaimed and incomplete. When a build is returned 1023 # to the pool (self.resubmit_buildrequests), the claimed_at= field is 1024 # reset to zero. 1025 t.execute(self.quoteq("SELECT id FROM buildrequests" 1026 " WHERE buildername=? AND" 1027 " complete=0 AND claimed_at=0"), 1028 (buildername,)) 1029 return [brid for (brid,) in t.fetchall()]
1030 1031 # test/debug methods 1032
1033 - def has_pending_operations(self):
1034 return bool(self._pending_operation_count)
1035 1036 1037 threadable.synchronize(DBConnector) 1038