Package buildbot :: Package db :: Module connector
[frames] | no frames]

Source Code for Module buildbot.db.connector

   1  # ***** BEGIN LICENSE BLOCK ***** 
   2  # Version: MPL 1.1/GPL 2.0/LGPL 2.1 
   3  # 
   4  # The contents of this file are subject to the Mozilla Public License Version 
   5  # 1.1 (the "License"); you may not use this file except in compliance with 
   6  # the License. You may obtain a copy of the License at 
   7  # http://www.mozilla.org/MPL/ 
   8  # 
   9  # Software distributed under the License is distributed on an "AS IS" basis, 
  10  # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License 
  11  # for the specific language governing rights and limitations under the 
  12  # License. 
  13  # 
  14  # The Original Code is Mozilla-specific Buildbot steps. 
  15  # 
  16  # The Initial Developer of the Original Code is 
  17  # Mozilla Foundation. 
  18  # Portions created by the Initial Developer are Copyright (C) 2009 
  19  # the Initial Developer. All Rights Reserved. 
  20  # 
  21  # Contributor(s): 
  22  #   Brian Warner <warner@lothar.com> 
  23  #   Chris AtLee <catlee@mozilla.com> 
  24  #   Dustin Mitchell <dustin@zmanda.com> 
  25  # 
  26  # Alternatively, the contents of this file may be used under the terms of 
  27  # either the GNU General Public License Version 2 or later (the "GPL"), or 
  28  # the GNU Lesser General Public License Version 2.1 or later (the "LGPL"), 
  29  # in which case the provisions of the GPL or the LGPL are applicable instead 
  30  # of those above. If you wish to allow use of your version of this file only 
  31  # under the terms of either the GPL or the LGPL, and not to allow others to 
  32  # use your version of this file under the terms of the MPL, indicate your 
  33  # decision by deleting the provisions above and replace them with the notice 
  34  # and other provisions required by the GPL or the LGPL. If you do not delete 
  35  # the provisions above, a recipient may use your version of this file under 
  36  # the terms of any one of the MPL, the GPL or the LGPL. 
  37  # 
  38  # ***** END LICENSE BLOCK ***** 
  39   
  40  import sys, collections, base64 
  41   
  42  from twisted.python import log, threadable 
  43  from twisted.internet import defer 
  44  from twisted.enterprise import adbapi 
  45  from buildbot import util 
  46  from buildbot.util import collections as bbcollections 
  47  from buildbot.changes.changes import Change 
  48  from buildbot.sourcestamp import SourceStamp 
  49  from buildbot.buildrequest import BuildRequest 
  50  from buildbot.process.properties import Properties 
  51  from buildbot.status.builder import SUCCESS, WARNINGS, FAILURE 
  52  from buildbot.util.eventual import eventually 
  53  from buildbot.util import json 
  54   
  55  # Don't auto-resubmit queries that encounter a broken connection: let them 
  56  # fail. Use the "notification doorbell" thing to provide the retry. Set 
  57  # cp_reconnect=True, so that a connection failure will prepare the 
  58  # ConnectionPool to reconnect next time. 
  59   
60 -class MyTransaction(adbapi.Transaction):
61 - def execute(self, *args, **kwargs):
62 #print "Q", args, kwargs 63 return self._cursor.execute(*args, **kwargs)
64 - def fetchall(self):
65 rc = self._cursor.fetchall() 66 #print " F", rc 67 return rc
68
69 -def _one_or_else(res, default=None, process_f=lambda x: x):
70 if not res: 71 return default 72 return process_f(res[0][0])
73
74 -def str_or_none(s):
75 if s is None: 76 return None 77 return str(s)
78
79 -class Token: # used for _start_operation/_end_operation
80 pass 81
82 -class DBConnector(util.ComparableMixin):
83 # this will refuse to create the database: use 'create-master' for that 84 compare_attrs = ["args", "kwargs"] 85 synchronized = ["notify", "_end_operation"] 86 MAX_QUERY_TIMES = 1000 87
88 - def __init__(self, spec):
89 # typical args = (dbmodule, dbname, username, password) 90 self._query_times = collections.deque() 91 self._spec = spec 92 93 # this is for synchronous calls: runQueryNow, runInteractionNow 94 self._dbapi = spec.get_dbapi() 95 self._nonpool = None 96 self._nonpool_lastused = None 97 self._nonpool_max_idle = spec.get_maxidle() 98 99 # pass queries in with "?" placeholders. If the backend uses a 100 # different style, we'll replace them. 101 self.paramstyle = self._dbapi.paramstyle 102 103 self._pool = spec.get_async_connection_pool() 104 self._pool.transactionFactory = MyTransaction 105 # the pool must be started before it can be used. The real 106 # buildmaster process will do this at reactor start. CLI tools (like 107 # "buildbot upgrade-master") must do it manually. Unit tests are run 108 # in an environment in which it is already started. 109 110 self._change_cache = util.LRUCache() 111 self._sourcestamp_cache = util.LRUCache() 112 self._active_operations = set() # protected by synchronized= 113 self._pending_notifications = [] 114 self._subscribers = bbcollections.defaultdict(set) 115 116 self._pending_operation_count = 0 117 118 self._started = False
119
120 - def _getCurrentTime(self):
121 # this is a seam for use in testing 122 return util.now()
123
124 - def start(self):
125 # this only *needs* to be called in reactorless environments (which 126 # should be eliminated anyway). but it doesn't hurt anyway 127 self._pool.start() 128 self._started = True
129
130 - def stop(self):
131 """Call this when you're done with me""" 132 133 # Close our synchronous connection if we've got one 134 if self._nonpool: 135 self._nonpool.close() 136 self._nonpool = None 137 self._nonpool_lastused = None 138 139 if not self._started: 140 return 141 self._pool.close() 142 self._started = False 143 del self._pool
144
145 - def quoteq(self, query):
146 """ 147 Given a query that contains qmark-style placeholders, like:: 148 INSERT INTO foo (col1, col2) VALUES (?,?) 149 replace the '?' with '%s' if the backend uses format-style 150 placeholders, like:: 151 INSERT INTO foo (col1, col2) VALUES (%s,%s) 152 """ 153 if self.paramstyle == "format": 154 return query.replace("?","%s") 155 assert self.paramstyle == "qmark" 156 return query
157
158 - def parmlist(self, count):
159 """ 160 When passing long lists of values to e.g., an INSERT query, it is 161 tedious to pass long strings of ? placeholders. This function will 162 create a parenthesis-enclosed list of COUNT placeholders. Note that 163 the placeholders have already had quoteq() applied. 164 """ 165 p = self.quoteq("?") 166 return "(" + ",".join([p]*count) + ")"
167
168 - def get_version(self):
169 """Returns None for an empty database, or a number (probably 1) for 170 the database's version""" 171 try: 172 res = self.runQueryNow("SELECT version FROM version") 173 except (self._dbapi.OperationalError, self._dbapi.ProgrammingError): 174 # this means the version table is missing: the db is empty 175 return None 176 assert len(res) == 1 177 return res[0][0]
178
179 - def runQueryNow(self, *args, **kwargs):
180 # synchronous+blocking version of runQuery() 181 assert self._started 182 return self.runInteractionNow(self._runQuery, *args, **kwargs)
183
184 - def _runQuery(self, c, *args, **kwargs):
185 c.execute(*args, **kwargs) 186 return c.fetchall()
187
188 - def _start_operation(self):
189 t = Token() 190 self._active_operations.add(t) 191 return t
192 - def _end_operation(self, t):
193 # this is always invoked from the main thread, but is wrapped by 194 # synchronized= and threadable.synchronous(), since it touches 195 # self._pending_notifications, which is also touched by 196 # runInteraction threads 197 self._active_operations.discard(t) 198 if self._active_operations: 199 return 200 for (category, args) in self._pending_notifications: 201 # in the distributed system, this will be a 202 # transport.write(" ".join([category] + [str(a) for a in args])) 203 eventually(self.send_notification, category, args) 204 self._pending_notifications = []
205
206 - def runInteractionNow(self, interaction, *args, **kwargs):
207 # synchronous+blocking version of runInteraction() 208 assert self._started 209 start = self._getCurrentTime() 210 t = self._start_operation() 211 try: 212 return self._runInteractionNow(interaction, *args, **kwargs) 213 finally: 214 self._end_operation(t) 215 self._add_query_time(start)
216
217 - def get_sync_connection(self):
218 # This is a wrapper around spec.get_sync_connection that maintains a 219 # single connection to the database for synchronous usage. It will get 220 # a new connection if the existing one has been idle for more than 221 # max_idle seconds. 222 if self._nonpool_max_idle is not None: 223 now = util.now() 224 if self._nonpool_lastused and self._nonpool_lastused + self._nonpool_max_idle < now: 225 self._nonpool = None 226 227 if not self._nonpool: 228 self._nonpool = self._spec.get_sync_connection() 229 230 self._nonpool_lastused = util.now() 231 return self._nonpool
232
233 - def _runInteractionNow(self, interaction, *args, **kwargs):
234 conn = self.get_sync_connection() 235 c = conn.cursor() 236 try: 237 result = interaction(c, *args, **kwargs) 238 c.close() 239 conn.commit() 240 return result 241 except: 242 excType, excValue, excTraceback = sys.exc_info() 243 try: 244 conn.rollback() 245 c2 = conn.cursor() 246 c2.execute(self._pool.good_sql) 247 c2.close() 248 conn.commit() 249 except: 250 log.msg("rollback failed, will reconnect next query") 251 log.err() 252 # and the connection is probably dead: clear the reference, 253 # so we'll establish a new connection next time 254 self._nonpool = None 255 raise excType, excValue, excTraceback
256
257 - def notify(self, category, *args):
258 # this is wrapped by synchronized= and threadable.synchronous(), 259 # since it will be invoked from runInteraction threads 260 self._pending_notifications.append( (category,args) )
261
262 - def send_notification(self, category, args):
263 # in the distributed system, this will be invoked by lineReceived() 264 #print "SEND", category, args 265 for observer in self._subscribers[category]: 266 eventually(observer, category, *args)
267
268 - def subscribe_to(self, category, observer):
269 self._subscribers[category].add(observer)
270
271 - def runQuery(self, *args, **kwargs):
272 assert self._started 273 self._pending_operation_count += 1 274 start = self._getCurrentTime() 275 #t = self._start_operation() # why is this commented out? -warner 276 d = self._pool.runQuery(*args, **kwargs) 277 #d.addBoth(self._runQuery_done, start, t) 278 return d
279 - def _runQuery_done(self, res, start, t):
280 self._end_operation(t) 281 self._add_query_time(start) 282 self._pending_operation_count -= 1 283 return res
284
285 - def _add_query_time(self, start):
286 elapsed = self._getCurrentTime() - start 287 self._query_times.append(elapsed) 288 if len(self._query_times) > self.MAX_QUERY_TIMES: 289 self._query_times.popleft()
290
291 - def runInteraction(self, *args, **kwargs):
292 assert self._started 293 self._pending_operation_count += 1 294 start = self._getCurrentTime() 295 t = self._start_operation() 296 d = self._pool.runInteraction(*args, **kwargs) 297 d.addBoth(self._runInteraction_done, start, t) 298 return d
299 - def _runInteraction_done(self, res, start, t):
300 self._end_operation(t) 301 self._add_query_time(start) 302 self._pending_operation_count -= 1 303 return res
304 305 # ChangeManager methods 306
307 - def addChangeToDatabase(self, change):
308 self.runInteractionNow(self._txn_addChangeToDatabase, change) 309 self._change_cache.add(change.number, change)
310
311 - def _txn_addChangeToDatabase(self, t, change):
312 q = self.quoteq("INSERT INTO changes" 313 " (author," 314 " comments, is_dir," 315 " branch, revision, revlink," 316 " when_timestamp, category," 317 " repository, project)" 318 " VALUES (?, ?,?, ?,?,?, ?,?, ?,?)") 319 # TODO: map None to.. empty string? 320 321 values = (change.who, 322 change.comments, change.isdir, 323 change.branch, change.revision, change.revlink, 324 change.when, change.category, change.repository, 325 change.project) 326 t.execute(q, values) 327 change.number = t.lastrowid 328 329 for link in change.links: 330 t.execute(self.quoteq("INSERT INTO change_links (changeid, link) " 331 "VALUES (?,?)"), 332 (change.number, link)) 333 for filename in change.files: 334 t.execute(self.quoteq("INSERT INTO change_files (changeid,filename)" 335 " VALUES (?,?)"), 336 (change.number, filename)) 337 for propname,propvalue in change.properties.properties.items(): 338 encoded_value = json.dumps(propvalue) 339 t.execute(self.quoteq("INSERT INTO change_properties" 340 " (changeid, property_name, property_value)" 341 " VALUES (?,?,?)"), 342 (change.number, propname, encoded_value)) 343 self.notify("add-change", change.number)
344
345 - def changeEventGenerator(self, branches=[], categories=[], committers=[], minTime=0):
346 q = "SELECT changeid FROM changes" 347 args = [] 348 if branches or categories or committers: 349 q += " WHERE " 350 pieces = [] 351 if branches: 352 pieces.append("branch IN %s" % self.parmlist(len(branches))) 353 args.extend(list(branches)) 354 if categories: 355 pieces.append("category IN %s" % self.parmlist(len(categories))) 356 args.extend(list(categories)) 357 if committers: 358 pieces.append("author IN %s" % self.parmlist(len(committers))) 359 args.extend(list(committers)) 360 if minTime: 361 pieces.append("when_timestamp > %d" % minTime) 362 q += " AND ".join(pieces) 363 q += " ORDER BY changeid DESC" 364 rows = self.runQueryNow(q, tuple(args)) 365 for (changeid,) in rows: 366 yield self.getChangeNumberedNow(changeid)
367
368 - def getLatestChangeNumberNow(self, t=None):
369 if t: 370 return self._txn_getLatestChangeNumber(t) 371 else: 372 return self.runInteractionNow(self._txn_getLatestChangeNumber)
373 - def _txn_getLatestChangeNumber(self, t):
374 q = self.quoteq("SELECT max(changeid) from changes") 375 t.execute(q) 376 row = t.fetchone() 377 if not row: 378 return 0 379 return row[0]
380
381 - def getChangeNumberedNow(self, changeid, t=None):
382 # this is a synchronous/blocking version of getChangeByNumber 383 assert changeid >= 0 384 c = self._change_cache.get(changeid) 385 if c: 386 return c 387 if t: 388 c = self._txn_getChangeNumberedNow(t, changeid) 389 else: 390 c = self.runInteractionNow(self._txn_getChangeNumberedNow, changeid) 391 self._change_cache.add(changeid, c) 392 return c
393 - def _txn_getChangeNumberedNow(self, t, changeid):
394 q = self.quoteq("SELECT author, comments," 395 " is_dir, branch, revision, revlink," 396 " when_timestamp, category," 397 " repository, project" 398 " FROM changes WHERE changeid = ?") 399 t.execute(q, (changeid,)) 400 rows = t.fetchall() 401 if not rows: 402 return None 403 (who, comments, 404 isdir, branch, revision, revlink, 405 when, category, repository, project) = rows[0] 406 branch = str_or_none(branch) 407 revision = str_or_none(revision) 408 q = self.quoteq("SELECT link FROM change_links WHERE changeid=?") 409 t.execute(q, (changeid,)) 410 rows = t.fetchall() 411 links = [row[0] for row in rows] 412 links.sort() 413 414 q = self.quoteq("SELECT filename FROM change_files WHERE changeid=?") 415 t.execute(q, (changeid,)) 416 rows = t.fetchall() 417 files = [row[0] for row in rows] 418 files.sort() 419 420 p = self.get_properties_from_db("change_properties", "changeid", 421 changeid, t) 422 c = Change(who=who, files=files, comments=comments, isdir=isdir, 423 links=links, revision=revision, when=when, 424 branch=branch, category=category, revlink=revlink, 425 repository=repository, project=project) 426 c.properties.updateFromProperties(p) 427 c.number = changeid 428 return c
429
430 - def getChangeByNumber(self, changeid):
431 # return a Deferred that fires with a Change instance, or None if 432 # there is no Change with that number 433 assert changeid >= 0 434 c = self._change_cache.get(changeid) 435 if c: 436 return defer.succeed(c) 437 d1 = self.runQuery(self.quoteq("SELECT author, comments," 438 " is_dir, branch, revision, revlink," 439 " when_timestamp, category," 440 " repository, project" 441 " FROM changes WHERE changeid = ?"), 442 (changeid,)) 443 d2 = self.runQuery(self.quoteq("SELECT link FROM change_links" 444 " WHERE changeid=?"), 445 (changeid,)) 446 d3 = self.runQuery(self.quoteq("SELECT filename FROM change_files" 447 " WHERE changeid=?"), 448 (changeid,)) 449 d4 = self.runInteraction(self._txn_get_properties_from_db, 450 "change_properties", "changeid", changeid) 451 d = defer.gatherResults([d1,d2,d3,d4]) 452 d.addCallback(self._getChangeByNumber_query_done, changeid) 453 return d
454
455 - def _getChangeByNumber_query_done(self, res, changeid):
456 (rows, link_rows, file_rows, properties) = res 457 if not rows: 458 return None 459 (who, comments, 460 isdir, branch, revision, revlink, 461 when, category, repository, project) = rows[0] 462 branch = str_or_none(branch) 463 revision = str_or_none(revision) 464 links = [row[0] for row in link_rows] 465 links.sort() 466 files = [row[0] for row in file_rows] 467 files.sort() 468 469 c = Change(who=who, files=files, comments=comments, isdir=isdir, 470 links=links, revision=revision, when=when, 471 branch=branch, category=category, revlink=revlink, 472 repository=repository, project=project) 473 c.properties.updateFromProperties(properties) 474 c.number = changeid 475 self._change_cache.add(changeid, c) 476 return c
477
478 - def getChangesGreaterThan(self, last_changeid, t=None):
479 """Return a Deferred that fires with a list of all Change instances 480 with numbers greater than the given value, sorted by number. This is 481 useful for catching up with everything that's happened since you last 482 called this function.""" 483 assert last_changeid >= 0 484 if t: 485 return self._txn_getChangesGreaterThan(t, last_changeid) 486 else: 487 return self.runInteractionNow(self._txn_getChangesGreaterThan, 488 last_changeid)
489 - def _txn_getChangesGreaterThan(self, t, last_changeid):
490 q = self.quoteq("SELECT changeid FROM changes WHERE changeid > ?") 491 t.execute(q, (last_changeid,)) 492 changes = [self.getChangeNumberedNow(changeid, t) 493 for (changeid,) in t.fetchall()] 494 changes.sort(key=lambda c: c.number) 495 return changes
496
497 - def getChangesByNumber(self, changeids):
498 return defer.gatherResults([self.getChangeByNumber(changeid) 499 for changeid in changeids])
500 501 # SourceStamp-manipulating methods 502
503 - def getSourceStampNumberedNow(self, ssid, t=None):
504 assert isinstance(ssid, (int, long)) 505 ss = self._sourcestamp_cache.get(ssid) 506 if ss: 507 return ss 508 if t: 509 ss = self._txn_getSourceStampNumbered(t, ssid) 510 else: 511 ss = self.runInteractionNow(self._txn_getSourceStampNumbered, 512 ssid) 513 self._sourcestamp_cache.add(ssid, ss) 514 return ss
515
516 - def _txn_getSourceStampNumbered(self, t, ssid):
517 assert isinstance(ssid, (int, long)) 518 t.execute(self.quoteq("SELECT branch,revision,patchid,project,repository" 519 " FROM sourcestamps WHERE id=?"), 520 (ssid,)) 521 r = t.fetchall() 522 if not r: 523 return None 524 (branch_u, revision_u, patchid, project, repository) = r[0] 525 branch = str_or_none(branch_u) 526 revision = str_or_none(revision_u) 527 528 patch = None 529 if patchid is not None: 530 t.execute(self.quoteq("SELECT patchlevel,patch_base64,subdir" 531 " FROM patches WHERE id=?"), 532 (patchid,)) 533 r = t.fetchall() 534 assert len(r) == 1 535 (patch_level, patch_text_base64, subdir_u) = r[0] 536 patch_text = base64.b64decode(patch_text_base64) 537 if subdir_u: 538 patch = (patch_level, patch_text, str(subdir_u)) 539 else: 540 patch = (patch_level, patch_text) 541 542 t.execute(self.quoteq("SELECT changeid FROM sourcestamp_changes" 543 " WHERE sourcestampid=?" 544 " ORDER BY changeid ASC"), 545 (ssid,)) 546 r = t.fetchall() 547 changes = None 548 if r: 549 changes = [self.getChangeNumberedNow(changeid, t) 550 for (changeid,) in r] 551 ss = SourceStamp(branch, revision, patch, changes, project=project, repository=repository) 552 ss.ssid = ssid 553 return ss
554 555 # Properties methods 556
557 - def get_properties_from_db(self, tablename, idname, id, t=None):
558 if t: 559 return self._txn_get_properties_from_db(t, tablename, idname, id) 560 else: 561 return self.runInteractionNow(self._txn_get_properties_from_db, 562 tablename, idname, id)
563
564 - def _txn_get_properties_from_db(self, t, tablename, idname, id):
565 # apparently you can't use argument placeholders for table names. Don't 566 # call this with a weird-looking tablename. 567 q = self.quoteq("SELECT property_name,property_value FROM %s WHERE %s=?" 568 % (tablename, idname)) 569 t.execute(q, (id,)) 570 retval = Properties() 571 for key, valuepair in t.fetchall(): 572 value, source = json.loads(valuepair) 573 retval.setProperty(str(key), value, source) 574 return retval
575 576 # Scheduler manipulation methods 577
578 - def addSchedulers(self, added):
579 return self.runInteraction(self._addSchedulers, added)
580 - def _addSchedulers(self, t, added):
581 for scheduler in added: 582 name = scheduler.name 583 assert name 584 class_name = "%s.%s" % (scheduler.__class__.__module__, 585 scheduler.__class__.__name__) 586 q = self.quoteq(""" 587 SELECT schedulerid, class_name FROM schedulers WHERE 588 name=? AND 589 (class_name=? OR class_name='') 590 """) 591 t.execute(q, (name, class_name)) 592 row = t.fetchone() 593 if row: 594 sid, db_class_name = row 595 if db_class_name == '': 596 # We're updating from an old schema where the class name 597 # wasn't stored. 598 # Update this row's class name and move on 599 q = self.quoteq("""UPDATE schedulers SET class_name=? 600 WHERE schedulerid=?""") 601 t.execute(q, (class_name, sid)) 602 elif db_class_name != class_name: 603 # A different scheduler is being used with this name. 604 # Ignore the old scheduler and create a new one 605 sid = None 606 else: 607 sid = None 608 609 if sid is None: 610 # create a new row, with the latest changeid (so it won't try 611 # to process all of the old changes) new Schedulers are 612 # supposed to ignore pre-existing Changes 613 q = ("SELECT changeid FROM changes" 614 " ORDER BY changeid DESC LIMIT 1") 615 t.execute(q) 616 max_changeid = _one_or_else(t.fetchall(), 0) 617 state = scheduler.get_initial_state(max_changeid) 618 state_json = json.dumps(state) 619 q = self.quoteq("INSERT INTO schedulers" 620 " (name, class_name, state)" 621 " VALUES (?,?,?)") 622 t.execute(q, (name, class_name, state_json)) 623 sid = t.lastrowid 624 log.msg("scheduler '%s' got id %d" % (scheduler.name, sid)) 625 scheduler.schedulerid = sid
626
627 - def scheduler_get_state(self, schedulerid, t):
628 q = self.quoteq("SELECT state FROM schedulers WHERE schedulerid=?") 629 t.execute(q, (schedulerid,)) 630 state_json = _one_or_else(t.fetchall()) 631 assert state_json is not None 632 return json.loads(state_json)
633
634 - def scheduler_set_state(self, schedulerid, t, state):
635 state_json = json.dumps(state) 636 q = self.quoteq("UPDATE schedulers SET state=? WHERE schedulerid=?") 637 t.execute(q, (state_json, schedulerid))
638
639 - def get_sourcestampid(self, ss, t):
640 """Given a SourceStamp (which may or may not have an ssid), make sure 641 the contents are in the database, and return the ssid. If the 642 SourceStamp originally came from the DB (and thus already has an 643 ssid), just return the ssid. If not, create a new row for it.""" 644 if ss.ssid is not None: 645 return ss.ssid 646 patchid = None 647 if ss.patch: 648 patchlevel = ss.patch[0] 649 diff = ss.patch[1] 650 subdir = None 651 if len(ss.patch) > 2: 652 subdir = ss.patch[2] 653 q = self.quoteq("INSERT INTO patches" 654 " (patchlevel, patch_base64, subdir)" 655 " VALUES (?,?,?)") 656 t.execute(q, (patchlevel, base64.b64encode(diff), subdir)) 657 patchid = t.lastrowid 658 t.execute(self.quoteq("INSERT INTO sourcestamps" 659 " (branch, revision, patchid, project, repository)" 660 " VALUES (?,?,?,?,?)"), 661 (ss.branch, ss.revision, patchid, ss.project, ss.repository)) 662 ss.ssid = t.lastrowid 663 q2 = self.quoteq("INSERT INTO sourcestamp_changes" 664 " (sourcestampid, changeid) VALUES (?,?)") 665 for c in ss.changes: 666 t.execute(q2, (ss.ssid, c.number)) 667 return ss.ssid
668
669 - def create_buildset(self, ssid, reason, properties, builderNames, t, 670 external_idstring=None):
671 # this creates both the BuildSet and the associated BuildRequests 672 now = self._getCurrentTime() 673 t.execute(self.quoteq("INSERT INTO buildsets" 674 " (external_idstring, reason," 675 " sourcestampid, submitted_at)" 676 " VALUES (?,?,?,?)"), 677 (external_idstring, reason, ssid, now)) 678 bsid = t.lastrowid 679 for propname, propvalue in properties.properties.items(): 680 encoded_value = json.dumps(propvalue) 681 t.execute(self.quoteq("INSERT INTO buildset_properties" 682 " (buildsetid, property_name, property_value)" 683 " VALUES (?,?,?)"), 684 (bsid, propname, encoded_value)) 685 brids = [] 686 for bn in builderNames: 687 t.execute(self.quoteq("INSERT INTO buildrequests" 688 " (buildsetid, buildername, submitted_at)" 689 " VALUES (?,?,?)"), 690 (bsid, bn, now)) 691 brid = t.lastrowid 692 brids.append(brid) 693 self.notify("add-buildset", bsid) 694 self.notify("add-buildrequest", *brids) 695 return bsid
696
697 - def scheduler_classify_change(self, schedulerid, number, important, t):
698 q = self.quoteq("INSERT INTO scheduler_changes" 699 " (schedulerid, changeid, important)" 700 " VALUES (?,?,?)") 701 t.execute(q, (schedulerid, number, bool(important)))
702
703 - def scheduler_get_classified_changes(self, schedulerid, t):
704 q = self.quoteq("SELECT changeid, important" 705 " FROM scheduler_changes" 706 " WHERE schedulerid=?") 707 t.execute(q, (schedulerid,)) 708 important = [] 709 unimportant = [] 710 for (changeid, is_important) in t.fetchall(): 711 c = self.getChangeNumberedNow(changeid, t) 712 if is_important: 713 important.append(c) 714 else: 715 unimportant.append(c) 716 return (important, unimportant)
717
718 - def scheduler_retire_changes(self, schedulerid, changeids, t):
719 t.execute(self.quoteq("DELETE FROM scheduler_changes" 720 " WHERE schedulerid=? AND changeid IN ") 721 + self.parmlist(len(changeids)), 722 (schedulerid,) + tuple(changeids))
723
724 - def scheduler_subscribe_to_buildset(self, schedulerid, bsid, t):
725 # scheduler_get_subscribed_buildsets(schedulerid) will return 726 # information about all buildsets that were subscribed this way 727 t.execute(self.quoteq("INSERT INTO scheduler_upstream_buildsets" 728 " (buildsetid, schedulerid, active)" 729 " VALUES (?,?,?)"), 730 (bsid, schedulerid, 1))
731
732 - def scheduler_get_subscribed_buildsets(self, schedulerid, t):
733 # returns list of (bsid, ssid, complete, results) pairs 734 t.execute(self.quoteq("SELECT bs.id, " 735 " bs.sourcestampid, bs.complete, bs.results" 736 " FROM scheduler_upstream_buildsets AS s," 737 " buildsets AS bs" 738 " WHERE s.buildsetid=bs.id" 739 " AND s.schedulerid=?" 740 " AND s.active=1"), 741 (schedulerid,)) 742 return t.fetchall()
743
744 - def scheduler_unsubscribe_buildset(self, schedulerid, buildsetid, t):
745 t.execute(self.quoteq("UPDATE scheduler_upstream_buildsets" 746 " SET active=0" 747 " WHERE buildsetid=? AND schedulerid=?"), 748 (buildsetid, schedulerid))
749 750 # BuildRequest-manipulation methods 751
752 - def getBuildRequestWithNumber(self, brid, t=None):
753 assert isinstance(brid, (int, long)) 754 if t: 755 br = self._txn_getBuildRequestWithNumber(t, brid) 756 else: 757 br = self.runInteractionNow(self._txn_getBuildRequestWithNumber, 758 brid) 759 return br
760 - def _txn_getBuildRequestWithNumber(self, t, brid):
761 assert isinstance(brid, (int, long)) 762 t.execute(self.quoteq("SELECT br.buildsetid, bs.reason," 763 " bs.sourcestampid, br.buildername," 764 " bs.submitted_at, br.priority" 765 " FROM buildrequests AS br, buildsets AS bs" 766 " WHERE br.id=? AND br.buildsetid=bs.id"), 767 (brid,)) 768 r = t.fetchall() 769 if not r: 770 return None 771 (bsid, reason, ssid, builder_name, submitted_at, priority) = r[0] 772 ss = self.getSourceStampNumberedNow(ssid, t) 773 properties = self.get_properties_from_db("buildset_properties", 774 "buildsetid", bsid, t) 775 br = BuildRequest(reason, ss, builder_name, properties) 776 br.submittedAt = submitted_at 777 br.priority = priority 778 br.id = brid 779 br.bsid = bsid 780 return br
781
782 - def get_buildername_for_brid(self, brid):
783 assert isinstance(brid, (int, long)) 784 return self.runInteractionNow(self._txn_get_buildername_for_brid, brid)
785 - def _txn_get_buildername_for_brid(self, t, brid):
786 assert isinstance(brid, (int, long)) 787 t.execute(self.quoteq("SELECT buildername FROM buildrequests" 788 " WHERE id=?"), 789 (brid,)) 790 r = t.fetchall() 791 if not r: 792 return None 793 return r[0][0]
794
795 - def get_unclaimed_buildrequests(self, buildername, old, master_name, 796 master_incarnation, t):
797 t.execute(self.quoteq("SELECT br.id" 798 " FROM buildrequests AS br, buildsets AS bs" 799 " WHERE br.buildername=? AND br.complete=0" 800 " AND br.buildsetid=bs.id" 801 " AND (br.claimed_at<?" 802 " OR (br.claimed_by_name=?" 803 " AND br.claimed_by_incarnation!=?))" 804 " ORDER BY br.priority DESC,bs.submitted_at ASC"), 805 (buildername, old, master_name, master_incarnation)) 806 requests = [self.getBuildRequestWithNumber(brid, t) 807 for (brid,) in t.fetchall()] 808 return requests
809
810 - def claim_buildrequests(self, now, master_name, master_incarnation, brids, 811 t=None):
812 if not brids: 813 return 814 if t: 815 self._txn_claim_buildrequests(t, now, master_name, 816 master_incarnation, brids) 817 else: 818 self.runInteractionNow(self._txn_claim_buildrequests, 819 now, master_name, master_incarnation, brids)
820 - def _txn_claim_buildrequests(self, t, now, master_name, master_incarnation, 821 brids):
822 q = self.quoteq("UPDATE buildrequests" 823 " SET claimed_at = ?," 824 " claimed_by_name = ?, claimed_by_incarnation = ?" 825 " WHERE id IN " + self.parmlist(len(brids))) 826 qargs = [now, master_name, master_incarnation] + list(brids) 827 t.execute(q, qargs)
828
829 - def build_started(self, brid, buildnumber):
830 return self.runInteractionNow(self._txn_build_started, brid, buildnumber)
831 - def _txn_build_started(self, t, brid, buildnumber):
832 now = self._getCurrentTime() 833 t.execute(self.quoteq("INSERT INTO builds (number, brid, start_time)" 834 " VALUES (?,?,?)"), 835 (buildnumber, brid, now)) 836 bid = t.lastrowid 837 self.notify("add-build", bid) 838 return bid
839
840 - def builds_finished(self, bids):
841 return self.runInteractionNow(self._txn_build_finished, bids)
842 - def _txn_build_finished(self, t, bids):
843 now = self._getCurrentTime() 844 q = self.quoteq("UPDATE builds SET finish_time = ?" 845 " WHERE id IN " + self.parmlist(len(bids))) 846 qargs = [now] + list(bids) 847 t.execute(q, qargs)
848
849 - def get_build_info(self, bid):
850 return self.runInteractionNow(self._txn_get_build_info, bid)
851 - def _txn_get_build_info(self, t, bid):
852 # brid, buildername, buildnum 853 t.execute(self.quoteq("SELECT b.brid,br.buildername,b.number" 854 " FROM builds AS b, buildrequests AS br" 855 " WHERE b.id=? AND b.brid=br.id"), 856 (bid,)) 857 res = t.fetchall() 858 if res: 859 return res[0] 860 return (None,None,None)
861
862 - def get_buildnums_for_brid(self, brid):
863 return self.runInteractionNow(self._txn_get_buildnums_for_brid, brid)
864 - def _txn_get_buildnums_for_brid(self, t, brid):
865 t.execute(self.quoteq("SELECT number FROM builds WHERE brid=?"), 866 (brid,)) 867 return [number for (number,) in t.fetchall()]
868
869 - def resubmit_buildrequests(self, brids):
870 return self.runInteraction(self._txn_resubmit_buildreqs, brids)
871 - def _txn_resubmit_buildreqs(self, t, brids):
872 # the interrupted build that gets resubmitted will still have the 873 # same submitted_at value, so it should be re-started first 874 q = self.quoteq("UPDATE buildrequests" 875 " SET claimed_at=0," 876 " claimed_by_name=NULL, claimed_by_incarnation=NULL" 877 " WHERE id IN " + self.parmlist(len(brids))) 878 t.execute(q, brids) 879 self.notify("add-buildrequest", *brids)
880
881 - def retire_buildrequests(self, brids, results):
882 return self.runInteractionNow(self._txn_retire_buildreqs, brids,results)
883 - def _txn_retire_buildreqs(self, t, brids, results):
884 now = self._getCurrentTime() 885 #q = self.db.quoteq("DELETE FROM buildrequests WHERE id IN " 886 # + self.db.parmlist(len(brids))) 887 q = self.quoteq("UPDATE buildrequests" 888 " SET complete=1, results=?, complete_at=?" 889 " WHERE id IN " + self.parmlist(len(brids))) 890 t.execute(q, [results, now]+brids) 891 # now, does this cause any buildsets to complete? 892 q = self.quoteq("SELECT bs.id" 893 " FROM buildsets AS bs, buildrequests AS br" 894 " WHERE br.buildsetid=bs.id AND bs.complete=0" 895 " AND br.id in " 896 + self.parmlist(len(brids))) 897 t.execute(q, brids) 898 bsids = [bsid for (bsid,) in t.fetchall()] 899 for bsid in bsids: 900 self._check_buildset(t, bsid, now) 901 self.notify("retire-buildrequest", *brids) 902 self.notify("modify-buildset", *bsids)
903
904 - def cancel_buildrequests(self, brids):
905 return self.runInteractionNow(self._txn_cancel_buildrequest, brids)
906 - def _txn_cancel_buildrequest(self, t, brids):
907 # TODO: we aren't entirely sure if it'd be safe to just delete the 908 # buildrequest: what else might be waiting on it that would then just 909 # hang forever?. _check_buildset() should handle it well (an empty 910 # buildset will appear complete and SUCCESS-ful). But we haven't 911 # thought it through enough to be sure. So for now, "cancel" means 912 # "mark as complete and FAILURE". 913 if True: 914 now = self._getCurrentTime() 915 q = self.quoteq("UPDATE buildrequests" 916 " SET complete=1, results=?, complete_at=?" 917 " WHERE id IN " + self.parmlist(len(brids))) 918 t.execute(q, [FAILURE, now]+brids) 919 else: 920 q = self.quoteq("DELETE FROM buildrequests" 921 " WHERE id IN " + self.parmlist(len(brids))) 922 t.execute(q, brids) 923 924 # now, does this cause any buildsets to complete? 925 q = self.quoteq("SELECT bs.id" 926 " FROM buildsets AS bs, buildrequests AS br" 927 " WHERE br.buildsetid=bs.id AND bs.complete=0" 928 " AND br.id in " 929 + self.parmlist(len(brids))) 930 t.execute(q, brids) 931 bsids = [bsid for (bsid,) in t.fetchall()] 932 for bsid in bsids: 933 self._check_buildset(t, bsid, now) 934 935 self.notify("cancel-buildrequest", *brids) 936 self.notify("modify-buildset", *bsids)
937
938 - def _check_buildset(self, t, bsid, now):
939 q = self.quoteq("SELECT br.complete,br.results" 940 " FROM buildsets AS bs, buildrequests AS br" 941 " WHERE bs.complete=0" 942 " AND br.buildsetid=bs.id AND bs.id=?") 943 t.execute(q, (bsid,)) 944 results = t.fetchall() 945 is_complete = True 946 bs_results = SUCCESS 947 for (complete, r) in results: 948 if not complete: 949 # still waiting 950 is_complete = False 951 if r == FAILURE: 952 bs_results = r 953 if is_complete: 954 # they were all successful 955 q = self.quoteq("UPDATE buildsets" 956 " SET complete=1, complete_at=?, results=?" 957 " WHERE id=?") 958 t.execute(q, (now, bs_results, bsid))
959
960 - def get_buildrequestids_for_buildset(self, bsid):
961 return self.runInteractionNow(self._txn_get_buildrequestids_for_buildset, 962 bsid)
963 - def _txn_get_buildrequestids_for_buildset(self, t, bsid):
964 t.execute(self.quoteq("SELECT buildername,id FROM buildrequests" 965 " WHERE buildsetid=?"), 966 (bsid,)) 967 return dict(t.fetchall())
968
969 - def examine_buildset(self, bsid):
970 return self.runInteractionNow(self._txn_examine_buildset, bsid)
971 - def _txn_examine_buildset(self, t, bsid):
972 # "finished" means complete=1 for all builds. Return False until 973 # all builds are complete, then True. 974 # "successful" means complete=1 and results!=FAILURE for all builds. 975 # Returns None until the last success or the first failure. Returns 976 # False if there is at least one failure. Returns True if all are 977 # successful. 978 q = self.quoteq("SELECT br.complete,br.results" 979 " FROM buildsets AS bs, buildrequests AS br" 980 " WHERE br.buildsetid=bs.id AND bs.id=?") 981 t.execute(q, (bsid,)) 982 results = t.fetchall() 983 finished = True 984 successful = None 985 for (c,r) in results: 986 if not c: 987 finished = False 988 if c and r not in (SUCCESS, WARNINGS): 989 successful = False 990 if finished and successful is None: 991 successful = True 992 return (successful, finished)
993
994 - def get_active_buildset_ids(self):
995 return self.runInteractionNow(self._txn_get_active_buildset_ids)
996 - def _txn_get_active_buildset_ids(self, t):
997 t.execute("SELECT id FROM buildsets WHERE complete=0") 998 return [bsid for (bsid,) in t.fetchall()]
999 - def get_buildset_info(self, bsid):
1000 return self.runInteractionNow(self._txn_get_buildset_info, bsid)
1001 - def _txn_get_buildset_info(self, t, bsid):
1002 q = self.quoteq("SELECT external_idstring, reason, sourcestampid," 1003 " complete, results" 1004 " FROM buildsets WHERE id=?") 1005 t.execute(q, (bsid,)) 1006 res = t.fetchall() 1007 if res: 1008 (external, reason, ssid, complete, results) = res[0] 1009 external_idstring = str_or_none(external) 1010 reason = str_or_none(reason) 1011 complete = bool(complete) 1012 return (external_idstring, reason, ssid, complete, results) 1013 return None # shouldn't happen
1014
1015 - def get_pending_brids_for_builder(self, buildername):
1016 return self.runInteractionNow(self._txn_get_pending_brids_for_builder, 1017 buildername)
1018 - def _txn_get_pending_brids_for_builder(self, t, buildername):
1019 # "pending" means unclaimed and incomplete. When a build is returned 1020 # to the pool (self.resubmit_buildrequests), the claimed_at= field is 1021 # reset to zero. 1022 t.execute(self.quoteq("SELECT id FROM buildrequests" 1023 " WHERE buildername=? AND" 1024 " complete=0 AND claimed_at=0"), 1025 (buildername,)) 1026 return [brid for (brid,) in t.fetchall()]
1027 1028 # test/debug methods 1029
1030 - def has_pending_operations(self):
1031 return bool(self._pending_operation_count)
1032 1033 1034 threadable.synchronize(DBConnector) 1035