1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40 import sys, os, cgi, re, time
41
42 from twisted.python import log, reflect
43 from twisted.enterprise import adbapi
44
45 from buildbot import util
48 """
49 A Connection pool that expires connections after a certain amount of idle
50 time.
51 """
52 - def __init__(self, dbapiName, max_idle=60, *args, **kwargs):
53 """
54 @param max_idle: reconnect connections that have been idle more than
55 this number of seconds.
56 """
57
58 log.msg("Using expiring pool with max_idle=%i" % max_idle)
59
60 adbapi.ConnectionPool.__init__(self, dbapiName, *args, **kwargs)
61 self.max_idle = max_idle
62
63 self.connection_lastused = {}
64
66 tid = self.threadID()
67 now = util.now()
68 lastused = self.connection_lastused.get(tid)
69 if lastused and lastused + self.max_idle < now:
70 conn = self.connections.get(tid)
71 if self.noisy:
72 log.msg("expiring old connection")
73 self.disconnect(conn)
74
75 conn = adbapi.ConnectionPool.connect(self)
76 self.connection_lastused[tid] = now
77 return conn
78
80 adbapi.ConnectionPool.disconnect(self, conn)
81 tid = self.threadID()
82 del self.connection_lastused[tid]
83
87
89 max_retry_time = 1800
90 max_sleep_time = 1
91
95
98
100 start_time = util.now()
101 sleep_time = 0.1
102 while True:
103 try:
104 query_start_time = util.now()
105 result = self.cursor.execute(*args, **kw)
106 end_time = util.now()
107 if end_time - query_start_time > 2:
108 log.msg("Long query (%is): %s" % ((end_time - query_start_time), str((args, kw))))
109 return result
110 except self.dbapi.OperationalError, e:
111 if e.args[0] == 'database is locked':
112
113 log.msg("Retrying query %s" % str((args, kw)))
114 now = util.now()
115 if start_time + self.max_retry_time < now:
116 raise TimeoutError("Exceeded timeout trying to do %s" % str((args, kw)))
117 self.sleep(sleep_time)
118 sleep_time = max(self.max_sleep_time, sleep_time * 2)
119 continue
120 raise
121
124
127 self.dbapi = dbapi
128 self.conn = conn
129
132
134 return getattr(self.conn, name)
135
139
141 """
142 A specification for the database type and other connection parameters.
143 """
144
145
146 pool_args = ["max_idle"]
147 - def __init__(self, dbapiName, *connargs, **connkw):
148
149 if dbapiName == 'sqlite3':
150 dbapiName = self._get_sqlite_dbapi_name()
151
152 self.dbapiName = dbapiName
153 self.connargs = connargs
154 self.connkw = connkw
155
156 @classmethod
158 """
159 Parses a URL of the format
160 driver://[username:password@]host:port/database[?args]
161 and returns a DB object representing this URL. Percent-
162 substitution will be performed, replacing %(basedir)s with
163 the basedir argument.
164
165 raises ValueError on an invalid URL.
166 """
167 match = re.match(r"""
168 ^(?P<driver>\w+)://
169 (
170 ((?P<user>\w+)(:(?P<passwd>\S+))?@)?
171 ((?P<host>[-A-Za-z0-9.]+)(:(?P<port>\d+))?)?/
172 (?P<database>\S+?)(\?(?P<args>.*))?
173 )?$""", url, re.X)
174 if not match:
175 raise ValueError("Malformed url")
176
177 d = match.groupdict()
178 driver = d['driver']
179 user = d['user']
180 passwd = d['passwd']
181 host = d['host']
182 port = d['port']
183 if port is not None:
184 port = int(port)
185 database = d['database']
186 args = {}
187 if d['args']:
188 for key, value in cgi.parse_qsl(d['args']):
189 args[key] = value
190
191 if driver == "sqlite":
192
193 if not user == passwd == host == port == None:
194 raise ValueError("user, passwd, host, port must all be None")
195 if not database:
196 database = ":memory:"
197 elif basedir:
198 database = database % dict(basedir=basedir)
199 database = os.path.join(basedir, database)
200 return cls("sqlite3", database, **args)
201 elif driver == "mysql":
202 args['host'] = host
203 args['db'] = database
204 if user:
205 args['user'] = user
206 if passwd:
207 args['passwd'] = passwd
208 if port:
209 args['port'] = port
210 if 'max_idle' in args:
211 args['max_idle'] = int(args['max_idle'])
212
213 return cls("MySQLdb", use_unicode=True, charset="utf8", **args)
214 else:
215 raise ValueError("Unsupported dbapi %s" % driver)
216
218
219
220 sqlite_dbapi_name = None
221 try:
222 from pysqlite2 import dbapi2 as sqlite3
223 sqlite_dbapi_name = "pysqlite2.dbapi2"
224 except ImportError:
225
226 if sys.version_info >= (2,6):
227 import sqlite3
228 sqlite_dbapi_name = "sqlite3"
229 else:
230 raise
231 return sqlite_dbapi_name
232
234 """
235 Get the dbapi module used for this connection (for things like
236 exceptions and module-global attributes
237 """
238 return reflect.namedModule(self.dbapiName)
239
241 """
242 Get a synchronous connection to the specified database. This returns
243 a simple DBAPI connection object.
244 """
245 dbapi = self.get_dbapi()
246 connkw = self.connkw.copy()
247 for arg in self.pool_args:
248 if arg in connkw:
249 del connkw[arg]
250 conn = dbapi.connect(*self.connargs, **connkw)
251 if 'sqlite' in self.dbapiName:
252 conn = RetryingConnection(dbapi, conn)
253 return conn
254
256 """
257 Get an asynchronous (adbapi) connection pool for the specified
258 database.
259 """
260
261
262 connkw = self.connkw.copy()
263 connkw["cp_reconnect"] = True
264 connkw["cp_noisy"] = True
265
266
267
268
269
270 if 'sqlite' in self.dbapiName:
271 connkw['check_same_thread'] = False
272 log.msg("creating adbapi pool: %s %s %s" % \
273 (self.dbapiName, self.connargs, connkw))
274
275
276 if self.dbapiName == 'MySQLdb':
277 return ExpiringConnectionPool(self.dbapiName, *self.connargs, **connkw)
278 else:
279 return RetryingConnectionPool(self.dbapiName, *self.connargs, **connkw)
280
282 default = None
283 if self.dbapiName == "MySQLdb":
284 default = 60
285 return self.connkw.get("max_idle", default)
286