Package buildbot :: Package status :: Module status_push
[frames] | no frames]

Source Code for Module buildbot.status.status_push

  1  # -*- test-case-name: buildbot.broken_test.runs.test_status_push -*- 
  2   
  3  """Push events to an abstract receiver. 
  4   
  5  Implements the HTTP receiver.""" 
  6   
  7  import datetime 
  8  import logging 
  9  import os 
 10  import urllib 
 11  import urlparse 
 12   
 13  try: 
 14      import simplejson as json 
 15  except ImportError: 
 16      import json 
 17   
 18  from buildbot.master import BuildMaster 
 19  from buildbot.status.base import StatusReceiverMultiService 
 20  from buildbot.status.persistent_queue import DiskQueue, IndexedQueue, \ 
 21          IQueue, MemoryQueue, PersistentQueue 
 22  from buildbot.status.web.status_json import FilterOut 
 23  from twisted.internet import defer, reactor 
 24  from twisted.python import log 
 25  from twisted.web import client 
26 27 28 29 -class StatusPush(StatusReceiverMultiService):
30 """Event streamer to a abstract channel. 31 32 It uses IQueue to batch push requests and queue the data when 33 the receiver is down. 34 When a PersistentQueue object is used, the items are saved to disk on master 35 shutdown so they can be pushed back when the master is restarted. 36 """ 37
38 - def __init__(self, serverPushCb, queue=None, path=None, filter=True, 39 bufferDelay=1, retryDelay=5, blackList=None):
40 """ 41 @serverPushCb: callback to be used. It receives 'self' as parameter. It 42 should call self.queueNextServerPush() when it's done to queue the next 43 push. It is guaranteed that the queue is not empty when this function is 44 called. 45 @queue: a item queue that implements IQueue. 46 @path: path to save config. 47 @filter: when True (default), removes all "", None, False, [] or {} 48 entries. 49 @bufferDelay: amount of time events are queued before sending, to 50 reduce the number of push requests rate. This is the delay between the 51 end of a request to initializing a new one. 52 @retryDelay: amount of time between retries when no items were pushed on 53 last serverPushCb call. 54 @blackList: events that shouldn't be sent. 55 """ 56 StatusReceiverMultiService.__init__(self) 57 58 # Parameters. 59 self.queue = queue 60 if self.queue is None: 61 self.queue = MemoryQueue() 62 self.queue = IndexedQueue(self.queue) 63 self.path = path 64 self.filter = filter 65 self.bufferDelay = bufferDelay 66 self.retryDelay = retryDelay 67 if not callable(serverPushCb): 68 raise NotImplementedError('Please pass serverPushCb parameter.') 69 def hookPushCb(): 70 # Update the index so we know if the next push succeed or not, don't 71 # update the value when the queue is empty. 72 if not self.queue.nbItems(): 73 return 74 self.lastIndex = self.queue.getIndex() 75 return serverPushCb(self)
76 self.serverPushCb = hookPushCb 77 self.blackList = blackList 78 79 # Other defaults. 80 # IDelayedCall object that represents the next queued push. 81 self.task = None 82 self.stopped = False 83 self.lastIndex = -1 84 self.state = {} 85 self.state['started'] = str(datetime.datetime.utcnow()) 86 self.state['next_id'] = 1 87 self.state['last_id_pushed'] = 0 88 # Try to load back the state. 89 if self.path and os.path.isdir(self.path): 90 state_path = os.path.join(self.path, 'state') 91 if os.path.isfile(state_path): 92 self.state.update(json.load(open(state_path, 'r'))) 93 94 if self.queue.nbItems(): 95 # Last shutdown was not clean, don't wait to send events. 96 self.queueNextServerPush()
97
98 - def setServiceParent(self, parent):
99 """Starting up.""" 100 StatusReceiverMultiService.setServiceParent(self, parent) 101 self.status = self.parent.getStatus() 102 self.status.subscribe(self) 103 self.initialPush()
104
105 - def wasLastPushSuccessful(self):
106 """Returns if the "virtual pointer" in the queue advanced.""" 107 return self.lastIndex <= self.queue.getIndex()
108
109 - def queueNextServerPush(self):
110 """Queue the next push or call it immediately. 111 112 Called to signal new items are available to be sent or on shutdown. 113 A timer should be queued to trigger a network request or the callback 114 should be called immediately. If a status push is already queued, ignore 115 the current call.""" 116 # Determine the delay. 117 if self.wasLastPushSuccessful(): 118 if self.stopped: 119 # Shutting down. 120 delay = 0 121 else: 122 # Normal case. 123 delay = self.bufferDelay 124 else: 125 if self.stopped: 126 # Too bad, we can't do anything now, we're shutting down and the 127 # receiver is also down. We'll just save the objects to disk. 128 return 129 else: 130 # The server is inaccessible, retry less often. 131 delay = self.retryDelay 132 133 # Cleanup a previously queued task if necessary. 134 if self.task: 135 # Warning: we could be running inside the task. 136 if self.task.active(): 137 # There was already a task queue, don't requeue it, just let it 138 # go. 139 return 140 else: 141 if self.task.active(): 142 # There was a task queued but it is requested to call it 143 # *right now* so cancel it. 144 self.task.cancel() 145 # Otherwise, it was just a stray object. 146 self.task = None 147 148 # Do the queue/direct call. 149 if delay: 150 # Call in delay seconds. 151 self.task = reactor.callLater(delay, self.serverPushCb) 152 elif self.stopped: 153 if not self.queue.nbItems(): 154 return 155 # Call right now, we're shutting down. 156 @defer.deferredGenerator 157 def BlockForEverythingBeingSent(): 158 d = self.serverPushCb() 159 if d: 160 x = defer.waitForDeferred(d) 161 yield x 162 x.getResult()
163 return BlockForEverythingBeingSent() 164 else: 165 # delay should never be 0. That can cause Buildbot to spin tightly 166 # trying to push events that may not be received well by a status 167 # listener. 168 logging.exception('Did not expect delay to be 0, but it is.') 169 return 170
171 - def stopService(self):
172 """Shutting down.""" 173 self.finalPush() 174 self.stopped = True 175 if (self.task and self.task.active()): 176 # We don't have time to wait, force an immediate call. 177 self.task.cancel() 178 self.task = None 179 d = self.queueNextServerPush() 180 elif self.wasLastPushSuccessful(): 181 d = self.queueNextServerPush() 182 else: 183 d = defer.succeed(None) 184 185 # We're dying, make sure we save the results. 186 self.queue.save() 187 if self.path and os.path.isdir(self.path): 188 state_path = os.path.join(self.path, 'state') 189 json.dump(self.state, open(state_path, 'w'), sort_keys=True, 190 indent=2) 191 # Make sure all Deferreds are called on time and in a sane order. 192 defers = filter(None, [d, StatusReceiverMultiService.stopService(self)]) 193 return defer.DeferredList(defers)
194
195 - def push(self, event, **objs):
196 """Push a new event. 197 198 The new event will be either: 199 - Queued in memory to reduce network usage 200 - Queued to disk when the sink server is down 201 - Pushed (along the other queued items) to the server 202 """ 203 if self.blackList and event in self.blackList: 204 return 205 # First, generate the packet. 206 packet = {} 207 packet['id'] = self.state['next_id'] 208 self.state['next_id'] += 1 209 packet['timestamp'] = str(datetime.datetime.utcnow()) 210 packet['project'] = self.status.getProjectName() 211 packet['started'] = self.state['started'] 212 packet['event'] = event 213 packet['payload'] = {} 214 for obj_name, obj in objs.items(): 215 if hasattr(obj, 'asDict'): 216 obj = obj.asDict() 217 if self.filter: 218 obj = FilterOut(obj) 219 packet['payload'][obj_name] = obj 220 self.queue.pushItem(packet) 221 if self.task is None or not self.task.active(): 222 # No task queued since it was probably idle, let's queue a task. 223 return self.queueNextServerPush()
224 225 #### Events 226
227 - def initialPush(self):
228 # Push everything we want to push from the initial configuration. 229 self.push('start', status=self.status)
230
231 - def finalPush(self):
232 self.push('shutdown', status=self.status)
233
234 - def requestSubmitted(self, request):
235 self.push('requestSubmitted', request=request)
236
237 - def requestCancelled(self, builder, request):
238 self.push('requestCancelled', builder=builder, request=request)
239
240 - def buildsetSubmitted(self, buildset):
241 self.push('buildsetSubmitted', buildset=buildset)
242
243 - def builderAdded(self, builderName, builder):
244 self.push('builderAdded', builderName=builderName, builder=builder) 245 return self
246
247 - def builderChangedState(self, builderName, state):
248 self.push('builderChangedState', builderName=builderName, state=state)
249
250 - def buildStarted(self, builderName, build):
251 self.push('buildStarted', build=build) 252 return self
253
254 - def buildETAUpdate(self, build, ETA):
255 self.push('buildETAUpdate', build=build, ETA=ETA)
256
257 - def stepStarted(self, build, step):
258 self.push('stepStarted', 259 properties=build.getProperties().asList(), 260 step=step)
261
262 - def stepTextChanged(self, build, step, text):
263 self.push('stepTextChanged', 264 properties=build.getProperties().asList(), 265 step=step, 266 text=text)
267
268 - def stepText2Changed(self, build, step, text2):
269 self.push('stepText2Changed', 270 properties=build.getProperties().asList(), 271 step=step, 272 text2=text2)
273
274 - def stepETAUpdate(self, build, step, ETA, expectations):
275 self.push('stepETAUpdate', 276 properties=build.getProperties().asList(), 277 step=step, 278 ETA=ETA, 279 expectations=expectations)
280
281 - def logStarted(self, build, step, log):
282 self.push('logStarted', 283 properties=build.getProperties().asList(), 284 step=step)
285
286 - def logFinished(self, build, step, log):
287 self.push('logFinished', 288 properties=build.getProperties().asList(), 289 step=step)
290
291 - def stepFinished(self, build, step, results):
292 self.push('stepFinished', 293 properties=build.getProperties().asList(), 294 step=step)
295
296 - def buildFinished(self, builderName, build, results):
297 self.push('buildFinished', build=build)
298
299 - def builderRemoved(self, builderName):
300 self.push('buildedRemoved', builderName=builderName)
301
302 - def changeAdded(self, change):
303 self.push('changeAdded', change=change)
304
305 - def slaveConnected(self, slavename):
306 self.push('slaveConnected', slave=self.status.getSlave(slavename))
307
308 - def slaveDisconnected(self, slavename):
309 self.push('slaveDisconnected', slavename=slavename)
310
311 312 -class HttpStatusPush(StatusPush):
313 """Event streamer to a HTTP server.""" 314
315 - def __init__(self, serverUrl, debug=None, maxMemoryItems=None, 316 maxDiskItems=None, chunkSize=200, maxHttpRequestSize=2**20, 317 **kwargs):
318 """ 319 @serverUrl: Base URL to be used to push events notifications. 320 @maxMemoryItems: Maximum number of items to keep queued in memory. 321 @maxDiskItems: Maximum number of items to buffer to disk, if 0, doesn't 322 use disk at all. 323 @debug: Save the json with nice formatting. 324 @chunkSize: maximum number of items to send in each at each HTTP POST. 325 @maxHttpRequestSize: limits the size of encoded data for AE, the default 326 is 1MB. 327 """ 328 # Parameters. 329 self.serverUrl = serverUrl 330 self.debug = debug 331 self.chunkSize = chunkSize 332 self.lastPushWasSuccessful = True 333 self.maxHttpRequestSize = maxHttpRequestSize 334 if maxDiskItems != 0: 335 # The queue directory is determined by the server url. 336 path = ('events_' + 337 urlparse.urlparse(self.serverUrl)[1].split(':')[0]) 338 queue = PersistentQueue( 339 primaryQueue=MemoryQueue(maxItems=maxMemoryItems), 340 secondaryQueue=DiskQueue(path, maxItems=maxDiskItems)) 341 else: 342 path = None 343 queue = MemoryQueue(maxItems=maxMemoryItems) 344 345 # Use the unbounded method. 346 StatusPush.__init__(self, serverPushCb=HttpStatusPush.pushHttp, 347 queue=queue, path=path, **kwargs)
348
349 - def wasLastPushSuccessful(self):
350 return self.lastPushWasSuccessful
351
352 - def popChunk(self):
353 """Pops items from the pending list. 354 355 They must be queued back on failure.""" 356 if self.wasLastPushSuccessful(): 357 chunkSize = self.chunkSize 358 else: 359 chunkSize = 1 360 361 while True: 362 items = self.queue.popChunk(chunkSize) 363 if self.debug: 364 packets = json.dumps(items, indent=2, sort_keys=True) 365 else: 366 packets = json.dumps(items, separators=(',',':')) 367 data = urllib.urlencode({'packets': packets}) 368 if (not self.maxHttpRequestSize or 369 len(data) < self.maxHttpRequestSize): 370 return (data, items) 371 372 if chunkSize == 1: 373 # This packet is just too large. Drop this packet. 374 log.msg("ERROR: packet %s was dropped, too large: %d > %d" % 375 (items[0]['id'], len(data), self.maxHttpRequestSize)) 376 chunkSize = self.chunkSize 377 else: 378 # Try with half the packets. 379 chunkSize /= 2 380 self.queue.insertBackChunk(items)
381
382 - def pushHttp(self):
383 """Do the HTTP POST to the server.""" 384 (encoded_packets, items) = self.popChunk() 385 386 def Success(result): 387 """Queue up next push.""" 388 log.msg('Sent %d events to %s' % (len(items), self.serverUrl)) 389 self.lastPushWasSuccessful = True 390 return self.queueNextServerPush()
391 392 def Failure(result): 393 """Insert back items not sent and queue up next push.""" 394 # Server is now down. 395 log.msg('Failed to push %d events to %s: %s' % 396 (len(items), self.serverUrl, str(result))) 397 self.queue.insertBackChunk(items) 398 if self.stopped: 399 # Bad timing, was being called on shutdown and the server died 400 # on us. Make sure the queue is saved since we just queued back 401 # items. 402 self.queue.save() 403 self.lastPushWasSuccessful = False 404 return self.queueNextServerPush()
405 406 # Trigger the HTTP POST request. 407 headers = {'Content-Type': 'application/x-www-form-urlencoded'} 408 connection = client.getPage(self.serverUrl, 409 method='POST', 410 postdata=encoded_packets, 411 headers=headers, 412 agent='buildbot') 413 connection.addCallbacks(Success, Failure) 414 return connection 415 416 # vim: set ts=4 sts=4 sw=4 et: 417