1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 """Push events to an abstract receiver.
18
19 Implements the HTTP receiver."""
20
21 import datetime
22 import os
23 import urllib
24 import urlparse
25
26 try:
27 import simplejson as json
28 assert json
29 except ImportError:
30 import json
31
32 from buildbot.status.base import StatusReceiverMultiService
33 from buildbot.status.persistent_queue import DiskQueue, IndexedQueue, \
34 MemoryQueue, PersistentQueue
35 from buildbot.status.web.status_json import FilterOut
36 from twisted.internet import defer, reactor
37 from twisted.python import log
38 from twisted.web import client
39
40
41
42 -class StatusPush(StatusReceiverMultiService):
43 """Event streamer to a abstract channel.
44
45 It uses IQueue to batch push requests and queue the data when
46 the receiver is down.
47 When a PersistentQueue object is used, the items are saved to disk on master
48 shutdown so they can be pushed back when the master is restarted.
49 """
50
51 - def __init__(self, serverPushCb, queue=None, path=None, filter=True,
52 bufferDelay=1, retryDelay=5, blackList=None):
53 """
54 @serverPushCb: callback to be used. It receives 'self' as parameter. It
55 should call self.queueNextServerPush() when it's done to queue the next
56 push. It is guaranteed that the queue is not empty when this function is
57 called.
58 @queue: a item queue that implements IQueue.
59 @path: path to save config.
60 @filter: when True (default), removes all "", None, False, [] or {}
61 entries.
62 @bufferDelay: amount of time events are queued before sending, to
63 reduce the number of push requests rate. This is the delay between the
64 end of a request to initializing a new one.
65 @retryDelay: amount of time between retries when no items were pushed on
66 last serverPushCb call.
67 @blackList: events that shouldn't be sent.
68 """
69 StatusReceiverMultiService.__init__(self)
70
71
72 self.queue = queue
73 if self.queue is None:
74 self.queue = MemoryQueue()
75 self.queue = IndexedQueue(self.queue)
76 self.path = path
77 self.filter = filter
78 self.bufferDelay = bufferDelay
79 self.retryDelay = retryDelay
80 if not callable(serverPushCb):
81 raise NotImplementedError('Please pass serverPushCb parameter.')
82 def hookPushCb():
83
84
85 if not self.queue.nbItems():
86 return
87 self.lastIndex = self.queue.getIndex()
88 return serverPushCb(self)
89 self.serverPushCb = hookPushCb
90 self.blackList = blackList
91
92
93
94 self.task = None
95 self.stopped = False
96 self.lastIndex = -1
97 self.state = {}
98 self.state['started'] = str(datetime.datetime.utcnow())
99 self.state['next_id'] = 1
100 self.state['last_id_pushed'] = 0
101
102 if self.path and os.path.isdir(self.path):
103 state_path = os.path.join(self.path, 'state')
104 if os.path.isfile(state_path):
105 self.state.update(json.load(open(state_path, 'r')))
106
107 if self.queue.nbItems():
108
109 self.queueNextServerPush()
110
117
119 """Returns if the "virtual pointer" in the queue advanced."""
120 return self.lastIndex <= self.queue.getIndex()
121
123 """Queue the next push or call it immediately.
124
125 Called to signal new items are available to be sent or on shutdown.
126 A timer should be queued to trigger a network request or the callback
127 should be called immediately. If a status push is already queued, ignore
128 the current call."""
129
130 if self.wasLastPushSuccessful():
131 if self.stopped:
132
133 delay = 0
134 else:
135
136 delay = self.bufferDelay
137 else:
138 if self.stopped:
139
140
141 return
142 else:
143
144 delay = self.retryDelay
145
146
147 if self.task:
148
149 if self.task.active():
150
151
152 return
153 else:
154 if self.task.active():
155
156
157 self.task.cancel()
158
159 self.task = None
160
161
162 if delay:
163
164 self.task = reactor.callLater(delay, self.serverPushCb)
165 elif self.stopped:
166 if not self.queue.nbItems():
167 return
168
169 @defer.deferredGenerator
170 def BlockForEverythingBeingSent():
171 d = self.serverPushCb()
172 if d:
173 x = defer.waitForDeferred(d)
174 yield x
175 x.getResult()
176 return BlockForEverythingBeingSent()
177 else:
178
179
180
181 log.err('Did not expect delay to be 0, but it is.')
182 return
183
207
208 - def push(self, event, **objs):
209 """Push a new event.
210
211 The new event will be either:
212 - Queued in memory to reduce network usage
213 - Queued to disk when the sink server is down
214 - Pushed (along the other queued items) to the server
215 """
216 if self.blackList and event in self.blackList:
217 return
218
219 packet = {}
220 packet['id'] = self.state['next_id']
221 self.state['next_id'] += 1
222 packet['timestamp'] = str(datetime.datetime.utcnow())
223 packet['project'] = self.status.getTitle()
224 packet['started'] = self.state['started']
225 packet['event'] = event
226 packet['payload'] = {}
227 for obj_name, obj in objs.items():
228 if hasattr(obj, 'asDict'):
229 obj = obj.asDict()
230 if self.filter:
231 obj = FilterOut(obj)
232 packet['payload'][obj_name] = obj
233 self.queue.pushItem(packet)
234 if self.task is None or not self.task.active():
235
236 return self.queueNextServerPush()
237
238
239
243
246
248 self.push('requestSubmitted', request=request)
249
252
255
259
261 self.push('builderChangedState', builderName=builderName, state=state)
262
266
269
274
275 - def stepTextChanged(self, build, step, text):
276 self.push('stepTextChanged',
277 properties=build.getProperties().asList(),
278 step=step,
279 text=text)
280
281 - def stepText2Changed(self, build, step, text2):
282 self.push('stepText2Changed',
283 properties=build.getProperties().asList(),
284 step=step,
285 text2=text2)
286
293
298
303
308
311
313 self.push('buildedRemoved', builderName=builderName)
314
316 self.push('changeAdded', change=change)
317
320
323
326 """Event streamer to a HTTP server."""
327
328 - def __init__(self, serverUrl, debug=None, maxMemoryItems=None,
329 maxDiskItems=None, chunkSize=200, maxHttpRequestSize=2**20,
330 **kwargs):
331 """
332 @serverUrl: Base URL to be used to push events notifications.
333 @maxMemoryItems: Maximum number of items to keep queued in memory.
334 @maxDiskItems: Maximum number of items to buffer to disk, if 0, doesn't
335 use disk at all.
336 @debug: Save the json with nice formatting.
337 @chunkSize: maximum number of items to send in each at each HTTP POST.
338 @maxHttpRequestSize: limits the size of encoded data for AE, the default
339 is 1MB.
340 """
341
342 self.serverUrl = serverUrl
343 self.debug = debug
344 self.chunkSize = chunkSize
345 self.lastPushWasSuccessful = True
346 self.maxHttpRequestSize = maxHttpRequestSize
347 if maxDiskItems != 0:
348
349 path = ('events_' +
350 urlparse.urlparse(self.serverUrl)[1].split(':')[0])
351 queue = PersistentQueue(
352 primaryQueue=MemoryQueue(maxItems=maxMemoryItems),
353 secondaryQueue=DiskQueue(path, maxItems=maxDiskItems))
354 else:
355 path = None
356 queue = MemoryQueue(maxItems=maxMemoryItems)
357
358
359 StatusPush.__init__(self, serverPushCb=HttpStatusPush.pushHttp,
360 queue=queue, path=path, **kwargs)
361
363 return self.lastPushWasSuccessful
364
394
396 """Do the HTTP POST to the server."""
397 (encoded_packets, items) = self.popChunk()
398
399 def Success(result):
400 """Queue up next push."""
401 log.msg('Sent %d events to %s' % (len(items), self.serverUrl))
402 self.lastPushWasSuccessful = True
403 return self.queueNextServerPush()
404
405 def Failure(result):
406 """Insert back items not sent and queue up next push."""
407
408 log.msg('Failed to push %d events to %s: %s' %
409 (len(items), self.serverUrl, str(result)))
410 self.queue.insertBackChunk(items)
411 if self.stopped:
412
413
414
415 self.queue.save()
416 self.lastPushWasSuccessful = False
417 return self.queueNextServerPush()
418
419
420 headers = {'Content-Type': 'application/x-www-form-urlencoded'}
421 connection = client.getPage(self.serverUrl,
422 method='POST',
423 postdata=encoded_packets,
424 headers=headers,
425 agent='buildbot')
426 connection.addCallbacks(Success, Failure)
427 return connection
428
429
430