1   
   2   
   3   
   4   
   5   
   6   
   7   
   8   
   9   
  10   
  11   
  12   
  13   
  14   
  15   
  16  import sys, collections, base64 
  17   
  18  from twisted.python import log, threadable 
  19  from twisted.internet import defer 
  20  from twisted.enterprise import adbapi 
  21  from buildbot import util 
  22  from buildbot.util import collections as bbcollections 
  23  from buildbot.changes.changes import Change 
  24  from buildbot.sourcestamp import SourceStamp 
  25  from buildbot.buildrequest import BuildRequest 
  26  from buildbot.process.properties import Properties 
  27  from buildbot.status.builder import SUCCESS, WARNINGS, FAILURE 
  28  from buildbot.util.eventual import eventually 
  29  from buildbot.util import json 
  30   
  31   
  32   
  33   
  34   
  35   
  37 -    def execute(self, *args, **kwargs): 
    44   
  46      if not res: 
  47          return default 
  48      return process_f(res[0][0]) 
   49   
  51      if s is None: 
  52          return None 
  53      return str(s) 
   54   
  56      pass 
  57   
  59       
  60      compare_attrs = ["args", "kwargs"] 
  61      synchronized = ["notify", "_end_operation"] 
  62      MAX_QUERY_TIMES = 1000 
  63   
  65           
  66          self._query_times = collections.deque() 
  67          self._spec = spec 
  68   
  69           
  70          self._dbapi = spec.get_dbapi() 
  71          self._nonpool = None 
  72          self._nonpool_lastused = None 
  73          self._nonpool_max_idle = spec.get_maxidle() 
  74   
  75           
  76           
  77          self.paramstyle = self._dbapi.paramstyle 
  78   
  79          self._pool = spec.get_async_connection_pool() 
  80          self._pool.transactionFactory = MyTransaction 
  81           
  82           
  83           
  84           
  85   
  86          self._change_cache = util.LRUCache() 
  87          self._sourcestamp_cache = util.LRUCache() 
  88          self._active_operations = set()  
  89          self._pending_notifications = [] 
  90          self._subscribers = bbcollections.defaultdict(set) 
  91   
  92          self._pending_operation_count = 0 
  93   
  94          self._started = False 
   95   
  99   
 101           
 102           
 103          self._pool.start() 
 104          self._started = True 
  105   
 107          """Call this when you're done with me""" 
 108   
 109           
 110          if self._nonpool: 
 111              self._nonpool.close() 
 112              self._nonpool = None 
 113              self._nonpool_lastused = None 
 114   
 115          if not self._started: 
 116              return 
 117          self._pool.close() 
 118          self._started = False 
 119          del self._pool 
  120   
 122          """ 
 123          Given a query that contains qmark-style placeholders, like:: 
 124           INSERT INTO foo (col1, col2) VALUES (?,?) 
 125          replace the '?' with '%s' if the backend uses format-style 
 126          placeholders, like:: 
 127           INSERT INTO foo (col1, col2) VALUES (%s,%s) 
 128          """ 
 129          if self.paramstyle == "format": 
 130              return query.replace("?","%s") 
 131          assert self.paramstyle == "qmark" 
 132          return query 
  133   
 135          """ 
 136          When passing long lists of values to e.g., an INSERT query, it is 
 137          tedious to pass long strings of ? placeholders.  This function will 
 138          create a parenthesis-enclosed list of COUNT placeholders.  Note that 
 139          the placeholders have already had quoteq() applied. 
 140          """ 
 141          p = self.quoteq("?") 
 142          return "(" + ",".join([p]*count) + ")" 
  143   
 145          """Returns None for an empty database, or a number (probably 1) for 
 146          the database's version""" 
 147          try: 
 148              res = self.runQueryNow("SELECT version FROM version") 
 149          except (self._dbapi.OperationalError, self._dbapi.ProgrammingError): 
 150               
 151              return None 
 152          assert len(res) == 1 
 153          return res[0][0] 
  154   
 156           
 157          assert self._started 
 158          return self.runInteractionNow(self._runQuery, *args, **kwargs) 
  159   
 163   
 165          t = Token() 
 166          self._active_operations.add(t) 
 167          return t 
  169           
 170           
 171           
 172           
 173          self._active_operations.discard(t) 
 174          if self._active_operations: 
 175              return 
 176          for (category, args) in self._pending_notifications: 
 177               
 178               
 179              eventually(self.send_notification, category, args) 
 180          self._pending_notifications = [] 
  181   
 183           
 184          assert self._started 
 185          start = self._getCurrentTime() 
 186          t = self._start_operation() 
 187          try: 
 188              return self._runInteractionNow(interaction, *args, **kwargs) 
 189          finally: 
 190              self._end_operation(t) 
 191              self._add_query_time(start) 
  192   
 194           
 195           
 196           
 197           
 198          if self._nonpool_max_idle is not None: 
 199              now = util.now() 
 200              if self._nonpool_lastused and self._nonpool_lastused + self._nonpool_max_idle < now: 
 201                  self._nonpool = None 
 202   
 203          if not self._nonpool: 
 204              self._nonpool = self._spec.get_sync_connection() 
 205   
 206          self._nonpool_lastused = util.now() 
 207          return self._nonpool 
  208   
 210          conn = self.get_sync_connection() 
 211          c = conn.cursor() 
 212          try: 
 213              result = interaction(c, *args, **kwargs) 
 214              c.close() 
 215              conn.commit() 
 216              return result 
 217          except: 
 218              excType, excValue, excTraceback = sys.exc_info() 
 219              try: 
 220                  conn.rollback() 
 221                  c2 = conn.cursor() 
 222                  c2.execute(self._pool.good_sql) 
 223                  c2.close() 
 224                  conn.commit() 
 225              except: 
 226                  log.msg("rollback failed, will reconnect next query") 
 227                  log.err() 
 228                   
 229                   
 230                  self._nonpool = None 
 231              raise excType, excValue, excTraceback 
  232   
 233 -    def notify(self, category, *args): 
  234           
 235           
 236          self._pending_notifications.append( (category,args) ) 
  237   
 243   
 246   
 248          assert self._started 
 249          self._pending_operation_count += 1 
 250          d = self._pool.runQuery(*args, **kwargs) 
 251          return d 
  252   
 254          self._end_operation(t) 
 255          self._add_query_time(start) 
 256          self._pending_operation_count -= 1 
 257          return res 
  258   
 260          elapsed = self._getCurrentTime() - start 
 261          self._query_times.append(elapsed) 
 262          if len(self._query_times) > self.MAX_QUERY_TIMES: 
 263              self._query_times.popleft() 
  264   
 266          assert self._started 
 267          self._pending_operation_count += 1 
 268          start = self._getCurrentTime() 
 269          t = self._start_operation() 
 270          d = self._pool.runInteraction(*args, **kwargs) 
 271          d.addBoth(self._runInteraction_done, start, t) 
 272          return d 
  274          self._end_operation(t) 
 275          self._add_query_time(start) 
 276          self._pending_operation_count -= 1 
 277          return res 
  278   
 279       
 280   
 284   
 286          q = self.quoteq("INSERT INTO changes" 
 287                          " (author," 
 288                          "  comments, is_dir," 
 289                          "  branch, revision, revlink," 
 290                          "  when_timestamp, category," 
 291                          "  repository, project)" 
 292                          " VALUES (?, ?,?, ?,?,?, ?,?, ?,?)") 
 293           
 294   
 295          values = (change.who, 
 296                    change.comments, change.isdir, 
 297                    change.branch, change.revision, change.revlink, 
 298                    change.when, change.category, change.repository, 
 299                    change.project) 
 300          t.execute(q, values) 
 301          change.number = t.lastrowid 
 302   
 303          for link in change.links: 
 304              t.execute(self.quoteq("INSERT INTO change_links (changeid, link) " 
 305                                    "VALUES (?,?)"), 
 306                        (change.number, link)) 
 307          for filename in change.files: 
 308              t.execute(self.quoteq("INSERT INTO change_files (changeid,filename)" 
 309                                    " VALUES (?,?)"), 
 310                        (change.number, filename)) 
 311          for propname,propvalue in change.properties.properties.items(): 
 312              encoded_value = json.dumps(propvalue) 
 313              t.execute(self.quoteq("INSERT INTO change_properties" 
 314                                    " (changeid, property_name, property_value)" 
 315                                    " VALUES (?,?,?)"), 
 316                        (change.number, propname, encoded_value)) 
 317          self.notify("add-change", change.number) 
  318   
 320          q = "SELECT changeid FROM changes" 
 321          args = [] 
 322          if branches or categories or committers: 
 323              q += " WHERE " 
 324              pieces = [] 
 325              if branches: 
 326                  pieces.append("branch IN %s" % self.parmlist(len(branches))) 
 327                  args.extend(list(branches)) 
 328              if categories: 
 329                  pieces.append("category IN %s" % self.parmlist(len(categories))) 
 330                  args.extend(list(categories)) 
 331              if committers: 
 332                  pieces.append("author IN %s" % self.parmlist(len(committers))) 
 333                  args.extend(list(committers)) 
 334              if minTime: 
 335                  pieces.append("when_timestamp > %d" % minTime) 
 336              q += " AND ".join(pieces) 
 337          q += " ORDER BY changeid DESC" 
 338          rows = self.runQueryNow(q, tuple(args)) 
 339          for (changeid,) in rows: 
 340              yield self.getChangeNumberedNow(changeid) 
  341   
 343          if t: 
 344              return self._txn_getLatestChangeNumber(branch=branch, t=t) 
 345          else: 
 346              return self.runInteractionNow(self._txn_getLatestChangeNumber) 
  348          args = None 
 349          if branch: 
 350              br_clause = "WHERE branch =? " 
 351              args = ( branch, ) 
 352          q = self.quoteq("SELECT max(changeid) from changes"+ br_clause) 
 353          t.execute(q, args) 
 354          row = t.fetchone() 
 355          if not row: 
 356              return 0 
 357          return row[0] 
  358   
 360           
 361          assert changeid >= 0 
 362          c = self._change_cache.get(changeid) 
 363          if c: 
 364              return c 
 365          if t: 
 366              c = self._txn_getChangeNumberedNow(t, changeid) 
 367          else: 
 368              c = self.runInteractionNow(self._txn_getChangeNumberedNow, changeid) 
 369          self._change_cache.add(changeid, c) 
 370          return c 
  372          q = self.quoteq("SELECT author, comments," 
 373                          " is_dir, branch, revision, revlink," 
 374                          " when_timestamp, category," 
 375                          " repository, project" 
 376                          " FROM changes WHERE changeid = ?") 
 377          t.execute(q, (changeid,)) 
 378          rows = t.fetchall() 
 379          if not rows: 
 380              return None 
 381          (who, comments, 
 382           isdir, branch, revision, revlink, 
 383           when, category, repository, project) = rows[0] 
 384          branch = str_or_none(branch) 
 385          revision = str_or_none(revision) 
 386          q = self.quoteq("SELECT link FROM change_links WHERE changeid=?") 
 387          t.execute(q, (changeid,)) 
 388          rows = t.fetchall() 
 389          links = [row[0] for row in rows] 
 390          links.sort() 
 391   
 392          q = self.quoteq("SELECT filename FROM change_files WHERE changeid=?") 
 393          t.execute(q, (changeid,)) 
 394          rows = t.fetchall() 
 395          files = [row[0] for row in rows] 
 396          files.sort() 
 397   
 398          p = self.get_properties_from_db("change_properties", "changeid", 
 399                                          changeid, t) 
 400          c = Change(who=who, files=files, comments=comments, isdir=isdir, 
 401                     links=links, revision=revision, when=when, 
 402                     branch=branch, category=category, revlink=revlink, 
 403                     repository=repository, project=project) 
 404          c.properties.updateFromProperties(p) 
 405          c.number = changeid 
 406          return c 
  407   
 409           
 410           
 411          assert changeid >= 0 
 412          c = self._change_cache.get(changeid) 
 413          if c: 
 414              return defer.succeed(c) 
 415          d1 = self.runQuery(self.quoteq("SELECT author, comments," 
 416                                         " is_dir, branch, revision, revlink," 
 417                                         " when_timestamp, category," 
 418                                         " repository, project" 
 419                                         " FROM changes WHERE changeid = ?"), 
 420                             (changeid,)) 
 421          d2 = self.runQuery(self.quoteq("SELECT link FROM change_links" 
 422                                         " WHERE changeid=?"), 
 423                             (changeid,)) 
 424          d3 = self.runQuery(self.quoteq("SELECT filename FROM change_files" 
 425                                         " WHERE changeid=?"), 
 426                             (changeid,)) 
 427          d4 = self.runInteraction(self._txn_get_properties_from_db, 
 428                  "change_properties", "changeid", changeid) 
 429          d = defer.gatherResults([d1,d2,d3,d4]) 
 430          d.addCallback(self._getChangeByNumber_query_done, changeid) 
 431          return d 
  432   
 434          (rows, link_rows, file_rows, properties) = res 
 435          if not rows: 
 436              return None 
 437          (who, comments, 
 438           isdir, branch, revision, revlink, 
 439           when, category, repository, project) = rows[0] 
 440          branch = str_or_none(branch) 
 441          revision = str_or_none(revision) 
 442          links = [row[0] for row in link_rows] 
 443          links.sort() 
 444          files = [row[0] for row in file_rows] 
 445          files.sort() 
 446   
 447          c = Change(who=who, files=files, comments=comments, isdir=isdir, 
 448                     links=links, revision=revision, when=when, 
 449                     branch=branch, category=category, revlink=revlink, 
 450                     repository=repository, project=project) 
 451          c.properties.updateFromProperties(properties) 
 452          c.number = changeid 
 453          self._change_cache.add(changeid, c) 
 454          return c 
  455   
 457          """Return a Deferred that fires with a list of all Change instances 
 458          with numbers greater than the given value, sorted by number. This is 
 459          useful for catching up with everything that's happened since you last 
 460          called this function.""" 
 461          assert last_changeid >= 0 
 462          if t: 
 463              return self._txn_getChangesGreaterThan(t, last_changeid) 
 464          else: 
 465              return self.runInteractionNow(self._txn_getChangesGreaterThan, 
 466                                            last_changeid) 
  474   
 476          """Return a list of all extant change id's less than the given value, 
 477          sorted by number.""" 
 478          def txn(t): 
 479              q = self.quoteq("SELECT changeid FROM changes WHERE changeid < ?") 
 480              t.execute(q, (new_changeid,)) 
 481              changes = [changeid for (changeid,) in t.fetchall()] 
 482              changes.sort() 
 483              return changes 
  484          return self.runInteractionNow(txn) 
  485   
 487          """Thoroughly remove a change from the database, including all dependent 
 488          tables""" 
 489          def txn(t): 
 490              for table in ('changes', 'scheduler_changes', 'sourcestamp_changes', 
 491                            'change_files', 'change_links', 'change_properties'): 
 492                  q = self.quoteq("DELETE FROM %s WHERE changeid = ?" % table) 
 493                  t.execute(q, (changeid,)) 
  494          return self.runInteractionNow(txn) 
 495   
 497          return defer.gatherResults([self.getChangeByNumber(changeid) 
 498                                      for changeid in changeids]) 
  499   
 500       
 501   
 503          assert isinstance(ssid, (int, long)) 
 504          ss = self._sourcestamp_cache.get(ssid) 
 505          if ss: 
 506              return ss 
 507          if t: 
 508              ss = self._txn_getSourceStampNumbered(t, ssid) 
 509          else: 
 510              ss = self.runInteractionNow(self._txn_getSourceStampNumbered, 
 511                                             ssid) 
 512          self._sourcestamp_cache.add(ssid, ss) 
 513          return ss 
  514   
 516          assert isinstance(ssid, (int, long)) 
 517          t.execute(self.quoteq("SELECT branch,revision,patchid,project,repository" 
 518                                " FROM sourcestamps WHERE id=?"), 
 519                    (ssid,)) 
 520          r = t.fetchall() 
 521          if not r: 
 522              return None 
 523          (branch_u, revision_u, patchid, project, repository) = r[0] 
 524          branch = str_or_none(branch_u) 
 525          revision = str_or_none(revision_u) 
 526   
 527          patch = None 
 528          if patchid is not None: 
 529              t.execute(self.quoteq("SELECT patchlevel,patch_base64,subdir" 
 530                                    " FROM patches WHERE id=?"), 
 531                        (patchid,)) 
 532              r = t.fetchall() 
 533              assert len(r) == 1 
 534              (patch_level, patch_text_base64, subdir_u) = r[0] 
 535              patch_text = base64.b64decode(patch_text_base64) 
 536              if subdir_u: 
 537                  patch = (patch_level, patch_text, str(subdir_u)) 
 538              else: 
 539                  patch = (patch_level, patch_text) 
 540   
 541          t.execute(self.quoteq("SELECT changeid FROM sourcestamp_changes" 
 542                                " WHERE sourcestampid=?" 
 543                                " ORDER BY changeid ASC"), 
 544                    (ssid,)) 
 545          r = t.fetchall() 
 546          changes = None 
 547          if r: 
 548              changes = [self.getChangeNumberedNow(changeid, t) 
 549                         for (changeid,) in r] 
 550          ss = SourceStamp(branch, revision, patch, changes, project=project, repository=repository) 
 551          ss.ssid = ssid 
 552          return ss 
  553   
 554       
 555   
 557          if t: 
 558              return self._txn_get_properties_from_db(t, tablename, idname, id) 
 559          else: 
 560              return self.runInteractionNow(self._txn_get_properties_from_db, 
 561                                            tablename, idname, id) 
  562   
 564           
 565           
 566          q = self.quoteq("SELECT property_name,property_value FROM %s WHERE %s=?" 
 567                          % (tablename, idname)) 
 568          t.execute(q, (id,)) 
 569          retval = Properties() 
 570          for key, valuepair in t.fetchall(): 
 571              value, source = json.loads(valuepair) 
 572              retval.setProperty(str(key), value, source) 
 573          return retval 
  574   
 575       
 576   
 580          for scheduler in added: 
 581              name = scheduler.name 
 582              assert name 
 583              class_name = "%s.%s" % (scheduler.__class__.__module__, 
 584                      scheduler.__class__.__name__) 
 585              q = self.quoteq(""" 
 586                  SELECT schedulerid, class_name FROM schedulers WHERE 
 587                      name=? AND 
 588                      (class_name=? OR class_name='') 
 589                      """) 
 590              t.execute(q, (name, class_name)) 
 591              row = t.fetchone() 
 592              if row: 
 593                  sid, db_class_name = row 
 594                  if db_class_name == '': 
 595                       
 596                       
 597                       
 598                      q = self.quoteq("""UPDATE schedulers SET class_name=? 
 599                          WHERE schedulerid=?""") 
 600                      t.execute(q, (class_name, sid)) 
 601                  elif db_class_name != class_name: 
 602                       
 603                       
 604                      sid = None 
 605              else: 
 606                  sid = None 
 607   
 608              if sid is None: 
 609                   
 610                   
 611                   
 612                  q = ("SELECT changeid FROM changes" 
 613                       " ORDER BY changeid DESC LIMIT 1") 
 614                  t.execute(q) 
 615                  max_changeid = _one_or_else(t.fetchall(), 0) 
 616                  state = scheduler.get_initial_state(max_changeid) 
 617                  state_json = json.dumps(state) 
 618                  q = self.quoteq("INSERT INTO schedulers" 
 619                                  " (name, class_name, state)" 
 620                                  "  VALUES (?,?,?)") 
 621                  t.execute(q, (name, class_name, state_json)) 
 622                  sid = t.lastrowid 
 623              log.msg("scheduler '%s' got id %d" % (scheduler.name, sid)) 
 624              scheduler.schedulerid = sid 
  625   
 627          q = self.quoteq("SELECT state FROM schedulers WHERE schedulerid=?") 
 628          t.execute(q, (schedulerid,)) 
 629          state_json = _one_or_else(t.fetchall()) 
 630          assert state_json is not None 
 631          return json.loads(state_json) 
  632   
 634          state_json = json.dumps(state) 
 635          q = self.quoteq("UPDATE schedulers SET state=? WHERE schedulerid=?") 
 636          t.execute(q, (state_json, schedulerid)) 
  637   
 639          """Given a SourceStamp (which may or may not have an ssid), make sure 
 640          the contents are in the database, and return the ssid. If the 
 641          SourceStamp originally came from the DB (and thus already has an 
 642          ssid), just return the ssid. If not, create a new row for it.""" 
 643          if ss.ssid is not None: 
 644              return ss.ssid 
 645          patchid = None 
 646          if ss.patch: 
 647              patchlevel = ss.patch[0] 
 648              diff = ss.patch[1] 
 649              subdir = None 
 650              if len(ss.patch) > 2: 
 651                  subdir = ss.patch[2] 
 652              q = self.quoteq("INSERT INTO patches" 
 653                              " (patchlevel, patch_base64, subdir)" 
 654                              " VALUES (?,?,?)") 
 655              t.execute(q, (patchlevel, base64.b64encode(diff), subdir)) 
 656              patchid = t.lastrowid 
 657          t.execute(self.quoteq("INSERT INTO sourcestamps" 
 658                                " (branch, revision, patchid, project, repository)" 
 659                                " VALUES (?,?,?,?,?)"), 
 660                    (ss.branch, ss.revision, patchid, ss.project, ss.repository)) 
 661          ss.ssid = t.lastrowid 
 662          q2 = self.quoteq("INSERT INTO sourcestamp_changes" 
 663                           " (sourcestampid, changeid) VALUES (?,?)") 
 664          for c in ss.changes: 
 665              t.execute(q2, (ss.ssid, c.number)) 
 666          return ss.ssid 
  667   
 668 -    def create_buildset(self, ssid, reason, properties, builderNames, t, 
 669                          external_idstring=None): 
  670           
 671          now = self._getCurrentTime() 
 672          t.execute(self.quoteq("INSERT INTO buildsets" 
 673                                " (external_idstring, reason," 
 674                                "  sourcestampid, submitted_at)" 
 675                                " VALUES (?,?,?,?)"), 
 676                    (external_idstring, reason, ssid, now)) 
 677          bsid = t.lastrowid 
 678          for propname, propvalue in properties.properties.items(): 
 679              encoded_value = json.dumps(propvalue) 
 680              t.execute(self.quoteq("INSERT INTO buildset_properties" 
 681                                    " (buildsetid, property_name, property_value)" 
 682                                    " VALUES (?,?,?)"), 
 683                        (bsid, propname, encoded_value)) 
 684          brids = [] 
 685          for bn in builderNames: 
 686              t.execute(self.quoteq("INSERT INTO buildrequests" 
 687                                    " (buildsetid, buildername, submitted_at)" 
 688                                    " VALUES (?,?,?)"), 
 689                        (bsid, bn, now)) 
 690              brid = t.lastrowid 
 691              brids.append(brid) 
 692          self.notify("add-buildset", bsid) 
 693          self.notify("add-buildrequest", *brids) 
 694          return bsid 
  695   
 697          q = self.quoteq("INSERT INTO scheduler_changes" 
 698                          " (schedulerid, changeid, important)" 
 699                          " VALUES (?,?,?)") 
 700          t.execute(q, (schedulerid, number, bool(important))) 
  701   
 703          q = self.quoteq("SELECT changeid, important" 
 704                          " FROM scheduler_changes" 
 705                          " WHERE schedulerid=?") 
 706          t.execute(q, (schedulerid,)) 
 707          important = [] 
 708          unimportant = [] 
 709          for (changeid, is_important) in t.fetchall(): 
 710              c = self.getChangeNumberedNow(changeid, t) 
 711              if is_important: 
 712                  important.append(c) 
 713              else: 
 714                  unimportant.append(c) 
 715          return (important, unimportant) 
  716   
 718          while changeids: 
 719               
 720               
 721              batch, changeids = changeids[:100], changeids[100:] 
 722              t.execute(self.quoteq("DELETE FROM scheduler_changes" 
 723                                    " WHERE schedulerid=? AND changeid IN ") 
 724                        + self.parmlist(len(batch)), 
 725                        (schedulerid,) + tuple(batch)) 
  726   
 728           
 729           
 730          t.execute(self.quoteq("INSERT INTO scheduler_upstream_buildsets" 
 731                                " (buildsetid, schedulerid, active)" 
 732                                " VALUES (?,?,?)"), 
 733                    (bsid, schedulerid, 1)) 
  734   
 736           
 737          t.execute(self.quoteq("SELECT bs.id, " 
 738                                "  bs.sourcestampid, bs.complete, bs.results" 
 739                                " FROM scheduler_upstream_buildsets AS s," 
 740                                "  buildsets AS bs" 
 741                                " WHERE s.buildsetid=bs.id" 
 742                                "  AND s.schedulerid=?" 
 743                                "  AND s.active=1"), 
 744                    (schedulerid,)) 
 745          return t.fetchall() 
  746   
 748          t.execute(self.quoteq("UPDATE scheduler_upstream_buildsets" 
 749                                " SET active=0" 
 750                                " WHERE buildsetid=? AND schedulerid=?"), 
 751                    (buildsetid, schedulerid)) 
  752   
 753       
 754   
 756          assert isinstance(brid, (int, long)) 
 757          if t: 
 758              br = self._txn_getBuildRequestWithNumber(t, brid) 
 759          else: 
 760              br = self.runInteractionNow(self._txn_getBuildRequestWithNumber, 
 761                                          brid) 
 762          return br 
  764          assert isinstance(brid, (int, long)) 
 765          t.execute(self.quoteq("SELECT br.buildsetid, bs.reason," 
 766                                " bs.sourcestampid, br.buildername," 
 767                                " bs.submitted_at, br.priority" 
 768                                " FROM buildrequests AS br, buildsets AS bs" 
 769                                " WHERE br.id=? AND br.buildsetid=bs.id"), 
 770                    (brid,)) 
 771          r = t.fetchall() 
 772          if not r: 
 773              return None 
 774          (bsid, reason, ssid, builder_name, submitted_at, priority) = r[0] 
 775          ss = self.getSourceStampNumberedNow(ssid, t) 
 776          properties = self.get_properties_from_db("buildset_properties", 
 777                                                   "buildsetid", bsid, t) 
 778          br = BuildRequest(reason, ss, builder_name, properties) 
 779          br.submittedAt = submitted_at 
 780          br.priority = priority 
 781          br.id = brid 
 782          br.bsid = bsid 
 783          return br 
  784   
 786          assert isinstance(brid, (int, long)) 
 787          return self.runInteractionNow(self._txn_get_buildername_for_brid, brid) 
  789          assert isinstance(brid, (int, long)) 
 790          t.execute(self.quoteq("SELECT buildername FROM buildrequests" 
 791                                " WHERE id=?"), 
 792                    (brid,)) 
 793          r = t.fetchall() 
 794          if not r: 
 795              return None 
 796          return r[0][0] 
  797   
 800          q = ("SELECT br.id" 
 801               " FROM buildrequests AS br, buildsets AS bs" 
 802               " WHERE br.buildername=? AND br.complete=0" 
 803               " AND br.buildsetid=bs.id" 
 804               " AND (br.claimed_at<?" 
 805               "      OR (br.claimed_by_name=?" 
 806               "          AND br.claimed_by_incarnation!=?))" 
 807               " ORDER BY br.priority DESC,bs.submitted_at ASC") 
 808          if limit: 
 809              q += " LIMIT %s" % limit 
 810          t.execute(self.quoteq(q), 
 811                  (buildername, old, master_name, master_incarnation)) 
 812          requests = [self.getBuildRequestWithNumber(brid, t) 
 813                      for (brid,) in t.fetchall()] 
 814          return requests 
  815   
 818          if not brids: 
 819              return 
 820          if t: 
 821              self._txn_claim_buildrequests(t, now, master_name, 
 822                                            master_incarnation, brids) 
 823          else: 
 824              self.runInteractionNow(self._txn_claim_buildrequests, 
 825                                     now, master_name, master_incarnation, brids) 
 828          brids = list(brids)  
              # brids is now a list, so it can be sliced whatever iterable was
              # passed in. Each pass takes up to 100 ids and stamps those rows
              # with the claim time and the claiming master's name and
              # incarnation in a single UPDATE.
 829          while brids: 
 830              batch, brids = brids[:100], brids[100:] 
                  # parmlist(len(batch)) presumably renders one placeholder per
                  # id for the IN clause -- confirm against its definition.
 831              q = self.quoteq("UPDATE buildrequests" 
 832                              " SET claimed_at = ?," 
 833                              "     claimed_by_name = ?, claimed_by_incarnation = ?" 
 834                              " WHERE id IN " + self.parmlist(len(batch))) 
                  # SET values first, then the ids for the IN list
 835              qargs = [now, master_name, master_incarnation] + list(batch) 
 836              t.execute(q, qargs) 
  837   
              # Record the start of a build: insert a builds row for
              # (buildnumber, brid) stamped with the current time, announce the
              # new row id with notify("add-build", ...), and return that id.
 841          now = self._getCurrentTime() 
 842          t.execute(self.quoteq("INSERT INTO builds (number, brid, start_time)" 
 843                                " VALUES (?,?,?)"), 
 844                    (buildnumber, brid, now)) 
              # lastrowid is read from t immediately after the INSERT; under
              # DB-API 2.0 it is the rowid of the last modified row, where the
              # driver supports it.
 845          bid = t.lastrowid 
 846          self.notify("add-build", bid) 
 847          return bid 
  848   
              # Stamp finish_time with the current time on every builds row whose
              # id is in bids, at most 100 ids per UPDATE.
              # NOTE(review): bids is sliced directly, so it has to be a sequence
              # (a set would fail here); the buildrequest claim path copies its
              # ids with list() before slicing.
 852          now = self._getCurrentTime() 
 853          while bids: 
 854              batch, bids = bids[:100], bids[100:] 
 855              q = self.quoteq("UPDATE builds SET finish_time = ?" 
 856                              " WHERE id IN " + self.parmlist(len(batch))) 
                  # the timestamp binds first, then one value per id
 857              qargs = [now] + list(batch) 
 858              t.execute(q, qargs) 
  859   
 863           
              # Look up the build with id `bid`, joined to its buildrequest, and
              # return the first matching row as (brid, buildername, number).
 864          t.execute(self.quoteq("SELECT b.brid,br.buildername,b.number" 
 865                                " FROM builds AS b, buildrequests AS br" 
 866                                " WHERE b.id=? AND b.brid=br.id"), 
 867                    (bid,)) 
 868          res = t.fetchall() 
 869          if res: 
 870              return res[0] 
              # no such build (or no matching buildrequest): keep the
              # three-slot shape, all None
 871          return (None,None,None) 
  872   
 879   
 883           
 884           
              # Release the claim on each request in brids (claimed_at back to 0,
              # claimer name and incarnation set to NULL), at most 100 ids per
              # UPDATE, then emit an "add-buildrequest" notification.
              # brids is sliced directly, so it must be a sequence.
 885          while brids: 
 886              batch, brids = brids[:100], brids[100:] 
 887              q = self.quoteq("UPDATE buildrequests" 
 888                              " SET claimed_at=0," 
 889                              "     claimed_by_name=NULL, claimed_by_incarnation=NULL" 
 890                              " WHERE id IN " + self.parmlist(len(batch))) 
 891              t.execute(q, batch) 
              # NOTE(review): the loop rebinds brids to the unprocessed remainder,
              # so it is always empty by this point and notify() receives no ids.
              # This looks unintended -- keep a reference to the original list if
              # subscribers are meant to see the resubmitted ids.
 892          self.notify("add-buildrequest", *brids) 
  893   
              # Mark every request in brids complete with the given `results` code
              # and the current time, at most 100 ids per UPDATE, and re-check
              # each still-incomplete buildset those requests belong to.
              # `[results, now]+batch` requires brids to be a list.
 897          now = self._getCurrentTime() 
 898           
 899           
 900          while brids: 
 901              batch, brids = brids[:100], brids[100:] 
 902   
 903              q = self.quoteq("UPDATE buildrequests" 
 904                              " SET complete=1, results=?, complete_at=?" 
 905                              " WHERE id IN " + self.parmlist(len(batch))) 
 906              t.execute(q, [results, now]+batch) 
 907               
                  # Find the incomplete buildsets that own this batch's requests;
                  # _check_buildset marks a set complete once all of its
                  # requests are.
 908              q = self.quoteq("SELECT bs.id" 
 909                              " FROM buildsets AS bs, buildrequests AS br" 
 910                              " WHERE br.buildsetid=bs.id AND bs.complete=0" 
 911                              "  AND br.id in " 
 912                              + self.parmlist(len(batch))) 
 913              t.execute(q, batch) 
 914              bsids = [bsid for (bsid,) in t.fetchall()] 
 915              for bsid in bsids: 
 916                  self._check_buildset(t, bsid, now) 
              # NOTE(review): brids was consumed by the loop, so this notify()
              # call is always made with no ids.
 917          self.notify("retire-buildrequest", *brids) 
              # NOTE(review): bsids is only bound inside the loop. With an empty
              # brids on entry this line raises NameError, and with more than
              # 100 ids it reports only the last batch's buildsets.
 918          self.notify("modify-buildset", *bsids) 
  919   
              # Cancel the requests in brids, at most 100 ids at a time: each
              # batch is marked complete with a FAILURE result, then every
              # still-incomplete buildset owning those requests is re-checked.
              # `[FAILURE, now]+batch` requires brids to be a list.
 923           
 924           
 925           
 926           
 927           
 928           
 929          while brids: 
 930              batch, brids = brids[:100], brids[100:] 
 931   
                  # The condition is the literal True, so only the UPDATE branch
                  # runs; the DELETE branch below is unreachable as written.
 932              if True: 
 933                  now = self._getCurrentTime() 
 934                  q = self.quoteq("UPDATE buildrequests" 
 935                                  " SET complete=1, results=?, complete_at=?" 
 936                                  " WHERE id IN " + self.parmlist(len(batch))) 
 937                  t.execute(q, [FAILURE, now]+batch) 
 938              else: 
 939                  q = self.quoteq("DELETE FROM buildrequests" 
 940                                  " WHERE id IN " + self.parmlist(len(batch))) 
 941                  t.execute(q, batch) 
 942   
 943               
                  # `now` used below is the value bound in the True branch above.
 944              q = self.quoteq("SELECT bs.id" 
 945                              " FROM buildsets AS bs, buildrequests AS br" 
 946                              " WHERE br.buildsetid=bs.id AND bs.complete=0" 
 947                              "  AND br.id in " 
 948                              + self.parmlist(len(batch))) 
 949              t.execute(q, batch) 
 950              bsids = [bsid for (bsid,) in t.fetchall()] 
 951              for bsid in bsids: 
 952                  self._check_buildset(t, bsid, now) 
 953   
              # NOTE(review): brids was consumed by the loop, so this notify()
              # call is always made with no ids.
 954          self.notify("cancel-buildrequest", *brids) 
              # NOTE(review): bsids is only bound inside the loop. With an empty
              # brids on entry this line raises NameError, and with more than
              # 100 ids it reports only the last batch's buildsets.
 955          self.notify("modify-buildset", *bsids) 
  956   
              # Re-evaluate buildset `bsid`: read (complete, results) for each of
              # its buildrequests -- the join only yields rows while the buildset
              # itself is still marked incomplete (bs.complete=0) -- and mark the
              # buildset complete at time `now` once every request is complete.
 958          q = self.quoteq("SELECT br.complete,br.results" 
 959                          " FROM buildsets AS bs, buildrequests AS br" 
 960                          " WHERE bs.complete=0" 
 961                          "  AND br.buildsetid=bs.id AND bs.id=?") 
 962          t.execute(q, (bsid,)) 
 963          results = t.fetchall() 
              # Start from "complete and successful"; any row can overturn either.
 964          is_complete = True 
 965          bs_results = SUCCESS 
 966          for (complete, r) in results: 
 967              if not complete: 
 968                   
 969                  is_complete = False 
 970               
 971               
                  # Any result other than SUCCESS or WARNINGS makes the whole
                  # buildset a FAILURE. This also fires for incomplete rows, but
                  # bs_results is only used when is_complete is still True.
 972              if r not in (SUCCESS, WARNINGS): 
 973                  bs_results = FAILURE 
              # NOTE(review): if the query returned no rows, is_complete is still
              # True here, so the UPDATE runs with results=SUCCESS. Callers in
              # this file pass only ids selected with bs.complete=0.
 974          if is_complete: 
 975               
 976              q = self.quoteq("UPDATE buildsets" 
 977                              " SET complete=1, complete_at=?, results=?" 
 978                              " WHERE id=?") 
 979              t.execute(q, (now, bs_results, bsid)) 
  980   
              # Return what _txn_get_buildrequestids_for_buildset produces for
              # buildset `bsid`: a dict mapping buildername to buildrequest id.
 982          return self.runInteractionNow(self._txn_get_buildrequestids_for_buildset, 
 983                                        bsid) 
 985          t.execute(self.quoteq("SELECT buildername,id FROM buildrequests" 
 986                                " WHERE buildsetid=?"), 
 987                    (bsid,)) 
              # Each (buildername, id) row becomes one dict entry; if two rows
              # share a buildername, dict() keeps only the later one.
 988          return dict(t.fetchall()) 
  989   
              # Summarize buildset `bsid` as a (successful, finished) pair built
              # from the (complete, results) columns of its buildrequests.
              # This query does not filter on bs.complete.
              #   finished   -- True when no request row is incomplete.
              #   successful -- False once any completed request has a result
              #                 other than SUCCESS/WARNINGS; True when finished
              #                 with no such request; None while still undecided.
 993           
 994           
 995           
 996           
 997           
 998           
 999          q = self.quoteq("SELECT br.complete,br.results" 
1000                          " FROM buildsets AS bs, buildrequests AS br" 
1001                          " WHERE br.buildsetid=bs.id AND bs.id=?") 
1002          t.execute(q, (bsid,)) 
1003          results = t.fetchall() 
1004          finished = True 
1005          successful = None 
1006          for (c,r) in results: 
1007              if not c: 
1008                  finished = False 
                  # only completed requests can decide failure
1009              if c and r not in (SUCCESS, WARNINGS): 
1010                  successful = False 
              # everything finished and nothing failed: success (this is also the
              # outcome when the query returned no rows)
1011          if finished and successful is None: 
1012              successful = True 
1013          return (successful, finished) 
 1014   
              # Return the ids of all buildsets not yet marked complete.
              # The statement has no placeholders and is passed to execute as-is,
              # without going through quoteq.
1018          t.execute("SELECT id FROM buildsets WHERE complete=0") 
1019          return [bsid for (bsid,) in t.fetchall()] 
1023          q = self.quoteq("SELECT external_idstring, reason, sourcestampid," 
1024                          "       complete, results" 
1025                          " FROM buildsets WHERE id=?") 
              # Fetch the buildset row for `bsid` and return it as
              # (external_idstring, reason, ssid, complete, results), or None
              # when there is no such row.
1026          t.execute(q, (bsid,)) 
1027          res = t.fetchall() 
1028          if res: 
1029              (external, reason, ssid, complete, results) = res[0] 
                  # str_or_none (module level) maps None to None and anything
                  # else through str().
1030              external_idstring = str_or_none(external) 
1031              reason = str_or_none(reason) 
                  # the stored completion flag is normalised to a real bool
1032              complete = bool(complete) 
1033              return (external_idstring, reason, ssid, complete, results) 
1034          return None  
 1035   
              # Return what _txn_get_pending_brids_for_builder produces for
              # `buildername`: a list of buildrequest ids.
1037          return self.runInteractionNow(self._txn_get_pending_brids_for_builder, 
1038                                        buildername) 
1040           
1041           
1042           
              # List the ids of buildrequests for `buildername` that are neither
              # complete nor claimed (complete=0 and claimed_at=0).
1043          t.execute(self.quoteq("SELECT id FROM buildrequests" 
1044                                " WHERE buildername=? AND" 
1045                                "  complete=0 AND claimed_at=0"), 
1046                    (buildername,)) 
              # unwrap the one-column rows into a flat list of ids
1047          return [brid for (brid,) in t.fetchall()] 
 1048   
1049       
1050   
              # True while self._pending_operation_count is truthy (non-zero for a
              # counter). The attribute is maintained outside this listing.
1052          return bool(self._pending_operation_count) 
 1053   
1056   
1057   
      # Module-level hook: twisted.python.threadable.synchronize wraps the methods
      # named in DBConnector.synchronized ("notify" and "_end_operation") so that
      # calls to them are serialized by a lock once threading is enabled.
1058  threadable.synchronize(DBConnector) 
1059