1   
  2   
  3  """Push events to an abstract receiver. 
  4   
  5  Implements the HTTP receiver.""" 
  6   
  7  import datetime 
  8  import logging 
  9  import os 
 10  import urllib 
 11  import urlparse 
 12   
 13  try: 
 14      import simplejson as json 
 15  except ImportError: 
 16      import json 
 17   
 18  from buildbot.status.base import StatusReceiverMultiService 
 19  from buildbot.status.persistent_queue import DiskQueue, IndexedQueue, \ 
 20          MemoryQueue, PersistentQueue 
 21  from buildbot.status.web.status_json import FilterOut 
 22  from twisted.internet import defer, reactor 
 23  from twisted.python import log 
 24  from twisted.web import client 
 25   
 26   
 27   
 28 -class StatusPush(StatusReceiverMultiService): 
    """Event streamer to an abstract channel.
 30   
 31      It uses IQueue to batch push requests and queue the data when 
 32      the receiver is down. 
 33      When a PersistentQueue object is used, the items are saved to disk on master 
 34      shutdown so they can be pushed back when the master is restarted. 
 35      """ 
 36   
 37 -    def __init__(self, serverPushCb, queue=None, path=None, filter=True, 
 38                   bufferDelay=1, retryDelay=5, blackList=None): 
  39          """ 
 40          @serverPushCb: callback to be used. It receives 'self' as parameter. It 
 41          should call self.queueNextServerPush() when it's done to queue the next 
 42          push. It is guaranteed that the queue is not empty when this function is 
 43          called. 
 44          @queue: a item queue that implements IQueue. 
 45          @path: path to save config. 
 46          @filter: when True (default), removes all "", None, False, [] or {} 
 47          entries. 
 48          @bufferDelay: amount of time events are queued before sending, to 
 49          reduce the number of push requests rate. This is the delay between the 
 50          end of a request to initializing a new one. 
 51          @retryDelay: amount of time between retries when no items were pushed on 
 52          last serverPushCb call. 
 53          @blackList: events that shouldn't be sent. 
 54          """ 
 55          StatusReceiverMultiService.__init__(self) 
 56   
 57           
 58          self.queue = queue 
 59          if self.queue is None: 
 60              self.queue = MemoryQueue() 
 61          self.queue = IndexedQueue(self.queue) 
 62          self.path = path 
 63          self.filter = filter 
 64          self.bufferDelay = bufferDelay 
 65          self.retryDelay = retryDelay 
 66          if not callable(serverPushCb): 
 67              raise NotImplementedError('Please pass serverPushCb parameter.') 
 68          def hookPushCb(): 
 69               
 70               
 71              if not self.queue.nbItems(): 
 72                  return 
 73              self.lastIndex = self.queue.getIndex() 
 74              return serverPushCb(self) 
  75          self.serverPushCb = hookPushCb 
 76          self.blackList = blackList 
 77   
 78           
 79           
 80          self.task = None 
 81          self.stopped = False 
 82          self.lastIndex = -1 
 83          self.state = {} 
 84          self.state['started'] = str(datetime.datetime.utcnow()) 
 85          self.state['next_id'] = 1 
 86          self.state['last_id_pushed'] = 0 
 87           
 88          if self.path and os.path.isdir(self.path): 
 89              state_path = os.path.join(self.path, 'state') 
 90              if os.path.isfile(state_path): 
 91                  self.state.update(json.load(open(state_path, 'r'))) 
 92   
 93          if self.queue.nbItems(): 
 94               
 95              self.queueNextServerPush() 
  96   
103   
105          """Returns if the "virtual pointer" in the queue advanced.""" 
106          return self.lastIndex <= self.queue.getIndex() 
 107   
109          """Queue the next push or call it immediately. 
110   
111          Called to signal new items are available to be sent or on shutdown. 
112          A timer should be queued to trigger a network request or the callback 
113          should be called immediately. If a status push is already queued, ignore 
114          the current call.""" 
115           
116          if self.wasLastPushSuccessful(): 
117              if self.stopped: 
118                   
119                  delay = 0 
120              else: 
121                   
122                  delay = self.bufferDelay 
123          else: 
124              if self.stopped: 
125                   
126                   
127                  return 
128              else: 
129                   
130                  delay = self.retryDelay 
131   
132           
133          if self.task: 
134               
135              if self.task.active(): 
136                   
137                   
138                  return 
139              else: 
140                  if self.task.active(): 
141                       
142                       
143                      self.task.cancel() 
144                   
145                  self.task = None 
146   
147           
148          if delay: 
149               
150              self.task = reactor.callLater(delay, self.serverPushCb) 
151          elif self.stopped: 
152              if not self.queue.nbItems(): 
153                  return 
154               
155              @defer.deferredGenerator 
156              def BlockForEverythingBeingSent(): 
157                  d = self.serverPushCb() 
158                  if d: 
159                      x = defer.waitForDeferred(d) 
160                      yield x 
161                      x.getResult() 
 162              return BlockForEverythingBeingSent() 
163          else: 
164               
165               
166               
167              logging.exception('Did not expect delay to be 0, but it is.') 
168              return 
169   
193   
194 -    def push(self, event, **objs): 
 195          """Push a new event. 
196   
197          The new event will be either: 
198          - Queued in memory to reduce network usage 
199          - Queued to disk when the sink server is down 
200          - Pushed (along the other queued items) to the server 
201          """ 
202          if self.blackList and event in self.blackList: 
203              return 
204           
205          packet = {} 
206          packet['id'] = self.state['next_id'] 
207          self.state['next_id'] += 1 
208          packet['timestamp'] = str(datetime.datetime.utcnow()) 
209          packet['project'] = self.status.getProjectName() 
210          packet['started'] = self.state['started'] 
211          packet['event'] = event 
212          packet['payload'] = {} 
213          for obj_name, obj in objs.items(): 
214              if hasattr(obj, 'asDict'): 
215                  obj = obj.asDict() 
216              if self.filter: 
217                  obj = FilterOut(obj) 
218              packet['payload'][obj_name] = obj 
219          self.queue.pushItem(packet) 
220          if self.task is None or not self.task.active(): 
221               
222              return self.queueNextServerPush() 
 223   
224       
225   
229   
232   
234          self.push('requestSubmitted', request=request) 
 235   
238   
240          self.push('buildsetSubmitted', buildset=buildset) 
 241   
245   
247          self.push('builderChangedState', builderName=builderName, state=state) 
 248   
252   
255   
260   
261 -    def stepTextChanged(self, build, step, text): 
 262          self.push('stepTextChanged', 
263                    properties=build.getProperties().asList(), 
264                    step=step, 
265                    text=text) 
 266   
267 -    def stepText2Changed(self, build, step, text2): 
 268          self.push('stepText2Changed', 
269                    properties=build.getProperties().asList(), 
270                    step=step, 
271                    text2=text2) 
 272   
279   
284   
289   
294   
297   
299          self.push('buildedRemoved', builderName=builderName) 
 300   
302          self.push('changeAdded', change=change) 
 303   
306   
309   
312      """Event streamer to a HTTP server.""" 
313   
314 -    def __init__(self, serverUrl, debug=None, maxMemoryItems=None, 
315                   maxDiskItems=None, chunkSize=200, maxHttpRequestSize=2**20, 
316                   **kwargs): 
 317          """ 
318          @serverUrl: Base URL to be used to push events notifications. 
319          @maxMemoryItems: Maximum number of items to keep queued in memory. 
320          @maxDiskItems: Maximum number of items to buffer to disk, if 0, doesn't 
321          use disk at all. 
322          @debug: Save the json with nice formatting. 
323          @chunkSize: maximum number of items to send in each at each HTTP POST. 
324          @maxHttpRequestSize: limits the size of encoded data for AE, the default 
325          is 1MB. 
326          """ 
327           
328          self.serverUrl = serverUrl 
329          self.debug = debug 
330          self.chunkSize = chunkSize 
331          self.lastPushWasSuccessful = True 
332          self.maxHttpRequestSize = maxHttpRequestSize 
333          if maxDiskItems != 0: 
334               
335              path = ('events_' + 
336                      urlparse.urlparse(self.serverUrl)[1].split(':')[0]) 
337              queue = PersistentQueue( 
338                          primaryQueue=MemoryQueue(maxItems=maxMemoryItems), 
339                          secondaryQueue=DiskQueue(path, maxItems=maxDiskItems)) 
340          else: 
341              path = None 
342              queue = MemoryQueue(maxItems=maxMemoryItems) 
343   
344           
345          StatusPush.__init__(self, serverPushCb=HttpStatusPush.pushHttp, 
346                              queue=queue, path=path, **kwargs) 
 347   
349          return self.lastPushWasSuccessful 
 350   
380   
382          """Do the HTTP POST to the server.""" 
383          (encoded_packets, items) = self.popChunk() 
384   
385          def Success(result): 
386              """Queue up next push.""" 
387              log.msg('Sent %d events to %s' % (len(items), self.serverUrl)) 
388              self.lastPushWasSuccessful = True 
389              return self.queueNextServerPush() 
 390   
391          def Failure(result): 
392              """Insert back items not sent and queue up next push.""" 
393               
394              log.msg('Failed to push %d events to %s: %s' % 
395                      (len(items), self.serverUrl, str(result))) 
396              self.queue.insertBackChunk(items) 
397              if self.stopped: 
398                   
399                   
400                   
401                  self.queue.save() 
402              self.lastPushWasSuccessful = False 
403              return self.queueNextServerPush() 
 404   
405           
406          headers = {'Content-Type': 'application/x-www-form-urlencoded'} 
407          connection = client.getPage(self.serverUrl, 
408                                      method='POST', 
409                                      postdata=encoded_packets, 
410                                      headers=headers, 
411                                      agent='buildbot') 
412          connection.addCallbacks(Success, Failure) 
413          return connection 
414   
415   
416