Package buildslave :: Package commands :: Module transfer
[frames | no frames]

Source Code for Module buildslave.commands.transfer

  1  import os, tarfile, tempfile 
  2   
  3  from twisted.python import log 
  4  from twisted.internet import defer 
  5   
  6  from buildslave.commands.base import Command, command_version 
  7   
class SlaveFileUploadCommand(Command):
    """
    Upload a file from slave to build master

    Arguments:

        - ['workdir']:   base directory to use
        - ['slavesrc']:  name of the slave-side file to read from
        - ['writer']:    RemoteReference to a transfer._FileWriter object
        - ['maxsize']:   max size (in bytes) of file to write
        - ['blocksize']: max size for each data block

    The file is read in blocks of at most 'blocksize' bytes, each of which
    is passed to the writer's remote 'write' method. The writer's remote
    'close' method is called once the transfer loop ends, whether it
    succeeded or failed, and the final status ('rc', plus 'stderr' when
    something went wrong) is reported through sendStatus().
    """
    debug = False

    def setup(self, args):
        """Copy the command arguments onto the instance and reset the
        result state (no error message, rc 0)."""
        self.workdir = args['workdir']
        self.filename = args['slavesrc']
        self.writer = args['writer']
        self.remaining = args['maxsize']
        self.blocksize = args['blocksize']
        self.stderr = None
        self.rc = 0

    def start(self):
        """Open the source file and kick off the block-transfer loop.

        Returns a Deferred that fires after the remote writer has been
        closed and the final status has been sent. If the file cannot be
        opened, rc/stderr are set and the loop finishes immediately
        because self.fp is None.
        """
        if self.debug:
            log.msg('SlaveFileUploadCommand started')

        # Open file
        self.path = os.path.join(self.builder.basedir,
                                 self.workdir,
                                 os.path.expanduser(self.filename))
        try:
            self.fp = open(self.path, 'rb')
        except Exception:
            # Any failure to open is reported to the master rather than
            # raised; 'Exception' (instead of a bare except) lets
            # KeyboardInterrupt/SystemExit propagate.
            self.fp = None
            self.stderr = "Cannot open file '%s' for upload" % self.path
            self.rc = 1
            if self.debug:
                log.msg("Cannot open file '%s' for upload" % self.path)
        else:
            if self.debug:
                log.msg("Opened '%s' for upload" % self.path)

        self.sendStatus({'header': "sending %s" % self.path})

        d = defer.Deferred()
        # start the loop from the reactor so start() returns first
        self._reactor.callLater(0, self._loop, d)

        def _close_ok(res):
            self._closeFile()
            return self.writer.callRemote("close")

        def _close_err(f):
            self._closeFile()
            # call remote's close(), but keep the existing failure
            d1 = self.writer.callRemote("close")

            def eb(f2):
                log.msg("ignoring error from remote close():")
                log.err(f2)
            d1.addErrback(eb)
            d1.addBoth(lambda _: f)  # always return _loop failure
            return d1

        d.addCallbacks(_close_ok, _close_err)
        d.addBoth(self.finished)
        return d

    def _closeFile(self):
        """Close the local file, if one is open, and forget it.

        Closing explicitly releases the handle right away instead of
        leaving it open until the file object is garbage collected.
        """
        fp, self.fp = self.fp, None
        if fp is not None:
            fp.close()

    def _loop(self, fire_when_done):
        """Transfer one block, then either fire 'fire_when_done' (on
        completion or error) or schedule the next block."""
        d = defer.maybeDeferred(self._writeBlock)

        def _done(finished):
            if finished:
                fire_when_done.callback(None)
            else:
                self._loop(fire_when_done)

        def _err(why):
            fire_when_done.errback(why)

        d.addCallbacks(_done, _err)
        return None

    def _writeBlock(self):
        """Write a block of data to the remote writer

        Returns True when the transfer is over (interrupted, no file,
        size limit reached, or EOF); otherwise returns a Deferred that
        fires with False once the remote write has completed.
        """

        if self.interrupted or self.fp is None:
            if self.debug:
                log.msg('SlaveFileUploadCommand._writeBlock(): end')
            return True

        length = self.blocksize
        if self.remaining is not None and length > self.remaining:
            length = self.remaining

        if length <= 0:
            # NOTE(review): this branch is also taken when the file is
            # exactly 'maxsize' bytes long, since no read is attempted to
            # check for EOF; such a file is reported as truncated.
            if self.stderr is None:
                self.stderr = 'Maximum filesize reached, truncating file \'%s\'' \
                    % self.path
                self.rc = 1
            data = ''
        else:
            data = self.fp.read(length)

        if self.debug:
            log.msg('SlaveFileUploadCommand._writeBlock(): ' +
                    'allowed=%d readlen=%d' % (length, len(data)))
        if len(data) == 0:
            log.msg("EOF: callRemote(close)")
            return True

        if self.remaining is not None:
            self.remaining = self.remaining - len(data)
            assert self.remaining >= 0
        d = self.writer.callRemote('write', data)
        d.addCallback(lambda res: False)
        return d

    def interrupt(self):
        """Flag the transfer as interrupted; only the first call has any
        effect beyond the debug log."""
        if self.debug:
            log.msg('interrupted')
        if self.interrupted:
            return
        if self.stderr is None:
            self.stderr = 'Upload of \'%s\' interrupted' % self.path
            self.rc = 1
        self.interrupted = True
        # the next _writeBlock call will notice the .interrupted flag

    def finished(self, res):
        """Send the final status update and pass 'res' through unchanged."""
        if self.debug:
            log.msg('finished: stderr=%r, rc=%r' % (self.stderr, self.rc))
        if self.stderr is None:
            self.sendStatus({'rc': self.rc})
        else:
            self.sendStatus({'stderr': self.stderr, 'rc': self.rc})
        return res
136 137
class SlaveDirectoryUploadCommand(SlaveFileUploadCommand):
    """
    Upload a directory from slave to build master

    Arguments:

        - ['workdir']:   base directory to use
        - ['slavesrc']:  name of the slave-side directory to read from
        - ['writer']:    RemoteReference to a transfer._DirectoryWriter object
        - ['maxsize']:   max size (in bytes) of file to write
        - ['blocksize']: max size for each data block
        - ['compress']:  one of [None, 'bz2', 'gz']

    The directory is packed into a temporary tar archive, which is then
    sent block by block using the transfer loop inherited from
    SlaveFileUploadCommand. After the loop completes successfully the
    writer's remote 'unpack' method is called.
    """
    debug = False

    def setup(self, args):
        """Copy the command arguments onto the instance and reset the
        result state (no error message, rc 0)."""
        self.workdir = args['workdir']
        self.dirname = args['slavesrc']
        self.writer = args['writer']
        self.remaining = args['maxsize']
        self.blocksize = args['blocksize']
        self.compress = args['compress']
        self.stderr = None
        self.rc = 0

    def start(self):
        """Archive the directory into a temporary file and start sending it.

        Returns a Deferred that fires after the transfer loop has ended,
        the remote 'unpack' has been attempted (on success only) and the
        final status has been sent. Errors raised while building the
        archive propagate to the caller after the temporary file has been
        removed.
        """
        if self.debug:
            log.msg('SlaveDirectoryUploadCommand started')

        self.path = os.path.join(self.builder.basedir,
                                 self.workdir,
                                 os.path.expanduser(self.dirname))
        if self.debug:
            log.msg("path: %r" % self.path)

        # Create temporary archive
        if self.compress == 'bz2':
            mode = 'w|bz2'
        elif self.compress == 'gz':
            mode = 'w|gz'
        else:
            mode = 'w'

        fd, self.tarname = tempfile.mkstemp()
        try:
            # tar data is binary: open in 'wb' so that no newline
            # translation is applied to the archive contents
            fileobj = os.fdopen(fd, 'wb')
            try:
                archive = tarfile.open(name=self.tarname, mode=mode,
                                       fileobj=fileobj)
                try:
                    archive.add(self.path, '')
                finally:
                    archive.close()
            finally:
                fileobj.close()
        except:
            # finished() will never run if we fail here, so remove the
            # temporary file now; the exception is re-raised unchanged
            os.remove(self.tarname)
            raise

        # Transfer it
        self.fp = open(self.tarname, 'rb')

        self.sendStatus({'header': "sending %s" % self.path})

        d = defer.Deferred()
        # start the loop from the reactor so start() returns first
        self._reactor.callLater(0, self._loop, d)

        def unpack(res):
            # unpack the archive, but pass through any errors from _loop
            d1 = self.writer.callRemote("unpack")
            d1.addErrback(log.err)
            d1.addCallback(lambda ignored: res)
            return d1

        d.addCallback(unpack)
        d.addBoth(self.finished)
        return d

    def finished(self, res):
        """Close and delete the temporary archive, send the final status
        update, and pass 'res' through unchanged."""
        self.fp.close()
        os.remove(self.tarname)
        if self.debug:
            log.msg('finished: stderr=%r, rc=%r' % (self.stderr, self.rc))
        if self.stderr is None:
            self.sendStatus({'rc': self.rc})
        else:
            self.sendStatus({'stderr': self.stderr, 'rc': self.rc})
        return res
213 214
class SlaveFileDownloadCommand(Command):
    """
    Download a file from master to slave

    Arguments:

        - ['workdir']:   base directory to use
        - ['slavedest']: name of the slave-side file to be created
        - ['reader']:    RemoteReference to a transfer._FileReader object
        - ['maxsize']:   max size (in bytes) of file to write
        - ['blocksize']: max size for each data block
        - ['mode']:      access mode for the new file

    Data is requested from the reader's remote 'read' method in blocks of
    at most 'blocksize' bytes and written to the destination file. The
    reader's remote 'close' method is called once the loop ends, and the
    final status ('rc', plus 'stderr' when something went wrong) is
    reported through sendStatus().
    """
    debug = False

    def setup(self, args):
        """Copy the command arguments onto the instance and reset the
        result state (no error message, rc 0)."""
        self.workdir = args['workdir']
        self.filename = args['slavedest']
        self.reader = args['reader']
        self.bytes_remaining = args['maxsize']
        self.blocksize = args['blocksize']
        self.mode = args['mode']
        self.stderr = None
        self.rc = 0

    def start(self):
        """Create the destination file and kick off the block-transfer loop.

        Missing parent directories are created first. Returns a Deferred
        that fires after the remote reader has been closed and the final
        status has been sent. If the file cannot be opened, or its mode
        cannot be set, rc/stderr are set and the loop finishes
        immediately because self.fp is None.
        """
        if self.debug:
            log.msg('SlaveFileDownloadCommand starting')

        # Open file
        self.path = os.path.join(self.builder.basedir,
                                 self.workdir,
                                 os.path.expanduser(self.filename))

        dirname = os.path.dirname(self.path)
        if not os.path.exists(dirname):
            os.makedirs(dirname)

        self.fp = None
        try:
            self.fp = open(self.path, 'wb')
            if self.debug:
                log.msg("Opened '%s' for download" % self.path)
            if self.mode is not None:
                # note: there is a brief window during which the new file
                # will have the buildslave's default (umask) mode before we
                # set the new one. Don't use this mode= feature to keep files
                # private: use the buildslave's umask for that instead. (it
                # is possible to call os.umask() before and after the open()
                # call, but cleaning up from exceptions properly is more of a
                # nuisance that way).
                os.chmod(self.path, self.mode)
        except (IOError, OSError):
            # open() fails with IOError and os.chmod() with OSError; in
            # the latter case the file is already open and must be closed
            # here, since the fp=None marker below hides it from finished().
            # The (possibly empty) file itself is left on disk.
            if self.fp is not None:
                self.fp.close()
            self.fp = None
            self.stderr = "Cannot open file '%s' for download" % self.path
            self.rc = 1
            if self.debug:
                log.msg("Cannot open file '%s' for download" % self.path)

        d = defer.Deferred()
        # start the loop from the reactor so start() returns first
        self._reactor.callLater(0, self._loop, d)

        def _close(res):
            # close the file, but pass through any errors from _loop
            d1 = self.reader.callRemote('close')
            d1.addErrback(log.err)  # ignore errors closing a reader file
            d1.addCallback(lambda ignored: res)
            return d1

        d.addBoth(_close)
        d.addBoth(self.finished)
        return d

    def _loop(self, fire_when_done):
        """Transfer one block, then either fire 'fire_when_done' (on
        completion or error) or schedule the next block."""
        d = defer.maybeDeferred(self._readBlock)

        def _done(finished):
            if finished:
                fire_when_done.callback(None)
            else:
                self._loop(fire_when_done)

        def _err(why):
            fire_when_done.errback(why)

        d.addCallbacks(_done, _err)
        return None

    def _readBlock(self):
        """Read a block of data from the remote reader.

        Returns True when the transfer is over (interrupted, no file, or
        size limit reached); otherwise returns a Deferred that fires with
        the result of _writeData() for the block that was read.
        """

        if self.interrupted or self.fp is None:
            if self.debug:
                log.msg('SlaveFileDownloadCommand._readBlock(): end')
            return True

        length = self.blocksize
        if self.bytes_remaining is not None and length > self.bytes_remaining:
            length = self.bytes_remaining

        if length <= 0:
            if self.stderr is None:
                self.stderr = "Maximum filesize reached, truncating file '%s'" \
                    % self.path
                self.rc = 1
            return True
        else:
            d = self.reader.callRemote('read', length)
            d.addCallback(self._writeData)
            return d

    def _writeData(self, data):
        """Write one received block to the file.

        Returns True if the block was empty (end of data), False if more
        blocks should be requested.
        """
        if self.debug:
            log.msg('SlaveFileDownloadCommand._readBlock(): readlen=%d' %
                    len(data))
        if len(data) == 0:
            return True

        if self.bytes_remaining is not None:
            self.bytes_remaining = self.bytes_remaining - len(data)
            assert self.bytes_remaining >= 0
        self.fp.write(data)
        return False

    def interrupt(self):
        """Flag the transfer as interrupted; only the first call has any
        effect beyond the debug log."""
        if self.debug:
            log.msg('interrupted')
        if self.interrupted:
            return
        if self.stderr is None:
            self.stderr = "Download of '%s' interrupted" % self.path
            self.rc = 1
        self.interrupted = True
        # now we wait for the next read request to return. _readBlock will
        # abandon the file when it sees self.interrupted set.

    def finished(self, res):
        """Close the destination file if it is open, send the final status
        update, and pass 'res' through unchanged."""
        if self.fp is not None:
            self.fp.close()

        if self.debug:
            log.msg('finished: stderr=%r, rc=%r' % (self.stderr, self.rc))
        if self.stderr is None:
            self.sendStatus({'rc': self.rc})
        else:
            self.sendStatus({'stderr': self.stderr, 'rc': self.rc})
        return res
356