Package buildbot :: Package db :: Module pool
[frames] | [no frames]

Source Code for Module buildbot.db.pool

  1  # This file is part of Buildbot.  Buildbot is free software: you can 
  2  # redistribute it and/or modify it under the terms of the GNU General Public 
  3  # License as published by the Free Software Foundation, version 2. 
  4  # 
  5  # This program is distributed in the hope that it will be useful, but WITHOUT 
  6  # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more 
  8  # details. 
  9  # 
 10  # You should have received a copy of the GNU General Public License along with 
 11  # this program; if not, write to the Free Software Foundation, Inc., 51 
 12  # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
 13  # 
 14  # Copyright Buildbot Team Members 
 15   
 16  import time 
 17  import traceback 
 18  import inspect 
 19  import shutil 
 20  import os 
 21  import sqlalchemy as sa 
 22  import tempfile 
 23  from buildbot.process import metrics 
 24  from twisted.internet import reactor, threads 
 25  from twisted.python import threadpool, log 
 26   
 27  # set this to True for *very* verbose query debugging output; this can 
 28  # be monkey-patched from master.cfg, too: 
 29  #     from buildbot.db import pool 
 30  #     pool.debug = True 
 31  debug = False 
 32  _debug_id = 1 
 33   
def timed_do_fn(f):
    """Decorate a do function to log before, after, and elapsed time,
    with the name of the calling function.  This is not speedy!

    @param f: a C{do}-style function, called as
        C{f(callable, *args, **kwargs)}, that returns a Deferred
        (e.g. L{DBThreadPool.do} or L{DBThreadPool.do_with_engine}).
    @returns: a wrapper with the same calling convention as C{f}.  It logs
        the caller's name and location, the caller's local variables (except
        C{self} and C{thd}), the start and end of the thread function, and
        the elapsed wall-clock time until the returned Deferred fires.
    """
    def wrap(callable, *args, **kwargs):
        global _debug_id

        # get a description of the function that called us
        caller_file, caller_line, caller_name, _ = \
            traceback.extract_stack(limit=2)[0]

        # and its locals.  Copy them out and drop the frame reference
        # promptly, as the inspect documentation recommends.  (f_back is used
        # instead of currentframe(1), which only works on Python 2.)
        frame = inspect.currentframe().f_back
        try:
            caller_locals = dict(frame.f_locals)
        finally:
            del frame

        # invent a unique ID for the description
        query_id, _debug_id = _debug_id, _debug_id + 1

        descr = "%s-%08x" % (caller_name, query_id)

        start_time = time.time()
        log.msg("%s - before ('%s' line %d)" % (descr, caller_file, caller_line))
        for var_name, value in caller_locals.items():
            if var_name in ('self', 'thd'):
                continue
            log.msg("%s - %s = %r" % (descr, var_name, value))

        # wrap the callable to log the begin and end of the actual thread
        # function
        def callable_wrap(*thd_args, **thd_kwargs):
            log.msg("%s - thd start" % (descr,))
            try:
                # forward exactly the arguments the pool thread passed in
                return callable(*thd_args, **thd_kwargs)
            finally:
                log.msg("%s - thd end" % (descr,))

        d = f(callable_wrap, *args, **kwargs)

        def after(x):
            elapsed = (time.time() - start_time) * 1000
            log.msg("%s - after (%0.2f ms elapsed)" % (descr, elapsed))
            return x
        d.addBoth(after)
        return d
    wrap.__name__ = f.__name__
    wrap.__doc__ = f.__doc__
    return wrap
class DBThreadPool(threadpool.ThreadPool):
    """A thread pool that runs database functions against a SQLAlchemy
    engine.

    Functions are run in pool threads via L{do} (which passes a fresh
    connection) or L{do_with_engine} (which passes the engine itself).
    Functions that fail with a "Lost connection" or "database is locked"
    OperationalError are retried with exponential backoff for up to
    L{MAX_OPERATIONALERROR_TIME} seconds.
    """

    running = False

    # Trigger ID of the reactor shutdown trigger registered by _start; None
    # while no trigger is registered.  Defined at class level so that
    # shutdown() is safe to call even if _start never ran.
    _stop_evt = None

    # Some versions of SQLite incorrectly cache metadata about which tables are
    # and are not present on a per-connection basis. This cache can be flushed
    # by querying the sqlite_master table. We currently assume all versions of
    # SQLite have this bug, although it has only been observed in 3.4.2. A
    # dynamic check for this bug would be more appropriate. This is documented
    # in bug #1810.
    __broken_sqlite = False

    def __init__(self, engine, verbose=False):
        """
        @param engine: the SQLAlchemy engine to run queries against.  If it
            has an C{optimal_thread_pool_size} attribute, that value is used
            as the pool's maxthreads (default 5).
        @param verbose: if true, print version/warning messages to stdout
            instead of logging them (used by upgrade scripts).
        @raises RuntimeError: for SQLite engines older than 3.4.
        """
        # verbose is used by upgrade scripts, and if it is set we should print
        # messages about versions and other warnings
        log_msg = log.msg
        if verbose:
            def log_msg(m):
                print(m)

        pool_size = 5

        # If the engine has an C{optimal_thread_pool_size} attribute, then the
        # maxthreads of the thread pool will be set to that value. This is
        # most useful for SQLite in-memory connections, where exactly one
        # connection (and thus thread) should be used.
        if hasattr(engine, 'optimal_thread_pool_size'):
            pool_size = engine.optimal_thread_pool_size

        threadpool.ThreadPool.__init__(self,
                                       minthreads=1,
                                       maxthreads=pool_size,
                                       name='DBThreadPool')
        self.engine = engine
        if engine.dialect.name == 'sqlite':
            vers = self.get_sqlite_version()
            if vers < (3, 7):
                log_msg("Using SQLite Version %s" % (vers,))
                log_msg("NOTE: this old version of SQLite does not support "
                        "WAL journal mode; a busy master may encounter "
                        "'Database is locked' errors. Consider upgrading.")
                if vers < (3, 4):
                    log_msg("NOTE: this old version of SQLite is not "
                            "supported.")
                    raise RuntimeError("unsupported SQLite version")
            brkn = self.__broken_sqlite = self.detect_bug1810()
            if brkn:
                log_msg("Applying SQLite workaround from Buildbot bug #1810")
        self._start_evt = reactor.callWhenRunning(self._start)

        # patch the do methods to do verbose logging if necessary
        if debug:
            self.do = timed_do_fn(self.do)
            self.do_with_engine = timed_do_fn(self.do_with_engine)

    def _start(self):
        """Start the pool once the reactor is running, and arrange for it to
        be stopped during reactor shutdown."""
        self._start_evt = None
        if not self.running:
            self.start()
            self._stop_evt = reactor.addSystemEventTrigger(
                'during', 'shutdown', self._stop)
            self.running = True

    def _stop(self):
        """Stop the pool's threads and dispose of the engine's connections."""
        self._stop_evt = None
        self.stop()
        self.engine.dispose()
        self.running = False

    def shutdown(self):
        """Manually stop the pool. This is only necessary from tests, as the
        pool will stop itself when the reactor stops under normal
        circumstances.  Does nothing if the pool is not running."""
        if not self._stop_evt:
            return  # pool is already stopped (or was never started)
        reactor.removeSystemEventTrigger(self._stop_evt)
        self._stop()

    # Try about 170 times over the space of a day, with the last few tries
    # being about an hour apart. This is designed to span a reasonable amount
    # of time for repairing a broken database server, while still failing
    # actual problematic queries eventually
    BACKOFF_START = 1.0
    BACKOFF_MULT = 1.05
    MAX_OPERATIONALERROR_TIME = 3600 * 24  # one day

    def __thd(self, with_engine, callable, args, kwargs):
        # try to call callable(arg, *args, **kwargs) repeatedly until no
        # OperationalErrors occur, where arg is either the engine (with_engine)
        # or a connection (not with_engine).  Runs in a pool thread.
        backoff = self.BACKOFF_START
        start = time.time()
        while True:
            if with_engine:
                arg = self.engine
            else:
                arg = self.engine.contextual_connect()

            try:
                # inside the try so the connection is closed even if the
                # workaround query itself fails
                if self.__broken_sqlite:  # see bug #1810
                    arg.execute("select * from sqlite_master")
                try:
                    rv = callable(arg, *args, **kwargs)
                    assert not isinstance(rv, sa.engine.ResultProxy), \
                        "do not return ResultProxy objects!"
                except sa.exc.OperationalError as e:
                    # guard against a DBAPI error with no args, which would
                    # otherwise mask this error with an IndexError
                    orig_args = getattr(e.orig, 'args', ())
                    text = orig_args[0] if orig_args else None
                    if not isinstance(text, basestring):
                        raise
                    if "Lost connection" in text \
                            or "database is locked" in text:

                        # see if we've retried too much
                        elapsed = time.time() - start
                        if elapsed > self.MAX_OPERATIONALERROR_TIME:
                            raise

                        metrics.MetricCountEvent.log(
                            "DBThreadPool.retry-on-OperationalError")
                        log.msg("automatically retrying query after "
                                "OperationalError (%ss sleep)" % backoff)

                        # sleep (remember, we're in a thread..)
                        time.sleep(backoff)
                        backoff *= self.BACKOFF_MULT

                        # and re-try (the finally clause closes the
                        # connection first)
                        continue
                    else:
                        raise
            finally:
                if not with_engine:
                    arg.close()
            break
        return rv

    def do(self, callable, *args, **kwargs):
        """Run C{callable(conn, *args, **kwargs)} in a pool thread, where
        C{conn} is a connection from C{engine.contextual_connect()} that is
        closed afterwards.  Returns a Deferred firing with the result."""
        return threads.deferToThreadPool(reactor, self,
                                         self.__thd, False, callable,
                                         args, kwargs)

    def do_with_engine(self, callable, *args, **kwargs):
        """Like L{do}, but pass the engine itself rather than a connection."""
        return threads.deferToThreadPool(reactor, self,
                                         self.__thd, True, callable,
                                         args, kwargs)

    def detect_bug1810(self):
        """Detect buggy SQLite implementations (bug #1810); call only for a
        known-sqlite dialect.

        @returns: True if the per-connection schema-cache bug is present and
            the workaround is needed, False otherwise.
        """
        try:
            import pysqlite2.dbapi2 as sqlite
            # NOTE(review): no-op, presumably to silence pyflakes'
            # redefinition warning for the fallback import below
            sqlite = sqlite
        except ImportError:
            import sqlite3 as sqlite

        tmpdir = tempfile.mkdtemp()
        dbfile = os.path.join(tmpdir, "detect_bug1810.db")

        def test(select_from_sqlite_master=False):
            conn1 = None
            conn2 = None
            try:
                conn1 = sqlite.connect(dbfile)
                curs1 = conn1.cursor()
                curs1.execute("PRAGMA table_info('foo')")

                conn2 = sqlite.connect(dbfile)
                curs2 = conn2.cursor()
                curs2.execute("CREATE TABLE foo ( a integer )")

                if select_from_sqlite_master:
                    curs1.execute("SELECT * from sqlite_master")
                curs1.execute("SELECT * from foo")
            finally:
                if conn1 is not None:
                    conn1.close()
                if conn2 is not None:
                    conn2.close()
                # the file may not exist if connecting failed; don't mask
                # the original error with an OSError
                if os.path.exists(dbfile):
                    os.unlink(dbfile)

        try:
            try:
                test()
            except sqlite.OperationalError:
                # this is the expected error indicating it's broken
                return True

            # but this version should not fail..
            test(select_from_sqlite_master=True)
            return False  # not broken - no workaround required
        finally:
            # always remove the scratch directory, even on unexpected errors
            shutil.rmtree(tmpdir, ignore_errors=True)

    def get_sqlite_version(self):
        """Return the SQLite library version as a tuple of ints, e.g.
        C{(3, 7, 2)}, or C{(0,)} if it cannot be determined."""
        engine = sa.create_engine('sqlite://')
        try:
            conn = engine.contextual_connect()
            try:
                r = conn.execute("SELECT sqlite_version()")
                vers_row = r.fetchone()
                r.close()
            except Exception:
                return (0,)
            finally:
                conn.close()
        finally:
            # release the throwaway in-memory engine's connection pool
            engine.dispose()

        if vers_row:
            try:
                return tuple(map(int, vers_row[0].split('.')))
            except (TypeError, ValueError, AttributeError):
                return (0,)
        return (0,)