hbase-exporter/hbase-exporter

818 lines
29 KiB
Python
Executable File

#!/usr/bin/env python3
#
# HBase Prometheus Exporter
#
# Björn Busse <bj.rn@baerlin.eu>
#
#
# TODO:
#
# * Remove timestamp from log msg or make them optional,
# we already have it in the journal -
# at least when not running in a container
#
# * Add hdfs/hbase binaries to container
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import configargparse
from bs4 import BeautifulSoup
from collections import defaultdict
from flatten_json import flatten
import datetime as dt
import io
import json
from kazoo.client import KazooClient
import logging
import os
from prometheus_client import start_http_server, Summary
from prometheus_client.core import GaugeMetricFamily, REGISTRY
from prometheus_client import Gauge
import pybase
import random
import re
import requests
from struct import unpack
import subprocess
from subprocess import Popen,PIPE
import sys
import time
import traceback
import xml.etree.ElementTree as et
sys.path.append('/usr/local/lib/hbase-protobuf-python')
sys.path.append('/usr/local/lib/hbase-protobuf-python/server')
sys.path.append('/usr/local/lib/hbase-protobuf-python/server/zookeeper')
from ZooKeeper_pb2 import Master as hbMaster
tmp_path = '/tmp/'
logpath = tmp_path
# ZooKeeper
zk_reconnect_interval_s = 30
# Prom vars
REQUEST_TIME = Summary('request_processing_seconds', 'Time spent processing request')
prom_hdfs_total = Gauge('hdfs_bytes_total', 'HDFS total bytes')
prom_hdfs_used = Gauge('hdfs_bytes_used', 'HDFS used bytes')
prom_hdfs_remaining = Gauge('hdfs_bytes_remaining', 'HDFS remaining bytes')
prom_hdfs_num_datanodes_live = Gauge('hdfs_datanodes_live', 'HDFS Live DataNodes')
prom_hdfs_num_datanodes_dead = Gauge('hdfs_datanodes_dead', 'HDFS Dead DataNodes')
prom_hbase_up = Gauge('hbase_up', 'HBase is up and running, a master is elected')
prom_hbase_healthy = Gauge('hbase_healthy', 'HBase is up and running, a master is elected, no inconsistencies are detected, hbase is queryable')
prom_hbase_num_regionservers_live = Gauge('hbase_regionservers_live', 'HBase Live Regionservers')
prom_hbase_num_regionservers_dead = Gauge('hbase_regionservers_dead', 'HBase Dead Regionservers')
prom_hbase_num_clusterrequests = Gauge('hbase_clusterrequests', 'HBase Clusterrequests')
prom_hbase_num_regions_in_transition_stale = Gauge('number_of_regions_in_transition_stale', 'Number of stale regions in transition')
prom_hbase_num_inconsistencies = Gauge('number_of_inconsistencies', 'Number of inconsistencies in HBase')
prom_hbase_readable = Gauge('hbase_is_readable', 'HBase is readable')
prom_hbase_writeable = Gauge('hbase_is_writeable', 'HBase is writeable')
prom_zookeeper_num = Gauge('zookeeper_num', 'Known ZooKeeper Servers')
prom_zookeeper_num_live = Gauge('zookeeper_num_live', 'Live ZooKeeper Servers')
prom_zookeeper_has_leader = Gauge('zookeeper_has_leader', 'ZooKeeper cluser has a leader')
# HDFS/HBase
hdfs_config_file = "/etc/hadoop/conf/hdfs-site.xml"
cmd_hbase_active_master = ['/usr/hdp/current/hbase-client/bin/hbase-jruby', '/usr/hdp/current/hbase-client/bin/get-active-master.rb']
cmd_hbase_hbck = ['hbase', 'hbck']
cmd_hbase_write = ['hbase-write']
cmd_hdfs_namenodes = ['hdfs', 'getconf', '-namenodes']
# Use command line arguments to set the following vars
# Do not change them here (See TODO)
namenodes = ""
namenode_use_tls = False
hbase_master_ui_default_port = 16010
hdfs_namenode_default_port = 50070
cluster_is_kerberized = False
hbase_hbck_remove_lockfile = True
class zk():
zk_client = ""
@classmethod
def main(self, address_list, use_tls, timeout=5):
addresses = ','.join(address_list)
zk_client = KazooClient(addresses, use_ssl=use_tls, read_only=True)
try:
zk_client.start(timeout)
except Exception as e:
logging.debug("ZooKeeper Error: " + str(e))
return False
self.zk_client = zk_client
self.zk_client.add_listener(self.listener)
return True
def listener(state):
if state == kz_client.KazooState.CONNECTED:
logging.info("ZooKeeper: Client connected")
else:
logging.info("ZooKeeper: Failed to connect to ZooKeeper")
@classmethod
def znode_data(self, znode):
data = ""
try:
self.zk_client.exists(znode)
except Exception as e:
logging.info("ZooKeeper: znode does not exist: " + znode)
return False
try:
data = self.zk_client.get(znode)
except:
logging.info("ZooKeeper: Could not get znode data from " + znode)
return False
return data
def listener(state):
if state == KazooState.LOST:
logging.debug("ZooKeeper: Connection lost")
# Register somewhere that the session was lost
elif state == KazooState.SUSPENDED:
logging.debug("ZooKeeper: Connection suspended")
# Handle being disconnected from Zookeeper
else:
logging.debug("ZooKeeper: Connection re-established")
# Handle being connected/reconnected to Zookeeper
def active_servers(address_list):
zk_has_leader = 0
zk_leader_address = ""
num_active_servers = 0
re_mode = re.compile(r'^Mode:\s*(.+?)\s*$')
for address in address_list:
cmd = 'echo stat | nc ' + address + ' 2181'
p = Popen(['/bin/sh', '-c', cmd], stdout=PIPE, stderr=PIPE, close_fds=False)
output, error = p.communicate()
output = output.splitlines()
error = error.splitlines()
for line in output:
match = re_mode.match(line.decode('utf-8'))
if match:
mode = match.group(1)
logging.info("zk: server %s: %s", address, mode)
num_active_servers += 1
if match.group(1) == "leader":
has_leader = 1
zk_leader_address = address
prom_zookeeper_has_leader.set(has_leader)
for line in error:
logging.info(line)
prom_zookeeper_num_live.set(num_active_servers)
logging.info("zk: %d active ZooKeeper servers", num_active_servers)
if has_leader:
logging.info("zk: Zookeeper has leader: True")
logging.info("zk: leader: %s", zk_leader_address)
else:
logging.info("zk: Zookeeper has leader: False")
class jmx_query():
def __init__(self, relay_complete_jmx):
self.relay_complete_jmx = relay_complete_jmx
self.prom_jmx_keys = []
self.prom_jmx = {}
def main(self, hdfs_namenode_hosts):
hdfs_active_namenode = self.active_namenode(hdfs_namenode_hosts)
hbase_active_master = hbase_exporter.zk_active_master()
if not hdfs_active_namenode:
logging.info("Failed to determine active HDFS namenode")
return False
if not hbase_active_master:
logging.info("Failed to determine active HBase master")
return False
url = self.get_url('hdfs', hdfs_active_namenode)
logging.info("hdfs: Fetching jmx data")
self.jmx_data(url)
url = self.get_url('hbase', hbase_active_master)
logging.info("hbase: Fetching jmx data")
self.jmx_data(url)
def get_url(self, service, hostname):
if (namenode_use_tls):
url_scheme = "https://"
else:
url_scheme = "http://"
if service == 'hdfs':
url = url_scheme + hostname + ":" + str(hdfs_namenode_default_port) + "/jmx"
elif service == 'hbase':
url = url_scheme + hostname + ":" + str(hbase_master_ui_default_port) + "/jmx"
return url
def jmx_data(self, url):
jmx = self.query(url)
if (jmx == False):
logging.info("Could not read jmx data from: " + url)
return False
for k, v in jmx.items():
if not v is None:
self.lookup_keys(k, v)
return True
def active_namenode(self, hdfs_namenode_hosts):
if not which(cmd_hdfs_namenodes[0]):
logging.info("Could not find hdfs executable in PATH")
return False
try:
r = subprocess.run(cmd_hdfs_namenodes, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except Exception as e:
logging.debug("Type error: " + str(e))
return False
hosts = r.stdout.decode('utf-8').split(" ")
tree = et.parse(hdfs_config_file)
root = tree.getroot()
has_ha_element = False
active_namenode = None
for property in root:
if "dfs.ha.namenodes" in property.find("name").text:
has_ha_element = True
nameservice_id = property.find("name").text[len("dfs.ha.namenodes")+1:]
namenodes = property.find("value").text.split(",")
for node in namenodes:
# Get namenode address and check if it is the active node
for n in root:
prefix = "dfs.namenode.rpc-address." + nameservice_id + "."
element_text = n.find("name").text
if prefix in element_text:
node_address = n.find("value").text.split(":")[0]
# Needs to either run with root privileges or as hdfs user
cmd = ['hdfs haadmin -getServiceState ' + node]
r = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if len(r.stderr.decode("utf-8")) > 0:
logging.debug(r.stderr.decode("utf-8"))
if "active" in r.stdout.decode("utf-8").lower():
logging.info("hdfs: Active namenode: " + node_address + " (" + node + ")")
return node_address
if has_ha_element:
logging.info("Hadoop High-Availability: True")
else:
logging.info("Hadoop High-Availability: False")
return False
def query(self, url):
try:
r = requests.get(url)
except Exception as e:
logging.info("Could not connect to: " + url)
return False
jmx = json.loads(r.text)
jmx = flatten(jmx)
return(jmx)
def lookup_keys(self, key, value):
denylist = ["Name", "name", "Type", "Object",
"ObjectName", "Valid", "tag.Context", "tag.Hostname"]
if key.endswith("capacityUsed"):
prom_hdfs_used.set(value)
logging.debug("Found jmx key: " + key)
elif key.endswith("capacityTotal"):
prom_hdfs_total.set(value)
logging.debug("Found jmx key: " + key)
elif key.endswith("capacityRemaining"):
prom_hdfs_remaining.set(value)
logging.debug("Found jmx key: " + key)
elif key.endswith("NumLiveDataNodes"):
prom_hdfs_num_datanodes_live.set(value)
logging.debug("Found jmx key: " + key)
elif key.endswith("NumDeadDataNodes"):
prom_hdfs_num_datanodes_dead.set(value)
logging.debug("Found jmx key: " + key)
elif key.endswith("numRegionServers"):
prom_hbase_num_regionservers_live.set(value)
logging.debug("Found jmx key: " + key)
elif key.endswith("numDeadRegionServers"):
prom_hbase_num_regionservers_dead.set(value)
logging.debug("Found jmx key: " + key)
elif key.endswith("clusterRequests"):
prom_hbase_num_clusterrequests.set(value)
logging.debug("Found jmx key: " + key)
else:
if not self.relay_complete_jmx:
return
jmx_key = key.split("_", 2)
if jmx_key[2] not in denylist:
jmx_key = "jmx_" + key
jmx_key = jmx_key.replace(".", "_")
jmx_key = jmx_key.replace("-", "_")
logging.debug("Found jmx key: " + jmx_key)
if not isinstance(value, str) and not type(value) is list:
prom_jmx_key = "prom_" + jmx_key
# Check if key is already registered
if prom_jmx_key not in self.prom_jmx_keys:
self.prom_jmx_keys.append(prom_jmx_key)
self.prom_jmx[prom_jmx_key] = Gauge(prom_jmx_key, prom_jmx_key)
# Set prometheys value
self.prom_jmx[prom_jmx_key].set(value)
class hbase_exporter():
def __init__(self):
self.hbase_read_success = 0
self.hbase_write_success = 0
def main(self, zk_server, hbase_master_hosts, run_hbck):
hbase_active_master = self.zk_active_master()
if not hbase_active_master:
logging.info("hbase: Failed to determine active HBase master")
prom_hbase_up.set(0)
prom_hbase_healthy.set(0)
return False
prom_hbase_up.set(1)
self.stale_regions_in_transition(hbase_active_master)
msg = 'hbase: {0} stale regions in transition '\
.format(self.num_regions_in_transition_stale)
logging.info(msg)
prom_hbase_num_regions_in_transition_stale.set(self.num_regions_in_transition_stale)
if run_hbck:
self.hbck_inconsistencies()
logging.info("hbase-hbck: Number of inconsistencies: %d", self.num_inconsistencies)
prom_hbase_num_inconsistencies.set(self.num_inconsistencies)
self.hbase_read_write_test(zk_server)
if self.hbase_read_success:
logging.info("hbase: Read test succeeded")
prom_hbase_readable.set(1)
else:
logging.info("hbase: Read test failed!")
prom_hbase_readable.set(0)
if self.hbase_write_success:
logging.info("hbase: Write test succeeded")
prom_hbase_writeable.set(1)
else:
logging.info("hbase: Write test failed!")
prom_hbase_writeable.set(0)
hbase_health = self.check_health(run_hbck)
prom_hbase_healthy.set(hbase_health)
def check_health(self, run_hbck):
# Only check for inconsistencies if we actually ran hbck
if run_hbck and self.num_inconsistencies > 0:
return False
if self.num_regions_in_transition_stale > 0:
return False
if self.hbase_write_success != 0:
return False
return True
# The prefered method to get the active
# HBase Master by directly looking into ZooKeeper
@staticmethod
def zk_active_master():
msg = zk.znode_data(znode_hbase + "/master")
if not msg:
logging.info("ZooKeeper: Failed to get HBase master")
return False
else:
msg = msg[0]
first_byte, meta_length = unpack(">cI", msg[:5])
msg = msg[meta_length + 9:]
master = hbMaster()
master.ParseFromString(msg)
return master.master.host_name
# An alternative way to get the HBase Master
# without directly looking into ZooKeeper
@staticmethod
def jruby_active_master():
if not which(cmd_hbase_active_master[0]):
logging.info("Could not find hdfs executable in PATH")
return False
try:
r = subprocess.run(cmd_hbase_active_master, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except:
return False
if 'Master not running' in r.stdout.decode('utf-8'):
return False
active_master = r.stdout.decode('utf-8').rstrip()
return active_master
def stale_regions_in_transition(self, hbase_master):
host = hbase_master.rstrip("\n\r")
port = hbase_master_ui_default_port
url = 'http://%(host)s:%(port)s/master-status' % locals()
logging.debug('GET %s', url)
try:
req = requests.get(url)
except requests.exceptions.RequestException as e:
logging.debug(e)
logging.debug('Failed to request ' + url)
return False
logging.debug("Response: %s %s", req.status_code, req.reason)
if req.status_code != 200:
logging.debug('Got a http return code != 200')
num_regions_in_transition_stale = self.hbaseui_parse_output(req.content)
if num_regions_in_transition_stale is None:
logging.debug('hbase-ui: Parse error - failed to find number of stale regions in transition')
if not isinstance(num_regions_in_transition_stale, int):
logging.debug('hbase-ui: Parse error - got non-integer for stale regions in transition')
self.num_regions_in_transition_stale = num_regions_in_transition_stale
def hbaseui_parse_output(self, content):
soup = BeautifulSoup(content, 'html.parser')
num_regions_in_transition_stale = 0
try:
headings = soup.findAll('h2')
for heading in headings:
# The section only exists if there are stale regions in transition
if heading.get_text() == "Regions in Transition":
logging.info('hbase-ui: Found Regions in Transition section header')
logging.info('hbase-ui: Looking for table')
table = heading.find_next('table')
num_regions_in_transition_stale = self.hbaseui_parse_table(table)
if not isinstance(num_regions_in_transition_stale, int):
logging.info('hbase-ui: Got non-integer \'{0}\' for stale regions in transition when parsing HBase Master UI'\
.format(num_regions_in_transition_stale))
except (AttributeError, TypeError):
logging.info('hbase-ui: Failed to parse HBase Master UI status page')
return -1
return num_regions_in_transition_stale
def hbck_inconsistencies(self):
re_status = re.compile(r'^Status:\s*(.+?)\s*$')
re_duplicate = re.compile(r'(.*)ERROR\s\[main\]\sutil\.HBaseFsck\:\sAnother\sinstance\sof\shbck\sis\srunning(.*)$')
re_inconsistencies = re.compile(r'^\s*(\d+)\s+inconsistencies\s+detected\.?\s*$')
self.num_inconsistencies = None
hbck_status = None
logging.info("hbase: Running hbck consistency check")
p = Popen(['hbase', 'hbck'], stdout=PIPE, stderr=PIPE, close_fds=False)
output, error = p.communicate()
output = output.splitlines()
error = error.splitlines()
for line in output:
match = re_inconsistencies.match(line.decode('utf-8'))
if match:
self.num_inconsistencies = match.group(1)
logging.info('hbase-hbck: Number of inconsistencies: %s', self.num_inconsistencies)
continue
match = re_status.match(line.decode('utf-8'))
if match:
hbck_status = match.group(1)
logging.info('hbase-hbck: hbck status = %s', hbck_status)
break
for line in error:
match = re_duplicate.match(line.decode('utf-8'))
if match:
hbck_status = match.group(0)
logging.info('hbase-hbck: hbck status = %s', hbck_status)
hdfs_lock_uri = re.findall('hdfs://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', hbck_status)
for uri in hdfs_lock_uri:
logging.info('hbase-hbck: Locked by lockfile: {0}'.format(hdfs_lock_uri[0]))
if hbase_hbck_remove_lockfile:
hdfs_remove_file(uri)
else:
logging.info('hbase-hbck: Please remove lockfile manually if no hbck is running')
break
if hbck_status is None:
logging.info('hbase-hbck: Failed to find hbck status result')
if self.num_inconsistencies is None:
logging.info('hbase-hbck: Failed to find number of inconsistencies')
self.num_inconsistencies = -1
if self.num_inconsistencies != None:
self.num_inconsistencies = int(self.num_inconsistencies)
if not isinstance(self.num_inconsistencies, int):
logging.info('hbase-hbck: Non-integer detected for the number of inconsistencies')
self.num_inconsistencies = -1
return False
if p.returncode != 0:
logging.info("hbase-hbck: Failed to run hbck (%d)" % (p.returncode))
self.num_inconsistencies = -1
return False
@staticmethod
def hbaseui_parse_table(table):
for row in table.findChildren('tr'):
for col in row.findChildren('td'):
if 'Regions in Transition for more than ' in col.get_text():
next_sibling = col.findNext('td')
num_regions_in_transition_stale = next_sibling.get_text().strip()
return num_regions_in_transition_stale
return None
def result_to_dict(self, rsp):
ds = defaultdict(dict)
for cell in rsp.flatten_cells():
ds[cell.family][cell.qualifier] = cell.value
return ds
def hbase_read_write_test(self, zk_server):
table = os.environ['HBASE_TABLE']
key = "0x42devoops".encode('utf-8')
pybase_client = pybase.NewClient(zk_server)
cf = "t".encode('utf-8')
values = {
cf: {
"ops".encode('utf-8'): "devoops".encode('utf-8'),
}
}
# Read access
try:
rsp = pybase_client.get(table, key)
self.hbase_read_success = 1
except:
self.hbase_read_success = 0
rspd = self.result_to_dict(rsp)
logging.debug('hbase: Read: ')
for k, v in rspd.items():
logging.debug('key: %', k)
logging.debug('value: %', v)
# Write access
try:
self.hbase_write_success = 1
rsp = pybase_client.put(table, key, values)
except:
self.hbase_write_success = 0
# Delete what we wrote
logging.info("Deleting at " + key.decode('utf-8'))
try:
pybase_client.delete(table, key, values)
except Exception as e:
logging.error('Failed to delete: %s', str(e))
self.hbase_write_success = 0
return
return
def hdfs_remove_file(hdfs_uri):
p = Popen(['hadoop', 'fs', '-rm', hdfs_uri], stdout=PIPE, stderr=PIPE, close_fds=False)
output, error = p.communicate()
output = output.splitlines()
error = error.splitlines()
for line in output:
logging.info("hdfs-rm: %s", line)
for line in error:
logging.info("hdfs-rm: %s", line)
return error
def which(program):
def is_executable(fn):
return os.path.isfile(fn) and os.access(fn, os.X_OK)
filepath, fname = os.path.split(program)
if filepath:
if is_executable(program):
return program
else:
for path in os.environ["PATH"].split(os.pathsep):
exec_file = os.path.join(path, program)
if is_executable(exec_file):
return exec_file
return None
if __name__ == '__main__':
hbase_master_default_address = 'localhost:' + str(hbase_master_ui_default_port)
hdfs_namenode_default_address = 'localhost:' + str(hdfs_namenode_default_port)
parser = configargparse.ArgParser( description="")
parser.add_argument('--hbase-master', dest='hbase_master', env_var='HBASE_MASTER', action='append', help="HBase master address, can be specified multiple times", type=str, default=hbase_master_default_address)
parser.add_argument('--hdfs-namenode', dest='hdfs_namenode', env_var='HDFS_NAMENODE', action='append', help="HDFS namenode address, can be specified multiple times", type=str, default=hdfs_namenode_default_address)
parser.add_argument('--zookeeper-server-address', dest='zk_server', env_var='ZK_SERVER', action='append', help="ZooKeeper server address, can be specified multiple times", type=str, required=True)
parser.add_argument('--zookeeper-use-tls', dest='zk_use_tls', env_var='ZK_USE_TLS', help="Use TLS when connecting to ZooKeeper", type=bool, default=False)
parser.add_argument('--exporter-port', dest='prom_http_port', env_var='PROM_HTTP_PORT', help="Listen port for Prometheus export", type=int, default=9010)
parser.add_argument('--export-refresh-rate', dest='prom_export_interval_s', env_var='PROM_EXPORT_INTERVAL_S', help="Time between metrics are gathered in seconds", type=int, default=60)
parser.add_argument('--hbck-refresh-rate', dest='hbase_hbck_interval_s', env_var='HBASE_HBCK_INTERVAL_S', help="Minimum time between two consecutive hbck runs in seconds", type=int, default=600)
parser.add_argument('--relay-jmx', dest='relay_jmx', env_var='RELAY_JMX', help="Relay complete JMX data", type=bool, default=False)
parser.add_argument('--logfile', dest='logfile', env_var='LOGFILE', help="Path to optional logfile", type=str)
parser.add_argument('--loglevel', dest='loglevel', env_var='LOGLEVEL', help="Loglevel, default: INFO", type=str, default='INFO')
args = parser.parse_args()
prom_http_port = args.prom_http_port
logfile = args.logfile
loglevel = args.loglevel
zk_server = args.zk_server
zk_use_tls = args.zk_use_tls
hbase_master = args.hbase_master
hdfs_namenodes = args.hdfs_namenode
relay_complete_jmx = args.relay_jmx
prom_export_interval_s = args.prom_export_interval_s
hbase_hbck_interval_s = args.hbase_hbck_interval_s
del locals()['args']
nzk_server = len(zk_server)
prom_zookeeper_num.set(nzk_server)
# Optional File Logging
if logfile:
tlog = logfile.rsplit('/', 1)
logpath = tlog[0]
logfile = tlog[1]
if not os.access(logpath, os.W_OK):
# Our logger is not set up yet, so we use print here
print("Logging: Can not write to directory. Skippking filelogging handler")
else:
fn = logpath + '/' + logfile
file_handler = logging.FileHandler(filename=fn)
# Our logger is not set up yet, so we use print here
print("Logging: Logging to " + fn)
stdout_handler = logging.StreamHandler(sys.stdout)
if 'file_handler' in locals():
handlers = [file_handler, stdout_handler]
else:
handlers = [stdout_handler]
logging.basicConfig(
level=logging.INFO,
format='[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s',
handlers=handlers
)
logger = logging.getLogger(__name__)
level = logging.getLevelName(loglevel)
logger.setLevel(level)
# Start the Prometheus server
try:
start_http_server(prom_http_port)
except Exception as e:
logging.debug("Failed to start Prometheus webserver: " + str(e))
logging.info("There might be another instance of " + sys.argv[0] + \
" already running, can not bind to " + str(prom_http_port) + ", exiting..")
sys.exit()
nruns = 0
# Start a ZooKeeper client
r = False
nzk = 0
# Try to connect to one of the known servers
while not r:
r = zk.main(zk_server, zk_use_tls)
time.sleep(zk_reconnect_interval_s)
if cluster_is_kerberized:
znode_hbase = "/hbase"
else:
znode_hbase = "/hbase-unsecure"
clusterid = zk.znode_data(znode_hbase + "/hbaseid")
if not clusterid:
logging.info("ZooKeeper: Could not read clusterid")
else:
logging.info("ZooKeeper: Clusterid: " + str(clusterid[0]))
jmx = jmx_query(relay_complete_jmx)
while True:
nruns += 1
run_hbck = False
# Set the initial hbck timer
if nruns == 1:
hbase_hbck_timer_s = dt.datetime.now()
run_hbck = True
hbase_active_master = hbase_exporter.zk_active_master()
logging.debug("hbase: Active master: {0}".format(hbase_active_master))
zk.active_servers(zk_server)
jmx.main(hdfs_namenodes)
hbase_hbck_time_s = int((dt.datetime.now() - hbase_hbck_timer_s).total_seconds())
logging.debug("hbase-hbck: Timer: {0} seconds".format(hbase_hbck_time_s))
# Do an hbck on the first run and then whenever the interval
# between to consecutive runs in seconds is higher than the configured interval
if hbase_hbck_interval_s < hbase_hbck_time_s or run_hbck:
run_hbck = True
# Set a new hbck timer
hbase_hbck_timer_s = dt.datetime.now()
else:
hbck_t_next_s = hbase_hbck_interval_s - hbase_hbck_time_s
if hbck_t_next_s < prom_export_interval_s:
# Minimum wait time is our export refresh rate -
# the time how long we sleep between two runs
hbck_t_next_s = prom_export_interval_s
logging.info("hbase-hbck: Skipping. hbck is only run every {0} seconds. Next run in {1} seconds"
.format(hbase_hbck_interval_s, hbck_t_next_s))
hbase_exporter().main(zk_server, hbase_master, run_hbck)
#prom_zookeeper_num_live.set(nzookeeper_live)
if nruns == 1:
logging.info("Started HBase exporter")
logging.info("Sleeping for {0} seconds ".format(prom_export_interval_s))
time.sleep(prom_export_interval_s)