1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40 import sys, collections, base64
41
42 from twisted.python import log, threadable
43 from twisted.internet import defer
44 from twisted.enterprise import adbapi
45 from buildbot import util
46 from buildbot.util import collections as bbcollections
47 from buildbot.changes.changes import Change
48 from buildbot.sourcestamp import SourceStamp
49 from buildbot.buildrequest import BuildRequest
50 from buildbot.process.properties import Properties
51 from buildbot.status.builder import SUCCESS, WARNINGS, FAILURE
52 from buildbot.util.eventual import eventually
53 from buildbot.util import json
54
55
56
57
58
59
61 - def execute(self, *args, **kwargs):
68
70 if not res:
71 return default
72 return process_f(res[0][0])
73
75 if s is None:
76 return None
77 return str(s)
78
80 pass
81
83
84 compare_attrs = ["args", "kwargs"]
85 synchronized = ["notify", "_end_operation"]
86 MAX_QUERY_TIMES = 1000
87
89
90 self._query_times = collections.deque()
91 self._spec = spec
92
93
94 self._dbapi = spec.get_dbapi()
95 self._nonpool = None
96 self._nonpool_lastused = None
97 self._nonpool_max_idle = spec.get_maxidle()
98
99
100
101 self.paramstyle = self._dbapi.paramstyle
102
103 self._pool = spec.get_async_connection_pool()
104 self._pool.transactionFactory = MyTransaction
105
106
107
108
109
110 self._change_cache = util.LRUCache()
111 self._sourcestamp_cache = util.LRUCache()
112 self._active_operations = set()
113 self._pending_notifications = []
114 self._subscribers = bbcollections.defaultdict(set)
115
116 self._pending_operation_count = 0
117
118 self._started = False
119
123
125
126
127 self._pool.start()
128 self._started = True
129
131 """Call this when you're done with me"""
132
133
134 if self._nonpool:
135 self._nonpool.close()
136 self._nonpool = None
137 self._nonpool_lastused = None
138
139 if not self._started:
140 return
141 self._pool.close()
142 self._started = False
143 del self._pool
144
146 """
147 Given a query that contains qmark-style placeholders, like::
148 INSERT INTO foo (col1, col2) VALUES (?,?)
149 replace the '?' with '%s' if the backend uses format-style
150 placeholders, like::
151 INSERT INTO foo (col1, col2) VALUES (%s,%s)
152 """
153 if self.paramstyle == "format":
154 return query.replace("?","%s")
155 assert self.paramstyle == "qmark"
156 return query
157
159 """
160 When passing long lists of values to e.g., an INSERT query, it is
161 tedious to pass long strings of ? placeholders. This function will
162 create a parenthesis-enclosed list of COUNT placeholders. Note that
163 the placeholders have already had quoteq() applied.
164 """
165 p = self.quoteq("?")
166 return "(" + ",".join([p]*count) + ")"
167
169 """Returns None for an empty database, or a number (probably 1) for
170 the database's version"""
171 try:
172 res = self.runQueryNow("SELECT version FROM version")
173 except (self._dbapi.OperationalError, self._dbapi.ProgrammingError):
174
175 return None
176 assert len(res) == 1
177 return res[0][0]
178
180
181 assert self._started
182 return self.runInteractionNow(self._runQuery, *args, **kwargs)
183
187
189 t = Token()
190 self._active_operations.add(t)
191 return t
193
194
195
196
197 self._active_operations.discard(t)
198 if self._active_operations:
199 return
200 for (category, args) in self._pending_notifications:
201
202
203 eventually(self.send_notification, category, args)
204 self._pending_notifications = []
205
207
208 assert self._started
209 start = self._getCurrentTime()
210 t = self._start_operation()
211 try:
212 return self._runInteractionNow(interaction, *args, **kwargs)
213 finally:
214 self._end_operation(t)
215 self._add_query_time(start)
216
218
219
220
221
# Return the cached synchronous connection, replacing it first when it has
# sat unused for longer than self._nonpool_max_idle. A max-idle setting of
# None disables the idle check entirely.
222 if self._nonpool_max_idle is not None:
223 now = util.now()
224 if self._nonpool_lastused and self._nonpool_lastused + self._nonpool_max_idle < now:
# NOTE(review): the stale connection is only dereferenced here, never
# close()d, whereas the shutdown path calls self._nonpool.close() before
# clearing it. Confirm that leaving the old handle to garbage collection
# is intentional.
225 self._nonpool = None
226
# Open a connection lazily: on first use, or after the reference was cleared.
227 if not self._nonpool:
228 self._nonpool = self._spec.get_sync_connection()
229
# Every hand-out counts as a use and restarts the idle clock.
230 self._nonpool_lastused = util.now()
231 return self._nonpool
232
234 conn = self.get_sync_connection()
235 c = conn.cursor()
236 try:
237 result = interaction(c, *args, **kwargs)
238 c.close()
239 conn.commit()
240 return result
241 except:
242 excType, excValue, excTraceback = sys.exc_info()
243 try:
244 conn.rollback()
245 c2 = conn.cursor()
246 c2.execute(self._pool.good_sql)
247 c2.close()
248 conn.commit()
249 except:
250 log.msg("rollback failed, will reconnect next query")
251 log.err()
252
253
254 self._nonpool = None
255 raise excType, excValue, excTraceback
256
def notify(self, category, *args):
    """Queue a notification instead of sending it right away.

    The pair ``(category, args)`` is appended to
    ``self._pending_notifications``; ``args`` is the tuple of extra
    positional arguments (empty when none are given). Nothing is
    delivered from here and nothing is returned.
    """
    queued = (category, args)
    pending = self._pending_notifications
    pending.append(queued)
261
267
270
# Issue a query on the asynchronous pool, forwarding all positional and
# keyword arguments to self._pool.runQuery(), and return whatever that
# call returns.
272 assert self._started
# NOTE(review): the pending-operation counter is incremented here, but no
# completion callback is attached to `d` below, so nothing visible on this
# path ever decrements it again; `start` is also computed and never used.
# The runInteraction path chains d.addBoth(self._runInteraction_done,
# start, t) for exactly this bookkeeping. Confirm whether an equivalent
# addBoth() hook was lost here.
273 self._pending_operation_count += 1
274 start = self._getCurrentTime()
275
276 d = self._pool.runQuery(*args, **kwargs)
277
278 return d
280 self._end_operation(t)
281 self._add_query_time(start)
282 self._pending_operation_count -= 1
283 return res
284
286 elapsed = self._getCurrentTime() - start
287 self._query_times.append(elapsed)
288 if len(self._query_times) > self.MAX_QUERY_TIMES:
289 self._query_times.popleft()
290
292 assert self._started
293 self._pending_operation_count += 1
294 start = self._getCurrentTime()
295 t = self._start_operation()
296 d = self._pool.runInteraction(*args, **kwargs)
297 d.addBoth(self._runInteraction_done, start, t)
298 return d
300 self._end_operation(t)
301 self._add_query_time(start)
302 self._pending_operation_count -= 1
303 return res
304
305
306
310
312 q = self.quoteq("INSERT INTO changes"
313 " (author,"
314 " comments, is_dir,"
315 " branch, revision, revlink,"
316 " when_timestamp, category,"
317 " repository, project)"
318 " VALUES (?, ?,?, ?,?,?, ?,?, ?,?)")
319
320
321 values = (change.who,
322 change.comments, change.isdir,
323 change.branch, change.revision, change.revlink,
324 change.when, change.category, change.repository,
325 change.project)
326 t.execute(q, values)
327 change.number = t.lastrowid
328
329 for link in change.links:
330 t.execute(self.quoteq("INSERT INTO change_links (changeid, link) "
331 "VALUES (?,?)"),
332 (change.number, link))
333 for filename in change.files:
334 t.execute(self.quoteq("INSERT INTO change_files (changeid,filename)"
335 " VALUES (?,?)"),
336 (change.number, filename))
337 for propname,propvalue in change.properties.properties.items():
338 encoded_value = json.dumps(propvalue)
339 t.execute(self.quoteq("INSERT INTO change_properties"
340 " (changeid, property_name, property_value)"
341 " VALUES (?,?,?)"),
342 (change.number, propname, encoded_value))
343 self.notify("add-change", change.number)
344
# Generator body: select change ids in descending order, optionally
# restricted by branch, category and author, and yield the result of
# getChangeNumberedNow() for each id.
346 q = "SELECT changeid FROM changes"
# Bound values for the IN (...) placeholder lists, in the order they are added.
347 args = []
348 if branches or categories or committers:
349 q += " WHERE "
350 pieces = []
351 if branches:
# parmlist() yields a parenthesised placeholder list already converted to
# the backend's paramstyle, so this query is not passed through quoteq().
352 pieces.append("branch IN %s" % self.parmlist(len(branches)))
353 args.extend(list(branches))
354 if categories:
355 pieces.append("category IN %s" % self.parmlist(len(categories)))
356 args.extend(list(categories))
357 if committers:
358 pieces.append("author IN %s" % self.parmlist(len(committers)))
359 args.extend(list(committers))
# NOTE(review): `pieces` and the " WHERE " keyword are only set up when at
# least one of branches/categories/committers is given. A call that
# supplies only minTime therefore either skips this filter silently or
# hits an unbound `pieces`, depending on how this `if` is nested. Confirm
# that a minTime-only query is handled.
# The timestamp is interpolated into the SQL text via %d (integer
# formatting) rather than passed as a bound parameter.
360 if minTime:
361 pieces.append("when_timestamp > %d" % minTime)
362 q += " AND ".join(pieces)
363 q += " ORDER BY changeid DESC"
364 rows = self.runQueryNow(q, tuple(args))
# Each row is a 1-tuple holding a changeid; the change is loaded on demand
# as the generator is consumed.
365 for (changeid,) in rows:
366 yield self.getChangeNumberedNow(changeid)
367
# Look up the latest change number, reusing the caller's transaction `t`
# when one is supplied and otherwise running a fresh synchronous
# interaction.
369 if t:
370 return self._txn_getLatestChangeNumber(branch=branch, t=t)
371 else:
# NOTE(review): `branch` is not forwarded on this path, unlike the call
# above. runInteractionNow() invokes the callable with the cursor as the
# first positional argument followed only by the extra arguments given
# here, so the helper receives no branch at all. Confirm the helper's
# signature tolerates that and that dropping the filter is intended.
372 return self.runInteractionNow(self._txn_getLatestChangeNumber)
# Run "SELECT max(changeid)" on cursor `t`, optionally restricted to one
# branch, and return the first column of the fetched row (0 when no row
# comes back).
374 args = None
375 if branch:
# NOTE(review): br_clause is bound only inside this `if`, so a falsy
# `branch` reaches the concatenation below with the name undefined
# (NameError). The clause also has no leading space, so the assembled
# text reads "...from changesWHERE branch =? ". Confirm and fix both.
376 br_clause = "WHERE branch =? "
377 args = ( branch, )
378 q = self.quoteq("SELECT max(changeid) from changes"+ br_clause)
379 t.execute(q, args)
380 row = t.fetchone()
# NOTE(review): an aggregate query presumably still returns one row holding
# NULL when the table is empty, in which case this returns None rather
# than 0. Verify against the backends in use.
381 if not row:
382 return 0
383 return row[0]
384
386
387 assert changeid >= 0
388 c = self._change_cache.get(changeid)
389 if c:
390 return c
391 if t:
392 c = self._txn_getChangeNumberedNow(t, changeid)
393 else:
394 c = self.runInteractionNow(self._txn_getChangeNumberedNow, changeid)
395 self._change_cache.add(changeid, c)
396 return c
398 q = self.quoteq("SELECT author, comments,"
399 " is_dir, branch, revision, revlink,"
400 " when_timestamp, category,"
401 " repository, project"
402 " FROM changes WHERE changeid = ?")
403 t.execute(q, (changeid,))
404 rows = t.fetchall()
405 if not rows:
406 return None
407 (who, comments,
408 isdir, branch, revision, revlink,
409 when, category, repository, project) = rows[0]
410 branch = str_or_none(branch)
411 revision = str_or_none(revision)
412 q = self.quoteq("SELECT link FROM change_links WHERE changeid=?")
413 t.execute(q, (changeid,))
414 rows = t.fetchall()
415 links = [row[0] for row in rows]
416 links.sort()
417
418 q = self.quoteq("SELECT filename FROM change_files WHERE changeid=?")
419 t.execute(q, (changeid,))
420 rows = t.fetchall()
421 files = [row[0] for row in rows]
422 files.sort()
423
424 p = self.get_properties_from_db("change_properties", "changeid",
425 changeid, t)
426 c = Change(who=who, files=files, comments=comments, isdir=isdir,
427 links=links, revision=revision, when=when,
428 branch=branch, category=category, revlink=revlink,
429 repository=repository, project=project)
430 c.properties.updateFromProperties(p)
431 c.number = changeid
432 return c
433
435
436
437 assert changeid >= 0
438 c = self._change_cache.get(changeid)
439 if c:
440 return defer.succeed(c)
441 d1 = self.runQuery(self.quoteq("SELECT author, comments,"
442 " is_dir, branch, revision, revlink,"
443 " when_timestamp, category,"
444 " repository, project"
445 " FROM changes WHERE changeid = ?"),
446 (changeid,))
447 d2 = self.runQuery(self.quoteq("SELECT link FROM change_links"
448 " WHERE changeid=?"),
449 (changeid,))
450 d3 = self.runQuery(self.quoteq("SELECT filename FROM change_files"
451 " WHERE changeid=?"),
452 (changeid,))
453 d4 = self.runInteraction(self._txn_get_properties_from_db,
454 "change_properties", "changeid", changeid)
455 d = defer.gatherResults([d1,d2,d3,d4])
456 d.addCallback(self._getChangeByNumber_query_done, changeid)
457 return d
458
460 (rows, link_rows, file_rows, properties) = res
461 if not rows:
462 return None
463 (who, comments,
464 isdir, branch, revision, revlink,
465 when, category, repository, project) = rows[0]
466 branch = str_or_none(branch)
467 revision = str_or_none(revision)
468 links = [row[0] for row in link_rows]
469 links.sort()
470 files = [row[0] for row in file_rows]
471 files.sort()
472
473 c = Change(who=who, files=files, comments=comments, isdir=isdir,
474 links=links, revision=revision, when=when,
475 branch=branch, category=category, revlink=revlink,
476 repository=repository, project=project)
477 c.properties.updateFromProperties(properties)
478 c.number = changeid
479 self._change_cache.add(changeid, c)
480 return c
481
483 """Return a list of all Change instances with numbers greater than the
given value, sorted by number. This is useful for catching up with
everything that's happened since you last called this function.

The result is returned synchronously (it is not wrapped in a Deferred):
the lookup runs on the supplied transaction `t` if there is one, and
otherwise inside runInteractionNow()."""
487 assert last_changeid >= 0
# Reuse the caller's open transaction when given; otherwise run a fresh
# blocking interaction.
488 if t:
489 return self._txn_getChangesGreaterThan(t, last_changeid)
490 else:
491 return self.runInteractionNow(self._txn_getChangesGreaterThan,
492 last_changeid)
500
502 """Return a list of all extant change id's less than the given value,
503 sorted by number."""
504 def txn(t):
505 q = self.quoteq("SELECT changeid FROM changes WHERE changeid < ?")
506 t.execute(q, (new_changeid,))
507 changes = [changeid for (changeid,) in t.fetchall()]
508 changes.sort()
509 return changes
510 return self.runInteractionNow(txn)
511
513 """Thoroughly remove a change from the database, including all dependent
514 tables"""
515 def txn(t):
516 for table in ('changes', 'scheduler_changes', 'sourcestamp_changes',
517 'change_files', 'change_links', 'change_properties'):
518 q = self.quoteq("DELETE FROM %s WHERE changeid = ?" % table)
519 t.execute(q, (changeid,))
520 return self.runInteractionNow(txn)
521
523 return defer.gatherResults([self.getChangeByNumber(changeid)
524 for changeid in changeids])
525
526
527
529 assert isinstance(ssid, (int, long))
530 ss = self._sourcestamp_cache.get(ssid)
531 if ss:
532 return ss
533 if t:
534 ss = self._txn_getSourceStampNumbered(t, ssid)
535 else:
536 ss = self.runInteractionNow(self._txn_getSourceStampNumbered,
537 ssid)
538 self._sourcestamp_cache.add(ssid, ss)
539 return ss
540
542 assert isinstance(ssid, (int, long))
543 t.execute(self.quoteq("SELECT branch,revision,patchid,project,repository"
544 " FROM sourcestamps WHERE id=?"),
545 (ssid,))
546 r = t.fetchall()
547 if not r:
548 return None
549 (branch_u, revision_u, patchid, project, repository) = r[0]
550 branch = str_or_none(branch_u)
551 revision = str_or_none(revision_u)
552
553 patch = None
554 if patchid is not None:
555 t.execute(self.quoteq("SELECT patchlevel,patch_base64,subdir"
556 " FROM patches WHERE id=?"),
557 (patchid,))
558 r = t.fetchall()
559 assert len(r) == 1
560 (patch_level, patch_text_base64, subdir_u) = r[0]
561 patch_text = base64.b64decode(patch_text_base64)
562 if subdir_u:
563 patch = (patch_level, patch_text, str(subdir_u))
564 else:
565 patch = (patch_level, patch_text)
566
567 t.execute(self.quoteq("SELECT changeid FROM sourcestamp_changes"
568 " WHERE sourcestampid=?"
569 " ORDER BY changeid ASC"),
570 (ssid,))
571 r = t.fetchall()
572 changes = None
573 if r:
574 changes = [self.getChangeNumberedNow(changeid, t)
575 for (changeid,) in r]
576 ss = SourceStamp(branch, revision, patch, changes, project=project, repository=repository)
577 ss.ssid = ssid
578 return ss
579
580
581
583 if t:
584 return self._txn_get_properties_from_db(t, tablename, idname, id)
585 else:
586 return self.runInteractionNow(self._txn_get_properties_from_db,
587 tablename, idname, id)
588
590
591
592 q = self.quoteq("SELECT property_name,property_value FROM %s WHERE %s=?"
593 % (tablename, idname))
594 t.execute(q, (id,))
595 retval = Properties()
596 for key, valuepair in t.fetchall():
597 value, source = json.loads(valuepair)
598 retval.setProperty(str(key), value, source)
599 return retval
600
601
602
606 for scheduler in added:
607 name = scheduler.name
608 assert name
609 class_name = "%s.%s" % (scheduler.__class__.__module__,
610 scheduler.__class__.__name__)
611 q = self.quoteq("""
612 SELECT schedulerid, class_name FROM schedulers WHERE
613 name=? AND
614 (class_name=? OR class_name='')
615 """)
616 t.execute(q, (name, class_name))
617 row = t.fetchone()
618 if row:
619 sid, db_class_name = row
620 if db_class_name == '':
621
622
623
624 q = self.quoteq("""UPDATE schedulers SET class_name=?
625 WHERE schedulerid=?""")
626 t.execute(q, (class_name, sid))
627 elif db_class_name != class_name:
628
629
630 sid = None
631 else:
632 sid = None
633
634 if sid is None:
635
636
637
638 q = ("SELECT changeid FROM changes"
639 " ORDER BY changeid DESC LIMIT 1")
640 t.execute(q)
641 max_changeid = _one_or_else(t.fetchall(), 0)
642 state = scheduler.get_initial_state(max_changeid)
643 state_json = json.dumps(state)
644 q = self.quoteq("INSERT INTO schedulers"
645 " (name, class_name, state)"
646 " VALUES (?,?,?)")
647 t.execute(q, (name, class_name, state_json))
648 sid = t.lastrowid
649 log.msg("scheduler '%s' got id %d" % (scheduler.name, sid))
650 scheduler.schedulerid = sid
651
653 q = self.quoteq("SELECT state FROM schedulers WHERE schedulerid=?")
654 t.execute(q, (schedulerid,))
655 state_json = _one_or_else(t.fetchall())
656 assert state_json is not None
657 return json.loads(state_json)
658
660 state_json = json.dumps(state)
661 q = self.quoteq("UPDATE schedulers SET state=? WHERE schedulerid=?")
662 t.execute(q, (state_json, schedulerid))
663
665 """Given a SourceStamp (which may or may not have an ssid), make sure
666 the contents are in the database, and return the ssid. If the
667 SourceStamp originally came from the DB (and thus already has an
668 ssid), just return the ssid. If not, create a new row for it."""
669 if ss.ssid is not None:
670 return ss.ssid
671 patchid = None
672 if ss.patch:
673 patchlevel = ss.patch[0]
674 diff = ss.patch[1]
675 subdir = None
676 if len(ss.patch) > 2:
677 subdir = ss.patch[2]
678 q = self.quoteq("INSERT INTO patches"
679 " (patchlevel, patch_base64, subdir)"
680 " VALUES (?,?,?)")
681 t.execute(q, (patchlevel, base64.b64encode(diff), subdir))
682 patchid = t.lastrowid
683 t.execute(self.quoteq("INSERT INTO sourcestamps"
684 " (branch, revision, patchid, project, repository)"
685 " VALUES (?,?,?,?,?)"),
686 (ss.branch, ss.revision, patchid, ss.project, ss.repository))
687 ss.ssid = t.lastrowid
688 q2 = self.quoteq("INSERT INTO sourcestamp_changes"
689 " (sourcestampid, changeid) VALUES (?,?)")
690 for c in ss.changes:
691 t.execute(q2, (ss.ssid, c.number))
692 return ss.ssid
693
def create_buildset(self, ssid, reason, properties, builderNames, t,
                    external_idstring=None):
    """Insert a buildset together with its properties and buildrequests.

    One row goes into ``buildsets``, one row per entry of
    ``properties.properties`` into ``buildset_properties`` (values are
    JSON-encoded), and one row per name in ``builderNames`` into
    ``buildrequests``. All statements are issued on ``t``, which must
    provide ``execute(sql, params)`` and ``lastrowid``. The buildset and
    every buildrequest share the same ``submitted_at`` value, taken once
    from ``self._getCurrentTime()``.

    Afterwards an "add-buildset" notification (carrying the buildset id)
    and an "add-buildrequest" notification (carrying every new
    buildrequest id) are queued through ``self.notify``.

    Returns the id of the new buildset.
    """
    submitted_at = self._getCurrentTime()

    buildset_sql = self.quoteq("INSERT INTO buildsets"
                               " (external_idstring, reason,"
                               " sourcestampid, submitted_at)"
                               " VALUES (?,?,?,?)")
    t.execute(buildset_sql, (external_idstring, reason, ssid, submitted_at))
    # Read the id immediately: later inserts on the same cursor overwrite
    # lastrowid.
    bsid = t.lastrowid

    for name, value in properties.properties.items():
        value_json = json.dumps(value)
        property_sql = self.quoteq(
            "INSERT INTO buildset_properties"
            " (buildsetid, property_name, property_value)"
            " VALUES (?,?,?)")
        t.execute(property_sql, (bsid, name, value_json))

    request_ids = []
    for builder_name in builderNames:
        request_sql = self.quoteq(
            "INSERT INTO buildrequests"
            " (buildsetid, buildername, submitted_at)"
            " VALUES (?,?,?)")
        t.execute(request_sql, (bsid, builder_name, submitted_at))
        request_ids.append(t.lastrowid)

    self.notify("add-buildset", bsid)
    self.notify("add-buildrequest", *request_ids)
    return bsid
721
723 q = self.quoteq("INSERT INTO scheduler_changes"
724 " (schedulerid, changeid, important)"
725 " VALUES (?,?,?)")
726 t.execute(q, (schedulerid, number, bool(important)))
727
729 q = self.quoteq("SELECT changeid, important"
730 " FROM scheduler_changes"
731 " WHERE schedulerid=?")
732 t.execute(q, (schedulerid,))
733 important = []
734 unimportant = []
735 for (changeid, is_important) in t.fetchall():
736 c = self.getChangeNumberedNow(changeid, t)
737 if is_important:
738 important.append(c)
739 else:
740 unimportant.append(c)
741 return (important, unimportant)
742
744 while changeids:
745
746
747 batch, changeids = changeids[:100], changeids[100:]
748 t.execute(self.quoteq("DELETE FROM scheduler_changes"
749 " WHERE schedulerid=? AND changeid IN ")
750 + self.parmlist(len(batch)),
751 (schedulerid,) + tuple(batch))
752
754
755
756 t.execute(self.quoteq("INSERT INTO scheduler_upstream_buildsets"
757 " (buildsetid, schedulerid, active)"
758 " VALUES (?,?,?)"),
759 (bsid, schedulerid, 1))
760
762
763 t.execute(self.quoteq("SELECT bs.id, "
764 " bs.sourcestampid, bs.complete, bs.results"
765 " FROM scheduler_upstream_buildsets AS s,"
766 " buildsets AS bs"
767 " WHERE s.buildsetid=bs.id"
768 " AND s.schedulerid=?"
769 " AND s.active=1"),
770 (schedulerid,))
771 return t.fetchall()
772
774 t.execute(self.quoteq("UPDATE scheduler_upstream_buildsets"
775 " SET active=0"
776 " WHERE buildsetid=? AND schedulerid=?"),
777 (buildsetid, schedulerid))
778
779
780
782 assert isinstance(brid, (int, long))
783 if t:
784 br = self._txn_getBuildRequestWithNumber(t, brid)
785 else:
786 br = self.runInteractionNow(self._txn_getBuildRequestWithNumber,
787 brid)
788 return br
790 assert isinstance(brid, (int, long))
791 t.execute(self.quoteq("SELECT br.buildsetid, bs.reason,"
792 " bs.sourcestampid, br.buildername,"
793 " bs.submitted_at, br.priority"
794 " FROM buildrequests AS br, buildsets AS bs"
795 " WHERE br.id=? AND br.buildsetid=bs.id"),
796 (brid,))
797 r = t.fetchall()
798 if not r:
799 return None
800 (bsid, reason, ssid, builder_name, submitted_at, priority) = r[0]
801 ss = self.getSourceStampNumberedNow(ssid, t)
802 properties = self.get_properties_from_db("buildset_properties",
803 "buildsetid", bsid, t)
804 br = BuildRequest(reason, ss, builder_name, properties)
805 br.submittedAt = submitted_at
806 br.priority = priority
807 br.id = brid
808 br.bsid = bsid
809 return br
810
812 assert isinstance(brid, (int, long))
813 return self.runInteractionNow(self._txn_get_buildername_for_brid, brid)
815 assert isinstance(brid, (int, long))
816 t.execute(self.quoteq("SELECT buildername FROM buildrequests"
817 " WHERE id=?"),
818 (brid,))
819 r = t.fetchall()
820 if not r:
821 return None
822 return r[0][0]
823
826 q = ("SELECT br.id"
827 " FROM buildrequests AS br, buildsets AS bs"
828 " WHERE br.buildername=? AND br.complete=0"
829 " AND br.buildsetid=bs.id"
830 " AND (br.claimed_at<?"
831 " OR (br.claimed_by_name=?"
832 " AND br.claimed_by_incarnation!=?))"
833 " ORDER BY br.priority DESC,bs.submitted_at ASC")
834 if limit:
835 q += " LIMIT %s" % limit
836 t.execute(self.quoteq(q),
837 (buildername, old, master_name, master_incarnation))
838 requests = [self.getBuildRequestWithNumber(brid, t)
839 for (brid,) in t.fetchall()]
840 return requests
841
844 if not brids:
845 return
846 if t:
847 self._txn_claim_buildrequests(t, now, master_name,
848 master_incarnation, brids)
849 else:
850 self.runInteractionNow(self._txn_claim_buildrequests,
851 now, master_name, master_incarnation, brids)
854 brids = list(brids)
855 while brids:
856 batch, brids = brids[:100], brids[100:]
857 q = self.quoteq("UPDATE buildrequests"
858 " SET claimed_at = ?,"
859 " claimed_by_name = ?, claimed_by_incarnation = ?"
860 " WHERE id IN " + self.parmlist(len(batch)))
861 qargs = [now, master_name, master_incarnation] + list(batch)
862 t.execute(q, qargs)
863
867 now = self._getCurrentTime()
868 t.execute(self.quoteq("INSERT INTO builds (number, brid, start_time)"
869 " VALUES (?,?,?)"),
870 (buildnumber, brid, now))
871 bid = t.lastrowid
872 self.notify("add-build", bid)
873 return bid
874
878 now = self._getCurrentTime()
879 while bids:
880 batch, bids = bids[:100], bids[100:]
881 q = self.quoteq("UPDATE builds SET finish_time = ?"
882 " WHERE id IN " + self.parmlist(len(batch)))
883 qargs = [now] + list(batch)
884 t.execute(q, qargs)
885
889
890 t.execute(self.quoteq("SELECT b.brid,br.buildername,b.number"
891 " FROM builds AS b, buildrequests AS br"
892 " WHERE b.id=? AND b.brid=br.id"),
893 (bid,))
894 res = t.fetchall()
895 if res:
896 return res[0]
897 return (None,None,None)
898
905
909
910
# Release the claim on the given buildrequests: claimed_at is reset to 0
# and the claimed_by_* columns to NULL, at most 100 ids per UPDATE
# statement, then an "add-buildrequest" notification is queued.
911 while brids:
# Peel off the next batch; `brids` is rebound to the ids still to do.
912 batch, brids = brids[:100], brids[100:]
913 q = self.quoteq("UPDATE buildrequests"
914 " SET claimed_at=0,"
915 " claimed_by_name=NULL, claimed_by_incarnation=NULL"
916 " WHERE id IN " + self.parmlist(len(batch)))
917 t.execute(q, batch)
# NOTE(review): `brids` is consumed by the batching above, so what reaches
# notify() is the unprocessed remainder (empty once the loop has finished),
# not the ids that were just updated. Likely meant to notify with the
# original id list; confirm.
918 self.notify("add-buildrequest", *brids)
919
# Mark the given buildrequests complete with `results`, at most 100 ids per
# statement, re-check every still-incomplete buildset those requests belong
# to, and queue "retire-buildrequest" / "modify-buildset" notifications.
923 now = self._getCurrentTime()
924
925
926 while brids:
# Peel off the next batch; `brids` is rebound to the ids still to do.
927 batch, brids = brids[:100], brids[100:]
928
929 q = self.quoteq("UPDATE buildrequests"
930 " SET complete=1, results=?, complete_at=?"
931 " WHERE id IN " + self.parmlist(len(batch)))
# NOTE(review): list + batch only works when `brids` (and so `batch`) is a
# list; a tuple of ids raises TypeError here. Confirm what callers pass.
932 t.execute(q, [results, now]+batch)
933
# Find the buildsets, not yet complete, that own any request in this batch.
934 q = self.quoteq("SELECT bs.id"
935 " FROM buildsets AS bs, buildrequests AS br"
936 " WHERE br.buildsetid=bs.id AND bs.complete=0"
937 " AND br.id in "
938 + self.parmlist(len(batch)))
939 t.execute(q, batch)
940 bsids = [bsid for (bsid,) in t.fetchall()]
941 for bsid in bsids:
942 self._check_buildset(t, bsid, now)
# NOTE(review): `brids` is consumed by the batching above, so what reaches
# notify() is the unprocessed remainder (empty once the loop has finished),
# not the ids that were just retired. `bsids` is rebuilt for every batch,
# so only the most recent batch's buildsets are reported, and it is never
# assigned if `brids` was empty on entry. Confirm the intended arguments.
943 self.notify("retire-buildrequest", *brids)
944 self.notify("modify-buildset", *bsids)
945
949
950
951
952
953
954
# Cancel the given buildrequests by marking them complete with a FAILURE
# result, at most 100 ids per statement, re-check every still-incomplete
# buildset those requests belong to, and queue "cancel-buildrequest" /
# "modify-buildset" notifications.
955 while brids:
# Peel off the next batch; `brids` is rebound to the ids still to do.
956 batch, brids = brids[:100], brids[100:]
957
# The condition is the constant True, so the UPDATE path always runs and
# the DELETE path in the else branch is unreachable.
958 if True:
959 now = self._getCurrentTime()
960 q = self.quoteq("UPDATE buildrequests"
961 " SET complete=1, results=?, complete_at=?"
962 " WHERE id IN " + self.parmlist(len(batch)))
# NOTE(review): list + batch only works when `batch` is a list; a tuple of
# ids raises TypeError here. Confirm what callers pass.
963 t.execute(q, [FAILURE, now]+batch)
964 else:
965 q = self.quoteq("DELETE FROM buildrequests"
966 " WHERE id IN " + self.parmlist(len(batch)))
967 t.execute(q, batch)
968
969
# Find the buildsets, not yet complete, that own any request in this batch.
970 q = self.quoteq("SELECT bs.id"
971 " FROM buildsets AS bs, buildrequests AS br"
972 " WHERE br.buildsetid=bs.id AND bs.complete=0"
973 " AND br.id in "
974 + self.parmlist(len(batch)))
975 t.execute(q, batch)
976 bsids = [bsid for (bsid,) in t.fetchall()]
977 for bsid in bsids:
# `now` is assigned only in the always-taken branch above.
978 self._check_buildset(t, bsid, now)
979
# NOTE(review): `brids` is consumed by the batching above, so what reaches
# notify() is the unprocessed remainder (empty once the loop has finished),
# not the ids that were just cancelled. `bsids` is rebuilt for every batch,
# so only the most recent batch's buildsets are reported, and it is never
# assigned if `brids` was empty on entry. Confirm the intended arguments.
980 self.notify("cancel-buildrequest", *brids)
981 self.notify("modify-buildset", *bsids)
982
984 q = self.quoteq("SELECT br.complete,br.results"
985 " FROM buildsets AS bs, buildrequests AS br"
986 " WHERE bs.complete=0"
987 " AND br.buildsetid=bs.id AND bs.id=?")
988 t.execute(q, (bsid,))
989 results = t.fetchall()
990 is_complete = True
991 bs_results = SUCCESS
992 for (complete, r) in results:
993 if not complete:
994
995 is_complete = False
996
997
998 if r not in (SUCCESS, WARNINGS):
999 bs_results = FAILURE
1000 if is_complete:
1001
1002 q = self.quoteq("UPDATE buildsets"
1003 " SET complete=1, complete_at=?, results=?"
1004 " WHERE id=?")
1005 t.execute(q, (now, bs_results, bsid))
1006
1008 return self.runInteractionNow(self._txn_get_buildrequestids_for_buildset,
1009 bsid)
1011 t.execute(self.quoteq("SELECT buildername,id FROM buildrequests"
1012 " WHERE buildsetid=?"),
1013 (bsid,))
1014 return dict(t.fetchall())
1015
1019
1020
1021
1022
1023
1024
1025 q = self.quoteq("SELECT br.complete,br.results"
1026 " FROM buildsets AS bs, buildrequests AS br"
1027 " WHERE br.buildsetid=bs.id AND bs.id=?")
1028 t.execute(q, (bsid,))
1029 results = t.fetchall()
1030 finished = True
1031 successful = None
1032 for (c,r) in results:
1033 if not c:
1034 finished = False
1035 if c and r not in (SUCCESS, WARNINGS):
1036 successful = False
1037 if finished and successful is None:
1038 successful = True
1039 return (successful, finished)
1040
1044 t.execute("SELECT id FROM buildsets WHERE complete=0")
1045 return [bsid for (bsid,) in t.fetchall()]
1049 q = self.quoteq("SELECT external_idstring, reason, sourcestampid,"
1050 " complete, results"
1051 " FROM buildsets WHERE id=?")
1052 t.execute(q, (bsid,))
1053 res = t.fetchall()
1054 if res:
1055 (external, reason, ssid, complete, results) = res[0]
1056 external_idstring = str_or_none(external)
1057 reason = str_or_none(reason)
1058 complete = bool(complete)
1059 return (external_idstring, reason, ssid, complete, results)
1060 return None
1061
1063 return self.runInteractionNow(self._txn_get_pending_brids_for_builder,
1064 buildername)
1066
1067
1068
1069 t.execute(self.quoteq("SELECT id FROM buildrequests"
1070 " WHERE buildername=? AND"
1071 " complete=0 AND claimed_at=0"),
1072 (buildername,))
1073 return [brid for (brid,) in t.fetchall()]
1074
1075
1076
1078 return bool(self._pending_operation_count)
1079
1082
1083
1084 threadable.synchronize(DBConnector)
1085