#!/usr/bin/env python36 # # HBase Prometheus Exporter # # Björn Busse # from __future__ import absolute_import from __future__ import division from __future__ import print_function from __future__ import unicode_literals import argparse from bs4 import BeautifulSoup from flatten_json import flatten import io import json import logging import os from prometheus_client import start_http_server, Summary from prometheus_client.core import GaugeMetricFamily, REGISTRY from prometheus_client import Gauge import random import re import requests import subprocess import sys import time import traceback import xml.etree.ElementTree as et logfile = '' loglevel = 'INFO' rootlogger = logging.getLogger() handler = logging.StreamHandler(sys.stdout) handler.setLevel(logging.INFO) rootlogger.addHandler(handler) # Prometheus prom_http_port = 9010 prom_scrape_interval_s = 10 # Prom vars REQUEST_TIME = Summary('request_processing_seconds', 'Time spent processing request') prom_nregions_in_transition_stale = Gauge('number_of_regions_in_transition_stale', 'Number of stale regions in transition') prom_ninconsistencies = Gauge('number_of_inconsistencies', 'Number of inconsistencies in HBase') prom_hdfs_total = Gauge('hdfs_bytes_total', 'HDFS total bytes') prom_hdfs_used = Gauge('hdfs_bytes_used', 'HDFS used bytes') prom_hdfs_remaining = Gauge('hdfs_bytes_remaining', 'HDFS remaining bytes') # HDFS/HBase hdfs_config_file = "/etc/hadoop/conf/hdfs-site.xml" cmd_hbase_active_master = ['/usr/hdp/current/hbase-client/bin/hbase-jruby', '/usr/hdp/current/hbase-client/bin/get-active-master.rb'] cmd_hdfs_namenodes = ['hdfs', 'getconf', '-namenodes'] namenodes = "" namenode_use_tls = False class jmx_query(): def main(self, hdfs_namenode_hosts): hdfs_active_namenode = self.get_active_namenode() if not hdfs_active_namenode: sys.exit("Failed to determine active namenode") url = self.get_url(hdfs_active_namenode) jmx = self.query(url) if (jmx == False): print("Could not read jmx data from: " + url) return False for k, v in jmx.items(): if not v is None: self.lookup_keys(k, v) def get_url(self, hostname): if (namenode_use_tls): url_scheme = "https://" else: url_scheme = "http://" url = url_scheme + hostname + ":" + str(hdfs_namenode_port) + "/jmx" return url def get_active_namenode(hdfs_namenode_hosts): try: r = subprocess.run(cmd_hdfs_namenodes, stdout=subprocess.PIPE, stderr=subprocess.PIPE) except Exception as e: print("type error: " + str(e)) logging.debug("Failed to get active master") return False hosts = r.stdout.decode('utf-8').split(" ") tree = et.parse(hdfs_config_file) root = tree.getroot() has_ha_element = False active_namenode = None if has_ha_element: print("Hadoop High-Availability") for property in root: if "dfs.ha.namenodes" in property.find("name").text: has_ha_element = True nameservice_id = property.find("name").text[len("dfs.ha.namenodes")+1:] namenodes = property.find("value").text.split(",") for node in namenodes: # Get namenode address and check if it is the active node for n in root: prefix = "dfs.namenode.rpc-address." + nameservice_id + "." element_text = n.find("name").text if prefix in element_text: node_address = n.find("value").text.split(":")[0] cmd = ['hdfs haadmin -getServiceState ' + node] r = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) if len(r.stderr.decode("utf-8")) > 0: print(r.stderr.decode("utf-8")) if "active" in r.stdout.decode("utf-8").lower(): print("Active namenode: " + node_address + " (" + node + ")") return node_address return False def query(self, url): try: r = requests.get(url) except Exception as e: print("Could not connect to: " + url) return False jmx = json.loads(r.text) jmx = flatten(jmx) return(jmx) def lookup_keys(self, key, value): if key == "beans_29_StorageTypeStats_0_value_capacityUsed": prom_hdfs_used.set(value) elif key == "beans_29_StorageTypeStats_0_value_capacityTotal": prom_hdfs_total.set(value) elif key == "beans_29_StorageTypeStats_0_value_capacityRemaining": prom_hdfs_remaining.set(value) class hbase_exporter(): def main(self, hbase_master_hosts): hbase_active_master = self.get_active_master() self.get_stale_regions_in_transition(hbase_active_master) #self.hbck_get_inconsistencies() def get_active_master(self): try: r = subprocess.run(cmd_hbase_active_master, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE) except Exception as e: print("type error: " + str(e)) logging.debug("Failed to get active master") return False return r.stdout.decode('utf-8') def get_stale_regions_in_transition(self, hbase_master): host = hbase_master.rstrip("\n\r") port = hbase_master_ui_port url = 'http://%(host)s:%(port)s/master-status' % locals() logging.debug('GET %s', url) try: req = requests.get(url) except requests.exceptions.RequestException as e: logging.debug(e) logging.debug('Failed to request ' + url) return False logging.debug("Response: %s %s", req.status_code, req.reason) logging.debug("Content:\n%s\n%s\n%s", '='*80, req.content.strip(), '='*80) if req.status_code != 200: logging.debug('Got a http return code != 200') nregions_in_transition_stale = self.hbaseui_parse_output(req.content) if nregions_in_transition_stale is None: logging.debug('Parse error - failed to find number of stale regions in transition') if not isinstance(nregions_in_transition_stale, int): logging.debug('Parse error - got non-integer for regions stale in transition') msg = '{0} regions stale in transition '\ .format(nregions_in_transition_stale) prom_nregions_in_transition_stale.set(nregions_in_transition_stale) print(msg) return nregions_in_transition_stale def hbaseui_parse_output(self, content): soup = BeautifulSoup(content, 'html.parser') nregions_in_transition_stale = 0 try: headings = soup.findAll('h2') for heading in headings: if heading.get_text() == "Regions in Transition": logging.debug('Found Regions in Transition section header') logging.debug('Looking for table') table = heading.find_next('table') nregions_in_transition_stale = self.hbaseui_parse_table(table) if not isinstance(nregions_in_transition_stale, int): logging.debug('Got non-integer \'{0}\' for stale regions in transition when parsing HBase Master UI'\ .format(nregions_in_transition_stale)) return nregions_in_transition_stale except (AttributeError, TypeError): sys.exit('Failed to parse HBase Master UI status page') def hbck_get_inconsistencies(self): re_status = re.compile(r'^Status:\s*(.+?)\s*$') re_inconsistencies = re.compile(r'^\s*(\d+)\s+inconsistencies\s+detected\.?\s*$') ninconsistencies = None hbck_status = None p = Popen(['hbase', 'hbck'], stdout=PIPE, stderr=PIPE, close_fds=False) output, error = p.communicate() output = output.splitlines() if p.returncode != 0: print("Failed to run hbck (%d)" % (p.returncode)) sys.exit(1) for line in output: match = re_inconsistencies.match(line) if match: ninconsistencies = match.group(1) logging.info('Number of inconsistencies: %s', hbck_status) continue match = re_status.match(line) if match: hbck_status = match.group(1) logging.info('hbck status = %s', hbck_status) break if hbck_status is None: print('Failed to find hbck status result') if ninconsistencies is None: print('Failed to find number of inconsistencies') if ninconsistencies != None: ninconsistencies = int(ninconsistencies) if not isinstance(ninconsistencies, int): print('Error: Non-integer detected for the number of inconsistencies') @staticmethod def hbaseui_parse_table(table): for row in table.findChildren('tr'): for col in row.findChildren('td'): if 'Regions in Transition for more than ' in col.get_text(): next_sibling = col.findNext('td') nregions_in_transition_stale = next_sibling.get_text().strip() return nregions_in_transition_stale return None if __name__ == '__main__': parser = argparse.ArgumentParser( description="") parser.add_argument('--hbase-master-hosts', dest='hbase_masters', help="Comma seperated list of HBase master hosts", type=str) parser.add_argument('--hdfs-namenode-hosts', dest='hdfs_namenodes', help="Comma seperated list of HDFS namenode hosts", type=str) parser.add_argument('--logfile', dest='logfile', help="Path to logfile, if logging to a file is desired", type=str) args = parser.parse_args() # Logging if not logfile: logging.basicConfig(filename=logfile, level=logging.INFO) # Start the Prometheus server start_http_server(prom_http_port) nscrapes = 0 if (args.hbase_masters is None): hbase_master_hosts = ['localhost'] hbase_master_ui_port = 16010 if args.hdfs_namenodes is None: hdfs_namenode_hosts = ['localhost'] hdfs_namenode_port = 50070 while True: jmx_query().main(hdfs_namenode_hosts) hbase_exporter().main(hbase_master_hosts) nscrapes += 1 if nscrapes == 1: logging.info("Started HBase exporter") time.sleep(prom_scrape_interval_s)