1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 from __future__ import with_statement
17 """
18 Buildbot metrics module
19
20 Keeps track of counts and timings of various internal buildbot
21 activities.
22
23 Basic architecture:
24
25 MetricEvent.log(...)
26 ||
27 \/
28 MetricLogObserver
29 ||
30 \/
31 MetricHandler
32 ||
33 \/
34 MetricWatcher
35 """
36 from collections import deque
37
38 from twisted.python import log
39 from twisted.internet.task import LoopingCall
40 from twisted.internet import reactor
41 from twisted.application import service
42 from buildbot import util, config
43 from collections import defaultdict
44
45 import gc, os, sys
46
47 try:
48 import resource
49 assert resource
50 except ImportError:
51 resource = None
@classmethod
def log(cls, *args, **kwargs):
    """Build an event of this class and send it through the Twisted log.

    All positional and keyword arguments are forwarded unchanged to the
    class constructor.  The resulting instance is attached to the log
    message under the ``metric`` keyword.
    """
    event = cls(*args, **kwargs)
    log.msg(metric=event)
57
def __init__(self, counter, count=1, absolute=False):
    """Record a counter event.

    :param counter: name of the counter this event applies to
    :param count: amount carried by the event (defaults to 1)
    :param absolute: flag stored alongside the count (defaults to False)
    """
    self.counter = counter
    self.count = count
    self.absolute = absolute
63
68
# Alarm levels, numbered 0..2 in the order OK, WARN, CRIT.
ALARM_OK, ALARM_WARN, ALARM_CRIT = range(3)
# Display names, listed in the same order as the levels above.
ALARM_TEXT = ["OK", "WARN", "CRIT"]
74 self.alarm = alarm
75 self.level = level
76 self.msg = msg
77
79 def decorator(func):
80 def wrapper(*args, **kwargs):
81 MetricCountEvent.log(counter=counter)
82 return func(*args, **kwargs)
83 return wrapper
84 return decorator
85
87
88 _reactor = None
89
93
95 def wrapper(*args, **kwargs):
96 self.start()
97 return func(*args, **kwargs)
98 return wrapper
99
101 def wrapper(*args, **kwargs):
102 try:
103 return func(*args, **kwargs)
104 finally:
105 self.stop()
106 return wrapper
107
110
116
118 def decorator(func):
119 t = Timer(name)
120 t._reactor=_reactor
121 def wrapper(*args, **kwargs):
122 t.start()
123 try:
124 return func(*args, **kwargs)
125 finally:
126 t.stop()
127 return wrapper
128 return decorator
129
132 self._maxlen = maxlen
133 deque.__init__(self)
134
136 deque.append(self, o)
137 if len(self) > self._maxlen:
138 self.popleft()
139
144
148
150 if len(self) == 0:
151 self.average = 0
152 else:
153 self.average = float(sum(self)) / len(self)
154
155 return self.average
156
163
166
169
170
172 raise NotImplementedError
173
def handle(self, eventDict, metric):
    """Process one metric event; must be overridden.

    :param eventDict: the log event dictionary the metric arrived in
    :param metric: the metric event object to process
    :raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError
176
def get(self, metric):
    """Look up the value held for ``metric``; must be overridden.

    :param metric: identifier of the metric being queried
    :raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError
179
181 raise NotImplementedError
182
184 raise NotImplementedError
185
187 raise NotImplementedError
188
190 _counters = None
193
def handle(self, eventDict, metric):
    """Apply a counter event to the stored counters.

    An absolute event overwrites the stored value for ``metric.counter``
    with ``metric.count``; any other event adds ``metric.count`` to it.

    :param eventDict: the log event dictionary (not used here)
    :param metric: event providing ``counter``, ``count`` and ``absolute``
    """
    name = metric.counter
    if metric.absolute:
        # Absolute readings replace whatever was accumulated so far.
        self._counters[name] = metric.count
        return
    self._counters[name] += metric.count
199
202
203 - def get(self, counter):
205
207 retval = []
208 for counter in sorted(self.keys()):
209 retval.append("Counter %s: %i" % (counter, self.get(counter)))
210 return "\n".join(retval)
211
213 retval = {}
214 for counter in sorted(self.keys()):
215 retval[counter] = self.get(counter)
216 return dict(counters=retval)
217
219 _timers = None
222
223 - def handle(self, eventDict, metric):
225
228
229 - def get(self, timer):
231
233 retval = []
234 for timer in sorted(self.keys()):
235 retval.append("Timer %s: %.3g" % (timer, self.get(timer)))
236 return "\n".join(retval)
237
239 retval = {}
240 for timer in sorted(self.keys()):
241 retval[timer] = self.get(timer)
242 return dict(timers=retval)
243
245 _alarms = None
248
249 - def handle(self, eventDict, metric):
251
260
266
296
324
326 if sys.platform == 'linux2':
327 try:
328 with open("/proc/%i/statm" % os.getpid()) as f:
329 return int(f.read().split()[1])
330 except:
331 return 0
332 return 0
333
335
336 garbage_count = len(gc.garbage)
337 MetricCountEvent.log('gc.garbage', garbage_count, absolute=True)
338 if garbage_count == 0:
339 level = ALARM_OK
340 else:
341 level = ALARM_WARN
342 MetricAlarmEvent.log('gc.garbage', level=level)
343
344 if resource:
345 r = resource.getrusage(resource.RUSAGE_SELF)
346 attrs = ['ru_utime', 'ru_stime', 'ru_maxrss', 'ru_ixrss', 'ru_idrss',
347 'ru_isrss', 'ru_minflt', 'ru_majflt', 'ru_nswap',
348 'ru_inblock', 'ru_oublock', 'ru_msgsnd', 'ru_msgrcv',
349 'ru_nsignals', 'ru_nvcsw', 'ru_nivcsw']
350 for i,a in enumerate(attrs):
351
352
353 v = r[i]
354 if a == 'ru_maxrss' and v == 0:
355 v = _get_rss() * resource.getpagesize() / 1024
356 MetricCountEvent.log('resource.%s' % a, v, absolute=True)
357 MetricCountEvent.log('resource.pagesize', resource.getpagesize(), absolute=True)
358
359 then = util.now(_reactor)
360 dt = 0.1
361 def cb():
362 now = util.now(_reactor)
363 delay = (now - then) - dt
364 MetricTimeEvent.log("reactorDelay", delay)
365 _reactor.callLater(dt, cb)
366
367 -class MetricLogObserver(config.ReconfigurableServiceMixin,
368 service.MultiService):
369 _reactor = reactor
392
394
395 if new_config.metrics is None:
396 self.disable()
397 else:
398 self.enable()
399
400 metrics_config = new_config.metrics
401
402
403 log_interval = metrics_config.get('log_interval', 60)
404 if log_interval != self.log_interval:
405 if self.log_task:
406 self.log_task.stop()
407 self.log_task = None
408 if log_interval:
409 self.log_task = LoopingCall(self.report)
410 self.log_task.clock = self._reactor
411 self.log_task.start(log_interval)
412
413
414 periodic_interval = metrics_config.get('periodic_interval', 10)
415 if periodic_interval != self.periodic_interval:
416 if self.periodic_task:
417 self.periodic_task.stop()
418 self.periodic_task = None
419 if periodic_interval:
420 self.periodic_task = LoopingCall(periodicCheck,
421 self._reactor)
422 self.periodic_task.clock = self._reactor
423 self.periodic_task.start(periodic_interval)
424
425
426 return config.ReconfigurableServiceMixin.reconfigService(self,
427 new_config)
428
432
434 if self.enabled:
435 return
436 log.addObserver(self.emit)
437 self.enabled = True
438
440 if not self.enabled:
441 return
442
443 if self.periodic_task:
444 self.periodic_task.stop()
445 self.periodic_task = None
446
447 if self.log_task:
448 self.log_task.stop()
449 self.log_task = None
450
451 log.removeObserver(self.emit)
452 self.enabled = False
453
455 old = self.getHandler(interface)
456 self.handlers[interface] = handler
457 return old
458
460 return self.handlers.get(interface)
461
def emit(self, eventDict):
    """Route a metric carried by a log event to its registered handler.

    Events without a ``metric`` entry, or whose entry is not a
    ``MetricEvent`` instance, are ignored, as are metrics whose class has
    no entry in ``self.handlers``.  Otherwise the handler processes the
    event and every watcher attached to that handler is then run.

    :param eventDict: the log event dictionary being observed
    """
    metric = eventDict.get('metric')
    if not (metric and isinstance(metric, MetricEvent)):
        return

    # Handlers are keyed by the exact event class.
    event_class = metric.__class__
    if event_class not in self.handlers:
        return

    handler = self.handlers[event_class]
    handler.handle(eventDict, metric)
    for watcher in handler.watchers:
        watcher.run()
475
477 retval = {}
478 for interface, handler in self.handlers.iteritems():
479 retval.update(handler.asDict())
480 return retval
481
483 try:
484 for interface, handler in self.handlers.iteritems():
485 report = handler.report()
486 if not report:
487 continue
488 for line in report.split("\n"):
489 log.msg(line)
490 except:
491 log.err()
492