1   
  2   
  3   
  4   
  5   
  6   
  7   
  8   
  9   
 10   
 11   
 12   
 13   
 14   
 15   
 16  import time 
 17  import traceback 
 18  import shutil 
 19  import os 
 20  import sqlalchemy as sa 
 21  import twisted 
 22  import tempfile 
 23  from twisted.internet import reactor, threads, defer 
 24  from twisted.python import threadpool, failure, versions, log 
 25   
 26   
 27   
 28   
 29   
 30  debug = False 
 31   
 33      """Decorate a do function to log before, after, and elapsed time, 
 34      with the name of the calling function.  This is not speedy!""" 
 35      def wrap(*args, **kwargs): 
 36           
 37          st = traceback.extract_stack(limit=2) 
 38          file, line, name, _ = st[0] 
 39          descr = "%s ('%s' line %d)" % (name, file, line) 
 40   
 41          start_time = time.time() 
 42          log.msg("%s - before" % (descr,)) 
 43          d = f(*args, **kwargs) 
 44          def after(x): 
 45              end_time = time.time() 
 46              elapsed = (end_time - start_time) * 1000 
 47              log.msg("%s - after (%0.2f ms elapsed)" % (descr, elapsed)) 
 48              return x 
  49          d.addBoth(after) 
 50          return d 
 51      wrap.__name__ = f.__name__ 
 52      wrap.__doc__ = f.__doc__ 
 53      return wrap 
 54   
 56      """ 
 57      A pool of threads ready and waiting to execute queries. 
 58   
 59      If the engine has an C{optimal_thread_pool_size} attribute, then the 
 60      maxthreads of the thread pool will be set to that value.  This is most 
 61      useful for SQLite in-memory connections, where exactly one connection 
 62      (and thus thread) should be used. 
 63      """ 
 64   
 65      running = False 
 66   
 67       
 68       
 69       
 70       
 71       
 72       
 73      __broken_sqlite = False 
 74   
 76          pool_size = 5 
 77          if hasattr(engine, 'optimal_thread_pool_size'): 
 78              pool_size = engine.optimal_thread_pool_size 
 79          threadpool.ThreadPool.__init__(self, 
 80                          minthreads=1, 
 81                          maxthreads=pool_size, 
 82                          name='DBThreadPool') 
 83          self.engine = engine 
 84          if engine.dialect.name == 'sqlite': 
 85              vers = self.get_sqlite_version() 
 86              log.msg("Using SQLite Version %s" % (vers,)) 
 87              if vers < (3,3,17): 
 88                  log.msg("NOTE: this old version of SQLite does not support " 
 89                          "multiple simultaneous accesses to the database; " 
 90                          "add the 'pool_size=1' argument to your db url") 
 91              brkn = self.__broken_sqlite = self.detect_bug1810() 
 92              if brkn: 
 93                  log.msg("Applying SQLite workaround from Buildbot bug #1810") 
 94          self._start_evt = reactor.callWhenRunning(self._start) 
 95   
 96           
 97          if debug: 
 98              self.do = timed_do_fn(self.do) 
 99              self.do_with_engine = timed_do_fn(self.do_with_engine) 
 100   
102          self._start_evt = None 
103          if not self.running: 
104              self.start() 
105              self._stop_evt = reactor.addSystemEventTrigger( 
106                      'during', 'shutdown', self._stop) 
107              self.running = True 
 108   
110          self._stop_evt = None 
111          self.stop() 
112          self.engine.dispose() 
113          self.running = False 
 114   
116          """Manually stop the pool.  This is only necessary from tests, as the 
117          pool will stop itself when the reactor stops under normal 
118          circumstances.""" 
119          if not self._stop_evt: 
120              return  
121          reactor.removeSystemEventTrigger(self._stop_evt) 
122          self._stop() 
 123   
124 -    def do(self, callable, *args, **kwargs): 
 125          """ 
126          Call C{callable} in a thread, with a Connection as first argument. 
127          Returns a deferred that will indicate the results of the callable. 
128   
129          Note: do not return any SQLAlchemy objects via this deferred! 
130          """ 
131          def thd(): 
132              conn = self.engine.contextual_connect() 
133              if self.__broken_sqlite:  
134                  conn.execute("select * from sqlite_master") 
135              try: 
136                  rv = callable(conn, *args, **kwargs) 
137                  assert not isinstance(rv, sa.engine.ResultProxy), \ 
138                          "do not return ResultProxy objects!" 
139              finally: 
140                  conn.close() 
141              return rv 
 142          return threads.deferToThreadPool(reactor, self, thd) 
 143   
145          """ 
146          Like L{do}, but with an SQLAlchemy Engine as the first argument.  This 
147          is only used for schema manipulation, and is not used at master 
148          runtime. 
149          """ 
150          def thd(): 
151              if self.__broken_sqlite:  
152                  self.engine.execute("select * from sqlite_master") 
153              rv = callable(self.engine, *args, **kwargs) 
154              assert not isinstance(rv, sa.engine.ResultProxy), \ 
155                      "do not return ResultProxy objects!" 
156              return rv 
 157          return threads.deferToThreadPool(reactor, self, thd) 
158   
159       
160       
161       
162       
163       
164 -    def do_081(self, callable, *args, **kwargs):  
 165          d = defer.Deferred() 
166          def thd(): 
167              try: 
168                  conn = self.engine.contextual_connect() 
169                  if self.__broken_sqlite:  
170                      conn.execute("select * from sqlite_master") 
171                  try: 
172                      rv = callable(conn, *args, **kwargs) 
173                      assert not isinstance(rv, sa.engine.ResultProxy), \ 
174                              "do not return ResultProxy objects!" 
175                  finally: 
176                      conn.close() 
177                  reactor.callFromThread(d.callback, rv) 
178              except: 
179                  reactor.callFromThread(d.errback, failure.Failure()) 
180          self.callInThread(thd) 
181          return d 
183          d = defer.Deferred() 
184          def thd(): 
185              try: 
186                  conn = self.engine 
187                  if self.__broken_sqlite:  
188                      conn.execute("select * from sqlite_master") 
189                  rv = callable(conn, *args, **kwargs) 
190                  assert not isinstance(rv, sa.engine.ResultProxy), \ 
191                          "do not return ResultProxy objects!" 
192                  reactor.callFromThread(d.callback, rv) 
193              except: 
194                  reactor.callFromThread(d.errback, failure.Failure()) 
195          self.callInThread(thd) 
196          return d 
197   
198       
199      if twisted.version < versions.Version('twisted', 8, 2, 0): 
200          do = do_081 
201          do_with_engine = do_with_engine_081 
202   
204           
205           
206          try: 
207              import pysqlite2.dbapi2 as sqlite 
208              sqlite = sqlite 
209          except ImportError: 
210              import sqlite3 as sqlite 
211   
212          tmpdir = tempfile.mkdtemp() 
213          dbfile = os.path.join(tmpdir, "detect_bug1810.db") 
214          def test(select_from_sqlite_master=False): 
215              conn1 = None 
216              conn2 = None 
217              try: 
218                  conn1 = sqlite.connect(dbfile) 
219                  curs1 = conn1.cursor() 
220                  curs1.execute("PRAGMA table_info('foo')") 
221   
222                  conn2 = sqlite.connect(dbfile) 
223                  curs2 = conn2.cursor() 
224                  curs2.execute("CREATE TABLE foo ( a integer )") 
225   
226                  if select_from_sqlite_master: 
227                      curs1.execute("SELECT * from sqlite_master") 
228                  curs1.execute("SELECT * from foo") 
229              finally: 
230                  if conn1: 
231                      conn1.close() 
232                  if conn2: 
233                      conn2.close() 
234                  os.unlink(dbfile) 
 235   
236          try: 
237              test() 
238          except sqlite.OperationalError: 
239               
240              shutil.rmtree(tmpdir) 
241              return True 
242   
243           
244          test(select_from_sqlite_master=True) 
245          shutil.rmtree(tmpdir) 
246          return False  
247   
249          engine = sa.create_engine('sqlite://') 
250          conn = engine.contextual_connect() 
251   
252          try: 
253              r = conn.execute("SELECT sqlite_version()") 
254              vers_row = r.fetchone() 
255              r.close() 
256          except: 
257              return (0,) 
258   
259          if vers_row: 
260              try: 
261                  return tuple(map(int, vers_row[0].split('.'))) 
262              except (TypeError, ValueError): 
263                  return (0,) 
264          else: 
265              return (0,) 
 266