zookeeper: Add a listener and log events

This commit is contained in:
Björn Busse 2019-04-15 17:18:17 +02:00
parent de010e9561
commit 7c27f4360c

View File

@ -50,6 +50,10 @@ import xml.etree.ElementTree as et
tmp_path = '/tmp/'
logpath = tmp_path
# ZooKeeper
zk_address = 'localhost:2181'
zk_reconnect_interval_s = 30
# Prometheus
prom_http_port = 9010
prom_scrape_interval_s = 10
@ -98,19 +102,36 @@ class zk():
zk_client = kz_client.KazooClient(address)
zk_client.add_listener(listener)
zk_client.start(timeout)
try:
zk_client.start(timeout)
except Exception as e:
logging.debug("ZooKeeper Error: " + str(e))
return False
self.zk_client = zk_client
@classmethod
def get_znode_data(self, znode):
try:
self.zk_client.get_children(znode)
self.zk_client.get(znode)
except Exception as e:
logging.debug("Type error: " + str(e))
logging.info("ZooKeeper: Could not find znode: " + znode)
return False
def listener(state):
if state == KazooState.LOST:
logging.debug("ZooKeeper: Connection lost")
# Register somewhere that the session was lost
elif state == KazooState.SUSPENDED:
logging.debug("ZooKeeper: Connection suspended")
# Handle being disconnected from Zookeeper
else:
logging.debug("ZooKeeper: Connection re-established")
# Handle being connected/reconnected to Zookeeper
class jmx_query():
@ -476,10 +497,14 @@ if __name__ == '__main__':
logger = logging.getLogger(__name__)
# Start the Prometheus server
start_http_server(prom_http_port)
try:
start_http_server(prom_http_port)
except Exception as e:
logging.debug("Failed to start Prometheus webserver: " + str(e))
logging.info("There might be another instance of " + sys.argv[0] + " already running, can not bind to " + str(prom_http_port) + ", exiting..")
sys.exit()
nruns = 0
if (args.hbase_masters is None):
@ -491,8 +516,16 @@ if __name__ == '__main__':
hdfs_namenode_port = hdfs_namenode_default_port
# Start a ZooKeeper client
zk.main()
if cluster_is_kerbeized:
r = False
while not r:
logging.info("ZooKeeper: Trying to connect to " + zk_address)
r = zk.main(zk_address)
time.sleep(zk_reconnect_interval_s)
zk.add_listener(listener)
if cluster_is_kerberized:
znode_clusterid("/hbase")
else:
znode_clusterid("/hbase-unsecure")