Package buildbot :: Package db :: Module enginestrategy
[frames] | no frames]

Source Code for Module buildbot.db.enginestrategy

  1  # This file is part of Buildbot.  Buildbot is free software: you can 
  2  # redistribute it and/or modify it under the terms of the GNU General Public 
  3  # License as published by the Free Software Foundation, version 2. 
  4  # 
  5  # This program is distributed in the hope that it will be useful, but WITHOUT 
  6  # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more 
  8  # details. 
  9  # 
 10  # You should have received a copy of the GNU General Public License along with 
 11  # this program; if not, write to the Free Software Foundation, Inc., 51 
 12  # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
 13  # 
 14  # Copyright Buildbot Team Members 
 15   
 16  """ 
 17  A wrapper around `sqlalchemy.create_engine` that handles all of the 
 18  special cases that Buildbot needs.  Those include: 
 19   
 20   - pool_recycle for MySQL 
 21   - %(basedir) substitution 
 22   - optimal thread pool size calculation 
 23   
 24  """ 
 25   
 26  import os 
 27  import sqlalchemy as sa 
 28  from twisted.python import log 
 29  from sqlalchemy.engine import strategies, url 
 30  from sqlalchemy.pool import NullPool 
 31  from buildbot.util import sautils 
 32   
 33  # from http://www.mail-archive.com/sqlalchemy@googlegroups.com/msg15079.html 
34 -class ReconnectingListener(object):
35 - def __init__(self):
36 self.retried = False
37
38 -class BuildbotEngineStrategy(strategies.ThreadLocalEngineStrategy):
39 # A subclass of the ThreadLocalEngineStrategy that can effectively interact 40 # with Buildbot. 41 # 42 # This adjusts the passed-in parameters to ensure that we get the behaviors 43 # Buildbot wants from particular drivers, and wraps the outgoing Engine 44 # object so that its methods run in threads and return deferreds. 45 46 name = 'buildbot' 47
48 - def special_case_sqlite(self, u, kwargs):
49 """For sqlite, percent-substitute %(basedir)s and use a full 50 path to the basedir. If using a memory database, force the 51 pool size to be 1.""" 52 max_conns = None 53 54 # when given a database path, stick the basedir in there 55 if u.database: 56 57 # Use NullPool instead of the sqlalchemy-0.6.8-default 58 # SingletonThreadpool for sqlite to suppress the error in 59 # http://groups.google.com/group/sqlalchemy/msg/f8482e4721a89589, 60 # which also explains that NullPool is the new default in 61 # sqlalchemy 0.7 for non-memory SQLite databases. 62 kwargs.setdefault('poolclass', NullPool) 63 64 u.database = u.database % dict(basedir = kwargs['basedir']) 65 if not os.path.isabs(u.database[0]): 66 u.database = os.path.join(kwargs['basedir'], u.database) 67 68 # in-memory databases need exactly one connection 69 if not u.database: 70 kwargs['pool_size'] = 1 71 max_conns = 1 72 73 # allow serializing access to the db 74 if 'serialize_access' in u.query: 75 u.query.pop('serialize_access') 76 max_conns = 1 77 78 return u, kwargs, max_conns
79
80 - def set_up_sqlite_engine(self, u, engine):
81 """Special setup for sqlite engines""" 82 # try to enable WAL logging 83 if u.database: 84 def connect_listener(connection, record): 85 connection.execute("pragma checkpoint_fullfsync = off")
86 87 if sautils.sa_version() < (0,7,0): 88 class CheckpointFullfsyncDisabler(object): 89 pass
90 disabler = CheckpointFullfsyncDisabler() 91 disabler.connect = connect_listener 92 engine.pool.add_listener(disabler) 93 else: 94 sa.event.listen(engine.pool, 'connect', connect_listener) 95 96 log.msg("setting database journal mode to 'wal'") 97 try: 98 engine.execute("pragma journal_mode = wal") 99 except: 100 log.msg("failed to set journal mode - database may fail") 101
102 - def special_case_mysql(self, u, kwargs):
103 """For mysql, take max_idle out of the query arguments, and 104 use its value for pool_recycle. Also, force use_unicode and 105 charset to be True and 'utf8', failing if they were set to 106 anything else.""" 107 108 kwargs['pool_recycle'] = int(u.query.pop('max_idle', 3600)) 109 110 # default to the InnoDB storage engine 111 storage_engine = u.query.pop('storage_engine', 'MyISAM') 112 kwargs['connect_args'] = { 113 'init_command' : 'SET storage_engine=%s' % storage_engine, 114 } 115 116 if 'use_unicode' in u.query: 117 if u.query['use_unicode'] != "True": 118 raise TypeError("Buildbot requires use_unicode=True " + 119 "(and adds it automatically)") 120 else: 121 u.query['use_unicode'] = True 122 123 if 'charset' in u.query: 124 if u.query['charset'] != "utf8": 125 raise TypeError("Buildbot requires charset=utf8 " + 126 "(and adds it automatically)") 127 else: 128 u.query['charset'] = 'utf8' 129 130 return u, kwargs, None
131
132 - def set_up_mysql_engine(self, u, engine):
133 """Special setup for mysql engines""" 134 # add the reconnecting PoolListener that will detect a 135 # disconnected connection and automatically start a new 136 # one. This provides a measure of additional safety over 137 # the pool_recycle parameter, and is useful when e.g., the 138 # mysql server goes away 139 def checkout_listener(dbapi_con, con_record, con_proxy): 140 try: 141 cursor = dbapi_con.cursor() 142 cursor.execute("SELECT 1") 143 except dbapi_con.OperationalError, ex: 144 if ex.args[0] in (2006, 2013, 2014, 2045, 2055): 145 # sqlalchemy will re-create the connection 146 raise sa.exc.DisconnectionError() 147 raise
148 149 # older versions of sqlalchemy require the listener to be specified 150 # in the kwargs, in a class instance 151 if sautils.sa_version() < (0,7,0): 152 class ReconnectingListener(object): 153 pass 154 rcl = ReconnectingListener() 155 rcl.checkout = checkout_listener 156 engine.pool.add_listener(rcl) 157 else: 158 sa.event.listen(engine.pool, 'checkout', checkout_listener) 159
160 - def create(self, name_or_url, **kwargs):
161 if 'basedir' not in kwargs: 162 raise TypeError('no basedir supplied to create_engine') 163 164 max_conns = None 165 166 # apply special cases 167 u = url.make_url(name_or_url) 168 if u.drivername.startswith('sqlite'): 169 u, kwargs, max_conns = self.special_case_sqlite(u, kwargs) 170 elif u.drivername.startswith('mysql'): 171 u, kwargs, max_conns = self.special_case_mysql(u, kwargs) 172 173 # remove the basedir as it may confuse sqlalchemy 174 basedir = kwargs.pop('basedir') 175 176 # calculate the maximum number of connections from the pool parameters, 177 # if it hasn't already been specified 178 if max_conns is None: 179 max_conns = kwargs.get('pool_size', 5) + kwargs.get('max_overflow', 10) 180 181 engine = strategies.ThreadLocalEngineStrategy.create(self, 182 u, **kwargs) 183 184 # annotate the engine with the optimal thread pool size; this is used 185 # by DBConnector to configure the surrounding thread pool 186 engine.optimal_thread_pool_size = max_conns 187 188 # keep the basedir 189 engine.buildbot_basedir = basedir 190 191 if u.drivername.startswith('sqlite'): 192 self.set_up_sqlite_engine(u, engine) 193 elif u.drivername.startswith('mysql'): 194 self.set_up_mysql_engine(u, engine) 195 196 return engine
197 198 BuildbotEngineStrategy() 199 200 # this module is really imported for the side-effects, but pyflakes will like 201 # us to use something from the module -- so offer a copy of create_engine, 202 # which explicitly adds the strategy argument
203 -def create_engine(*args, **kwargs):
204 kwargs['strategy'] = 'buildbot' 205 206 return sa.create_engine(*args, **kwargs)
207