Package buildbot :: Package db :: Module state
[frames | no frames]

Source Code for Module buildbot.db.state

# This file is part of Buildbot.  Buildbot is free software: you can
# redistribute it and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation, version 2.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright Buildbot Team Members
 15   
 16  from buildbot.util import json 
 17  import sqlalchemy as sa 
 18  import sqlalchemy.exc 
 19  from buildbot.db import base 
class _IdNotFoundError(Exception):
    """Private exception, used only inside this module.

    Raised when a lookup for an object id matches no row.
    """
23
class ObjDict(dict):
    """A plain ``dict`` subclass that adds no behaviour of its own."""
    # NOTE(review): presumably a distinct subclass is needed so instances can
    # be held by the 'objectids' cache (e.g. weakly referenced) -- confirm.
26
class StateConnectorComponent(base.DBConnectorComponent):
    """Database connector component for object ids and per-object state.

    Object ids are looked up (or created) in the ``objects`` table, keyed by
    ``(name, class_name)``.  State values are stored JSON-encoded in the
    ``object_state`` table, keyed by ``(objectid, name)``.

    Every public method hands a function to ``self.db.pool.do`` and returns
    its result.  NOTE(review): that result is presumably a Deferred (see the
    ``addCallback`` in ``getObjectId``) -- confirm against the pool.
    """
    # Documentation is in developer/database.rst

    def getObjectId(self, name, class_name):
        """Get the id for the object ``(name, class_name)``.

        The row is created if it does not exist yet.  The result of the
        asynchronous call is the bare id value.
        """
        # defer to a cached method that only takes one parameter (a tuple)
        d = self._getObjectId((name, class_name))
        return d.addCallback(lambda objdict: objdict['id'])

    @base.cached('objectids')
    def _getObjectId(self, name_class_name_tuple):
        """Look up or create the object row; the result is ``ObjDict(id=...)``.

        Takes a single ``(name, class_name)`` tuple so that the whole key is
        one argument to the ``base.cached`` decorator.

        Note that ``_IdNotFoundError`` escapes if the insert fails and the
        follow-up select still finds no row.
        """
        name, class_name = name_class_name_tuple

        def thd(conn):
            objects_tbl = self.db.model.objects

            # check_length is defined outside this class; presumably it
            # rejects values too long for the column -- confirm.
            self.check_length(objects_tbl.c.name, name)
            self.check_length(objects_tbl.c.class_name, class_name)

            def select():
                q = sa.select([objects_tbl.c.id],
                        whereclause=((objects_tbl.c.name == name)
                                   & (objects_tbl.c.class_name == class_name)))
                res = conn.execute(q)
                row = res.fetchone()
                res.close()
                if not row:
                    raise _IdNotFoundError
                return row.id

            def insert():
                res = conn.execute(objects_tbl.insert(),
                                   name=name,
                                   class_name=class_name)
                return res.inserted_primary_key[0]

            # we want to try selecting, then inserting, but if the insert fails
            # then try selecting again.  We include an invocation of a hook
            # method to allow tests to exercise this particular behavior
            try:
                return ObjDict(id=select())
            except _IdNotFoundError:
                pass

            self._test_timing_hook(conn)

            try:
                return ObjDict(id=insert())
            except (sqlalchemy.exc.IntegrityError,
                    sqlalchemy.exc.ProgrammingError):
                # the insert failed; fall through and select again
                pass

            return ObjDict(id=select())

        return self.db.pool.do(thd)

    # Sentinel default for getState: distinguishes "no default supplied" from
    # any real default value, including None.
    class Thunk:
        pass

    def getState(self, objectid, name, default=Thunk):
        """Fetch the state value ``name`` stored for object ``objectid``.

        The stored JSON is decoded before being returned.  When no row
        exists, ``default`` is returned if one was given; otherwise
        ``KeyError`` is raised.  ``TypeError`` is raised when the stored
        value cannot be decoded as JSON.
        """
        def thd(conn):
            object_state_tbl = self.db.model.object_state

            q = sa.select([object_state_tbl.c.value_json],
                    whereclause=((object_state_tbl.c.objectid == objectid)
                               & (object_state_tbl.c.name == name)))
            res = conn.execute(q)
            row = res.fetchone()
            res.close()

            if not row:
                if default is self.Thunk:
                    raise KeyError("no such state value '%s' for object %d" %
                                   (name, objectid))
                return default
            try:
                return json.loads(row.value_json)
            except Exception:
                # Exception rather than a bare except, so KeyboardInterrupt
                # and SystemExit are not re-labelled as TypeError
                raise TypeError("JSON error loading state value '%s' for %d" %
                                (name, objectid))

        return self.db.pool.do(thd)

    def setState(self, objectid, name, value):
        """Store ``value``, JSON-encoded, as state ``name`` of ``objectid``.

        An existing row is updated; otherwise a new row is inserted.
        ``TypeError`` is raised when ``value`` cannot be encoded as JSON.
        """
        def thd(conn):
            object_state_tbl = self.db.model.object_state

            try:
                value_json = json.dumps(value)
            except Exception:
                # Exception rather than a bare except, so KeyboardInterrupt
                # and SystemExit are not re-labelled as TypeError
                raise TypeError("Error encoding JSON for %r" % (value,))

            self.check_length(object_state_tbl.c.name, name)

            def update():
                q = object_state_tbl.update(
                        whereclause=((object_state_tbl.c.objectid == objectid)
                                   & (object_state_tbl.c.name == name)))
                res = conn.execute(q, value_json=value_json)

                # check whether that worked
                return res.rowcount > 0

            def insert():
                conn.execute(object_state_tbl.insert(),
                             objectid=objectid,
                             name=name,
                             value_json=value_json)

            # try updating; if that fails, try inserting; if that fails, then
            # we raced with another instance to insert, so let that instance
            # win.

            if update():
                return

            self._test_timing_hook(conn)

            try:
                insert()
            except (sqlalchemy.exc.IntegrityError,
                    sqlalchemy.exc.ProgrammingError):
                pass  # someone beat us to it - oh well

        return self.db.pool.do(thd)

    def _test_timing_hook(self, conn):
        """Do nothing; a hook that tests may override.

        Called between the first attempt and the insert in ``_getObjectId``
        and ``setState``.
        """
        # called so tests can simulate another process inserting a database row
        # at an inopportune moment
        pass
151