1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 """
16 Buildbot metrics module
17
18 Keeps track of counts and timings of various internal buildbot
19 activities.
20
21 Basic architecture:
22
23 MetricEvent.log(...)
24 ||
25 \/
26 MetricLogObserver
27 ||
28 \/
29 MetricHandler
30 ||
31 \/
32 MetricWatcher
33 """
34 from collections import deque
35
36 from twisted.python import log
37 from twisted.internet.task import LoopingCall
38 from twisted.internet import reactor
39 from twisted.application import service
40 from buildbot import util, config
41 from buildbot.util.bbcollections import defaultdict
42
43 import gc, os, sys
44
45 try:
46 import resource
47 assert resource
48 except ImportError:
49 resource = None
@classmethod
def log(cls, *args, **kwargs):
    """Build an event of this class and publish it through the Twisted log.

    All positional and keyword arguments are forwarded unchanged to the
    class constructor; the resulting instance is attached to the log
    message under the ``metric`` keyword.
    """
    event = cls(*args, **kwargs)
    log.msg(metric=event)
55
def __init__(self, counter, count=1, absolute=False):
    """Store the description of a counter event on the instance.

    counter  -- identifier of the counter, kept as ``self.counter``
    count    -- value carried by the event (defaults to 1)
    absolute -- flag (defaults to False); stored unchanged, its
                interpretation is left to whatever handles the event
    """
    self.counter, self.count, self.absolute = counter, count, absolute
61
66
# Alarm severity levels; each value is also the index of its label in
# ALARM_TEXT below.
ALARM_OK = 0
ALARM_WARN = 1
ALARM_CRIT = 2

# Human-readable labels, indexed by the severity constants above.
ALARM_TEXT = [
    "OK",
    "WARN",
    "CRIT",
]
72 self.alarm = alarm
73 self.level = level
74 self.msg = msg
75
77 def decorator(func):
78 def wrapper(*args, **kwargs):
79 MetricCountEvent.log(counter=counter)
80 return func(*args, **kwargs)
81 return wrapper
82 return decorator
83
85
86 _reactor = None
87
91
93 def wrapper(*args, **kwargs):
94 self.start()
95 return func(*args, **kwargs)
96 return wrapper
97
99 def wrapper(*args, **kwargs):
100 try:
101 return func(*args, **kwargs)
102 finally:
103 self.stop()
104 return wrapper
105
108
114
116 def decorator(func):
117 t = Timer(name)
118 t._reactor=_reactor
119 def wrapper(*args, **kwargs):
120 t.start()
121 try:
122 return func(*args, **kwargs)
123 finally:
124 t.stop()
125 return wrapper
126 return decorator
127
130 self._maxlen = maxlen
131 deque.__init__(self)
132
134 deque.append(self, o)
135 if len(self) > self._maxlen:
136 self.popleft()
137
142
146
148 if len(self) == 0:
149 self.average = 0
150 else:
151 self.average = float(sum(self)) / len(self)
152
153 return self.average
154
161
164
167
168
170 raise NotImplementedError
171
def handle(self, eventDict, metric):
    """Placeholder for processing one metric event.

    eventDict -- the log event dictionary the metric arrived in
    metric    -- the metric object taken from that event

    This version always raises NotImplementedError; presumably concrete
    handlers are expected to override it.
    """
    raise NotImplementedError()
174
def get(self, metric):
    """Placeholder for looking up the current value of ``metric``.

    This version always raises NotImplementedError; presumably concrete
    handlers are expected to override it.
    """
    raise NotImplementedError()
177
179 raise NotImplementedError
180
182 raise NotImplementedError
183
185 raise NotImplementedError
186
188 _counters = None
191
def handle(self, eventDict, metric):
    """Apply a counter event to ``self._counters``.

    eventDict -- the log event dictionary (not used here)
    metric    -- object exposing ``counter``, ``count`` and ``absolute``

    A relative event adds ``metric.count`` to the entry named by
    ``metric.counter``; an absolute event replaces that entry.
    """
    if not metric.absolute:
        # Relative update: the += needs the key to exist already or the
        # mapping to supply a default value for missing keys.
        self._counters[metric.counter] += metric.count
        return

    # Absolute update: overwrite whatever was stored before.
    self._counters[metric.counter] = metric.count
197
200
201 - def get(self, counter):
203
205 retval = []
206 for counter in sorted(self.keys()):
207 retval.append("Counter %s: %i" % (counter, self.get(counter)))
208 return "\n".join(retval)
209
211 retval = {}
212 for counter in sorted(self.keys()):
213 retval[counter] = self.get(counter)
214 return dict(counters=retval)
215
217 _timers = None
220
221 - def handle(self, eventDict, metric):
223
226
227 - def get(self, timer):
229
231 retval = []
232 for timer in sorted(self.keys()):
233 retval.append("Timer %s: %.3g" % (timer, self.get(timer)))
234 return "\n".join(retval)
235
237 retval = {}
238 for timer in sorted(self.keys()):
239 retval[timer] = self.get(timer)
240 return dict(timers=retval)
241
243 _alarms = None
246
247 - def handle(self, eventDict, metric):
249
258
264
294
322
324 if sys.platform == 'linux2':
325 try:
326 return int(open("/proc/%i/statm" % os.getpid()).read().split()[1])
327 except:
328 return 0
329 return 0
330
332
333 garbage_count = len(gc.garbage)
334 MetricCountEvent.log('gc.garbage', garbage_count, absolute=True)
335 if garbage_count == 0:
336 level = ALARM_OK
337 else:
338 level = ALARM_WARN
339 MetricAlarmEvent.log('gc.garbage', level=level)
340
341 if resource:
342 r = resource.getrusage(resource.RUSAGE_SELF)
343 attrs = ['ru_utime', 'ru_stime', 'ru_maxrss', 'ru_ixrss', 'ru_idrss',
344 'ru_isrss', 'ru_minflt', 'ru_majflt', 'ru_nswap',
345 'ru_inblock', 'ru_oublock', 'ru_msgsnd', 'ru_msgrcv',
346 'ru_nsignals', 'ru_nvcsw', 'ru_nivcsw']
347 for i,a in enumerate(attrs):
348
349
350 v = r[i]
351 if a == 'ru_maxrss' and v == 0:
352 v = _get_rss() * resource.getpagesize() / 1024
353 MetricCountEvent.log('resource.%s' % a, v, absolute=True)
354 MetricCountEvent.log('resource.pagesize', resource.getpagesize(), absolute=True)
355
356 then = util.now(_reactor)
357 dt = 0.1
358 def cb():
359 now = util.now(_reactor)
360 delay = (now - then) - dt
361 MetricTimeEvent.log("reactorDelay", delay)
362 _reactor.callLater(dt, cb)
363
364 -class MetricLogObserver(config.ReconfigurableServiceMixin,
365 service.MultiService):
366 _reactor = reactor
389
391
392 if new_config.metrics is None:
393 self.disable()
394 else:
395 self.enable()
396
397 metrics_config = new_config.metrics
398
399
400 log_interval = metrics_config.get('log_interval', 60)
401 if log_interval != self.log_interval:
402 if self.log_task:
403 self.log_task.stop()
404 self.log_task = None
405 if log_interval:
406 self.log_task = LoopingCall(self.report)
407 self.log_task.clock = self._reactor
408 self.log_task.start(log_interval)
409
410
411 periodic_interval = metrics_config.get('periodic_interval', 10)
412 if periodic_interval != self.periodic_interval:
413 if self.periodic_task:
414 self.periodic_task.stop()
415 self.periodic_task = None
416 if periodic_interval:
417 self.periodic_task = LoopingCall(periodicCheck,
418 self._reactor)
419 self.periodic_task.clock = self._reactor
420 self.periodic_task.start(periodic_interval)
421
422
423 return config.ReconfigurableServiceMixin.reconfigService(self,
424 new_config)
425
429
431 if self.enabled:
432 return
433 log.addObserver(self.emit)
434 self.enabled = True
435
437 if not self.enabled:
438 return
439
440 if self.periodic_task:
441 self.periodic_task.stop()
442 self.periodic_task = None
443
444 if self.log_task:
445 self.log_task.stop()
446 self.log_task = None
447
448 log.removeObserver(self.emit)
449 self.enabled = False
450
452 old = self.getHandler(interface)
453 self.handlers[interface] = handler
454 return old
455
457 return self.handlers.get(interface)
458
def emit(self, eventDict):
    """Log observer callback: route a metric event to its handler.

    eventDict -- log event dictionary; only its 'metric' entry is read

    Events whose 'metric' entry is missing, falsy, or not a MetricEvent
    are ignored, as are metrics whose class has no entry in
    ``self.handlers``.  Otherwise the handler processes the event and
    every watcher listed on the handler is run afterwards.
    """
    metric = eventDict.get('metric')

    # Ignore log events that do not carry a metric
    if not (metric and isinstance(metric, MetricEvent)):
        return

    # Handlers are keyed by the exact class of the metric
    event_type = metric.__class__
    if event_type in self.handlers:
        handler = self.handlers[event_type]
        handler.handle(eventDict, metric)
        for watcher in handler.watchers:
            watcher.run()
472
474 retval = {}
475 for interface, handler in self.handlers.iteritems():
476 retval.update(handler.asDict())
477 return retval
478
480 try:
481 for interface, handler in self.handlers.iteritems():
482 report = handler.report()
483 if not report:
484 continue
485 for line in report.split("\n"):
486 log.msg(line)
487 except:
488 log.err()
489