1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 import time
17 import traceback
18 import shutil
19 import os
20 import sqlalchemy as sa
21 import twisted
22 import tempfile
23 from twisted.internet import reactor, threads, defer
24 from twisted.python import threadpool, failure, versions, log
25
26
27
28
29
# When true, each pool instance wraps its do and do_with_engine methods with
# timed_do_fn at construction time, so every call is logged together with its
# elapsed time.
debug = False
31
def timed_do_fn(f):
    """
    Wrap a Deferred-returning C{do}-style function with timing logs.

    The wrapper logs one line before invoking C{f} and one line when the
    Deferred it returned fires (success or failure), including the elapsed
    wall-clock time in milliseconds.  Both lines are tagged with the name,
    file and line of the code that called the wrapper.  Extracting the stack
    on every call is slow, so this is meant for debugging only.

    @param f: function to wrap; it must return a Deferred
    @returns: wrapper carrying the C{__name__} and C{__doc__} of C{f}
    """
    def wrap(*args, **kwargs):
        # With limit=2 the stack holds this frame and its caller; entry 0 is
        # the caller, which is what we want to name in the log.
        caller = traceback.extract_stack(limit=2)[0]
        filename, lineno, funcname = caller[0], caller[1], caller[2]
        descr = "%s ('%s' line %d)" % (funcname, filename, lineno)

        started = time.time()
        log.msg("%s - before" % (descr,))
        d = f(*args, **kwargs)

        def after(result):
            elapsed = (time.time() - started) * 1000
            log.msg("%s - after (%0.2f ms elapsed)" % (descr, elapsed))
            # pass the result (or failure) through untouched
            return result

        d.addBoth(after)
        return d

    wrap.__name__ = f.__name__
    wrap.__doc__ = f.__doc__
    return wrap
54
"""
A pool of threads ready and waiting to execute queries.

If the engine has an C{optimal_thread_pool_size} attribute, then the
maxthreads of the thread pool will be set to that value.  This is most
useful for SQLite in-memory connections, where exactly one connection
(and thus thread) should be used.
"""

# True while the pool's threads are started: set by _start, cleared by _stop.
running = False

# Result of detect_bug1810(), filled in by __init__ for SQLite engines.  When
# true, do and do_with_engine issue "select * from sqlite_master" before
# running the caller's function.
__broken_sqlite = False
74
def __init__(self, engine):
    """
    Set up the thread pool around an SQLAlchemy engine.

    The pool is not started here; startup is scheduled with
    C{reactor.callWhenRunning}.  For SQLite engines the library version is
    logged and the Buildbot bug #1810 probe is run once, so that C{do} and
    C{do_with_engine} know whether to apply the workaround.

    @param engine: SQLAlchemy engine used for all queries; an optional
        C{optimal_thread_pool_size} attribute overrides the default of five
        worker threads
    """
    pool_size = getattr(engine, 'optimal_thread_pool_size', 5)
    threadpool.ThreadPool.__init__(self,
                                   minthreads=1,
                                   maxthreads=pool_size,
                                   name='DBThreadPool')
    self.engine = engine
    if engine.dialect.name == 'sqlite':
        vers = self.get_sqlite_version()
        log.msg("Using SQLite Version %s" % (vers,))
        if vers < (3,3,17):
            log.msg("NOTE: this old version of SQLite does not support "
                    "multiple simultaneous accesses to the database; "
                    "add the 'pool_size=1' argument to your db url")
        brkn = self.__broken_sqlite = self.detect_bug1810()
        if brkn:
            log.msg("Applying SQLite workaround from Buildbot bug #1810")

    # No shutdown trigger is registered until _start runs.  Define the
    # attribute now so a manual stop before the reactor starts is a no-op
    # rather than an AttributeError.
    self._stop_evt = None
    self._start_evt = reactor.callWhenRunning(self._start)

    # patch the do methods to log verbosely when the module debug flag is set
    if debug:
        self.do = timed_do_fn(self.do)
        self.do_with_engine = timed_do_fn(self.do_with_engine)
100
def _start(self):
    """
    Start the pool's threads, unless they are already running, and register
    C{_stop} to run during reactor shutdown.
    """
    # the callWhenRunning handle has served its purpose
    self._start_evt = None
    if self.running:
        return

    self.start()
    self._stop_evt = reactor.addSystemEventTrigger(
        'during', 'shutdown', self._stop)
    self.running = True
108
def _stop(self):
    """
    Stop the pool's threads and dispose of the engine.

    Runs either from the reactor shutdown trigger registered by C{_start}
    or from a manual stop.
    """
    # Clear the trigger handle first so a later manual stop sees that there
    # is nothing left to remove.
    self._stop_evt = None
    self.stop()
    self.engine.dispose()
    self.running = False
114
# NOTE(review): the `def` line of this method is missing from the reviewed
# text; the name `shutdown` is assumed -- confirm against the real file.
def shutdown(self):
    """Manually stop the pool.  This is only necessary from tests, as the
    pool will stop itself when the reactor stops under normal
    circumstances."""
    trigger = self._stop_evt
    if trigger:
        # Unregister the shutdown hook so _stop is not run a second time
        # when the reactor eventually stops.
        reactor.removeSystemEventTrigger(trigger)
        self._stop()
    # otherwise no trigger is registered and there is nothing to stop
123
def do(self, callable, *args, **kwargs):
    """
    Call C{callable} in a thread, with a Connection as first argument.
    Returns a deferred that will indicate the results of the callable.

    Note: do not return any SQLAlchemy objects via this deferred!

    @param callable: function invoked as C{callable(conn, *args, **kwargs)}
        in a pool thread
    @returns: Deferred firing with the callable's return value, or failing
        with whatever exception was raised in the thread
    """
    def thd():
        conn = self.engine.contextual_connect()
        try:
            # The workaround query sits inside the try block so that the
            # connection is still closed if the query itself fails.
            if self.__broken_sqlite:
                # see Buildbot bug #1810
                conn.execute("select * from sqlite_master")
            rv = callable(conn, *args, **kwargs)
            assert not isinstance(rv, sa.engine.ResultProxy), \
                "do not return ResultProxy objects!"
        finally:
            conn.close()
        return rv
    return threads.deferToThreadPool(reactor, self, thd)
143
def do_with_engine(self, callable, *args, **kwargs):
    """
    Variant of L{do} that hands the SQLAlchemy Engine itself, rather than
    a Connection, to C{callable}.  It is meant for schema manipulation
    only and is not used at master runtime.

    @param callable: function invoked as
        C{callable(engine, *args, **kwargs)} in a pool thread
    @returns: Deferred firing with the callable's return value
    """
    def thd():
        engine = self.engine
        if self.__broken_sqlite:
            # see Buildbot bug #1810
            engine.execute("select * from sqlite_master")
        result = callable(engine, *args, **kwargs)
        assert not isinstance(result, sa.engine.ResultProxy), \
            "do not return ResultProxy objects!"
        return result

    return threads.deferToThreadPool(reactor, self, thd)
158
159
160
161
162
163
def do_081(self, callable, *args, **kwargs):
    """
    Implementation of C{do} that does not rely on
    C{threads.deferToThreadPool}; it is installed as C{do} when the running
    Twisted is older than 8.2.0.

    @param callable: function invoked as C{callable(conn, *args, **kwargs)}
        in a pool thread
    @returns: Deferred fired from the reactor thread with the callable's
        return value, or with a Failure wrapping any exception raised
    """
    d = defer.Deferred()
    def thd():
        try:
            conn = self.engine.contextual_connect()
            try:
                # The workaround query sits inside the inner try block so
                # that the connection is still closed if the query fails.
                if self.__broken_sqlite:
                    # see Buildbot bug #1810
                    conn.execute("select * from sqlite_master")
                rv = callable(conn, *args, **kwargs)
                assert not isinstance(rv, sa.engine.ResultProxy), \
                    "do not return ResultProxy objects!"
            finally:
                conn.close()
            reactor.callFromThread(d.callback, rv)
        except:
            # Deliberately catch everything: whatever goes wrong in this
            # worker thread must reach the Deferred, or it would never fire.
            reactor.callFromThread(d.errback, failure.Failure())
    self.callInThread(thd)
    return d
def do_with_engine_081(self, callable, *args, **kwargs):
    """
    Implementation of C{do_with_engine} that does not rely on
    C{threads.deferToThreadPool}; it is installed as C{do_with_engine} when
    the running Twisted is older than 8.2.0.

    @param callable: function invoked as
        C{callable(engine, *args, **kwargs)} in a pool thread
    @returns: Deferred fired from the reactor thread with the callable's
        return value, or with a Failure wrapping any exception raised
    """
    d = defer.Deferred()

    def thd():
        try:
            engine = self.engine
            if self.__broken_sqlite:
                # see Buildbot bug #1810
                engine.execute("select * from sqlite_master")
            result = callable(engine, *args, **kwargs)
            assert not isinstance(result, sa.engine.ResultProxy), \
                "do not return ResultProxy objects!"
            reactor.callFromThread(d.callback, result)
        except:
            # route every error from the worker thread to the Deferred
            reactor.callFromThread(d.errback, failure.Failure())

    self.callInThread(thd)
    return d
197
198
# On Twisted releases older than 8.2.0, replace do and do_with_engine with
# the *_081 variants, which use callInThread and callFromThread directly
# instead of threads.deferToThreadPool.
if twisted.version < versions.Version('twisted', 8, 2, 0):
    do = do_081
    do_with_engine = do_with_engine_081
202
204
205
def detect_bug1810(self):
    """
    Probe the local SQLite library for Buildbot bug #1810.

    Two connections are opened on a scratch database: the first inspects a
    table that does not exist yet, the second creates it, and the first then
    selects from it.  If that select raises C{OperationalError} the library
    is affected.  Otherwise the same sequence is repeated with a
    C{SELECT * from sqlite_master} before the final select, as a check that
    the workaround sequence runs on this library.

    @returns: True if the workaround is needed, False otherwise
    @raises: any exception other than C{OperationalError} from the first
        probe, and any exception from the workaround probe, propagates
    """
    # Imported here so that neither driver is required at module load; the
    # stand-alone pysqlite2 is preferred over the stdlib sqlite3 when present.
    try:
        import pysqlite2.dbapi2 as sqlite
    except ImportError:
        import sqlite3 as sqlite

    tmpdir = tempfile.mkdtemp()
    dbfile = os.path.join(tmpdir, "detect_bug1810.db")

    def test(select_from_sqlite_master=False):
        conn1 = None
        conn2 = None
        try:
            conn1 = sqlite.connect(dbfile)
            curs1 = conn1.cursor()
            curs1.execute("PRAGMA table_info('foo')")

            conn2 = sqlite.connect(dbfile)
            curs2 = conn2.cursor()
            curs2.execute("CREATE TABLE foo ( a integer )")

            if select_from_sqlite_master:
                curs1.execute("SELECT * from sqlite_master")
            curs1.execute("SELECT * from foo")
        finally:
            if conn1:
                conn1.close()
            if conn2:
                conn2.close()
            # Each probe must start from an empty database.  The file may
            # not exist if connecting failed, and a missing file must not
            # mask the real error.
            if os.path.exists(dbfile):
                os.unlink(dbfile)

    try:
        try:
            test()
        except sqlite.OperationalError:
            # this is the expected error indicating it's broken
            return True

        # Not affected; run the probe again with the workaround query in
        # place.  Errors here are unexpected and propagate to the caller.
        test(select_from_sqlite_master=True)
        return False
    finally:
        # Remove the scratch directory on every exit path.  Cleanup is
        # best-effort so it can never hide the probe's own result or error.
        shutil.rmtree(tmpdir, ignore_errors=True)
247
def get_sqlite_version(self):
    """
    Ask SQLite for its library version through a throwaway in-memory engine.

    @returns: the version as a tuple of ints, e.g. C{(3, 7, 4)}, or C{(0,)}
        if the query fails or its result cannot be parsed
    """
    engine = sa.create_engine('sqlite://')
    conn = engine.contextual_connect()

    try:
        try:
            r = conn.execute("SELECT sqlite_version()")
            vers_row = r.fetchone()
            r.close()
        except Exception:
            # an unknown version is reported as (0,), i.e. "older than
            # anything"
            return (0,)
    finally:
        # The engine exists only for this one query; release its
        # connection and pool instead of leaving them to the garbage
        # collector.
        conn.close()
        engine.dispose()

    if not vers_row:
        return (0,)
    try:
        return tuple(map(int, vers_row[0].split('.')))
    except (TypeError, ValueError, AttributeError):
        return (0,)
266