1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 """
16 Buildbot metrics module
17
18 Keeps track of counts and timings of various internal buildbot
19 activities.
20
21 Basic architecture:
22
23 MetricEvent.log(...)
24 ||
25 \/
26 MetricLogObserver
27 ||
28 \/
29 MetricHandler
30 ||
31 \/
32 MetricWatcher
33 """
34 from collections import deque
35
36 from twisted.python import log
37 from twisted.internet.task import LoopingCall
38 from twisted.internet import reactor
39 from twisted.application import service
40
41 from buildbot import util
42 from buildbot.util.bbcollections import defaultdict
43
44 import gc, os, sys
45
46 try:
47 import resource
48 assert resource
49 except ImportError:
50 resource = None
@classmethod
def log(cls, *args, **kwargs):
    """Build an event of this class and publish it through the log system.

    Every positional and keyword argument is handed unchanged to the
    class constructor; the resulting instance is passed to ``log.msg``
    under the ``metric`` keyword.
    """
    event = cls(*args, **kwargs)
    log.msg(metric=event)
56
def __init__(self, counter, count=1, absolute=False):
    """Store the fields of a count event.

    counter  -- name of the counter the event refers to
    count    -- amount carried by the event (1 when omitted)
    absolute -- flag kept on the event (False when omitted); the count
                handler in this file overwrites the counter with
                ``count`` when it is true and adds ``count`` otherwise
    """
    self.counter, self.count, self.absolute = counter, count, absolute
62
67
# Alarm levels.  Each level's integer value is also the index of its
# label in ALARM_TEXT.
ALARM_OK = 0
ALARM_WARN = 1
ALARM_CRIT = 2
ALARM_TEXT = ["OK", "WARN", "CRIT"]
73 self.alarm = alarm
74 self.level = level
75 self.msg = msg
76
def decorator(func):
    """Wrap ``func`` so that every call logs one count event first.

    The returned wrapper calls ``MetricCountEvent.log(counter=counter)``
    and then invokes ``func`` with the arguments it received, returning
    whatever ``func`` returns.  ``counter`` comes from the enclosing
    scope.
    """
    import functools  # local import keeps this block self-contained

    # functools.wraps copies __name__, __doc__, etc. onto the wrapper so
    # a decorated function still identifies as itself.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        MetricCountEvent.log(counter=counter)
        return func(*args, **kwargs)
    return wrapper
83 return decorator
84
86
87 _reactor = None
88
92
94 def wrapper(*args, **kwargs):
95 self.start()
96 return func(*args, **kwargs)
97 return wrapper
98
100 def wrapper(*args, **kwargs):
101 try:
102 return func(*args, **kwargs)
103 finally:
104 self.stop()
105 return wrapper
106
109
115
def decorator(func):
    """Wrap ``func`` so that each call is bracketed by a timer.

    A single ``Timer(name)`` is created when the decorator is applied
    and is reused for every call of the wrapped function.  ``name`` and
    ``_reactor`` come from the enclosing scope.
    """
    import functools  # local import keeps this block self-contained

    t = Timer(name)
    t._reactor = _reactor

    # Preserve the wrapped function's __name__/__doc__ on the wrapper.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        t.start()
        try:
            return func(*args, **kwargs)
        finally:
            # Stop the timer even when func raises.
            t.stop()
    return wrapper
127 return decorator
128
131 self._maxlen = maxlen
132 deque.__init__(self)
133
135 deque.append(self, o)
136 if len(self) > self._maxlen:
137 self.popleft()
138
143
147
149 if len(self) == 0:
150 self.average = 0
151 else:
152 self.average = float(sum(self)) / len(self)
153
154 return self.average
155
162
165
168
169
171 raise NotImplementedError
172
def handle(self, eventDict, metric):
    """Process one metric event.

    This base version always raises NotImplementedError; a concrete
    handler supplies its own implementation.
    """
    raise NotImplementedError()
175
def get(self, metric):
    """Return the stored value for ``metric``.

    This base version always raises NotImplementedError; a concrete
    handler supplies its own implementation.
    """
    raise NotImplementedError()
178
180 raise NotImplementedError
181
183 raise NotImplementedError
184
186 raise NotImplementedError
187
189 _counters = None
192
def handle(self, eventDict, metric):
    """Apply a count event to the stored counters.

    An event whose ``absolute`` flag is true replaces the counter's
    value with ``metric.count``; any other event adds ``metric.count``
    to the counter's current value.  ``eventDict`` is not used here.
    """
    if not metric.absolute:
        # Relative event: accumulate onto the existing value.
        self._counters[metric.counter] += metric.count
        return
    self._counters[metric.counter] = metric.count
198
201
202 - def get(self, counter):
204
206 retval = []
207 for counter in sorted(self.keys()):
208 retval.append("Counter %s: %i" % (counter, self.get(counter)))
209 return "\n".join(retval)
210
212 retval = {}
213 for counter in sorted(self.keys()):
214 retval[counter] = self.get(counter)
215 return dict(counters=retval)
216
218 _timers = None
221
222 - def handle(self, eventDict, metric):
224
227
228 - def get(self, timer):
230
232 retval = []
233 for timer in sorted(self.keys()):
234 retval.append("Timer %s: %.3g" % (timer, self.get(timer)))
235 return "\n".join(retval)
236
238 retval = {}
239 for timer in sorted(self.keys()):
240 retval[timer] = self.get(timer)
241 return dict(timers=retval)
242
244 _alarms = None
247
248 - def handle(self, eventDict, metric):
250
259
265
293
321
323 if sys.platform == 'linux2':
324 try:
325 return int(open("/proc/%i/statm" % os.getpid()).read().split()[1])
326 except:
327 return 0
328 return 0
329
331
332 garbage_count = len(gc.garbage)
333 MetricCountEvent.log('gc.garbage', garbage_count, absolute=True)
334 if garbage_count == 0:
335 level = ALARM_OK
336 else:
337 level = ALARM_WARN
338 MetricAlarmEvent.log('gc.garbage', level=level)
339
340 if resource:
341 r = resource.getrusage(resource.RUSAGE_SELF)
342 attrs = ['ru_utime', 'ru_stime', 'ru_maxrss', 'ru_ixrss', 'ru_idrss',
343 'ru_isrss', 'ru_minflt', 'ru_majflt', 'ru_nswap',
344 'ru_inblock', 'ru_oublock', 'ru_msgsnd', 'ru_msgrcv',
345 'ru_nsignals', 'ru_nvcsw', 'ru_nivcsw']
346 for i,a in enumerate(attrs):
347
348
349 v = r[i]
350 if a == 'ru_maxrss' and v == 0:
351 v = _get_rss() * resource.getpagesize() / 1024
352 MetricCountEvent.log('resource.%s' % a, v, absolute=True)
353 MetricCountEvent.log('resource.pagesize', resource.getpagesize(), absolute=True)
354
355 then = util.now(_reactor)
356 dt = 0.1
357 def cb():
358 now = util.now(_reactor)
359 delay = (now - then) - dt
360 MetricTimeEvent.log("reactorDelay", delay)
361 _reactor.callLater(dt, cb)
362
364 _reactor = reactor
384
386 self.config = config
387 log_interval = self.config.get('log_interval', 60)
388 if self.log_task:
389 self.log_task.stop()
390 if log_interval:
391
392 self.log_task = LoopingCall(self.report)
393 self.log_task.clock = self._reactor
394 self.log_task.start(log_interval)
395 else:
396 self.log_task = None
397
398 periodic_interval = self.config.get('periodic_interval', 10)
399 if self.periodic_task:
400 self.periodic_task.stop()
401 if periodic_interval:
402 self.periodic_task = LoopingCall(periodicCheck, self._reactor)
403 self.periodic_task.clock = self._reactor
404 self.periodic_task.start(periodic_interval)
405 else:
406 self.periodic_task = None
407
414
416 log.msg("Stopping %s" % self)
417 service.MultiService.stopService(self)
418
419 if self.periodic_task:
420 self.periodic_task.stop()
421 self.periodic_task = None
422
423 if self.log_task:
424 self.log_task.stop()
425 self.log_task = None
426
427 log.removeObserver(self.emit)
428
430 old = self.getHandler(interface)
431 self.handlers[interface] = handler
432 return old
433
435 return self.handlers.get(interface)
436
def emit(self, eventDict):
    """Dispatch a log event that carries a metric to its handler.

    Events without a truthy ``'metric'`` entry, or whose metric is not a
    MetricEvent instance, are ignored, as are metrics whose class has no
    entry in ``self.handlers``.  Otherwise the handler registered for
    the metric's class processes the event, and then each of that
    handler's watchers is run.
    """
    metric = eventDict.get('metric')
    if not (metric and isinstance(metric, MetricEvent)):
        return

    if metric.__class__ in self.handlers:
        handler = self.handlers[metric.__class__]
        handler.handle(eventDict, metric)
        for watcher in handler.watchers:
            watcher.run()
450
452 retval = {}
453 for interface, handler in self.handlers.iteritems():
454 retval.update(handler.asDict())
455 return retval
456
458 try:
459 for interface, handler in self.handlers.iteritems():
460 report = handler.report()
461 if not report:
462 continue
463 for line in report.split("\n"):
464 log.msg(line)
465 except:
466 log.err()
467