Package buildbot :: Package db :: Module pool
[frames | no frames]

Source Code for Module buildbot.db.pool

  1  # This file is part of Buildbot.  Buildbot is free software: you can 
  2  # redistribute it and/or modify it under the terms of the GNU General Public 
  3  # License as published by the Free Software Foundation, version 2. 
  4  # 
  5  # This program is distributed in the hope that it will be useful, but WITHOUT 
  6  # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more 
  8  # details. 
  9  # 
 10  # You should have received a copy of the GNU General Public License along with 
 11  # this program; if not, write to the Free Software Foundation, Inc., 51 
 12  # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
 13  # 
 14  # Copyright Buildbot Team Members 
 15   
 16  import time 
 17  import traceback 
 18  import shutil 
 19  import os 
 20  import sqlalchemy as sa 
 21  import twisted 
 22  import tempfile 
 23  from twisted.internet import reactor, threads, defer 
 24  from twisted.python import threadpool, failure, versions, log 
 25   
# Set this to True for *very* verbose query debugging output.  The flag is
# consulted when a DBThreadPool is constructed (its do methods are wrapped
# with timed_do_fn at that point), so it must be set before the pool is
# created.  It can be monkey-patched from master.cfg, too:
#     from buildbot.db import pool
#     pool.debug = True
debug = False
 31   
def timed_do_fn(f):
    """Wrap a do function so that every call is logged with its timing.

    A message is logged just before C{f} is invoked, and another one, which
    includes the elapsed wall-clock time in milliseconds, once the Deferred
    returned by C{f} fires (with either a result or a failure).  Each message
    identifies the function that called the wrapper, which requires a stack
    inspection on every call.  This is not speedy!

    @param f: callable returning a Deferred
    @returns: wrapper carrying the same C{__name__} and C{__doc__} as C{f}
    """
    def wrap(*args, **kwargs):
        # with limit=2 the stack holds our caller followed by this frame, so
        # the first entry describes the function that called us
        caller = traceback.extract_stack(limit=2)[0]
        filename, lineno, funcname, _source = caller
        descr = "%s ('%s' line %d)" % (funcname, filename, lineno)

        started_at = time.time()
        log.msg("%s - before" % (descr,))
        d = f(*args, **kwargs)

        def log_elapsed(result):
            elapsed_ms = (time.time() - started_at) * 1000
            log.msg("%s - after (%0.2f ms elapsed)" % (descr, elapsed_ms))
            # pass the result (or failure) through to later callbacks
            return result

        d.addBoth(log_elapsed)
        return d

    # make the wrapper look like the function it replaces
    wrap.__doc__ = f.__doc__
    wrap.__name__ = f.__name__
    return wrap
class DBThreadPool(threadpool.ThreadPool):
    """
    A pool of threads ready and waiting to execute queries.

    If the engine has an C{optimal_thread_pool_size} attribute, then the
    maxthreads of the thread pool will be set to that value.  This is most
    useful for SQLite in-memory connections, where exactly one connection
    (and thus thread) should be used.

    The pool arranges to start itself once the reactor is running, and
    registers a reactor shutdown trigger that stops it again; see
    L{shutdown} for stopping it by hand.
    """

    running = False

    # Handles for the reactor triggers registered by __init__ and _start.
    # They default to None at class level so that shutdown() is safe to call
    # on a pool that was never started.
    _start_evt = None
    _stop_evt = None

    # Some versions of SQLite incorrectly cache metadata about which tables are
    # and are not present on a per-connection basis.  This cache can be flushed
    # by querying the sqlite_master table.  The bug has been observed in 3.4.2;
    # __init__ probes for it with detect_bug1810() and sets this flag when the
    # workaround is required.  This is documented in bug #1810.
    __broken_sqlite = False

    def __init__(self, engine):
        """
        @param engine: the SQLAlchemy Engine on which queries will run; its
            optional C{optimal_thread_pool_size} attribute caps the number
            of threads (the default is 5)
        """
        pool_size = 5
        if hasattr(engine, 'optimal_thread_pool_size'):
            pool_size = engine.optimal_thread_pool_size
        threadpool.ThreadPool.__init__(self,
                                       minthreads=1,
                                       maxthreads=pool_size,
                                       name='DBThreadPool')
        self.engine = engine
        if engine.dialect.name == 'sqlite':
            vers = self.get_sqlite_version()
            log.msg("Using SQLite Version %s" % (vers,))
            if vers < (3, 3, 17):
                log.msg("NOTE: this old version of SQLite does not support "
                        "multiple simultaneous accesses to the database; "
                        "add the 'pool_size=1' argument to your db url")
            brkn = self.__broken_sqlite = self.detect_bug1810()
            if brkn:
                log.msg("Applying SQLite workaround from Buildbot bug #1810")
        self._start_evt = reactor.callWhenRunning(self._start)

        # patch the do methods to do verbose logging if necessary
        if debug:
            self.do = timed_do_fn(self.do)
            self.do_with_engine = timed_do_fn(self.do_with_engine)

    def _start(self):
        """Start the threads and register the shutdown trigger."""
        self._start_evt = None
        if not self.running:
            self.start()
            self._stop_evt = reactor.addSystemEventTrigger(
                'during', 'shutdown', self._stop)
            self.running = True

    def _stop(self):
        """Stop the threads and dispose of the engine's connections."""
        self._stop_evt = None
        self.stop()
        self.engine.dispose()
        self.running = False

    def shutdown(self):
        """Manually stop the pool.  This is only necessary from tests, as the
        pool will stop itself when the reactor stops under normal
        circumstances.  Calling this on a pool that is not running (already
        stopped, or never started) does nothing."""
        if not self._stop_evt:
            return  # pool is already stopped
        reactor.removeSystemEventTrigger(self._stop_evt)
        self._stop()

    def do(self, callable, *args, **kwargs):
        """
        Call C{callable} in a thread, with a Connection as first argument.
        Returns a deferred that will indicate the results of the callable.

        Note: do not return any SQLAlchemy objects via this deferred!
        """
        def thd():
            conn = self.engine.contextual_connect()
            try:
                # the workaround query runs inside the try block so that the
                # connection is released even if that query fails
                if self.__broken_sqlite:  # see bug #1810
                    conn.execute("select * from sqlite_master")
                rv = callable(conn, *args, **kwargs)
                assert not isinstance(rv, sa.engine.ResultProxy), \
                    "do not return ResultProxy objects!"
            finally:
                conn.close()
            return rv
        return threads.deferToThreadPool(reactor, self, thd)

    def do_with_engine(self, callable, *args, **kwargs):
        """
        Like L{do}, but with an SQLAlchemy Engine as the first argument.  This
        is only used for schema manipulation, and is not used at master
        runtime.
        """
        def thd():
            if self.__broken_sqlite:  # see bug #1810
                self.engine.execute("select * from sqlite_master")
            rv = callable(self.engine, *args, **kwargs)
            assert not isinstance(rv, sa.engine.ResultProxy), \
                "do not return ResultProxy objects!"
            return rv
        return threads.deferToThreadPool(reactor, self, thd)

    # older implementations for twisted < 8.2.0, which does not have
    # deferToThreadPool; this basically re-implements it, although it gets some
    # of the synchronization wrong - the thread may still be "in use" when the
    # deferred fires in the parent, which can lead to database accesses hopping
    # between threads.  In practice, this should not cause any difficulty.
    def do_081(self, callable, *args, **kwargs):  # pragma: no cover
        """Implementation of L{do} for Twisted versions older than 8.2.0."""
        d = defer.Deferred()

        def thd():
            try:
                conn = self.engine.contextual_connect()
                try:
                    if self.__broken_sqlite:  # see bug #1810
                        conn.execute("select * from sqlite_master")
                    rv = callable(conn, *args, **kwargs)
                    assert not isinstance(rv, sa.engine.ResultProxy), \
                        "do not return ResultProxy objects!"
                finally:
                    conn.close()
                reactor.callFromThread(d.callback, rv)
            except:
                # deliberately catch everything: whatever went wrong in the
                # thread must be delivered to the deferred's errback
                reactor.callFromThread(d.errback, failure.Failure())
        self.callInThread(thd)
        return d

    def do_with_engine_081(self, callable, *args, **kwargs):  # pragma: no cover
        """Implementation of L{do_with_engine} for Twisted versions older
        than 8.2.0."""
        d = defer.Deferred()

        def thd():
            try:
                engine = self.engine
                if self.__broken_sqlite:  # see bug #1810
                    engine.execute("select * from sqlite_master")
                rv = callable(engine, *args, **kwargs)
                assert not isinstance(rv, sa.engine.ResultProxy), \
                    "do not return ResultProxy objects!"
                reactor.callFromThread(d.callback, rv)
            except:
                # deliberately catch everything: whatever went wrong in the
                # thread must be delivered to the deferred's errback
                reactor.callFromThread(d.errback, failure.Failure())
        self.callInThread(thd)
        return d

    # use the 8.1 versions on old Twisteds
    if twisted.version < versions.Version('twisted', 8, 2, 0):
        do = do_081
        do_with_engine = do_with_engine_081

    def detect_bug1810(self):
        """
        Detect buggy SQLite implementations; call only for a known-sqlite
        dialect.

        The probe uses a scratch database in a temporary directory, which is
        removed again before returning, whatever the outcome.

        @returns: True if the workaround for bug #1810 is required, False
            otherwise
        @raise Exception: whatever the probe raises if it fails even with
            the workaround applied
        """
        try:
            import pysqlite2.dbapi2 as sqlite
        except ImportError:
            import sqlite3 as sqlite

        tmpdir = tempfile.mkdtemp()
        dbfile = os.path.join(tmpdir, "detect_bug1810.db")

        def test(select_from_sqlite_master=False):
            conn1 = None
            conn2 = None
            try:
                # have the first connection look at the (still missing)
                # table, then create that table via a second connection
                conn1 = sqlite.connect(dbfile)
                curs1 = conn1.cursor()
                curs1.execute("PRAGMA table_info('foo')")

                conn2 = sqlite.connect(dbfile)
                curs2 = conn2.cursor()
                curs2.execute("CREATE TABLE foo ( a integer )")

                if select_from_sqlite_master:
                    curs1.execute("SELECT * from sqlite_master")
                # a buggy SQLite fails here, as the first connection still
                # believes that the table does not exist
                curs1.execute("SELECT * from foo")
            finally:
                if conn1 is not None:
                    conn1.close()
                if conn2 is not None:
                    conn2.close()
                # each run needs a fresh database; the file may be missing
                # if the very first connect failed, and that must not mask
                # the original error
                if os.path.exists(dbfile):
                    os.unlink(dbfile)

        try:
            try:
                test()
            except sqlite.OperationalError:
                # this is the expected error indicating it's broken
                return True

            # but this version should not fail..
            test(select_from_sqlite_master=True)
            return False  # not broken - no workaround required
        finally:
            # remove the scratch directory on every path, including
            # unexpected exceptions
            shutil.rmtree(tmpdir)

    def get_sqlite_version(self):
        """
        Determine the version of the SQLite library in use by querying a
        throwaway in-memory database.

        @returns: the version as a tuple of integers, e.g. C{(3, 7, 17)}, or
            C{(0,)} if it cannot be determined
        """
        engine = sa.create_engine('sqlite://')
        conn = engine.contextual_connect()

        try:
            try:
                r = conn.execute("SELECT sqlite_version()")
                vers_row = r.fetchone()
                r.close()
            except Exception:
                return (0,)
        finally:
            # this engine exists only for the probe; release its resources
            conn.close()
            engine.dispose()

        if vers_row:
            try:
                return tuple(map(int, vers_row[0].split('.')))
            except (TypeError, ValueError):
                return (0,)
        else:
            return (0,)