1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 """
17 Support for changes in the database
18 """
19
20 from buildbot.util import json
21 import sqlalchemy as sa
22 from twisted.internet import defer, reactor
23 from buildbot.db import base
24 from buildbot.util import epoch2datetime, datetime2epoch
28
30
31
32 - def addChange(self, author=None, files=None, comments=None, is_dir=0,
33 revision=None, when_timestamp=None, branch=None,
34 category=None, revlink='', properties={}, repository='', codebase='',
35 project='', uid=None, _reactor=reactor):
36 assert project is not None, "project must be a string, not None"
37 assert repository is not None, "repository must be a string, not None"
38
39 if when_timestamp is None:
40 when_timestamp = epoch2datetime(_reactor.seconds())
41
42
43 for pv in properties.values():
44 assert pv[1] == 'Change', ("properties must be qualified with"
45 "source 'Change'")
46
47 def thd(conn):
48
49
50
51
52
53
54 transaction = conn.begin()
55
56 ch_tbl = self.db.model.changes
57
58 self.check_length(ch_tbl.c.author, author)
59 self.check_length(ch_tbl.c.comments, comments)
60 self.check_length(ch_tbl.c.branch, branch)
61 self.check_length(ch_tbl.c.revision, revision)
62 self.check_length(ch_tbl.c.revlink, revlink)
63 self.check_length(ch_tbl.c.category, category)
64 self.check_length(ch_tbl.c.repository, repository)
65 self.check_length(ch_tbl.c.project, project)
66
67 r = conn.execute(ch_tbl.insert(), dict(
68 author=author,
69 comments=comments,
70 is_dir=is_dir,
71 branch=branch,
72 revision=revision,
73 revlink=revlink,
74 when_timestamp=datetime2epoch(when_timestamp),
75 category=category,
76 repository=repository,
77 codebase=codebase,
78 project=project))
79 changeid = r.inserted_primary_key[0]
80 if files:
81 tbl = self.db.model.change_files
82 for f in files:
83 self.check_length(tbl.c.filename, f)
84 conn.execute(tbl.insert(), [
85 dict(changeid=changeid, filename=f)
86 for f in files
87 ])
88 if properties:
89 tbl = self.db.model.change_properties
90 inserts = [
91 dict(changeid=changeid,
92 property_name=k,
93 property_value=json.dumps(v))
94 for k,v in properties.iteritems()
95 ]
96 for i in inserts:
97 self.check_length(tbl.c.property_name,
98 i['property_name'])
99 self.check_length(tbl.c.property_value,
100 i['property_value'])
101
102 conn.execute(tbl.insert(), inserts)
103 if uid:
104 ins = self.db.model.change_users.insert()
105 conn.execute(ins, dict(changeid=changeid, uid=uid))
106
107 transaction.commit()
108
109 return changeid
110 d = self.db.pool.do(thd)
111 return d
112
113 @base.cached("chdicts")
115 assert changeid >= 0
116 def thd(conn):
117
118 changes_tbl = self.db.model.changes
119 q = changes_tbl.select(whereclause=(changes_tbl.c.changeid == changeid))
120 rp = conn.execute(q)
121 row = rp.fetchone()
122 if not row:
123 return None
124
125 return self._chdict_from_change_row_thd(conn, row)
126 d = self.db.pool.do(thd)
127 return d
128
130 assert changeid >= 0
131 def thd(conn):
132 cu_tbl = self.db.model.change_users
133 q = cu_tbl.select(whereclause=(cu_tbl.c.changeid == changeid))
134 res = conn.execute(q)
135 rows = res.fetchall()
136 row_uids = [ row.uid for row in rows ]
137 return row_uids
138 d = self.db.pool.do(thd)
139 return d
140
142 def thd(conn):
143
144 changes_tbl = self.db.model.changes
145 q = sa.select([changes_tbl.c.changeid],
146 order_by=[sa.desc(changes_tbl.c.changeid)],
147 limit=count)
148 rp = conn.execute(q)
149 changeids = [ row.changeid for row in rp ]
150 rp.close()
151 return list(reversed(changeids))
152 d = self.db.pool.do(thd)
153
154
155 def get_changes(changeids):
156 return defer.gatherResults([ self.getChange(changeid)
157 for changeid in changeids ])
158 d.addCallback(get_changes)
159 return d
160
162 def thd(conn):
163 changes_tbl = self.db.model.changes
164 q = sa.select([ changes_tbl.c.changeid ],
165 order_by=sa.desc(changes_tbl.c.changeid),
166 limit=1)
167 return conn.scalar(q)
168 d = self.db.pool.do(thd)
169 return d
170
171
172
174 """
175 Called periodically by DBConnector, this method deletes changes older
176 than C{changeHorizon}.
177 """
178
179 if not changeHorizon:
180 return defer.succeed(None)
181 def thd(conn):
182 changes_tbl = self.db.model.changes
183
184
185
186
187
188
189 q = sa.select([changes_tbl.c.changeid],
190 order_by=[sa.desc(changes_tbl.c.changeid)],
191 offset=changeHorizon)
192 res = conn.execute(q)
193 ids_to_delete = [ r.changeid for r in res ]
194
195
196 for table_name in ('scheduler_changes', 'sourcestamp_changes',
197 'change_files', 'change_properties', 'changes',
198 'change_users'):
199 remaining = ids_to_delete[:]
200 while remaining:
201 batch, remaining = remaining[:100], remaining[100:]
202 table = self.db.model.metadata.tables[table_name]
203 conn.execute(
204 table.delete(table.c.changeid.in_(batch)))
205 return self.db.pool.do(thd)
206
208
209
210 change_files_tbl = self.db.model.change_files
211 change_properties_tbl = self.db.model.change_properties
212
213 chdict = ChDict(
214 changeid=ch_row.changeid,
215 author=ch_row.author,
216 files=[],
217 comments=ch_row.comments,
218 is_dir=ch_row.is_dir,
219 revision=ch_row.revision,
220 when_timestamp=epoch2datetime(ch_row.when_timestamp),
221 branch=ch_row.branch,
222 category=ch_row.category,
223 revlink=ch_row.revlink,
224 properties={},
225 repository=ch_row.repository,
226 codebase=ch_row.codebase,
227 project=ch_row.project)
228
229 query = change_files_tbl.select(
230 whereclause=(change_files_tbl.c.changeid == ch_row.changeid))
231 rows = conn.execute(query)
232 for r in rows:
233 chdict['files'].append(r.filename)
234
235
236
237
238 def split_vs(vs):
239 try:
240 v,s = vs
241 if s != "Change":
242 v,s = vs, "Change"
243 except:
244 v,s = vs, "Change"
245 return v, s
246
247 query = change_properties_tbl.select(
248 whereclause=(change_properties_tbl.c.changeid == ch_row.changeid))
249 rows = conn.execute(query)
250 for r in rows:
251 try:
252 v, s = split_vs(json.loads(r.property_value))
253 chdict['properties'][r.property_name] = (v,s)
254 except ValueError:
255 pass
256
257 return chdict
258