Package buildbot :: Package db :: Module connector
[frames | no frames]

Source Code for Module buildbot.db.connector

   1  # ***** BEGIN LICENSE BLOCK ***** 
   2  # Version: MPL 1.1/GPL 2.0/LGPL 2.1 
   3  # 
   4  # The contents of this file are subject to the Mozilla Public License Version 
   5  # 1.1 (the "License"); you may not use this file except in compliance with 
   6  # the License. You may obtain a copy of the License at 
   7  # http://www.mozilla.org/MPL/ 
   8  # 
   9  # Software distributed under the License is distributed on an "AS IS" basis, 
  10  # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License 
  11  # for the specific language governing rights and limitations under the 
  12  # License. 
  13  # 
  14  # The Original Code is Mozilla-specific Buildbot steps. 
  15  # 
  16  # The Initial Developer of the Original Code is 
  17  # Mozilla Foundation. 
  18  # Portions created by the Initial Developer are Copyright (C) 2009 
  19  # the Initial Developer. All Rights Reserved. 
  20  # 
  21  # Contributor(s): 
  22  #   Brian Warner <warner@lothar.com> 
  23  #   Chris AtLee <catlee@mozilla.com> 
  24  #   Dustin Mitchell <dustin@zmanda.com> 
  25  # 
  26  # Alternatively, the contents of this file may be used under the terms of 
  27  # either the GNU General Public License Version 2 or later (the "GPL"), or 
  28  # the GNU Lesser General Public License Version 2.1 or later (the "LGPL"), 
  29  # in which case the provisions of the GPL or the LGPL are applicable instead 
  30  # of those above. If you wish to allow use of your version of this file only 
  31  # under the terms of either the GPL or the LGPL, and not to allow others to 
  32  # use your version of this file under the terms of the MPL, indicate your 
  33  # decision by deleting the provisions above and replace them with the notice 
  34  # and other provisions required by the GPL or the LGPL. If you do not delete 
  35  # the provisions above, a recipient may use your version of this file under 
  36  # the terms of any one of the MPL, the GPL or the LGPL. 
  37  # 
  38  # ***** END LICENSE BLOCK ***** 
  39   
  40  import sys, collections, base64 
  41   
  42  from twisted.python import log, threadable 
  43  from twisted.internet import defer 
  44  from twisted.enterprise import adbapi 
  45  from buildbot import util 
  46  from buildbot.util import collections as bbcollections 
  47  from buildbot.changes.changes import Change 
  48  from buildbot.sourcestamp import SourceStamp 
  49  from buildbot.buildrequest import BuildRequest 
  50  from buildbot.process.properties import Properties 
  51  from buildbot.status.builder import SUCCESS, WARNINGS, FAILURE 
  52  from buildbot.util.eventual import eventually 
  53  from buildbot.util import json 
  54   
  55  # Don't auto-resubmit queries that encounter a broken connection: let them 
  56  # fail. Use the "notification doorbell" thing to provide the retry. Set 
  57  # cp_reconnect=True, so that a connection failure will prepare the 
  58  # ConnectionPool to reconnect next time. 
  59   
class MyTransaction(adbapi.Transaction):
    """Transaction whose execute() and fetchall() call the cursor directly."""

    def execute(self, *args, **kwargs):
        """Run a statement on the wrapped cursor and return its result."""
        cursor = self._cursor
        return cursor.execute(*args, **kwargs)

    def fetchall(self):
        """Return all remaining rows from the wrapped cursor."""
        rows = self._cursor.fetchall()
        return rows
68
def _one_or_else(res, default=None, process_f=lambda x: x):
    """Return the first column of the first row of res, passed through
    process_f; return default (unprocessed) when res is empty or None."""
    if res:
        first_row = res[0]
        return process_f(first_row[0])
    return default
73
def str_or_none(s):
    """Return str(s), except that None is passed through unchanged."""
    return None if s is None else str(s)
78
class Token:
    """Opaque marker object; used for _start_operation/_end_operation."""
class DBConnector(util.ComparableMixin):
    """Database access object: blocking (``...Now``) and Deferred-returning
    query helpers, plus a category-based notification mechanism."""
    # this will refuse to create the database: use 'create-master' for that

    # attribute names for util.ComparableMixin
    # NOTE(review): __init__ does not appear to set 'args' or 'kwargs' -- confirm
    compare_attrs = ["args", "kwargs"]
    # method names that the comments in notify()/_end_operation() describe as
    # wrapped by threadable.synchronous() -- the wrapping call is not visible here
    synchronized = ["notify", "_end_operation"]
    # maximum number of timing samples kept in self._query_times
    MAX_QUERY_TIMES = 1000
def __init__(self, spec):
    """Initialise connector state from `spec`.

    `spec` must provide get_dbapi(), get_maxidle(), get_sync_connection()
    and get_async_connection_pool(). The pool is created here but not
    started; see start().
    """
    # typical args = (dbmodule, dbname, username, password)
    self._query_times = collections.deque()
    self._spec = spec

    # state for the synchronous calls: runQueryNow, runInteractionNow
    dbapi = spec.get_dbapi()
    self._dbapi = dbapi
    self._nonpool = None
    self._nonpool_lastused = None
    self._nonpool_max_idle = spec.get_maxidle()

    # queries are written with "?" placeholders; quoteq() rewrites them
    # when the backend uses a different style
    self.paramstyle = dbapi.paramstyle

    pool = spec.get_async_connection_pool()
    pool.transactionFactory = MyTransaction
    self._pool = pool
    # the pool must be started before it can be used. The real
    # buildmaster process will do this at reactor start. CLI tools (like
    # "buildbot upgrade-master") must do it manually. Unit tests are run
    # in an environment in which it is already started.

    self._change_cache = util.LRUCache()
    self._sourcestamp_cache = util.LRUCache()
    self._active_operations = set()  # protected by synchronized=
    self._pending_notifications = []
    self._subscribers = bbcollections.defaultdict(set)

    self._pending_operation_count = 0

    self._started = False
119
def _getCurrentTime(self):
    """Return util.now(); a separate method so tests can replace the clock."""
    current = util.now()
    return current
123
def start(self):
    """Start the connection pool and mark the connector as started.

    This only *needs* to be called in reactorless environments, but it
    does no harm elsewhere.
    """
    pool = self._pool
    pool.start()
    self._started = True
129
def stop(self):
    """Call this when you're done with me"""
    # release the synchronous connection, if one was ever opened
    sync_conn = self._nonpool
    if sync_conn:
        sync_conn.close()
        self._nonpool = None
        self._nonpool_lastused = None

    # the pool only needs closing if start() was called
    if self._started:
        self._pool.close()
        self._started = False
        del self._pool
144
def quoteq(self, query):
    """
    Translate qmark-style placeholders for the active backend.

    A query such as::
     INSERT INTO foo (col1, col2) VALUES (?,?)
    is returned unchanged when self.paramstyle is "qmark", and becomes::
     INSERT INTO foo (col1, col2) VALUES (%s,%s)
    when self.paramstyle is "format". Any other paramstyle trips an
    assertion.
    """
    style = self.paramstyle
    if style != "format":
        assert style == "qmark"
        return query
    return query.replace("?", "%s")
157
def parmlist(self, count):
    """
    Build a parenthesised, comma-separated list of COUNT placeholders,
    e.g. for the VALUES or IN clause of a query. The placeholders have
    already been passed through quoteq().
    """
    placeholder = self.quoteq("?")
    joined = ",".join([placeholder for _ in range(count)])
    return "(" + joined + ")"
167
def get_version(self):
    """Return the number stored in the version table, or None when the
    query fails with OperationalError/ProgrammingError (taken to mean the
    table is missing, i.e. the database is empty)."""
    try:
        rows = self.runQueryNow("SELECT version FROM version")
    except (self._dbapi.OperationalError, self._dbapi.ProgrammingError):
        # this means the version table is missing: the db is empty
        return None
    else:
        assert len(rows) == 1
        version_row = rows[0]
        return version_row[0]
178
def runQueryNow(self, *args, **kwargs):
    """Synchronous, blocking version of runQuery(): run one statement via
    runInteractionNow() and return the fetched rows."""
    assert self._started
    run_blocking = self.runInteractionNow
    return run_blocking(self._runQuery, *args, **kwargs)
183
def _runQuery(self, c, *args, **kwargs):
    """Execute a statement on cursor `c` and return all of its rows."""
    c.execute(*args, **kwargs)
    rows = c.fetchall()
    return rows
187
def _start_operation(self):
    """Register a new in-flight operation and return its Token, which must
    later be handed to _end_operation()."""
    token = Token()
    self._active_operations.add(token)
    return token
def _end_operation(self, t):
    """Mark operation token `t` finished; once no operations remain,
    schedule delivery of every queued notification and clear the queue."""
    # this is always invoked from the main thread, but is wrapped by
    # synchronized= and threadable.synchronous(), since it touches
    # self._pending_notifications, which is also touched by
    # runInteraction threads
    self._active_operations.discard(t)
    if not self._active_operations:
        for (category, args) in self._pending_notifications:
            # in the distributed system, this will be a
            # transport.write(" ".join([category] + [str(a) for a in args]))
            eventually(self.send_notification, category, args)
        self._pending_notifications = []
205
def runInteractionNow(self, interaction, *args, **kwargs):
    """Synchronous, blocking version of runInteraction().

    Calls interaction(cursor, *args, **kwargs) through _runInteractionNow()
    and returns its result. The operation token is released and the
    elapsed time recorded whether or not the interaction raises.
    """
    assert self._started
    started_at = self._getCurrentTime()
    token = self._start_operation()
    try:
        result = self._runInteractionNow(interaction, *args, **kwargs)
    finally:
        self._end_operation(token)
        self._add_query_time(started_at)
    return result
216
def get_sync_connection(self):
    """Return the single connection used for synchronous queries.

    Wraps spec.get_sync_connection(): the cached connection is reused
    unless it has been idle for more than max_idle seconds, in which case
    a new one is obtained.
    """
    max_idle = self._nonpool_max_idle
    if max_idle is not None:
        now = util.now()
        last_used = self._nonpool_lastused
        if last_used and last_used + max_idle < now:
            # NOTE(review): the stale connection is dropped here without
            # an explicit close()
            self._nonpool = None

    if not self._nonpool:
        self._nonpool = self._spec.get_sync_connection()

    self._nonpool_lastused = util.now()
    return self._nonpool
232
def _runInteractionNow(self, interaction, *args, **kwargs):
    """Run interaction(cursor, *args, **kwargs) on the synchronous
    connection, committing on success.

    On any exception the transaction is rolled back and the connection is
    probed with the pool's good_sql statement; if that recovery itself
    fails, the cached connection is discarded so the next call reconnects.
    The original exception is always re-raised.
    """
    conn = self.get_sync_connection()
    c = conn.cursor()
    try:
        result = interaction(c, *args, **kwargs)
        c.close()
        conn.commit()
        return result
    except:
        # remember the original failure before attempting recovery, since
        # the recovery code below may raise (and log) its own exception
        excType, excValue, excTraceback = sys.exc_info()
        try:
            conn.rollback()
            # probe the connection with a known-good statement
            c2 = conn.cursor()
            c2.execute(self._pool.good_sql)
            c2.close()
            conn.commit()
        except:
            log.msg("rollback failed, will reconnect next query")
            log.err()
            # and the connection is probably dead: clear the reference,
            # so we'll establish a new connection next time
            self._nonpool = None
        # Python 2 three-argument raise: re-raises with the original traceback
        raise excType, excValue, excTraceback
256
def notify(self, category, *args):
    """Queue a (category, args) notification; _end_operation() delivers
    the queue once no operations are active."""
    # this is wrapped by synchronized= and threadable.synchronous(),
    # since it will be invoked from runInteraction threads
    entry = (category, args)
    self._pending_notifications.append(entry)
261
def send_notification(self, category, args):
    """Schedule observer(category, *args) for each observer subscribed to
    `category`."""
    # in the distributed system, this will be invoked by lineReceived()
    observers = self._subscribers[category]
    for callback in observers:
        eventually(callback, category, *args)
267
def subscribe_to(self, category, observer):
    """Register `observer` to be called for notifications in `category`."""
    observers = self._subscribers[category]
    observers.add(observer)
270
def runQuery(self, *args, **kwargs):
    """Run a query on the connection pool.

    Arguments are passed straight to the pool's runQuery(). Returns the
    Deferred produced by the pool, which fires with the query result.

    The pending-operation counter is decremented, and the elapsed time
    recorded, when the Deferred fires (success or failure). Previously the
    counter was incremented but never decremented. Unlike runInteraction(),
    no operation token is taken, matching the original code, which had
    that step commented out.
    """
    assert self._started
    self._pending_operation_count += 1
    start = self._getCurrentTime()
    d = self._pool.runQuery(*args, **kwargs)

    def _done(res):
        # runs for both results and failures; pass the value through
        self._pending_operation_count -= 1
        self._add_query_time(start)
        return res
    d.addBoth(_done)
    return d
def _runQuery_done(self, res, start, t):
    """Completion callback: release token `t`, record the time elapsed
    since `start`, decrement the pending count and pass `res` through."""
    self._end_operation(t)
    self._add_query_time(start)
    self._pending_operation_count = self._pending_operation_count - 1
    return res
284
def _add_query_time(self, start):
    """Record the time elapsed since `start`, keeping at most
    MAX_QUERY_TIMES samples (the oldest is dropped first)."""
    elapsed = self._getCurrentTime() - start
    samples = self._query_times
    samples.append(elapsed)
    if len(samples) > self.MAX_QUERY_TIMES:
        samples.popleft()
290
def runInteraction(self, *args, **kwargs):
    """Run an interaction on the connection pool and return its Deferred.

    An operation token is taken before the call;
    _runInteraction_done() releases it when the Deferred fires.
    """
    assert self._started
    self._pending_operation_count += 1
    started_at = self._getCurrentTime()
    token = self._start_operation()
    deferred = self._pool.runInteraction(*args, **kwargs)
    deferred.addBoth(self._runInteraction_done, started_at, token)
    return deferred
def _runInteraction_done(self, res, start, t):
    """Completion callback for runInteraction(): release token `t`, record
    the elapsed time, decrement the pending count and pass `res` through."""
    self._end_operation(t)
    self._add_query_time(start)
    self._pending_operation_count = self._pending_operation_count - 1
    return res
304 305 # ChangeManager methods 306
def addChangeToDatabase(self, change):
    """Store `change` (blocking) and cache it under its newly assigned
    change.number."""
    store = self._txn_addChangeToDatabase
    self.runInteractionNow(store, change)
    self._change_cache.add(change.number, change)
310
def _txn_addChangeToDatabase(self, t, change):
    """Insert `change` and its links, files and properties using
    transaction `t`, set change.number to the new row id, and queue an
    "add-change" notification."""
    insert_change = self.quoteq("INSERT INTO changes"
                                " (author,"
                                " comments, is_dir,"
                                " branch, revision, revlink,"
                                " when_timestamp, category,"
                                " repository, project)"
                                " VALUES (?, ?,?, ?,?,?, ?,?, ?,?)")
    # TODO: map None to.. empty string?
    row = (change.who,
           change.comments, change.isdir,
           change.branch, change.revision, change.revlink,
           change.when, change.category, change.repository,
           change.project)
    t.execute(insert_change, row)
    number = t.lastrowid
    change.number = number

    insert_link = self.quoteq("INSERT INTO change_links (changeid, link) "
                              "VALUES (?,?)")
    for link in change.links:
        t.execute(insert_link, (number, link))

    insert_file = self.quoteq("INSERT INTO change_files (changeid,filename)"
                              " VALUES (?,?)")
    for filename in change.files:
        t.execute(insert_file, (number, filename))

    insert_prop = self.quoteq("INSERT INTO change_properties"
                              " (changeid, property_name, property_value)"
                              " VALUES (?,?,?)")
    for propname, propvalue in change.properties.properties.items():
        # property values are stored JSON-encoded
        t.execute(insert_prop, (number, propname, json.dumps(propvalue)))
    self.notify("add-change", number)
344
def changeEventGenerator(self, branches=[], categories=[], committers=[], minTime=0):
    """Yield Change objects, newest (highest changeid) first.

    branches, categories, committers: optional collections; when
        non-empty, only changes whose branch / category / author is in the
        collection are returned.
    minTime: when non-zero, only changes with when_timestamp greater than
        this value are returned. Previously this filter was ignored unless
        one of the other filters was also given.

    The list defaults are never mutated, so sharing them between calls is
    harmless.
    """
    pieces = []
    args = []
    for column, wanted in (("branch", branches),
                           ("category", categories),
                           ("author", committers)):
        if wanted:
            wanted = list(wanted)
            pieces.append("%s IN %s" % (column, self.parmlist(len(wanted))))
            args.extend(wanted)
    if minTime:
        pieces.append("when_timestamp > %d" % minTime)

    q = "SELECT changeid FROM changes"
    if pieces:
        q += " WHERE " + " AND ".join(pieces)
    q += " ORDER BY changeid DESC"

    rows = self.runQueryNow(q, tuple(args))
    for (changeid,) in rows:
        yield self.getChangeNumberedNow(changeid)
367
def getLatestChangeNumberNow(self, branch=None, t=None):
    """Return the highest change number, optionally limited to `branch`.

    t: an existing transaction to run the query in; when omitted, the
       query runs in its own blocking interaction.

    The no-transaction path previously passed the bare
    _txn_getLatestChangeNumber(branch, t) method to runInteractionNow(),
    which dropped `branch` and bound the cursor to the wrong parameter.
    """
    if t:
        return self._txn_getLatestChangeNumber(branch=branch, t=t)

    def txn(t):
        # adapt the (cursor) calling convention to (branch, t)
        return self._txn_getLatestChangeNumber(branch=branch, t=t)
    return self.runInteractionNow(txn)
def _txn_getLatestChangeNumber(self, branch, t):
    """Return the largest changeid in the changes table, or 0 if there is
    none.

    branch: when truthy, only changes on that branch are considered.
    t: transaction/cursor providing execute() and fetchone().

    Fixes relative to the previous version: the branch clause was unbound
    when no branch was given, the SQL lacked a space before WHERE, None
    was passed as the parameter sequence, and the NULL that max() yields
    for an empty table was returned instead of 0.
    """
    q = "SELECT max(changeid) FROM changes"
    args = ()
    if branch:
        q += " WHERE branch=?"
        args = (branch,)
    t.execute(self.quoteq(q), args)
    row = t.fetchone()
    if not row or row[0] is None:
        return 0
    return row[0]
384
def getChangeNumberedNow(self, changeid, t=None):
    """Synchronous/blocking version of getChangeByNumber.

    Returns the cached Change when there is one; otherwise loads it
    (inside transaction `t` if given), stores the result in the cache and
    returns it.
    """
    assert changeid >= 0
    cached = self._change_cache.get(changeid)
    if cached:
        return cached
    if t:
        change = self._txn_getChangeNumberedNow(t, changeid)
    else:
        load = self._txn_getChangeNumberedNow
        change = self.runInteractionNow(load, changeid)
    self._change_cache.add(changeid, change)
    return change
def _txn_getChangeNumberedNow(self, t, changeid):
    """Load change `changeid` with its links, files and properties using
    transaction `t`; return a Change, or None if no such row exists."""
    key = (changeid,)
    t.execute(self.quoteq("SELECT author, comments,"
                          " is_dir, branch, revision, revlink,"
                          " when_timestamp, category,"
                          " repository, project"
                          " FROM changes WHERE changeid = ?"),
              key)
    change_rows = t.fetchall()
    if not change_rows:
        return None
    (who, comments,
     isdir, branch, revision, revlink,
     when, category, repository, project) = change_rows[0]
    branch = str_or_none(branch)
    revision = str_or_none(revision)

    t.execute(self.quoteq("SELECT link FROM change_links WHERE changeid=?"),
              key)
    links = sorted([row[0] for row in t.fetchall()])

    t.execute(self.quoteq("SELECT filename FROM change_files WHERE changeid=?"),
              key)
    files = sorted([row[0] for row in t.fetchall()])

    props = self.get_properties_from_db("change_properties", "changeid",
                                        changeid, t)
    change = Change(who=who, files=files, comments=comments, isdir=isdir,
                    links=links, revision=revision, when=when,
                    branch=branch, category=category, revlink=revlink,
                    repository=repository, project=project)
    change.properties.updateFromProperties(props)
    change.number = changeid
    return change
433
def getChangeByNumber(self, changeid):
    """Return a Deferred that fires with a Change instance, or None if
    there is no Change with that number. A cached Change is returned
    through an already-fired Deferred."""
    assert changeid >= 0
    cached = self._change_cache.get(changeid)
    if cached:
        return defer.succeed(cached)

    key = (changeid,)
    change_d = self.runQuery(self.quoteq("SELECT author, comments,"
                                         " is_dir, branch, revision, revlink,"
                                         " when_timestamp, category,"
                                         " repository, project"
                                         " FROM changes WHERE changeid = ?"),
                             key)
    links_d = self.runQuery(self.quoteq("SELECT link FROM change_links"
                                        " WHERE changeid=?"),
                            key)
    files_d = self.runQuery(self.quoteq("SELECT filename FROM change_files"
                                        " WHERE changeid=?"),
                            key)
    props_d = self.runInteraction(self._txn_get_properties_from_db,
                                  "change_properties", "changeid", changeid)
    # the four results are combined in _getChangeByNumber_query_done
    gathered = defer.gatherResults([change_d, links_d, files_d, props_d])
    gathered.addCallback(self._getChangeByNumber_query_done, changeid)
    return gathered
458
def _getChangeByNumber_query_done(self, res, changeid):
    """Assemble a Change from the gathered query results.

    res: (change rows, link rows, file rows, Properties).
    Returns None when the change row is missing; otherwise caches and
    returns the new Change.
    """
    (rows, link_rows, file_rows, properties) = res
    if not rows:
        return None
    (who, comments,
     isdir, branch, revision, revlink,
     when, category, repository, project) = rows[0]
    branch = str_or_none(branch)
    revision = str_or_none(revision)
    links = sorted([row[0] for row in link_rows])
    files = sorted([row[0] for row in file_rows])

    change = Change(who=who, files=files, comments=comments, isdir=isdir,
                    links=links, revision=revision, when=when,
                    branch=branch, category=category, revlink=revlink,
                    repository=repository, project=project)
    change.properties.updateFromProperties(properties)
    change.number = changeid
    self._change_cache.add(changeid, change)
    return change
481
def getChangesGreaterThan(self, last_changeid, t=None):
    """Return a list of all Change instances with numbers greater than
    the given value, sorted by number. This is useful for catching up
    with everything that's happened since you last called this function.

    This call blocks; it runs inside transaction `t` when one is given.
    """
    assert last_changeid >= 0
    if not t:
        return self.runInteractionNow(self._txn_getChangesGreaterThan,
                                      last_changeid)
    return self._txn_getChangesGreaterThan(t, last_changeid)
def _txn_getChangesGreaterThan(self, t, last_changeid):
    """Using transaction `t`, load every change whose id exceeds
    `last_changeid` and return them sorted by number."""
    q = self.quoteq("SELECT changeid FROM changes WHERE changeid > ?")
    t.execute(q, (last_changeid,))
    found = []
    for (changeid,) in t.fetchall():
        found.append(self.getChangeNumberedNow(changeid, t))
    return sorted(found, key=lambda c: c.number)
500
def getChangeIdsLessThanIdNow(self, new_changeid):
    """Return a list of all extant change id's less than the given value,
    sorted by number."""
    def txn(t):
        q = self.quoteq("SELECT changeid FROM changes WHERE changeid < ?")
        t.execute(q, (new_changeid,))
        return sorted([changeid for (changeid,) in t.fetchall()])
    return self.runInteractionNow(txn)
511
def removeChangeNow(self, changeid):
    """Thoroughly remove a change from the database, including all dependent
    tables"""
    tables = ('changes', 'scheduler_changes', 'sourcestamp_changes',
              'change_files', 'change_links', 'change_properties')

    def txn(t):
        for table in tables:
            delete = self.quoteq("DELETE FROM %s WHERE changeid = ?" % table)
            t.execute(delete, (changeid,))
    return self.runInteractionNow(txn)
def getChangesByNumber(self, changeids):
    """Return a Deferred gathering getChangeByNumber() for each id in
    `changeids`, in the same order."""
    pending = []
    for changeid in changeids:
        pending.append(self.getChangeByNumber(changeid))
    return defer.gatherResults(pending)
525 526 # SourceStamp-manipulating methods 527
def getSourceStampNumberedNow(self, ssid, t=None):
    """Return the SourceStamp with id `ssid` (blocking).

    A cached stamp is returned when there is one; otherwise it is loaded
    (inside transaction `t` if given), cached and returned.
    """
    assert isinstance(ssid, (int, long))
    cached = self._sourcestamp_cache.get(ssid)
    if cached:
        return cached
    if t:
        stamp = self._txn_getSourceStampNumbered(t, ssid)
    else:
        load = self._txn_getSourceStampNumbered
        stamp = self.runInteractionNow(load, ssid)
    self._sourcestamp_cache.add(ssid, stamp)
    return stamp
540
def _txn_getSourceStampNumbered(self, t, ssid):
    """Build the SourceStamp stored under `ssid` using transaction `t`,
    including its patch and changes; return None if the row is missing."""
    assert isinstance(ssid, (int, long))
    t.execute(self.quoteq("SELECT branch,revision,patchid,project,repository"
                          " FROM sourcestamps WHERE id=?"),
              (ssid,))
    stamp_rows = t.fetchall()
    if not stamp_rows:
        return None
    (branch_u, revision_u, patchid, project, repository) = stamp_rows[0]
    branch = str_or_none(branch_u)
    revision = str_or_none(revision_u)

    patch = None
    if patchid is not None:
        t.execute(self.quoteq("SELECT patchlevel,patch_base64,subdir"
                              " FROM patches WHERE id=?"),
                  (patchid,))
        patch_rows = t.fetchall()
        assert len(patch_rows) == 1
        (patch_level, patch_text_base64, subdir_u) = patch_rows[0]
        patch_text = base64.b64decode(patch_text_base64)
        # the subdir is only included in the tuple when it is set
        patch = (patch_level, patch_text)
        if subdir_u:
            patch = patch + (str(subdir_u),)

    t.execute(self.quoteq("SELECT changeid FROM sourcestamp_changes"
                          " WHERE sourcestampid=?"
                          " ORDER BY changeid ASC"),
              (ssid,))
    change_rows = t.fetchall()
    changes = None
    if change_rows:
        changes = [self.getChangeNumberedNow(changeid, t)
                   for (changeid,) in change_rows]
    stamp = SourceStamp(branch, revision, patch, changes,
                        project=project, repository=repository)
    stamp.ssid = ssid
    return stamp
579 580 # Properties methods 581
def get_properties_from_db(self, tablename, idname, id, t=None):
    """Return the properties stored in `tablename` for the row whose
    `idname` column equals `id`; runs inside transaction `t` when given,
    otherwise in its own blocking interaction."""
    if not t:
        return self.runInteractionNow(self._txn_get_properties_from_db,
                                      tablename, idname, id)
    return self._txn_get_properties_from_db(t, tablename, idname, id)
588
def _txn_get_properties_from_db(self, t, tablename, idname, id):
    """Read (property_name, property_value) rows for `id` and return them
    as a Properties object. Each stored value is a JSON-encoded
    (value, source) pair."""
    # apparently you can't use argument placeholders for table names. Don't
    # call this with a weird-looking tablename.
    select = "SELECT property_name,property_value FROM %s WHERE %s=?" % (
        tablename, idname)
    t.execute(self.quoteq(select), (id,))
    props = Properties()
    for name, encoded in t.fetchall():
        value, source = json.loads(encoded)
        props.setProperty(str(name), value, source)
    return props
600 601 # Scheduler manipulation methods 602
def addSchedulers(self, added):
    """Register the schedulers in `added` via runInteraction(); returns
    its Deferred."""
    register = self._addSchedulers
    return self.runInteraction(register, added)
def _addSchedulers(self, t, added):
    """Make sure every scheduler in `added` has a row in the schedulers
    table and set scheduler.schedulerid to that row's id.

    A row is looked up by name and class name; a row with an empty class
    name (old schema) is adopted and updated. Otherwise a new row is
    created whose state comes from scheduler.get_initial_state().
    """
    for scheduler in added:
        name = scheduler.name
        assert name
        klass = scheduler.__class__
        class_name = "%s.%s" % (klass.__module__, klass.__name__)
        lookup = self.quoteq("""
            SELECT schedulerid, class_name FROM schedulers WHERE
                name=? AND
                (class_name=? OR class_name='')
                """)
        t.execute(lookup, (name, class_name))
        row = t.fetchone()
        sid = None
        if row:
            sid, db_class_name = row
            if db_class_name == '':
                # We're updating from an old schema where the class name
                # wasn't stored.
                # Update this row's class name and move on
                update = self.quoteq("""UPDATE schedulers SET class_name=?
                    WHERE schedulerid=?""")
                t.execute(update, (class_name, sid))
            elif db_class_name != class_name:
                # A different scheduler is being used with this name.
                # Ignore the old scheduler and create a new one
                sid = None

        if sid is None:
            # create a new row, with the latest changeid (so it won't try
            # to process all of the old changes) new Schedulers are
            # supposed to ignore pre-existing Changes
            latest = ("SELECT changeid FROM changes"
                      " ORDER BY changeid DESC LIMIT 1")
            t.execute(latest)
            max_changeid = _one_or_else(t.fetchall(), 0)
            state_json = json.dumps(scheduler.get_initial_state(max_changeid))
            insert = self.quoteq("INSERT INTO schedulers"
                                 " (name, class_name, state)"
                                 " VALUES (?,?,?)")
            t.execute(insert, (name, class_name, state_json))
            sid = t.lastrowid
        log.msg("scheduler '%s' got id %d" % (scheduler.name, sid))
        scheduler.schedulerid = sid
651
def scheduler_get_state(self, schedulerid, t):
    """Return the decoded JSON state stored for `schedulerid`; asserts
    that a state exists."""
    select = self.quoteq("SELECT state FROM schedulers WHERE schedulerid=?")
    t.execute(select, (schedulerid,))
    encoded = _one_or_else(t.fetchall())
    assert encoded is not None
    return json.loads(encoded)
658
def scheduler_set_state(self, schedulerid, t, state):
    """Store `state`, JSON-encoded, as the state of `schedulerid`."""
    encoded = json.dumps(state)
    update = self.quoteq("UPDATE schedulers SET state=? WHERE schedulerid=?")
    t.execute(update, (encoded, schedulerid))
663
def get_sourcestampid(self, ss, t):
    """Given a SourceStamp (which may or may not have an ssid), make sure
    the contents are in the database, and return the ssid. If the
    SourceStamp originally came from the DB (and thus already has an
    ssid), just return the ssid. If not, create a new row for it."""
    if ss.ssid is not None:
        return ss.ssid

    patchid = None
    patch = ss.patch
    if patch:
        patchlevel, diff = patch[0], patch[1]
        # the third element (subdir) is optional
        subdir = None
        if len(patch) > 2:
            subdir = patch[2]
        insert_patch = self.quoteq("INSERT INTO patches"
                                   " (patchlevel, patch_base64, subdir)"
                                   " VALUES (?,?,?)")
        t.execute(insert_patch, (patchlevel, base64.b64encode(diff), subdir))
        patchid = t.lastrowid

    insert_stamp = self.quoteq("INSERT INTO sourcestamps"
                               " (branch, revision, patchid, project, repository)"
                               " VALUES (?,?,?,?,?)")
    t.execute(insert_stamp,
              (ss.branch, ss.revision, patchid, ss.project, ss.repository))
    ss.ssid = t.lastrowid

    insert_change = self.quoteq("INSERT INTO sourcestamp_changes"
                                " (sourcestampid, changeid) VALUES (?,?)")
    for change in ss.changes:
        t.execute(insert_change, (ss.ssid, change.number))
    return ss.ssid
693
def create_buildset(self, ssid, reason, properties, builderNames, t,
                    external_idstring=None):
    """Create a BuildSet row plus one BuildRequest row per builder name
    using transaction `t`, queue the "add-buildset" and
    "add-buildrequest" notifications, and return the new buildset id."""
    submitted_at = self._getCurrentTime()
    t.execute(self.quoteq("INSERT INTO buildsets"
                          " (external_idstring, reason,"
                          " sourcestampid, submitted_at)"
                          " VALUES (?,?,?,?)"),
              (external_idstring, reason, ssid, submitted_at))
    bsid = t.lastrowid

    insert_prop = self.quoteq("INSERT INTO buildset_properties"
                              " (buildsetid, property_name, property_value)"
                              " VALUES (?,?,?)")
    for propname, propvalue in properties.properties.items():
        # property values are stored JSON-encoded
        t.execute(insert_prop, (bsid, propname, json.dumps(propvalue)))

    insert_request = self.quoteq("INSERT INTO buildrequests"
                                 " (buildsetid, buildername, submitted_at)"
                                 " VALUES (?,?,?)")
    brids = []
    for buildername in builderNames:
        t.execute(insert_request, (bsid, buildername, submitted_at))
        brids.append(t.lastrowid)

    self.notify("add-buildset", bsid)
    self.notify("add-buildrequest", *brids)
    return bsid
721
def scheduler_classify_change(self, schedulerid, number, important, t):
    """Record change `number` for `schedulerid`, with `important` coerced
    to a bool."""
    insert = self.quoteq("INSERT INTO scheduler_changes"
                         " (schedulerid, changeid, important)"
                         " VALUES (?,?,?)")
    row = (schedulerid, number, bool(important))
    t.execute(insert, row)
727
def scheduler_get_classified_changes(self, schedulerid, t):
    """Return (important, unimportant): two lists of the changes recorded
    for `schedulerid`, split by their stored importance flag."""
    select = self.quoteq("SELECT changeid, important"
                         " FROM scheduler_changes"
                         " WHERE schedulerid=?")
    t.execute(select, (schedulerid,))
    important, unimportant = [], []
    for (changeid, is_important) in t.fetchall():
        change = self.getChangeNumberedNow(changeid, t)
        bucket = important if is_important else unimportant
        bucket.append(change)
    return (important, unimportant)
742
def scheduler_retire_changes(self, schedulerid, changeids, t):
    """Delete the scheduler_changes rows of `schedulerid` for the given
    change ids, in batches of 100."""
    delete_prefix = self.quoteq("DELETE FROM scheduler_changes"
                                " WHERE schedulerid=? AND changeid IN ")
    remaining = changeids
    while remaining:
        # sqlite has a maximum of 999 parameters, but we'll try to come in far
        # short of that
        batch = remaining[:100]
        remaining = remaining[100:]
        t.execute(delete_prefix + self.parmlist(len(batch)),
                  (schedulerid,) + tuple(batch))
752
def scheduler_subscribe_to_buildset(self, schedulerid, bsid, t):
    """Insert an active subscription of `schedulerid` to buildset `bsid`."""
    # scheduler_get_subscribed_buildsets(schedulerid) will return
    # information about all buildsets that were subscribed this way
    insert = self.quoteq("INSERT INTO scheduler_upstream_buildsets"
                         " (buildsetid, schedulerid, active)"
                         " VALUES (?,?,?)")
    t.execute(insert, (bsid, schedulerid, 1))
760
def scheduler_get_subscribed_buildsets(self, schedulerid, t):
    """Return (bsid, ssid, complete, results) rows for every buildset
    that `schedulerid` is actively subscribed to."""
    select = self.quoteq("SELECT bs.id, "
                         " bs.sourcestampid, bs.complete, bs.results"
                         " FROM scheduler_upstream_buildsets AS s,"
                         " buildsets AS bs"
                         " WHERE s.buildsetid=bs.id"
                         " AND s.schedulerid=?"
                         " AND s.active=1")
    t.execute(select, (schedulerid,))
    rows = t.fetchall()
    return rows
772
def scheduler_unsubscribe_buildset(self, schedulerid, buildsetid, t):
    """Mark the subscription of `schedulerid` to `buildsetid` inactive."""
    update = self.quoteq("UPDATE scheduler_upstream_buildsets"
                         " SET active=0"
                         " WHERE buildsetid=? AND schedulerid=?")
    t.execute(update, (buildsetid, schedulerid))
778 779 # BuildRequest-manipulation methods 780
def getBuildRequestWithNumber(self, brid, t=None):
    """Return the BuildRequest with id `brid` (None if absent); runs
    inside transaction `t` when given, else in its own blocking
    interaction."""
    assert isinstance(brid, (int, long))
    if not t:
        return self.runInteractionNow(self._txn_getBuildRequestWithNumber,
                                      brid)
    return self._txn_getBuildRequestWithNumber(t, brid)
def _txn_getBuildRequestWithNumber(self, t, brid):
    """Load build request `brid` with its buildset's reason, source stamp
    and properties; return a BuildRequest, or None if there is no such
    row."""
    assert isinstance(brid, (int, long))
    select = self.quoteq("SELECT br.buildsetid, bs.reason,"
                         " bs.sourcestampid, br.buildername,"
                         " bs.submitted_at, br.priority"
                         " FROM buildrequests AS br, buildsets AS bs"
                         " WHERE br.id=? AND br.buildsetid=bs.id")
    t.execute(select, (brid,))
    rows = t.fetchall()
    if not rows:
        return None
    (bsid, reason, ssid, builder_name, submitted_at, priority) = rows[0]
    stamp = self.getSourceStampNumberedNow(ssid, t)
    props = self.get_properties_from_db("buildset_properties",
                                        "buildsetid", bsid, t)
    request = BuildRequest(reason, stamp, builder_name, props)
    request.submittedAt = submitted_at
    request.priority = priority
    request.id = brid
    request.bsid = bsid
    return request
810
def get_buildername_for_brid(self, brid):
    """Return the builder name of build request `brid` (blocking), or
    None if there is no such request."""
    assert isinstance(brid, (int, long))
    lookup = self._txn_get_buildername_for_brid
    return self.runInteractionNow(lookup, brid)
def _txn_get_buildername_for_brid(self, t, brid):
    """Return buildrequests.buildername for `brid` using transaction `t`,
    or None when no row matches."""
    assert isinstance(brid, (int, long))
    select = self.quoteq("SELECT buildername FROM buildrequests"
                         " WHERE id=?")
    t.execute(select, (brid,))
    rows = t.fetchall()
    if rows:
        return rows[0][0]
    return None
823
def get_unclaimed_buildrequests(self, buildername, old, master_name,
                                master_incarnation, t, limit=None):
    """Return BuildRequests for `buildername` that are incomplete and
    either claimed before `old` or claimed by `master_name` under a
    different incarnation, ordered by priority (descending) and then
    submission time. A truthy `limit` caps the number of rows."""
    select = ("SELECT br.id"
              " FROM buildrequests AS br, buildsets AS bs"
              " WHERE br.buildername=? AND br.complete=0"
              " AND br.buildsetid=bs.id"
              " AND (br.claimed_at<?"
              " OR (br.claimed_by_name=?"
              " AND br.claimed_by_incarnation!=?))"
              " ORDER BY br.priority DESC,bs.submitted_at ASC")
    if limit:
        select = select + " LIMIT %s" % limit
    params = (buildername, old, master_name, master_incarnation)
    t.execute(self.quoteq(select), params)
    requests = []
    for (brid,) in t.fetchall():
        requests.append(self.getBuildRequestWithNumber(brid, t))
    return requests
841
def claim_buildrequests(self, now, master_name, master_incarnation, brids,
                        t=None):
    """Claim the build requests in `brids` for the given master at time
    `now`; does nothing when `brids` is empty. Runs inside transaction
    `t` when given, else in its own blocking interaction."""
    if not brids:
        return
    if not t:
        self.runInteractionNow(self._txn_claim_buildrequests,
                               now, master_name, master_incarnation, brids)
    else:
        self._txn_claim_buildrequests(t, now, master_name,
                                      master_incarnation, brids)
def _txn_claim_buildrequests(self, t, now, master_name, master_incarnation,
                             brids):
    """Set claimed_at / claimed_by_name / claimed_by_incarnation on the
    given build requests, in batches of 100."""
    remaining = list(brids)  # in case it's a set
    claim_values = [now, master_name, master_incarnation]
    while remaining:
        batch = remaining[:100]
        remaining = remaining[100:]
        update = self.quoteq("UPDATE buildrequests"
                             " SET claimed_at = ?,"
                             " claimed_by_name = ?, claimed_by_incarnation = ?"
                             " WHERE id IN " + self.parmlist(len(batch)))
        t.execute(update, claim_values + list(batch))
863
def build_started(self, brid, buildnumber):
    """Record (blocking) that build `buildnumber` of request `brid` has
    started; returns the new build id."""
    record = self._txn_build_started
    return self.runInteractionNow(record, brid, buildnumber)
def _txn_build_started(self, t, brid, buildnumber):
    """Insert a builds row stamped with the current time, queue an
    "add-build" notification and return the new build id."""
    started_at = self._getCurrentTime()
    insert = self.quoteq("INSERT INTO builds (number, brid, start_time)"
                         " VALUES (?,?,?)")
    t.execute(insert, (buildnumber, brid, started_at))
    bid = t.lastrowid
    self.notify("add-build", bid)
    return bid
874
def builds_finished(self, bids):
    """Record (blocking) the finish time of the builds in `bids`."""
    record = self._txn_build_finished
    return self.runInteractionNow(record, bids)
def _txn_build_finished(self, t, bids):
    """Set finish_time to the current time on the given builds, in
    batches of 100."""
    finished_at = self._getCurrentTime()
    remaining = bids
    while remaining:
        batch = remaining[:100]
        remaining = remaining[100:]
        update = self.quoteq("UPDATE builds SET finish_time = ?"
                             " WHERE id IN " + self.parmlist(len(batch)))
        t.execute(update, [finished_at] + list(batch))
885
def get_build_info(self, bid):
    """Look up the request id, builder name and build number of ``bid``.

    Delegates to ``_txn_get_build_info`` via ``self.runInteractionNow`` and
    returns whatever that call returns.
    """
    info = self.runInteractionNow(self._txn_get_build_info, bid)
    return info
def _txn_get_build_info(self, t, bid):
    """Return ``(brid, buildername, buildnum)`` for build ``bid``.

    The first matching row is returned as fetched from the cursor; when no
    row matches, ``(None, None, None)`` is returned instead.
    """
    sql = self.quoteq("SELECT b.brid,br.buildername,b.number"
                      " FROM builds AS b, buildrequests AS br"
                      " WHERE b.id=? AND b.brid=br.id")
    t.execute(sql, (bid,))
    rows = t.fetchall()
    if not rows:
        return (None,None,None)
    return rows[0]
898
def get_buildnums_for_brid(self, brid):
    """Fetch the build numbers recorded for build request ``brid``.

    Delegates to ``_txn_get_buildnums_for_brid`` via
    ``self.runInteractionNow`` and returns whatever that call returns.
    """
    numbers = self.runInteractionNow(self._txn_get_buildnums_for_brid, brid)
    return numbers
def _txn_get_buildnums_for_brid(self, t, brid):
    """Return a list of the ``number`` column of every build of ``brid``."""
    sql = self.quoteq("SELECT number FROM builds WHERE brid=?")
    t.execute(sql, (brid,))
    numbers = []
    for (buildnum,) in t.fetchall():
        numbers.append(buildnum)
    return numbers
905
def resubmit_buildrequests(self, brids):
    """Return the given build requests to the unclaimed pool.

    Unlike its neighbours this goes through ``self.runInteraction`` (not
    ``runInteractionNow``); the result of that call is returned as-is.
    """
    outcome = self.runInteraction(self._txn_resubmit_buildreqs, brids)
    return outcome
def _txn_resubmit_buildreqs(self, t, brids):
    """Within transaction ``t``, clear the claim on each request in ``brids``.

    ``claimed_at`` is reset to 0 and the claiming master's name and
    incarnation are set to NULL, in groups of at most 100 ids per UPDATE.
    Afterwards an "add-buildrequest" notification is sent carrying every
    resubmitted id.

    ``brids`` may be any iterable of ids; it is not modified.
    """
    # the interrupted build that gets resubmitted will still have the
    # same submitted_at value, so it should be re-started first

    # Keep the full id list: the notification below must name every
    # request, so the batching loop must not consume it.
    all_brids = list(brids)
    for start in range(0, len(all_brids), 100):
        batch = all_brids[start:start + 100]
        q = self.quoteq("UPDATE buildrequests"
                        " SET claimed_at=0,"
                        " claimed_by_name=NULL, claimed_by_incarnation=NULL"
                        " WHERE id IN " + self.parmlist(len(batch)))
        t.execute(q, batch)
    self.notify("add-buildrequest", *all_brids)
919
def retire_buildrequests(self, brids, results):
    """Mark the given build requests complete with the given ``results``.

    Delegates to ``_txn_retire_buildreqs`` via ``self.runInteractionNow``
    and returns whatever that call returns.
    """
    outcome = self.runInteractionNow(self._txn_retire_buildreqs,
                                     brids, results)
    return outcome
def _txn_retire_buildreqs(self, t, brids, results):
    """Within transaction ``t``, mark each request in ``brids`` complete.

    Every request gets ``complete=1``, the supplied ``results`` and a
    ``complete_at`` timestamp, in groups of at most 100 ids per UPDATE.
    After each group, the still-incomplete buildsets owning those requests
    are passed to ``self._check_buildset``.  Finally a
    "retire-buildrequest" notification is sent with every retired id and a
    "modify-buildset" notification with every buildset id that was checked.

    ``brids`` may be any iterable of ids; it is not modified.
    """
    now = self._getCurrentTime()
    # Keep the full id list for the notification; the batching loop must
    # not consume it.
    all_brids = list(brids)
    # Accumulated across batches (in first-seen order) so the final
    # notification covers all of them and is defined for empty input.
    checked_bsids = []
    for start in range(0, len(all_brids), 100):
        batch = all_brids[start:start + 100]

        q = self.quoteq("UPDATE buildrequests"
                        " SET complete=1, results=?, complete_at=?"
                        " WHERE id IN " + self.parmlist(len(batch)))
        t.execute(q, [results, now] + batch)
        # now, does this cause any buildsets to complete?
        q = self.quoteq("SELECT bs.id"
                        " FROM buildsets AS bs, buildrequests AS br"
                        " WHERE br.buildsetid=bs.id AND bs.complete=0"
                        " AND br.id in "
                        + self.parmlist(len(batch)))
        t.execute(q, batch)
        # The join yields one row per request, so a buildset can appear
        # several times.  Check each one once per batch: _check_buildset
        # only looks at buildsets with complete=0, so re-checking one it
        # has just completed would find no rows and record it as SUCCESS.
        batch_bsids = []
        for (bsid,) in t.fetchall():
            if bsid not in batch_bsids:
                batch_bsids.append(bsid)
        for bsid in batch_bsids:
            self._check_buildset(t, bsid, now)
            if bsid not in checked_bsids:
                checked_bsids.append(bsid)
    self.notify("retire-buildrequest", *all_brids)
    self.notify("modify-buildset", *checked_bsids)
945
def cancel_buildrequests(self, brids):
    """Cancel the given build requests.

    Delegates to ``_txn_cancel_buildrequest`` via
    ``self.runInteractionNow`` and returns whatever that call returns.
    """
    outcome = self.runInteractionNow(self._txn_cancel_buildrequest, brids)
    return outcome
def _txn_cancel_buildrequest(self, t, brids):
    """Within transaction ``t``, cancel each request in ``brids``.

    Cancelling marks the request ``complete=1`` with ``results=FAILURE``
    and a ``complete_at`` timestamp, in groups of at most 100 ids per
    UPDATE.  After each group, the still-incomplete buildsets owning those
    requests are passed to ``self._check_buildset``.  Finally a
    "cancel-buildrequest" notification is sent with every cancelled id and
    a "modify-buildset" notification with every buildset id checked.

    ``brids`` may be any iterable of ids; it is not modified.
    """
    # TODO: we aren't entirely sure if it'd be safe to just delete the
    # buildrequest: what else might be waiting on it that would then just
    # hang forever?. _check_buildset() should handle it well (an empty
    # buildset will appear complete and SUCCESS-ful). But we haven't
    # thought it through enough to be sure. So for now, "cancel" means
    # "mark as complete and FAILURE".

    # Keep the full id list for the notification; the batching loop must
    # not consume it.
    all_brids = list(brids)
    # One timestamp for the whole cancellation, bound even for empty input.
    now = self._getCurrentTime()
    # Accumulated across batches (in first-seen order) so the final
    # notification covers all of them and is defined for empty input.
    checked_bsids = []
    for start in range(0, len(all_brids), 100):
        batch = all_brids[start:start + 100]

        q = self.quoteq("UPDATE buildrequests"
                        " SET complete=1, results=?, complete_at=?"
                        " WHERE id IN " + self.parmlist(len(batch)))
        t.execute(q, [FAILURE, now] + batch)

        # now, does this cause any buildsets to complete?
        q = self.quoteq("SELECT bs.id"
                        " FROM buildsets AS bs, buildrequests AS br"
                        " WHERE br.buildsetid=bs.id AND bs.complete=0"
                        " AND br.id in "
                        + self.parmlist(len(batch)))
        t.execute(q, batch)
        # The join yields one row per request, so a buildset can appear
        # several times.  Check each one once per batch: _check_buildset
        # only looks at buildsets with complete=0, so re-checking one it
        # has just completed would find no rows and record it as SUCCESS.
        batch_bsids = []
        for (bsid,) in t.fetchall():
            if bsid not in batch_bsids:
                batch_bsids.append(bsid)
        for bsid in batch_bsids:
            self._check_buildset(t, bsid, now)
            if bsid not in checked_bsids:
                checked_bsids.append(bsid)

    self.notify("cancel-buildrequest", *all_brids)
    self.notify("modify-buildset", *checked_bsids)
982
def _check_buildset(self, t, bsid, now):
    """Mark buildset ``bsid`` complete if none of its requests is pending.

    Only rows belonging to a buildset that is not yet complete are
    examined.  If any request is still incomplete nothing is written.
    Otherwise the buildset is updated with ``complete=1``,
    ``complete_at=now`` and an overall result: FAILURE if any request's
    result is something other than SUCCESS or WARNINGS, else SUCCESS.
    A query that yields no rows therefore records SUCCESS.
    """
    select_sql = self.quoteq("SELECT br.complete,br.results"
                             " FROM buildsets AS bs, buildrequests AS br"
                             " WHERE bs.complete=0"
                             " AND br.buildsetid=bs.id AND bs.id=?")
    t.execute(select_sql, (bsid,))
    rows = t.fetchall()

    still_waiting = False
    overall = SUCCESS
    for (done, request_result) in rows:
        if not done:
            still_waiting = True
        # anything worse than WARNINGS on any single request makes the
        # whole buildset a failure
        if request_result not in (SUCCESS, WARNINGS):
            overall = FAILURE

    if still_waiting:
        return

    # every request has finished: record the combined outcome
    update_sql = self.quoteq("UPDATE buildsets"
                             " SET complete=1, complete_at=?, results=?"
                             " WHERE id=?")
    t.execute(update_sql, (now, overall, bsid))
1006
def get_buildrequestids_for_buildset(self, bsid):
    """Fetch the build request ids belonging to buildset ``bsid``.

    Delegates to ``_txn_get_buildrequestids_for_buildset`` via
    ``self.runInteractionNow`` and returns whatever that call returns.
    """
    txn = self._txn_get_buildrequestids_for_buildset
    return self.runInteractionNow(txn, bsid)
def _txn_get_buildrequestids_for_buildset(self, t, bsid):
    """Return a dict mapping builder name to request id for ``bsid``.

    If several rows share a builder name, the last one fetched wins.
    """
    sql = self.quoteq("SELECT buildername,id FROM buildrequests"
                      " WHERE buildsetid=?")
    t.execute(sql, (bsid,))
    brid_by_builder = {}
    for (buildername, brid) in t.fetchall():
        brid_by_builder[buildername] = brid
    return brid_by_builder
1015
def examine_buildset(self, bsid):
    """Report the ``(successful, finished)`` state of buildset ``bsid``.

    Delegates to ``_txn_examine_buildset`` via ``self.runInteractionNow``
    and returns whatever that call returns.
    """
    state = self.runInteractionNow(self._txn_examine_buildset, bsid)
    return state
def _txn_examine_buildset(self, t, bsid):
    """Return ``(successful, finished)`` for buildset ``bsid``.

    ``finished`` is True only when every build request of the buildset has
    complete=1.

    ``successful`` is False as soon as any completed request has a result
    other than SUCCESS or WARNINGS; True once all requests are complete
    without such a result; and None while the outcome is still open.
    """
    sql = self.quoteq("SELECT br.complete,br.results"
                      " FROM buildsets AS bs, buildrequests AS br"
                      " WHERE br.buildsetid=bs.id AND bs.id=?")
    t.execute(sql, (bsid,))
    rows = t.fetchall()

    successful = None
    finished = True
    for (done, outcome) in rows:
        if not done:
            finished = False
        elif outcome not in (SUCCESS, WARNINGS):
            successful = False

    if successful is None and finished:
        successful = True
    return (successful, finished)
1040
def get_active_buildset_ids(self):
    """Fetch the ids of all buildsets that are not yet complete.

    Delegates to ``_txn_get_active_buildset_ids`` via
    ``self.runInteractionNow`` and returns whatever that call returns.
    """
    active = self.runInteractionNow(self._txn_get_active_buildset_ids)
    return active
def _txn_get_active_buildset_ids(self, t):
    """Return a list of the ids of every buildset with complete=0."""
    t.execute("SELECT id FROM buildsets WHERE complete=0")
    active = []
    for (bsid,) in t.fetchall():
        active.append(bsid)
    return active
def get_buildset_info(self, bsid):
    """Fetch the descriptive fields stored for buildset ``bsid``.

    Delegates to ``_txn_get_buildset_info`` via ``self.runInteractionNow``
    and returns whatever that call returns.
    """
    info = self.runInteractionNow(self._txn_get_buildset_info, bsid)
    return info
def _txn_get_buildset_info(self, t, bsid):
    """Return the stored fields of buildset ``bsid``, or None if absent.

    The result is the tuple
    ``(external_idstring, reason, ssid, complete, results)`` built from the
    first matching row, where the two text fields are passed through
    ``str_or_none`` and ``complete`` is coerced to a bool.
    """
    sql = self.quoteq("SELECT external_idstring, reason, sourcestampid,"
                      " complete, results"
                      " FROM buildsets WHERE id=?")
    t.execute(sql, (bsid,))
    rows = t.fetchall()
    if not rows:
        return None # shouldn't happen
    (raw_external, raw_reason, ssid, raw_complete, results) = rows[0]
    external_idstring = str_or_none(raw_external)
    reason = str_or_none(raw_reason)
    return (external_idstring, reason, ssid, bool(raw_complete), results)
1061
def get_pending_brids_for_builder(self, buildername):
    """Fetch the ids of the pending build requests of ``buildername``.

    Delegates to ``_txn_get_pending_brids_for_builder`` via
    ``self.runInteractionNow`` and returns whatever that call returns.
    """
    txn = self._txn_get_pending_brids_for_builder
    return self.runInteractionNow(txn, buildername)
def _txn_get_pending_brids_for_builder(self, t, buildername):
    """Return a list of the pending request ids of ``buildername``.

    A request counts as pending when it is both incomplete (complete=0)
    and unclaimed (claimed_at=0).  Requests handed back to the pool by
    ``resubmit_buildrequests`` have claimed_at reset to zero, so they are
    included again.
    """
    sql = self.quoteq("SELECT id FROM buildrequests"
                      " WHERE buildername=? AND"
                      " complete=0 AND claimed_at=0")
    t.execute(sql, (buildername,))
    pending = []
    for (brid,) in t.fetchall():
        pending.append(brid)
    return pending
1074 1075 # test/debug methods 1076
def has_pending_operations(self):
    """Return True when ``self._pending_operation_count`` is truthy."""
    if self._pending_operation_count:
        return True
    return False
1079
def setChangeCacheSize(self, max_size):
    """Forward ``max_size`` to ``self._change_cache.setMaxSize``."""
    cache = self._change_cache
    cache.setMaxSize(max_size)


# NOTE(review): presumably twisted.python.threadable, which wraps the
# methods a class lists as synchronized with a per-instance lock --
# confirm against this module's imports and DBConnector's definition.
threadable.synchronize(DBConnector)
