#!/usr/bin/env python3
#
# HBase Prometheus Exporter
#
# Björn Busse
#
#
# TODO:
#
#  * Remove timestamp from log messages or make it optional,
#    we already have it in the journal -
#    at least when not running in a container
#
#  * Ask ZooKeeper directly for the active namenode/HBase master
#
#  * Add an option to relay all metrics
#
#  * Add hdfs/hbase binaries to the container
#
#  * Fix command line argument handling
#
#  * Add loglevel and other missing arguments

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import argparse
import json
import logging
import os
import re
import subprocess
import sys
import time
import xml.etree.ElementTree as et
from subprocess import Popen, PIPE

import requests
from bs4 import BeautifulSoup
from flatten_json import flatten
from kazoo import client as kz_client
from prometheus_client import Gauge, Summary, start_http_server

tmp_path = '/tmp/'
logpath = tmp_path

# ZooKeeper
zk_reconnect_interval_s = 30

prom_scrape_interval_s = 10

# Prometheus metrics
REQUEST_TIME = Summary('request_processing_seconds',
                       'Time spent processing request')
prom_hbase_num_regions_in_transition_stale = Gauge(
    'number_of_regions_in_transition_stale',
    'Number of stale regions in transition')
prom_hbase_num_inconsistencies = Gauge(
    'number_of_inconsistencies',
    'Number of inconsistencies in HBase')
prom_hdfs_total = Gauge('hdfs_bytes_total', 'HDFS total bytes')
prom_hdfs_used = Gauge('hdfs_bytes_used', 'HDFS used bytes')
prom_hdfs_remaining = Gauge('hdfs_bytes_remaining', 'HDFS remaining bytes')
prom_hdfs_num_datanodes_live = Gauge('hdfs_datanodes_live',
                                     'HDFS live DataNodes')
prom_hdfs_num_datanodes_dead = Gauge('hdfs_datanodes_dead',
                                     'HDFS dead DataNodes')
prom_hbase_up = Gauge('hbase_up',
                      'HBase is up and running, a master is elected')
prom_hbase_healthy = Gauge(
    'hbase_healthy',
    'HBase is up and running, a master is elected, '
    'no inconsistencies are detected, HBase is queryable')
prom_hbase_num_regionservers_live = Gauge('hbase_regionservers_live',
                                          'HBase live RegionServers')
prom_hbase_num_regionservers_dead = Gauge('hbase_regionservers_dead',
                                          'HBase dead RegionServers')
prom_hbase_num_clusterrequests = Gauge('hbase_clusterrequests',
                                       'HBase cluster requests')

# HDFS/HBase
hdfs_config_file = "/etc/hadoop/conf/hdfs-site.xml"
cmd_hbase_active_master = ['/usr/hdp/current/hbase-client/bin/hbase-jruby',
                           '/usr/hdp/current/hbase-client/bin/get-active-master.rb']
cmd_hbase_hbck = ['hbase', 'hbck']
cmd_hdfs_namenodes = ['hdfs', 'getconf', '-namenodes']

# Use command line arguments to set the following vars,
# do not change them here (see TODO)
namenodes = ""
namenode_use_tls = False
hbase_master_ui_default_port = 16010
hdfs_namenode_default_port = 50070
cluster_is_kerberized = False
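
# The gauges above are served by prometheus_client's HTTP endpoint once
# start_http_server() runs (see __main__ below). A scrape then looks
# roughly like this (illustrative values, hypothetical host):
#
#   $ curl -s http://localhost:9010/metrics | grep -A2 '^# HELP hbase_up'
#   # HELP hbase_up HBase is up and running, a master is elected
#   # TYPE hbase_up gauge
#   hbase_up 1.0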

class zk():

    zk_client = ""

    @classmethod
    def main(self, address, timeout=5):
        zk_client = kz_client.KazooClient(address)

        try:
            zk_client.start(timeout)
        except Exception as e:
            logging.debug("ZooKeeper Error: " + str(e))
            return False

        self.zk_client = zk_client
        self.zk_client.add_listener(self.listener)
        return True

    @staticmethod
    def listener(state):
        if state == kz_client.KazooState.LOST:
            # Register somewhere that the session was lost
            logging.debug("ZooKeeper: Connection lost")
        elif state == kz_client.KazooState.SUSPENDED:
            # Handle being disconnected from ZooKeeper
            logging.debug("ZooKeeper: Connection suspended")
        else:
            # Handle being connected/reconnected to ZooKeeper
            logging.info("ZooKeeper: Client connected")

    @classmethod
    def get_znode_data(self, znode):
        # kazoo's exists() returns None for a missing znode
        # instead of raising, so check the return value
        try:
            if not self.zk_client.exists(znode):
                logging.info("ZooKeeper: znode does not exist: " + znode)
                return False
        except Exception:
            logging.info("ZooKeeper: Could not check znode: " + znode)
            return False

        try:
            data = self.zk_client.get(znode)
        except Exception:
            logging.info("ZooKeeper: Could not get znode data from " + znode)
            return False

        return data


class jmx_query():

    def main(self, hdfs_namenode_hosts):
        hdfs_active_namenode = self.get_active_namenode()
        hbase_active_master = hbase_exporter.get_active_master()

        if not hdfs_active_namenode:
            logging.info("Failed to determine active HDFS namenode")
            return False

        if not hbase_active_master:
            logging.info("Failed to determine active HBase master")
            return False

        url = self.get_url('hdfs', hdfs_active_namenode)
        self.get_jmx_data(url)

        url = self.get_url('hbase', hbase_active_master)
        self.get_jmx_data(url)

    def get_url(self, service, hostname):
        if namenode_use_tls:
            url_scheme = "https://"
        else:
            url_scheme = "http://"

        if service == 'hdfs':
            url = url_scheme + hostname + ":" \
                + str(hdfs_namenode_default_port) + "/jmx"
        elif service == 'hbase':
            url = url_scheme + hostname + ":" \
                + str(hbase_master_ui_default_port) + "/jmx"

        return url

    def get_jmx_data(self, url):
        jmx = self.query(url)

        if jmx is False:
            logging.info("Could not read jmx data from: " + url)
            return False

        for k, v in jmx.items():
            if v is not None:
                self.lookup_keys(k, v)

        return True
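
    # The /jmx endpoints return JSON of the form {"beans": [ ... ]}.
    # flatten() from flatten_json collapses that into a single level of
    # underscore-joined keys, which is why lookup_keys() below matches on
    # key suffixes only. A minimal sketch (made-up bean name and value):
    #
    #   >>> from flatten_json import flatten
    #   >>> flatten({"beans": [{"name": "FSNamesystem", "capacityUsed": 42}]})
    #   {'beans_0_name': 'FSNamesystem', 'beans_0_capacityUsed': 42}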

    def get_active_namenode(self):
        if not which(cmd_hdfs_namenodes[0]):
            logging.info("Could not find hdfs executable in PATH")
            return False

        try:
            r = subprocess.run(cmd_hdfs_namenodes,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
        except Exception as e:
            logging.debug("Type error: " + str(e))
            logging.info("Failed to determine the configured namenodes")
            return False

        # The parser below expects an HA configuration in hdfs-site.xml,
        # shaped like:
        #
        #   <property>
        #     <name>dfs.ha.namenodes.mycluster</name>
        #     <value>nn1,nn2</value>
        #   </property>
        #   <property>
        #     <name>dfs.namenode.rpc-address.mycluster.nn1</name>
        #     <value>host1.example.com:8020</value>
        #   </property>
        tree = et.parse(hdfs_config_file)
        root = tree.getroot()
        has_ha_element = False

        for prop in root:
            if "dfs.ha.namenodes" in prop.find("name").text:
                has_ha_element = True
                nameservice_id = prop.find("name").text[len("dfs.ha.namenodes") + 1:]
                namenodes = prop.find("value").text.split(",")

                for node in namenodes:
                    # Get the namenode address and check
                    # whether it is the active node
                    for n in root:
                        prefix = "dfs.namenode.rpc-address." \
                            + nameservice_id + "."
                        element_text = n.find("name").text

                        # Only match the rpc-address entry
                        # of this specific namenode
                        if element_text == prefix + node:
                            node_address = n.find("value").text.split(":")[0]
                            cmd = ['hdfs', 'haadmin',
                                   '-getServiceState', node]
                            r = subprocess.run(cmd,
                                               stdout=subprocess.PIPE,
                                               stderr=subprocess.PIPE)

                            if len(r.stderr.decode("utf-8")) > 0:
                                logging.debug(r.stderr.decode("utf-8"))

                            if "active" in r.stdout.decode("utf-8").lower():
                                logging.info("Active namenode: "
                                             + node_address
                                             + " (" + node + ")")
                                return node_address

        if has_ha_element:
            logging.info("Hadoop High-Availability: True")
        else:
            logging.info("Hadoop High-Availability: False")

        return False

    def query(self, url):
        try:
            r = requests.get(url)
        except Exception:
            logging.info("Could not connect to: " + url)
            return False

        jmx = json.loads(r.text)
        jmx = flatten(jmx)

        return jmx

    def lookup_keys(self, key, value):
        if key.endswith("capacityUsed"):
            prom_hdfs_used.set(value)
            logging.debug("Found jmx key: " + key)
        elif key.endswith("capacityTotal"):
            prom_hdfs_total.set(value)
            logging.debug("Found jmx key: " + key)
        elif key.endswith("capacityRemaining"):
            prom_hdfs_remaining.set(value)
            logging.debug("Found jmx key: " + key)
        elif key.endswith("NumLiveDataNodes"):
            prom_hdfs_num_datanodes_live.set(value)
            logging.debug("Found jmx key: " + key)
        elif key.endswith("NumDeadDataNodes"):
            prom_hdfs_num_datanodes_dead.set(value)
            logging.debug("Found jmx key: " + key)
        elif key.endswith("numRegionServers"):
            prom_hbase_num_regionservers_live.set(value)
            logging.debug("Found jmx key: " + key)
        elif key.endswith("numDeadRegionServers"):
            prom_hbase_num_regionservers_dead.set(value)
            logging.debug("Found jmx key: " + key)
        elif key.endswith("clusterRequests"):
            prom_hbase_num_clusterrequests.set(value)
            logging.debug("Found jmx key: " + key)
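
# hbase_exporter.hbck_get_inconsistencies() below scrapes the summary that
# `hbase hbck` prints on stdout. The lines its regexes match look roughly
# like this on a healthy cluster (exact wording may vary between HBase
# versions):
#
#   0 inconsistencies detected.
#   Status: OK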

class hbase_exporter():

    def main(self, hbase_master_hosts):
        hbase_active_master = self.get_active_master()
        self.num_inconsistencies = None
        self.num_regions_in_transition_stale = None

        if not hbase_active_master:
            logging.info("Failed to determine active HBase master")
            prom_hbase_up.set(0)
            prom_hbase_healthy.set(0)
            return False

        self.num_regions_in_transition_stale = \
            self.get_stale_regions_in_transition(hbase_active_master)
        self.hbck_get_inconsistencies()
        self.check_health()

    def check_health(self):
        # Treat an unknown state (None) as unhealthy
        if self.num_inconsistencies is None \
                or self.num_inconsistencies > 0:
            prom_hbase_healthy.set(0)
            return False

        if self.num_regions_in_transition_stale is None \
                or self.num_regions_in_transition_stale > 0:
            prom_hbase_healthy.set(0)
            return False

        prom_hbase_up.set(1)
        prom_hbase_healthy.set(1)
        return True

    @staticmethod
    def get_active_master():
        if not which(cmd_hbase_active_master[0]):
            logging.info("Could not find hbase-jruby executable in PATH")
            return False

        try:
            r = subprocess.run(cmd_hbase_active_master,
                               shell=False,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
        except Exception:
            return False

        if 'Master not running' in r.stdout.decode('utf-8'):
            return False

        prom_hbase_up.set(1)
        active_master = r.stdout.decode('utf-8').rstrip()
        logging.info("Active master: " + active_master)
        return active_master

    def get_stale_regions_in_transition(self, hbase_master):
        host = hbase_master.rstrip("\n\r")
        port = hbase_master_ui_default_port
        url = 'http://%(host)s:%(port)s/master-status' % locals()
        logging.debug('GET %s', url)

        try:
            req = requests.get(url)
        except requests.exceptions.RequestException as e:
            logging.debug(e)
            logging.debug('Failed to request ' + url)
            return None

        logging.debug("Response: %s %s", req.status_code, req.reason)

        if req.status_code != 200:
            logging.debug('Got a http return code != 200')

        num_regions_in_transition_stale = \
            self.hbaseui_parse_output(req.content)

        if num_regions_in_transition_stale is None:
            logging.debug('Parse error - failed to find number of stale '
                          'regions in transition')
            return None

        if not isinstance(num_regions_in_transition_stale, int):
            logging.debug('Parse error - got non-integer for stale '
                          'regions in transition')
            return None

        msg = '{0} stale regions in transition'\
              .format(num_regions_in_transition_stale)
        prom_hbase_num_regions_in_transition_stale.set(
            num_regions_in_transition_stale)
        logging.info(msg)

        return num_regions_in_transition_stale

    def hbaseui_parse_output(self, content):
        soup = BeautifulSoup(content, 'html.parser')
        num_regions_in_transition_stale = 0

        try:
            headings = soup.findAll('h2')

            for heading in headings:
                if heading.get_text() == "Regions in Transition":
                    logging.debug('Found Regions in Transition '
                                  'section header')
                    logging.debug('Looking for table')
                    table = heading.find_next('table')
                    n = self.hbaseui_parse_table(table)

                    # No matching table row means no stale
                    # regions in transition
                    if n is not None:
                        num_regions_in_transition_stale = n

            return num_regions_in_transition_stale
        except (AttributeError, TypeError):
            logging.info('Failed to parse HBase Master UI status page')
            return None

    def hbck_get_inconsistencies(self):
        re_status = re.compile(r'^Status:\s*(.+?)\s*$')
        re_inconsistencies = re.compile(
            r'^\s*(\d+)\s+inconsistencies\s+detected\.?\s*$')

        num_inconsistencies = None
        hbck_status = None

        logging.info("HBase: Running hbck consistency check")
        p = Popen(cmd_hbase_hbck, stdout=PIPE, stderr=PIPE, close_fds=False)
        output, error = p.communicate()
        # Popen returns bytes - decode before matching the str regexes
        output = output.decode('utf-8').splitlines()

        if p.returncode != 0:
            logging.info("Failed to run hbck (%d)" % (p.returncode))
            return False

        for line in output:
            match = re_inconsistencies.match(line)

            if match:
                num_inconsistencies = match.group(1)
                logging.info('Number of inconsistencies: %s',
                             num_inconsistencies)
                continue

            match = re_status.match(line)

            if match:
                hbck_status = match.group(1)
                logging.info('hbck status = %s', hbck_status)
                break

        if hbck_status is None:
            logging.info('Failed to find hbck status result')

        if num_inconsistencies is None:
            logging.info('Failed to find number of inconsistencies')
            return False

        self.num_inconsistencies = int(num_inconsistencies)
        prom_hbase_num_inconsistencies.set(self.num_inconsistencies)

    @staticmethod
    def hbaseui_parse_table(table):
        for row in table.findChildren('tr'):
            for col in row.findChildren('td'):
                if 'Regions in Transition for more than ' in col.get_text():
                    next_sibling = col.findNext('td')

                    # The cell content is text - convert before returning
                    try:
                        return int(next_sibling.get_text().strip())
                    except ValueError:
                        return None

        return None


def which(program):

    def is_executable(fn):
        return os.path.isfile(fn) and os.access(fn, os.X_OK)

    filepath, fname = os.path.split(program)

    if filepath:
        if is_executable(program):
            return program
    else:
        for path in os.environ["PATH"].split(os.pathsep):
            exec_file = os.path.join(path, program)
            if is_executable(exec_file):
                return exec_file

    return None
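
# Example invocation (hypothetical hostnames; the script name is whatever
# this file is saved as):
#
#   ./hbase_exporter.py \
#       --zookeeper-server-address zk1.example.com:2181 \
#       --hbase-master master1.example.com \
#       --hdfs-namenodes nn1.example.com \
#       --prometheus-exporter-port 9010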

if __name__ == '__main__':

    hbase_master_default_address = 'localhost:' \
        + str(hbase_master_ui_default_port)
    hdfs_namenode_default_address = 'localhost:' \
        + str(hdfs_namenode_default_port)

    parser = argparse.ArgumentParser(description="HBase Prometheus exporter")

    # With action='append' a non-list default would be kept as a plain
    # string, so leave the default unset and fall back after parsing
    parser.add_argument('--hbase-master', dest='hbase_master',
                        action='append',
                        help="HBase master address, "
                             "can be specified multiple times",
                        type=str)
    parser.add_argument('--hdfs-namenodes', dest='hdfs_namenode',
                        action='append',
                        help="HDFS namenode address, "
                             "can be specified multiple times",
                        type=str)
    parser.add_argument('--zookeeper-server-address', dest='zookeeper_server',
                        action='append',
                        help="ZooKeeper server address, "
                             "can be specified multiple times",
                        type=str)
    parser.add_argument('--prometheus-exporter-port', dest='prom_http_port',
                        help="Listen port for Prometheus export",
                        type=int, default=9010)
    parser.add_argument('--logfile', dest='logfile',
                        help="Path to optional logfile", type=str)
    parser.add_argument('--loglevel', dest='loglevel',
                        help="Loglevel, default: INFO",
                        type=str, default='INFO')

    args = parser.parse_args()
    prom_http_port = args.prom_http_port
    logfile = args.logfile
    loglevel = args.loglevel
    zookeeper_server = args.zookeeper_server
    hbase_master = args.hbase_master or [hbase_master_default_address]
    hdfs_namenodes = args.hdfs_namenode or [hdfs_namenode_default_address]
    del args

    if not zookeeper_server:
        print("Missing --zookeeper-server-address, exiting..")
        sys.exit(1)

    # Optional file logging
    if logfile is not None:
        logpath, logfile = os.path.split(logfile)

        if not logpath:
            logpath = '.'

        if not os.access(logpath, os.W_OK):
            # Our logger is not set up yet, so we use print here
            print("Logging: Can not write to directory. "
                  "Skipping file logging handler")
        else:
            fn = os.path.join(logpath, logfile)
            file_handler = logging.FileHandler(filename=fn)
            # Our logger is not set up yet, so we use print here
            print("Logging: Logging to " + fn)

    stdout_handler = logging.StreamHandler(sys.stdout)

    if 'file_handler' in locals():
        handlers = [file_handler, stdout_handler]
    else:
        handlers = [stdout_handler]

    logging.basicConfig(
        level=logging.INFO,
        format='[%(asctime)s] {%(filename)s:%(lineno)d} '
               '%(levelname)s - %(message)s',
        handlers=handlers
    )

    logger = logging.getLogger(__name__)
    level = logging.getLevelName(loglevel)
    logger.setLevel(level)

    # Start the Prometheus server
    try:
        start_http_server(prom_http_port)
    except Exception as e:
        logging.debug("Failed to start Prometheus webserver: " + str(e))
        logging.info("There might be another instance of " + sys.argv[0]
                     + " already running, can not bind to "
                     + str(prom_http_port) + ", exiting..")
        sys.exit()

    nruns = 0

    # Start a ZooKeeper client, retrying until one
    # of the given servers accepts the connection
    r = False
    nzk = 0

    while not r:
        for zk_address in zookeeper_server:
            nzk += 1
            logging.info("ZooKeeper: Trying to connect to " + zk_address
                         + ' (' + str(nzk) + '/'
                         + str(len(zookeeper_server)) + ')')
            r = zk.main(zk_address)

            if r:
                break

        if not r:
            nzk = 0
            time.sleep(zk_reconnect_interval_s)

    if cluster_is_kerberized:
        znode_hbase = "/hbase"
    else:
        znode_hbase = "/hbase-unsecure"

    clusterid = zk.get_znode_data(znode_hbase + "/hbaseid")

    if not clusterid:
        logging.info("ZooKeeper: Could not read clusterid")
    else:
        logging.info("ZooKeeper: Clusterid: " + str(clusterid[0]))

    while True:
        m = zk.get_znode_data(znode_hbase + "/master")

        if not m:
            logging.info("ZooKeeper: Failed to get HBase master")
        else:
            logging.info("ZooKeeper: " + str(m[0]))

        jmx_query().main(hdfs_namenodes)
        hbase_exporter().main(hbase_master)
        nruns += 1

        if nruns == 1:
            logging.info("Started HBase exporter")

        time.sleep(prom_scrape_interval_s)