1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 from __future__ import with_statement
17
18
19 """Push events to an abstract receiver.
20
21 Implements the HTTP receiver."""
22
23 import datetime
24 import os
25 import urllib
26 import urlparse
27
28 try:
29 import simplejson as json
30 assert json
31 except ImportError:
32 import json
33
34 from buildbot import config
35 from buildbot.status.base import StatusReceiverMultiService
36 from buildbot.status.persistent_queue import DiskQueue, IndexedQueue, \
37 MemoryQueue, PersistentQueue
38 from buildbot.status.web.status_json import FilterOut
39 from twisted.internet import defer, reactor
40 from twisted.python import log
41 from twisted.web import client
42
43
44
45 -class StatusPush(StatusReceiverMultiService):
46 """Event streamer to a abstract channel.
47
48 It uses IQueue to batch push requests and queue the data when
49 the receiver is down.
50 When a PersistentQueue object is used, the items are saved to disk on master
51 shutdown so they can be pushed back when the master is restarted.
52 """
53
def __init__(self, serverPushCb, queue=None, path=None, filter=True,
             bufferDelay=1, retryDelay=5, blackList=None):
    """Set up the queue, the guarded push callback and the id counters.

    @serverPushCb: callable doing the actual push. It is invoked with this
    instance as its only argument, and only while the queue holds at least
    one item. It should call self.queueNextServerPush() once it is done so
    the following push gets scheduled.
    @queue: item queue implementing IQueue; a MemoryQueue is created when
    none is given. In both cases it is wrapped in an IndexedQueue.
    @path: directory that may hold a 'state' file; when that file exists its
    JSON content is merged over the default state.
    @filter: when True (default), removes all "", None, False, [] or {}
    entries.
    @bufferDelay: delay between the end of a request and the start of the
    next one, used to batch events and lower the push rate.
    @retryDelay: delay between retries when the last serverPushCb call
    pushed no items.
    @blackList: events that shouldn't be sent.

    Raises NotImplementedError when serverPushCb is not callable.
    """
    StatusReceiverMultiService.__init__(self)

    backing_queue = MemoryQueue() if queue is None else queue
    self.queue = IndexedQueue(backing_queue)
    self.path = path
    self.filter = filter
    self.bufferDelay = bufferDelay
    self.retryDelay = retryDelay

    if not callable(serverPushCb):
        raise NotImplementedError('Please pass serverPushCb parameter.')

    def guarded_push():
        # Only hand over to the user callback when there is something to
        # send; the queue index is recorded first so progress can be
        # measured afterwards by wasLastPushSuccessful().
        if self.queue.nbItems():
            self.lastIndex = self.queue.getIndex()
            return serverPushCb(self)
        return None

    self.serverPushCb = guarded_push
    self.blackList = blackList

    # Handle returned by reactor.callLater for the next scheduled push.
    self.task = None
    self.stopped = False
    self.lastIndex = -1
    self.state = {
        'started': str(datetime.datetime.utcnow()),
        'next_id': 1,
        'last_id_pushed': 0,
    }

    # Values saved in the state file take precedence over the defaults.
    if self.path and os.path.isdir(self.path):
        state_path = os.path.join(self.path, 'state')
        if os.path.isfile(state_path):
            with open(state_path, 'r') as f:
                self.state.update(json.load(f))

    # Items already sitting in the queue get scheduled right away.
    if self.queue.nbItems():
        self.queueNextServerPush()
114
121
def wasLastPushSuccessful(self):
    """Returns if the "virtual pointer" in the queue advanced.

    True when the index recorded before the last push (self.lastIndex) is
    not greater than the index the queue reports now."""
    recorded = self.lastIndex
    return recorded <= self.queue.getIndex()
125
def queueNextServerPush(self):
    """Queue the next push or call it immediately.

    Called to signal new items are available to be sent or on shutdown.
    A timer should be queued to trigger a network request or the callback
    should be called immediately. If a status push is already queued, ignore
    the current call.

    Returns None when a push was scheduled, was already scheduled, or when
    there is nothing to do. When stopped with items still queued, the push
    callback is run right away and a Deferred firing on its completion is
    returned.
    """
    # Pick the delay from the outcome of the previous push.
    if self.wasLastPushSuccessful():
        # When stopped, do not buffer: send right away.
        delay = 0 if self.stopped else self.bufferDelay
    elif self.stopped:
        # The last push made no progress and we are stopped: do not retry.
        return
    else:
        delay = self.retryDelay

    if self.task:
        if self.task.active():
            # A push is already scheduled; let it run.
            return
        # The delayed call is no longer pending, so just forget it. (The
        # former second active() check here could never be true.)
        self.task = None

    if delay:
        self.task = reactor.callLater(delay, self.serverPushCb)
    elif self.stopped:
        if not self.queue.nbItems():
            return

        @defer.inlineCallbacks
        def BlockForEverythingBeingSent():
            # maybeDeferred() needs the callable itself: handing it the
            # *result* of the callback made it try to call a Deferred (or
            # None), turning every shutdown flush into a TypeError failure.
            yield defer.maybeDeferred(self.serverPushCb)
        return BlockForEverythingBeingSent()
    else:
        # A zero delay while running would make pushes spin without any
        # buffering, so it is reported instead of being acted upon.
        log.err('Did not expect delay to be 0, but it is.')
        return
183
208
def push(self, event, **objs):
    """Build a packet for `event`, queue it and schedule a push if needed.

    The packet receives the next sequential id, a UTC timestamp, the project
    title, the 'started' value from the state, the event name and a payload
    holding one entry per keyword argument. Objects exposing asDict() are
    converted through it, and every value goes through FilterOut when
    self.filter is set.

    Black-listed events are ignored. Returns the result of
    queueNextServerPush() when no push is currently scheduled, otherwise
    None.
    """
    if self.blackList and event in self.blackList:
        return None

    event_id = self.state['next_id']
    self.state['next_id'] += 1
    stamp = str(datetime.datetime.utcnow())
    project = self.status.getTitle()
    packet = {
        'id': event_id,
        'timestamp': stamp,
        'project': project,
        'started': self.state['started'],
        'event': event,
        'payload': {},
    }

    payload = packet['payload']
    for label, value in objs.items():
        if hasattr(value, 'asDict'):
            value = value.asDict()
        payload[label] = FilterOut(value) if self.filter else value

    self.queue.pushItem(packet)

    # A still-pending delayed call will pick this packet up by itself.
    if self.task is not None and self.task.active():
        return None
    return self.queueNextServerPush()
238
239
240
244
247
249 self.push('requestSubmitted', request=request)
250
253
256
260
262 self.push('builderChangedState', builderName=builderName, state=state)
263
267
270
275
def stepTextChanged(self, build, step, text):
    """Push a 'stepTextChanged' event with the build's property list, the
    step and its new text."""
    build_properties = build.getProperties().asList()
    self.push('stepTextChanged', properties=build_properties, step=step,
              text=text)
281
def stepText2Changed(self, build, step, text2):
    """Push a 'stepText2Changed' event with the build's property list, the
    step and its new secondary text."""
    build_properties = build.getProperties().asList()
    self.push('stepText2Changed', properties=build_properties, step=step,
              text2=text2)
287
294
299
304
309
312
314 self.push('buildedRemoved', builderName=builderName)
315
317 self.push('changeAdded', change=change)
318
321
324
327 """Event streamer to a HTTP server."""
328
def __init__(self, serverUrl, debug=None, maxMemoryItems=None,
             maxDiskItems=None, chunkSize=200, maxHttpRequestSize=2**20,
             extra_post_params=None, **kwargs):
    """Configure the HTTP target and build the queue handed to StatusPush.

    @serverUrl: Base URL to be used to push events notifications.
    @debug: Save the json with nice formatting.
    @maxMemoryItems: Maximum number of items to keep queued in memory.
    @maxDiskItems: Maximum number of items to buffer to disk, if 0, doesn't
    use disk at all.
    @chunkSize: maximum number of items to send in each HTTP POST.
    @maxHttpRequestSize: limits the size of encoded data for AE, the default
    is 1MB.
    @extra_post_params: optional dict of additional form fields stored for
    the POST requests; defaults to an empty dict.
    @kwargs: forwarded unchanged to StatusPush.__init__.

    Raises config.ConfigErrors when serverUrl is empty.
    """
    if not serverUrl:
        raise config.ConfigErrors(['HttpStatusPush requires a serverUrl'])

    # These attributes are assigned before StatusPush.__init__ runs, since
    # that call may already schedule a push that reads them.
    self.serverUrl = serverUrl
    self.extra_post_params = extra_post_params or {}
    self.debug = debug
    self.chunkSize = chunkSize
    self.lastPushWasSuccessful = True
    self.maxHttpRequestSize = maxHttpRequestSize

    if maxDiskItems == 0:
        # Memory only: nothing is written to disk.
        path = None
        queue = MemoryQueue(maxItems=maxMemoryItems)
    else:
        # The disk queue location is named after the host part of the URL
        # (port stripped).
        host = urlparse.urlparse(self.serverUrl)[1].split(':')[0]
        path = 'events_' + host
        primary = MemoryQueue(maxItems=maxMemoryItems)
        secondary = DiskQueue(path, maxItems=maxDiskItems)
        queue = PersistentQueue(primaryQueue=primary,
                                secondaryQueue=secondary)

    StatusPush.__init__(self, serverPushCb=HttpStatusPush.pushHttp,
                        queue=queue, path=path, **kwargs)
366
def wasLastPushSuccessful(self):
    """Return the lastPushWasSuccessful flag kept on this instance."""
    return self.lastPushWasSuccessful
369
def popChunk(self):
    """Pops items from the pending list.

    They must be queued back on failure.

    Returns a (data, items) tuple, where data is the url-encoded POST body
    (the JSON packets plus self.extra_post_params) and items is the list of
    packets it contains. When self.maxHttpRequestSize is set, the chunk is
    repeatedly halved until the encoded body is smaller than that limit; a
    single packet that is still too large is logged and dropped.
    """
    if self.wasLastPushSuccessful():
        chunkSize = self.chunkSize
    else:
        # After a failed push, retry with a single packet.
        chunkSize = 1

    while True:
        items = self.queue.popChunk(chunkSize)
        if self.debug:
            packets = json.dumps(items, indent=2, sort_keys=True)
        else:
            packets = json.dumps(items, separators=(',', ':'))
        params = {'packets': packets}
        params.update(self.extra_post_params)
        data = urllib.urlencode(params)
        if (not self.maxHttpRequestSize or
                len(data) < self.maxHttpRequestSize):
            return (data, items)

        if chunkSize == 1:
            # Not even one packet fits: it is not queued back, and the
            # chunk size is reset for the packets that follow.
            log.msg("ERROR: packet %s was dropped, too large: %d > %d" %
                    (items[0]['id'], len(data), self.maxHttpRequestSize))
            chunkSize = self.chunkSize
        else:
            # Floor division keeps the size an integer even under true
            # division; a float size would never compare equal to 1.
            chunkSize //= 2
            self.queue.insertBackChunk(items)
401
def pushHttp(self):
    """Do the HTTP POST to the server.

    Pops a chunk of queued packets, posts it to self.serverUrl as a
    url-encoded form and returns the Deferred of that request. Both
    outcomes end by calling queueNextServerPush().
    """
    (encoded_packets, items) = self.popChunk()

    def on_success(result):
        """Record the success and queue up the next push."""
        log.msg('Sent %d events to %s' % (len(items), self.serverUrl))
        self.lastPushWasSuccessful = True
        return self.queueNextServerPush()

    def on_failure(result):
        """Insert back items not sent and queue up next push."""
        log.msg('Failed to push %d events to %s: %s' %
                (len(items), self.serverUrl, str(result)))
        self.queue.insertBackChunk(items)
        if self.stopped:
            # Once stopped, save the queue immediately after the failure.
            self.queue.save()
        self.lastPushWasSuccessful = False
        return self.queueNextServerPush()

    request = client.getPage(
        self.serverUrl,
        method='POST',
        postdata=encoded_packets,
        headers={'Content-Type': 'application/x-www-form-urlencoded'},
        agent='buildbot')
    request.addCallbacks(on_success, on_failure)
    return request
435
436
437