1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 from __future__ import with_statement
17 """
18 Buildbot metrics module
19
20 Keeps track of counts and timings of various internal buildbot
21 activities.
22
23 Basic architecture:
24
25 MetricEvent.log(...)
26 ||
27 \/
28 MetricLogObserver
29 ||
30 \/
31 MetricHandler
32 ||
33 \/
34 MetricWatcher
35 """
36 from collections import deque
37
38 from twisted.python import log
39 from twisted.internet.task import LoopingCall
40 from twisted.internet import reactor
41 from twisted.application import service
42 from buildbot import util, config
43 from collections import defaultdict
44
45 import gc, os, sys
46
47 try:
48 import resource
49 assert resource
50 except ImportError:
51 resource = None
54 @classmethod
55 - def log(cls, *args, **kwargs):
56 log.msg(metric=cls(*args, **kwargs))
57
59 - def __init__(self, counter, count=1, absolute=False):
60 self.counter = counter
61 self.count = count
62 self.absolute = absolute
63
68
69 ALARM_OK, ALARM_WARN, ALARM_CRIT = range(3)
70 ALARM_TEXT = ["OK", "WARN", "CRIT"]
74 self.alarm = alarm
75 self.level = level
76 self.msg = msg
77
79 def decorator(func):
80 def wrapper(*args, **kwargs):
81 MetricCountEvent.log(counter=counter)
82 return func(*args, **kwargs)
83 return wrapper
84 return decorator
85
87
88 _reactor = None
89
93
95 def wrapper(*args, **kwargs):
96 self.start()
97 return func(*args, **kwargs)
98 return wrapper
99
101 def wrapper(*args, **kwargs):
102 try:
103 return func(*args, **kwargs)
104 finally:
105 self.stop()
106 return wrapper
107
110
116
118 def decorator(func):
119 t = Timer(name)
120 t._reactor=_reactor
121 def wrapper(*args, **kwargs):
122 t.start()
123 try:
124 return func(*args, **kwargs)
125 finally:
126 t.stop()
127 return wrapper
128 return decorator
129
132 self._maxlen = maxlen
133 deque.__init__(self)
134
136 deque.append(self, o)
137 if len(self) > self._maxlen:
138 self.popleft()
139
144
148
150 if len(self) == 0:
151 self.average = 0
152 else:
153 self.average = float(sum(self)) / len(self)
154
155 return self.average
156
163
166
169
170
172 raise NotImplementedError
173
174 - def handle(self, eventDict, metric):
175 raise NotImplementedError
176
177 - def get(self, metric):
178 raise NotImplementedError
179
181 raise NotImplementedError
182
184 raise NotImplementedError
185
187 raise NotImplementedError
188
190 _counters = None
193
194 - def handle(self, eventDict, metric):
195 if metric.absolute:
196 self._counters[metric.counter] = metric.count
197 else:
198 self._counters[metric.counter] += metric.count
199
202
203 - def get(self, counter):
205
207 retval = []
208 for counter in sorted(self.keys()):
209 retval.append("Counter %s: %i" % (counter, self.get(counter)))
210 return "\n".join(retval)
211
213 retval = {}
214 for counter in sorted(self.keys()):
215 retval[counter] = self.get(counter)
216 return dict(counters=retval)
217
219 _timers = None
222
223 - def handle(self, eventDict, metric):
225
228
229 - def get(self, timer):
231
233 retval = []
234 for timer in sorted(self.keys()):
235 retval.append("Timer %s: %.3g" % (timer, self.get(timer)))
236 return "\n".join(retval)
237
239 retval = {}
240 for timer in sorted(self.keys()):
241 retval[timer] = self.get(timer)
242 return dict(timers=retval)
243
245 _alarms = None
248
249 - def handle(self, eventDict, metric):
251
260
266
296
324
326 if sys.platform == 'linux2':
327 try:
328 with open("/proc/%i/statm" % os.getpid()) as f:
329 return int(f.read().split()[1])
330 except:
331 return 0
332 return 0
333
335 try:
336
337 garbage_count = len(gc.garbage)
338 MetricCountEvent.log('gc.garbage', garbage_count, absolute=True)
339 if garbage_count == 0:
340 level = ALARM_OK
341 else:
342 level = ALARM_WARN
343 MetricAlarmEvent.log('gc.garbage', level=level)
344
345 if resource:
346 r = resource.getrusage(resource.RUSAGE_SELF)
347 attrs = ['ru_utime', 'ru_stime', 'ru_maxrss', 'ru_ixrss', 'ru_idrss',
348 'ru_isrss', 'ru_minflt', 'ru_majflt', 'ru_nswap',
349 'ru_inblock', 'ru_oublock', 'ru_msgsnd', 'ru_msgrcv',
350 'ru_nsignals', 'ru_nvcsw', 'ru_nivcsw']
351 for i,a in enumerate(attrs):
352
353
354 v = r[i]
355 if a == 'ru_maxrss' and v == 0:
356 v = _get_rss() * resource.getpagesize() / 1024
357 MetricCountEvent.log('resource.%s' % a, v, absolute=True)
358 MetricCountEvent.log('resource.pagesize', resource.getpagesize(), absolute=True)
359
360 then = util.now(_reactor)
361 dt = 0.1
362 def cb():
363 now = util.now(_reactor)
364 delay = (now - then) - dt
365 MetricTimeEvent.log("reactorDelay", delay)
366 _reactor.callLater(dt, cb)
367 except Exception:
368 log.err(None, "while collecting VM metrics")
369
370 -class MetricLogObserver(config.ReconfigurableServiceMixin,
371 service.MultiService):
372 _reactor = reactor
395
397
398 if new_config.metrics is None:
399 self.disable()
400 else:
401 self.enable()
402
403 metrics_config = new_config.metrics
404
405
406 log_interval = metrics_config.get('log_interval', 60)
407 if log_interval != self.log_interval:
408 if self.log_task:
409 self.log_task.stop()
410 self.log_task = None
411 if log_interval:
412 self.log_task = LoopingCall(self.report)
413 self.log_task.clock = self._reactor
414 self.log_task.start(log_interval)
415
416
417 periodic_interval = metrics_config.get('periodic_interval', 10)
418 if periodic_interval != self.periodic_interval:
419 if self.periodic_task:
420 self.periodic_task.stop()
421 self.periodic_task = None
422 if periodic_interval:
423 self.periodic_task = LoopingCall(periodicCheck,
424 self._reactor)
425 self.periodic_task.clock = self._reactor
426 self.periodic_task.start(periodic_interval)
427
428
429 return config.ReconfigurableServiceMixin.reconfigService(self,
430 new_config)
431
435
437 if self.enabled:
438 return
439 log.addObserver(self.emit)
440 self.enabled = True
441
443 if not self.enabled:
444 return
445
446 if self.periodic_task:
447 self.periodic_task.stop()
448 self.periodic_task = None
449
450 if self.log_task:
451 self.log_task.stop()
452 self.log_task = None
453
454 log.removeObserver(self.emit)
455 self.enabled = False
456
458 old = self.getHandler(interface)
459 self.handlers[interface] = handler
460 return old
461
463 return self.handlers.get(interface)
464
465 - def emit(self, eventDict):
466
467 metric = eventDict.get('metric')
468 if not metric or not isinstance(metric, MetricEvent):
469 return
470
471 if metric.__class__ not in self.handlers:
472 return
473
474 h = self.handlers[metric.__class__]
475 h.handle(eventDict, metric)
476 for w in h.watchers:
477 w.run()
478
480 retval = {}
481 for interface, handler in self.handlers.iteritems():
482 retval.update(handler.asDict())
483 return retval
484
486 try:
487 for interface, handler in self.handlers.iteritems():
488 report = handler.report()
489 if not report:
490 continue
491 for line in report.split("\n"):
492 log.msg(line)
493 except:
494 log.err(None, "generating metric report")
495