1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40 import sys, os, cgi, re, time
41
42 from twisted.python import log, reflect
43 from twisted.internet import defer, reactor
44 from twisted.enterprise import adbapi
45
46 from buildbot.db.connector import DBConnector
47 from buildbot.db.exceptions import *
48 from buildbot import util
"""
A connection pool that expires connections after a certain amount of
idle time: a thread's connection is disconnected and reopened once it has
gone unused for longer than max_idle.
"""
def __init__(self, dbapiName, max_idle=60, *args, **kwargs):
    """
    Set up the pool and the bookkeeping used to expire idle connections.

    @param dbapiName: name of the DBAPI module, passed through to
        adbapi.ConnectionPool.__init__.
    @param max_idle: reconnect connections that have been idle more than
        this number of seconds.
    @param args: extra positional arguments for
        adbapi.ConnectionPool.__init__.
    @param kwargs: extra keyword arguments for
        adbapi.ConnectionPool.__init__.
    """
    announcement = "Using expiring pool with max_idle=%i" % max_idle
    log.msg(announcement)

    adbapi.ConnectionPool.__init__(self, dbapiName, *args, **kwargs)

    # thread id -> time at which that thread's connection was last handed
    # out; filled in as connections are requested.
    self.connection_lastused = {}
    self.max_idle = max_idle
67
69 tid = self.threadID()
70 now = util.now()
71 lastused = self.connection_lastused.get(tid)
72 if lastused and lastused + self.max_idle < now:
73 conn = self.connections.get(tid)
74 if self.noisy:
75 log.msg("expiring old connection")
76 self.disconnect(conn)
77
78 conn = adbapi.ConnectionPool.connect(self)
79 self.connection_lastused[tid] = now
80 return conn
81
# Let the base pool disconnect `conn`, then forget the calling thread's
# last-used timestamp so a later connect() starts with a clean slate.
adbapi.ConnectionPool.disconnect(self, conn)
tid = self.threadID()
# pop() with a default rather than `del`: a thread that has no recorded
# timestamp must not turn a successful disconnect into a KeyError.
self.connection_lastused.pop(tid, None)
86
90
# Total time a locked-database query may keep being retried before the
# retry loop gives up; compared against util.now() differences
# (presumably seconds, i.e. half an hour -- confirm against util.now()).
max_retry_time = 30 * 60
# Bound used together with the doubling retry delay when the next sleep
# interval is computed (same unit as the delay passed to self.sleep()).
max_sleep_time = 1
94
98
101
103 start_time = util.now()
104 sleep_time = 0.1
105 while True:
106 try:
107 query_start_time = util.now()
108 result = self.cursor.execute(*args, **kw)
109 end_time = util.now()
110 if end_time - query_start_time > 2:
111 log.msg("Long query (%is): %s" % ((end_time - query_start_time), str((args, kw))))
112 return result
113 except self.dbapi.OperationalError, e:
114 if e.args[0] == 'database is locked':
115
116 log.msg("Retrying query %s" % str((args, kw)))
117 now = util.now()
118 if start_time + self.max_retry_time < now:
119 raise TimeoutError("Exceeded timeout trying to do %s" % str((args, kw)))
120 self.sleep(sleep_time)
121 sleep_time = max(self.max_sleep_time, sleep_time * 2)
122 continue
123 raise
124
127
# Keep the DBAPI module alongside the raw connection being wrapped.
self.dbapi, self.conn = dbapi, conn
132
135
137 return getattr(self.conn, name)
138
142
144 """
145 A specification for the database type and other connection parameters.
146 """
147
148
# Names of keyword arguments that are removed from a copy of connkw before
# dbapi.connect() is called for a synchronous connection (presumably
# because they configure the connection pool rather than the driver).
149 pool_args = ["max_idle"]
def __init__(self, dbapiName, *connargs, **connkw):
    """
    Remember which DBAPI module to use and how to connect with it.

    @param dbapiName: name of the DBAPI module; the generic name
        'sqlite3' is replaced by whatever _get_sqlite_dbapi_name()
        returns.
    @param connargs: positional arguments, stored as given for later
        connection calls.
    @param connkw: keyword arguments, stored as given for later
        connection calls.
    """
    if dbapiName != 'sqlite3':
        resolved_name = dbapiName
    else:
        resolved_name = self._get_sqlite_dbapi_name()

    self.dbapiName = resolved_name
    self.connargs = connargs
    self.connkw = connkw
158
159 @classmethod
161 """
162 Parses a URL of the format
163 driver://[username:password@]host:port/database[?args]
164 and returns a DB object representing this URL. Percent-
165 substitution will be performed, replacing %(basedir)s with
166 the basedir argument.
167
168 raises ValueError on an invalid URL.
169 """
170 match = re.match(r"""
171 ^(?P<driver>\w+)://
172 (
173 ((?P<user>\w+)(:(?P<passwd>\S+))?@)?
174 ((?P<host>[-A-Za-z0-9.]+)(:(?P<port>\d+))?)?/
175 (?P<database>\S+?)(\?(?P<args>.*))?
176 )?$""", url, re.X)
177 if not match:
178 raise ValueError("Malformed url")
179
180 d = match.groupdict()
181 driver = d['driver']
182 user = d['user']
183 passwd = d['passwd']
184 host = d['host']
185 port = d['port']
186 if port is not None:
187 port = int(port)
188 database = d['database']
189 args = {}
190 if d['args']:
191 for key, value in cgi.parse_qsl(d['args']):
192 args[key] = value
193
194 if driver == "sqlite":
195
196 if not user == passwd == host == port == None:
197 raise ValueError("user, passwd, host, port must all be None")
198 if not database:
199 database = ":memory:"
200 else:
201 database = database % dict(basedir=basedir)
202 database = os.path.join(basedir, database)
203 return cls("sqlite3", database, **args)
204 elif driver == "mysql":
205 args['host'] = host
206 args['db'] = database
207 if user:
208 args['user'] = user
209 if passwd:
210 args['passwd'] = passwd
211 if port:
212 args['port'] = port
213 if 'max_idle' in args:
214 args['max_idle'] = int(args['max_idle'])
215
216 return cls("MySQLdb", use_unicode=True, charset="utf8", **args)
217 else:
218 raise ValueError("Unsupported dbapi %s" % driver)
219
221
222
223 sqlite_dbapi_name = None
224 try:
225 from pysqlite2 import dbapi2 as sqlite3
226 sqlite_dbapi_name = "pysqlite2.dbapi2"
227 except ImportError:
228
229 if sys.version_info >= (2,6):
230 import sqlite3
231 sqlite_dbapi_name = "sqlite3"
232 else:
233 raise
234 return sqlite_dbapi_name
235
237 """
238 Get the dbapi module used for this connection (for things like
239 exceptions and module-global attributes
240 """
241 return reflect.namedModule(self.dbapiName)
242
244 """
245 Get a synchronous connection to the specified database. This returns
246 a simple DBAPI connection object.
247 """
248 dbapi = self.get_dbapi()
249 connkw = self.connkw.copy()
250 for arg in self.pool_args:
251 if arg in connkw:
252 del connkw[arg]
253 conn = dbapi.connect(*self.connargs, **connkw)
254 if 'sqlite' in self.dbapiName:
255 conn = RetryingConnection(dbapi, conn)
256 return conn
257
259 """
260 Get an asynchronous (adbapi) connection pool for the specified
261 database.
262 """
263
264
265 connkw = self.connkw.copy()
266 connkw["cp_reconnect"] = True
267 connkw["cp_noisy"] = True
268
269
270
271
272
273 if 'sqlite' in self.dbapiName:
274 connkw['check_same_thread'] = False
275 log.msg("creating adbapi pool: %s %s %s" % \
276 (self.dbapiName, self.connargs, connkw))
277
278
279 if self.dbapiName == 'MySQLdb':
280 return ExpiringConnectionPool(self.dbapiName, *self.connargs, **connkw)
281 else:
282 return RetryingConnectionPool(self.dbapiName, *self.connargs, **connkw)
283
285 default = None
286 if self.dbapiName == "MySQLdb":
287 default = 60
288 return self.connkw.get("max_idle", default)
289