1   
  2   
  3   
  4   
  5   
  6   
  7   
  8   
  9   
 10   
 11   
 12   
 13   
 14   
 15   
 16   
 17  """Push events to an abstract receiver. 
 18   
 19  Implements the HTTP receiver.""" 
 20   
 21  import datetime 
 22  import logging 
 23  import os 
 24  import urllib 
 25  import urlparse 
 26   
 27  try: 
 28      import simplejson as json 
 29      assert json 
 30  except ImportError: 
 31      import json 
 32   
 33  from buildbot.status.base import StatusReceiverMultiService 
 34  from buildbot.status.persistent_queue import DiskQueue, IndexedQueue, \ 
 35          MemoryQueue, PersistentQueue 
 36  from buildbot.status.web.status_json import FilterOut 
 37  from twisted.internet import defer, reactor 
 38  from twisted.python import log 
 39  from twisted.web import client 
 40   
 41   
 42   
 43 -class StatusPush(StatusReceiverMultiService): 
  44      """Event streamer to a abstract channel. 
 45   
 46      It uses IQueue to batch push requests and queue the data when 
 47      the receiver is down. 
 48      When a PersistentQueue object is used, the items are saved to disk on master 
 49      shutdown so they can be pushed back when the master is restarted. 
 50      """ 
 51   
 52 -    def __init__(self, serverPushCb, queue=None, path=None, filter=True, 
 53                   bufferDelay=1, retryDelay=5, blackList=None): 
  54          """ 
 55          @serverPushCb: callback to be used. It receives 'self' as parameter. It 
 56          should call self.queueNextServerPush() when it's done to queue the next 
 57          push. It is guaranteed that the queue is not empty when this function is 
 58          called. 
 59          @queue: a item queue that implements IQueue. 
 60          @path: path to save config. 
 61          @filter: when True (default), removes all "", None, False, [] or {} 
 62          entries. 
 63          @bufferDelay: amount of time events are queued before sending, to 
 64          reduce the number of push requests rate. This is the delay between the 
 65          end of a request to initializing a new one. 
 66          @retryDelay: amount of time between retries when no items were pushed on 
 67          last serverPushCb call. 
 68          @blackList: events that shouldn't be sent. 
 69          """ 
 70          StatusReceiverMultiService.__init__(self) 
 71   
 72           
 73          self.queue = queue 
 74          if self.queue is None: 
 75              self.queue = MemoryQueue() 
 76          self.queue = IndexedQueue(self.queue) 
 77          self.path = path 
 78          self.filter = filter 
 79          self.bufferDelay = bufferDelay 
 80          self.retryDelay = retryDelay 
 81          if not callable(serverPushCb): 
 82              raise NotImplementedError('Please pass serverPushCb parameter.') 
 83          def hookPushCb(): 
 84               
 85               
 86              if not self.queue.nbItems(): 
 87                  return 
 88              self.lastIndex = self.queue.getIndex() 
 89              return serverPushCb(self) 
  90          self.serverPushCb = hookPushCb 
 91          self.blackList = blackList 
 92   
 93           
 94           
 95          self.task = None 
 96          self.stopped = False 
 97          self.lastIndex = -1 
 98          self.state = {} 
 99          self.state['started'] = str(datetime.datetime.utcnow()) 
100          self.state['next_id'] = 1 
101          self.state['last_id_pushed'] = 0 
102           
103          if self.path and os.path.isdir(self.path): 
104              state_path = os.path.join(self.path, 'state') 
105              if os.path.isfile(state_path): 
106                  self.state.update(json.load(open(state_path, 'r'))) 
107   
108          if self.queue.nbItems(): 
109               
110              self.queueNextServerPush() 
 111   
118   
120          """Returns if the "virtual pointer" in the queue advanced.""" 
121          return self.lastIndex <= self.queue.getIndex() 
 122   
124          """Queue the next push or call it immediately. 
125   
126          Called to signal new items are available to be sent or on shutdown. 
127          A timer should be queued to trigger a network request or the callback 
128          should be called immediately. If a status push is already queued, ignore 
129          the current call.""" 
130           
131          if self.wasLastPushSuccessful(): 
132              if self.stopped: 
133                   
134                  delay = 0 
135              else: 
136                   
137                  delay = self.bufferDelay 
138          else: 
139              if self.stopped: 
140                   
141                   
142                  return 
143              else: 
144                   
145                  delay = self.retryDelay 
146   
147           
148          if self.task: 
149               
150              if self.task.active(): 
151                   
152                   
153                  return 
154              else: 
155                  if self.task.active(): 
156                       
157                       
158                      self.task.cancel() 
159                   
160                  self.task = None 
161   
162           
163          if delay: 
164               
165              self.task = reactor.callLater(delay, self.serverPushCb) 
166          elif self.stopped: 
167              if not self.queue.nbItems(): 
168                  return 
169               
170              @defer.deferredGenerator 
171              def BlockForEverythingBeingSent(): 
172                  d = self.serverPushCb() 
173                  if d: 
174                      x = defer.waitForDeferred(d) 
175                      yield x 
176                      x.getResult() 
 177              return BlockForEverythingBeingSent() 
178          else: 
179               
180               
181               
182              logging.exception('Did not expect delay to be 0, but it is.') 
183              return 
184   
208   
209 -    def push(self, event, **objs): 
 210          """Push a new event. 
211   
212          The new event will be either: 
213          - Queued in memory to reduce network usage 
214          - Queued to disk when the sink server is down 
215          - Pushed (along the other queued items) to the server 
216          """ 
217          if self.blackList and event in self.blackList: 
218              return 
219           
220          packet = {} 
221          packet['id'] = self.state['next_id'] 
222          self.state['next_id'] += 1 
223          packet['timestamp'] = str(datetime.datetime.utcnow()) 
224          packet['project'] = self.status.getTitle() 
225          packet['started'] = self.state['started'] 
226          packet['event'] = event 
227          packet['payload'] = {} 
228          for obj_name, obj in objs.items(): 
229              if hasattr(obj, 'asDict'): 
230                  obj = obj.asDict() 
231              if self.filter: 
232                  obj = FilterOut(obj) 
233              packet['payload'][obj_name] = obj 
234          self.queue.pushItem(packet) 
235          if self.task is None or not self.task.active(): 
236               
237              return self.queueNextServerPush() 
 238   
239       
240   
244   
247   
249          self.push('requestSubmitted', request=request) 
 250   
253   
256   
260   
262          self.push('builderChangedState', builderName=builderName, state=state) 
 263   
267   
270   
275   
276 -    def stepTextChanged(self, build, step, text): 
 277          self.push('stepTextChanged', 
278                    properties=build.getProperties().asList(), 
279                    step=step, 
280                    text=text) 
 281   
282 -    def stepText2Changed(self, build, step, text2): 
 283          self.push('stepText2Changed', 
284                    properties=build.getProperties().asList(), 
285                    step=step, 
286                    text2=text2) 
 287   
294   
299   
304   
309   
312   
314          self.push('buildedRemoved', builderName=builderName) 
 315   
317          self.push('changeAdded', change=change) 
 318   
321   
324   
327      """Event streamer to a HTTP server.""" 
328   
329 -    def __init__(self, serverUrl, debug=None, maxMemoryItems=None, 
330                   maxDiskItems=None, chunkSize=200, maxHttpRequestSize=2**20, 
331                   **kwargs): 
 332          """ 
333          @serverUrl: Base URL to be used to push events notifications. 
334          @maxMemoryItems: Maximum number of items to keep queued in memory. 
335          @maxDiskItems: Maximum number of items to buffer to disk, if 0, doesn't 
336          use disk at all. 
337          @debug: Save the json with nice formatting. 
338          @chunkSize: maximum number of items to send in each at each HTTP POST. 
339          @maxHttpRequestSize: limits the size of encoded data for AE, the default 
340          is 1MB. 
341          """ 
342           
343          self.serverUrl = serverUrl 
344          self.debug = debug 
345          self.chunkSize = chunkSize 
346          self.lastPushWasSuccessful = True 
347          self.maxHttpRequestSize = maxHttpRequestSize 
348          if maxDiskItems != 0: 
349               
350              path = ('events_' + 
351                      urlparse.urlparse(self.serverUrl)[1].split(':')[0]) 
352              queue = PersistentQueue( 
353                          primaryQueue=MemoryQueue(maxItems=maxMemoryItems), 
354                          secondaryQueue=DiskQueue(path, maxItems=maxDiskItems)) 
355          else: 
356              path = None 
357              queue = MemoryQueue(maxItems=maxMemoryItems) 
358   
359           
360          StatusPush.__init__(self, serverPushCb=HttpStatusPush.pushHttp, 
361                              queue=queue, path=path, **kwargs) 
 362   
364          return self.lastPushWasSuccessful 
 365   
395   
397          """Do the HTTP POST to the server.""" 
398          (encoded_packets, items) = self.popChunk() 
399   
400          def Success(result): 
401              """Queue up next push.""" 
402              log.msg('Sent %d events to %s' % (len(items), self.serverUrl)) 
403              self.lastPushWasSuccessful = True 
404              return self.queueNextServerPush() 
 405   
406          def Failure(result): 
407              """Insert back items not sent and queue up next push.""" 
408               
409              log.msg('Failed to push %d events to %s: %s' % 
410                      (len(items), self.serverUrl, str(result))) 
411              self.queue.insertBackChunk(items) 
412              if self.stopped: 
413                   
414                   
415                   
416                  self.queue.save() 
417              self.lastPushWasSuccessful = False 
418              return self.queueNextServerPush() 
 419   
420           
421          headers = {'Content-Type': 'application/x-www-form-urlencoded'} 
422          connection = client.getPage(self.serverUrl, 
423                                      method='POST', 
424                                      postdata=encoded_packets, 
425                                      headers=headers, 
426                                      agent='buildbot') 
427          connection.addCallbacks(Success, Failure) 
428          return connection 
429   
430   
431