Package buildbot :: Package db :: Module connector
[frames | no frames]

Source Code for Module buildbot.db.connector

   1  # This file is part of Buildbot.  Buildbot is free software: you can 
   2  # redistribute it and/or modify it under the terms of the GNU General Public 
   3  # License as published by the Free Software Foundation, version 2. 
   4  # 
   5  # This program is distributed in the hope that it will be useful, but WITHOUT 
   6  # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
   7  # FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more 
   8  # details. 
   9  # 
  10  # You should have received a copy of the GNU General Public License along with 
  11  # this program; if not, write to the Free Software Foundation, Inc., 51 
  12  # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
  13  # 
  14  # Copyright Buildbot Team Members 
  15   
  16  import sys, collections, base64 
  17   
  18  from twisted.python import log, threadable 
  19  from twisted.internet import defer 
  20  from twisted.enterprise import adbapi 
  21  from buildbot import util 
  22  from buildbot.util import collections as bbcollections 
  23  from buildbot.changes.changes import Change 
  24  from buildbot.sourcestamp import SourceStamp 
  25  from buildbot.buildrequest import BuildRequest 
  26  from buildbot.process.properties import Properties 
  27  from buildbot.status.builder import SUCCESS, WARNINGS, FAILURE 
  28  from buildbot.util.eventual import eventually 
  29  from buildbot.util import json 
  30   
  31  # Don't auto-resubmit queries that encounter a broken connection: let them 
  32  # fail. Use the "notification doorbell" thing to provide the retry. Set 
  33  # cp_reconnect=True, so that a connection failure will prepare the 
  34  # ConnectionPool to reconnect next time. 
  35   
class MyTransaction(adbapi.Transaction):
    """Transaction whose execute()/fetchall() go straight to the wrapped
    cursor; these two methods are convenient places to trace queries."""

    def execute(self, *args, **kwargs):
        """Run a statement on the underlying cursor and return its result."""
        cursor = self._cursor
        return cursor.execute(*args, **kwargs)

    def fetchall(self):
        """Return all remaining rows from the underlying cursor."""
        rows = self._cursor.fetchall()
        return rows
44
45 -def _one_or_else(res, default=None, process_f=lambda x: x):
46 if not res: 47 return default 48 return process_f(res[0][0])
49
def str_or_none(s):
    """Convert `s` with str(), except that None is passed through as None."""
    if s is not None:
        return str(s)
    return None
54
class Token:
    """Opaque marker: one instance identifies each database operation
    between _start_operation() and _end_operation()."""
58 -class DBConnector(util.ComparableMixin):
59 # this will refuse to create the database: use 'create-master' for that 60 compare_attrs = ["args", "kwargs"] 61 synchronized = ["notify", "_end_operation"] 62 MAX_QUERY_TIMES = 1000 63
def __init__(self, spec):
    """Initialise connector state from `spec`.

    `spec` supplies the DB-API module (get_dbapi), the maximum idle time for
    the synchronous connection (get_maxidle) and the asynchronous connection
    pool (get_async_connection_pool).
    """
    # rolling record of query durations, trimmed by _add_query_time()
    self._query_times = collections.deque()
    self._spec = spec

    # state used by the synchronous calls: runQueryNow, runInteractionNow
    self._dbapi = spec.get_dbapi()
    self._nonpool = None
    self._nonpool_lastused = None
    self._nonpool_max_idle = spec.get_maxidle()

    # queries are written with "?" placeholders; quoteq() rewrites them
    # when the backend uses a different style
    self.paramstyle = self._dbapi.paramstyle

    # The pool must be started before it can be used. The real buildmaster
    # process does this at reactor start. CLI tools (like "buildbot
    # upgrade-master") must do it manually. Unit tests run in an
    # environment in which it is already started.
    pool = spec.get_async_connection_pool()
    pool.transactionFactory = MyTransaction
    self._pool = pool

    self._change_cache = util.LRUCache()
    self._sourcestamp_cache = util.LRUCache()
    self._active_operations = set() # protected by synchronized=
    self._pending_notifications = []
    self._subscribers = bbcollections.defaultdict(set)

    self._pending_operation_count = 0

    self._started = False
95
def _getCurrentTime(self):
    """Return util.now(); kept as its own method so tests can replace it."""
    current = util.now()
    return current
99
def start(self):
    """Start the asynchronous pool and mark the connector as started.

    Only strictly required in reactorless environments, but harmless
    elsewhere.
    """
    pool = self._pool
    pool.start()
    self._started = True
105
def stop(self):
    """Call this when you're done with me.

    Closes the synchronous connection (if one is open) and, when the
    connector was started, closes and discards the asynchronous pool.
    """
    sync_conn = self._nonpool
    if sync_conn:
        sync_conn.close()
        self._nonpool = None
        self._nonpool_lastused = None

    if self._started:
        self._pool.close()
        self._started = False
        del self._pool
120
def quoteq(self, query):
    """
    Adapt a qmark-style query to the backend's placeholder style.

    A query such as::
     INSERT INTO foo (col1, col2) VALUES (?,?)
    is returned unchanged for "qmark" backends, and becomes::
     INSERT INTO foo (col1, col2) VALUES (%s,%s)
    for "format" backends.  Any other paramstyle trips an assertion.
    """
    style = self.paramstyle
    if style != "format":
        assert style == "qmark"
        return query
    return query.replace("?","%s")
133
def parmlist(self, count):
    """
    Build a parenthesis-enclosed, comma-separated list of COUNT
    placeholders, e.g. for long INSERT or IN (...) clauses.  The
    placeholders have already had quoteq() applied.
    """
    placeholder = self.quoteq("?")
    return "(%s)" % ",".join([placeholder] * count)
143
def get_version(self):
    """Returns None for an empty database, or a number (probably 1) for
    the database's version"""
    try:
        rows = self.runQueryNow("SELECT version FROM version")
    except (self._dbapi.OperationalError, self._dbapi.ProgrammingError):
        # no version table at all: the database has never been set up
        return None
    assert len(rows) == 1
    (version,) = rows[0][:1]
    return version
154
def runQueryNow(self, *args, **kwargs):
    """Synchronous, blocking counterpart of runQuery(): execute one
    statement and return all of its rows.  Requires a started connector."""
    assert self._started
    rows = self.runInteractionNow(self._runQuery, *args, **kwargs)
    return rows
159
160 - def _runQuery(self, c, *args, **kwargs):
161 c.execute(*args, **kwargs) 162 return c.fetchall()
163
def _start_operation(self):
    """Register a new in-flight operation and return the Token that
    identifies it (to be handed back to _end_operation())."""
    token = Token()
    self._active_operations.add(token)
    return token
168 - def _end_operation(self, t):
169 # this is always invoked from the main thread, but is wrapped by 170 # synchronized= and threadable.synchronous(), since it touches 171 # self._pending_notifications, which is also touched by 172 # runInteraction threads 173 self._active_operations.discard(t) 174 if self._active_operations: 175 return 176 for (category, args) in self._pending_notifications: 177 # in the distributed system, this will be a 178 # transport.write(" ".join([category] + [str(a) for a in args])) 179 eventually(self.send_notification, category, args) 180 self._pending_notifications = []
181
def runInteractionNow(self, interaction, *args, **kwargs):
    """Synchronous, blocking counterpart of runInteraction().

    Runs interaction(cursor, *args, **kwargs) on the synchronous connection
    and returns its result.  Whether it succeeds or raises, the operation is
    ended and its duration recorded.
    """
    assert self._started
    began = self._getCurrentTime()
    token = self._start_operation()
    try:
        return self._runInteractionNow(interaction, *args, **kwargs)
    finally:
        self._end_operation(token)
        self._add_query_time(began)
192
def get_sync_connection(self):
    """Return the single connection used for synchronous calls.

    Wraps spec.get_sync_connection(): the connection is reused across
    calls, but a new one is obtained if the existing one has been idle for
    more than max_idle seconds (when a max idle time is configured).
    """
    max_idle = self._nonpool_max_idle
    if max_idle is not None:
        now = util.now()
        last_used = self._nonpool_lastused
        if last_used and last_used + max_idle < now:
            # idle for too long: forget it so a new one is fetched below
            self._nonpool = None

    if not self._nonpool:
        self._nonpool = self._spec.get_sync_connection()

    self._nonpool_lastused = util.now()
    return self._nonpool
208
209 - def _runInteractionNow(self, interaction, *args, **kwargs):
210 conn = self.get_sync_connection() 211 c = conn.cursor() 212 try: 213 result = interaction(c, *args, **kwargs) 214 c.close() 215 conn.commit() 216 return result 217 except: 218 excType, excValue, excTraceback = sys.exc_info() 219 try: 220 conn.rollback() 221 c2 = conn.cursor() 222 c2.execute(self._pool.good_sql) 223 c2.close() 224 conn.commit() 225 except: 226 log.msg("rollback failed, will reconnect next query") 227 log.err() 228 # and the connection is probably dead: clear the reference, 229 # so we'll establish a new connection next time 230 self._nonpool = None 231 raise excType, excValue, excTraceback
232
def notify(self, category, *args):
    """Queue a (category, args) notification; it is delivered later by
    _end_operation() once no operation is active."""
    # this is wrapped by synchronized= and threadable.synchronous(),
    # since it will be invoked from runInteraction threads
    pending = (category, args)
    self._pending_notifications.append(pending)
237
def send_notification(self, category, args):
    """Schedule a call of every observer subscribed to `category`, each as
    observer(category, *args)."""
    # in the distributed system, this will be invoked by lineReceived()
    observers = self._subscribers[category]
    for observer in observers:
        eventually(observer, category, *args)
243
def subscribe_to(self, category, observer):
    """Register `observer` to be called for notifications in `category`."""
    observers = self._subscribers[category]
    observers.add(observer)
246
def runQuery(self, *args, **kwargs):
    """Run one query on the asynchronous pool.

    Arguments are passed unchanged to the pool's runQuery().  Returns the
    Deferred produced by the pool; it fires with the query's rows.

    The query is tracked like runInteraction() tracks interactions: it is
    registered as an active operation and timed, and _runQuery_done() is
    attached so that the pending-operation counter incremented here is
    decremented again on success and on failure.  (Previously the counter
    was incremented but never decremented.)
    """
    assert self._started
    self._pending_operation_count += 1
    start = self._getCurrentTime()
    t = self._start_operation()
    d = self._pool.runQuery(*args, **kwargs)
    # addBoth: the bookkeeping must also run when the query fails
    d.addBoth(self._runQuery_done, start, t)
    return d
252
253 - def _runQuery_done(self, res, start, t):
254 self._end_operation(t) 255 self._add_query_time(start) 256 self._pending_operation_count -= 1 257 return res
258
259 - def _add_query_time(self, start):
260 elapsed = self._getCurrentTime() - start 261 self._query_times.append(elapsed) 262 if len(self._query_times) > self.MAX_QUERY_TIMES: 263 self._query_times.popleft()
264
def runInteraction(self, *args, **kwargs):
    """Run an interaction on the asynchronous pool and return its Deferred.

    The operation is counted as pending, registered as active and timed;
    _runInteraction_done() undoes that bookkeeping on success or failure.
    """
    assert self._started
    self._pending_operation_count += 1
    began = self._getCurrentTime()
    token = self._start_operation()
    deferred = self._pool.runInteraction(*args, **kwargs)
    deferred.addBoth(self._runInteraction_done, began, token)
    return deferred
273 - def _runInteraction_done(self, res, start, t):
274 self._end_operation(t) 275 self._add_query_time(start) 276 self._pending_operation_count -= 1 277 return res
278 279 # ChangeManager methods 280
def addChangeToDatabase(self, change):
    """Synchronously store `change` (which receives its .number in the
    process) and remember it in the change cache."""
    self.runInteractionNow(self._txn_addChangeToDatabase, change)
    number = change.number
    self._change_cache.add(number, change)
284
285 - def _txn_addChangeToDatabase(self, t, change):
286 q = self.quoteq("INSERT INTO changes" 287 " (author," 288 " comments, is_dir," 289 " branch, revision, revlink," 290 " when_timestamp, category," 291 " repository, project)" 292 " VALUES (?, ?,?, ?,?,?, ?,?, ?,?)") 293 # TODO: map None to.. empty string? 294 295 values = (change.who, 296 change.comments, change.isdir, 297 change.branch, change.revision, change.revlink, 298 change.when, change.category, change.repository, 299 change.project) 300 t.execute(q, values) 301 change.number = t.lastrowid 302 303 for link in change.links: 304 t.execute(self.quoteq("INSERT INTO change_links (changeid, link) " 305 "VALUES (?,?)"), 306 (change.number, link)) 307 for filename in change.files: 308 t.execute(self.quoteq("INSERT INTO change_files (changeid,filename)" 309 " VALUES (?,?)"), 310 (change.number, filename)) 311 for propname,propvalue in change.properties.properties.items(): 312 encoded_value = json.dumps(propvalue) 313 t.execute(self.quoteq("INSERT INTO change_properties" 314 " (changeid, property_name, property_value)" 315 " VALUES (?,?,?)"), 316 (change.number, propname, encoded_value)) 317 self.notify("add-change", change.number)
318
def changeEventGenerator(self, branches=[], categories=[], committers=[], minTime=0):
    """Yield Change objects, newest (highest changeid) first.

    Each non-empty filter narrows the result; filters are ANDed together:
      branches   -- keep changes whose branch is in this sequence
      categories -- keep changes whose category is in this sequence
      committers -- keep changes whose author is in this sequence
      minTime    -- keep changes with when_timestamp greater than this

    minTime is honoured even when it is the only filter given (it used to
    be ignored unless one of the list filters was also supplied).
    """
    # NOTE: the list defaults are only read, never mutated, so sharing them
    # between calls is harmless.
    pieces = []
    args = []
    for column, wanted in (("branch", branches),
                           ("category", categories),
                           ("author", committers)):
        if wanted:
            pieces.append("%s IN %s" % (column, self.parmlist(len(wanted))))
            args.extend(list(wanted))
    if minTime:
        # %d forces an integer, so nothing else can reach the SQL text
        pieces.append("when_timestamp > %d" % minTime)

    q = "SELECT changeid FROM changes"
    if pieces:
        q += " WHERE " + " AND ".join(pieces)
    q += " ORDER BY changeid DESC"
    rows = self.runQueryNow(q, tuple(args))
    for (changeid,) in rows:
        yield self.getChangeNumberedNow(changeid)
341
def getLatestChangeNumberNow(self, branch=None, t=None):
    """Synchronously return the highest change number, optionally
    restricted to `branch`.

    If transaction `t` is given the lookup runs inside it; otherwise it
    runs in its own synchronous interaction.  (That second path used to
    hand the cursor to the helper as `branch`, with `t` missing, and so
    always failed.)
    """
    if t:
        return self._txn_getLatestChangeNumber(branch=branch, t=t)

    def txn(cursor):
        # bind by keyword: the helper takes (branch, t), not (t, branch)
        return self._txn_getLatestChangeNumber(branch=branch, t=cursor)
    return self.runInteractionNow(txn)
347 - def _txn_getLatestChangeNumber(self, branch, t):
348 args = None 349 if branch: 350 br_clause = "WHERE branch =? " 351 args = ( branch, ) 352 q = self.quoteq("SELECT max(changeid) from changes"+ br_clause) 353 t.execute(q, args) 354 row = t.fetchone() 355 if not row: 356 return 0 357 return row[0]
358
def getChangeNumberedNow(self, changeid, t=None):
    """Synchronous/blocking version of getChangeByNumber.

    Returns the cached Change when there is one; otherwise loads it (inside
    transaction `t` if given, else in its own interaction), stores the
    result in the cache and returns it.
    """
    assert changeid >= 0
    cached = self._change_cache.get(changeid)
    if cached:
        return cached
    if t:
        change = self._txn_getChangeNumberedNow(t, changeid)
    else:
        change = self.runInteractionNow(self._txn_getChangeNumberedNow,
                                        changeid)
    self._change_cache.add(changeid, change)
    return change
371 - def _txn_getChangeNumberedNow(self, t, changeid):
372 q = self.quoteq("SELECT author, comments," 373 " is_dir, branch, revision, revlink," 374 " when_timestamp, category," 375 " repository, project" 376 " FROM changes WHERE changeid = ?") 377 t.execute(q, (changeid,)) 378 rows = t.fetchall() 379 if not rows: 380 return None 381 (who, comments, 382 isdir, branch, revision, revlink, 383 when, category, repository, project) = rows[0] 384 branch = str_or_none(branch) 385 revision = str_or_none(revision) 386 q = self.quoteq("SELECT link FROM change_links WHERE changeid=?") 387 t.execute(q, (changeid,)) 388 rows = t.fetchall() 389 links = [row[0] for row in rows] 390 links.sort() 391 392 q = self.quoteq("SELECT filename FROM change_files WHERE changeid=?") 393 t.execute(q, (changeid,)) 394 rows = t.fetchall() 395 files = [row[0] for row in rows] 396 files.sort() 397 398 p = self.get_properties_from_db("change_properties", "changeid", 399 changeid, t) 400 c = Change(who=who, files=files, comments=comments, isdir=isdir, 401 links=links, revision=revision, when=when, 402 branch=branch, category=category, revlink=revlink, 403 repository=repository, project=project) 404 c.properties.updateFromProperties(p) 405 c.number = changeid 406 return c
407
def getChangeByNumber(self, changeid):
    """Return a Deferred that fires with the Change numbered `changeid`, or
    with None if there is no Change with that number.

    A cached Change is delivered immediately; otherwise the change row, its
    links, files and properties are fetched through the pool and assembled
    by _getChangeByNumber_query_done().
    """
    assert changeid >= 0
    cached = self._change_cache.get(changeid)
    if cached:
        return defer.succeed(cached)
    key = (changeid,)
    change_d = self.runQuery(self.quoteq("SELECT author, comments,"
                                         " is_dir, branch, revision, revlink,"
                                         " when_timestamp, category,"
                                         " repository, project"
                                         " FROM changes WHERE changeid = ?"),
                             key)
    links_d = self.runQuery(self.quoteq("SELECT link FROM change_links"
                                        " WHERE changeid=?"),
                            key)
    files_d = self.runQuery(self.quoteq("SELECT filename FROM change_files"
                                        " WHERE changeid=?"),
                            key)
    props_d = self.runInteraction(self._txn_get_properties_from_db,
                                  "change_properties", "changeid", changeid)
    gathered = defer.gatherResults([change_d, links_d, files_d, props_d])
    gathered.addCallback(self._getChangeByNumber_query_done, changeid)
    return gathered
432
433 - def _getChangeByNumber_query_done(self, res, changeid):
434 (rows, link_rows, file_rows, properties) = res 435 if not rows: 436 return None 437 (who, comments, 438 isdir, branch, revision, revlink, 439 when, category, repository, project) = rows[0] 440 branch = str_or_none(branch) 441 revision = str_or_none(revision) 442 links = [row[0] for row in link_rows] 443 links.sort() 444 files = [row[0] for row in file_rows] 445 files.sort() 446 447 c = Change(who=who, files=files, comments=comments, isdir=isdir, 448 links=links, revision=revision, when=when, 449 branch=branch, category=category, revlink=revlink, 450 repository=repository, project=project) 451 c.properties.updateFromProperties(properties) 452 c.number = changeid 453 self._change_cache.add(changeid, c) 454 return c
455
def getChangesGreaterThan(self, last_changeid, t=None):
    """Synchronously return a list of all Change instances with numbers
    greater than `last_changeid`, sorted by number.  Useful for catching up
    with everything that has happened since the previous call.

    Runs inside transaction `t` when given, else in its own interaction.
    """
    assert last_changeid >= 0
    if not t:
        return self.runInteractionNow(self._txn_getChangesGreaterThan,
                                      last_changeid)
    return self._txn_getChangesGreaterThan(t, last_changeid)
467 - def _txn_getChangesGreaterThan(self, t, last_changeid):
468 q = self.quoteq("SELECT changeid FROM changes WHERE changeid > ?") 469 t.execute(q, (last_changeid,)) 470 changes = [self.getChangeNumberedNow(changeid, t) 471 for (changeid,) in t.fetchall()] 472 changes.sort(key=lambda c: c.number) 473 return changes
474
def getChangeIdsLessThanIdNow(self, new_changeid):
    """Synchronously return the sorted list of all existing change ids
    that are less than `new_changeid`."""
    query = self.quoteq("SELECT changeid FROM changes WHERE changeid < ?")

    def txn(t):
        t.execute(query, (new_changeid,))
        return sorted([changeid for (changeid,) in t.fetchall()])
    return self.runInteractionNow(txn)
485
def removeChangeNow(self, changeid):
    """Thoroughly remove a change from the database, including its rows in
    all dependent tables (synchronous)."""
    tables = ('changes', 'scheduler_changes', 'sourcestamp_changes',
              'change_files', 'change_links', 'change_properties')

    def txn(t):
        for table in tables:
            delete = self.quoteq("DELETE FROM %s WHERE changeid = ?" % table)
            t.execute(delete, (changeid,))
    return self.runInteractionNow(txn)
def getChangesByNumber(self, changeids):
    """Return a Deferred gathering getChangeByNumber() for each id in
    `changeids`, in the same order."""
    lookups = []
    for changeid in changeids:
        lookups.append(self.getChangeByNumber(changeid))
    return defer.gatherResults(lookups)
499 500 # SourceStamp-manipulating methods 501
def getSourceStampNumberedNow(self, ssid, t=None):
    """Synchronously return the SourceStamp with id `ssid`.

    Served from the sourcestamp cache when possible; otherwise loaded
    (inside transaction `t` if given, else in its own interaction), cached
    and returned.
    """
    assert isinstance(ssid, (int, long))
    cached = self._sourcestamp_cache.get(ssid)
    if cached:
        return cached
    if t:
        stamp = self._txn_getSourceStampNumbered(t, ssid)
    else:
        stamp = self.runInteractionNow(self._txn_getSourceStampNumbered,
                                       ssid)
    self._sourcestamp_cache.add(ssid, stamp)
    return stamp
514
515 - def _txn_getSourceStampNumbered(self, t, ssid):
516 assert isinstance(ssid, (int, long)) 517 t.execute(self.quoteq("SELECT branch,revision,patchid,project,repository" 518 " FROM sourcestamps WHERE id=?"), 519 (ssid,)) 520 r = t.fetchall() 521 if not r: 522 return None 523 (branch_u, revision_u, patchid, project, repository) = r[0] 524 branch = str_or_none(branch_u) 525 revision = str_or_none(revision_u) 526 527 patch = None 528 if patchid is not None: 529 t.execute(self.quoteq("SELECT patchlevel,patch_base64,subdir" 530 " FROM patches WHERE id=?"), 531 (patchid,)) 532 r = t.fetchall() 533 assert len(r) == 1 534 (patch_level, patch_text_base64, subdir_u) = r[0] 535 patch_text = base64.b64decode(patch_text_base64) 536 if subdir_u: 537 patch = (patch_level, patch_text, str(subdir_u)) 538 else: 539 patch = (patch_level, patch_text) 540 541 t.execute(self.quoteq("SELECT changeid FROM sourcestamp_changes" 542 " WHERE sourcestampid=?" 543 " ORDER BY changeid ASC"), 544 (ssid,)) 545 r = t.fetchall() 546 changes = None 547 if r: 548 changes = [self.getChangeNumberedNow(changeid, t) 549 for (changeid,) in r] 550 ss = SourceStamp(branch, revision, patch, changes, project=project, repository=repository) 551 ss.ssid = ssid 552 return ss
553 554 # Properties methods 555
def get_properties_from_db(self, tablename, idname, id, t=None):
    """Synchronously load the properties stored in `tablename` for the row
    whose `idname` column equals `id`.

    Runs inside transaction `t` when given, else in its own interaction.
    """
    if not t:
        return self.runInteractionNow(self._txn_get_properties_from_db,
                                      tablename, idname, id)
    return self._txn_get_properties_from_db(t, tablename, idname, id)
562
def _txn_get_properties_from_db(self, t, tablename, idname, id):
    """Using transaction `t`, read the (name, JSON value) rows of
    `tablename` where `idname` equals `id` and return them as a Properties
    object.  Each stored value decodes to a (value, source) pair.
    """
    # apparently you can't use argument placeholders for table names. Don't
    # call this with a weird-looking tablename.
    select = "SELECT property_name,property_value FROM %s WHERE %s=?" \
             % (tablename, idname)
    t.execute(self.quoteq(select), (id,))
    props = Properties()
    for name, encoded in t.fetchall():
        value, source = json.loads(encoded)
        props.setProperty(str(name), value, source)
    return props
574 575 # Scheduler manipulation methods 576
def addSchedulers(self, added):
    """Asynchronously register the schedulers in `added`; returns the
    Deferred of the underlying interaction."""
    deferred = self.runInteraction(self._addSchedulers, added)
    return deferred
def _addSchedulers(self, t, added):
    """Using transaction `t`, give every scheduler in `added` a
    schedulerid, reusing the existing row for its name and class when there
    is one and creating a new row otherwise."""
    for scheduler in added:
        name = scheduler.name
        assert name
        klass = scheduler.__class__
        class_name = "%s.%s" % (klass.__module__, klass.__name__)
        lookup = self.quoteq("""
            SELECT schedulerid, class_name FROM schedulers WHERE
                name=? AND
                (class_name=? OR class_name='')
                """)
        t.execute(lookup, (name, class_name))
        row = t.fetchone()

        sid = None
        if row:
            sid, db_class_name = row
            if db_class_name == '':
                # We're updating from an old schema where the class name
                # wasn't stored.
                # Update this row's class name and move on
                update = self.quoteq("""UPDATE schedulers SET class_name=?
                    WHERE schedulerid=?""")
                t.execute(update, (class_name, sid))
            elif db_class_name != class_name:
                # A different scheduler is being used with this name.
                # Ignore the old scheduler and create a new one
                sid = None

        if sid is None:
            # create a new row, with the latest changeid (so it won't try
            # to process all of the old changes) new Schedulers are
            # supposed to ignore pre-existing Changes
            t.execute("SELECT changeid FROM changes"
                      " ORDER BY changeid DESC LIMIT 1")
            max_changeid = _one_or_else(t.fetchall(), 0)
            state_json = json.dumps(scheduler.get_initial_state(max_changeid))
            insert = self.quoteq("INSERT INTO schedulers"
                                 " (name, class_name, state)"
                                 " VALUES (?,?,?)")
            t.execute(insert, (name, class_name, state_json))
            sid = t.lastrowid
        log.msg("scheduler '%s' got id %d" % (scheduler.name, sid))
        scheduler.schedulerid = sid
625
def scheduler_get_state(self, schedulerid, t):
    """Using transaction `t`, return the decoded JSON state stored for
    `schedulerid`.  Asserts that a state value is present."""
    t.execute(self.quoteq("SELECT state FROM schedulers WHERE schedulerid=?"),
              (schedulerid,))
    encoded = _one_or_else(t.fetchall())
    assert encoded is not None
    return json.loads(encoded)
632
def scheduler_set_state(self, schedulerid, t, state):
    """Using transaction `t`, store `state` (JSON-encoded) as the state of
    scheduler `schedulerid`."""
    update = self.quoteq("UPDATE schedulers SET state=? WHERE schedulerid=?")
    t.execute(update, (json.dumps(state), schedulerid))
637
def get_sourcestampid(self, ss, t):
    """Given a SourceStamp (which may or may not have an ssid), make sure
    its contents are in the database, and return the ssid.

    A SourceStamp that already has an ssid is assumed to be stored and its
    ssid is returned as is.  Otherwise rows are created for its patch (if
    any), for the stamp itself and for each of its changes, and ss.ssid is
    set to the new id.
    """
    if ss.ssid is not None:
        return ss.ssid

    patchid = None
    patch = ss.patch
    if patch:
        patchlevel, diff = patch[0], patch[1]
        subdir = None
        if len(patch) > 2:
            subdir = patch[2]
        # the diff is stored base64-encoded
        t.execute(self.quoteq("INSERT INTO patches"
                              " (patchlevel, patch_base64, subdir)"
                              " VALUES (?,?,?)"),
                  (patchlevel, base64.b64encode(diff), subdir))
        patchid = t.lastrowid

    t.execute(self.quoteq("INSERT INTO sourcestamps"
                          " (branch, revision, patchid, project, repository)"
                          " VALUES (?,?,?,?,?)"),
              (ss.branch, ss.revision, patchid, ss.project, ss.repository))
    ss.ssid = t.lastrowid

    link_change = self.quoteq("INSERT INTO sourcestamp_changes"
                              " (sourcestampid, changeid) VALUES (?,?)")
    for change in ss.changes:
        t.execute(link_change, (ss.ssid, change.number))
    return ss.ssid
667
def create_buildset(self, ssid, reason, properties, builderNames, t,
                    external_idstring=None):
    """Using transaction `t`, create a BuildSet for sourcestamp `ssid`
    together with one BuildRequest per name in `builderNames`.

    The buildset's properties are stored JSON-encoded.  Queues
    "add-buildset" and "add-buildrequest" notifications and returns the
    new buildset id.
    """
    submitted_at = self._getCurrentTime()
    t.execute(self.quoteq("INSERT INTO buildsets"
                          " (external_idstring, reason,"
                          " sourcestampid, submitted_at)"
                          " VALUES (?,?,?,?)"),
              (external_idstring, reason, ssid, submitted_at))
    bsid = t.lastrowid

    for propname, propvalue in properties.properties.items():
        insert_prop = self.quoteq("INSERT INTO buildset_properties"
                                  " (buildsetid, property_name, property_value)"
                                  " VALUES (?,?,?)")
        t.execute(insert_prop, (bsid, propname, json.dumps(propvalue)))

    brids = []
    for buildername in builderNames:
        insert_request = self.quoteq("INSERT INTO buildrequests"
                                     " (buildsetid, buildername, submitted_at)"
                                     " VALUES (?,?,?)")
        t.execute(insert_request, (bsid, buildername, submitted_at))
        brids.append(t.lastrowid)

    self.notify("add-buildset", bsid)
    self.notify("add-buildrequest", *brids)
    return bsid
695
def scheduler_classify_change(self, schedulerid, number, important, t):
    """Using transaction `t`, record change `number` for scheduler
    `schedulerid`, flagged with the truth value of `important`."""
    insert = self.quoteq("INSERT INTO scheduler_changes"
                         " (schedulerid, changeid, important)"
                         " VALUES (?,?,?)")
    is_important = bool(important)
    t.execute(insert, (schedulerid, number, is_important))
701
def scheduler_get_classified_changes(self, schedulerid, t):
    """Using transaction `t`, return (important, unimportant): two lists of
    the Changes recorded for scheduler `schedulerid`."""
    t.execute(self.quoteq("SELECT changeid, important"
                          " FROM scheduler_changes"
                          " WHERE schedulerid=?"),
              (schedulerid,))
    important, unimportant = [], []
    for (changeid, is_important) in t.fetchall():
        change = self.getChangeNumberedNow(changeid, t)
        if not is_important:
            unimportant.append(change)
        else:
            important.append(change)
    return (important, unimportant)
716
def scheduler_retire_changes(self, schedulerid, changeids, t):
    """Using transaction `t`, delete the scheduler_changes rows of
    scheduler `schedulerid` for the given `changeids`, 100 ids per
    statement."""
    remaining = changeids
    while remaining:
        # sqlite has a maximum of 999 parameters, but we'll try to come in far
        # short of that
        batch = remaining[:100]
        remaining = remaining[100:]
        delete = self.quoteq("DELETE FROM scheduler_changes"
                             " WHERE schedulerid=? AND changeid IN ") \
                 + self.parmlist(len(batch))
        t.execute(delete, (schedulerid,) + tuple(batch))
726
def scheduler_subscribe_to_buildset(self, schedulerid, bsid, t):
    """Using transaction `t`, record an active subscription of scheduler
    `schedulerid` to buildset `bsid`."""
    # scheduler_get_subscribed_buildsets(schedulerid) will return
    # information about all buildsets that were subscribed this way
    insert = self.quoteq("INSERT INTO scheduler_upstream_buildsets"
                         " (buildsetid, schedulerid, active)"
                         " VALUES (?,?,?)")
    t.execute(insert, (bsid, schedulerid, 1))
734
def scheduler_get_subscribed_buildsets(self, schedulerid, t):
    """Using transaction `t`, return the rows (bsid, ssid, complete,
    results) of every buildset that scheduler `schedulerid` is actively
    subscribed to."""
    select = self.quoteq("SELECT bs.id, "
                         " bs.sourcestampid, bs.complete, bs.results"
                         " FROM scheduler_upstream_buildsets AS s,"
                         " buildsets AS bs"
                         " WHERE s.buildsetid=bs.id"
                         " AND s.schedulerid=?"
                         " AND s.active=1")
    t.execute(select, (schedulerid,))
    rows = t.fetchall()
    return rows
746
def scheduler_unsubscribe_buildset(self, schedulerid, buildsetid, t):
    """Using transaction `t`, mark the subscription of scheduler
    `schedulerid` to buildset `buildsetid` as inactive."""
    update = self.quoteq("UPDATE scheduler_upstream_buildsets"
                         " SET active=0"
                         " WHERE buildsetid=? AND schedulerid=?")
    t.execute(update, (buildsetid, schedulerid))
752 753 # BuildRequest-manipulation methods 754
def getBuildRequestWithNumber(self, brid, t=None):
    """Synchronously return the BuildRequest with id `brid`, loading it
    inside transaction `t` when given, else in its own interaction."""
    assert isinstance(brid, (int, long))
    if not t:
        return self.runInteractionNow(self._txn_getBuildRequestWithNumber,
                                      brid)
    return self._txn_getBuildRequestWithNumber(t, brid)
763 - def _txn_getBuildRequestWithNumber(self, t, brid):
764 assert isinstance(brid, (int, long)) 765 t.execute(self.quoteq("SELECT br.buildsetid, bs.reason," 766 " bs.sourcestampid, br.buildername," 767 " bs.submitted_at, br.priority" 768 " FROM buildrequests AS br, buildsets AS bs" 769 " WHERE br.id=? AND br.buildsetid=bs.id"), 770 (brid,)) 771 r = t.fetchall() 772 if not r: 773 return None 774 (bsid, reason, ssid, builder_name, submitted_at, priority) = r[0] 775 ss = self.getSourceStampNumberedNow(ssid, t) 776 properties = self.get_properties_from_db("buildset_properties", 777 "buildsetid", bsid, t) 778 br = BuildRequest(reason, ss, builder_name, properties) 779 br.submittedAt = submitted_at 780 br.priority = priority 781 br.id = brid 782 br.bsid = bsid 783 return br
784
def get_buildername_for_brid(self, brid):
    """Synchronously return the builder name of build request `brid`."""
    assert isinstance(brid, (int, long))
    buildername = self.runInteractionNow(self._txn_get_buildername_for_brid,
                                         brid)
    return buildername
788 - def _txn_get_buildername_for_brid(self, t, brid):
789 assert isinstance(brid, (int, long)) 790 t.execute(self.quoteq("SELECT buildername FROM buildrequests" 791 " WHERE id=?"), 792 (brid,)) 793 r = t.fetchall() 794 if not r: 795 return None 796 return r[0][0]
797
def get_unclaimed_buildrequests(self, buildername, old, master_name,
                                master_incarnation, t, limit=None):
    """Using transaction `t`, return the incomplete BuildRequests for
    `buildername` that are available to claim.

    A request qualifies when its claim time is earlier than `old`, or when
    it is claimed under `master_name` with an incarnation other than
    `master_incarnation`.  Results are ordered by descending priority, then
    by ascending submission time; a truthy `limit` caps their number.
    """
    select = ("SELECT br.id"
              " FROM buildrequests AS br, buildsets AS bs"
              " WHERE br.buildername=? AND br.complete=0"
              " AND br.buildsetid=bs.id"
              " AND (br.claimed_at<?"
              " OR (br.claimed_by_name=?"
              " AND br.claimed_by_incarnation!=?))"
              " ORDER BY br.priority DESC,bs.submitted_at ASC")
    if limit:
        select = select + " LIMIT %s" % limit
    t.execute(self.quoteq(select),
              (buildername, old, master_name, master_incarnation))
    requests = []
    for (brid,) in t.fetchall():
        requests.append(self.getBuildRequestWithNumber(brid, t))
    return requests
815
def claim_buildrequests(self, now, master_name, master_incarnation, brids,
                        t=None):
    """Claim the build requests `brids` for the given master at time
    `now`.  Does nothing for an empty `brids`.  Runs inside transaction `t`
    when given, else in its own synchronous interaction."""
    if not brids:
        return
    if not t:
        self.runInteractionNow(self._txn_claim_buildrequests,
                               now, master_name, master_incarnation, brids)
    else:
        self._txn_claim_buildrequests(t, now, master_name,
                                      master_incarnation, brids)
826 - def _txn_claim_buildrequests(self, t, now, master_name, master_incarnation, 827 brids):
828 brids = list(brids) # in case it's a set 829 while brids: 830 batch, brids = brids[:100], brids[100:] 831 q = self.quoteq("UPDATE buildrequests" 832 " SET claimed_at = ?," 833 " claimed_by_name = ?, claimed_by_incarnation = ?" 834 " WHERE id IN " + self.parmlist(len(batch))) 835 qargs = [now, master_name, master_incarnation] + list(batch) 836 t.execute(q, qargs)
837
def build_started(self, brid, buildnumber):
    """Synchronously record that build `buildnumber` has started for build
    request `brid`; returns the new build id."""
    bid = self.runInteractionNow(self._txn_build_started, brid, buildnumber)
    return bid
840 - def _txn_build_started(self, t, brid, buildnumber):
841 now = self._getCurrentTime() 842 t.execute(self.quoteq("INSERT INTO builds (number, brid, start_time)" 843 " VALUES (?,?,?)"), 844 (buildnumber, brid, now)) 845 bid = t.lastrowid 846 self.notify("add-build", bid) 847 return bid
848
def builds_finished(self, bids):
    """Synchronously record the current time as the finish time of the
    builds with ids `bids`."""
    outcome = self.runInteractionNow(self._txn_build_finished, bids)
    return outcome
851 - def _txn_build_finished(self, t, bids):
852 now = self._getCurrentTime() 853 while bids: 854 batch, bids = bids[:100], bids[100:] 855 q = self.quoteq("UPDATE builds SET finish_time = ?" 856 " WHERE id IN " + self.parmlist(len(batch))) 857 qargs = [now] + list(batch) 858 t.execute(q, qargs)
859
def get_build_info(self, bid):
    """Synchronously return (brid, buildername, buildnum) for build `bid`."""
    info = self.runInteractionNow(self._txn_get_build_info, bid)
    return info
862 - def _txn_get_build_info(self, t, bid):
863 # brid, buildername, buildnum 864 t.execute(self.quoteq("SELECT b.brid,br.buildername,b.number" 865 " FROM builds AS b, buildrequests AS br" 866 " WHERE b.id=? AND b.brid=br.id"), 867 (bid,)) 868 res = t.fetchall() 869 if res: 870 return res[0] 871 return (None,None,None)
872
def get_buildnums_for_brid(self, brid):
    """Synchronously return the list of build numbers recorded for build
    request `brid`."""
    numbers = self.runInteractionNow(self._txn_get_buildnums_for_brid, brid)
    return numbers
875 - def _txn_get_buildnums_for_brid(self, t, brid):
876 t.execute(self.quoteq("SELECT number FROM builds WHERE brid=?"), 877 (brid,)) 878 return [number for (number,) in t.fetchall()]
879
def resubmit_buildrequests(self, brids):
    """Asynchronously release the claims on build requests `brids`;
    returns the Deferred of the underlying interaction."""
    deferred = self.runInteraction(self._txn_resubmit_buildreqs, brids)
    return deferred
882 - def _txn_resubmit_buildreqs(self, t, brids):
883 # the interrupted build that gets resubmitted will still have the 884 # same submitted_at value, so it should be re-started first 885 while brids: 886 batch, brids = brids[:100], brids[100:] 887 q = self.quoteq("UPDATE buildrequests" 888 " SET claimed_at=0," 889 " claimed_by_name=NULL, claimed_by_incarnation=NULL" 890 " WHERE id IN " + self.parmlist(len(batch))) 891 t.execute(q, batch) 892 self.notify("add-buildrequest", *brids)
893
def retire_buildrequests(self, brids, results):
    """Mark the build requests in ``brids`` complete with the given ``results``.

    Passes ``_txn_retire_buildreqs``, ``brids`` and ``results`` to
    ``runInteractionNow`` and returns whatever that call returns.
    """
    txn_func = self._txn_retire_buildreqs
    return self.runInteractionNow(txn_func, brids, results)
896 - def _txn_retire_buildreqs(self, t, brids, results):
897 now = self._getCurrentTime() 898 #q = self.db.quoteq("DELETE FROM buildrequests WHERE id IN " 899 # + self.db.parmlist(len(brids))) 900 while brids: 901 batch, brids = brids[:100], brids[100:] 902 903 q = self.quoteq("UPDATE buildrequests" 904 " SET complete=1, results=?, complete_at=?" 905 " WHERE id IN " + self.parmlist(len(batch))) 906 t.execute(q, [results, now]+batch) 907 # now, does this cause any buildsets to complete? 908 q = self.quoteq("SELECT bs.id" 909 " FROM buildsets AS bs, buildrequests AS br" 910 " WHERE br.buildsetid=bs.id AND bs.complete=0" 911 " AND br.id in " 912 + self.parmlist(len(batch))) 913 t.execute(q, batch) 914 bsids = [bsid for (bsid,) in t.fetchall()] 915 for bsid in bsids: 916 self._check_buildset(t, bsid, now) 917 self.notify("retire-buildrequest", *brids) 918 self.notify("modify-buildset", *bsids)
919
def cancel_buildrequests(self, brids):
    """Cancel the build requests whose ids are listed in ``brids``.

    Passes ``_txn_cancel_buildrequest`` and ``brids`` to
    ``runInteractionNow`` and returns whatever that call returns.
    """
    txn_func = self._txn_cancel_buildrequest
    return self.runInteractionNow(txn_func, brids)
def _txn_cancel_buildrequest(self, t, brids):
    """Cancel the build requests in ``brids`` by marking them complete/FAILURE.

    Each request gets ``complete=1``, ``results=FAILURE`` and the current time
    as ``complete_at`` (at most 100 ids per UPDATE).  Every buildset that was
    still incomplete and owns one of those requests is then passed to
    ``_check_buildset`` exactly once.  Finally a "cancel-buildrequest"
    notification is sent with all of ``brids`` and a "modify-buildset"
    notification with the ids of the buildsets that were examined.

    ``brids`` is not modified.  With an empty ``brids`` no SQL is executed and
    both notifications are sent without ids.
    """
    # TODO: we aren't entirely sure if it'd be safe to just delete the
    # buildrequest: what else might be waiting on it that would then just
    # hang forever?. _check_buildset() should handle it well (an empty
    # buildset will appear complete and SUCCESS-ful). But we haven't
    # thought it through enough to be sure. So for now, "cancel" means
    # "mark as complete and FAILURE".
    now = self._getCurrentTime()

    # Buildsets touched by any batch, in first-seen order and without
    # repeats.  The join below yields one row per matching request, so a
    # buildset owning several of the requests would otherwise be checked
    # (and notified) more than once.
    affected_bsids = []
    seen_bsids = set()

    for start in range(0, len(brids), 100):
        # list() so that a tuple of ids can be concatenated below
        batch = list(brids[start:start + 100])

        q = self.quoteq("UPDATE buildrequests"
                        " SET complete=1, results=?, complete_at=?"
                        " WHERE id IN " + self.parmlist(len(batch)))
        t.execute(q, [FAILURE, now] + batch)

        # now, does this cause any buildsets to complete?
        q = self.quoteq("SELECT bs.id"
                        " FROM buildsets AS bs, buildrequests AS br"
                        " WHERE br.buildsetid=bs.id AND bs.complete=0"
                        " AND br.id in "
                        + self.parmlist(len(batch)))
        t.execute(q, batch)
        for (bsid,) in t.fetchall():
            if bsid not in seen_bsids:
                seen_bsids.add(bsid)
                affected_bsids.append(bsid)

    # Check only after every request has been updated, so a buildset whose
    # requests span several batches is evaluated once, against its final
    # state.
    for bsid in affected_bsids:
        self._check_buildset(t, bsid, now)

    self.notify("cancel-buildrequest", *brids)
    self.notify("modify-buildset", *affected_bsids)
956
def _check_buildset(self, t, bsid, now):
    """Mark buildset ``bsid`` complete if none of its build requests is pending.

    Reads ``(complete, results)`` for every build request of the buildset,
    provided the buildset itself is still incomplete.  If any request is not
    complete, nothing is written.  Otherwise the buildset is updated with
    ``complete=1``, ``complete_at=now`` and a result of SUCCESS when every
    request finished with SUCCESS or WARNINGS, FAILURE otherwise.  A buildset
    with no build requests therefore becomes complete with SUCCESS.

    The UPDATE only applies to a buildset that is still incomplete, so
    calling this again for an already-completed buildset leaves its recorded
    result and completion time untouched.

    Returns None.
    """
    q = self.quoteq("SELECT br.complete,br.results"
                    " FROM buildsets AS bs, buildrequests AS br"
                    " WHERE bs.complete=0"
                    " AND br.buildsetid=bs.id AND bs.id=?")
    t.execute(q, (bsid,))
    is_complete = True
    bs_results = SUCCESS
    for (complete, r) in t.fetchall():
        if not complete:
            # still waiting
            is_complete = False
        # mark the buildset as a failure if anything worse than
        # WARNINGS resulted from any one of the buildrequests
        if r not in (SUCCESS, WARNINGS):
            bs_results = FAILURE
    if not is_complete:
        return

    # Every request has finished (successfully or not).  The SELECT above
    # also comes back empty when the buildset is already complete, which
    # would look like "all done, SUCCESS"; the complete=0 guard keeps that
    # case from overwriting the stored outcome.
    q = self.quoteq("UPDATE buildsets"
                    " SET complete=1, complete_at=?, results=?"
                    " WHERE id=? AND complete=0")
    t.execute(q, (now, bs_results, bsid))
980
def get_buildrequestids_for_buildset(self, bsid):
    """Map builder names to build request ids for buildset ``bsid``.

    Passes ``_txn_get_buildrequestids_for_buildset`` and ``bsid`` to
    ``runInteractionNow`` and returns whatever that call returns.
    """
    txn_func = self._txn_get_buildrequestids_for_buildset
    return self.runInteractionNow(txn_func, bsid)
984 - def _txn_get_buildrequestids_for_buildset(self, t, bsid):
985 t.execute(self.quoteq("SELECT buildername,id FROM buildrequests" 986 " WHERE buildsetid=?"), 987 (bsid,)) 988 return dict(t.fetchall())
989
def examine_buildset(self, bsid):
    """Report the ``(successful, finished)`` state of buildset ``bsid``.

    Passes ``_txn_examine_buildset`` and ``bsid`` to ``runInteractionNow``
    and returns whatever that call returns.
    """
    txn_func = self._txn_examine_buildset
    return self.runInteractionNow(txn_func, bsid)
def _txn_examine_buildset(self, t, bsid):
    """Return ``(successful, finished)`` for buildset ``bsid``.

    ``finished`` is True only when every build request of the buildset has
    complete=1 (and so is also True when there are no requests at all).

    ``successful`` is False as soon as any completed request has a result
    other than SUCCESS or WARNINGS; True when everything is finished and no
    such failure was seen; and None while requests are still outstanding and
    nothing has failed yet.
    """
    select_sql = self.quoteq(
        "SELECT br.complete,br.results"
        " FROM buildsets AS bs, buildrequests AS br"
        " WHERE br.buildsetid=bs.id AND bs.id=?")
    t.execute(select_sql, (bsid,))
    rows = t.fetchall()

    finished = True
    any_failed = False
    for (complete, result) in rows:
        if not complete:
            finished = False
        elif result not in (SUCCESS, WARNINGS):
            any_failed = True

    if any_failed:
        successful = False
    elif finished:
        successful = True
    else:
        # still waiting on at least one request, no failure so far
        successful = None
    return (successful, finished)
1014
def get_active_buildset_ids(self):
    """List the ids of buildsets that are not yet complete.

    Passes ``_txn_get_active_buildset_ids`` to ``runInteractionNow`` and
    returns whatever that call returns.
    """
    txn_func = self._txn_get_active_buildset_ids
    return self.runInteractionNow(txn_func)
1017 - def _txn_get_active_buildset_ids(self, t):
1018 t.execute("SELECT id FROM buildsets WHERE complete=0") 1019 return [bsid for (bsid,) in t.fetchall()]
def get_buildset_info(self, bsid):
    """Fetch the descriptive fields stored for buildset ``bsid``.

    Passes ``_txn_get_buildset_info`` and ``bsid`` to ``runInteractionNow``
    and returns whatever that call returns.
    """
    txn_func = self._txn_get_buildset_info
    return self.runInteractionNow(txn_func, bsid)
def _txn_get_buildset_info(self, t, bsid):
    """Return ``(external_idstring, reason, ssid, complete, results)`` for ``bsid``.

    ``external_idstring`` and ``reason`` are passed through ``str_or_none``
    (a helper defined elsewhere in this module), ``complete`` is converted to
    a bool, and ``ssid`` / ``results`` are returned as read.  Returns None if
    no buildset row has that id.
    """
    select_sql = self.quoteq(
        "SELECT external_idstring, reason, sourcestampid,"
        " complete, results"
        " FROM buildsets WHERE id=?")
    t.execute(select_sql, (bsid,))
    rows = t.fetchall()
    if not rows:
        return None  # shouldn't happen
    (external, reason, ssid, complete, results) = rows[0]
    return (str_or_none(external), str_or_none(reason), ssid,
            bool(complete), results)
1035
def get_pending_brids_for_builder(self, buildername):
    """List the pending build request ids for the builder ``buildername``.

    Passes ``_txn_get_pending_brids_for_builder`` and ``buildername`` to
    ``runInteractionNow`` and returns whatever that call returns.
    """
    txn_func = self._txn_get_pending_brids_for_builder
    return self.runInteractionNow(txn_func, buildername)
1039 - def _txn_get_pending_brids_for_builder(self, t, buildername):
1040 # "pending" means unclaimed and incomplete. When a build is returned 1041 # to the pool (self.resubmit_buildrequests), the claimed_at= field is 1042 # reset to zero. 1043 t.execute(self.quoteq("SELECT id FROM buildrequests" 1044 " WHERE buildername=? AND" 1045 " complete=0 AND claimed_at=0"), 1046 (buildername,)) 1047 return [brid for (brid,) in t.fetchall()]
1048 1049 # test/debug methods 1050
def has_pending_operations(self):
    """Return True when ``self._pending_operation_count`` is non-zero, else False."""
    pending_count = self._pending_operation_count
    return bool(pending_count)
1053
def setChangeCacheSize(self, max_size):
    """Forward ``max_size`` to ``setMaxSize`` on ``self._change_cache``."""
    change_cache = self._change_cache
    change_cache.setMaxSize(max_size)


# Apply Twisted's threadable.synchronize to DBConnector at import time.
# NOTE(review): presumably this wraps the methods named in the class's
# `synchronized` attribute so they hold a per-instance lock -- confirm
# against the twisted.python.threadable documentation and the class body.
threadable.synchronize(DBConnector)
