Package buildbot :: Package slave :: Package commands :: Module transfer
[frames | no frames]

Source Code for Module buildbot.slave.commands.transfer

  1  import os, tarfile, tempfile 
  2   
  3  from twisted.python import log 
  4  from twisted.internet import reactor, defer 
  5   
  6  from buildbot.slave.commands.base import Command, command_version 
  7  from buildbot.slave.commands.registry import registerSlaveCommand 
  8   
class SlaveFileUploadCommand(Command):
    """
    Upload a file from slave to build master

    The file is read in blocks of at most ``blocksize`` bytes and each block
    is passed to the remote writer. The transfer ends at end of file, once
    ``maxsize`` bytes have been sent (a ``maxsize`` of None disables the
    limit), or when the command is interrupted.

    Arguments:

        - ['workdir']:   base directory to use
        - ['slavesrc']:  name of the slave-side file to read from
        - ['writer']:    RemoteReference to a transfer._FileWriter object
        - ['maxsize']:   max size (in bytes) of file to write
        - ['blocksize']: max size for each data block
    """
    debug = False

    def setup(self, args):
        """Copy the transfer parameters out of the ``args`` dictionary."""
        self.workdir = args['workdir']
        self.filename = args['slavesrc']
        self.writer = args['writer']
        self.remaining = args['maxsize']
        self.blocksize = args['blocksize']
        self.stderr = None
        self.rc = 0

    def start(self):
        """
        Open the source file and start streaming it to the remote writer.

        Returns a Deferred that fires after the block loop has ended, the
        remote writer has been asked to close, and finished() has sent the
        final status.
        """
        if self.debug:
            log.msg('SlaveFileUploadCommand started')

        # Open file
        self.path = os.path.join(self.builder.basedir,
                                 self.workdir,
                                 os.path.expanduser(self.filename))
        try:
            self.fp = open(self.path, 'rb')
            if self.debug:
                log.msg('Opened %r for upload' % self.path)
        except Exception:
            # TODO: this needs cleanup
            # A failed open is reported through stderr/rc rather than raised;
            # _writeBlock sees that fp is None and ends the loop immediately.
            self.fp = None
            self.stderr = 'Cannot open file %r for upload' % self.path
            self.rc = 1
            if self.debug:
                log.msg('Cannot open file %r for upload' % self.path)

        self.sendStatus({'header': "sending %s" % self.path})

        d = defer.Deferred()
        # Start the block loop from the reactor instead of running it inline.
        self._reactor.callLater(0, self._loop, d)

        def _close(res):
            # close the remote writer, but pass through any errors from _loop
            d1 = self.writer.callRemote("close")
            d1.addErrback(log.err)
            d1.addCallback(lambda ignored: res)
            return d1
        d.addBoth(_close)
        d.addBoth(self.finished)
        return d

    def _loop(self, fire_when_done):
        """
        Send blocks until _writeBlock reports that the transfer is over.

        ``fire_when_done`` is called back with None on completion, or
        errbacked with the failure if sending a block fails.
        """
        d = defer.maybeDeferred(self._writeBlock)

        def _done(finished):
            if finished:
                fire_when_done.callback(None)
            else:
                self._loop(fire_when_done)

        def _err(why):
            fire_when_done.errback(why)
        d.addCallbacks(_done, _err)
        return None

    def _writeBlock(self):
        """
        Write a block of data to the remote writer

        Returns True when there is nothing more to send (interrupted, file
        not open, size limit reached, or end of file). Otherwise returns a
        Deferred that fires with False once the remote write has completed.
        """

        if self.interrupted or self.fp is None:
            if self.debug:
                log.msg('SlaveFileUploadCommand._writeBlock(): end')
            return True

        # Never read more than what is left of the maxsize allowance.
        length = self.blocksize
        if self.remaining is not None and length > self.remaining:
            length = self.remaining

        if length <= 0:
            if self.stderr is None:
                self.stderr = 'Maximum filesize reached, truncating file %r' \
                                % self.path
                self.rc = 1
            # An empty block takes the same exit path as end of file below.
            data = ''
        else:
            data = self.fp.read(length)

        if self.debug:
            log.msg('SlaveFileUploadCommand._writeBlock(): '+
                    'allowed=%d readlen=%d' % (length, len(data)))
        if len(data) == 0:
            log.msg("EOF: callRemote(close)")
            return True

        if self.remaining is not None:
            self.remaining = self.remaining - len(data)
            assert self.remaining >= 0
        d = self.writer.callRemote('write', data)
        d.addCallback(lambda res: False)
        return d

    def interrupt(self):
        """Flag the transfer as interrupted; repeated calls are ignored."""
        if self.debug:
            log.msg('interrupted')
        if self.interrupted:
            return
        if self.stderr is None:
            self.stderr = 'Upload of %r interrupted' % self.path
            self.rc = 1
        self.interrupted = True
        # the next _writeBlock call will notice the .interrupted flag

    def finished(self, res):
        """
        Close the local file and send the final status update.

        ``res`` is returned unchanged so that it keeps travelling down the
        Deferred chain built in start().
        """
        # Nothing else releases the handle opened in start().
        if self.fp is not None:
            self.fp.close()

        if self.debug:
            log.msg('finished: stderr=%r, rc=%r' % (self.stderr, self.rc))
        if self.stderr is None:
            self.sendStatus({'rc': self.rc})
        else:
            self.sendStatus({'stderr': self.stderr, 'rc': self.rc})
        return res

registerSlaveCommand("uploadFile", SlaveFileUploadCommand, command_version)

class SlaveDirectoryUploadCommand(SlaveFileUploadCommand):
    """
    Upload a directory from slave to build master

    The directory is packed into a temporary tar archive, the archive is
    streamed to the remote writer with the inherited block loop, and the
    writer is then asked to unpack it.

    Arguments:

        - ['workdir']:   base directory to use
        - ['slavesrc']:  name of the slave-side directory to read from
        - ['writer']:    RemoteReference to a transfer._DirectoryWriter object
        - ['maxsize']:   max size (in bytes) of file to write
        - ['blocksize']: max size for each data block
        - ['compress']:  one of [None, 'bz2', 'gz']
    """
    # NOTE(review): debug logging is switched on here, unlike the other
    # transfer commands in this module -- confirm that this is intentional.
    debug = True

    def setup(self, args):
        """Copy the transfer parameters out of the ``args`` dictionary."""
        self.workdir = args['workdir']
        self.dirname = args['slavesrc']
        self.writer = args['writer']
        self.remaining = args['maxsize']
        self.blocksize = args['blocksize']
        self.compress = args['compress']
        self.stderr = None
        self.rc = 0

    def start(self):
        """
        Archive the source directory and start streaming the archive.

        Returns a Deferred that fires after the block loop has ended, the
        remote writer has been asked to unpack (only if the loop succeeded),
        and finished() has sent the final status.
        """
        if self.debug:
            log.msg('SlaveDirectoryUploadCommand started')

        self.path = os.path.join(self.builder.basedir,
                                 self.workdir,
                                 os.path.expanduser(self.dirname))
        if self.debug:
            log.msg("path: %r" % self.path)

        # Create temporary archive
        fd, self.tarname = tempfile.mkstemp()
        # tar data is binary, so the descriptor must be opened in binary
        # mode; text mode can translate newlines and corrupt the archive.
        fileobj = os.fdopen(fd, 'wb')
        if self.compress == 'bz2':
            mode = 'w|bz2'
        elif self.compress == 'gz':
            mode = 'w|gz'
        else:
            mode = 'w'
        try:
            archive = tarfile.open(name=self.tarname, mode=mode,
                                   fileobj=fileobj)
            archive.add(self.path, '')
            archive.close()
        finally:
            # Release the descriptor even if building the archive failed.
            fileobj.close()

        # Transfer it
        self.fp = open(self.tarname, 'rb')

        self.sendStatus({'header': "sending %s" % self.path})

        d = defer.Deferred()
        # Start the block loop from the reactor instead of running it inline.
        self._reactor.callLater(0, self._loop, d)

        def unpack(res):
            # unpack the archive, but pass through any errors from _loop
            d1 = self.writer.callRemote("unpack")
            d1.addErrback(log.err)
            d1.addCallback(lambda ignored: res)
            return d1
        d.addCallback(unpack)
        d.addBoth(self.finished)
        return d

    def finished(self, res):
        """
        Close and delete the temporary archive, then send the final status.

        ``res`` is returned unchanged so that it keeps travelling down the
        Deferred chain built in start().
        """
        self.fp.close()
        os.remove(self.tarname)
        if self.debug:
            log.msg('finished: stderr=%r, rc=%r' % (self.stderr, self.rc))
        if self.stderr is None:
            self.sendStatus({'rc': self.rc})
        else:
            self.sendStatus({'stderr': self.stderr, 'rc': self.rc})
        return res

registerSlaveCommand("uploadDirectory", SlaveDirectoryUploadCommand, command_version)

class SlaveFileDownloadCommand(Command):
    """
    Download a file from master to slave

    Data is requested from the remote reader in blocks of at most
    ``blocksize`` bytes and written to the destination file. The transfer
    ends when the reader returns an empty block, once ``maxsize`` bytes have
    been received (a ``maxsize`` of None disables the limit), or when the
    command is interrupted.

    Arguments:

        - ['workdir']:   base directory to use
        - ['slavedest']: name of the slave-side file to be created
        - ['reader']:    RemoteReference to a transfer._FileReader object
        - ['maxsize']:   max size (in bytes) of file to write
        - ['blocksize']: max size for each data block
        - ['mode']:      access mode for the new file
    """
    debug = False

    def setup(self, args):
        """Copy the transfer parameters out of the ``args`` dictionary."""
        self.workdir = args['workdir']
        self.filename = args['slavedest']
        self.reader = args['reader']
        self.bytes_remaining = args['maxsize']
        self.blocksize = args['blocksize']
        self.mode = args['mode']
        self.stderr = None
        self.rc = 0

    def start(self):
        """
        Create the destination file and start pulling data from the reader.

        Missing parent directories are created first. Returns a Deferred
        that fires after the block loop has ended, the remote reader has
        been asked to close, and finished() has sent the final status.
        """
        if self.debug:
            log.msg('SlaveFileDownloadCommand starting')

        # Open file
        self.path = os.path.join(self.builder.basedir,
                                 self.workdir,
                                 os.path.expanduser(self.filename))

        dirname = os.path.dirname(self.path)
        if not os.path.exists(dirname):
            os.makedirs(dirname)

        # Start from a known state so the error handler below can tell
        # whether the file had already been opened when the failure occurred.
        self.fp = None
        try:
            self.fp = open(self.path, 'wb')
            if self.debug:
                log.msg('Opened %r for download' % self.path)
            if self.mode is not None:
                # note: there is a brief window during which the new file
                # will have the buildslave's default (umask) mode before we
                # set the new one. Don't use this mode= feature to keep files
                # private: use the buildslave's umask for that instead. (it
                # is possible to call os.umask() before and after the open()
                # call, but cleaning up from exceptions properly is more of a
                # nuisance that way).
                os.chmod(self.path, self.mode)
        except (IOError, OSError):
            # TODO: this still needs cleanup
            # os.chmod can fail after open() succeeded; close that handle
            # instead of dropping the only reference to it.
            if self.fp is not None:
                self.fp.close()
            self.fp = None
            self.stderr = 'Cannot open file %r for download' % self.path
            self.rc = 1
            if self.debug:
                log.msg('Cannot open file %r for download' % self.path)

        d = defer.Deferred()
        # Start the block loop from the reactor instead of running it inline.
        self._reactor.callLater(0, self._loop, d)

        def _close(res):
            # close the remote reader, but pass through any errors from _loop
            d1 = self.reader.callRemote('close')
            d1.addErrback(log.err)
            d1.addCallback(lambda ignored: res)
            return d1
        d.addBoth(_close)
        d.addBoth(self.finished)
        return d

    def _loop(self, fire_when_done):
        """
        Fetch blocks until _readBlock reports that the transfer is over.

        ``fire_when_done`` is called back with None on completion, or
        errbacked with the failure if fetching or writing a block fails.
        """
        d = defer.maybeDeferred(self._readBlock)

        def _done(finished):
            if finished:
                fire_when_done.callback(None)
            else:
                self._loop(fire_when_done)

        def _err(why):
            fire_when_done.errback(why)
        d.addCallbacks(_done, _err)
        return None

    def _readBlock(self):
        """
        Read a block of data from the remote reader.

        Returns True when the transfer is over (interrupted, file not open,
        or size limit reached). Otherwise returns a Deferred that fires with
        the result of _writeData for the block that was received.
        """

        if self.interrupted or self.fp is None:
            if self.debug:
                log.msg('SlaveFileDownloadCommand._readBlock(): end')
            return True

        # Never request more than what is left of the maxsize allowance.
        length = self.blocksize
        if self.bytes_remaining is not None and length > self.bytes_remaining:
            length = self.bytes_remaining

        if length <= 0:
            if self.stderr is None:
                self.stderr = 'Maximum filesize reached, truncating file %r' \
                                % self.path
                self.rc = 1
            return True
        else:
            d = self.reader.callRemote('read', length)
            d.addCallback(self._writeData)
            return d

    def _writeData(self, data):
        """
        Write one received block to the destination file.

        Returns True if ``data`` is empty (the reader has nothing more to
        send), otherwise False after the block has been written.
        """
        if self.debug:
            log.msg('SlaveFileDownloadCommand._readBlock(): readlen=%d' %
                    len(data))
        if len(data) == 0:
            return True

        if self.bytes_remaining is not None:
            self.bytes_remaining = self.bytes_remaining - len(data)
            assert self.bytes_remaining >= 0
        self.fp.write(data)
        return False

    def interrupt(self):
        """Flag the transfer as interrupted; repeated calls are ignored."""
        if self.debug:
            log.msg('interrupted')
        if self.interrupted:
            return
        if self.stderr is None:
            self.stderr = 'Download of %r interrupted' % self.path
            self.rc = 1
        self.interrupted = True
        # now we wait for the next read request to return. _readBlock will
        # abandon the file when it sees self.interrupted set.

    def finished(self, res):
        """
        Close the destination file and send the final status update.

        ``res`` is returned unchanged so that it keeps travelling down the
        Deferred chain built in start().
        """
        if self.fp is not None:
            self.fp.close()

        if self.debug:
            log.msg('finished: stderr=%r, rc=%r' % (self.stderr, self.rc))
        if self.stderr is None:
            self.sendStatus({'rc': self.rc})
        else:
            self.sendStatus({'stderr': self.stderr, 'rc': self.rc})
        return res

registerSlaveCommand("downloadFile", SlaveFileDownloadCommand, command_version)
