Package buildbot :: Package status :: Module status_push
[frames] | no frames]

Source Code for Module buildbot.status.status_push

  1  # -*- test-case-name: buildbot.broken_test.runs.test_status_push -*- 
  2   
  3  """Push events to an abstract receiver. 
  4   
  5  Implements the HTTP receiver.""" 
  6   
  7  import datetime 
  8  import logging 
  9  import os 
 10  import urllib 
 11  import urlparse 
 12   
 13  try: 
 14      import simplejson as json 
 15  except ImportError: 
 16      import json 
 17   
 18  from buildbot.status.base import StatusReceiverMultiService 
 19  from buildbot.status.persistent_queue import DiskQueue, IndexedQueue, \ 
 20          MemoryQueue, PersistentQueue 
 21  from buildbot.status.web.status_json import FilterOut 
 22  from twisted.internet import defer, reactor 
 23  from twisted.python import log 
 24  from twisted.web import client 
25 26 27 28 -class StatusPush(StatusReceiverMultiService):
29 """Event streamer to a abstract channel. 30 31 It uses IQueue to batch push requests and queue the data when 32 the receiver is down. 33 When a PersistentQueue object is used, the items are saved to disk on master 34 shutdown so they can be pushed back when the master is restarted. 35 """ 36
37 - def __init__(self, serverPushCb, queue=None, path=None, filter=True, 38 bufferDelay=1, retryDelay=5, blackList=None):
39 """ 40 @serverPushCb: callback to be used. It receives 'self' as parameter. It 41 should call self.queueNextServerPush() when it's done to queue the next 42 push. It is guaranteed that the queue is not empty when this function is 43 called. 44 @queue: a item queue that implements IQueue. 45 @path: path to save config. 46 @filter: when True (default), removes all "", None, False, [] or {} 47 entries. 48 @bufferDelay: amount of time events are queued before sending, to 49 reduce the number of push requests rate. This is the delay between the 50 end of a request to initializing a new one. 51 @retryDelay: amount of time between retries when no items were pushed on 52 last serverPushCb call. 53 @blackList: events that shouldn't be sent. 54 """ 55 StatusReceiverMultiService.__init__(self) 56 57 # Parameters. 58 self.queue = queue 59 if self.queue is None: 60 self.queue = MemoryQueue() 61 self.queue = IndexedQueue(self.queue) 62 self.path = path 63 self.filter = filter 64 self.bufferDelay = bufferDelay 65 self.retryDelay = retryDelay 66 if not callable(serverPushCb): 67 raise NotImplementedError('Please pass serverPushCb parameter.') 68 def hookPushCb(): 69 # Update the index so we know if the next push succeed or not, don't 70 # update the value when the queue is empty. 71 if not self.queue.nbItems(): 72 return 73 self.lastIndex = self.queue.getIndex() 74 return serverPushCb(self)
75 self.serverPushCb = hookPushCb 76 self.blackList = blackList 77 78 # Other defaults. 79 # IDelayedCall object that represents the next queued push. 80 self.task = None 81 self.stopped = False 82 self.lastIndex = -1 83 self.state = {} 84 self.state['started'] = str(datetime.datetime.utcnow()) 85 self.state['next_id'] = 1 86 self.state['last_id_pushed'] = 0 87 # Try to load back the state. 88 if self.path and os.path.isdir(self.path): 89 state_path = os.path.join(self.path, 'state') 90 if os.path.isfile(state_path): 91 self.state.update(json.load(open(state_path, 'r'))) 92 93 if self.queue.nbItems(): 94 # Last shutdown was not clean, don't wait to send events. 95 self.queueNextServerPush()
96
97 - def setServiceParent(self, parent):
98 """Starting up.""" 99 StatusReceiverMultiService.setServiceParent(self, parent) 100 self.status = self.parent.getStatus() 101 self.status.subscribe(self) 102 self.initialPush()
103
104 - def wasLastPushSuccessful(self):
105 """Returns if the "virtual pointer" in the queue advanced.""" 106 return self.lastIndex <= self.queue.getIndex()
107
108 - def queueNextServerPush(self):
109 """Queue the next push or call it immediately. 110 111 Called to signal new items are available to be sent or on shutdown. 112 A timer should be queued to trigger a network request or the callback 113 should be called immediately. If a status push is already queued, ignore 114 the current call.""" 115 # Determine the delay. 116 if self.wasLastPushSuccessful(): 117 if self.stopped: 118 # Shutting down. 119 delay = 0 120 else: 121 # Normal case. 122 delay = self.bufferDelay 123 else: 124 if self.stopped: 125 # Too bad, we can't do anything now, we're shutting down and the 126 # receiver is also down. We'll just save the objects to disk. 127 return 128 else: 129 # The server is inaccessible, retry less often. 130 delay = self.retryDelay 131 132 # Cleanup a previously queued task if necessary. 133 if self.task: 134 # Warning: we could be running inside the task. 135 if self.task.active(): 136 # There was already a task queue, don't requeue it, just let it 137 # go. 138 return 139 else: 140 if self.task.active(): 141 # There was a task queued but it is requested to call it 142 # *right now* so cancel it. 143 self.task.cancel() 144 # Otherwise, it was just a stray object. 145 self.task = None 146 147 # Do the queue/direct call. 148 if delay: 149 # Call in delay seconds. 150 self.task = reactor.callLater(delay, self.serverPushCb) 151 elif self.stopped: 152 if not self.queue.nbItems(): 153 return 154 # Call right now, we're shutting down. 155 @defer.deferredGenerator 156 def BlockForEverythingBeingSent(): 157 d = self.serverPushCb() 158 if d: 159 x = defer.waitForDeferred(d) 160 yield x 161 x.getResult()
162 return BlockForEverythingBeingSent() 163 else: 164 # delay should never be 0. That can cause Buildbot to spin tightly 165 # trying to push events that may not be received well by a status 166 # listener. 167 logging.exception('Did not expect delay to be 0, but it is.') 168 return 169
170 - def stopService(self):
171 """Shutting down.""" 172 self.finalPush() 173 self.stopped = True 174 if (self.task and self.task.active()): 175 # We don't have time to wait, force an immediate call. 176 self.task.cancel() 177 self.task = None 178 d = self.queueNextServerPush() 179 elif self.wasLastPushSuccessful(): 180 d = self.queueNextServerPush() 181 else: 182 d = defer.succeed(None) 183 184 # We're dying, make sure we save the results. 185 self.queue.save() 186 if self.path and os.path.isdir(self.path): 187 state_path = os.path.join(self.path, 'state') 188 json.dump(self.state, open(state_path, 'w'), sort_keys=True, 189 indent=2) 190 # Make sure all Deferreds are called on time and in a sane order. 191 defers = filter(None, [d, StatusReceiverMultiService.stopService(self)]) 192 return defer.DeferredList(defers)
193
194 - def push(self, event, **objs):
195 """Push a new event. 196 197 The new event will be either: 198 - Queued in memory to reduce network usage 199 - Queued to disk when the sink server is down 200 - Pushed (along the other queued items) to the server 201 """ 202 if self.blackList and event in self.blackList: 203 return 204 # First, generate the packet. 205 packet = {} 206 packet['id'] = self.state['next_id'] 207 self.state['next_id'] += 1 208 packet['timestamp'] = str(datetime.datetime.utcnow()) 209 packet['project'] = self.status.getProjectName() 210 packet['started'] = self.state['started'] 211 packet['event'] = event 212 packet['payload'] = {} 213 for obj_name, obj in objs.items(): 214 if hasattr(obj, 'asDict'): 215 obj = obj.asDict() 216 if self.filter: 217 obj = FilterOut(obj) 218 packet['payload'][obj_name] = obj 219 self.queue.pushItem(packet) 220 if self.task is None or not self.task.active(): 221 # No task queued since it was probably idle, let's queue a task. 222 return self.queueNextServerPush()
223 224 #### Events 225
226 - def initialPush(self):
227 # Push everything we want to push from the initial configuration. 228 self.push('start', status=self.status)
229
230 - def finalPush(self):
231 self.push('shutdown', status=self.status)
232
233 - def requestSubmitted(self, request):
234 self.push('requestSubmitted', request=request)
235
236 - def requestCancelled(self, builder, request):
237 self.push('requestCancelled', builder=builder, request=request)
238
239 - def buildsetSubmitted(self, buildset):
240 self.push('buildsetSubmitted', buildset=buildset)
241
242 - def builderAdded(self, builderName, builder):
243 self.push('builderAdded', builderName=builderName, builder=builder) 244 return self
245
246 - def builderChangedState(self, builderName, state):
247 self.push('builderChangedState', builderName=builderName, state=state)
248
249 - def buildStarted(self, builderName, build):
250 self.push('buildStarted', build=build) 251 return self
252
253 - def buildETAUpdate(self, build, ETA):
254 self.push('buildETAUpdate', build=build, ETA=ETA)
255
256 - def stepStarted(self, build, step):
257 self.push('stepStarted', 258 properties=build.getProperties().asList(), 259 step=step)
260
261 - def stepTextChanged(self, build, step, text):
262 self.push('stepTextChanged', 263 properties=build.getProperties().asList(), 264 step=step, 265 text=text)
266
267 - def stepText2Changed(self, build, step, text2):
268 self.push('stepText2Changed', 269 properties=build.getProperties().asList(), 270 step=step, 271 text2=text2)
272
273 - def stepETAUpdate(self, build, step, ETA, expectations):
274 self.push('stepETAUpdate', 275 properties=build.getProperties().asList(), 276 step=step, 277 ETA=ETA, 278 expectations=expectations)
279
280 - def logStarted(self, build, step, log):
281 self.push('logStarted', 282 properties=build.getProperties().asList(), 283 step=step)
284
285 - def logFinished(self, build, step, log):
286 self.push('logFinished', 287 properties=build.getProperties().asList(), 288 step=step)
289
290 - def stepFinished(self, build, step, results):
291 self.push('stepFinished', 292 properties=build.getProperties().asList(), 293 step=step)
294
295 - def buildFinished(self, builderName, build, results):
296 self.push('buildFinished', build=build)
297
298 - def builderRemoved(self, builderName):
299 self.push('buildedRemoved', builderName=builderName)
300
301 - def changeAdded(self, change):
302 self.push('changeAdded', change=change)
303
304 - def slaveConnected(self, slavename):
305 self.push('slaveConnected', slave=self.status.getSlave(slavename))
306
307 - def slaveDisconnected(self, slavename):
308 self.push('slaveDisconnected', slavename=slavename)
309
310 311 -class HttpStatusPush(StatusPush):
312 """Event streamer to a HTTP server.""" 313
314 - def __init__(self, serverUrl, debug=None, maxMemoryItems=None, 315 maxDiskItems=None, chunkSize=200, maxHttpRequestSize=2**20, 316 **kwargs):
317 """ 318 @serverUrl: Base URL to be used to push events notifications. 319 @maxMemoryItems: Maximum number of items to keep queued in memory. 320 @maxDiskItems: Maximum number of items to buffer to disk, if 0, doesn't 321 use disk at all. 322 @debug: Save the json with nice formatting. 323 @chunkSize: maximum number of items to send in each at each HTTP POST. 324 @maxHttpRequestSize: limits the size of encoded data for AE, the default 325 is 1MB. 326 """ 327 # Parameters. 328 self.serverUrl = serverUrl 329 self.debug = debug 330 self.chunkSize = chunkSize 331 self.lastPushWasSuccessful = True 332 self.maxHttpRequestSize = maxHttpRequestSize 333 if maxDiskItems != 0: 334 # The queue directory is determined by the server url. 335 path = ('events_' + 336 urlparse.urlparse(self.serverUrl)[1].split(':')[0]) 337 queue = PersistentQueue( 338 primaryQueue=MemoryQueue(maxItems=maxMemoryItems), 339 secondaryQueue=DiskQueue(path, maxItems=maxDiskItems)) 340 else: 341 path = None 342 queue = MemoryQueue(maxItems=maxMemoryItems) 343 344 # Use the unbounded method. 345 StatusPush.__init__(self, serverPushCb=HttpStatusPush.pushHttp, 346 queue=queue, path=path, **kwargs)
347
348 - def wasLastPushSuccessful(self):
349 return self.lastPushWasSuccessful
350
351 - def popChunk(self):
352 """Pops items from the pending list. 353 354 They must be queued back on failure.""" 355 if self.wasLastPushSuccessful(): 356 chunkSize = self.chunkSize 357 else: 358 chunkSize = 1 359 360 while True: 361 items = self.queue.popChunk(chunkSize) 362 if self.debug: 363 packets = json.dumps(items, indent=2, sort_keys=True) 364 else: 365 packets = json.dumps(items, separators=(',',':')) 366 data = urllib.urlencode({'packets': packets}) 367 if (not self.maxHttpRequestSize or 368 len(data) < self.maxHttpRequestSize): 369 return (data, items) 370 371 if chunkSize == 1: 372 # This packet is just too large. Drop this packet. 373 log.msg("ERROR: packet %s was dropped, too large: %d > %d" % 374 (items[0]['id'], len(data), self.maxHttpRequestSize)) 375 chunkSize = self.chunkSize 376 else: 377 # Try with half the packets. 378 chunkSize /= 2 379 self.queue.insertBackChunk(items)
380
381 - def pushHttp(self):
382 """Do the HTTP POST to the server.""" 383 (encoded_packets, items) = self.popChunk() 384 385 def Success(result): 386 """Queue up next push.""" 387 log.msg('Sent %d events to %s' % (len(items), self.serverUrl)) 388 self.lastPushWasSuccessful = True 389 return self.queueNextServerPush()
390 391 def Failure(result): 392 """Insert back items not sent and queue up next push.""" 393 # Server is now down. 394 log.msg('Failed to push %d events to %s: %s' % 395 (len(items), self.serverUrl, str(result))) 396 self.queue.insertBackChunk(items) 397 if self.stopped: 398 # Bad timing, was being called on shutdown and the server died 399 # on us. Make sure the queue is saved since we just queued back 400 # items. 401 self.queue.save() 402 self.lastPushWasSuccessful = False 403 return self.queueNextServerPush()
404 405 # Trigger the HTTP POST request. 406 headers = {'Content-Type': 'application/x-www-form-urlencoded'} 407 connection = client.getPage(self.serverUrl, 408 method='POST', 409 postdata=encoded_packets, 410 headers=headers, 411 agent='buildbot') 412 connection.addCallbacks(Success, Failure) 413 return connection 414 415 # vim: set ts=4 sts=4 sw=4 et: 416