1   
   2   
   3   
   4   
   5   
   6   
   7   
   8   
   9   
  10   
  11   
  12   
  13   
  14   
  15   
  16   
  17   
  18   
  19   
  20   
  21   
  22   
  23   
  24   
  25   
  26   
  27   
  28   
  29   
  30   
  31   
  32   
  33   
  34   
  35   
  36   
  37   
  38   
  39   
  40  import sys, collections, base64 
  41   
  42  from twisted.python import log, threadable 
  43  from twisted.internet import defer 
  44  from twisted.enterprise import adbapi 
  45  from buildbot import util 
  46  from buildbot.util import collections as bbcollections 
  47  from buildbot.changes.changes import Change 
  48  from buildbot.sourcestamp import SourceStamp 
  49  from buildbot.buildrequest import BuildRequest 
  50  from buildbot.process.properties import Properties 
  51  from buildbot.status.builder import SUCCESS, WARNINGS, FAILURE 
  52  from buildbot.util.eventual import eventually 
  53  from buildbot.util import json 
  54   
  55   
  56   
  57   
  58   
  59   
  61 -    def execute(self, *args, **kwargs): 
    68   
  70      if not res: 
  71          return default 
  72      return process_f(res[0][0]) 
   73   
  75      if s is None: 
  76          return None 
  77      return str(s) 
   78   
  80      pass 
  81   
  83       
  84      compare_attrs = ["args", "kwargs"] 
  85      synchronized = ["notify", "_end_operation"] 
  86      MAX_QUERY_TIMES = 1000 
  87   
  89           
  90          self._query_times = collections.deque() 
  91          self._spec = spec 
  92   
  93           
  94          self._dbapi = spec.get_dbapi() 
  95          self._nonpool = None 
  96          self._nonpool_lastused = None 
  97          self._nonpool_max_idle = spec.get_maxidle() 
  98   
  99           
 100           
 101          self.paramstyle = self._dbapi.paramstyle 
 102   
 103          self._pool = spec.get_async_connection_pool() 
 104          self._pool.transactionFactory = MyTransaction 
 105           
 106           
 107           
 108           
 109   
 110          self._change_cache = util.LRUCache() 
 111          self._sourcestamp_cache = util.LRUCache() 
 112          self._active_operations = set()  
 113          self._pending_notifications = [] 
 114          self._subscribers = bbcollections.defaultdict(set) 
 115   
 116          self._pending_operation_count = 0 
 117   
 118          self._started = False 
  119   
 123   
 125           
 126           
 127          self._pool.start() 
 128          self._started = True 
  129   
 131          """Call this when you're done with me""" 
 132   
 133           
 134          if self._nonpool: 
 135              self._nonpool.close() 
 136              self._nonpool = None 
 137              self._nonpool_lastused = None 
 138   
 139          if not self._started: 
 140              return 
 141          self._pool.close() 
 142          self._started = False 
 143          del self._pool 
  144   
 146          """ 
 147          Given a query that contains qmark-style placeholders, like:: 
 148           INSERT INTO foo (col1, col2) VALUES (?,?) 
 149          replace the '?' with '%s' if the backend uses format-style 
 150          placeholders, like:: 
 151           INSERT INTO foo (col1, col2) VALUES (%s,%s) 
 152          """ 
 153          if self.paramstyle == "format": 
 154              return query.replace("?","%s") 
 155          assert self.paramstyle == "qmark" 
 156          return query 
  157   
 159          """ 
 160          When passing long lists of values to e.g., an INSERT query, it is 
 161          tedious to pass long strings of ? placeholders.  This function will 
 162          create a parenthesis-enclosed list of COUNT placeholders.  Note that 
 163          the placeholders have already had quoteq() applied. 
 164          """ 
 165          p = self.quoteq("?") 
 166          return "(" + ",".join([p]*count) + ")" 
  167   
 169          """Returns None for an empty database, or a number (probably 1) for 
 170          the database's version""" 
 171          try: 
 172              res = self.runQueryNow("SELECT version FROM version") 
 173          except (self._dbapi.OperationalError, self._dbapi.ProgrammingError): 
 174               
 175              return None 
 176          assert len(res) == 1 
 177          return res[0][0] 
  178   
 180           
 181          assert self._started 
 182          return self.runInteractionNow(self._runQuery, *args, **kwargs) 
  183   
 187   
 189          t = Token() 
 190          self._active_operations.add(t) 
 191          return t 
  193           
 194           
 195           
 196           
 197          self._active_operations.discard(t) 
 198          if self._active_operations: 
 199              return 
 200          for (category, args) in self._pending_notifications: 
 201               
 202               
 203              eventually(self.send_notification, category, args) 
 204          self._pending_notifications = [] 
  205   
 207           
 208          assert self._started 
 209          start = self._getCurrentTime() 
 210          t = self._start_operation() 
 211          try: 
 212              return self._runInteractionNow(interaction, *args, **kwargs) 
 213          finally: 
 214              self._end_operation(t) 
 215              self._add_query_time(start) 
  216   
 218           
 219           
 220           
 221           
 222          if self._nonpool_max_idle is not None: 
 223              now = util.now() 
 224              if self._nonpool_lastused and self._nonpool_lastused + self._nonpool_max_idle < now: 
 225                  self._nonpool = None 
 226   
 227          if not self._nonpool: 
 228              self._nonpool = self._spec.get_sync_connection() 
 229   
 230          self._nonpool_lastused = util.now() 
 231          return self._nonpool 
  232   
 234          conn = self.get_sync_connection() 
 235          c = conn.cursor() 
 236          try: 
 237              result = interaction(c, *args, **kwargs) 
 238              c.close() 
 239              conn.commit() 
 240              return result 
 241          except: 
 242              excType, excValue, excTraceback = sys.exc_info() 
 243              try: 
 244                  conn.rollback() 
 245                  c2 = conn.cursor() 
 246                  c2.execute(self._pool.good_sql) 
 247                  c2.close() 
 248                  conn.commit() 
 249              except: 
 250                  log.msg("rollback failed, will reconnect next query") 
 251                  log.err() 
 252                   
 253                   
 254                  self._nonpool = None 
 255              raise excType, excValue, excTraceback 
  256   
 257 -    def notify(self, category, *args): 
  258           
 259           
 260          self._pending_notifications.append( (category,args) ) 
  261   
 267   
 270   
 272          assert self._started 
 273          self._pending_operation_count += 1 
 274          start = self._getCurrentTime() 
 275           
 276          d = self._pool.runQuery(*args, **kwargs) 
 277           
 278          return d 
  280          self._end_operation(t) 
 281          self._add_query_time(start) 
 282          self._pending_operation_count -= 1 
 283          return res 
  284   
 286          elapsed = self._getCurrentTime() - start 
 287          self._query_times.append(elapsed) 
 288          if len(self._query_times) > self.MAX_QUERY_TIMES: 
 289              self._query_times.popleft() 
  290   
 292          assert self._started 
 293          self._pending_operation_count += 1 
 294          start = self._getCurrentTime() 
 295          t = self._start_operation() 
 296          d = self._pool.runInteraction(*args, **kwargs) 
 297          d.addBoth(self._runInteraction_done, start, t) 
 298          return d 
  300          self._end_operation(t) 
 301          self._add_query_time(start) 
 302          self._pending_operation_count -= 1 
 303          return res 
  304   
 305       
 306   
 310   
 312          q = self.quoteq("INSERT INTO changes" 
 313                          " (author," 
 314                          "  comments, is_dir," 
 315                          "  branch, revision, revlink," 
 316                          "  when_timestamp, category," 
 317                          "  repository, project)" 
 318                          " VALUES (?, ?,?, ?,?,?, ?,?, ?,?)") 
 319           
 320   
 321          values = (change.who, 
 322                    change.comments, change.isdir, 
 323                    change.branch, change.revision, change.revlink, 
 324                    change.when, change.category, change.repository, 
 325                    change.project) 
 326          t.execute(q, values) 
 327          change.number = t.lastrowid 
 328   
 329          for link in change.links: 
 330              t.execute(self.quoteq("INSERT INTO change_links (changeid, link) " 
 331                                    "VALUES (?,?)"), 
 332                        (change.number, link)) 
 333          for filename in change.files: 
 334              t.execute(self.quoteq("INSERT INTO change_files (changeid,filename)" 
 335                                    " VALUES (?,?)"), 
 336                        (change.number, filename)) 
 337          for propname,propvalue in change.properties.properties.items(): 
 338              encoded_value = json.dumps(propvalue) 
 339              t.execute(self.quoteq("INSERT INTO change_properties" 
 340                                    " (changeid, property_name, property_value)" 
 341                                    " VALUES (?,?,?)"), 
 342                        (change.number, propname, encoded_value)) 
 343          self.notify("add-change", change.number) 
  344   
 346          q = "SELECT changeid FROM changes" 
 347          args = [] 
 348          if branches or categories or committers: 
 349              q += " WHERE " 
 350              pieces = [] 
 351              if branches: 
 352                  pieces.append("branch IN %s" % self.parmlist(len(branches))) 
 353                  args.extend(list(branches)) 
 354              if categories: 
 355                  pieces.append("category IN %s" % self.parmlist(len(categories))) 
 356                  args.extend(list(categories)) 
 357              if committers: 
 358                  pieces.append("author IN %s" % self.parmlist(len(committers))) 
 359                  args.extend(list(committers)) 
 360              if minTime: 
 361                  pieces.append("when_timestamp > %d" % minTime) 
 362              q += " AND ".join(pieces) 
 363          q += " ORDER BY changeid DESC" 
 364          rows = self.runQueryNow(q, tuple(args)) 
 365          for (changeid,) in rows: 
 366              yield self.getChangeNumberedNow(changeid) 
  367   
 369          if t: 
 370              return self._txn_getLatestChangeNumber(t) 
 371          else: 
 372              return self.runInteractionNow(self._txn_getLatestChangeNumber) 
  374          q = self.quoteq("SELECT max(changeid) from changes") 
 375          t.execute(q) 
 376          row = t.fetchone() 
 377          if not row: 
 378              return 0 
 379          return row[0] 
  380   
 382           
 383          assert changeid >= 0 
 384          c = self._change_cache.get(changeid) 
 385          if c: 
 386              return c 
 387          if t: 
 388              c = self._txn_getChangeNumberedNow(t, changeid) 
 389          else: 
 390              c = self.runInteractionNow(self._txn_getChangeNumberedNow, changeid) 
 391          self._change_cache.add(changeid, c) 
 392          return c 
  394          q = self.quoteq("SELECT author, comments," 
 395                          " is_dir, branch, revision, revlink," 
 396                          " when_timestamp, category," 
 397                          " repository, project" 
 398                          " FROM changes WHERE changeid = ?") 
 399          t.execute(q, (changeid,)) 
 400          rows = t.fetchall() 
 401          if not rows: 
 402              return None 
 403          (who, comments, 
 404           isdir, branch, revision, revlink, 
 405           when, category, repository, project) = rows[0] 
 406          branch = str_or_none(branch) 
 407          revision = str_or_none(revision) 
 408          q = self.quoteq("SELECT link FROM change_links WHERE changeid=?") 
 409          t.execute(q, (changeid,)) 
 410          rows = t.fetchall() 
 411          links = [row[0] for row in rows] 
 412          links.sort() 
 413   
 414          q = self.quoteq("SELECT filename FROM change_files WHERE changeid=?") 
 415          t.execute(q, (changeid,)) 
 416          rows = t.fetchall() 
 417          files = [row[0] for row in rows] 
 418          files.sort() 
 419   
 420          p = self.get_properties_from_db("change_properties", "changeid", 
 421                                          changeid, t) 
 422          c = Change(who=who, files=files, comments=comments, isdir=isdir, 
 423                     links=links, revision=revision, when=when, 
 424                     branch=branch, category=category, revlink=revlink, 
 425                     repository=repository, project=project) 
 426          c.properties.updateFromProperties(p) 
 427          c.number = changeid 
 428          return c 
  429   
 431           
 432           
 433          assert changeid >= 0 
 434          c = self._change_cache.get(changeid) 
 435          if c: 
 436              return defer.succeed(c) 
 437          d1 = self.runQuery(self.quoteq("SELECT author, comments," 
 438                                         " is_dir, branch, revision, revlink," 
 439                                         " when_timestamp, category," 
 440                                         " repository, project" 
 441                                         " FROM changes WHERE changeid = ?"), 
 442                             (changeid,)) 
 443          d2 = self.runQuery(self.quoteq("SELECT link FROM change_links" 
 444                                         " WHERE changeid=?"), 
 445                             (changeid,)) 
 446          d3 = self.runQuery(self.quoteq("SELECT filename FROM change_files" 
 447                                         " WHERE changeid=?"), 
 448                             (changeid,)) 
 449          d4 = self.runInteraction(self._txn_get_properties_from_db, 
 450                  "change_properties", "changeid", changeid) 
 451          d = defer.gatherResults([d1,d2,d3,d4]) 
 452          d.addCallback(self._getChangeByNumber_query_done, changeid) 
 453          return d 
  454   
 456          (rows, link_rows, file_rows, properties) = res 
 457          if not rows: 
 458              return None 
 459          (who, comments, 
 460           isdir, branch, revision, revlink, 
 461           when, category, repository, project) = rows[0] 
 462          branch = str_or_none(branch) 
 463          revision = str_or_none(revision) 
 464          links = [row[0] for row in link_rows] 
 465          links.sort() 
 466          files = [row[0] for row in file_rows] 
 467          files.sort() 
 468   
 469          c = Change(who=who, files=files, comments=comments, isdir=isdir, 
 470                     links=links, revision=revision, when=when, 
 471                     branch=branch, category=category, revlink=revlink, 
 472                     repository=repository, project=project) 
 473          c.properties.updateFromProperties(properties) 
 474          c.number = changeid 
 475          self._change_cache.add(changeid, c) 
 476          return c 
  477   
 479          """Return a Deferred that fires with a list of all Change instances 
 480          with numbers greater than the given value, sorted by number. This is 
 481          useful for catching up with everything that's happened since you last 
 482          called this function.""" 
 483          assert last_changeid >= 0 
 484          if t: 
 485              return self._txn_getChangesGreaterThan(t, last_changeid) 
 486          else: 
 487              return self.runInteractionNow(self._txn_getChangesGreaterThan, 
 488                                            last_changeid) 
  496   
 498          return defer.gatherResults([self.getChangeByNumber(changeid) 
 499                                      for changeid in changeids]) 
  500   
 501       
 502   
 504          assert isinstance(ssid, (int, long)) 
 505          ss = self._sourcestamp_cache.get(ssid) 
 506          if ss: 
 507              return ss 
 508          if t: 
 509              ss = self._txn_getSourceStampNumbered(t, ssid) 
 510          else: 
 511              ss = self.runInteractionNow(self._txn_getSourceStampNumbered, 
 512                                             ssid) 
 513          self._sourcestamp_cache.add(ssid, ss) 
 514          return ss 
  515   
 517          assert isinstance(ssid, (int, long)) 
 518          t.execute(self.quoteq("SELECT branch,revision,patchid,project,repository" 
 519                                " FROM sourcestamps WHERE id=?"), 
 520                    (ssid,)) 
 521          r = t.fetchall() 
 522          if not r: 
 523              return None 
 524          (branch_u, revision_u, patchid, project, repository) = r[0] 
 525          branch = str_or_none(branch_u) 
 526          revision = str_or_none(revision_u) 
 527   
 528          patch = None 
 529          if patchid is not None: 
 530              t.execute(self.quoteq("SELECT patchlevel,patch_base64,subdir" 
 531                                    " FROM patches WHERE id=?"), 
 532                        (patchid,)) 
 533              r = t.fetchall() 
 534              assert len(r) == 1 
 535              (patch_level, patch_text_base64, subdir_u) = r[0] 
 536              patch_text = base64.b64decode(patch_text_base64) 
 537              if subdir_u: 
 538                  patch = (patch_level, patch_text, str(subdir_u)) 
 539              else: 
 540                  patch = (patch_level, patch_text) 
 541   
 542          t.execute(self.quoteq("SELECT changeid FROM sourcestamp_changes" 
 543                                " WHERE sourcestampid=?" 
 544                                " ORDER BY changeid ASC"), 
 545                    (ssid,)) 
 546          r = t.fetchall() 
 547          changes = None 
 548          if r: 
 549              changes = [self.getChangeNumberedNow(changeid, t) 
 550                         for (changeid,) in r] 
 551          ss = SourceStamp(branch, revision, patch, changes, project=project, repository=repository) 
 552          ss.ssid = ssid 
 553          return ss 
  554   
 555       
 556   
 558          if t: 
 559              return self._txn_get_properties_from_db(t, tablename, idname, id) 
 560          else: 
 561              return self.runInteractionNow(self._txn_get_properties_from_db, 
 562                                            tablename, idname, id) 
  563   
 565           
 566           
 567          q = self.quoteq("SELECT property_name,property_value FROM %s WHERE %s=?" 
 568                          % (tablename, idname)) 
 569          t.execute(q, (id,)) 
 570          retval = Properties() 
 571          for key, valuepair in t.fetchall(): 
 572              value, source = json.loads(valuepair) 
 573              retval.setProperty(str(key), value, source) 
 574          return retval 
  575   
 576       
 577   
 581          for scheduler in added: 
 582              name = scheduler.name 
 583              assert name 
 584              class_name = "%s.%s" % (scheduler.__class__.__module__, 
 585                      scheduler.__class__.__name__) 
 586              q = self.quoteq(""" 
 587                  SELECT schedulerid, class_name FROM schedulers WHERE 
 588                      name=? AND 
 589                      (class_name=? OR class_name='') 
 590                      """) 
 591              t.execute(q, (name, class_name)) 
 592              row = t.fetchone() 
 593              if row: 
 594                  sid, db_class_name = row 
 595                  if db_class_name == '': 
 596                       
 597                       
 598                       
 599                      q = self.quoteq("""UPDATE schedulers SET class_name=? 
 600                          WHERE schedulerid=?""") 
 601                      t.execute(q, (class_name, sid)) 
 602                  elif db_class_name != class_name: 
 603                       
 604                       
 605                      sid = None 
 606              else: 
 607                  sid = None 
 608   
 609              if sid is None: 
 610                   
 611                   
 612                   
 613                  q = ("SELECT changeid FROM changes" 
 614                       " ORDER BY changeid DESC LIMIT 1") 
 615                  t.execute(q) 
 616                  max_changeid = _one_or_else(t.fetchall(), 0) 
 617                  state = scheduler.get_initial_state(max_changeid) 
 618                  state_json = json.dumps(state) 
 619                  q = self.quoteq("INSERT INTO schedulers" 
 620                                  " (name, class_name, state)" 
 621                                  "  VALUES (?,?,?)") 
 622                  t.execute(q, (name, class_name, state_json)) 
 623                  sid = t.lastrowid 
 624              log.msg("scheduler '%s' got id %d" % (scheduler.name, sid)) 
 625              scheduler.schedulerid = sid 
  626   
 628          q = self.quoteq("SELECT state FROM schedulers WHERE schedulerid=?") 
 629          t.execute(q, (schedulerid,)) 
 630          state_json = _one_or_else(t.fetchall()) 
 631          assert state_json is not None 
 632          return json.loads(state_json) 
  633   
 635          state_json = json.dumps(state) 
 636          q = self.quoteq("UPDATE schedulers SET state=? WHERE schedulerid=?") 
 637          t.execute(q, (state_json, schedulerid)) 
  638   
 640          """Given a SourceStamp (which may or may not have an ssid), make sure 
 641          the contents are in the database, and return the ssid. If the 
 642          SourceStamp originally came from the DB (and thus already has an 
 643          ssid), just return the ssid. If not, create a new row for it.""" 
 644          if ss.ssid is not None: 
 645              return ss.ssid 
 646          patchid = None 
 647          if ss.patch: 
 648              patchlevel = ss.patch[0] 
 649              diff = ss.patch[1] 
 650              subdir = None 
 651              if len(ss.patch) > 2: 
 652                  subdir = ss.patch[2] 
 653              q = self.quoteq("INSERT INTO patches" 
 654                              " (patchlevel, patch_base64, subdir)" 
 655                              " VALUES (?,?,?)") 
 656              t.execute(q, (patchlevel, base64.b64encode(diff), subdir)) 
 657              patchid = t.lastrowid 
 658          t.execute(self.quoteq("INSERT INTO sourcestamps" 
 659                                " (branch, revision, patchid, project, repository)" 
 660                                " VALUES (?,?,?,?,?)"), 
 661                    (ss.branch, ss.revision, patchid, ss.project, ss.repository)) 
 662          ss.ssid = t.lastrowid 
 663          q2 = self.quoteq("INSERT INTO sourcestamp_changes" 
 664                           " (sourcestampid, changeid) VALUES (?,?)") 
 665          for c in ss.changes: 
 666              t.execute(q2, (ss.ssid, c.number)) 
 667          return ss.ssid 
  668   
 669 -    def create_buildset(self, ssid, reason, properties, builderNames, t, 
 670                          external_idstring=None): 
  671           
 672          now = self._getCurrentTime() 
 673          t.execute(self.quoteq("INSERT INTO buildsets" 
 674                                " (external_idstring, reason," 
 675                                "  sourcestampid, submitted_at)" 
 676                                " VALUES (?,?,?,?)"), 
 677                    (external_idstring, reason, ssid, now)) 
 678          bsid = t.lastrowid 
 679          for propname, propvalue in properties.properties.items(): 
 680              encoded_value = json.dumps(propvalue) 
 681              t.execute(self.quoteq("INSERT INTO buildset_properties" 
 682                                    " (buildsetid, property_name, property_value)" 
 683                                    " VALUES (?,?,?)"), 
 684                        (bsid, propname, encoded_value)) 
 685          brids = [] 
 686          for bn in builderNames: 
 687              t.execute(self.quoteq("INSERT INTO buildrequests" 
 688                                    " (buildsetid, buildername, submitted_at)" 
 689                                    " VALUES (?,?,?)"), 
 690                        (bsid, bn, now)) 
 691              brid = t.lastrowid 
 692              brids.append(brid) 
 693          self.notify("add-buildset", bsid) 
 694          self.notify("add-buildrequest", *brids) 
 695          return bsid 
  696   
 698          q = self.quoteq("INSERT INTO scheduler_changes" 
 699                          " (schedulerid, changeid, important)" 
 700                          " VALUES (?,?,?)") 
 701          t.execute(q, (schedulerid, number, bool(important))) 
  702   
 704          q = self.quoteq("SELECT changeid, important" 
 705                          " FROM scheduler_changes" 
 706                          " WHERE schedulerid=?") 
 707          t.execute(q, (schedulerid,)) 
 708          important = [] 
 709          unimportant = [] 
 710          for (changeid, is_important) in t.fetchall(): 
 711              c = self.getChangeNumberedNow(changeid, t) 
 712              if is_important: 
 713                  important.append(c) 
 714              else: 
 715                  unimportant.append(c) 
 716          return (important, unimportant) 
  717   
 719          t.execute(self.quoteq("DELETE FROM scheduler_changes" 
 720                                " WHERE schedulerid=? AND changeid IN ") 
 721                    + self.parmlist(len(changeids)), 
 722                    (schedulerid,) + tuple(changeids)) 
  723   
 725           
 726           
 727          t.execute(self.quoteq("INSERT INTO scheduler_upstream_buildsets" 
 728                                " (buildsetid, schedulerid, active)" 
 729                                " VALUES (?,?,?)"), 
 730                    (bsid, schedulerid, 1)) 
  731   
 733           
 734          t.execute(self.quoteq("SELECT bs.id, " 
 735                                "  bs.sourcestampid, bs.complete, bs.results" 
 736                                " FROM scheduler_upstream_buildsets AS s," 
 737                                "  buildsets AS bs" 
 738                                " WHERE s.buildsetid=bs.id" 
 739                                "  AND s.schedulerid=?" 
 740                                "  AND s.active=1"), 
 741                    (schedulerid,)) 
 742          return t.fetchall() 
  743   
 745          t.execute(self.quoteq("UPDATE scheduler_upstream_buildsets" 
 746                                " SET active=0" 
 747                                " WHERE buildsetid=? AND schedulerid=?"), 
 748                    (buildsetid, schedulerid)) 
  749   
 750       
 751   
 753          assert isinstance(brid, (int, long)) 
 754          if t: 
 755              br = self._txn_getBuildRequestWithNumber(t, brid) 
 756          else: 
 757              br = self.runInteractionNow(self._txn_getBuildRequestWithNumber, 
 758                                          brid) 
 759          return br 
  761          assert isinstance(brid, (int, long)) 
 762          t.execute(self.quoteq("SELECT br.buildsetid, bs.reason," 
 763                                " bs.sourcestampid, br.buildername," 
 764                                " bs.submitted_at, br.priority" 
 765                                " FROM buildrequests AS br, buildsets AS bs" 
 766                                " WHERE br.id=? AND br.buildsetid=bs.id"), 
 767                    (brid,)) 
 768          r = t.fetchall() 
 769          if not r: 
 770              return None 
 771          (bsid, reason, ssid, builder_name, submitted_at, priority) = r[0] 
 772          ss = self.getSourceStampNumberedNow(ssid, t) 
 773          properties = self.get_properties_from_db("buildset_properties", 
 774                                                   "buildsetid", bsid, t) 
 775          br = BuildRequest(reason, ss, builder_name, properties) 
 776          br.submittedAt = submitted_at 
 777          br.priority = priority 
 778          br.id = brid 
 779          br.bsid = bsid 
 780          return br 
  781   
 783          assert isinstance(brid, (int, long)) 
 784          return self.runInteractionNow(self._txn_get_buildername_for_brid, brid) 
  786          assert isinstance(brid, (int, long)) 
 787          t.execute(self.quoteq("SELECT buildername FROM buildrequests" 
 788                                " WHERE id=?"), 
 789                    (brid,)) 
 790          r = t.fetchall() 
 791          if not r: 
 792              return None 
 793          return r[0][0] 
  794   
 797          t.execute(self.quoteq("SELECT br.id" 
 798                                " FROM buildrequests AS br, buildsets AS bs" 
 799                                " WHERE br.buildername=? AND br.complete=0" 
 800                                " AND br.buildsetid=bs.id" 
 801                                " AND (br.claimed_at<?" 
 802                                "      OR (br.claimed_by_name=?" 
 803                                "          AND br.claimed_by_incarnation!=?))" 
 804                                " ORDER BY br.priority DESC,bs.submitted_at ASC"), 
 805                    (buildername, old, master_name, master_incarnation)) 
 806          requests = [self.getBuildRequestWithNumber(brid, t) 
 807                      for (brid,) in t.fetchall()] 
 808          return requests 
  809   
 812          if not brids: 
 813              return 
 814          if t: 
 815              self._txn_claim_buildrequests(t, now, master_name, 
 816                                            master_incarnation, brids) 
 817          else: 
 818              self.runInteractionNow(self._txn_claim_buildrequests, 
 819                                     now, master_name, master_incarnation, brids) 
  822          q = self.quoteq("UPDATE buildrequests" 
 823                          " SET claimed_at = ?," 
 824                          "     claimed_by_name = ?, claimed_by_incarnation = ?" 
 825                          " WHERE id IN " + self.parmlist(len(brids))) 
 826          qargs = [now, master_name, master_incarnation] + list(brids) 
 827          t.execute(q, qargs) 
  828   
 832          now = self._getCurrentTime() 
 833          t.execute(self.quoteq("INSERT INTO builds (number, brid, start_time)" 
 834                                " VALUES (?,?,?)"), 
 835                    (buildnumber, brid, now)) 
 836          bid = t.lastrowid 
 837          self.notify("add-build", bid) 
 838          return bid 
  839   
 843          now = self._getCurrentTime() 
 844          q = self.quoteq("UPDATE builds SET finish_time = ?" 
 845                          " WHERE id IN " + self.parmlist(len(bids))) 
 846          qargs = [now] + list(bids) 
 847          t.execute(q, qargs) 
  848   
 852           
              # NOTE(review): the def line for this body is not part of this listing.
              # Look up build `bid` and return the first matching row as
              # (brid, buildername, number), joining builds to buildrequests on
              # b.brid = br.id. Returns (None, None, None) when nothing matches.
 853          t.execute(self.quoteq("SELECT b.brid,br.buildername,b.number" 
 854                                " FROM builds AS b, buildrequests AS br" 
 855                                " WHERE b.id=? AND b.brid=br.id"), 
 856                    (bid,)) 
 857          res = t.fetchall() 
 858          if res: 
 859              return res[0] 
 860          return (None,None,None) 
  861   
 868   
 872           
 873           
              # NOTE(review): the def line for this body is not part of this listing.
              # Release the claim on every request in `brids` (claimed_at reset to 0,
              # claimer name/incarnation set to NULL), then notify
              # "add-buildrequest" subscribers with those ids.
              # NOTE(review): unlike the claim code in this file there is no guard
              # for an empty `brids` here -- confirm what self.parmlist(0) produces.
 874          q = self.quoteq("UPDATE buildrequests" 
 875                          " SET claimed_at=0," 
 876                          "     claimed_by_name=NULL, claimed_by_incarnation=NULL" 
 877                          " WHERE id IN " + self.parmlist(len(brids))) 
 878          t.execute(q, brids) 
 879          self.notify("add-buildrequest", *brids) 
  880   
              # NOTE(review): the def line for this body is not part of this listing.
              # Mark every request in `brids` complete with the given `results` and
              # the current time, re-evaluate the buildsets they belong to, then
              # notify "retire-buildrequest" and "modify-buildset" subscribers.
 884          now = self._getCurrentTime() 
 885           
 886           
 887          q = self.quoteq("UPDATE buildrequests" 
 888                          " SET complete=1, results=?, complete_at=?" 
 889                          " WHERE id IN " + self.parmlist(len(brids))) 
              # NOTE(review): `[results, now]+brids` only works when brids is a list;
              # the claim code in this file uses list(brids) instead -- confirm that
              # callers never pass a tuple here.
 890          t.execute(q, [results, now]+brids) 
 891           
              # Collect the still-incomplete buildsets that own any of these requests;
              # each one may have just become complete.
 892          q = self.quoteq("SELECT bs.id" 
 893                          " FROM buildsets AS bs, buildrequests AS br" 
 894                          " WHERE br.buildsetid=bs.id AND bs.complete=0" 
 895                          "  AND br.id in " 
 896                          + self.parmlist(len(brids))) 
 897          t.execute(q, brids) 
 898          bsids = [bsid for (bsid,) in t.fetchall()] 
 899          for bsid in bsids: 
 900              self._check_buildset(t, bsid, now) 
 901          self.notify("retire-buildrequest", *brids) 
 902          self.notify("modify-buildset", *bsids) 
  903   
 907           
 908           
 909           
 910           
 911           
 912           
              # NOTE(review): the def line for this body is not part of this listing.
              # Cancel the requests in `brids`, re-evaluate the buildsets they belong
              # to, then notify "cancel-buildrequest" and "modify-buildset"
              # subscribers.
              # `if True:` hard-wires the first branch: cancelled requests are kept
              # and marked complete with results=FAILURE. The DELETE branch below is
              # never executed.
 913          if True: 
 914              now = self._getCurrentTime() 
 915              q = self.quoteq("UPDATE buildrequests" 
 916                              " SET complete=1, results=?, complete_at=?" 
 917                              " WHERE id IN " + self.parmlist(len(brids))) 
                  # NOTE(review): list concatenation -- requires brids to be a list
 918              t.execute(q, [FAILURE, now]+brids) 
 919          else: 
 920              q = self.quoteq("DELETE FROM buildrequests" 
 921                              " WHERE id IN " + self.parmlist(len(brids))) 
 922              t.execute(q, brids) 
 923   
 924           
              # Collect the still-incomplete buildsets that own any of these requests
              # and re-check each one. `now` is only assigned in the `if True:`
              # branch above, so the disabled DELETE path could not reach this code
              # without also defining it.
 925          q = self.quoteq("SELECT bs.id" 
 926                          " FROM buildsets AS bs, buildrequests AS br" 
 927                          " WHERE br.buildsetid=bs.id AND bs.complete=0" 
 928                          "  AND br.id in " 
 929                          + self.parmlist(len(brids))) 
 930          t.execute(q, brids) 
 931          bsids = [bsid for (bsid,) in t.fetchall()] 
 932          for bsid in bsids: 
 933              self._check_buildset(t, bsid, now) 
 934   
 935          self.notify("cancel-buildrequest", *brids) 
 936          self.notify("modify-buildset", *bsids) 
  937   
              # NOTE(review): the def line for this body is not part of this listing;
              # the call sites in this file invoke it as
              # self._check_buildset(t, bsid, now).
              # Decide whether buildset `bsid` has finished and, if so, mark it
              # complete. The query returns (complete, results) for each request of
              # the buildset, but only while the buildset itself is still incomplete.
 939          q = self.quoteq("SELECT br.complete,br.results" 
 940                          " FROM buildsets AS bs, buildrequests AS br" 
 941                          " WHERE bs.complete=0" 
 942                          "  AND br.buildsetid=bs.id AND bs.id=?") 
 943          t.execute(q, (bsid,)) 
 944          results = t.fetchall() 
              # The buildset is complete only if every request is complete. Its result
              # is SUCCESS unless some request reports FAILURE; no other result value
              # changes it.
 945          is_complete = True 
 946          bs_results = SUCCESS 
 947          for (complete, r) in results: 
 948              if not complete: 
 949                   
 950                  is_complete = False 
 951              if r == FAILURE: 
 952                  bs_results = r 
              # NOTE(review): when the query returns no rows the loop is skipped, so
              # the UPDATE below still runs with results=SUCCESS -- confirm that
              # callers only pass buildsets that are incomplete and have requests.
 953          if is_complete: 
 954               
 955              q = self.quoteq("UPDATE buildsets" 
 956                              " SET complete=1, complete_at=?, results=?" 
 957                              " WHERE id=?") 
 958              t.execute(q, (now, bs_results, bsid)) 
  959   
              # NOTE(review): the def lines for the two bodies below are not part of
              # this listing.
              # Wrapper body: run the lookup below through self.runInteractionNow for
              # buildset `bsid` and return its result.
 961          return self.runInteractionNow(self._txn_get_buildrequestids_for_buildset, 
 962                                        bsid) 
              # Transaction body: return a dict mapping buildername -> buildrequest id
              # for every request of buildset `bsid`. dict() keeps one entry per
              # buildername, so a repeated buildername would keep only its last row.
  964          t.execute(self.quoteq("SELECT buildername,id FROM buildrequests" 
 965                                " WHERE buildsetid=?"), 
 966                    (bsid,)) 
 967          return dict(t.fetchall()) 
  968   
 972           
 973           
 974           
 975           
 976           
 977           
              # NOTE(review): the def line for this body is not part of this listing.
              # Summarise buildset `bsid` as a (successful, finished) pair, computed
              # from the (complete, results) columns of all of its build requests.
 978          q = self.quoteq("SELECT br.complete,br.results" 
 979                          " FROM buildsets AS bs, buildrequests AS br" 
 980                          " WHERE br.buildsetid=bs.id AND bs.id=?") 
 981          t.execute(q, (bsid,)) 
 982          results = t.fetchall() 
              # finished:   True only if every request is complete.
              # successful: False as soon as any completed request has a result other
              #             than SUCCESS or WARNINGS (even if others are still
              #             running); True once everything finished with no such
              #             result; None while undecided.
 983          finished = True 
 984          successful = None 
 985          for (c,r) in results: 
 986              if not c: 
 987                  finished = False 
 988              if c and r not in (SUCCESS, WARNINGS): 
 989                  successful = False 
 990          if finished and successful is None: 
 991              successful = True 
 992          return (successful, finished) 
  993   
              # NOTE(review): the def lines for the two bodies below are not part of
              # this listing.
              # First body: return the ids of all buildsets whose complete flag is 0.
 997          t.execute("SELECT id FROM buildsets WHERE complete=0") 
 998          return [bsid for (bsid,) in t.fetchall()] 
              # Second body: fetch the descriptive columns of buildset `bsid`.
              # Returns (external_idstring, reason, sourcestampid, complete, results),
              # or None when no such buildset exists.
 1002          q = self.quoteq("SELECT external_idstring, reason, sourcestampid," 
1003                          "       complete, results" 
1004                          " FROM buildsets WHERE id=?") 
1005          t.execute(q, (bsid,)) 
1006          res = t.fetchall() 
1007          if res: 
1008              (external, reason, ssid, complete, results) = res[0] 
                  # normalise the text columns through str_or_none and the completion
                  # flag to a real bool before handing the row back
1009              external_idstring = str_or_none(external) 
1010              reason = str_or_none(reason) 
1011              complete = bool(complete) 
1012              return (external_idstring, reason, ssid, complete, results) 
1013          return None  
 1014   
              # NOTE(review): the def lines for the two bodies below are not part of
              # this listing.
              # Wrapper body: run the lookup below through self.runInteractionNow for
              # `buildername` and return its result.
1016          return self.runInteractionNow(self._txn_get_pending_brids_for_builder, 
1017                                        buildername) 
 1019           
1020           
1021           
              # Transaction body: return the ids of the requests for `buildername`
              # that are neither complete (complete=0) nor claimed (claimed_at=0).
1022          t.execute(self.quoteq("SELECT id FROM buildrequests" 
1023                                " WHERE buildername=? AND" 
1024                                "  complete=0 AND claimed_at=0"), 
1025                    (buildername,)) 
1026          return [brid for (brid,) in t.fetchall()] 
 1027   
1028       
1029   
              # NOTE(review): the def line for this body is not part of this listing.
              # True when self._pending_operation_count is non-zero, False otherwise.
1031          return bool(self._pending_operation_count) 
  1032   
1033   
      # Hand the DBConnector class to twisted.python.threadable.synchronize, which
      # wraps the methods named in the class's `synchronized` attribute
      # (presumably the ["notify", "_end_operation"] list declared near the top
      # of this file) so that they run under a per-instance lock.
1034  threadable.synchronize(DBConnector) 
1035