1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 """Push events to an abstract receiver.
18
19 Implements the HTTP receiver."""
20
21 import datetime
22 import logging
23 import os
24 import urllib
25 import urlparse
26
27 try:
28 import simplejson as json
29 assert json
30 except ImportError:
31 import json
32
33 from buildbot.status.base import StatusReceiverMultiService
34 from buildbot.status.persistent_queue import DiskQueue, IndexedQueue, \
35 MemoryQueue, PersistentQueue
36 from buildbot.status.web.status_json import FilterOut
37 from twisted.internet import defer, reactor
38 from twisted.python import log
39 from twisted.web import client
40
41
42
43 -class StatusPush(StatusReceiverMultiService):
44 """Event streamer to an abstract channel.
45
46 It uses IQueue to batch push requests and queue the data when
47 the receiver is down.
48 When a PersistentQueue object is used, the items are saved to disk on master
49 shutdown so they can be pushed back when the master is restarted.
50 """
51
def __init__(self, serverPushCb, queue=None, path=None, filter=True,
             bufferDelay=1, retryDelay=5, blackList=None):
    """Set up the event queue, the push callback and the persisted state.

    @serverPushCb: callback to be used. It receives 'self' as parameter. It
      should call self.queueNextServerPush() when it's done to queue the
      next push. It is guaranteed that the queue is not empty when this
      function is called.
    @queue: an item queue that implements IQueue. A MemoryQueue is created
      when None is given; either way the queue is wrapped in an
      IndexedQueue.
    @path: directory holding the saved config. When it exists and contains
      a 'state' file, that file is loaded as JSON and merged into
      self.state.
    @filter: when True (default), removes all "", None, False, [] or {}
      entries.
    @bufferDelay: amount of time events are queued before sending, to
      reduce the rate of push requests. This is the delay between the end
      of a request and the start of a new one.
    @retryDelay: amount of time between retries when no items were pushed
      on the last serverPushCb call.
    @blackList: events that shouldn't be sent.

    Raises NotImplementedError when serverPushCb is not callable.
    """
    StatusReceiverMultiService.__init__(self)

    # Parameters.
    if queue is None:
        queue = MemoryQueue()
    self.queue = IndexedQueue(queue)
    self.path = path
    self.filter = filter
    self.bufferDelay = bufferDelay
    self.retryDelay = retryDelay
    if not callable(serverPushCb):
        raise NotImplementedError('Please pass serverPushCb parameter.')

    def hookPushCb():
        # Record the queue index right before handing over to the user
        # callback; the value is left untouched when the queue is empty,
        # in which case the callback is not invoked at all.
        if not self.queue.nbItems():
            return
        self.lastIndex = self.queue.getIndex()
        return serverPushCb(self)

    self.serverPushCb = hookPushCb
    self.blackList = blackList

    # Other defaults.
    # Delayed call scheduled through reactor.callLater for the next push.
    self.task = None
    self.stopped = False
    self.lastIndex = -1
    self.state = {
        'started': str(datetime.datetime.utcnow()),
        'next_id': 1,
        'last_id_pushed': 0,
    }

    # Restore the state saved by a previous run, if any.
    if self.path and os.path.isdir(self.path):
        state_path = os.path.join(self.path, 'state')
        if os.path.isfile(state_path):
            # Close the file explicitly instead of relying on garbage
            # collection. try/finally is used rather than `with` so the
            # code stays valid on the old interpreters this module still
            # caters for (see the simplejson fallback import).
            state_file = open(state_path, 'r')
            try:
                self.state.update(json.load(state_file))
            finally:
                state_file.close()

    if self.queue.nbItems():
        # Items are already waiting in the queue at construction time;
        # schedule a push right away.
        self.queueNextServerPush()
111
118
120 """Returns if the "virtual pointer" in the queue advanced."""
121 return self.lastIndex <= self.queue.getIndex()
122
124 """Queue the next push or call it immediately.
125
126 Called to signal new items are available to be sent or on shutdown.
127 A timer should be queued to trigger a network request or the callback
128 should be called immediately. If a status push is already queued, ignore
129 the current call."""
130
131 if self.wasLastPushSuccessful():
132 if self.stopped:
133
134 delay = 0
135 else:
136
137 delay = self.bufferDelay
138 else:
139 if self.stopped:
140
141
142 return
143 else:
144
145 delay = self.retryDelay
146
147
148 if self.task:
149
150 if self.task.active():
151
152
153 return
154 else:
155 if self.task.active():
156
157
158 self.task.cancel()
159
160 self.task = None
161
162
163 if delay:
164
165 self.task = reactor.callLater(delay, self.serverPushCb)
166 elif self.stopped:
167 if not self.queue.nbItems():
168 return
169
170 @defer.deferredGenerator
171 def BlockForEverythingBeingSent():
172 d = self.serverPushCb()
173 if d:
174 x = defer.waitForDeferred(d)
175 yield x
176 x.getResult()
177 return BlockForEverythingBeingSent()
178 else:
179
180
181
182 logging.exception('Did not expect delay to be 0, but it is.')
183 return
184
208
def push(self, event, **objs):
    """Queue a new event packet and make sure a push is scheduled.

    Events listed in self.blackList are dropped silently. Any other event
    is wrapped in a packet (id, timestamp, project, started, event,
    payload) and appended to the queue, so that it ends up either:
    - Queued in memory to reduce network usage
    - Queued to disk when the sink server is down
    - Pushed (along the other queued items) to the server

    Every keyword argument becomes a payload entry; objects exposing
    asDict() are converted through it, and entries go through FilterOut
    when self.filter is set.

    Returns the result of queueNextServerPush() when no push task is
    currently active, None otherwise.
    """
    if self.blackList and event in self.blackList:
        return

    packet_id = self.state['next_id']
    self.state['next_id'] += 1
    packet = {
        'id': packet_id,
        'timestamp': str(datetime.datetime.utcnow()),
        'project': self.status.getProjectName(),
        'started': self.state['started'],
        'event': event,
        'payload': {},
    }

    payload = packet['payload']
    for name in objs:
        value = objs[name]
        if hasattr(value, 'asDict'):
            value = value.asDict()
        payload[name] = FilterOut(value) if self.filter else value

    self.queue.pushItem(packet)

    pending = self.task
    if pending is not None and pending.active():
        # A push is already scheduled; it will pick this packet up.
        return
    return self.queueNextServerPush()
238
239
240
244
247
249 self.push('requestSubmitted', request=request)
250
253
255 self.push('buildsetSubmitted', buildset=buildset)
256
260
262 self.push('builderChangedState', builderName=builderName, state=state)
263
267
270
275
def stepTextChanged(self, build, step, text):
    """Push a 'stepTextChanged' event.

    The payload carries the build properties as a list, the step and the
    new text.
    """
    details = {
        'properties': build.getProperties().asList(),
        'step': step,
        'text': text,
    }
    self.push('stepTextChanged', **details)
281
def stepText2Changed(self, build, step, text2):
    """Push a 'stepText2Changed' event.

    The payload carries the build properties as a list, the step and the
    new secondary text.
    """
    details = {
        'properties': build.getProperties().asList(),
        'step': step,
        'text2': text2,
    }
    self.push('stepText2Changed', **details)
287
294
299
304
309
312
314 self.push('buildedRemoved', builderName=builderName)
315
317 self.push('changeAdded', change=change)
318
321
324
327 """Event streamer to an HTTP server."""
328
def __init__(self, serverUrl, debug=None, maxMemoryItems=None,
             maxDiskItems=None, chunkSize=200, maxHttpRequestSize=2**20,
             **kwargs):
    """Configure the HTTP push and pick the queue implementation.

    @serverUrl: Base URL to be used to push events notifications.
    @maxMemoryItems: Maximum number of items to keep queued in memory.
    @maxDiskItems: Maximum number of items to buffer to disk; if 0, the
      disk is not used at all. Any other value, including the default
      None, enables the disk-backed queue.
    @debug: Save the json with nice formatting.
    @chunkSize: maximum number of items to send in each HTTP POST.
    @maxHttpRequestSize: limits the size of encoded data for AE, the
      default is 1MB.

    Remaining keyword arguments are forwarded to StatusPush.__init__.
    """
    # Parameters.
    self.serverUrl = serverUrl
    self.debug = debug
    self.chunkSize = chunkSize
    self.lastPushWasSuccessful = True
    self.maxHttpRequestSize = maxHttpRequestSize

    if maxDiskItems == 0:
        # Memory only: nothing is written to disk.
        path = None
        queue = MemoryQueue(maxItems=maxMemoryItems)
    else:
        # The on-disk directory is named after the network location of
        # the server URL, cut at the first ':'.
        netloc = urlparse.urlparse(self.serverUrl)[1]
        path = 'events_' + netloc.split(':')[0]
        memory_queue = MemoryQueue(maxItems=maxMemoryItems)
        disk_queue = DiskQueue(path, maxItems=maxDiskItems)
        queue = PersistentQueue(primaryQueue=memory_queue,
                                secondaryQueue=disk_queue)

    # The method is passed through the class, not bound to this instance;
    # the base class invokes the callback with 'self' as its argument.
    StatusPush.__init__(self, serverPushCb=HttpStatusPush.pushHttp,
                        queue=queue, path=path, **kwargs)
362
364 return self.lastPushWasSuccessful
365
395
397 """Do the HTTP POST to the server."""
398 (encoded_packets, items) = self.popChunk()
399
400 def Success(result):
401 """Queue up next push."""
402 log.msg('Sent %d events to %s' % (len(items), self.serverUrl))
403 self.lastPushWasSuccessful = True
404 return self.queueNextServerPush()
405
406 def Failure(result):
407 """Insert back items not sent and queue up next push."""
408
409 log.msg('Failed to push %d events to %s: %s' %
410 (len(items), self.serverUrl, str(result)))
411 self.queue.insertBackChunk(items)
412 if self.stopped:
413
414
415
416 self.queue.save()
417 self.lastPushWasSuccessful = False
418 return self.queueNextServerPush()
419
420
421 headers = {'Content-Type': 'application/x-www-form-urlencoded'}
422 connection = client.getPage(self.serverUrl,
423 method='POST',
424 postdata=encoded_packets,
425 headers=headers,
426 agent='buildbot')
427 connection.addCallbacks(Success, Failure)
428 return connection
429
430
431