Package buildbot :: Package status :: Module status_push
[frames | no frames]

Source Code for Module buildbot.status.status_push

  1  # This file is part of Buildbot.  Buildbot is free software: you can 
  2  # redistribute it and/or modify it under the terms of the GNU General Public 
  3  # License as published by the Free Software Foundation, version 2. 
  4  # 
  5  # This program is distributed in the hope that it will be useful, but WITHOUT 
  6  # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more 
  8  # details. 
  9  # 
 10  # You should have received a copy of the GNU General Public License along with 
 11  # this program; if not, write to the Free Software Foundation, Inc., 51 
 12  # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
 13  # 
 14  # Copyright Buildbot Team Members 
 15   
 16   
 17  """Push events to an abstract receiver. 
 18   
 19  Implements the HTTP receiver.""" 
 20   
 21  import datetime 
 22  import os 
 23  import urllib 
 24  import urlparse 
 25   
 26  try: 
 27      import simplejson as json 
 28      assert json 
 29  except ImportError: 
 30      import json 
 31   
 32  from buildbot.status.base import StatusReceiverMultiService 
 33  from buildbot.status.persistent_queue import DiskQueue, IndexedQueue, \ 
 34          MemoryQueue, PersistentQueue 
 35  from buildbot.status.web.status_json import FilterOut 
 36  from twisted.internet import defer, reactor 
 37  from twisted.python import log 
 38  from twisted.web import client 
class StatusPush(StatusReceiverMultiService):
    """Event streamer to an abstract channel.

    It uses IQueue to batch push requests and queue the data when
    the receiver is down.
    When a PersistentQueue object is used, the items are saved to disk on master
    shutdown so they can be pushed back when the master is restarted.
    """

    def __init__(self, serverPushCb, queue=None, path=None, filter=True,
                 bufferDelay=1, retryDelay=5, blackList=None):
        """
        @serverPushCb: callback to be used. It receives 'self' as parameter. It
        should call self.queueNextServerPush() when it's done to queue the next
        push. It is guaranteed that the queue is not empty when this function is
        called.
        @queue: an item queue that implements IQueue. Defaults to a
        MemoryQueue. It is always wrapped in an IndexedQueue.
        @path: path to save config. When it names an existing directory, the
        file 'state' inside it is used to persist self.state.
        @filter: when True (default), removes all "", None, False, [] or {}
        entries.
        @bufferDelay: amount of time events are queued before sending, to
        reduce the number of push requests rate. This is the delay between the
        end of a request to initializing a new one.
        @retryDelay: amount of time between retries when no items were pushed on
        last serverPushCb call.
        @blackList: events that shouldn't be sent.

        Raises NotImplementedError when serverPushCb is not callable.
        """
        StatusReceiverMultiService.__init__(self)

        # Parameters.
        self.queue = queue
        if self.queue is None:
            self.queue = MemoryQueue()
        self.queue = IndexedQueue(self.queue)
        self.path = path
        self.filter = filter
        self.bufferDelay = bufferDelay
        self.retryDelay = retryDelay
        if not callable(serverPushCb):
            raise NotImplementedError('Please pass serverPushCb parameter.')

        def hookPushCb():
            # Update the index so we know if the next push succeed or not, don't
            # update the value when the queue is empty.
            if not self.queue.nbItems():
                return
            self.lastIndex = self.queue.getIndex()
            return serverPushCb(self)
        self.serverPushCb = hookPushCb
        self.blackList = blackList

        # Other defaults.
        # IDelayedCall object that represents the next queued push.
        self.task = None
        self.stopped = False
        self.lastIndex = -1
        self.state = {}
        self.state['started'] = str(datetime.datetime.utcnow())
        self.state['next_id'] = 1
        self.state['last_id_pushed'] = 0
        # Try to load back the state.
        # NOTE(review): stopService() dumps the whole self.state, so the
        # saved 'started' value replaces the timestamp set just above --
        # confirm that this is intended.
        if self.path and os.path.isdir(self.path):
            state_path = os.path.join(self.path, 'state')
            if os.path.isfile(state_path):
                # Close the handle deterministically rather than leaving it
                # to garbage collection.
                state_file = open(state_path, 'r')
                try:
                    self.state.update(json.load(state_file))
                finally:
                    state_file.close()

        if self.queue.nbItems():
            # Last shutdown was not clean, don't wait to send events.
            self.queueNextServerPush()

    def startService(self):
        """Starting up.

        Subscribes to the parent's status object and pushes the 'start'
        event."""
        StatusReceiverMultiService.startService(self)
        self.status = self.parent.getStatus()
        self.status.subscribe(self)
        self.initialPush()

    def wasLastPushSuccessful(self):
        """Returns if the "virtual pointer" in the queue advanced.

        True when the queue index is at or past the index recorded in
        self.lastIndex right before the last call to the push callback."""
        return self.lastIndex <= self.queue.getIndex()

    def queueNextServerPush(self):
        """Queue the next push or call it immediately.

        Called to signal new items are available to be sent or on shutdown.
        A timer should be queued to trigger a network request or the callback
        should be called immediately. If a status push is already queued, ignore
        the current call.

        Returns the Deferred of the immediate push when one is started during
        shutdown, None otherwise."""
        # Determine the delay.
        if self.wasLastPushSuccessful():
            if self.stopped:
                # Shutting down.
                delay = 0
            else:
                # Normal case.
                delay = self.bufferDelay
        else:
            if self.stopped:
                # Too bad, we can't do anything now, we're shutting down and the
                # receiver is also down. We'll just save the objects to disk.
                return
            else:
                # The server is inaccessible, retry less often.
                delay = self.retryDelay

        # Cleanup a previously queued task if necessary.
        if self.task:
            # Warning: we could be running inside the task.
            if self.task.active():
                # There was already a task queue, don't requeue it, just let it
                # go.
                return
            # Otherwise, it was just a stray object (already fired or
            # cancelled). Callers that need an immediate push cancel the
            # pending task themselves before calling, see stopService().
            self.task = None

        # Do the queue/direct call.
        if delay:
            # Call in delay seconds.
            self.task = reactor.callLater(delay, self.serverPushCb)
        elif self.stopped:
            if not self.queue.nbItems():
                return

            # Call right now, we're shutting down.
            @defer.deferredGenerator
            def BlockForEverythingBeingSent():
                d = self.serverPushCb()
                if d:
                    x = defer.waitForDeferred(d)
                    yield x
                    x.getResult()
            return BlockForEverythingBeingSent()
        else:
            # delay should never be 0. That can cause Buildbot to spin tightly
            # trying to push events that may not be received well by a status
            # listener.
            log.err('Did not expect delay to be 0, but it is.')
            return

    def stopService(self):
        """Shutting down.

        Pushes the 'shutdown' event, tries one last immediate push, saves the
        queue and the state, then stops the parent service.

        Returns a DeferredList of the pending push (if any) and the parent's
        stopService() result (if any)."""
        self.finalPush()
        self.stopped = True
        if (self.task and self.task.active()):
            # We don't have time to wait, force an immediate call.
            self.task.cancel()
            self.task = None
            d = self.queueNextServerPush()
        elif self.wasLastPushSuccessful():
            d = self.queueNextServerPush()
        else:
            d = defer.succeed(None)

        # We're dying, make sure we save the results.
        self.queue.save()
        if self.path and os.path.isdir(self.path):
            state_path = os.path.join(self.path, 'state')
            # Close explicitly so the state is flushed to disk before the
            # process goes away.
            state_file = open(state_path, 'w')
            try:
                json.dump(self.state, state_file, sort_keys=True, indent=2)
            finally:
                state_file.close()
        # Make sure all Deferreds are called on time and in a sane order.
        # Both values may be None; build a real list (not a lazy filter
        # object) of the ones that are set.
        parent_d = StatusReceiverMultiService.stopService(self)
        defers = [x for x in (d, parent_d) if x]
        return defer.DeferredList(defers)

    def push(self, event, **objs):
        """Push a new event.

        The new event will be either:
        - Queued in memory to reduce network usage
        - Queued to disk when the sink server is down
        - Pushed (along the other queued items) to the server

        @event: event name; ignored when it is listed in self.blackList.
        @objs: payload entries. Objects that have an asDict() method are
        converted with it, and every entry goes through FilterOut() when
        self.filter is set.
        """
        if self.blackList and event in self.blackList:
            return
        # First, generate the packet.
        packet = {}
        packet['id'] = self.state['next_id']
        self.state['next_id'] += 1
        packet['timestamp'] = str(datetime.datetime.utcnow())
        packet['project'] = self.status.getTitle()
        packet['started'] = self.state['started']
        packet['event'] = event
        packet['payload'] = {}
        for obj_name, obj in objs.items():
            if hasattr(obj, 'asDict'):
                obj = obj.asDict()
            if self.filter:
                obj = FilterOut(obj)
            packet['payload'][obj_name] = obj
        self.queue.pushItem(packet)
        if self.task is None or not self.task.active():
            # No task queued since it was probably idle, let's queue a task.
            return self.queueNextServerPush()

    #### Events

    def initialPush(self):
        """Push the 'start' event with the whole status object."""
        # Push everything we want to push from the initial configuration.
        self.push('start', status=self.status)

    def finalPush(self):
        """Push the 'shutdown' event with the whole status object."""
        self.push('shutdown', status=self.status)

    def requestSubmitted(self, request):
        """Push a 'requestSubmitted' event carrying the request."""
        self.push('requestSubmitted', request=request)

    def requestCancelled(self, builder, request):
        """Push a 'requestCancelled' event carrying builder and request."""
        self.push('requestCancelled', builder=builder, request=request)

    def buildsetSubmitted(self, buildset):
        """Push a 'buildsetSubmitted' event carrying the buildset."""
        self.push('buildsetSubmitted', buildset=buildset)

    def builderAdded(self, builderName, builder):
        """Push a 'builderAdded' event and return self.

        Returning self presumably subscribes this receiver to the builder's
        events -- confirm against the status receiver interface."""
        self.push('builderAdded', builderName=builderName, builder=builder)
        return self

    def builderChangedState(self, builderName, state):
        """Push a 'builderChangedState' event with the name and new state."""
        self.push('builderChangedState', builderName=builderName, state=state)

    def buildStarted(self, builderName, build):
        """Push a 'buildStarted' event carrying the build and return self.

        builderName is not part of the payload. Returning self presumably
        subscribes this receiver to the build's events -- confirm against the
        status receiver interface."""
        self.push('buildStarted', build=build)
        return self

    def buildETAUpdate(self, build, ETA):
        """Push a 'buildETAUpdate' event carrying the build and its ETA."""
        self.push('buildETAUpdate', build=build, ETA=ETA)

    def stepStarted(self, build, step):
        """Push a 'stepStarted' event with the build properties and step."""
        self.push('stepStarted',
                  properties=build.getProperties().asList(),
                  step=step)

    def stepTextChanged(self, build, step, text):
        """Push a 'stepTextChanged' event with properties, step and text."""
        self.push('stepTextChanged',
                  properties=build.getProperties().asList(),
                  step=step,
                  text=text)

    def stepText2Changed(self, build, step, text2):
        """Push a 'stepText2Changed' event with properties, step and text2."""
        self.push('stepText2Changed',
                  properties=build.getProperties().asList(),
                  step=step,
                  text2=text2)

    def stepETAUpdate(self, build, step, ETA, expectations):
        """Push a 'stepETAUpdate' event with properties, step, ETA and
        expectations."""
        self.push('stepETAUpdate',
                  properties=build.getProperties().asList(),
                  step=step,
                  ETA=ETA,
                  expectations=expectations)

    def logStarted(self, build, step, log):
        """Push a 'logStarted' event with the build properties and step.

        The log object itself is not part of the payload."""
        self.push('logStarted',
                  properties=build.getProperties().asList(),
                  step=step)

    def logFinished(self, build, step, log):
        """Push a 'logFinished' event with the build properties and step.

        The log object itself is not part of the payload."""
        self.push('logFinished',
                  properties=build.getProperties().asList(),
                  step=step)

    def stepFinished(self, build, step, results):
        """Push a 'stepFinished' event with the build properties and step.

        The results argument is not part of the payload."""
        self.push('stepFinished',
                  properties=build.getProperties().asList(),
                  step=step)

    def buildFinished(self, builderName, build, results):
        """Push a 'buildFinished' event carrying the build.

        builderName and results are not part of the payload."""
        self.push('buildFinished', build=build)

    def builderRemoved(self, builderName):
        """Push the builder-removed event carrying the builder name."""
        # NOTE(review): the event name is spelled 'buildedRemoved' (sic). It
        # is kept unchanged because it is sent to receivers and matched
        # against blackList entries; renaming it needs a coordinated change.
        self.push('buildedRemoved', builderName=builderName)

    def changeAdded(self, change):
        """Push a 'changeAdded' event carrying the change."""
        self.push('changeAdded', change=change)

    def slaveConnected(self, slavename):
        """Push a 'slaveConnected' event carrying the slave status object
        looked up by name."""
        self.push('slaveConnected', slave=self.status.getSlave(slavename))

    def slaveDisconnected(self, slavename):
        """Push a 'slaveDisconnected' event carrying the slave name."""
        self.push('slaveDisconnected', slavename=slavename)

class HttpStatusPush(StatusPush):
    """Event streamer to a HTTP server."""

    def __init__(self, serverUrl, debug=None, maxMemoryItems=None,
                 maxDiskItems=None, chunkSize=200, maxHttpRequestSize=2**20,
                 **kwargs):
        """
        @serverUrl: Base URL to be used to push events notifications.
        @maxMemoryItems: Maximum number of items to keep queued in memory.
        @maxDiskItems: Maximum number of items to buffer to disk, if 0, doesn't
        use disk at all.
        @debug: Save the json with nice formatting.
        @chunkSize: maximum number of items to send in each HTTP POST.
        @maxHttpRequestSize: limits the size of encoded data for AE, the default
        is 1MB.
        @kwargs: forwarded unchanged to StatusPush.__init__().
        """
        # Parameters.
        self.serverUrl = serverUrl
        self.debug = debug
        self.chunkSize = chunkSize
        # Must be set before StatusPush.__init__(), which may call
        # queueNextServerPush() and therefore wasLastPushSuccessful().
        self.lastPushWasSuccessful = True
        self.maxHttpRequestSize = maxHttpRequestSize
        if maxDiskItems != 0:
            # The queue directory is determined by the server url: 'events_'
            # followed by the host part of the URL, without the port.
            path = ('events_' +
                    urlparse.urlparse(self.serverUrl)[1].split(':')[0])
            queue = PersistentQueue(
                primaryQueue=MemoryQueue(maxItems=maxMemoryItems),
                secondaryQueue=DiskQueue(path, maxItems=maxDiskItems))
        else:
            path = None
            queue = MemoryQueue(maxItems=maxMemoryItems)

        # Use the unbound method; StatusPush calls it with self as argument.
        StatusPush.__init__(self, serverPushCb=HttpStatusPush.pushHttp,
                            queue=queue, path=path, **kwargs)

    def wasLastPushSuccessful(self):
        """Returns the outcome recorded by the last HTTP POST callbacks."""
        return self.lastPushWasSuccessful

    def popChunk(self):
        """Pops items from the pending list.

        They must be queued back on failure.

        Starts with self.chunkSize items (a single item when the last push
        failed) and halves the chunk until the encoded data is smaller than
        self.maxHttpRequestSize. A single item that is still too large is
        logged and dropped.

        Returns a (data, items) tuple: the urlencoded POST body and the list
        of items it contains."""
        if self.wasLastPushSuccessful():
            chunkSize = self.chunkSize
        else:
            chunkSize = 1

        while True:
            items = self.queue.popChunk(chunkSize)
            if self.debug:
                packets = json.dumps(items, indent=2, sort_keys=True)
            else:
                packets = json.dumps(items, separators=(',', ':'))
            data = urllib.urlencode({'packets': packets})
            if (not self.maxHttpRequestSize or
                    len(data) < self.maxHttpRequestSize):
                return (data, items)

            if chunkSize == 1:
                # This packet is just too large. Drop this packet.
                log.msg("ERROR: packet %s was dropped, too large: %d > %d" %
                        (items[0]['id'], len(data), self.maxHttpRequestSize))
                chunkSize = self.chunkSize
            else:
                # Try with half the packets. Floor division keeps chunkSize
                # an integer whatever the division semantics in effect.
                chunkSize //= 2
                self.queue.insertBackChunk(items)

    def pushHttp(self):
        """Do the HTTP POST to the server.

        Returns the Deferred of the request. Its callbacks record the outcome
        in self.lastPushWasSuccessful and queue the next push."""
        (encoded_packets, items) = self.popChunk()

        def Success(result):
            """Queue up next push."""
            log.msg('Sent %d events to %s' % (len(items), self.serverUrl))
            self.lastPushWasSuccessful = True
            return self.queueNextServerPush()

        def Failure(result):
            """Insert back items not sent and queue up next push."""
            # Server is now down.
            log.msg('Failed to push %d events to %s: %s' %
                    (len(items), self.serverUrl, str(result)))
            self.queue.insertBackChunk(items)
            if self.stopped:
                # Bad timing, was being called on shutdown and the server died
                # on us. Make sure the queue is saved since we just queued back
                # items.
                self.queue.save()
            self.lastPushWasSuccessful = False
            return self.queueNextServerPush()

        # Trigger the HTTP POST request.
        headers = {'Content-Type': 'application/x-www-form-urlencoded'}
        connection = client.getPage(self.serverUrl,
                                    method='POST',
                                    postdata=encoded_packets,
                                    headers=headers,
                                    agent='buildbot')
        connection.addCallbacks(Success, Failure)
        return connection

# vim: set ts=4 sts=4 sw=4 et: