1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 import time
17 import traceback
18 import inspect
19 import shutil
20 import os
21 import sqlalchemy as sa
22 import twisted
23 import tempfile
24 from buildbot.process import metrics
25 from twisted.internet import reactor, threads, defer
26 from twisted.python import threadpool, failure, versions, log
27
28
29
30
31
32 debug = False
33 _debug_id = 1
34
36 """Decorate a do function to log before, after, and elapsed time,
37 with the name of the calling function. This is not speedy!"""
38 def wrap(callable, *args, **kwargs):
39 global _debug_id
40
41
42 st = traceback.extract_stack(limit=2)
43 file, line, name, _ = st[0]
44
45
46 frame = inspect.currentframe(1)
47 locals = frame.f_locals
48
49
50 id, _debug_id = _debug_id, _debug_id+1
51
52 descr = "%s-%08x" % (name, id)
53
54 start_time = time.time()
55 log.msg("%s - before ('%s' line %d)" % (descr, file, line))
56 for name in locals:
57 if name in ('self', 'thd'):
58 continue
59 log.msg("%s - %s = %r" % (descr, name, locals[name]))
60
61
62
63 def callable_wrap(*args, **kargs):
64 log.msg("%s - thd start" % (descr,))
65 try:
66 return callable(*args, **kwargs)
67 finally:
68 log.msg("%s - thd end" % (descr,))
69 d = f(callable_wrap, *args, **kwargs)
70
71 def after(x):
72 end_time = time.time()
73 elapsed = (end_time - start_time) * 1000
74 log.msg("%s - after (%0.2f ms elapsed)" % (descr, elapsed))
75 return x
76 d.addBoth(after)
77 return d
78 wrap.__name__ = f.__name__
79 wrap.__doc__ = f.__doc__
80 return wrap
81
83
84 running = False
85
86
87
88
89
90
91
92 __broken_sqlite = False
93
94 - def __init__(self, engine, verbose=False):
95
96
97 log_msg = log.msg
98 if verbose:
99 def log_msg(m):
100 print m
101
102 pool_size = 5
103
104
105
106
107
108 if hasattr(engine, 'optimal_thread_pool_size'):
109 pool_size = engine.optimal_thread_pool_size
110
111 threadpool.ThreadPool.__init__(self,
112 minthreads=1,
113 maxthreads=pool_size,
114 name='DBThreadPool')
115 self.engine = engine
116 if engine.dialect.name == 'sqlite':
117 vers = self.get_sqlite_version()
118 if vers < (3,7):
119 log_msg("Using SQLite Version %s" % (vers,))
120 log_msg("NOTE: this old version of SQLite does not support "
121 "WAL journal mode; a busy master may encounter "
122 "'Database is locked' errors. Consider upgrading.")
123 if vers < (3,4):
124 log_msg("NOTE: this old version of SQLite is not "
125 "supported.")
126 raise RuntimeError("unsupported SQLite version")
127 brkn = self.__broken_sqlite = self.detect_bug1810()
128 if brkn:
129 log_msg("Applying SQLite workaround from Buildbot bug #1810")
130 self._start_evt = reactor.callWhenRunning(self._start)
131
132
133 if debug:
134 self.do = timed_do_fn(self.do)
135 self.do_with_engine = timed_do_fn(self.do_with_engine)
136
138 self._start_evt = None
139 if not self.running:
140 self.start()
141 self._stop_evt = reactor.addSystemEventTrigger(
142 'during', 'shutdown', self._stop)
143 self.running = True
144
146 self._stop_evt = None
147 self.stop()
148 self.engine.dispose()
149 self.running = False
150
152 """Manually stop the pool. This is only necessary from tests, as the
153 pool will stop itself when the reactor stops under normal
154 circumstances."""
155 if not self._stop_evt:
156 return
157 reactor.removeSystemEventTrigger(self._stop_evt)
158 self._stop()
159
160
161
162
163
164 BACKOFF_START = 1.0
165 BACKOFF_MULT = 1.05
166 MAX_OPERATIONALERROR_TIME = 3600*24
167 - def __thd(self, with_engine, callable, args, kwargs):
168
169
170
171 backoff = self.BACKOFF_START
172 start = time.time()
173 while True:
174 if with_engine:
175 arg = self.engine
176 else:
177 arg = self.engine.contextual_connect()
178
179 if self.__broken_sqlite:
180 arg.execute("select * from sqlite_master")
181 try:
182 try:
183 rv = callable(arg, *args, **kwargs)
184 assert not isinstance(rv, sa.engine.ResultProxy), \
185 "do not return ResultProxy objects!"
186 except sa.exc.OperationalError, e:
187 text = e.orig.args[0]
188 if not isinstance(text, basestring):
189 raise
190 if "Lost connection" in text \
191 or "database is locked" in text:
192
193
194 elapsed = time.time() - start
195 if elapsed > self.MAX_OPERATIONALERROR_TIME:
196 raise
197
198 metrics.MetricCountEvent.log(
199 "DBThreadPool.retry-on-OperationalError")
200 log.msg("automatically retrying query after "
201 "OperationalError (%ss sleep)" % backoff)
202
203
204 time.sleep(backoff)
205 backoff *= self.BACKOFF_MULT
206
207
208 continue
209 else:
210 raise
211 finally:
212 if not with_engine:
213 arg.close()
214 break
215 return rv
216
217 - def do(self, callable, *args, **kwargs):
218 return threads.deferToThreadPool(reactor, self,
219 self.__thd, False, callable, args, kwargs)
220
222 return threads.deferToThreadPool(reactor, self,
223 self.__thd, True, callable, args, kwargs)
224
225
226
227
228
229
230 if twisted.version < versions.Version('twisted', 8, 2, 0):
231 - def __081_wrap(self, with_engine, callable, args, kwargs):
232 d = defer.Deferred()
233 def thd():
234 try:
235 reactor.callFromThread(d.callback,
236 self.__thd(with_engine, callable, args, kwargs))
237 except:
238 reactor.callFromThread(d.errback,
239 failure.Failure())
240 self.callInThread(thd)
241 return d
242
243 - def do_081(self, callable, *args, **kwargs):
244 return self.__081_wrap(False, callable, args, kwargs)
245
247 return self.__081_wrap(True, callable, args, kwargs)
248
249 do = do_081
250 do_with_engine = do_with_engine_081
251
253
254
255 try:
256 import pysqlite2.dbapi2 as sqlite
257 sqlite = sqlite
258 except ImportError:
259 import sqlite3 as sqlite
260
261 tmpdir = tempfile.mkdtemp()
262 dbfile = os.path.join(tmpdir, "detect_bug1810.db")
263 def test(select_from_sqlite_master=False):
264 conn1 = None
265 conn2 = None
266 try:
267 conn1 = sqlite.connect(dbfile)
268 curs1 = conn1.cursor()
269 curs1.execute("PRAGMA table_info('foo')")
270
271 conn2 = sqlite.connect(dbfile)
272 curs2 = conn2.cursor()
273 curs2.execute("CREATE TABLE foo ( a integer )")
274
275 if select_from_sqlite_master:
276 curs1.execute("SELECT * from sqlite_master")
277 curs1.execute("SELECT * from foo")
278 finally:
279 if conn1:
280 conn1.close()
281 if conn2:
282 conn2.close()
283 os.unlink(dbfile)
284
285 try:
286 test()
287 except sqlite.OperationalError:
288
289 shutil.rmtree(tmpdir)
290 return True
291
292
293 test(select_from_sqlite_master=True)
294 shutil.rmtree(tmpdir)
295 return False
296
298 engine = sa.create_engine('sqlite://')
299 conn = engine.contextual_connect()
300
301 try:
302 r = conn.execute("SELECT sqlite_version()")
303 vers_row = r.fetchone()
304 r.close()
305 except:
306 return (0,)
307
308 if vers_row:
309 try:
310 return tuple(map(int, vers_row[0].split('.')))
311 except (TypeError, ValueError):
312 return (0,)
313 else:
314 return (0,)
315