1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 import sys, os, cgi, re, time
17
18 from twisted.python import log, reflect
19 from twisted.enterprise import adbapi
20
21 from buildbot import util
24 """
25 A Connection pool that expires connections after a certain amount of idle
26 time.
27 """
28 - def __init__(self, dbapiName, max_idle=60, *args, **kwargs):
29 """
30 @param max_idle: reconnect connections that have been idle more than
31 this number of seconds.
32 """
33
34 log.msg("Using expiring pool with max_idle=%i" % max_idle)
35
36 adbapi.ConnectionPool.__init__(self, dbapiName, *args, **kwargs)
37 self.max_idle = max_idle
38
39 self.connection_lastused = {}
40
42 tid = self.threadID()
43 now = util.now()
44 lastused = self.connection_lastused.get(tid)
45 if lastused and lastused + self.max_idle < now:
46 conn = self.connections.get(tid)
47 if self.noisy:
48 log.msg("expiring old connection")
49 self.disconnect(conn)
50
51 conn = adbapi.ConnectionPool.connect(self)
52 self.connection_lastused[tid] = now
53 return conn
54
56 adbapi.ConnectionPool.disconnect(self, conn)
57 tid = self.threadID()
58 del self.connection_lastused[tid]
59
63
65 max_retry_time = 1800
66 max_sleep_time = 1
67
71
74
76 start_time = util.now()
77 sleep_time = 0.1
78 while True:
79 try:
80 query_start_time = util.now()
81 result = self.cursor.execute(*args, **kw)
82 end_time = util.now()
83 if end_time - query_start_time > 2:
84 log.msg("Long query (%is): %s" % ((end_time - query_start_time), str((args, kw))))
85 return result
86 except self.dbapi.OperationalError, e:
87 if e.args[0] == 'database is locked':
88
89 log.msg("Retrying query %s" % str((args, kw)))
90 now = util.now()
91 if start_time + self.max_retry_time < now:
92 raise TimeoutError("Exceeded timeout trying to do %s" % str((args, kw)))
93 self.sleep(sleep_time)
94 sleep_time = max(self.max_sleep_time, sleep_time * 2)
95 continue
96 raise
97
100
103 self.dbapi = dbapi
104 self.conn = conn
105
108
110 return getattr(self.conn, name)
111
115
117 """
118 A specification for the database type and other connection parameters.
119 """
120
121
122 pool_args = ["max_idle"]
123 - def __init__(self, dbapiName, *connargs, **connkw):
124
125 if dbapiName == 'sqlite3':
126 dbapiName = self._get_sqlite_dbapi_name()
127
128 self.dbapiName = dbapiName
129 self.connargs = connargs
130 self.connkw = connkw
131
132 @classmethod
134 """
135 Parses a URL of the format
136 driver://[username:password@]host:port/database[?args]
137 and returns a DB object representing this URL. Percent-
138 substitution will be performed, replacing %(basedir)s with
139 the basedir argument.
140
141 raises ValueError on an invalid URL.
142 """
143 match = re.match(r"""
144 ^(?P<driver>\w+)://
145 (
146 ((?P<user>\w+)(:(?P<passwd>\S+))?@)?
147 ((?P<host>[-A-Za-z0-9.]+)(:(?P<port>\d+))?)?/
148 (?P<database>\S+?)(\?(?P<args>.*))?
149 )?$""", url, re.X)
150 if not match:
151 raise ValueError("Malformed url")
152
153 d = match.groupdict()
154 driver = d['driver']
155 user = d['user']
156 passwd = d['passwd']
157 host = d['host']
158 port = d['port']
159 if port is not None:
160 port = int(port)
161 database = d['database']
162 args = {}
163 if d['args']:
164 for key, value in cgi.parse_qsl(d['args']):
165 args[key] = value
166
167 if driver == "sqlite":
168
169 if not user == passwd == host == port == None:
170 raise ValueError("user, passwd, host, port must all be None")
171 if not database:
172 database = ":memory:"
173 elif basedir:
174 database = database % dict(basedir=basedir)
175 database = os.path.join(basedir, database)
176 return cls("sqlite3", database, **args)
177 elif driver == "mysql":
178 args['host'] = host
179 args['db'] = database
180 if user:
181 args['user'] = user
182 if passwd:
183 args['passwd'] = passwd
184 if port:
185 args['port'] = port
186 if 'max_idle' in args:
187 args['max_idle'] = int(args['max_idle'])
188
189 return cls("MySQLdb", use_unicode=True, charset="utf8", **args)
190 else:
191 raise ValueError("Unsupported dbapi %s" % driver)
192
194
195
196 sqlite_dbapi_name = None
197 try:
198 from pysqlite2 import dbapi2 as sqlite3
199 assert sqlite3
200 sqlite_dbapi_name = "pysqlite2.dbapi2"
201 except ImportError:
202
203 if sys.version_info >= (2,6):
204 import sqlite3
205 assert sqlite3
206 sqlite_dbapi_name = "sqlite3"
207 else:
208 raise
209 return sqlite_dbapi_name
210
212 """
213 Get the dbapi module used for this connection (for things like
214 exceptions and module-global attributes
215 """
216 return reflect.namedModule(self.dbapiName)
217
219 """
220 Get a synchronous connection to the specified database. This returns
221 a simple DBAPI connection object.
222 """
223 dbapi = self.get_dbapi()
224 connkw = self.connkw.copy()
225 for arg in self.pool_args:
226 if arg in connkw:
227 del connkw[arg]
228 conn = dbapi.connect(*self.connargs, **connkw)
229 if 'sqlite' in self.dbapiName:
230 conn = RetryingConnection(dbapi, conn)
231 return conn
232
234 """
235 Get an asynchronous (adbapi) connection pool for the specified
236 database.
237 """
238
239
240 connkw = self.connkw.copy()
241 connkw["cp_reconnect"] = True
242 connkw["cp_noisy"] = True
243
244
245
246
247
248 if 'sqlite' in self.dbapiName:
249 connkw['check_same_thread'] = False
250 log.msg("creating adbapi pool: %s %s %s" % \
251 (self.dbapiName, self.connargs, connkw))
252
253
254 if self.dbapiName == 'MySQLdb':
255 return ExpiringConnectionPool(self.dbapiName, *self.connargs, **connkw)
256 else:
257 return RetryingConnectionPool(self.dbapiName, *self.connargs, **connkw)
258
260 default = None
261 if self.dbapiName == "MySQLdb":
262 default = 60
263 return self.connkw.get("max_idle", default)
264