1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 import time
17 import traceback
18 import inspect
19 import shutil
20 import os
21 import sqlalchemy as sa
22 import tempfile
23 from buildbot.process import metrics
24 from twisted.internet import reactor, threads
25 from twisted.python import threadpool, log
26
27
28
29
30
# Module-wide switch: when true, the pool's do() and do_with_engine() methods
# are wrapped with timed_do_fn so that every call is logged.
debug = False

# Next sequence number timed_do_fn hands out to tag a logged call.
_debug_id = 1
33
def timed_do_fn(f):
    """Decorate a do function to log before, after, and elapsed time,
    with the name of the calling function. This is not speedy!

    ``f`` is invoked as ``f(callable, *args, **kwargs)`` and must return an
    object offering ``addBoth``; the wrapper returns that same object and
    takes over ``f``'s ``__name__`` and ``__doc__``.
    """
    def wrap(callable, *args, **kwargs):
        global _debug_id

        # Identify the code that invoked the wrapped function: with limit=2
        # the first stack entry is the frame just above wrap().
        st = traceback.extract_stack(limit=2)
        filename, line, caller_name, _ = st[0]

        # The caller's local variables, logged below.  f_back is used rather
        # than passing a depth to currentframe(), which only works where
        # currentframe is an alias of sys._getframe.
        caller_locals = inspect.currentframe().f_back.f_locals

        # Hand out a unique number for this invocation.
        call_id = _debug_id
        _debug_id += 1

        descr = "%s-%08x" % (caller_name, call_id)

        start_time = time.time()
        log.msg("%s - before ('%s' line %d)" % (descr, filename, line))
        for var_name in caller_locals:
            if var_name in ('self', 'thd'):
                continue
            log.msg("%s - %s = %r"
                    % (descr, var_name, caller_locals[var_name]))

        # Wrap the callable so the start and end of its execution are logged
        # too.  It forwards exactly the arguments it is called with.
        def callable_wrap(*call_args, **call_kwargs):
            log.msg("%s - thd start" % (descr,))
            try:
                return callable(*call_args, **call_kwargs)
            finally:
                log.msg("%s - thd end" % (descr,))
        d = f(callable_wrap, *args, **kwargs)

        def after(x):
            elapsed = (time.time() - start_time) * 1000
            log.msg("%s - after (%0.2f ms elapsed)" % (descr, elapsed))
            # pass the result (or failure) through untouched
            return x
        d.addBoth(after)
        return d
    wrap.__name__ = f.__name__
    wrap.__doc__ = f.__doc__
    return wrap
80
82
# True once the pool's threads have been started; reset when it is stopped.
running = False

# Set by __init__ when the installed SQLite shows Buildbot bug #1810; every
# connection then gets a "select * from sqlite_master" before it is used.
__broken_sqlite = False
92
def __init__(self, engine, verbose=False):
    """Create the pool for ``engine`` and schedule it to start.

    engine -- database engine the pool works with.  If it has an
        ``optimal_thread_pool_size`` attribute, that value is the maximum
        number of threads; otherwise the maximum is 5.
    verbose -- if true, startup notices are printed instead of being sent
        to the Twisted log.

    Raises RuntimeError when the engine is SQLite and the detected SQLite
    version is older than 3.4.
    """
    if verbose:
        def log_msg(m):
            print(m)
    else:
        log_msg = log.msg

    if hasattr(engine, 'optimal_thread_pool_size'):
        max_threads = engine.optimal_thread_pool_size
    else:
        max_threads = 5

    threadpool.ThreadPool.__init__(
        self, minthreads=1, maxthreads=max_threads, name='DBThreadPool')
    self.engine = engine

    if engine.dialect.name == 'sqlite':
        sqlite_version = self.get_sqlite_version()
        if sqlite_version < (3, 7):
            log_msg("Using SQLite Version %s" % (sqlite_version,))
            log_msg("NOTE: this old version of SQLite does not support "
                    "WAL journal mode; a busy master may encounter "
                    "'Database is locked' errors. Consider upgrading.")
            if sqlite_version < (3, 4):
                log_msg("NOTE: this old version of SQLite is not "
                        "supported.")
                raise RuntimeError("unsupported SQLite version")
        self.__broken_sqlite = self.detect_bug1810()
        if self.__broken_sqlite:
            log_msg("Applying SQLite workaround from Buildbot bug #1810")

    # the threads are only started once the reactor is running
    self._start_evt = reactor.callWhenRunning(self._start)

    # with module-level debugging on, log and time every do call
    if debug:
        self.do = timed_do_fn(self.do)
        self.do_with_engine = timed_do_fn(self.do_with_engine)
135
def _start(self):
    """Start the pool's threads, unless they are already running, and
    register _stop to run during reactor shutdown."""
    self._start_evt = None
    if self.running:
        return
    self.start()
    self._stop_evt = reactor.addSystemEventTrigger(
        'during', 'shutdown', self._stop)
    self.running = True
143
def _stop(self):
    """Stop the pool's threads, dispose of the engine and mark the pool as
    no longer running."""
    # forget the shutdown trigger first, so shutdown() sees nothing to remove
    self._stop_evt = None
    self.stop()
    self.engine.dispose()
    self.running = False
149
def shutdown(self):
    """Manually stop the pool. This is only necessary from tests, as the
    pool will stop itself when the reactor stops under normal
    circumstances.

    Does nothing if no shutdown trigger is registered, i.e. the pool was
    never started or has already been stopped.
    """
    # _stop_evt is only assigned by _start/_stop, so it may not exist yet
    # when this is called before the pool has started.
    stop_evt = getattr(self, '_stop_evt', None)
    if not stop_evt:
        return
    reactor.removeSystemEventTrigger(stop_evt)
    self._stop()
158
159
160
161
162
# Retry policy used by __thd for transient OperationalErrors:
# seconds to sleep before the first retry
BACKOFF_START = 1.0
# factor applied to the sleep time after every retry
BACKOFF_MULT = 1.05
# stop retrying once this many seconds (one day) have passed
MAX_OPERATIONALERROR_TIME = 24 * 3600
def __thd(self, with_engine, callable, args, kwargs):
    """Call ``callable`` with a database handle, retrying transient errors.

    with_engine -- if true, ``callable`` receives the engine itself;
        otherwise it receives a fresh connection, which is closed again
        after every attempt.
    callable -- invoked as ``callable(handle, *args, **kwargs)``.
    args, kwargs -- extra positional and keyword arguments for ``callable``.

    Returns whatever ``callable`` returns, which must not be a ResultProxy.

    An OperationalError whose message contains "Lost connection" or
    "database is locked" is retried after a sleep that starts at
    BACKOFF_START seconds and is multiplied by BACKOFF_MULT each time, until
    MAX_OPERATIONALERROR_TIME seconds have passed.  Every other error, and
    the last one once time runs out, is re-raised.
    """
    backoff = self.BACKOFF_START
    start = time.time()
    while True:
        if with_engine:
            arg = self.engine
        else:
            arg = self.engine.contextual_connect()

        try:
            # Workaround for bug #1810.  It sits inside the try/finally so
            # that the connection is still closed if this query fails; its
            # errors are deliberately not retried.
            if self.__broken_sqlite:
                arg.execute("select * from sqlite_master")
            try:
                rv = callable(arg, *args, **kwargs)
                assert not isinstance(rv, sa.engine.ResultProxy), \
                    "do not return ResultProxy objects!"
            except sa.exc.OperationalError as e:
                # Only errors carrying a text message can be classified;
                # anything else (including empty args) is passed on as is.
                orig_args = getattr(e.orig, 'args', None)
                text = None
                if orig_args:
                    text = orig_args[0]
                if not isinstance(text, basestring):
                    raise

                transient = ("Lost connection" in text
                             or "database is locked" in text)
                if not transient:
                    raise

                # give up once retrying has taken too long overall
                elapsed = time.time() - start
                if elapsed > self.MAX_OPERATIONALERROR_TIME:
                    raise

                metrics.MetricCountEvent.log(
                    "DBThreadPool.retry-on-OperationalError")
                log.msg("automatically retrying query after "
                        "OperationalError (%ss sleep)" % backoff)

                # sleep (this runs in a pool thread), then back off further
                time.sleep(backoff)
                backoff *= self.BACKOFF_MULT

                # try again; the finally below closes this connection first
                continue
        finally:
            if not with_engine:
                arg.close()
        break
    return rv
215
def do(self, callable, *args, **kwargs):
    """Run ``callable(connection, *args, **kwargs)`` in a pool thread.

    Returns the Deferred produced by deferToThreadPool for that call.
    """
    d = threads.deferToThreadPool(
        reactor, self,
        self.__thd, False, callable, args, kwargs)
    return d
219
def do_with_engine(self, callable, *args, **kwargs):
    """Run ``callable(engine, *args, **kwargs)`` in a pool thread.

    Like do(), except that the callable is handed the engine itself rather
    than a connection.  Returns the Deferred produced by deferToThreadPool.
    """
    d = threads.deferToThreadPool(
        reactor, self,
        self.__thd, True, callable, args, kwargs)
    return d
223
225
226
def detect_bug1810(self):
    """Check whether the local SQLite library shows Buildbot bug #1810.

    Two connections are opened on a scratch database.  The first one looks
    at table ``foo`` before it exists, the second one creates it, and then
    the first one selects from it.  If that select fails with an
    OperationalError the library is affected.

    Returns True if the bug is present, False otherwise.  If the bug is
    present, no check of the workaround is made.  If it is absent, the same
    sequence is run once more with the workaround query
    ("SELECT * from sqlite_master") in place, and any error from that run
    propagates to the caller.
    """
    # use pysqlite2 when it is installed, else the stdlib sqlite3 module
    try:
        import pysqlite2.dbapi2 as sqlite
    except ImportError:
        import sqlite3 as sqlite

    tmpdir = tempfile.mkdtemp()
    dbfile = os.path.join(tmpdir, "detect_bug1810.db")

    def test(select_from_sqlite_master=False):
        conn1 = None
        conn2 = None
        try:
            conn1 = sqlite.connect(dbfile)
            curs1 = conn1.cursor()
            curs1.execute("PRAGMA table_info('foo')")

            conn2 = sqlite.connect(dbfile)
            curs2 = conn2.cursor()
            curs2.execute("CREATE TABLE foo ( a integer )")

            if select_from_sqlite_master:
                curs1.execute("SELECT * from sqlite_master")
            curs1.execute("SELECT * from foo")
        finally:
            if conn1 is not None:
                conn1.close()
            if conn2 is not None:
                conn2.close()
            # the file may not exist if the first connect failed; do not
            # let the cleanup hide that error
            if os.path.exists(dbfile):
                os.unlink(dbfile)

    # the scratch directory is removed however the probes end
    try:
        try:
            test()
        except sqlite.OperationalError:
            # this is the failure that identifies a broken library
            return True

        # the library is fine; the run with the workaround must pass too
        test(select_from_sqlite_master=True)
        return False
    finally:
        shutil.rmtree(tmpdir)
268
def get_sqlite_version(self):
    """Return the SQLite library version as a tuple of ints.

    The version is read with ``SELECT sqlite_version()`` on a throwaway
    ``sqlite://`` engine.  Returns ``(0,)`` when the query fails, yields no
    row, or the value cannot be parsed as dot-separated integers.
    """
    engine = sa.create_engine('sqlite://')
    conn = engine.contextual_connect()

    try:
        try:
            r = conn.execute("SELECT sqlite_version()")
            vers_row = r.fetchone()
            r.close()
        except Exception:
            # any query failure means "unknown version"; KeyboardInterrupt
            # and SystemExit are deliberately not swallowed
            return (0,)
    finally:
        # do not leave the probe connection open
        conn.close()

    if not vers_row:
        return (0,)

    try:
        return tuple(map(int, vers_row[0].split('.')))
    except (TypeError, ValueError, AttributeError):
        # value missing or not of the form "N.N.N"
        return (0,)
287