linux 环境下监控thriftserver 运行内存(python)

最近发现thriftserver 运行时,运行内存有时超过配置文件 thriftserver.conf 中SPARK_EXCUTOR_MEM配置的内存,致使thriftserver执行查询异常。因此写了小程序,定时监控thriftserver的运行状况,当运行内存大于配置文件的内存时,将thriftserver重启。
 
一、配置远程ssh 命令执行接口。
由于spark 一般是集群部署,因此thriftserver的配置文件 或者 部署不是在咱们所监控的机器上,因此不可避免须要远程执行命令。ssh 接口以下:
def ssh_execmd(hostname, port, username, password):
    client = paramiko.SSHClient()   # 须要 import paramiko 包
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 
    try:
        client.connect(hostname=hostname, port=port, username=username, password=password)
        return client
    except Exception as e:
        return None
 
后面执行远程代码,只须要调用链接接口,用 stdin, stdout, stderr = conn.exec_command(cmd) 执行对应的cmd命令就能够了,三个变量分别记录 命令的输入、输出、错误信息。调用完以后,记得调用conn.close()方法,关闭已经创建的链接,避免重复创建白白消耗网络资源。
 
二、获取进程pid
根据应用名称获取进程的id 代号,有时间一个名称可能对应多个进程的id,这种状况在grep 后就须要经过awk 命令对结果进行过滤,按指定的规则输出。获取进程pid 代码以下:
 
def get_thrift_pid():
    cmd = "ps -ef |grep thriftserver|awk '$0 ~/memory/ {print $2}' | tr -s '\n' ' ' "
    (status, pid_list) = commands.getstatusoutput(" %s " %cmd)  #须要import commands 包
    result = commands.getoutput(cmd)
 
commands 包中的getstatusoutput 方法,执行linux 命令的时候返回两个变量,status表示命令执行状态,0表示成功,其余状态表示执行不成功。pid_list表示执行结果,可能有多个。
awk 的字段解释网上不少,也很详细,基本上就是对查找结果的一个过滤,这里的$0表示整个当前行,$2每行的第二个字段,~表示匹配后面的字符;同理,!~表示不匹配,这里的不是精确的比较。后面的tr -s 表示将输出的内容中包含的换行符替换成空格。
关于执行命令有多个pid返回,因为执行的python 代码中可能包含pid,因此执行时返回了一个当前python进程的pid,而这个pid不是咱们真正须要的,怎么去除掉呢?个人作法是将这个命令执行两次,两次返回的结果中相同的那个pid就是咱们真正把须要的进行pid,完整代码以下,最后返回的是咱们须要的进程的pid:
 
def get_thrift_pid():
    cmd = "ps -ef |grep thriftserver|awk '$0 ~/memory/ {print $2}' | tr -s '\n' ' ' "
    (status, pid_list) = commands.getstatusoutput(" %s " %cmd)
    result = commands.getoutput(cmd)
    pid_list = pid_list.split(" ")[:-1]
    result_list = result.split(" ")[:-1]
    for pid in pid_list:
        if pid in result_list:
            return int(pid)
    return None
 
三、获取程序运行时的占用内存
根据前面获取的进程pid 获取当前进程执行时的资源占用状况,获取资源占用,能够根据top -p 进程号 ,在线查看当前pid的资源占用状况,固然此种方式不太适宜程序中直接读取。事实上,查看 /pro/进程号/statm 能够直接读取,这里的进程号就是获取的pid 号。查询占用内存的程序代码以下:
 
def  get_running_memory(pid):
    file_name = "/proc/%d/statm" %pid
    (status, output) = commands.getstatusoutput("cat %s" %file_name)
    use_mem = None
    if status == 0:
        [size, resident, shared, trs, lrs, drs, dt]  = output.split(" ")
        resident = int(resident)  # 字符串转换为整形
        use_mem  = resident * 4   # 使用内存, 每页占用4kb 内存
    return use_mem
 
读取statm获取的7个变量分别表示的含义以下:
size:任务虚拟地址空间大小
Resident:正在使用的物理内存大小
Shared:共享页数
Trs:程序所拥有的可执行虚拟内存大小
Lrs:被映像倒任务的虚拟内存空间的库的大小
Drs:程序数据段和用户态的栈的大小
dt:脏页数量
 
四、读取thriftserver配置文件
获取了thriftserver运行时的内存,还须要读取出thriftserver配置文件的内存,将二者进行比较。可能实际环境和我代码中的环境不一致,只须要修改ssh链接的信息便可,代码以下:
 
def get_thrift_conf():
    conf = ConfUtil().getThriftServerConf()   #这里是我机器封装的ConfUtil 类,别的机器可能不能运行!
    hostname = conf['ip']    #获取thriftserver 安装主机
    file_name = "cat /home/bsaworker/spark/conf/thriftserver.conf"
    conn = ssh_execmd(hostname, port, username, password)
    stdin, stdout, stderr = conn.exec_command(file_name)
    result = stdout.readlines()
    conn.close()
    conf_memroy= result[5].split('=')[1]
    conf_memroy = conf_memroy.replace('\n','')
    conf_memroy = conf_memroy.lower()
    if conf_memroy.find('t') !=-1:
        conf_memroy = int(conf_memroy.split('t')[0])*1024*1024*1024
    elif conf_memroy.find('g') !=-1:
        conf_memroy = int(conf_memroy.split('g')[0])*1024*1024
    elif conf_memroy.find('m') !=-1:
        conf_memroy = int(conf_memroy.split('m')[0])*1024 
    else:
        return 0
    return conf_memroy
 
这里单位统一转换成kb级别的,通常配置文件中不大可能给thriftserver配置kb级别的内存,因此在这里我直接过滤掉了。另外python能够ConfigParser类来读取或者操做配置文件,可是thriftserver的配置文件没有section 字段,因此没法直接进行读取了,我这里是本身写的一个方法,固然确定还有其余方式。
 
五、比较完以后相关操做
运行时的内存和配置的内存都已经获取到了,接下来咱们就要对获取的结果进行操做了,因为咱们的需求是一直要监听,因此我这里写了个死循环,一直运行。固然也能够配置到cron中去,让程序定时运行。
 
def process_thrift():
    try:
        while True:
            pid = get_thrift_pid()
            running_memory = get_running_memory(pid)
            conf_memroy = get_thrift_conf()
            if running_memory > conf_memroy:
                commands.getoutput("kill -9 %d" %pid)
                print "running memory is exceed conf memory, the thriftserver will restart"
                time.sleep(100)   #100 秒后再执行
     # 执行restart 操做,由于咱们这里只须要kill 就完了,因此就没有restart 命令,实际状况中可能restart 比较好
            else:
                time.sleep(30)    # 30秒以后再运行
                print "the thriftserver process is normal "
    except Exception as e:
        print "检测程序运行异常,exit! "
        return 
 
这里须要捕获异常信息了,处理方式还能够有其余方式,根据具体的需求能够再修改。
 
花了半天时间研究这个其实以为仍是蛮有趣的,先记录下来~
相关文章
相关标签/搜索