1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 import sys, collections, base64
17
18 from twisted.python import log, threadable
19 from twisted.internet import defer
20 from twisted.enterprise import adbapi
21 from buildbot import util
22 from buildbot.util import collections as bbcollections
23 from buildbot.changes.changes import Change
24 from buildbot.sourcestamp import SourceStamp
25 from buildbot.buildrequest import BuildRequest
26 from buildbot.process.properties import Properties
27 from buildbot.status.builder import SUCCESS, WARNINGS, FAILURE
28 from buildbot.util.eventual import eventually
29 from buildbot.util import json
30
31
32
33
34
35
37 - def execute(self, *args, **kwargs):
44
def _one_or_else(res, default=None, process_f=lambda x: x):
    """Return the first column of the first row of ``res``.

    ``res`` is a sequence of rows, as returned by ``fetchall()``. When it is
    empty (or None), ``default`` is returned untouched. Otherwise the first
    column of the first row is passed through ``process_f`` and returned.

    NOTE(review): the header of this helper was missing from the listing;
    the identity default for ``process_f`` is inferred from the call sites,
    which pass at most ``res`` and ``default`` -- confirm against history.
    """
    if not res:
        return default
    return process_f(res[0][0])
49
def str_or_none(s):
    """Convert ``s`` with ``str()``, passing ``None`` through unchanged."""
    if s is None:
        return None
    return str(s)
54
56 pass
57
59
60 compare_attrs = ["args", "kwargs"]
61 synchronized = ["notify", "_end_operation"]
62 MAX_QUERY_TIMES = 1000
63
65
66 self._query_times = collections.deque()
67 self._spec = spec
68
69
70 self._dbapi = spec.get_dbapi()
71 self._nonpool = None
72 self._nonpool_lastused = None
73 self._nonpool_max_idle = spec.get_maxidle()
74
75
76
77 self.paramstyle = self._dbapi.paramstyle
78
79 self._pool = spec.get_async_connection_pool()
80 self._pool.transactionFactory = MyTransaction
81
82
83
84
85
86 self._change_cache = util.LRUCache()
87 self._sourcestamp_cache = util.LRUCache()
88 self._active_operations = set()
89 self._pending_notifications = []
90 self._subscribers = bbcollections.defaultdict(set)
91
92 self._pending_operation_count = 0
93
94 self._started = False
95
99
101
102
103 self._pool.start()
104 self._started = True
105
107 """Call this when you're done with me"""
108
109
110 if self._nonpool:
111 self._nonpool.close()
112 self._nonpool = None
113 self._nonpool_lastused = None
114
115 if not self._started:
116 return
117 self._pool.close()
118 self._started = False
119 del self._pool
120
def quoteq(self, query):
    """
    Given a query that contains qmark-style placeholders, like::
     INSERT INTO foo (col1, col2) VALUES (?,?)
    replace the '?' with '%s' if the backend uses format-style
    placeholders, like::
     INSERT INTO foo (col1, col2) VALUES (%s,%s)

    Only the "format" and "qmark" paramstyles are handled; any other value
    of ``self.paramstyle`` trips the assertion below.
    """
    if self.paramstyle == "format":
        return query.replace("?", "%s")
    assert self.paramstyle == "qmark"
    return query
133
def parmlist(self, count):
    """
    When passing long lists of values to e.g., an INSERT query, it is
    tedious to pass long strings of ? placeholders.  This function will
    create a parenthesis-enclosed list of COUNT placeholders.  Note that
    the placeholders have already had quoteq() applied.
    """
    placeholder = self.quoteq("?")
    return "(" + ",".join([placeholder] * count) + ")"
143
145 """Returns None for an empty database, or a number (probably 1) for
146 the database's version"""
147 try:
148 res = self.runQueryNow("SELECT version FROM version")
149 except (self._dbapi.OperationalError, self._dbapi.ProgrammingError):
150
151 return None
152 assert len(res) == 1
153 return res[0][0]
154
156
def runQueryNow(self, *args, **kwargs):
    """Run a query synchronously and return its result.

    All arguments are forwarded, together with ``self._runQuery``, to
    ``runInteractionNow``. The connector must have been started.
    """
    assert self._started
    return self.runInteractionNow(self._runQuery, *args, **kwargs)
159
163
def _start_operation(self):
    """Register a new in-flight operation and return its token.

    The token is added to ``self._active_operations``; hand it back to
    ``_end_operation`` once the operation has finished.
    """
    token = Token()
    self._active_operations.add(token)
    return token
169
170
171
172
def _end_operation(self, t):
    """Mark the operation identified by token ``t`` as finished.

    While other operations are still active nothing else happens. Once the
    last one ends, every queued ``(category, args)`` notification is
    scheduled through ``eventually(self.send_notification, ...)`` and the
    queue is reset to a fresh empty list.
    """
    self._active_operations.discard(t)
    if self._active_operations:
        # something else is still running; keep the notifications queued
        return
    for (category, args) in self._pending_notifications:
        eventually(self.send_notification, category, args)
    self._pending_notifications = []
181
183
def runInteractionNow(self, interaction, *args, **kwargs):
    """Run ``interaction`` synchronously and return its result.

    The call is delegated to ``_runInteractionNow`` and bracketed by an
    operation token. The token is released and the elapsed time recorded
    in a ``finally`` clause, so both happen even when the interaction
    raises. The connector must have been started.
    """
    assert self._started
    start = self._getCurrentTime()
    token = self._start_operation()
    try:
        return self._runInteractionNow(interaction, *args, **kwargs)
    finally:
        self._end_operation(token)
        self._add_query_time(start)
192
194
195
196
197
def get_sync_connection(self):
    """Return the connection used for synchronous (non-pool) queries.

    A single connection is cached in ``self._nonpool``. When
    ``self._nonpool_max_idle`` is set and the cached connection was last
    handed out longer ago than that, it is forgotten and a fresh one is
    obtained from ``self._spec.get_sync_connection()``. The last-used
    timestamp is refreshed on every call.
    """
    if self._nonpool_max_idle is not None:
        now = util.now()
        if (self._nonpool_lastused
                and self._nonpool_lastused + self._nonpool_max_idle < now):
            # NOTE(review): the stale connection is dropped without an
            # explicit close() -- confirm that is intentional.
            self._nonpool = None

    if not self._nonpool:
        self._nonpool = self._spec.get_sync_connection()

    self._nonpool_lastused = util.now()
    return self._nonpool
208
210 conn = self.get_sync_connection()
211 c = conn.cursor()
212 try:
213 result = interaction(c, *args, **kwargs)
214 c.close()
215 conn.commit()
216 return result
217 except:
218 excType, excValue, excTraceback = sys.exc_info()
219 try:
220 conn.rollback()
221 c2 = conn.cursor()
222 c2.execute(self._pool.good_sql)
223 c2.close()
224 conn.commit()
225 except:
226 log.msg("rollback failed, will reconnect next query")
227 log.err()
228
229
230 self._nonpool = None
231 raise excType, excValue, excTraceback
232
def notify(self, category, *args):
    """Queue a notification for ``category`` with the given arguments.

    The ``(category, args)`` pair is only appended to
    ``self._pending_notifications`` here; nothing is delivered by this
    method itself.
    """
    self._pending_notifications.append((category, args))
237
243
246
def runQuery(self, *args, **kwargs):
    """Run a query on the asynchronous pool.

    All arguments are forwarded to ``self._pool.runQuery``. Returns the
    Deferred produced by the pool; its result (or failure) is passed
    through unchanged.

    ``self._pending_operation_count`` is incremented for the duration of
    the query. Previously it was never decremented again, so the counter
    grew without bound and the connector always looked busy; a pass-through
    callback now restores it when the Deferred fires either way.
    """
    assert self._started
    self._pending_operation_count += 1
    d = self._pool.runQuery(*args, **kwargs)

    def _query_finished(res):
        # balance the increment above, on success and on failure alike
        self._pending_operation_count -= 1
        return res
    d.addBoth(_query_finished)
    return d
252
254 self._end_operation(t)
255 self._add_query_time(start)
256 self._pending_operation_count -= 1
257 return res
258
def _add_query_time(self, start):
    """Record how long an operation that began at ``start`` took.

    The elapsed time (current time minus ``start``) is appended to
    ``self._query_times``; the oldest sample is dropped once more than
    ``MAX_QUERY_TIMES`` samples are held.
    """
    elapsed = self._getCurrentTime() - start
    self._query_times.append(elapsed)
    if len(self._query_times) > self.MAX_QUERY_TIMES:
        self._query_times.popleft()
264
def runInteraction(self, *args, **kwargs):
    """Run an interaction on the asynchronous pool.

    All arguments are forwarded to ``self._pool.runInteraction``. Returns
    the pool's Deferred, with ``_runInteraction_done`` attached via
    ``addBoth`` so that the operation token, the timing sample and the
    pending-operation counter are settled when it fires.
    """
    assert self._started
    self._pending_operation_count += 1
    start = self._getCurrentTime()
    token = self._start_operation()
    d = self._pool.runInteraction(*args, **kwargs)
    d.addBoth(self._runInteraction_done, start, token)
    return d
def _runInteraction_done(self, res, start, t):
    """Settle the bookkeeping for an asynchronous interaction.

    Attached with ``addBoth`` by ``runInteraction``: releases operation
    token ``t``, records the time elapsed since ``start``, decrements the
    pending-operation counter and passes ``res`` through unchanged.
    """
    self._end_operation(t)
    self._add_query_time(start)
    self._pending_operation_count -= 1
    return res
278
279
280
284
286 q = self.quoteq("INSERT INTO changes"
287 " (author,"
288 " comments, is_dir,"
289 " branch, revision, revlink,"
290 " when_timestamp, category,"
291 " repository, project)"
292 " VALUES (?, ?,?, ?,?,?, ?,?, ?,?)")
293
294
295 values = (change.who,
296 change.comments, change.isdir,
297 change.branch, change.revision, change.revlink,
298 change.when, change.category, change.repository,
299 change.project)
300 t.execute(q, values)
301 change.number = t.lastrowid
302
303 for link in change.links:
304 t.execute(self.quoteq("INSERT INTO change_links (changeid, link) "
305 "VALUES (?,?)"),
306 (change.number, link))
307 for filename in change.files:
308 t.execute(self.quoteq("INSERT INTO change_files (changeid,filename)"
309 " VALUES (?,?)"),
310 (change.number, filename))
311 for propname,propvalue in change.properties.properties.items():
312 encoded_value = json.dumps(propvalue)
313 t.execute(self.quoteq("INSERT INTO change_properties"
314 " (changeid, property_name, property_value)"
315 " VALUES (?,?,?)"),
316 (change.number, propname, encoded_value))
317 self.notify("add-change", change.number)
318
320 q = "SELECT changeid FROM changes"
321 args = []
322 if branches or categories or committers:
323 q += " WHERE "
324 pieces = []
325 if branches:
326 pieces.append("branch IN %s" % self.parmlist(len(branches)))
327 args.extend(list(branches))
328 if categories:
329 pieces.append("category IN %s" % self.parmlist(len(categories)))
330 args.extend(list(categories))
331 if committers:
332 pieces.append("author IN %s" % self.parmlist(len(committers)))
333 args.extend(list(committers))
334 if minTime:
335 pieces.append("when_timestamp > %d" % minTime)
336 q += " AND ".join(pieces)
337 q += " ORDER BY changeid DESC"
338 rows = self.runQueryNow(q, tuple(args))
339 for (changeid,) in rows:
340 yield self.getChangeNumberedNow(changeid)
341
343 if t:
344 return self._txn_getLatestChangeNumber(branch=branch, t=t)
345 else:
346 return self.runInteractionNow(self._txn_getLatestChangeNumber)
def _txn_getLatestChangeNumber(self, t, branch=None):
    """Return the highest changeid in the changes table, or 0 if none.

    ``t`` is the transaction/cursor to run the query on. When ``branch`` is
    given, only changes on that branch are considered.

    Fixes relative to the previous body:
    - the WHERE clause variable was only assigned when a branch was given,
      so the unfiltered query raised NameError;
    - the clause was glued onto "changes" without a separating space;
    - ``max()`` over no rows yields a single NULL row, which was returned
      as None instead of 0.

    NOTE(review): the header was missing from the listing. ``t`` is placed
    first (like the other ``_txn_`` helpers) so that the visible
    ``runInteractionNow(self._txn_getLatestChangeNumber)`` call, which
    passes only the cursor, works; the visible keyword call
    (``branch=..., t=...``) is unaffected.
    """
    clause = ""
    args = ()
    if branch:
        clause = " WHERE branch = ?"
        args = (branch,)
    q = self.quoteq("SELECT max(changeid) FROM changes" + clause)
    t.execute(q, args)
    row = t.fetchone()
    if not row or row[0] is None:
        return 0
    return row[0]
358
360
def getChangeNumberedNow(self, changeid, t=None):
    """Synchronously fetch the change numbered ``changeid``.

    A truthy entry in ``self._change_cache`` is returned directly.
    Otherwise the change is loaded with ``_txn_getChangeNumberedNow`` --
    on transaction ``t`` when one is supplied, or in a fresh synchronous
    interaction when not -- and the result is stored in the cache before
    being returned.
    """
    assert changeid >= 0
    cached = self._change_cache.get(changeid)
    if cached:
        return cached
    if t:
        change = self._txn_getChangeNumberedNow(t, changeid)
    else:
        change = self.runInteractionNow(self._txn_getChangeNumberedNow,
                                        changeid)
    self._change_cache.add(changeid, change)
    return change
372 q = self.quoteq("SELECT author, comments,"
373 " is_dir, branch, revision, revlink,"
374 " when_timestamp, category,"
375 " repository, project"
376 " FROM changes WHERE changeid = ?")
377 t.execute(q, (changeid,))
378 rows = t.fetchall()
379 if not rows:
380 return None
381 (who, comments,
382 isdir, branch, revision, revlink,
383 when, category, repository, project) = rows[0]
384 branch = str_or_none(branch)
385 revision = str_or_none(revision)
386 q = self.quoteq("SELECT link FROM change_links WHERE changeid=?")
387 t.execute(q, (changeid,))
388 rows = t.fetchall()
389 links = [row[0] for row in rows]
390 links.sort()
391
392 q = self.quoteq("SELECT filename FROM change_files WHERE changeid=?")
393 t.execute(q, (changeid,))
394 rows = t.fetchall()
395 files = [row[0] for row in rows]
396 files.sort()
397
398 p = self.get_properties_from_db("change_properties", "changeid",
399 changeid, t)
400 c = Change(who=who, files=files, comments=comments, isdir=isdir,
401 links=links, revision=revision, when=when,
402 branch=branch, category=category, revlink=revlink,
403 repository=repository, project=project)
404 c.properties.updateFromProperties(p)
405 c.number = changeid
406 return c
407
409
410
411 assert changeid >= 0
412 c = self._change_cache.get(changeid)
413 if c:
414 return defer.succeed(c)
415 d1 = self.runQuery(self.quoteq("SELECT author, comments,"
416 " is_dir, branch, revision, revlink,"
417 " when_timestamp, category,"
418 " repository, project"
419 " FROM changes WHERE changeid = ?"),
420 (changeid,))
421 d2 = self.runQuery(self.quoteq("SELECT link FROM change_links"
422 " WHERE changeid=?"),
423 (changeid,))
424 d3 = self.runQuery(self.quoteq("SELECT filename FROM change_files"
425 " WHERE changeid=?"),
426 (changeid,))
427 d4 = self.runInteraction(self._txn_get_properties_from_db,
428 "change_properties", "changeid", changeid)
429 d = defer.gatherResults([d1,d2,d3,d4])
430 d.addCallback(self._getChangeByNumber_query_done, changeid)
431 return d
432
434 (rows, link_rows, file_rows, properties) = res
435 if not rows:
436 return None
437 (who, comments,
438 isdir, branch, revision, revlink,
439 when, category, repository, project) = rows[0]
440 branch = str_or_none(branch)
441 revision = str_or_none(revision)
442 links = [row[0] for row in link_rows]
443 links.sort()
444 files = [row[0] for row in file_rows]
445 files.sort()
446
447 c = Change(who=who, files=files, comments=comments, isdir=isdir,
448 links=links, revision=revision, when=when,
449 branch=branch, category=category, revlink=revlink,
450 repository=repository, project=project)
451 c.properties.updateFromProperties(properties)
452 c.number = changeid
453 self._change_cache.add(changeid, c)
454 return c
455
457 """Return a Deferred that fires with a list of all Change instances
458 with numbers greater than the given value, sorted by number. This is
459 useful for catching up with everything that's happened since you last
460 called this function."""
461 assert last_changeid >= 0
462 if t:
463 return self._txn_getChangesGreaterThan(t, last_changeid)
464 else:
465 return self.runInteractionNow(self._txn_getChangesGreaterThan,
466 last_changeid)
474
476 """Return a list of all extant change id's less than the given value,
477 sorted by number."""
478 def txn(t):
479 q = self.quoteq("SELECT changeid FROM changes WHERE changeid < ?")
480 t.execute(q, (new_changeid,))
481 changes = [changeid for (changeid,) in t.fetchall()]
482 changes.sort()
483 return changes
484 return self.runInteractionNow(txn)
485
487 """Thoroughly remove a change from the database, including all dependent
488 tables"""
489 def txn(t):
490 for table in ('changes', 'scheduler_changes', 'sourcestamp_changes',
491 'change_files', 'change_links', 'change_properties'):
492 q = self.quoteq("DELETE FROM %s WHERE changeid = ?" % table)
493 t.execute(q, (changeid,))
494 return self.runInteractionNow(txn)
495
497 return defer.gatherResults([self.getChangeByNumber(changeid)
498 for changeid in changeids])
499
500
501
503 assert isinstance(ssid, (int, long))
504 ss = self._sourcestamp_cache.get(ssid)
505 if ss:
506 return ss
507 if t:
508 ss = self._txn_getSourceStampNumbered(t, ssid)
509 else:
510 ss = self.runInteractionNow(self._txn_getSourceStampNumbered,
511 ssid)
512 self._sourcestamp_cache.add(ssid, ss)
513 return ss
514
516 assert isinstance(ssid, (int, long))
517 t.execute(self.quoteq("SELECT branch,revision,patchid,project,repository"
518 " FROM sourcestamps WHERE id=?"),
519 (ssid,))
520 r = t.fetchall()
521 if not r:
522 return None
523 (branch_u, revision_u, patchid, project, repository) = r[0]
524 branch = str_or_none(branch_u)
525 revision = str_or_none(revision_u)
526
527 patch = None
528 if patchid is not None:
529 t.execute(self.quoteq("SELECT patchlevel,patch_base64,subdir"
530 " FROM patches WHERE id=?"),
531 (patchid,))
532 r = t.fetchall()
533 assert len(r) == 1
534 (patch_level, patch_text_base64, subdir_u) = r[0]
535 patch_text = base64.b64decode(patch_text_base64)
536 if subdir_u:
537 patch = (patch_level, patch_text, str(subdir_u))
538 else:
539 patch = (patch_level, patch_text)
540
541 t.execute(self.quoteq("SELECT changeid FROM sourcestamp_changes"
542 " WHERE sourcestampid=?"
543 " ORDER BY changeid ASC"),
544 (ssid,))
545 r = t.fetchall()
546 changes = None
547 if r:
548 changes = [self.getChangeNumberedNow(changeid, t)
549 for (changeid,) in r]
550 ss = SourceStamp(branch, revision, patch, changes, project=project, repository=repository)
551 ss.ssid = ssid
552 return ss
553
554
555
def get_properties_from_db(self, tablename, idname, id, t=None):
    """Load the properties stored in ``tablename`` for row ``id``.

    ``idname`` is the name of the column matched against ``id``. The work
    is done by ``_txn_get_properties_from_db``, on transaction ``t`` when
    one is supplied and in a fresh synchronous interaction otherwise.
    """
    if t:
        return self._txn_get_properties_from_db(t, tablename, idname, id)
    else:
        return self.runInteractionNow(self._txn_get_properties_from_db,
                                      tablename, idname, id)
562
564
565
def _txn_get_properties_from_db(self, t, tablename, idname, id):
    """Build a Properties object from the rows of a properties table.

    Selects ``property_name, property_value`` from ``tablename`` where
    column ``idname`` equals ``id``, using transaction ``t``. Each stored
    value is JSON holding a ``(value, source)`` pair, which is decoded and
    applied with ``setProperty``.

    ``tablename`` and ``idname`` are interpolated into the SQL text, so
    they must be trusted identifiers; only ``id`` is passed as a bound
    parameter.
    """
    q = self.quoteq("SELECT property_name,property_value FROM %s WHERE %s=?"
                    % (tablename, idname))
    t.execute(q, (id,))
    retval = Properties()
    for key, valuepair in t.fetchall():
        value, source = json.loads(valuepair)
        retval.setProperty(str(key), value, source)
    return retval
574
575
576
580 for scheduler in added:
581 name = scheduler.name
582 assert name
583 class_name = "%s.%s" % (scheduler.__class__.__module__,
584 scheduler.__class__.__name__)
585 q = self.quoteq("""
586 SELECT schedulerid, class_name FROM schedulers WHERE
587 name=? AND
588 (class_name=? OR class_name='')
589 """)
590 t.execute(q, (name, class_name))
591 row = t.fetchone()
592 if row:
593 sid, db_class_name = row
594 if db_class_name == '':
595
596
597
598 q = self.quoteq("""UPDATE schedulers SET class_name=?
599 WHERE schedulerid=?""")
600 t.execute(q, (class_name, sid))
601 elif db_class_name != class_name:
602
603
604 sid = None
605 else:
606 sid = None
607
608 if sid is None:
609
610
611
612 q = ("SELECT changeid FROM changes"
613 " ORDER BY changeid DESC LIMIT 1")
614 t.execute(q)
615 max_changeid = _one_or_else(t.fetchall(), 0)
616 state = scheduler.get_initial_state(max_changeid)
617 state_json = json.dumps(state)
618 q = self.quoteq("INSERT INTO schedulers"
619 " (name, class_name, state)"
620 " VALUES (?,?,?)")
621 t.execute(q, (name, class_name, state_json))
622 sid = t.lastrowid
623 log.msg("scheduler '%s' got id %d" % (scheduler.name, sid))
624 scheduler.schedulerid = sid
625
627 q = self.quoteq("SELECT state FROM schedulers WHERE schedulerid=?")
628 t.execute(q, (schedulerid,))
629 state_json = _one_or_else(t.fetchall())
630 assert state_json is not None
631 return json.loads(state_json)
632
634 state_json = json.dumps(state)
635 q = self.quoteq("UPDATE schedulers SET state=? WHERE schedulerid=?")
636 t.execute(q, (state_json, schedulerid))
637
639 """Given a SourceStamp (which may or may not have an ssid), make sure
640 the contents are in the database, and return the ssid. If the
641 SourceStamp originally came from the DB (and thus already has an
642 ssid), just return the ssid. If not, create a new row for it."""
643 if ss.ssid is not None:
644 return ss.ssid
645 patchid = None
646 if ss.patch:
647 patchlevel = ss.patch[0]
648 diff = ss.patch[1]
649 subdir = None
650 if len(ss.patch) > 2:
651 subdir = ss.patch[2]
652 q = self.quoteq("INSERT INTO patches"
653 " (patchlevel, patch_base64, subdir)"
654 " VALUES (?,?,?)")
655 t.execute(q, (patchlevel, base64.b64encode(diff), subdir))
656 patchid = t.lastrowid
657 t.execute(self.quoteq("INSERT INTO sourcestamps"
658 " (branch, revision, patchid, project, repository)"
659 " VALUES (?,?,?,?,?)"),
660 (ss.branch, ss.revision, patchid, ss.project, ss.repository))
661 ss.ssid = t.lastrowid
662 q2 = self.quoteq("INSERT INTO sourcestamp_changes"
663 " (sourcestampid, changeid) VALUES (?,?)")
664 for c in ss.changes:
665 t.execute(q2, (ss.ssid, c.number))
666 return ss.ssid
667
def create_buildset(self, ssid, reason, properties, builderNames, t,
                    external_idstring=None):
    """Insert a buildset and one build request per builder.

    Using transaction ``t``, this writes:
    - one ``buildsets`` row for source stamp ``ssid`` with ``reason`` and
      ``external_idstring``, stamped with the current time;
    - one ``buildset_properties`` row per entry of
      ``properties.properties``, with the value JSON-encoded;
    - one ``buildrequests`` row per name in ``builderNames``.

    An "add-buildset" notification carrying the new buildset id and an
    "add-buildrequest" notification carrying all new request ids are then
    queued. Returns the new buildset id.
    """
    now = self._getCurrentTime()
    t.execute(self.quoteq("INSERT INTO buildsets"
                          " (external_idstring, reason,"
                          "  sourcestampid, submitted_at)"
                          " VALUES (?,?,?,?)"),
              (external_idstring, reason, ssid, now))
    # lastrowid must be read before the next execute() replaces it
    bsid = t.lastrowid

    property_insert = self.quoteq("INSERT INTO buildset_properties"
                                  " (buildsetid, property_name, property_value)"
                                  " VALUES (?,?,?)")
    for propname, propvalue in properties.properties.items():
        t.execute(property_insert, (bsid, propname, json.dumps(propvalue)))

    request_insert = self.quoteq("INSERT INTO buildrequests"
                                 " (buildsetid, buildername, submitted_at)"
                                 " VALUES (?,?,?)")
    brids = []
    for bn in builderNames:
        t.execute(request_insert, (bsid, bn, now))
        brids.append(t.lastrowid)

    self.notify("add-buildset", bsid)
    self.notify("add-buildrequest", *brids)
    return bsid
695
697 q = self.quoteq("INSERT INTO scheduler_changes"
698 " (schedulerid, changeid, important)"
699 " VALUES (?,?,?)")
700 t.execute(q, (schedulerid, number, bool(important)))
701
703 q = self.quoteq("SELECT changeid, important"
704 " FROM scheduler_changes"
705 " WHERE schedulerid=?")
706 t.execute(q, (schedulerid,))
707 important = []
708 unimportant = []
709 for (changeid, is_important) in t.fetchall():
710 c = self.getChangeNumberedNow(changeid, t)
711 if is_important:
712 important.append(c)
713 else:
714 unimportant.append(c)
715 return (important, unimportant)
716
718 while changeids:
719
720
721 batch, changeids = changeids[:100], changeids[100:]
722 t.execute(self.quoteq("DELETE FROM scheduler_changes"
723 " WHERE schedulerid=? AND changeid IN ")
724 + self.parmlist(len(batch)),
725 (schedulerid,) + tuple(batch))
726
728
729
730 t.execute(self.quoteq("INSERT INTO scheduler_upstream_buildsets"
731 " (buildsetid, schedulerid, active)"
732 " VALUES (?,?,?)"),
733 (bsid, schedulerid, 1))
734
736
737 t.execute(self.quoteq("SELECT bs.id, "
738 " bs.sourcestampid, bs.complete, bs.results"
739 " FROM scheduler_upstream_buildsets AS s,"
740 " buildsets AS bs"
741 " WHERE s.buildsetid=bs.id"
742 " AND s.schedulerid=?"
743 " AND s.active=1"),
744 (schedulerid,))
745 return t.fetchall()
746
748 t.execute(self.quoteq("UPDATE scheduler_upstream_buildsets"
749 " SET active=0"
750 " WHERE buildsetid=? AND schedulerid=?"),
751 (buildsetid, schedulerid))
752
753
754
756 assert isinstance(brid, (int, long))
757 if t:
758 br = self._txn_getBuildRequestWithNumber(t, brid)
759 else:
760 br = self.runInteractionNow(self._txn_getBuildRequestWithNumber,
761 brid)
762 return br
764 assert isinstance(brid, (int, long))
765 t.execute(self.quoteq("SELECT br.buildsetid, bs.reason,"
766 " bs.sourcestampid, br.buildername,"
767 " bs.submitted_at, br.priority"
768 " FROM buildrequests AS br, buildsets AS bs"
769 " WHERE br.id=? AND br.buildsetid=bs.id"),
770 (brid,))
771 r = t.fetchall()
772 if not r:
773 return None
774 (bsid, reason, ssid, builder_name, submitted_at, priority) = r[0]
775 ss = self.getSourceStampNumberedNow(ssid, t)
776 properties = self.get_properties_from_db("buildset_properties",
777 "buildsetid", bsid, t)
778 br = BuildRequest(reason, ss, builder_name, properties)
779 br.submittedAt = submitted_at
780 br.priority = priority
781 br.id = brid
782 br.bsid = bsid
783 return br
784
786 assert isinstance(brid, (int, long))
787 return self.runInteractionNow(self._txn_get_buildername_for_brid, brid)
789 assert isinstance(brid, (int, long))
790 t.execute(self.quoteq("SELECT buildername FROM buildrequests"
791 " WHERE id=?"),
792 (brid,))
793 r = t.fetchall()
794 if not r:
795 return None
796 return r[0][0]
797
800 q = ("SELECT br.id"
801 " FROM buildrequests AS br, buildsets AS bs"
802 " WHERE br.buildername=? AND br.complete=0"
803 " AND br.buildsetid=bs.id"
804 " AND (br.claimed_at<?"
805 " OR (br.claimed_by_name=?"
806 " AND br.claimed_by_incarnation!=?))"
807 " ORDER BY br.priority DESC,bs.submitted_at ASC")
808 if limit:
809 q += " LIMIT %s" % limit
810 t.execute(self.quoteq(q),
811 (buildername, old, master_name, master_incarnation))
812 requests = [self.getBuildRequestWithNumber(brid, t)
813 for (brid,) in t.fetchall()]
814 return requests
815
818 if not brids:
819 return
820 if t:
821 self._txn_claim_buildrequests(t, now, master_name,
822 master_incarnation, brids)
823 else:
824 self.runInteractionNow(self._txn_claim_buildrequests,
825 now, master_name, master_incarnation, brids)
def _txn_claim_buildrequests(self, t, now, master_name, master_incarnation,
                             brids):
    """Mark the given build requests as claimed by this master.

    Using transaction ``t``, sets ``claimed_at`` to ``now`` and the
    ``claimed_by_*`` columns to ``master_name`` / ``master_incarnation``
    for every id in ``brids``. The ids are copied first, so the caller's
    sequence is not modified, and are updated in batches of 100.
    """
    remaining = list(brids)
    while remaining:
        batch, remaining = remaining[:100], remaining[100:]
        q = self.quoteq("UPDATE buildrequests"
                        " SET claimed_at = ?,"
                        "     claimed_by_name = ?, claimed_by_incarnation = ?"
                        " WHERE id IN " + self.parmlist(len(batch)))
        qargs = [now, master_name, master_incarnation] + list(batch)
        t.execute(q, qargs)
837
841 now = self._getCurrentTime()
842 t.execute(self.quoteq("INSERT INTO builds (number, brid, start_time)"
843 " VALUES (?,?,?)"),
844 (buildnumber, brid, now))
845 bid = t.lastrowid
846 self.notify("add-build", bid)
847 return bid
848
852 now = self._getCurrentTime()
853 while bids:
854 batch, bids = bids[:100], bids[100:]
855 q = self.quoteq("UPDATE builds SET finish_time = ?"
856 " WHERE id IN " + self.parmlist(len(batch)))
857 qargs = [now] + list(batch)
858 t.execute(q, qargs)
859
863
864 t.execute(self.quoteq("SELECT b.brid,br.buildername,b.number"
865 " FROM builds AS b, buildrequests AS br"
866 " WHERE b.id=? AND b.brid=br.id"),
867 (bid,))
868 res = t.fetchall()
869 if res:
870 return res[0]
871 return (None,None,None)
872
879
883
884
885 while brids:
886 batch, brids = brids[:100], brids[100:]
887 q = self.quoteq("UPDATE buildrequests"
888 " SET claimed_at=0,"
889 " claimed_by_name=NULL, claimed_by_incarnation=NULL"
890 " WHERE id IN " + self.parmlist(len(batch)))
891 t.execute(q, batch)
892 self.notify("add-buildrequest", *brids)
893
897 now = self._getCurrentTime()
898
899
900 while brids:
901 batch, brids = brids[:100], brids[100:]
902
903 q = self.quoteq("UPDATE buildrequests"
904 " SET complete=1, results=?, complete_at=?"
905 " WHERE id IN " + self.parmlist(len(batch)))
906 t.execute(q, [results, now]+batch)
907
908 q = self.quoteq("SELECT bs.id"
909 " FROM buildsets AS bs, buildrequests AS br"
910 " WHERE br.buildsetid=bs.id AND bs.complete=0"
911 " AND br.id in "
912 + self.parmlist(len(batch)))
913 t.execute(q, batch)
914 bsids = [bsid for (bsid,) in t.fetchall()]
915 for bsid in bsids:
916 self._check_buildset(t, bsid, now)
917 self.notify("retire-buildrequest", *brids)
918 self.notify("modify-buildset", *bsids)
919
923
924
925
926
927
928
929 while brids:
930 batch, brids = brids[:100], brids[100:]
931
932 if True:
933 now = self._getCurrentTime()
934 q = self.quoteq("UPDATE buildrequests"
935 " SET complete=1, results=?, complete_at=?"
936 " WHERE id IN " + self.parmlist(len(batch)))
937 t.execute(q, [FAILURE, now]+batch)
938 else:
939 q = self.quoteq("DELETE FROM buildrequests"
940 " WHERE id IN " + self.parmlist(len(batch)))
941 t.execute(q, batch)
942
943
944 q = self.quoteq("SELECT bs.id"
945 " FROM buildsets AS bs, buildrequests AS br"
946 " WHERE br.buildsetid=bs.id AND bs.complete=0"
947 " AND br.id in "
948 + self.parmlist(len(batch)))
949 t.execute(q, batch)
950 bsids = [bsid for (bsid,) in t.fetchall()]
951 for bsid in bsids:
952 self._check_buildset(t, bsid, now)
953
954 self.notify("cancel-buildrequest", *brids)
955 self.notify("modify-buildset", *bsids)
956
958 q = self.quoteq("SELECT br.complete,br.results"
959 " FROM buildsets AS bs, buildrequests AS br"
960 " WHERE bs.complete=0"
961 " AND br.buildsetid=bs.id AND bs.id=?")
962 t.execute(q, (bsid,))
963 results = t.fetchall()
964 is_complete = True
965 bs_results = SUCCESS
966 for (complete, r) in results:
967 if not complete:
968
969 is_complete = False
970
971
972 if r not in (SUCCESS, WARNINGS):
973 bs_results = FAILURE
974 if is_complete:
975
976 q = self.quoteq("UPDATE buildsets"
977 " SET complete=1, complete_at=?, results=?"
978 " WHERE id=?")
979 t.execute(q, (now, bs_results, bsid))
980
982 return self.runInteractionNow(self._txn_get_buildrequestids_for_buildset,
983 bsid)
def _txn_get_buildrequestids_for_buildset(self, t, bsid):
    """Return a dict mapping builder name to build request id.

    Covers every ``buildrequests`` row whose ``buildsetid`` is ``bsid``,
    queried on transaction ``t``. If a builder name occurs more than once,
    the row returned last wins.
    """
    t.execute(self.quoteq("SELECT buildername,id FROM buildrequests"
                          " WHERE buildsetid=?"),
              (bsid,))
    return dict(t.fetchall())
989
993
994
995
996
997
998
999 q = self.quoteq("SELECT br.complete,br.results"
1000 " FROM buildsets AS bs, buildrequests AS br"
1001 " WHERE br.buildsetid=bs.id AND bs.id=?")
1002 t.execute(q, (bsid,))
1003 results = t.fetchall()
1004 finished = True
1005 successful = None
1006 for (c,r) in results:
1007 if not c:
1008 finished = False
1009 if c and r not in (SUCCESS, WARNINGS):
1010 successful = False
1011 if finished and successful is None:
1012 successful = True
1013 return (successful, finished)
1014
1018 t.execute("SELECT id FROM buildsets WHERE complete=0")
1019 return [bsid for (bsid,) in t.fetchall()]
1023 q = self.quoteq("SELECT external_idstring, reason, sourcestampid,"
1024 " complete, results"
1025 " FROM buildsets WHERE id=?")
1026 t.execute(q, (bsid,))
1027 res = t.fetchall()
1028 if res:
1029 (external, reason, ssid, complete, results) = res[0]
1030 external_idstring = str_or_none(external)
1031 reason = str_or_none(reason)
1032 complete = bool(complete)
1033 return (external_idstring, reason, ssid, complete, results)
1034 return None
1035
1037 return self.runInteractionNow(self._txn_get_pending_brids_for_builder,
1038 buildername)
1040
1041
1042
def _txn_get_pending_brids_for_builder(self, t, buildername):
    """Return the ids of unclaimed, incomplete requests for a builder.

    Selects, on transaction ``t``, every ``buildrequests`` row for
    ``buildername`` with ``complete=0`` and ``claimed_at=0`` and returns
    the ids as a list, in the order the database returned them.
    """
    t.execute(self.quoteq("SELECT id FROM buildrequests"
                          " WHERE buildername=? AND"
                          " complete=0 AND claimed_at=0"),
              (buildername,))
    return [brid for (brid,) in t.fetchall()]
1048
1049
1050
1052 return bool(self._pending_operation_count)
1053
1056
1057
1058 threadable.synchronize(DBConnector)
1059