Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 38 additions & 3 deletions scalyr_agent/builtin_monitors/linux_system_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import os
import re
import scalyr_agent.third_party.tcollector.tcollector as tcollector
import time
from Queue import Empty
from scalyr_agent import ScalyrMonitor, BadMonitorConfiguration, define_metric, define_log_field, define_config_option
from scalyr_agent.third_party.tcollector.tcollector import ReaderThread
Expand Down Expand Up @@ -243,6 +244,12 @@
'to create the full interface name when interating over network interfaces in /dev'
)

define_config_option(__monitor__, 'log_all_interval',
'Time in seconds between logging of the full set of metrics. Default is to log the full set '
'of metrics every time the monitor runs. If this is set to a value greater than the interval '
'between runs of the monitor, then intervening monitor runs will only log values that have '
'changed since the last monitor run.')

class TcollectorOptions(object):
"""Bare minimum implementation of an object to represent the tcollector options.

Expand Down Expand Up @@ -283,16 +290,40 @@ def __init__(self, monitor, queue, logger, error_logger):
self.__error_logger = error_logger
self.__timestamp_matcher = re.compile('(\\S+)\\s+\\d+\\s+(.*)')
self.__key_value_matcher = re.compile('(\\S+)=(\\S+)')
self.__parts_matcher = re.compile('(\\S+)\\s+([\\d.]+)\\s*(.*)')
self.__last_values = {}

def __rewrite_tsdb_line(self, line):
"""Rewrites the TSDB line emitted by the collectors to the format used by the agent-metrics parser."""
"""Rewrites the TSDB line emitted by the collectors to the format used by the agent-metrics parser.
Returns None if the line shouldn't be logged."""
# Strip out the timestamp that is the second token on the line.
match = self.__timestamp_matcher.match(line)
if match is not None:
line = '%s %s' % (match.group(1), match.group(2))

# Lines are of the form
#
# metric value [key=value ...]
#
# The identity of a metric is the metric name plus all its key/value pairs; we'll
# need that to figure out if the value has changed.
if self.__monitor.log_all_interval > 0:
parts_match = self.__parts_matcher.match(line)
if parts_match is not None:
identity = parts_match.group(1) + ' ' + parts_match.group(3)
value = parts_match.group(2)

if identity in self.__last_values:
(previous_value, previous_time) = self.__last_values[identity]
expiration_time = previous_time + self.__monitor.log_all_interval
if previous_value == value and expiration_time > time.time():
return None

self.__last_values[identity] = (value, time.time())

# Now rewrite any key/value pairs from foo=bar to foo="bar"
line = self.__key_value_matcher.sub('\\1="\\2"', line)

return line

def run(self):
Expand All @@ -307,15 +338,17 @@ def run(self):
# returned by the queue. See the 'stop' method for details.
if not self._run_state.is_running():
continue
self.__logger.info(line, metric_log_for_monitor=self.__monitor)
if line:
self.__logger.info(line, metric_log_for_monitor=self.__monitor)
while True:
try:
line = self.__rewrite_tsdb_line(self.__queue.get(False))
except Empty:
break
if not self._run_state.is_running():
continue
self.__logger.info(line, metric_log_for_monitor=self.__monitor)
if line:
self.__logger.info(line, metric_log_for_monitor=self.__monitor)

errors = 0 # We managed to do a successful iteration.
except (ArithmeticError, EOFError, EnvironmentError, LookupError,
Expand Down Expand Up @@ -397,6 +430,8 @@ def _initialize(self):
self.modules = tcollector.load_etc_dir(self.options, tags)
self.tags = tags

self.log_all_interval = int(self._config.get('log_all_interval', default='0'))

def run(self):
"""Begins executing the monitor, writing metric output to self._logger."""
tcollector.override_logging(self._logger)
Expand Down