# (C) Datadog, Inc. 2012-2016
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)

# stdlib
from collections import defaultdict
import copy
import re
import time

# 3rd party
from haproxy import haproxy
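# `haproxy` is assumed here to be the third-party package whose
# `haproxy.HAProxyStats` client reads stats over a UNIX socket; it is used as
# the non-HTTP transport in `_fetch_data` below.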
import requests

# project
from checks import AgentCheck
from config import _is_affirmative
from util import headers

STATS_URL = "/;csv;norefresh"
EVENT_TYPE = SOURCE_TYPE_NAME = 'haproxy'


class Services(object):
    BACKEND = 'BACKEND'
    FRONTEND = 'FRONTEND'
    ALL = (BACKEND, FRONTEND)

    # Statuses that we normalize to and that are reported by
    # `haproxy.count_per_status` by default (unless `collate_status_tags_per_host` is enabled)
    ALL_STATUSES = (
        'up', 'open', 'down', 'maint', 'nolb'
    )

    AVAILABLE = 'available'
    UNAVAILABLE = 'unavailable'
    COLLATED_STATUSES = (AVAILABLE, UNAVAILABLE)

    BACKEND_STATUS_TO_COLLATED = {
        'up': AVAILABLE,
        'down': UNAVAILABLE,
        'maint': UNAVAILABLE,
        'nolb': UNAVAILABLE,
    }

    STATUS_TO_COLLATED = {
        'up': AVAILABLE,
        'open': AVAILABLE,
        'down': UNAVAILABLE,
        'maint': UNAVAILABLE,
        'nolb': UNAVAILABLE,
    }

    STATUS_TO_SERVICE_CHECK = {
        'up': AgentCheck.OK,
        'down': AgentCheck.CRITICAL,
        'no_check': AgentCheck.UNKNOWN,
        'maint': AgentCheck.OK,
    }
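    # For example, a raw status of 'UP 1/2' is normalized to 'up' (see
    # `HAProxy._normalize_status` below), which `STATUS_TO_COLLATED` then maps
    # to AVAILABLE.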


class HAProxy(AgentCheck):
    def __init__(self, name, init_config, agentConfig, instances=None):
        AgentCheck.__init__(self, name, init_config, agentConfig, instances)

        # Host status needs to persist across all checks
        self.host_status = defaultdict(lambda: defaultdict(lambda: None))

    METRICS = {
        "qcur": ("gauge", "queue.current"),
        "scur": ("gauge", "session.current"),
        "slim": ("gauge", "session.limit"),
        "spct": ("gauge", "session.pct"),  # Calculated as: (scur/slim)*100
        "stot": ("rate", "session.rate"),
        "bin": ("rate", "bytes.in_rate"),
        "bout": ("rate", "bytes.out_rate"),
        "dreq": ("rate", "denied.req_rate"),
        "dresp": ("rate", "denied.resp_rate"),
        "ereq": ("rate", "errors.req_rate"),
        "econ": ("rate", "errors.con_rate"),
        "eresp": ("rate", "errors.resp_rate"),
        "wretr": ("rate", "warnings.retr_rate"),
        "wredis": ("rate", "warnings.redis_rate"),
        "req_rate": ("gauge", "requests.rate"),  # HAProxy 1.4 and higher
        "hrsp_1xx": ("rate", "response.1xx"),  # HAProxy 1.4 and higher
        "hrsp_2xx": ("rate", "response.2xx"),  # HAProxy 1.4 and higher
        "hrsp_3xx": ("rate", "response.3xx"),  # HAProxy 1.4 and higher
        "hrsp_4xx": ("rate", "response.4xx"),  # HAProxy 1.4 and higher
        "hrsp_5xx": ("rate", "response.5xx"),  # HAProxy 1.4 and higher
        "hrsp_other": ("rate", "response.other"),  # HAProxy 1.4 and higher
        "qtime": ("gauge", "queue.time"),  # HAProxy 1.5 and higher
        "ctime": ("gauge", "connect.time"),  # HAProxy 1.5 and higher
        "rtime": ("gauge", "response.time"),  # HAProxy 1.5 and higher
        "ttime": ("gauge", "session.time"),  # HAProxy 1.5 and higher
    }
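    # For example, a row's "qcur" value is reported as the gauge
    # `haproxy.backend.queue.current` or `haproxy.frontend.queue.current`,
    # depending on the row's type (see `_process_metrics` below).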

    SERVICE_CHECK_NAME = 'haproxy.backend_up'

    def check(self, instance):
        url = instance.get('url')
        username = instance.get('username')
        password = instance.get('password')
        collect_aggregates_only = _is_affirmative(
            instance.get('collect_aggregates_only', True)
        )
        collect_status_metrics = _is_affirmative(
            instance.get('collect_status_metrics', False)
        )

        collect_status_metrics_by_host = _is_affirmative(
            instance.get('collect_status_metrics_by_host', False)
        )

        collate_status_tags_per_host = _is_affirmative(
            instance.get('collate_status_tags_per_host', False)
        )

        count_status_by_service = _is_affirmative(
            instance.get('count_status_by_service', True)
        )

        tag_service_check_by_host = _is_affirmative(
            instance.get('tag_service_check_by_host', False)
        )

        services_incl_filter = instance.get('services_include', [])
        services_excl_filter = instance.get('services_exclude', [])

        verify = not _is_affirmative(instance.get('disable_ssl_validation', False))

        self.log.debug('Processing HAProxy data for %s' % url)

        data = self._fetch_data(url, username, password, verify)

        process_events = instance.get('status_check', self.init_config.get('status_check', False))

        self._process_data(
            data, collect_aggregates_only, process_events,
            url=url, collect_status_metrics=collect_status_metrics,
            collect_status_metrics_by_host=collect_status_metrics_by_host,
            tag_service_check_by_host=tag_service_check_by_host,
            services_incl_filter=services_incl_filter,
            services_excl_filter=services_excl_filter,
            collate_status_tags_per_host=collate_status_tags_per_host,
            count_status_by_service=count_status_by_service,
        )

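    # A minimal instance configuration sketch covering the options read in
    # `check()` above (all values illustrative, not defaults):
    #
    #   instances:
    #     - url: https://localhost:8080/stats   # or a UNIX socket path
    #       username: admin
    #       password: secret
    #       collect_aggregates_only: true
    #       collect_status_metrics: true
    #       collect_status_metrics_by_host: false
    #       collate_status_tags_per_host: false
    #       count_status_by_service: true
    #       tag_service_check_by_host: false
    #       disable_ssl_validation: false
    #       status_check: false
    #       services_include:
    #         - prod
    #       services_exclude:
    #         - .*
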
    def _fetch_data(self, url, username, password, verify):
        ''' Fetch the raw stats CSV, either from the HTTP stats endpoint or
        from a UNIX socket, and return it as a list of lines '''
        auth = (username, password)
        if 'http' in url:
            url = "%s%s" % (url, STATS_URL)
            self.log.debug("HAProxy Fetching haproxy search data from: %s" % url)

            r = requests.get(url, auth=auth, headers=headers(self.agentConfig), verify=verify)
            r.raise_for_status()

            return r.content.splitlines()
        else:
            self.log.debug("HAProxy Fetching haproxy search data from UNIX socket")
            stats = haproxy.HAProxyStats(url)
            return stats.execute('show stat', 1000)

    def _process_data(self, data, collect_aggregates_only, process_events, url=None,
                      collect_status_metrics=False, collect_status_metrics_by_host=False,
                      tag_service_check_by_host=False, services_incl_filter=None,
                      services_excl_filter=None, collate_status_tags_per_host=False,
                      count_status_by_service=True):
        ''' Main data-processing loop. For each piece of useful data, we'll
        either save a metric, save an event or both. '''

        # Split the first line into an index of fields
        # The line looks like:
        # "# pxname,svname,qcur,qmax,scur,smax,slim,stot,bin,bout,dreq,dresp,ereq,econ,eresp,wretr,wredis,status,weight,act,bck,chkfail,chkdown,lastchg,downtime,qlimit,pid,iid,sid,throttle,lbtot,tracked,type,rate,rate_lim,rate_max,"
        fields = [f.strip() for f in data[0][2:].split(',') if f]

        self.hosts_statuses = defaultdict(int)

        back_or_front = None

        # Skip the first line, go backwards to set back_or_front
        for line in data[:0:-1]:
            if not line.strip():
                continue

            # Store each line's values in a dictionary
            data_dict = self._line_to_dict(fields, line)

            if self._is_aggregate(data_dict):
                back_or_front = data_dict['svname']

            self._update_data_dict(data_dict, back_or_front)

            self._update_hosts_statuses_if_needed(
                collect_status_metrics, collect_status_metrics_by_host,
                data_dict, self.hosts_statuses
            )

            if self._should_process(data_dict, collect_aggregates_only):
                # Send the list of data to the metric and event callbacks
                self._process_metrics(
                    data_dict, url,
                    services_incl_filter=services_incl_filter,
                    services_excl_filter=services_excl_filter
                )
                if process_events:
                    self._process_event(
                        data_dict, url,
                        services_incl_filter=services_incl_filter,
                        services_excl_filter=services_excl_filter
                    )
                self._process_service_check(
                    data_dict, url,
                    tag_by_host=tag_service_check_by_host,
                    services_incl_filter=services_incl_filter,
                    services_excl_filter=services_excl_filter
                )

        if collect_status_metrics:
            self._process_status_metric(
                self.hosts_statuses, collect_status_metrics_by_host,
                services_incl_filter=services_incl_filter,
                services_excl_filter=services_excl_filter,
                collate_status_tags_per_host=collate_status_tags_per_host,
                count_status_by_service=count_status_by_service
            )

        self._process_backend_hosts_metric(
            self.hosts_statuses,
            services_incl_filter=services_incl_filter,
            services_excl_filter=services_excl_filter
        )

        return data

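    # For example, with fields ['pxname', 'svname', 'scur', ...], the line
    # 'dogweb,i-4562165,42,...' becomes
    # {'pxname': 'dogweb', 'svname': 'i-4562165', 'scur': 42.0, ...}:
    # numeric values are cast to float, everything else stays a string.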
    def _line_to_dict(self, fields, line):
        data_dict = {}
        for i, val in enumerate(line.split(',')):
            if val:
                try:
                    # Try converting to a float; if that fails, keep the raw string
                    val = float(val)
                except Exception:
                    pass
                data_dict[fields[i]] = val

        if 'status' in data_dict:
            data_dict['status'] = self._normalize_status(data_dict['status'])

        return data_dict

    def _update_data_dict(self, data_dict, back_or_front):
        """
        Adds spct if relevant, adds back_or_front
        """
        data_dict['back_or_front'] = back_or_front
        # The percentage of used sessions, based on 'scur' and 'slim'
        if 'slim' in data_dict and 'scur' in data_dict:
            try:
                data_dict['spct'] = (data_dict['scur'] / data_dict['slim']) * 100
            except (TypeError, ZeroDivisionError):
                pass

    def _is_aggregate(self, data_dict):
        return data_dict['svname'] in Services.ALL

    def _update_hosts_statuses_if_needed(self, collect_status_metrics,
                                         collect_status_metrics_by_host,
                                         data_dict, hosts_statuses):
        if data_dict['svname'] == Services.BACKEND:
            return
        if collect_status_metrics and 'status' in data_dict and 'pxname' in data_dict:
            if collect_status_metrics_by_host and 'svname' in data_dict:
                key = (data_dict['pxname'], data_dict['svname'], data_dict['status'])
            else:
                key = (data_dict['pxname'], data_dict['status'])
            hosts_statuses[key] += 1

    def _should_process(self, data_dict, collect_aggregates_only):
        """
        If `collect_aggregates_only` is set, we process only the aggregates;
        otherwise we process everything except Services.BACKEND rows
        """
        if collect_aggregates_only:
            return self._is_aggregate(data_dict)
        elif data_dict['svname'] == Services.BACKEND:
            return False
        return True

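    # A service is skipped when it matches an exclude pattern and no include
    # pattern. For example, services_exclude=['.*'] combined with
    # services_include=['prod'] filters out everything except services whose
    # name matches 'prod' (patterns are `re.search` regexes):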
    def _is_service_excl_filtered(self, service_name, services_incl_filter,
                                  services_excl_filter):
        if self._tag_match_patterns(service_name, services_excl_filter):
            if self._tag_match_patterns(service_name, services_incl_filter):
                return False
            return True
        return False

    def _tag_match_patterns(self, tag, filters):
        if not filters:
            return False
        for rule in filters:
            if re.search(rule, tag):
                return True
        return False

    @staticmethod
    def _normalize_status(status):
        """
        Try to normalize the HAProxy status as one of the statuses defined in `ALL_STATUSES`;
        if it can't be matched, return the status as-is in a tag-friendly format.
        e.g.: 'UP 1/2'   -> 'up'
              'no check' -> 'no_check'
        """
        formatted_status = status.lower().replace(" ", "_")
        for normalized_status in Services.ALL_STATUSES:
            if formatted_status.startswith(normalized_status):
                return normalized_status
        return formatted_status

    def _process_backend_hosts_metric(self, hosts_statuses, services_incl_filter=None,
                                      services_excl_filter=None):
        agg_statuses = defaultdict(lambda: {status: 0 for status in Services.COLLATED_STATUSES})
        for host_status, count in hosts_statuses.iteritems():
            try:
                service, hostname, status = host_status
            except Exception:
                service, status = host_status

            if self._is_service_excl_filtered(service, services_incl_filter, services_excl_filter):
                continue

            collated_status = Services.BACKEND_STATUS_TO_COLLATED.get(status)
            if collated_status:
                agg_statuses[service][collated_status] += count
            else:
                # Create the entries for this service anyway (via the
                # defaultdict side effect), so gauges are still sent for it
                agg_statuses[service]

        for service in agg_statuses:
            tags = ['service:%s' % service]
            self.gauge(
                'haproxy.backend_hosts',
                agg_statuses[service][Services.AVAILABLE],
                tags=tags + ['available:true'])
            self.gauge(
                'haproxy.backend_hosts',
                agg_statuses[service][Services.UNAVAILABLE],
                tags=tags + ['available:false'])
        return agg_statuses

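    # Each (tags, status) pair becomes one gauge. For example, three hosts
    # counted 'up' for service 'dogweb' are reported as
    # `haproxy.count_per_status` = 3 with tags ['service:dogweb', 'status:up']
    # (illustrative values):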
    def _process_status_metric(self, hosts_statuses, collect_status_metrics_by_host,
                               services_incl_filter=None, services_excl_filter=None,
                               collate_status_tags_per_host=False, count_status_by_service=True):
        agg_statuses_counter = defaultdict(lambda: {status: 0 for status in Services.COLLATED_STATUSES})

        # Initialize `statuses_counter`: every value is a defaultdict initialized with the correct
        # keys, which depend on the `collate_status_tags_per_host` option
        reported_statuses = Services.ALL_STATUSES
        if collate_status_tags_per_host:
            reported_statuses = Services.COLLATED_STATUSES
        reported_statuses_dict = defaultdict(int)
        for reported_status in reported_statuses:
            reported_statuses_dict[reported_status] = 0
        statuses_counter = defaultdict(lambda: copy.copy(reported_statuses_dict))

        for host_status, count in hosts_statuses.iteritems():
            hostname = None
            try:
                service, hostname, status = host_status
            except Exception:
                service, status = host_status
                if collect_status_metrics_by_host:
                    self.warning('`collect_status_metrics_by_host` is enabled but no host info'
                                 ' could be extracted from HAProxy stats endpoint for {0}'.format(service))

            if self._is_service_excl_filtered(service, services_incl_filter, services_excl_filter):
                continue

            tags = []
            if count_status_by_service:
                tags.append('service:%s' % service)
            if hostname:
                tags.append('backend:%s' % hostname)

            counter_status = status
            if collate_status_tags_per_host:
                # An unknown status will be sent as UNAVAILABLE
                counter_status = Services.STATUS_TO_COLLATED.get(status, Services.UNAVAILABLE)
            statuses_counter[tuple(tags)][counter_status] += count

            # Compute aggregates with collated statuses. If collate_status_tags_per_host is enabled we
            # already send collated statuses with fine-grained tags, so no need to compute/send these aggregates
            if not collate_status_tags_per_host:
                agg_tags = []
                if count_status_by_service:
                    agg_tags.append('service:%s' % service)
                # An unknown status will be sent as UNAVAILABLE
                agg_statuses_counter[tuple(agg_tags)][Services.STATUS_TO_COLLATED.get(status, Services.UNAVAILABLE)] += count

        for tags, count_per_status in statuses_counter.iteritems():
            for status, count in count_per_status.iteritems():
                self.gauge('haproxy.count_per_status', count, tags=tags + ('status:%s' % status, ))

        # Send aggregates
        for service_tags, service_agg_statuses in agg_statuses_counter.iteritems():
            for status, count in service_agg_statuses.iteritems():
                self.gauge("haproxy.count_per_status", count, tags=service_tags + ('status:%s' % status, ))

    def _process_metrics(self, data, url, services_incl_filter=None,
                         services_excl_filter=None):
        """
        Data is a dictionary related to one host
        (one line) extracted from the csv.
        It should look like:
        {'pxname':'dogweb', 'svname':'i-4562165', 'scur':'42', ...}
        """
        hostname = data['svname']
        service_name = data['pxname']
        back_or_front = data['back_or_front']
        tags = ["type:%s" % back_or_front, "instance_url:%s" % url]
        tags.append("service:%s" % service_name)

        if self._is_service_excl_filtered(service_name, services_incl_filter,
                                          services_excl_filter):
            return

        if back_or_front == Services.BACKEND:
            tags.append('backend:%s' % hostname)

        for key, value in data.items():
            if HAProxy.METRICS.get(key):
                suffix = HAProxy.METRICS[key][1]
                name = "haproxy.%s.%s" % (back_or_front.lower(), suffix)
                if HAProxy.METRICS[key][0] == 'rate':
                    self.rate(name, value, tags=tags)
                else:
                    self.gauge(name, value, tags=tags)

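    # Events fire only on 'up'/'down' transitions. For example, a backend host
    # previously seen 'up' that now reports 'down' yields an event titled
    # "<agent-hostname> reported dogweb:i-4562165 DOWN" (names illustrative):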
    def _process_event(self, data, url, services_incl_filter=None,
                       services_excl_filter=None):
        '''
        Main event processing loop. An event will be created for a service
        status change.
        Service checks on the server side can be used to provide the same functionality
        '''
        hostname = data['svname']
        service_name = data['pxname']
        key = "%s:%s" % (hostname, service_name)
        status = self.host_status[url][key]

        if self._is_service_excl_filtered(service_name, services_incl_filter,
                                          services_excl_filter):
            return

        if status is None:
            self.host_status[url][key] = data['status']
            return

        if status != data['status'] and data['status'] in ('up', 'down'):
            # If the status of a host has changed, we trigger an event
            try:
                lastchg = int(data['lastchg'])
            except Exception:
                lastchg = 0

            # Create the event object
            ev = self._create_event(
                data['status'], hostname, lastchg, service_name,
                data['back_or_front']
            )
            self.event(ev)

        # Store this host status so we can check against it later
        self.host_status[url][key] = data['status']

    def _create_event(self, status, hostname, lastchg, service_name, back_or_front):
        HAProxy_agent = self.hostname.decode('utf-8')
        if status == 'down':
            alert_type = "error"
            title = "%s reported %s:%s %s" % (HAProxy_agent, service_name, hostname, status.upper())
        else:
            if status == "up":
                alert_type = "success"
            else:
                alert_type = "info"
            title = "%s reported %s:%s back and %s" % (HAProxy_agent, service_name, hostname, status.upper())

        tags = ["service:%s" % service_name]
        if back_or_front == Services.BACKEND:
            tags.append('backend:%s' % hostname)
        return {
            'timestamp': int(time.time() - lastchg),
            'event_type': EVENT_TYPE,
            'host': HAProxy_agent,
            'msg_title': title,
            'alert_type': alert_type,
            "source_type_name": SOURCE_TYPE_NAME,
            "event_object": hostname,
            "tags": tags
        }

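    # For example, a row whose normalized status is 'maint' is reported as
    # AgentCheck.OK under the 'haproxy.backend_up' service check, while 'down'
    # reports CRITICAL (per `Services.STATUS_TO_SERVICE_CHECK`):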
    def _process_service_check(self, data, url, tag_by_host=False,
                               services_incl_filter=None, services_excl_filter=None):
        ''' Report a service check, tagged by the service and the backend.
        Statuses are defined in `STATUS_TO_SERVICE_CHECK` mapping.
        '''
        service_name = data['pxname']
        status = data['status']
        haproxy_hostname = self.hostname.decode('utf-8')
        check_hostname = haproxy_hostname if tag_by_host else ''

        if self._is_service_excl_filtered(service_name, services_incl_filter,
                                          services_excl_filter):
            return

        if status in Services.STATUS_TO_SERVICE_CHECK:
            service_check_tags = ["service:%s" % service_name]
            hostname = data['svname']
            if data['back_or_front'] == Services.BACKEND:
                service_check_tags.append('backend:%s' % hostname)

            status = Services.STATUS_TO_SERVICE_CHECK[status]
            message = "%s reported %s:%s %s" % (haproxy_hostname, service_name,
                                                hostname, status)
            self.service_check(self.SERVICE_CHECK_NAME, status, message=message,
                               hostname=check_hostname, tags=service_check_tags)