1
2
3 """Push events to an abstract receiver.
4
5 Implements the HTTP receiver."""
6
7 import datetime
8 import logging
9 import os
10 import urllib
11 import urlparse
12
13 try:
14 import simplejson as json
15 except ImportError:
16 import json
17
18 from buildbot.master import BuildMaster
19 from buildbot.status.base import StatusReceiverMultiService
20 from buildbot.status.persistent_queue import DiskQueue, IndexedQueue, \
21 IQueue, MemoryQueue, PersistentQueue
22 from buildbot.status.web.status_json import FilterOut
23 from twisted.internet import defer, reactor
24 from twisted.python import log
25 from twisted.web import client
26
27
28
29 -class StatusPush(StatusReceiverMultiService):
30 """Event streamer to a abstract channel.
31
32 It uses IQueue to batch push requests and queue the data when
33 the receiver is down.
34 When a PersistentQueue object is used, the items are saved to disk on master
35 shutdown so they can be pushed back when the master is restarted.
36 """
37
38 - def __init__(self, serverPushCb, queue=None, path=None, filter=True,
39 bufferDelay=1, retryDelay=5, blackList=None):
40 """
41 @serverPushCb: callback to be used. It receives 'self' as parameter. It
42 should call self.queueNextServerPush() when it's done to queue the next
43 push. It is guaranteed that the queue is not empty when this function is
44 called.
45 @queue: a item queue that implements IQueue.
46 @path: path to save config.
47 @filter: when True (default), removes all "", None, False, [] or {}
48 entries.
49 @bufferDelay: amount of time events are queued before sending, to
50 reduce the number of push requests rate. This is the delay between the
51 end of a request to initializing a new one.
52 @retryDelay: amount of time between retries when no items were pushed on
53 last serverPushCb call.
54 @blackList: events that shouldn't be sent.
55 """
56 StatusReceiverMultiService.__init__(self)
57
58
59 self.queue = queue
60 if self.queue is None:
61 self.queue = MemoryQueue()
62 self.queue = IndexedQueue(self.queue)
63 self.path = path
64 self.filter = filter
65 self.bufferDelay = bufferDelay
66 self.retryDelay = retryDelay
67 if not callable(serverPushCb):
68 raise NotImplementedError('Please pass serverPushCb parameter.')
69 def hookPushCb():
70
71
72 if not self.queue.nbItems():
73 return
74 self.lastIndex = self.queue.getIndex()
75 return serverPushCb(self)
76 self.serverPushCb = hookPushCb
77 self.blackList = blackList
78
79
80
81 self.task = None
82 self.stopped = False
83 self.lastIndex = -1
84 self.state = {}
85 self.state['started'] = str(datetime.datetime.utcnow())
86 self.state['next_id'] = 1
87 self.state['last_id_pushed'] = 0
88
89 if self.path and os.path.isdir(self.path):
90 state_path = os.path.join(self.path, 'state')
91 if os.path.isfile(state_path):
92 self.state.update(json.load(open(state_path, 'r')))
93
94 if self.queue.nbItems():
95
96 self.queueNextServerPush()
97
104
106 """Returns if the "virtual pointer" in the queue advanced."""
107 return self.lastIndex <= self.queue.getIndex()
108
110 """Queue the next push or call it immediately.
111
112 Called to signal new items are available to be sent or on shutdown.
113 A timer should be queued to trigger a network request or the callback
114 should be called immediately. If a status push is already queued, ignore
115 the current call."""
116
117 if self.wasLastPushSuccessful():
118 if self.stopped:
119
120 delay = 0
121 else:
122
123 delay = self.bufferDelay
124 else:
125 if self.stopped:
126
127
128 return
129 else:
130
131 delay = self.retryDelay
132
133
134 if self.task:
135
136 if self.task.active():
137
138
139 return
140 else:
141 if self.task.active():
142
143
144 self.task.cancel()
145
146 self.task = None
147
148
149 if delay:
150
151 self.task = reactor.callLater(delay, self.serverPushCb)
152 elif self.stopped:
153 if not self.queue.nbItems():
154 return
155
156 @defer.deferredGenerator
157 def BlockForEverythingBeingSent():
158 d = self.serverPushCb()
159 if d:
160 x = defer.waitForDeferred(d)
161 yield x
162 x.getResult()
163 return BlockForEverythingBeingSent()
164 else:
165
166
167
168 logging.exception('Did not expect delay to be 0, but it is.')
169 return
170
194
195 - def push(self, event, **objs):
196 """Push a new event.
197
198 The new event will be either:
199 - Queued in memory to reduce network usage
200 - Queued to disk when the sink server is down
201 - Pushed (along the other queued items) to the server
202 """
203 if self.blackList and event in self.blackList:
204 return
205
206 packet = {}
207 packet['id'] = self.state['next_id']
208 self.state['next_id'] += 1
209 packet['timestamp'] = str(datetime.datetime.utcnow())
210 packet['project'] = self.status.getProjectName()
211 packet['started'] = self.state['started']
212 packet['event'] = event
213 packet['payload'] = {}
214 for obj_name, obj in objs.items():
215 if hasattr(obj, 'asDict'):
216 obj = obj.asDict()
217 if self.filter:
218 obj = FilterOut(obj)
219 packet['payload'][obj_name] = obj
220 self.queue.pushItem(packet)
221 if self.task is None or not self.task.active():
222
223 return self.queueNextServerPush()
224
225
226
230
233
235 self.push('requestSubmitted', request=request)
236
239
241 self.push('buildsetSubmitted', buildset=buildset)
242
246
248 self.push('builderChangedState', builderName=builderName, state=state)
249
253
256
261
262 - def stepTextChanged(self, build, step, text):
263 self.push('stepTextChanged',
264 properties=build.getProperties().asList(),
265 step=step,
266 text=text)
267
268 - def stepText2Changed(self, build, step, text2):
269 self.push('stepText2Changed',
270 properties=build.getProperties().asList(),
271 step=step,
272 text2=text2)
273
280
285
290
295
298
300 self.push('buildedRemoved', builderName=builderName)
301
303 self.push('changeAdded', change=change)
304
307
310
313 """Event streamer to a HTTP server."""
314
315 - def __init__(self, serverUrl, debug=None, maxMemoryItems=None,
316 maxDiskItems=None, chunkSize=200, maxHttpRequestSize=2**20,
317 **kwargs):
318 """
319 @serverUrl: Base URL to be used to push events notifications.
320 @maxMemoryItems: Maximum number of items to keep queued in memory.
321 @maxDiskItems: Maximum number of items to buffer to disk, if 0, doesn't
322 use disk at all.
323 @debug: Save the json with nice formatting.
324 @chunkSize: maximum number of items to send in each at each HTTP POST.
325 @maxHttpRequestSize: limits the size of encoded data for AE, the default
326 is 1MB.
327 """
328
329 self.serverUrl = serverUrl
330 self.debug = debug
331 self.chunkSize = chunkSize
332 self.lastPushWasSuccessful = True
333 self.maxHttpRequestSize = maxHttpRequestSize
334 if maxDiskItems != 0:
335
336 path = ('events_' +
337 urlparse.urlparse(self.serverUrl)[1].split(':')[0])
338 queue = PersistentQueue(
339 primaryQueue=MemoryQueue(maxItems=maxMemoryItems),
340 secondaryQueue=DiskQueue(path, maxItems=maxDiskItems))
341 else:
342 path = None
343 queue = MemoryQueue(maxItems=maxMemoryItems)
344
345
346 StatusPush.__init__(self, serverPushCb=HttpStatusPush.pushHttp,
347 queue=queue, path=path, **kwargs)
348
350 return self.lastPushWasSuccessful
351
353 """Pops items from the pending list.
354
355 They must be queued back on failure."""
356 if self.wasLastPushSuccessful():
357 chunkSize = self.chunkSize
358 else:
359 chunkSize = 1
360
361 while True:
362 items = self.queue.popChunk(chunkSize)
363 if self.debug:
364 packets = json.dumps(items, indent=2, sort_keys=True)
365 else:
366 packets = json.dumps(items, separators=(',',':'))
367 data = urllib.urlencode({'packets': packets})
368 if (not self.maxHttpRequestSize or
369 len(data) < self.maxHttpRequestSize):
370 return (data, items)
371
372 if chunkSize == 1:
373
374 log.msg("ERROR: packet %s was dropped, too large: %d > %d" %
375 (items[0]['id'], len(data), self.maxHttpRequestSize))
376 chunkSize = self.chunkSize
377 else:
378
379 chunkSize /= 2
380 self.queue.insertBackChunk(items)
381
383 """Do the HTTP POST to the server."""
384 (encoded_packets, items) = self.popChunk()
385
386 def Success(result):
387 """Queue up next push."""
388 log.msg('Sent %d events to %s' % (len(items), self.serverUrl))
389 self.lastPushWasSuccessful = True
390 return self.queueNextServerPush()
391
392 def Failure(result):
393 """Insert back items not sent and queue up next push."""
394
395 log.msg('Failed to push %d events to %s: %s' %
396 (len(items), self.serverUrl, str(result)))
397 self.queue.insertBackChunk(items)
398 if self.stopped:
399
400
401
402 self.queue.save()
403 self.lastPushWasSuccessful = False
404 return self.queueNextServerPush()
405
406
407 headers = {'Content-Type': 'application/x-www-form-urlencoded'}
408 connection = client.getPage(self.serverUrl,
409 method='POST',
410 postdata=encoded_packets,
411 headers=headers,
412 agent='buildbot')
413 connection.addCallbacks(Success, Failure)
414 return connection
415
416
417