1   
  2   
  3   
  4   
  5   
  6   
  7   
  8   
  9   
 10   
 11   
 12   
 13   
 14   
 15   
 16  import time 
 17  import traceback 
 18  import inspect 
 19  import shutil 
 20  import os 
 21  import sqlalchemy as sa 
 22  import twisted 
 23  import tempfile 
 24  from buildbot.process import metrics 
 25  from twisted.internet import reactor, threads, defer 
 26  from twisted.python import threadpool, failure, versions, log 
 27   
 28   
 29   
 30   
 31   
# When true, each pool instance wraps its do and do_with_engine methods with
# timed_do_fn at construction time, logging every call.
debug = False
# Next sequence number handed out by timed_do_fn to label a logged call.
_debug_id = 1
 34   
 36      """Decorate a do function to log before, after, and elapsed time, 
 37      with the name of the calling function.  This is not speedy!""" 
 38      def wrap(callable, *args, **kwargs): 
 39          global _debug_id 
 40   
 41           
 42          st = traceback.extract_stack(limit=2) 
 43          file, line, name, _ = st[0] 
 44   
 45           
 46          frame = inspect.currentframe(1) 
 47          locals = frame.f_locals 
 48   
 49           
 50          id, _debug_id = _debug_id, _debug_id+1 
 51   
 52          descr = "%s-%08x" % (name, id) 
 53   
 54          start_time = time.time() 
 55          log.msg("%s - before ('%s' line %d)" % (descr, file, line)) 
 56          for name in locals: 
 57              if name in ('self', 'thd'): 
 58                  continue 
 59              log.msg("%s -   %s = %r" % (descr, name, locals[name])) 
 60   
 61           
 62           
 63          def callable_wrap(*args, **kargs): 
 64              log.msg("%s - thd start" % (descr,)) 
 65              try: 
 66                  return callable(*args, **kwargs) 
 67              finally: 
 68                  log.msg("%s - thd end" % (descr,)) 
  69          d = f(callable_wrap, *args, **kwargs) 
 70   
 71          def after(x): 
 72              end_time = time.time() 
 73              elapsed = (end_time - start_time) * 1000 
 74              log.msg("%s - after (%0.2f ms elapsed)" % (descr, elapsed)) 
 75              return x 
 76          d.addBoth(after) 
 77          return d 
 78      wrap.__name__ = f.__name__ 
 79      wrap.__doc__ = f.__doc__ 
 80      return wrap 
 81   
 83   
    # True while the pool has been started and not yet stopped; instances
    # flip this when they start and stop.
    running = False

    # Set per instance in __init__ from detect_bug1810().  When true, every
    # attempt in __thd first runs "select * from sqlite_master" as a
    # workaround for Buildbot bug #1810.
    __broken_sqlite = False
 93   
 94 -    def __init__(self, engine, verbose=False): 
  95           
 96           
 97          log_msg = log.msg 
 98          if verbose: 
 99              def log_msg(m): 
100                  print m 
 101   
102          pool_size = 5 
103   
104           
105           
106           
107           
108          if hasattr(engine, 'optimal_thread_pool_size'): 
109              pool_size = engine.optimal_thread_pool_size 
110   
111          threadpool.ThreadPool.__init__(self, 
112                          minthreads=1, 
113                          maxthreads=pool_size, 
114                          name='DBThreadPool') 
115          self.engine = engine 
116          if engine.dialect.name == 'sqlite': 
117              vers = self.get_sqlite_version() 
118              if vers < (3,7): 
119                  log_msg("Using SQLite Version %s" % (vers,)) 
120                  log_msg("NOTE: this old version of SQLite does not support " 
121                          "WAL journal mode; a busy master may encounter " 
122                          "'Database is locked' errors.  Consider upgrading.") 
123                  if vers < (3,4): 
124                      log_msg("NOTE: this old version of SQLite is not " 
125                              "supported.") 
126                      raise RuntimeError("unsupported SQLite version") 
127              brkn = self.__broken_sqlite = self.detect_bug1810() 
128              if brkn: 
129                  log_msg("Applying SQLite workaround from Buildbot bug #1810") 
130          self._start_evt = reactor.callWhenRunning(self._start) 
131   
132           
133          if debug: 
134              self.do = timed_do_fn(self.do) 
135              self.do_with_engine = timed_do_fn(self.do_with_engine) 
 136   
        # forget the handle that was stored when this call was scheduled
        # with reactor.callWhenRunning
        self._start_evt = None
        # self.running guards against starting the pool a second time
        if not self.running:
            self.start()
            # have self._stop run during reactor shutdown, and keep the
            # trigger handle so it can be removed again later
            self._stop_evt = reactor.addSystemEventTrigger(
                    'during', 'shutdown', self._stop)
            self.running = True
 144   
        # the shutdown trigger is no longer pending once we get here
        self._stop_evt = None
        # stop the worker threads first, then dispose of the engine
        self.stop()
        self.engine.dispose()
        self.running = False
 150   
        """Manually stop the pool.  This is only necessary from tests, as the
        pool will stop itself when the reactor stops under normal
        circumstances."""
        # no shutdown trigger registered: the pool was never started, or it
        # has already been stopped
        if not self._stop_evt:
            return
        # unregister the trigger so _stop does not run again at reactor
        # shutdown, then stop right away
        reactor.removeSystemEventTrigger(self._stop_evt)
        self._stop()
 159   
160       
161       
162       
163       
    # Sleep, in seconds, before the first retry of a failed operation.
    BACKOFF_START = 1.0
    # Factor the sleep is multiplied by after every retry.
    BACKOFF_MULT = 1.05
    # Retries stop once this many seconds (one day) have elapsed since the
    # first attempt; the OperationalError is then re-raised.
    MAX_OPERATIONALERROR_TIME = 3600*24
167 -    def __thd(self, with_engine, callable, args, kwargs): 
 168           
169           
170           
171          backoff = self.BACKOFF_START 
172          start = time.time() 
173          while True: 
174              if with_engine: 
175                  arg = self.engine 
176              else: 
177                  arg = self.engine.contextual_connect() 
178   
179              if self.__broken_sqlite:  
180                  arg.execute("select * from sqlite_master") 
181              try: 
182                  try: 
183                      rv = callable(arg, *args, **kwargs) 
184                      assert not isinstance(rv, sa.engine.ResultProxy), \ 
185                              "do not return ResultProxy objects!" 
186                  except sa.exc.OperationalError, e: 
187                      text = e.orig.args[0] 
188                      if not isinstance(text, basestring): 
189                          raise 
190                      if "Lost connection" in text \ 
191                          or "database is locked" in text: 
192   
193                           
194                          elapsed = time.time() - start 
195                          if elapsed > self.MAX_OPERATIONALERROR_TIME: 
196                              raise 
197   
198                          metrics.MetricCountEvent.log( 
199                                  "DBThreadPool.retry-on-OperationalError") 
200                          log.msg("automatically retrying query after " 
201                                  "OperationalError (%ss sleep)" % backoff) 
202   
203                           
204                          time.sleep(backoff) 
205                          backoff *= self.BACKOFF_MULT 
206   
207                           
208                          continue 
209                      else: 
210                          raise 
211              finally: 
212                  if not with_engine: 
213                      arg.close() 
214              break 
215          return rv 
 216   
    def do(self, callable, *args, **kwargs):
        """Run callable(conn, *args, **kwargs) in a pool thread, where conn
        is a connection obtained from the engine for the duration of the
        call.  Returns the Deferred produced by deferToThreadPool."""
        return threads.deferToThreadPool(reactor, self,
                self.__thd, False, callable, args, kwargs)
 220   
        # with_engine=True: the callable is handed self.engine itself
        # instead of a per-call connection
        return threads.deferToThreadPool(reactor, self,
                self.__thd, True, callable, args, kwargs)
 224   
225       
226       
227       
228       
229       
230      if twisted.version < versions.Version('twisted', 8, 2, 0): 
231 -        def __081_wrap(self, with_engine, callable, args, kwargs):  
 232              d = defer.Deferred() 
233              def thd(): 
234                  try: 
235                      reactor.callFromThread(d.callback, 
236                              self.__thd(with_engine, callable, args, kwargs)) 
237                  except: 
238                      reactor.callFromThread(d.errback, 
239                              failure.Failure()) 
240              self.callInThread(thd) 
241              return d 
242   
        def do_081(self, callable, *args, **kwargs):
            """Variant of do for old Twisted: runs the callable with a
            connection (with_engine=False) via __081_wrap."""
            return self.__081_wrap(False, callable, args, kwargs)
245   
            # with_engine=True: the callable receives the engine itself
            return self.__081_wrap(True, callable, args, kwargs)
248   
        # on this old Twisted, the *_081 variants take the place of do and
        # do_with_engine
        do = do_081
        do_with_engine = do_with_engine_081
251   
253           
254           
255          try: 
256              import pysqlite2.dbapi2 as sqlite 
257              sqlite = sqlite 
258          except ImportError: 
259              import sqlite3 as sqlite 
260   
261          tmpdir = tempfile.mkdtemp() 
262          dbfile = os.path.join(tmpdir, "detect_bug1810.db") 
263          def test(select_from_sqlite_master=False): 
264              conn1 = None 
265              conn2 = None 
266              try: 
267                  conn1 = sqlite.connect(dbfile) 
268                  curs1 = conn1.cursor() 
269                  curs1.execute("PRAGMA table_info('foo')") 
270   
271                  conn2 = sqlite.connect(dbfile) 
272                  curs2 = conn2.cursor() 
273                  curs2.execute("CREATE TABLE foo ( a integer )") 
274   
275                  if select_from_sqlite_master: 
276                      curs1.execute("SELECT * from sqlite_master") 
277                  curs1.execute("SELECT * from foo") 
278              finally: 
279                  if conn1: 
280                      conn1.close() 
281                  if conn2: 
282                      conn2.close() 
283                  os.unlink(dbfile) 
 284   
285          try: 
286              test() 
287          except sqlite.OperationalError: 
288               
289              shutil.rmtree(tmpdir) 
290              return True 
291   
292           
293          test(select_from_sqlite_master=True) 
294          shutil.rmtree(tmpdir) 
295          return False  
296   
298          engine = sa.create_engine('sqlite://') 
299          conn = engine.contextual_connect() 
300   
301          try: 
302              r = conn.execute("SELECT sqlite_version()") 
303              vers_row = r.fetchone() 
304              r.close() 
305          except: 
306              return (0,) 
307   
308          if vers_row: 
309              try: 
310                  return tuple(map(int, vers_row[0].split('.'))) 
311              except (TypeError, ValueError): 
312                  return (0,) 
313          else: 
314              return (0,) 
 315