1
2
3 """Push events to an abstract receiver.
4
5 Implements the HTTP receiver."""
6
7 import datetime
8 import logging
9 import os
10 import urllib
11 import urlparse
12
13 try:
14 import simplejson as json
15 except ImportError:
16 import json
17
18 from buildbot.status.base import StatusReceiverMultiService
19 from buildbot.status.persistent_queue import DiskQueue, IndexedQueue, \
20 MemoryQueue, PersistentQueue
21 from buildbot.status.web.status_json import FilterOut
22 from twisted.internet import defer, reactor
23 from twisted.python import log
24 from twisted.web import client
25
26
27
28 -class StatusPush(StatusReceiverMultiService):
29 """Event streamer to a abstract channel.
30
31 It uses IQueue to batch push requests and queue the data when
32 the receiver is down.
33 When a PersistentQueue object is used, the items are saved to disk on master
34 shutdown so they can be pushed back when the master is restarted.
35 """
36
def __init__(self, serverPushCb, queue=None, path=None, filter=True,
             bufferDelay=1, retryDelay=5, blackList=None):
    """Set up the event queue, the push callback and the persisted state.

    @serverPushCb: callback to be used. It receives 'self' as parameter. It
      should call self.queueNextServerPush() when it's done to queue the
      next push. It is guaranteed that the queue is not empty when this
      function is called.
    @queue: an item queue that implements IQueue. When None, a MemoryQueue
      is created. Whatever queue is used gets wrapped in an IndexedQueue.
    @path: path to save config. When it is an existing directory holding a
      'state' file, that file is read as JSON and merged into self.state.
    @filter: when True (default), removes all "", None, False, [] or {}
      entries.
    @bufferDelay: amount of time events are queued before sending, to
      reduce the number of push requests rate. This is the delay between
      the end of a request to initializing a new one.
    @retryDelay: amount of time between retries when no items were pushed
      on last serverPushCb call.
    @blackList: events that shouldn't be sent.

    Raises NotImplementedError when serverPushCb is not callable.
    """
    StatusReceiverMultiService.__init__(self)

    # Parameters.
    self.queue = queue
    if self.queue is None:
        self.queue = MemoryQueue()
    self.queue = IndexedQueue(self.queue)
    self.path = path
    self.filter = filter
    self.bufferDelay = bufferDelay
    self.retryDelay = retryDelay
    if not callable(serverPushCb):
        raise NotImplementedError('Please pass serverPushCb parameter.')

    def hookPushCb():
        # Never invoke the user callback on an empty queue, and leave
        # lastIndex untouched in that case.
        if not self.queue.nbItems():
            return
        # Remember the queue index as it was just before this push attempt.
        self.lastIndex = self.queue.getIndex()
        return serverPushCb(self)
    self.serverPushCb = hookPushCb
    self.blackList = blackList

    # Other defaults.
    # Handle of the next scheduled push, if any.
    self.task = None
    self.stopped = False
    self.lastIndex = -1
    self.state = {}
    self.state['started'] = str(datetime.datetime.utcnow())
    self.state['next_id'] = 1
    self.state['last_id_pushed'] = 0

    # Try to load back the state saved under `path`.
    if self.path and os.path.isdir(self.path):
        state_path = os.path.join(self.path, 'state')
        if os.path.isfile(state_path):
            # Close the file explicitly instead of leaking the handle until
            # garbage collection; try/finally keeps this valid on old
            # Python versions.
            state_file = open(state_path, 'r')
            try:
                self.state.update(json.load(state_file))
            finally:
                state_file.close()

    if self.queue.nbItems():
        # The queue already holds items (presumably left over from a
        # previous run), so ask for a push right away.
        self.queueNextServerPush()
96
103
105 """Returns if the "virtual pointer" in the queue advanced."""
106 return self.lastIndex <= self.queue.getIndex()
107
109 """Queue the next push or call it immediately.
110
111 Called to signal new items are available to be sent or on shutdown.
112 A timer should be queued to trigger a network request or the callback
113 should be called immediately. If a status push is already queued, ignore
114 the current call."""
115
116 if self.wasLastPushSuccessful():
117 if self.stopped:
118
119 delay = 0
120 else:
121
122 delay = self.bufferDelay
123 else:
124 if self.stopped:
125
126
127 return
128 else:
129
130 delay = self.retryDelay
131
132
133 if self.task:
134
135 if self.task.active():
136
137
138 return
139 else:
140 if self.task.active():
141
142
143 self.task.cancel()
144
145 self.task = None
146
147
148 if delay:
149
150 self.task = reactor.callLater(delay, self.serverPushCb)
151 elif self.stopped:
152 if not self.queue.nbItems():
153 return
154
155 @defer.deferredGenerator
156 def BlockForEverythingBeingSent():
157 d = self.serverPushCb()
158 if d:
159 x = defer.waitForDeferred(d)
160 yield x
161 x.getResult()
162 return BlockForEverythingBeingSent()
163 else:
164
165
166
167 logging.exception('Did not expect delay to be 0, but it is.')
168 return
169
193
def push(self, event, **objs):
    """Wrap `event` and its objects in a packet and append it to the queue.

    Nothing happens when `event` is listed in self.blackList. Otherwise the
    packet receives the next sequential id, a UTC timestamp, the project
    name, the 'started' value from self.state and a payload holding one
    entry per keyword argument. Objects exposing asDict() are converted
    through it, and the result is passed through FilterOut when self.filter
    is set.

    Returns the result of queueNextServerPush() when no push task is
    currently active, otherwise None.
    """
    if self.blackList and event in self.blackList:
        return

    # Reserve the packet id before anything else, as the counter must
    # advance even if a later step fails.
    state = self.state
    packet_id = state['next_id']
    state['next_id'] += 1
    packet = {
        'id': packet_id,
        'timestamp': str(datetime.datetime.utcnow()),
        'project': self.status.getProjectName(),
        'started': state['started'],
        'event': event,
    }

    payload = {}
    for name, value in objs.items():
        if hasattr(value, 'asDict'):
            value = value.asDict()
        payload[name] = FilterOut(value) if self.filter else value
    packet['payload'] = payload

    self.queue.pushItem(packet)

    # A push is already scheduled and still pending: it will pick this
    # packet up, so there is nothing more to do.
    pending = self.task
    if pending is not None and pending.active():
        return None
    return self.queueNextServerPush()
223
224
225
229
232
234 self.push('requestSubmitted', request=request)
235
238
240 self.push('buildsetSubmitted', buildset=buildset)
241
245
247 self.push('builderChangedState', builderName=builderName, state=state)
248
252
255
260
def stepTextChanged(self, build, step, text):
    """Queue a 'stepTextChanged' event.

    The payload carries the build's properties as a list, plus the step
    and the new text.
    """
    emit = self.push
    build_properties = build.getProperties().asList()
    emit('stepTextChanged', properties=build_properties, step=step,
         text=text)
266
def stepText2Changed(self, build, step, text2):
    """Queue a 'stepText2Changed' event.

    The payload carries the build's properties as a list, plus the step
    and the new secondary text.
    """
    emit = self.push
    build_properties = build.getProperties().asList()
    emit('stepText2Changed', properties=build_properties, step=step,
         text2=text2)
272
279
284
289
294
297
299 self.push('buildedRemoved', builderName=builderName)
300
302 self.push('changeAdded', change=change)
303
306
309
312 """Event streamer to a HTTP server."""
313
def __init__(self, serverUrl, debug=None, maxMemoryItems=None,
             maxDiskItems=None, chunkSize=200, maxHttpRequestSize=2**20,
             **kwargs):
    """Configure the HTTP pusher and pick the queue that backs it.

    @serverUrl: Base URL to be used to push events notifications.
    @maxMemoryItems: Maximum number of items to keep queued in memory.
    @maxDiskItems: Maximum number of items to buffer to disk. When it is
      0, no disk queue is created; any other value, including the default
      None, creates one.
    @debug: Save the json with nice formatting.
    @chunkSize: maximum number of items to send in each HTTP POST.
    @maxHttpRequestSize: limits the size of encoded data for AE, the
      default is 1MB.

    Any remaining keyword arguments are forwarded to StatusPush.__init__.
    """
    # Parameters.
    self.serverUrl = serverUrl
    self.debug = debug
    self.chunkSize = chunkSize
    self.lastPushWasSuccessful = True
    self.maxHttpRequestSize = maxHttpRequestSize

    if maxDiskItems == 0:
        # Memory only: nothing is spilled to disk.
        path = None
        queue = MemoryQueue(maxItems=maxMemoryItems)
    else:
        # The queue directory is named after the host part of the server
        # URL (everything in the network location before the first ':').
        netloc = urlparse.urlparse(self.serverUrl)[1]
        path = 'events_' + netloc.split(':')[0]
        memory_queue = MemoryQueue(maxItems=maxMemoryItems)
        disk_queue = DiskQueue(path, maxItems=maxDiskItems)
        queue = PersistentQueue(primaryQueue=memory_queue,
                                secondaryQueue=disk_queue)

    # Hand over the unbound pushHttp; the base class calls it with `self`.
    StatusPush.__init__(self, serverPushCb=HttpStatusPush.pushHttp,
                        queue=queue, path=path, **kwargs)
347
349 return self.lastPushWasSuccessful
350
380
382 """Do the HTTP POST to the server."""
383 (encoded_packets, items) = self.popChunk()
384
385 def Success(result):
386 """Queue up next push."""
387 log.msg('Sent %d events to %s' % (len(items), self.serverUrl))
388 self.lastPushWasSuccessful = True
389 return self.queueNextServerPush()
390
391 def Failure(result):
392 """Insert back items not sent and queue up next push."""
393
394 log.msg('Failed to push %d events to %s: %s' %
395 (len(items), self.serverUrl, str(result)))
396 self.queue.insertBackChunk(items)
397 if self.stopped:
398
399
400
401 self.queue.save()
402 self.lastPushWasSuccessful = False
403 return self.queueNextServerPush()
404
405
406 headers = {'Content-Type': 'application/x-www-form-urlencoded'}
407 connection = client.getPage(self.serverUrl,
408 method='POST',
409 postdata=encoded_packets,
410 headers=headers,
411 agent='buildbot')
412 connection.addCallbacks(Success, Failure)
413 return connection
414
415
416