Package buildbot :: Package db :: Module pool
[frames] | no frames]

Source Code for Module buildbot.db.pool

  1  # This file is part of Buildbot.  Buildbot is free software: you can 
  2  # redistribute it and/or modify it under the terms of the GNU General Public 
  3  # License as published by the Free Software Foundation, version 2. 
  4  # 
  5  # This program is distributed in the hope that it will be useful, but WITHOUT 
  6  # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more 
  8  # details. 
  9  # 
 10  # You should have received a copy of the GNU General Public License along with 
 11  # this program; if not, write to the Free Software Foundation, Inc., 51 
 12  # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
 13  # 
 14  # Copyright Buildbot Team Members 
 15   
 16  import time 
 17  import traceback 
 18  import inspect 
 19  import shutil 
 20  import os 
 21  import sqlalchemy as sa 
 22  import twisted 
 23  import tempfile 
 24  from buildbot.process import metrics 
 25  from twisted.internet import reactor, threads, defer 
 26  from twisted.python import threadpool, failure, versions, log 
 27   
 28  # set this to True for *very* verbose query debugging output; this can 
 29  # be monkey-patched from master.cfg, too: 
 30  #     from buildbot.db import pool 
 31  #     pool.debug = True 
 32  debug = False 
 33  _debug_id = 1 
 34   
35 -def timed_do_fn(f):
36 """Decorate a do function to log before, after, and elapsed time, 37 with the name of the calling function. This is not speedy!""" 38 def wrap(callable, *args, **kwargs): 39 global _debug_id 40 41 # get a description of the function that called us 42 st = traceback.extract_stack(limit=2) 43 file, line, name, _ = st[0] 44 45 # and its locals 46 frame = inspect.currentframe(1) 47 locals = frame.f_locals 48 49 # invent a unique ID for the description 50 id, _debug_id = _debug_id, _debug_id+1 51 52 descr = "%s-%08x" % (name, id) 53 54 start_time = time.time() 55 log.msg("%s - before ('%s' line %d)" % (descr, file, line)) 56 for name in locals: 57 if name in ('self', 'thd'): 58 continue 59 log.msg("%s - %s = %r" % (descr, name, locals[name])) 60 61 # wrap the callable to log the begin and end of the actual thread 62 # function 63 def callable_wrap(*args, **kargs): 64 log.msg("%s - thd start" % (descr,)) 65 try: 66 return callable(*args, **kwargs) 67 finally: 68 log.msg("%s - thd end" % (descr,))
69 d = f(callable_wrap, *args, **kwargs) 70 71 def after(x): 72 end_time = time.time() 73 elapsed = (end_time - start_time) * 1000 74 log.msg("%s - after (%0.2f ms elapsed)" % (descr, elapsed)) 75 return x 76 d.addBoth(after) 77 return d 78 wrap.__name__ = f.__name__ 79 wrap.__doc__ = f.__doc__ 80 return wrap 81
82 -class DBThreadPool(threadpool.ThreadPool):
83 84 running = False 85 86 # Some versions of SQLite incorrectly cache metadata about which tables are 87 # and are not present on a per-connection basis. This cache can be flushed 88 # by querying the sqlite_master table. We currently assume all versions of 89 # SQLite have this bug, although it has only been observed in 3.4.2. A 90 # dynamic check for this bug would be more appropriate. This is documented 91 # in bug #1810. 92 __broken_sqlite = False 93
94 - def __init__(self, engine, verbose=False):
95 # verbose is used by upgrade scripts, and if it is set we should print 96 # messages about versions and other warnings 97 log_msg = log.msg 98 if verbose: 99 def log_msg(m): 100 print m
101 102 pool_size = 5 103 104 # If the engine has an C{optimal_thread_pool_size} attribute, then the 105 # maxthreads of the thread pool will be set to that value. This is 106 # most useful for SQLite in-memory connections, where exactly one 107 # connection (and thus thread) should be used. 108 if hasattr(engine, 'optimal_thread_pool_size'): 109 pool_size = engine.optimal_thread_pool_size 110 111 threadpool.ThreadPool.__init__(self, 112 minthreads=1, 113 maxthreads=pool_size, 114 name='DBThreadPool') 115 self.engine = engine 116 if engine.dialect.name == 'sqlite': 117 vers = self.get_sqlite_version() 118 if vers < (3,7): 119 log_msg("Using SQLite Version %s" % (vers,)) 120 log_msg("NOTE: this old version of SQLite does not support " 121 "WAL journal mode; a busy master may encounter " 122 "'Database is locked' errors. Consider upgrading.") 123 if vers < (3,4): 124 log_msg("NOTE: this old version of SQLite is not " 125 "supported.") 126 raise RuntimeError("unsupported SQLite version") 127 brkn = self.__broken_sqlite = self.detect_bug1810() 128 if brkn: 129 log_msg("Applying SQLite workaround from Buildbot bug #1810") 130 self._start_evt = reactor.callWhenRunning(self._start) 131 132 # patch the do methods to do verbose logging if necessary 133 if debug: 134 self.do = timed_do_fn(self.do) 135 self.do_with_engine = timed_do_fn(self.do_with_engine)
136
137 - def _start(self):
138 self._start_evt = None 139 if not self.running: 140 self.start() 141 self._stop_evt = reactor.addSystemEventTrigger( 142 'during', 'shutdown', self._stop) 143 self.running = True
144
145 - def _stop(self):
146 self._stop_evt = None 147 self.stop() 148 self.engine.dispose() 149 self.running = False
150
151 - def shutdown(self):
152 """Manually stop the pool. This is only necessary from tests, as the 153 pool will stop itself when the reactor stops under normal 154 circumstances.""" 155 if not self._stop_evt: 156 return # pool is already stopped 157 reactor.removeSystemEventTrigger(self._stop_evt) 158 self._stop()
159 160 # Try about 170 times over the space of a day, with the last few tries 161 # being about an hour apart. This is designed to span a reasonable amount 162 # of time for repairing a broken database server, while still failing 163 # actual problematic queries eventually 164 BACKOFF_START = 1.0 165 BACKOFF_MULT = 1.05 166 MAX_OPERATIONALERROR_TIME = 3600*24 # one day
167 - def __thd(self, with_engine, callable, args, kwargs):
168 # try to call callable(arg, *args, **kwargs) repeatedly until no 169 # OperationalErrors occur, where arg is either the engine (with_engine) 170 # or a connection (not with_engine) 171 backoff = self.BACKOFF_START 172 start = time.time() 173 while True: 174 if with_engine: 175 arg = self.engine 176 else: 177 arg = self.engine.contextual_connect() 178 179 if self.__broken_sqlite: # see bug #1810 180 arg.execute("select * from sqlite_master") 181 try: 182 try: 183 rv = callable(arg, *args, **kwargs) 184 assert not isinstance(rv, sa.engine.ResultProxy), \ 185 "do not return ResultProxy objects!" 186 except sa.exc.OperationalError, e: 187 text = e.orig.args[0] 188 if not isinstance(text, basestring): 189 raise 190 if "Lost connection" in text \ 191 or "database is locked" in text: 192 193 # see if we've retried too much 194 elapsed = time.time() - start 195 if elapsed > self.MAX_OPERATIONALERROR_TIME: 196 raise 197 198 metrics.MetricCountEvent.log( 199 "DBThreadPool.retry-on-OperationalError") 200 log.msg("automatically retrying query after " 201 "OperationalError (%ss sleep)" % backoff) 202 203 # sleep (remember, we're in a thread..) 204 time.sleep(backoff) 205 backoff *= self.BACKOFF_MULT 206 207 # and re-try 208 continue 209 else: 210 raise 211 finally: 212 if not with_engine: 213 arg.close() 214 break 215 return rv
216
217 - def do(self, callable, *args, **kwargs):
218 return threads.deferToThreadPool(reactor, self, 219 self.__thd, False, callable, args, kwargs)
220
221 - def do_with_engine(self, callable, *args, **kwargs):
222 return threads.deferToThreadPool(reactor, self, 223 self.__thd, True, callable, args, kwargs)
224 225 # older implementations for twisted < 0.8.2, which does not have 226 # deferToThreadPool; this basically re-implements it, although it gets some 227 # of the synchronization wrong - the thread may still be "in use" when the 228 # deferred fires in the parent, which can lead to database accesses hopping 229 # between threads. In practice, this should not cause any difficulty. 230 if twisted.version < versions.Version('twisted', 8, 2, 0):
231 - def __081_wrap(self, with_engine, callable, args, kwargs): # pragma: no cover
232 d = defer.Deferred() 233 def thd(): 234 try: 235 reactor.callFromThread(d.callback, 236 self.__thd(with_engine, callable, args, kwargs)) 237 except: 238 reactor.callFromThread(d.errback, 239 failure.Failure()) 240 self.callInThread(thd) 241 return d 242
243 - def do_081(self, callable, *args, **kwargs): # pragma: no cover
244 return self.__081_wrap(False, callable, args, kwargs) 245
246 - def do_with_engine_081(self, callable, *args, **kwargs): # pragma: no cover
247 return self.__081_wrap(True, callable, args, kwargs) 248 249 do = do_081 250 do_with_engine = do_with_engine_081 251
252 - def detect_bug1810(self):
253 # detect buggy SQLite implementations; call only for a known-sqlite 254 # dialect 255 try: 256 import pysqlite2.dbapi2 as sqlite 257 sqlite = sqlite 258 except ImportError: 259 import sqlite3 as sqlite 260 261 tmpdir = tempfile.mkdtemp() 262 dbfile = os.path.join(tmpdir, "detect_bug1810.db") 263 def test(select_from_sqlite_master=False): 264 conn1 = None 265 conn2 = None 266 try: 267 conn1 = sqlite.connect(dbfile) 268 curs1 = conn1.cursor() 269 curs1.execute("PRAGMA table_info('foo')") 270 271 conn2 = sqlite.connect(dbfile) 272 curs2 = conn2.cursor() 273 curs2.execute("CREATE TABLE foo ( a integer )") 274 275 if select_from_sqlite_master: 276 curs1.execute("SELECT * from sqlite_master") 277 curs1.execute("SELECT * from foo") 278 finally: 279 if conn1: 280 conn1.close() 281 if conn2: 282 conn2.close() 283 os.unlink(dbfile)
284 285 try: 286 test() 287 except sqlite.OperationalError: 288 # this is the expected error indicating it's broken 289 shutil.rmtree(tmpdir) 290 return True 291 292 # but this version should not fail.. 293 test(select_from_sqlite_master=True) 294 shutil.rmtree(tmpdir) 295 return False # not broken - no workaround required 296
297 - def get_sqlite_version(self):
298 engine = sa.create_engine('sqlite://') 299 conn = engine.contextual_connect() 300 301 try: 302 r = conn.execute("SELECT sqlite_version()") 303 vers_row = r.fetchone() 304 r.close() 305 except: 306 return (0,) 307 308 if vers_row: 309 try: 310 return tuple(map(int, vers_row[0].split('.'))) 311 except (TypeError, ValueError): 312 return (0,) 313 else: 314 return (0,)
315