Use PyBase to write to HBase natively instead of using Thrift

This commit is contained in:
Björn Busse 2020-10-21 21:56:26 +02:00
parent cbcaa81c73
commit 43df5c7104

View File

@ -7,8 +7,6 @@
#
# TODO:
#
# * Natively write to hbase instead of writing via Thrift
#
# * Remove timestamps from log messages or make them optional;
# we already have them in the journal -
# at least when not running in a container
@ -22,6 +20,7 @@ from __future__ import unicode_literals
import argparse
from bs4 import BeautifulSoup
from collections import defaultdict
from flatten_json import flatten
import datetime as dt
import io
@ -32,7 +31,7 @@ import os
from prometheus_client import start_http_server, Summary
from prometheus_client.core import GaugeMetricFamily, REGISTRY
from prometheus_client import Gauge
#import pybase
import pybase
import random
import re
import requests
@ -46,7 +45,7 @@ import xml.etree.ElementTree as et
sys.path.append('/usr/local/lib/hbase-protobuf-python')
sys.path.append('/usr/local/lib/hbase-protobuf-python/server')
sys.path.append('/usr/local/lib/hbase-protobuf-python/server/zookeeper')
from ZooKeeper_pb2 import Master as pbMaster
from ZooKeeper_pb2 import Master as hbMaster
tmp_path = '/tmp/'
logpath = tmp_path
@ -68,6 +67,7 @@ prom_hbase_num_regionservers_dead = Gauge('hbase_regionservers_dead', 'HBase Dea
prom_hbase_num_clusterrequests = Gauge('hbase_clusterrequests', 'HBase Clusterrequests')
prom_hbase_num_regions_in_transition_stale = Gauge('number_of_regions_in_transition_stale', 'Number of stale regions in transition')
prom_hbase_num_inconsistencies = Gauge('number_of_inconsistencies', 'Number of inconsistencies in HBase')
prom_hbase_readable = Gauge('hbase_is_readable', 'HBase is readable')
prom_hbase_writeable = Gauge('hbase_is_writeable', 'HBase is writeable')
prom_zookeeper_num = Gauge('zookeeper_num', 'Known ZooKeeper Servers')
prom_zookeeper_num_live = Gauge('zookeeper_num_live', 'Live ZooKeeper Servers')
@ -363,7 +363,11 @@ class jmx_query():
class hbase_exporter():
def main(self, hbase_master_hosts, run_hbck):
# Start with both probe flags at 0; hbase_read_write_test() overwrites them
# and main() turns them into the hbase_is_readable / hbase_is_writeable gauges.
def __init__(self):
# 1 after a successful HBase read probe, 0 otherwise
self.hbase_read_success = 0
# 1 after a successful HBase write probe, 0 otherwise
self.hbase_write_success = 0
def main(self, zk_server, hbase_master_hosts, run_hbck):
hbase_active_master = self.zk_active_master()
if not hbase_active_master:
@ -386,12 +390,18 @@ class hbase_exporter():
logging.info("hbase-hbck: Number of inconsistencies: %d", self.num_inconsistencies)
prom_hbase_num_inconsistencies.set(self.num_inconsistencies)
self.hbase_write_test()
self.hbase_read_write_test(zk_server)
if self.hbase_read_success:
logging.info("hbase: Read test succeeded")
prom_hbase_readable.set(1)
else:
logging.info("hbase: Read test failed!")
prom_hbase_readable.set(0)
if self.hbase_write_success:
logging.info("hbase: Write test succeeded")
prom_hbase_writeable.set(1)
else:
logging.info("hbase: Write test failed! Is Thrift up and running?")
logging.info("hbase: Write test failed!")
prom_hbase_writeable.set(0)
hbase_health = self.check_health(run_hbck)
@ -426,7 +436,7 @@ class hbase_exporter():
msg = msg[0]
first_byte, meta_length = unpack(">cI", msg[:5])
msg = msg[meta_length + 9:]
master = pbMaster()
master = hbMaster()
master.ParseFromString(msg)
return master.master.host_name
@ -581,27 +591,54 @@ class hbase_exporter():
return None
def hbase_write_test(self):
    """Check write access by running ``cmd_hbase_write`` as a subprocess.

    The command's return code and every line it printed on stdout and
    stderr are logged, and the outcome is stored in
    ``self.hbase_write_success`` (1 on success, 0 on failure).

    Returns:
        True when the command exited with status 0, False otherwise.
    """
    # cmd_hbase_write is defined elsewhere in this file.
    hbase_write_env = os.environ.copy()
    p = Popen(cmd_hbase_write, stdout=PIPE, stderr=PIPE, close_fds=False, env=hbase_write_env)
    output, error = p.communicate()
    output = output.decode("utf-8", "strict").splitlines()
    error = error.decode("utf-8", "strict").splitlines()

    logging.info("hbase-write: return code: %d", p.returncode)
    for line in output:
        logging.info("hbase-write: %s", line)

    for line in error:
        logging.info("hbase-write: %s", line)

    if p.returncode != 0:
        self.hbase_write_success = 0
        return False

    self.hbase_write_success = 1
    return True


def result_to_dict(self, rsp):
    """Group the cells of a response by column family and qualifier.

    Args:
        rsp: object exposing ``flatten_cells()``; every cell it yields must
            provide ``family``, ``qualifier`` and ``value`` attributes.

    Returns:
        A ``defaultdict(dict)`` of the form ``{family: {qualifier: value}}``.
    """
    ds = defaultdict(dict)
    for cell in rsp.flatten_cells():
        ds[cell.family][cell.qualifier] = cell.value
    return ds


def hbase_read_write_test(self, zk_server):
    """Probe HBase through PyBase with a read, a write and a clean-up delete.

    The probe uses row ``0x42devoops`` of table ``tagvalues`` and writes the
    cell ``t:ops = devoops``. Results are stored on the instance rather than
    returned:

    * ``self.hbase_read_success``  - 1 if the ``get`` succeeded, else 0
    * ``self.hbase_write_success`` - 1 if the ``put`` and the following
      ``delete`` both succeeded, else 0

    Args:
        zk_server: passed unchanged to ``pybase.NewClient``
            (presumably the ZooKeeper quorum address - confirm with caller).

    Returns:
        None. Failures of the client or of a probe are logged and reflected
        in the two flags instead of being raised.
    """
    table = "tagvalues"
    key = "0x42devoops".encode('utf-8')
    cf = "t".encode('utf-8')
    values = {
        cf: {
            "ops".encode('utf-8'): "devoops".encode('utf-8'),
        }
    }

    # Pessimistic defaults: a flag only becomes 1 once its probe has passed.
    self.hbase_read_success = 0
    self.hbase_write_success = 0

    # Without a client neither probe can run; report both as failed instead
    # of letting the exception take down the exporter.
    try:
        pybase_client = pybase.NewClient(zk_server)
    except Exception as e:
        logging.error('hbase: Failed to create PyBase client: %s', str(e))
        return

    # Read access
    try:
        rsp = pybase_client.get(table, key)
    except Exception as e:
        logging.error('hbase: Read failed: %s', str(e))
    else:
        self.hbase_read_success = 1
        # Only dump the response when there is one; rsp is unbound on failure.
        rspd = self.result_to_dict(rsp)
        logging.debug('hbase: Read: ')
        for k, v in rspd.items():
            logging.debug('key: %s', k)
            logging.debug('value: %s', v)

    # Write access
    try:
        pybase_client.put(table, key, values)
    except Exception as e:
        logging.error('hbase: Write failed: %s', str(e))
    else:
        self.hbase_write_success = 1

    # Delete what we wrote
    logging.info("Deleting at " + key.decode('utf-8'))
    try:
        pybase_client.delete(table, key, values)
    except Exception as e:
        logging.error('Failed to delete: %s', str(e))
        # A row we cannot remove counts as a failed write test.
        self.hbase_write_success = 0
def hdfs_remove_file(hdfs_uri):
@ -769,7 +806,7 @@ if __name__ == '__main__':
logging.info("hbase-hbck: Skipping. hbck is only run every {0} seconds. Next run in {1} seconds"
.format(hbase_hbck_interval_s, hbck_t_next_s))
hbase_exporter().main(hbase_master, run_hbck)
hbase_exporter().main(zk_server, hbase_master, run_hbck)
#prom_zookeeper_num_live.set(nzookeeper_live)
if nruns == 1: