最近发现thriftserver 运行时,运行内存有时超过配置文件 thriftserver.conf 中SPARK_EXCUTOR_MEM配置的内存,致使thriftserver执行查询异常。因此写了小程序,定时监控thriftserver的运行状况,当运行内存大于配置文件的内存时,将thriftserver重启。
一、配置远程ssh 命令执行接口。
由于spark 一般是集群部署,因此thriftserver的配置文件 或者 部署不是在咱们所监控的机器上,因此不可避免须要远程执行命令。ssh 接口以下:
def ssh_execmd(hostname, port, username, password):
client = paramiko.SSHClient() # 须要 import paramiko 包
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
client.connect(hostname=hostname, port=port, username=username, password=password)
return client
except Exception as e:
return None
后面执行远程代码,只须要调用链接接口,用 stdin, stdout, stderr = conn.exec_command(cmd) 执行对应的cmd命令就能够了,三个变量分别记录 命令的输入、输出、错误信息。调用完以后,记得调用conn.close()方法,关闭已经创建的链接,避免重复创建白白消耗网络资源。
二、获取进程pid
根据应用名称获取进程的id 代号,有时间一个名称可能对应多个进程的id,这种状况在grep 后就须要经过awk 命令对结果进行过滤,按指定的规则输出。获取进程pid 代码以下:
def get_thrift_pid():
cmd = "ps -ef |grep thriftserver|awk '$0 ~/memory/ {print $2}' | tr -s '\n' ' ' "
(status, pid_list) = commands.getstatusoutput(" %s " %cmd) #须要import commands 包
result = commands.getoutput(cmd)
commands 包中的getstatusoutput 方法,执行linux 命令的时候返回两个变量,status表示命令执行状态,0表示成功,其余状态表示执行不成功。pid_list表示执行结果,可能有多个。
awk 的字段解释网上不少,也很详细,基本上就是对查找结果的一个过滤,这里的$0表示整个当前行,$2每行的第二个字段,~表示匹配后面的字符;同理,!~表示不匹配,这里的不是精确的比较。后面的tr -s 表示将输出的内容中包含的换行符替换成空格。
关于执行命令有多个pid返回,因为执行的python 代码中可能包含pid,因此执行时返回了一个当前python进程的pid,而这个pid不是咱们真正须要的,怎么去除掉呢?个人作法是将这个命令执行两次,两次返回的结果中相同的那个pid就是咱们真正把须要的进行pid,完整代码以下,最后返回的是咱们须要的进程的pid:
def get_thrift_pid():
cmd = "ps -ef |grep thriftserver|awk '$0 ~/memory/ {print $2}' | tr -s '\n' ' ' "
(status, pid_list) = commands.getstatusoutput(" %s " %cmd)
result = commands.getoutput(cmd)
pid_list = pid_list.split(" ")[:-1]
result_list = result.split(" ")[:-1]
for pid in pid_list:
if pid in result_list:
return int(pid)
return None
三、获取程序运行时的占用内存
根据前面获取的进程pid 获取当前进程执行时的资源占用状况,获取资源占用,能够根据top -p 进程号 ,在线查看当前pid的资源占用状况,固然此种方式不太适宜程序中直接读取。事实上,查看 /pro/进程号/statm 能够直接读取,这里的进程号就是获取的pid 号。查询占用内存的程序代码以下:
def get_running_memory(pid):
file_name = "/proc/%d/statm" %pid
(status, output) = commands.getstatusoutput("cat %s" %file_name)
use_mem = None
if status == 0:
[size, resident, shared, trs, lrs, drs, dt] = output.split(" ")
resident = int(resident) # 字符串转换为整形
use_mem = resident * 4 # 使用内存, 每页占用4kb 内存
return use_mem
读取statm获取的7个变量分别表示的含义以下:
size:任务虚拟地址空间大小
Resident:正在使用的物理内存大小
Shared:共享页数
Trs:程序所拥有的可执行虚拟内存大小
Lrs:被映像倒任务的虚拟内存空间的库的大小
Drs:程序数据段和用户态的栈的大小
dt:脏页数量
四、读取thriftserver配置文件
获取了thriftserver运行时的内存,还须要读取出thriftserver配置文件的内存,将二者进行比较。可能实际环境和我代码中的环境不一致,只须要修改ssh链接的信息便可,代码以下:
def get_thrift_conf():
conf = ConfUtil().getThriftServerConf() #这里是我机器封装的ConfUtil 类,别的机器可能不能运行!
hostname = conf['ip'] #获取thriftserver 安装主机
file_name = "cat /home/bsaworker/spark/conf/thriftserver.conf"
conn = ssh_execmd(hostname, port, username, password)
stdin, stdout, stderr = conn.exec_command(file_name)
result = stdout.readlines()
conn.close()
conf_memroy= result[5].split('=')[1]
conf_memroy = conf_memroy.replace('\n','')
conf_memroy = conf_memroy.lower()
if conf_memroy.find('t') !=-1:
conf_memroy = int(conf_memroy.split('t')[0])*1024*1024*1024
elif conf_memroy.find('g') !=-1:
conf_memroy = int(conf_memroy.split('g')[0])*1024*1024
elif conf_memroy.find('m') !=-1:
conf_memroy = int(conf_memroy.split('m')[0])*1024
else:
return 0
return conf_memroy
这里单位统一转换成kb级别的,通常配置文件中不大可能给thriftserver配置kb级别的内存,因此在这里我直接过滤掉了。另外python能够ConfigParser类来读取或者操做配置文件,可是thriftserver的配置文件没有section 字段,因此没法直接进行读取了,我这里是本身写的一个方法,固然确定还有其余方式。
五、比较完以后相关操做
运行时的内存和配置的内存都已经获取到了,接下来咱们就要对获取的结果进行操做了,因为咱们的需求是一直要监听,因此我这里写了个死循环,一直运行。固然也能够配置到cron中去,让程序定时运行。
def process_thrift():
try:
while True:
pid = get_thrift_pid()
running_memory = get_running_memory(pid)
conf_memroy = get_thrift_conf()
if running_memory > conf_memroy:
commands.getoutput("kill -9 %d" %pid)
print "running memory is exceed conf memory, the thriftserver will restart"
time.sleep(100) #100 秒后再执行
# 执行restart 操做,由于咱们这里只须要kill 就完了,因此就没有restart 命令,实际状况中可能restart 比较好
else:
time.sleep(30) # 30秒以后再运行
print "the thriftserver process is normal "
except Exception as e:
print "检测程序运行异常,exit! "
return
这里须要捕获异常信息了,处理方式还能够有其余方式,根据具体的需求能够再修改。
花了半天时间研究这个其实以为仍是蛮有趣的,先记录下来~