1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40 import sys, collections, base64
41
42 from twisted.python import log, threadable
43 from twisted.internet import defer
44 from twisted.enterprise import adbapi
45 from buildbot import util
46 from buildbot.util import collections as bbcollections
47 from buildbot.changes.changes import Change
48 from buildbot.sourcestamp import SourceStamp
49 from buildbot.buildrequest import BuildRequest
50 from buildbot.process.properties import Properties
51 from buildbot.status.builder import SUCCESS, WARNINGS, FAILURE
52 from buildbot.util.eventual import eventually
53 from buildbot.util import json
54
55
56
57
58
59
61 - def execute(self, *args, **kwargs):
68
70 if not res:
71 return default
72 return process_f(res[0][0])
73
def str_or_none(s):
    """Coerce a value to ``str`` while letting ``None`` pass through.

    Used on nullable text columns (e.g. branch, revision) so that a NULL
    from the database stays ``None`` instead of becoming the string
    ``'None'``.

    :param s: any value, or ``None``
    :returns: ``None`` if ``s`` is ``None``, otherwise ``str(s)``
    """
    if s is None:
        return None
    return str(s)
78
80 pass
81
83
84 compare_attrs = ["args", "kwargs"]
85 synchronized = ["notify", "_end_operation"]
86 MAX_QUERY_TIMES = 1000
87
89
90 self._query_times = collections.deque()
91 self._spec = spec
92
93
94 self._dbapi = spec.get_dbapi()
95 self._nonpool = None
96 self._nonpool_lastused = None
97 self._nonpool_max_idle = spec.get_maxidle()
98
99
100
101 self.paramstyle = self._dbapi.paramstyle
102
103 self._pool = spec.get_async_connection_pool()
104 self._pool.transactionFactory = MyTransaction
105
106
107
108
109
110 self._change_cache = util.LRUCache()
111 self._sourcestamp_cache = util.LRUCache()
112 self._active_operations = set()
113 self._pending_notifications = []
114 self._subscribers = bbcollections.defaultdict(set)
115
116 self._pending_operation_count = 0
117
118 self._started = False
119
123
125
126
127 self._pool.start()
128 self._started = True
129
131 """Call this when you're done with me"""
132
133
134 if self._nonpool:
135 self._nonpool.close()
136 self._nonpool = None
137 self._nonpool_lastused = None
138
139 if not self._started:
140 return
141 self._pool.close()
142 self._started = False
143 del self._pool
144
def quoteq(self, query):
    """
    Given a query that contains qmark-style placeholders, like::
     INSERT INTO foo (col1, col2) VALUES (?,?)
    replace the '?' with '%s' if the backend uses format-style
    placeholders, like::
     INSERT INTO foo (col1, col2) VALUES (%s,%s)

    :param query: SQL text written with ``?`` placeholders
    :returns: the query rewritten for ``self.paramstyle``; returned
        unchanged when the backend already uses qmark style
    :raises AssertionError: if ``self.paramstyle`` is neither ``"format"``
        nor ``"qmark"``
    """
    if self.paramstyle == "format":
        # NOTE: this is a plain textual substitution, so a literal '?'
        # inside the SQL text is rewritten as well.
        return query.replace("?", "%s")
    assert self.paramstyle == "qmark", \
        "unsupported DB-API paramstyle: %r" % (self.paramstyle,)
    return query
157
def parmlist(self, count):
    """
    When passing long lists of values to e.g., an INSERT query, it is
    tedious to pass long strings of ? placeholders.  This function will
    create a parenthesis-enclosed list of COUNT placeholders.  Note that
    the placeholders have already had quoteq() applied.

    :param count: number of placeholders to generate
    :returns: a string such as ``"(?,?,?)"`` (or ``"(%s,%s,%s)"`` for a
        format-style backend); ``"()"`` when ``count`` is 0
    """
    # Quote a single placeholder once, then repeat it COUNT times.
    p = self.quoteq("?")
    return "(" + ",".join([p] * count) + ")"
167
169 """Returns None for an empty database, or a number (probably 1) for
170 the database's version"""
171 try:
172 res = self.runQueryNow("SELECT version FROM version")
173 except (self._dbapi.OperationalError, self._dbapi.ProgrammingError):
174
175 return None
176 assert len(res) == 1
177 return res[0][0]
178
180
181 assert self._started
182 return self.runInteractionNow(self._runQuery, *args, **kwargs)
183
187
189 t = Token()
190 self._active_operations.add(t)
191 return t
193
194
195
196
197 self._active_operations.discard(t)
198 if self._active_operations:
199 return
200 for (category, args) in self._pending_notifications:
201
202
203 eventually(self.send_notification, category, args)
204 self._pending_notifications = []
205
207
208 assert self._started
209 start = self._getCurrentTime()
210 t = self._start_operation()
211 try:
212 return self._runInteractionNow(interaction, *args, **kwargs)
213 finally:
214 self._end_operation(t)
215 self._add_query_time(start)
216
218
219
220
221
222 if self._nonpool_max_idle is not None:
223 now = util.now()
224 if self._nonpool_lastused and self._nonpool_lastused + self._nonpool_max_idle < now:
225 self._nonpool = None
226
227 if not self._nonpool:
228 self._nonpool = self._spec.get_sync_connection()
229
230 self._nonpool_lastused = util.now()
231 return self._nonpool
232
234 conn = self.get_sync_connection()
235 c = conn.cursor()
236 try:
237 result = interaction(c, *args, **kwargs)
238 c.close()
239 conn.commit()
240 return result
241 except:
242 excType, excValue, excTraceback = sys.exc_info()
243 try:
244 conn.rollback()
245 c2 = conn.cursor()
246 c2.execute(self._pool.good_sql)
247 c2.close()
248 conn.commit()
249 except:
250 log.msg("rollback failed, will reconnect next query")
251 log.err()
252
253
254 self._nonpool = None
255 raise excType, excValue, excTraceback
256
def notify(self, category, *args):
    """Queue a notification for later delivery.

    The notification is not sent here; it is only appended to
    ``self._pending_notifications`` as a ``(category, args)`` pair.
    Elsewhere in this class that list is drained (via ``eventually``)
    once no operations remain active.

    :param category: notification category, e.g. ``"add-change"``
    :param args: positional payload for the notification, stored as a tuple
    """
    self._pending_notifications.append((category, args))
261
267
270
272 assert self._started
273 self._pending_operation_count += 1
274 start = self._getCurrentTime()
275
276 d = self._pool.runQuery(*args, **kwargs)
277
278 return d
280 self._end_operation(t)
281 self._add_query_time(start)
282 self._pending_operation_count -= 1
283 return res
284
286 elapsed = self._getCurrentTime() - start
287 self._query_times.append(elapsed)
288 if len(self._query_times) > self.MAX_QUERY_TIMES:
289 self._query_times.popleft()
290
292 assert self._started
293 self._pending_operation_count += 1
294 start = self._getCurrentTime()
295 t = self._start_operation()
296 d = self._pool.runInteraction(*args, **kwargs)
297 d.addBoth(self._runInteraction_done, start, t)
298 return d
300 self._end_operation(t)
301 self._add_query_time(start)
302 self._pending_operation_count -= 1
303 return res
304
305
306
310
312 q = self.quoteq("INSERT INTO changes"
313 " (author,"
314 " comments, is_dir,"
315 " branch, revision, revlink,"
316 " when_timestamp, category,"
317 " repository, project)"
318 " VALUES (?, ?,?, ?,?,?, ?,?, ?,?)")
319
320
321 values = (change.who,
322 change.comments, change.isdir,
323 change.branch, change.revision, change.revlink,
324 change.when, change.category, change.repository,
325 change.project)
326 t.execute(q, values)
327 change.number = t.lastrowid
328
329 for link in change.links:
330 t.execute(self.quoteq("INSERT INTO change_links (changeid, link) "
331 "VALUES (?,?)"),
332 (change.number, link))
333 for filename in change.files:
334 t.execute(self.quoteq("INSERT INTO change_files (changeid,filename)"
335 " VALUES (?,?)"),
336 (change.number, filename))
337 for propname,propvalue in change.properties.properties.items():
338 encoded_value = json.dumps(propvalue)
339 t.execute(self.quoteq("INSERT INTO change_properties"
340 " (changeid, property_name, property_value)"
341 " VALUES (?,?,?)"),
342 (change.number, propname, encoded_value))
343 self.notify("add-change", change.number)
344
346 q = "SELECT changeid FROM changes"
347 args = []
348 if branches or categories or committers:
349 q += " WHERE "
350 pieces = []
351 if branches:
352 pieces.append("branch IN %s" % self.parmlist(len(branches)))
353 args.extend(list(branches))
354 if categories:
355 pieces.append("category IN %s" % self.parmlist(len(categories)))
356 args.extend(list(categories))
357 if committers:
358 pieces.append("author IN %s" % self.parmlist(len(committers)))
359 args.extend(list(committers))
360 if minTime:
361 pieces.append("when_timestamp > %d" % minTime)
362 q += " AND ".join(pieces)
363 q += " ORDER BY changeid DESC"
364 rows = self.runQueryNow(q, tuple(args))
365 for (changeid,) in rows:
366 yield self.getChangeNumberedNow(changeid)
367
369 if t:
370 return self._txn_getLatestChangeNumber(t)
371 else:
372 return self.runInteractionNow(self._txn_getLatestChangeNumber)
374 q = self.quoteq("SELECT max(changeid) from changes")
375 t.execute(q)
376 row = t.fetchone()
377 if not row:
378 return 0
379 return row[0]
380
382
383 assert changeid >= 0
384 c = self._change_cache.get(changeid)
385 if c:
386 return c
387 if t:
388 c = self._txn_getChangeNumberedNow(t, changeid)
389 else:
390 c = self.runInteractionNow(self._txn_getChangeNumberedNow, changeid)
391 self._change_cache.add(changeid, c)
392 return c
394 q = self.quoteq("SELECT author, comments,"
395 " is_dir, branch, revision, revlink,"
396 " when_timestamp, category,"
397 " repository, project"
398 " FROM changes WHERE changeid = ?")
399 t.execute(q, (changeid,))
400 rows = t.fetchall()
401 if not rows:
402 return None
403 (who, comments,
404 isdir, branch, revision, revlink,
405 when, category, repository, project) = rows[0]
406 branch = str_or_none(branch)
407 revision = str_or_none(revision)
408 q = self.quoteq("SELECT link FROM change_links WHERE changeid=?")
409 t.execute(q, (changeid,))
410 rows = t.fetchall()
411 links = [row[0] for row in rows]
412 links.sort()
413
414 q = self.quoteq("SELECT filename FROM change_files WHERE changeid=?")
415 t.execute(q, (changeid,))
416 rows = t.fetchall()
417 files = [row[0] for row in rows]
418 files.sort()
419
420 p = self.get_properties_from_db("change_properties", "changeid",
421 changeid, t)
422 c = Change(who=who, files=files, comments=comments, isdir=isdir,
423 links=links, revision=revision, when=when,
424 branch=branch, category=category, revlink=revlink,
425 repository=repository, project=project)
426 c.properties.updateFromProperties(p)
427 c.number = changeid
428 return c
429
431
432
433 assert changeid >= 0
434 c = self._change_cache.get(changeid)
435 if c:
436 return defer.succeed(c)
437 d1 = self.runQuery(self.quoteq("SELECT author, comments,"
438 " is_dir, branch, revision, revlink,"
439 " when_timestamp, category,"
440 " repository, project"
441 " FROM changes WHERE changeid = ?"),
442 (changeid,))
443 d2 = self.runQuery(self.quoteq("SELECT link FROM change_links"
444 " WHERE changeid=?"),
445 (changeid,))
446 d3 = self.runQuery(self.quoteq("SELECT filename FROM change_files"
447 " WHERE changeid=?"),
448 (changeid,))
449 d4 = self.runInteraction(self._txn_get_properties_from_db,
450 "change_properties", "changeid", changeid)
451 d = defer.gatherResults([d1,d2,d3,d4])
452 d.addCallback(self._getChangeByNumber_query_done, changeid)
453 return d
454
456 (rows, link_rows, file_rows, properties) = res
457 if not rows:
458 return None
459 (who, comments,
460 isdir, branch, revision, revlink,
461 when, category, repository, project) = rows[0]
462 branch = str_or_none(branch)
463 revision = str_or_none(revision)
464 links = [row[0] for row in link_rows]
465 links.sort()
466 files = [row[0] for row in file_rows]
467 files.sort()
468
469 c = Change(who=who, files=files, comments=comments, isdir=isdir,
470 links=links, revision=revision, when=when,
471 branch=branch, category=category, revlink=revlink,
472 repository=repository, project=project)
473 c.properties.updateFromProperties(properties)
474 c.number = changeid
475 self._change_cache.add(changeid, c)
476 return c
477
479 """Return a Deferred that fires with a list of all Change instances
480 with numbers greater than the given value, sorted by number. This is
481 useful for catching up with everything that's happened since you last
482 called this function."""
483 assert last_changeid >= 0
484 if t:
485 return self._txn_getChangesGreaterThan(t, last_changeid)
486 else:
487 return self.runInteractionNow(self._txn_getChangesGreaterThan,
488 last_changeid)
496
498 return defer.gatherResults([self.getChangeByNumber(changeid)
499 for changeid in changeids])
500
501
502
504 assert isinstance(ssid, (int, long))
505 ss = self._sourcestamp_cache.get(ssid)
506 if ss:
507 return ss
508 if t:
509 ss = self._txn_getSourceStampNumbered(t, ssid)
510 else:
511 ss = self.runInteractionNow(self._txn_getSourceStampNumbered,
512 ssid)
513 self._sourcestamp_cache.add(ssid, ss)
514 return ss
515
517 assert isinstance(ssid, (int, long))
518 t.execute(self.quoteq("SELECT branch,revision,patchid,project,repository"
519 " FROM sourcestamps WHERE id=?"),
520 (ssid,))
521 r = t.fetchall()
522 if not r:
523 return None
524 (branch_u, revision_u, patchid, project, repository) = r[0]
525 branch = str_or_none(branch_u)
526 revision = str_or_none(revision_u)
527
528 patch = None
529 if patchid is not None:
530 t.execute(self.quoteq("SELECT patchlevel,patch_base64,subdir"
531 " FROM patches WHERE id=?"),
532 (patchid,))
533 r = t.fetchall()
534 assert len(r) == 1
535 (patch_level, patch_text_base64, subdir_u) = r[0]
536 patch_text = base64.b64decode(patch_text_base64)
537 if subdir_u:
538 patch = (patch_level, patch_text, str(subdir_u))
539 else:
540 patch = (patch_level, patch_text)
541
542 t.execute(self.quoteq("SELECT changeid FROM sourcestamp_changes"
543 " WHERE sourcestampid=?"
544 " ORDER BY changeid ASC"),
545 (ssid,))
546 r = t.fetchall()
547 changes = None
548 if r:
549 changes = [self.getChangeNumberedNow(changeid, t)
550 for (changeid,) in r]
551 ss = SourceStamp(branch, revision, patch, changes, project=project, repository=repository)
552 ss.ssid = ssid
553 return ss
554
555
556
558 if t:
559 return self._txn_get_properties_from_db(t, tablename, idname, id)
560 else:
561 return self.runInteractionNow(self._txn_get_properties_from_db,
562 tablename, idname, id)
563
565
566
567 q = self.quoteq("SELECT property_name,property_value FROM %s WHERE %s=?"
568 % (tablename, idname))
569 t.execute(q, (id,))
570 retval = Properties()
571 for key, valuepair in t.fetchall():
572 value, source = json.loads(valuepair)
573 retval.setProperty(str(key), value, source)
574 return retval
575
576
577
581 for scheduler in added:
582 name = scheduler.name
583 assert name
584 class_name = "%s.%s" % (scheduler.__class__.__module__,
585 scheduler.__class__.__name__)
586 q = self.quoteq("""
587 SELECT schedulerid, class_name FROM schedulers WHERE
588 name=? AND
589 (class_name=? OR class_name='')
590 """)
591 t.execute(q, (name, class_name))
592 row = t.fetchone()
593 if row:
594 sid, db_class_name = row
595 if db_class_name == '':
596
597
598
599 q = self.quoteq("""UPDATE schedulers SET class_name=?
600 WHERE schedulerid=?""")
601 t.execute(q, (class_name, sid))
602 elif db_class_name != class_name:
603
604
605 sid = None
606 else:
607 sid = None
608
609 if sid is None:
610
611
612
613 q = ("SELECT changeid FROM changes"
614 " ORDER BY changeid DESC LIMIT 1")
615 t.execute(q)
616 max_changeid = _one_or_else(t.fetchall(), 0)
617 state = scheduler.get_initial_state(max_changeid)
618 state_json = json.dumps(state)
619 q = self.quoteq("INSERT INTO schedulers"
620 " (name, class_name, state)"
621 " VALUES (?,?,?)")
622 t.execute(q, (name, class_name, state_json))
623 sid = t.lastrowid
624 log.msg("scheduler '%s' got id %d" % (scheduler.name, sid))
625 scheduler.schedulerid = sid
626
628 q = self.quoteq("SELECT state FROM schedulers WHERE schedulerid=?")
629 t.execute(q, (schedulerid,))
630 state_json = _one_or_else(t.fetchall())
631 assert state_json is not None
632 return json.loads(state_json)
633
635 state_json = json.dumps(state)
636 q = self.quoteq("UPDATE schedulers SET state=? WHERE schedulerid=?")
637 t.execute(q, (state_json, schedulerid))
638
640 """Given a SourceStamp (which may or may not have an ssid), make sure
641 the contents are in the database, and return the ssid. If the
642 SourceStamp originally came from the DB (and thus already has an
643 ssid), just return the ssid. If not, create a new row for it."""
644 if ss.ssid is not None:
645 return ss.ssid
646 patchid = None
647 if ss.patch:
648 patchlevel = ss.patch[0]
649 diff = ss.patch[1]
650 subdir = None
651 if len(ss.patch) > 2:
652 subdir = ss.patch[2]
653 q = self.quoteq("INSERT INTO patches"
654 " (patchlevel, patch_base64, subdir)"
655 " VALUES (?,?,?)")
656 t.execute(q, (patchlevel, base64.b64encode(diff), subdir))
657 patchid = t.lastrowid
658 t.execute(self.quoteq("INSERT INTO sourcestamps"
659 " (branch, revision, patchid, project, repository)"
660 " VALUES (?,?,?,?,?)"),
661 (ss.branch, ss.revision, patchid, ss.project, ss.repository))
662 ss.ssid = t.lastrowid
663 q2 = self.quoteq("INSERT INTO sourcestamp_changes"
664 " (sourcestampid, changeid) VALUES (?,?)")
665 for c in ss.changes:
666 t.execute(q2, (ss.ssid, c.number))
667 return ss.ssid
668
def create_buildset(self, ssid, reason, properties, builderNames, t,
                    external_idstring=None):
    """Insert a buildset, its properties and one build request per builder.

    All statements run on the supplied transaction/cursor ``t``; nothing
    is committed here.

    :param ssid: id of the sourcestamp the buildset is built from
    :param reason: free-form reason string stored on the buildset
    :param properties: object whose ``.properties`` attribute is a dict of
        property name -> value; each value is stored JSON-encoded
    :param builderNames: iterable of builder names; one ``buildrequests``
        row is inserted per name
    :param t: DB-API cursor-like object providing ``execute`` and
        ``lastrowid``
    :param external_idstring: optional external identifier stored on the
        buildset row
    :returns: the id of the newly inserted buildset row
    """
    # One timestamp is shared by the buildset and all of its requests.
    now = self._getCurrentTime()
    t.execute(self.quoteq("INSERT INTO buildsets"
                          " (external_idstring, reason,"
                          " sourcestampid, submitted_at)"
                          " VALUES (?,?,?,?)"),
              (external_idstring, reason, ssid, now))
    bsid = t.lastrowid

    # The per-row statements do not change between iterations, so quote
    # them once rather than on every pass through the loops.
    prop_q = self.quoteq("INSERT INTO buildset_properties"
                         " (buildsetid, property_name, property_value)"
                         " VALUES (?,?,?)")
    for propname, propvalue in properties.properties.items():
        encoded_value = json.dumps(propvalue)
        t.execute(prop_q, (bsid, propname, encoded_value))

    req_q = self.quoteq("INSERT INTO buildrequests"
                        " (buildsetid, buildername, submitted_at)"
                        " VALUES (?,?,?)")
    brids = []
    for bn in builderNames:
        t.execute(req_q, (bsid, bn, now))
        brids.append(t.lastrowid)

    # Queue notifications for the new buildset and its build requests.
    self.notify("add-buildset", bsid)
    self.notify("add-buildrequest", *brids)
    return bsid
696
698 q = self.quoteq("INSERT INTO scheduler_changes"
699 " (schedulerid, changeid, important)"
700 " VALUES (?,?,?)")
701 t.execute(q, (schedulerid, number, bool(important)))
702
704 q = self.quoteq("SELECT changeid, important"
705 " FROM scheduler_changes"
706 " WHERE schedulerid=?")
707 t.execute(q, (schedulerid,))
708 important = []
709 unimportant = []
710 for (changeid, is_important) in t.fetchall():
711 c = self.getChangeNumberedNow(changeid, t)
712 if is_important:
713 important.append(c)
714 else:
715 unimportant.append(c)
716 return (important, unimportant)
717
719 t.execute(self.quoteq("DELETE FROM scheduler_changes"
720 " WHERE schedulerid=? AND changeid IN ")
721 + self.parmlist(len(changeids)),
722 (schedulerid,) + tuple(changeids))
723
725
726
727 t.execute(self.quoteq("INSERT INTO scheduler_upstream_buildsets"
728 " (buildsetid, schedulerid, active)"
729 " VALUES (?,?,?)"),
730 (bsid, schedulerid, 1))
731
733
734 t.execute(self.quoteq("SELECT bs.id, "
735 " bs.sourcestampid, bs.complete, bs.results"
736 " FROM scheduler_upstream_buildsets AS s,"
737 " buildsets AS bs"
738 " WHERE s.buildsetid=bs.id"
739 " AND s.schedulerid=?"
740 " AND s.active=1"),
741 (schedulerid,))
742 return t.fetchall()
743
745 t.execute(self.quoteq("UPDATE scheduler_upstream_buildsets"
746 " SET active=0"
747 " WHERE buildsetid=? AND schedulerid=?"),
748 (buildsetid, schedulerid))
749
750
751
753 assert isinstance(brid, (int, long))
754 if t:
755 br = self._txn_getBuildRequestWithNumber(t, brid)
756 else:
757 br = self.runInteractionNow(self._txn_getBuildRequestWithNumber,
758 brid)
759 return br
761 assert isinstance(brid, (int, long))
762 t.execute(self.quoteq("SELECT br.buildsetid, bs.reason,"
763 " bs.sourcestampid, br.buildername,"
764 " bs.submitted_at, br.priority"
765 " FROM buildrequests AS br, buildsets AS bs"
766 " WHERE br.id=? AND br.buildsetid=bs.id"),
767 (brid,))
768 r = t.fetchall()
769 if not r:
770 return None
771 (bsid, reason, ssid, builder_name, submitted_at, priority) = r[0]
772 ss = self.getSourceStampNumberedNow(ssid, t)
773 properties = self.get_properties_from_db("buildset_properties",
774 "buildsetid", bsid, t)
775 br = BuildRequest(reason, ss, builder_name, properties)
776 br.submittedAt = submitted_at
777 br.priority = priority
778 br.id = brid
779 br.bsid = bsid
780 return br
781
783 assert isinstance(brid, (int, long))
784 return self.runInteractionNow(self._txn_get_buildername_for_brid, brid)
786 assert isinstance(brid, (int, long))
787 t.execute(self.quoteq("SELECT buildername FROM buildrequests"
788 " WHERE id=?"),
789 (brid,))
790 r = t.fetchall()
791 if not r:
792 return None
793 return r[0][0]
794
797 q = ("SELECT br.id"
798 " FROM buildrequests AS br, buildsets AS bs"
799 " WHERE br.buildername=? AND br.complete=0"
800 " AND br.buildsetid=bs.id"
801 " AND (br.claimed_at<?"
802 " OR (br.claimed_by_name=?"
803 " AND br.claimed_by_incarnation!=?))"
804 " ORDER BY br.priority DESC,bs.submitted_at ASC")
805 if limit:
806 q += " LIMIT %s" % limit
807 t.execute(self.quoteq(q),
808 (buildername, old, master_name, master_incarnation))
809 requests = [self.getBuildRequestWithNumber(brid, t)
810 for (brid,) in t.fetchall()]
811 return requests
812
815 if not brids:
816 return
817 if t:
818 self._txn_claim_buildrequests(t, now, master_name,
819 master_incarnation, brids)
820 else:
821 self.runInteractionNow(self._txn_claim_buildrequests,
822 now, master_name, master_incarnation, brids)
825 q = self.quoteq("UPDATE buildrequests"
826 " SET claimed_at = ?,"
827 " claimed_by_name = ?, claimed_by_incarnation = ?"
828 " WHERE id IN " + self.parmlist(len(brids)))
829 qargs = [now, master_name, master_incarnation] + list(brids)
830 t.execute(q, qargs)
831
835 now = self._getCurrentTime()
836 t.execute(self.quoteq("INSERT INTO builds (number, brid, start_time)"
837 " VALUES (?,?,?)"),
838 (buildnumber, brid, now))
839 bid = t.lastrowid
840 self.notify("add-build", bid)
841 return bid
842
846 now = self._getCurrentTime()
847 q = self.quoteq("UPDATE builds SET finish_time = ?"
848 " WHERE id IN " + self.parmlist(len(bids)))
849 qargs = [now] + list(bids)
850 t.execute(q, qargs)
851
855
856 t.execute(self.quoteq("SELECT b.brid,br.buildername,b.number"
857 " FROM builds AS b, buildrequests AS br"
858 " WHERE b.id=? AND b.brid=br.id"),
859 (bid,))
860 res = t.fetchall()
861 if res:
862 return res[0]
863 return (None,None,None)
864
871
875
876
877 q = self.quoteq("UPDATE buildrequests"
878 " SET claimed_at=0,"
879 " claimed_by_name=NULL, claimed_by_incarnation=NULL"
880 " WHERE id IN " + self.parmlist(len(brids)))
881 t.execute(q, brids)
882 self.notify("add-buildrequest", *brids)
883
887 now = self._getCurrentTime()
888
889
890 q = self.quoteq("UPDATE buildrequests"
891 " SET complete=1, results=?, complete_at=?"
892 " WHERE id IN " + self.parmlist(len(brids)))
893 t.execute(q, [results, now]+brids)
894
895 q = self.quoteq("SELECT bs.id"
896 " FROM buildsets AS bs, buildrequests AS br"
897 " WHERE br.buildsetid=bs.id AND bs.complete=0"
898 " AND br.id in "
899 + self.parmlist(len(brids)))
900 t.execute(q, brids)
901 bsids = [bsid for (bsid,) in t.fetchall()]
902 for bsid in bsids:
903 self._check_buildset(t, bsid, now)
904 self.notify("retire-buildrequest", *brids)
905 self.notify("modify-buildset", *bsids)
906
910
911
912
913
914
915
916 if True:
917 now = self._getCurrentTime()
918 q = self.quoteq("UPDATE buildrequests"
919 " SET complete=1, results=?, complete_at=?"
920 " WHERE id IN " + self.parmlist(len(brids)))
921 t.execute(q, [FAILURE, now]+brids)
922 else:
923 q = self.quoteq("DELETE FROM buildrequests"
924 " WHERE id IN " + self.parmlist(len(brids)))
925 t.execute(q, brids)
926
927
928 q = self.quoteq("SELECT bs.id"
929 " FROM buildsets AS bs, buildrequests AS br"
930 " WHERE br.buildsetid=bs.id AND bs.complete=0"
931 " AND br.id in "
932 + self.parmlist(len(brids)))
933 t.execute(q, brids)
934 bsids = [bsid for (bsid,) in t.fetchall()]
935 for bsid in bsids:
936 self._check_buildset(t, bsid, now)
937
938 self.notify("cancel-buildrequest", *brids)
939 self.notify("modify-buildset", *bsids)
940
942 q = self.quoteq("SELECT br.complete,br.results"
943 " FROM buildsets AS bs, buildrequests AS br"
944 " WHERE bs.complete=0"
945 " AND br.buildsetid=bs.id AND bs.id=?")
946 t.execute(q, (bsid,))
947 results = t.fetchall()
948 is_complete = True
949 bs_results = SUCCESS
950 for (complete, r) in results:
951 if not complete:
952
953 is_complete = False
954 if r == FAILURE:
955 bs_results = r
956 if is_complete:
957
958 q = self.quoteq("UPDATE buildsets"
959 " SET complete=1, complete_at=?, results=?"
960 " WHERE id=?")
961 t.execute(q, (now, bs_results, bsid))
962
964 return self.runInteractionNow(self._txn_get_buildrequestids_for_buildset,
965 bsid)
967 t.execute(self.quoteq("SELECT buildername,id FROM buildrequests"
968 " WHERE buildsetid=?"),
969 (bsid,))
970 return dict(t.fetchall())
971
975
976
977
978
979
980
981 q = self.quoteq("SELECT br.complete,br.results"
982 " FROM buildsets AS bs, buildrequests AS br"
983 " WHERE br.buildsetid=bs.id AND bs.id=?")
984 t.execute(q, (bsid,))
985 results = t.fetchall()
986 finished = True
987 successful = None
988 for (c,r) in results:
989 if not c:
990 finished = False
991 if c and r not in (SUCCESS, WARNINGS):
992 successful = False
993 if finished and successful is None:
994 successful = True
995 return (successful, finished)
996
1000 t.execute("SELECT id FROM buildsets WHERE complete=0")
1001 return [bsid for (bsid,) in t.fetchall()]
1005 q = self.quoteq("SELECT external_idstring, reason, sourcestampid,"
1006 " complete, results"
1007 " FROM buildsets WHERE id=?")
1008 t.execute(q, (bsid,))
1009 res = t.fetchall()
1010 if res:
1011 (external, reason, ssid, complete, results) = res[0]
1012 external_idstring = str_or_none(external)
1013 reason = str_or_none(reason)
1014 complete = bool(complete)
1015 return (external_idstring, reason, ssid, complete, results)
1016 return None
1017
1019 return self.runInteractionNow(self._txn_get_pending_brids_for_builder,
1020 buildername)
1022
1023
1024
1025 t.execute(self.quoteq("SELECT id FROM buildrequests"
1026 " WHERE buildername=? AND"
1027 " complete=0 AND claimed_at=0"),
1028 (buildername,))
1029 return [brid for (brid,) in t.fetchall()]
1030
1031
1032
1034 return bool(self._pending_operation_count)
1035
1036
1037 threadable.synchronize(DBConnector)
1038