pip install kazoo

 pyinstaller -F --hidden-import=urlparse zk_kafka_broker.py 

#! /usr/bin/env python
# -*- coding: UTF-8 -*-

from kazoo.client import KazooClient

# Print how many children exist under /brokers/ids on the local ZooKeeper
# (one child per registered broker, judging by the path name).
zk = None
try:
    zk = KazooClient(hosts='localhost:2181')
    zk.start()

    cs = zk.get_children("/brokers/ids")
    print(len(cs))
except Exception as e:
    # Best-effort demo: report the failure instead of dumping a traceback.
    print(str(e))
finally:
    # Always end the session so the connection is not left open.
    if zk is not None:
        zk.stop()

下面是监控 zk 的完整代码,自己编译打包好了:

cat zk.py
#!/usr/bin/env python
import socket
from cStringIO import StringIO
import sys
import struct
import os
import json
import time
import random
from kazoo.client import KazooClient
#from StringIO import StringIO

class ZabbixSender:
    """Minimal client for the Zabbix "sender data" request.

    Values are queued with AddData() and pushed to the server in a single
    request by Send().  The wire format used here is a 13-byte header
    (``ZBXD``, a one-byte version and a little-endian 64-bit payload length)
    followed by a UTF-8 JSON payload.
    """

    def __init__(self,server_host,server_port=10051):
        """Store the server address and start with an empty queue.

        server_host -- host name or IP address to connect to
        server_port -- TCP port to connect to (default 10051)
        """
        self.server_ip = server_host
        self.server_port = server_port
        # Bytes literal: struct's "s" format requires bytes on Python 3 and
        # this is still a plain str on Python 2.
        self.zbx_header = b'ZBXD'
        self.zbx_sender_data = {u'request':u'sender data',u'data':[]}
        self.zbx_version = 1
        # Last packet built by Send(); empty until the first call.
        self.send_data = b''

    def AddData(self,host,key,value,clock=None):
        """Queue one item value and return the pending request dict.

        host  -- host name the item belongs to
        key   -- item key
        value -- value to report
        clock -- optional timestamp; omitted from the request when None
        """
        add_data = {u"host":host,u'key':key,u'value':value}
        if clock is not None:
            add_data[u'clock'] = clock

        self.zbx_sender_data['data'].append(add_data)

        return self.zbx_sender_data

    def ClearData(self):
        """Drop every queued value and return the (now empty) request dict."""
        self.zbx_sender_data['data'] = []
        return self.zbx_sender_data

    def __MakeSendData(self):
        """Serialise the queued values into ``self.send_data``."""
        payload = json.dumps(self.zbx_sender_data,separators=(',',':'),ensure_ascii=False).encode('utf-8')
        # "<" disables padding, so header + payload is byte-identical to
        # packing everything with a single "<4sBq<N>s" format.
        header = struct.pack("<4sBq",self.zbx_header,self.zbx_version,len(payload))
        self.send_data = header + payload

    def Send(self):
        """Send the queued values and return the server's decoded JSON reply.

        Raises socket errors when the connection fails and struct.error when
        the reply is too short to contain the protocol header.
        """
        self.__MakeSendData()
        so = socket.socket()
        try:
            so.connect((self.server_ip,self.server_port))
            wobj = so.makefile('wb')
            try:
                wobj.write(self.send_data)
            finally:
                # Closing the file object flushes the buffered request.
                wobj.close()
            robj = so.makefile('rb')
            try:
                # Read until the server closes the connection.
                recv_data = robj.read()
            finally:
                robj.close()
        finally:
            # Release the socket even when connecting or transferring fails.
            so.close()

        header_size = struct.calcsize("<4sBq")
        if len(recv_data) < header_size:
            raise struct.error(
                "Zabbix reply too short: got %d bytes, need at least %d"
                % (len(recv_data), header_size))
        # Everything after the fixed-size header is the JSON body.
        return json.loads(recv_data[header_size:].decode('utf-8'))



def _four_letter_word(command, host='localhost', port=2181):
    """Send a four-letter-word command to ZooKeeper and return the reply.

    The reply is read until the peer closes the connection, so answers
    longer than one recv() buffer are not truncated.  This relies on the
    server closing the connection once it has answered.
    """
    conn = socket.socket()
    try:
        conn.connect((host, port))
        # Sockets carry bytes; on Python 2 a str already is bytes.
        conn.sendall(command if isinstance(command, bytes) else command.encode('ascii'))
        chunks = []
        while True:
            chunk = conn.recv(2048)
            if not chunk:
                break
            chunks.append(chunk)
    finally:
        # Release the socket even when connecting or reading fails.
        conn.close()
    reply = b''.join(chunks)
    # Hand back the native str type on both Python 2 and Python 3.
    return reply if isinstance(reply, str) else reply.decode('utf-8')


if len(sys.argv) < 2:
    sys.exit("usage: zk.py <ruok|isro|livebroker|COMMAND> <zabbix_server> <hostname>")

command = sys.argv[1]

if command == "livebroker":
    # Not a four-letter word: count the children of /brokers/ids through a
    # regular client session instead of sending the word to the server.
    zk = KazooClient(hosts='localhost:2181')
    zk.start()
    try:
        zresult = {"zk.status[livebroker]": len(zk.get_children("/brokers/ids"))}
    finally:
        zk.stop()
elif command == "ruok":
    reply = _four_letter_word(command)
    # 1 only for the healthy answer; an empty or unexpected reply counts as 0.
    zresult = {"zk.status[ruok]": 1 if reply.strip() == "imok" else 0}
elif command == "isro":
    reply = _four_letter_word(command)
    # Report the first line of the reply verbatim.
    first_line, newline, _rest = reply.partition('\n')
    zresult = {"zk.status[isro]": first_line + newline}
else:
    # Any other command is expected to answer with "key<TAB>value" lines.
    reply = _four_letter_word(command)
    zresult = {}
    for line in reply.split('\n'):
        name, tab, value = line.partition('\t')
        if not tab:
            # Blank lines and free-text answers carry no key/value pair.
            continue
        zresult['zk.status' + '[' + name.strip() + ']'] = value.strip()
print(zresult)
    
if __name__ == "__main__":
    # Command line: <script> <command> <zabbix_server> <hostname>.
    # argv[1] (the command) is consumed by the collection code above;
    # only the Zabbix destination and the reporting host are read here.
    zabbix_server = sys.argv[2]
    hostname = sys.argv[3]

    sender = ZabbixSender(zabbix_server)
    # Queue every collected metric under the given host name.
    for item_key in zresult:
        sender.AddData(hostname, item_key, zresult[item_key])
    # The server reply is kept in ``res`` for debugging; it is not inspected.
    res = sender.Send()

 

发表评论

邮箱地址不会被公开。 必填项已用*标注