简单的linux自动化运维工具之client端(二)

原文:https://www.3qos.com/article/101.html

作者:容易  日期:2015-03-17

备注:未经本人同意,请勿转载


    接上一篇 简单的linux集中管理工具之master端(一)

    client端代码:

#-*- coding: UTF-8 -*- 

__author__ = 'tiger'

#!/usr/bin/env python

import zmq, time, sys, os, atexit

from signal import SIGTERM

import random

import logging

#import ConfigParser

import subprocess

from logging.handlers import RotatingFileHandler

import socket

import fcntl

import struct

import signal

  

#定义日志函数

def mylog(logfile):
    """Configure the root logger to write to *logfile* and return it.

    Records at INFO and above go to a RotatingFileHandler that rotates at
    50 MiB and keeps 3 backups.  Calling this again for the same file reuses
    the handler already attached instead of adding a duplicate, which would
    otherwise make every record appear twice in the file.

    :param logfile: path of the log file (opened in append mode).
    :returns: the root logger.
    """
    log = logging.getLogger()
    log.setLevel(logging.INFO)
    # RotatingFileHandler stores its target as an absolute path.
    target = os.path.abspath(logfile)
    for handler in log.handlers:
        if isinstance(handler, RotatingFileHandler) and handler.baseFilename == target:
            return log
    rthandler = RotatingFileHandler(logfile, 'a', maxBytes=50 * 1024 * 1024, backupCount=3)
    formatter = logging.Formatter(
        '%(levelname)s %(thread)d %(threadName)s %(process)d %(funcName)s %(asctime)s %(filename)s[line:%(lineno)d] %(message)s',
        datefmt='%a, %d %b %Y %H:%M:%S')
    rthandler.setFormatter(formatter)
    log.addHandler(rthandler)
    return log

  

#获取指定接口的IP地址

def get_ip_address(ifname):
    """Return the IPv4 address assigned to network interface *ifname*.

    Uses ioctl request 0x8915 (SIOCGIFADDR on Linux), so this is
    Linux-specific.

    :param ifname: interface name such as 'eth0' (text or bytes).
    :returns: the address as a dotted-quad string.
    :raises IOError/OSError: if the interface does not exist or has no
        IPv4 address.
    """
    # struct.pack('256s', ...) needs bytes; encode text names first.
    if not isinstance(ifname, bytes):
        ifname = ifname.encode('ascii')
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # struct ifreq: the name fills the first 16 bytes (15 chars + NUL).
        ifreq = fcntl.ioctl(
            s.fileno(),
            0x8915,  # SIOCGIFADDR
            struct.pack('256s', ifname[:15]))
    finally:
        # The socket only serves as a handle for the ioctl; never leak it.
        s.close()
    # The returned sockaddr_in holds the IPv4 address at bytes 20..23.
    return socket.inet_ntoa(ifreq[20:24])

  

#定义命令或者脚本执行超时所触发的错误

class ProcessTimeout(Exception):
    """Signals that a command or script ran longer than its allowed time.

    Raised from the SIGALRM handler installed while a task is executing.
    """

  

#定义命令或者脚本执行超时所触发的错误句柄

def timeout_handler(signum, frame):
    """SIGALRM handler: abort the running task by raising ProcessTimeout.

    *signum* and *frame* are supplied by the signal machinery and unused.
    """
    raise ProcessTimeout()

  

#执行命令的函数

def exec_shell(task, rundir=None, timeout=None):
    """Run *task* through the shell and return ``[returncode, output]``.

    :param task: command line, executed with ``shell=True``.
    :param rundir: working directory for the command; None keeps the
        current one.
    :param timeout: limit in whole seconds; a falsy value disables it.  The
        limit is enforced with SIGALRM, so this must be called from the
        main thread.
    :returns: ``[0, stdout]`` on success, ``[returncode, stderr]`` on a
        non-zero exit, or ``['timeout', message]`` when the limit was hit.
    """
    p = subprocess.Popen(task, bufsize=0, shell=True, cwd=rundir,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # Arm the alarm only once the child exists, so a timeout can never
    # interrupt Popen itself.
    old_handler = None
    if timeout:
        old_handler = signal.signal(signal.SIGALRM, timeout_handler)
        signal.alarm(timeout)
    try:
        stdout, stderr = p.communicate()
        returncode = p.returncode
    except ProcessTimeout:
        p.stdout.close()
        p.stderr.close()
        if p.poll() is None:
            p.terminate()
            # Give SIGTERM up to ~1 s, then SIGKILL, so the wait() below
            # cannot hang on a child that ignores SIGTERM.
            for _ in range(10):
                time.sleep(0.1)
                if p.poll() is not None:
                    break
            else:
                p.kill()
        # Reap the child so it does not linger as a zombie.
        p.wait()
        stderr = 'Calculation was taking too long, so I killed it dead.\n'
        returncode = 'timeout'
    finally:
        # Always disarm the alarm and put the previous handler back, even if
        # communicate() failed for another reason.
        if timeout:
            signal.alarm(0)
            if old_handler is not None:
                signal.signal(signal.SIGALRM, old_handler)
    # Success returns standard output; any failure returns standard error.
    if returncode == 0:
        return [returncode, stdout]
    return [returncode, stderr]

  

#任务类别为脚本时调用该函数

def exec_script(fileserver, script, dirpath, rundir=None, timeout=None):
    """Download a script with wget, run it, then delete the local copy.

    :param fileserver: URL prefix; the script name is appended verbatim.
    :param script: dict with 'script_name' (file name on the server) and
        'script_env' (command prefix used to launch it, e.g. an interpreter).
    :param dirpath: local download directory; the name is appended verbatim.
    :param rundir: working directory for both the download and the run.
    :param timeout: seconds, applied separately to the download and the run.
    :returns: exec_shell's ``[returncode, output]`` for the failed download,
        or for the script run (with any error from deleting the file
        appended to its output).
    """
    name = script['script_name']
    url = fileserver + name
    local_path = dirpath + name
    # -N: timestamping, -P: directory to save into.
    download_cmd = ' '.join(('wget -N -P', dirpath, url))
    run_cmd = script['script_env'] + ' ' + local_path

    fetched = exec_shell(download_cmd, rundir=rundir, timeout=timeout)
    if fetched[0] != 0:
        return fetched

    outcome = exec_shell(run_cmd, rundir=rundir, timeout=timeout)
    try:
        os.remove(local_path)
    except Exception as err:
        outcome[1] = outcome[1] + str(err)
    return outcome

  

#与master建立连接,完成与master的通讯

def _master_connect(sock_file):
    """Create a zmq context plus a REQ socket connected to *sock_file*.

    :returns: ``(context, sock, poller)``; *poller* watches *sock* for replies.
    """
    context = zmq.Context()
    sock = context.socket(zmq.REQ)
    # LINGER=0: drop undelivered messages on close() so that context.term()
    # does not wait on an unreachable master.
    sock.setsockopt(zmq.LINGER, 0)
    sock.connect(sock_file)
    poller = zmq.Poller()
    poller.register(sock, zmq.POLLIN)
    return context, sock, poller


def _master_reconnect(context, sock, sock_file):
    """Drop the current connection, wait 1-5 s at random, and reconnect.

    A REQ socket whose reply never arrived cannot send again, so a fresh
    socket is needed to resume the request/reply cycle.

    :returns: a new ``(context, sock, poller)`` tuple.
    """
    sock.close()
    context.term()
    time.sleep(random.randint(1, 5))
    return _master_connect(sock_file)


def _master_report(sock, poller, report, timeout_ms):
    """Send *report* to the master and wait up to *timeout_ms* for its ack.

    :returns: True if the ack arrived, False on timeout (the socket is then
        still waiting for a reply and must be reconnected).
    """
    sock.send_pyobj(report)
    if poller.poll(timeout_ms):
        sock.recv_pyobj()
        return True
    return False


def ioloop(sock_file, homedir, mip, stdout):
    """Poll the master for jobs forever, run them and report the results.

    Protocol as used here:
    - The client sends ``{'req_type': 'task', 'ip': mip}``.
    - A reply whose ``rep_type`` is ``'newtask'`` carries a job: ``id``,
      ``task``, ``type``, ``cmdtimeout`` and ``rundir``, plus ``env`` and
      ``fileserver`` when ``type`` is ``'s'`` (script).
    - The outcome goes back as a ``'report'`` request.
    - Any reply missing for 20 s triggers a reconnect.

    :param sock_file: zmq endpoint of the master, e.g. ``tcp://host:port``.
    :param homedir: client home; ``<homedir>/run/`` is the default working
        directory for jobs and where scripts are downloaded.
    :param mip: this client's IP, sent with every request.
    :param stdout: path of the log file.
    """
    dirpath = homedir + '/run/'
    log = mylog(stdout)
    reply_timeout_ms = 20 * 1000
    context, sock, poller = _master_connect(sock_file)
    try:
        while True:
            # Ask the master for work.
            try:
                sock.send_pyobj({'req_type': 'task', 'ip': mip})
            except Exception as err:
                log.warning(str(err))
                context, sock, poller = _master_reconnect(context, sock, sock_file)
                continue
            if not poller.poll(reply_timeout_ms):
                log.warning("master server connect time out,will colse current socket,try again.")
                context, sock, poller = _master_reconnect(context, sock, sock_file)
                continue
            # NOTE(review): recv_pyobj() unpickles whatever arrives on
            # sock_file, and unpickling untrusted data can execute code.
            # The master must be trusted; it can send shell tasks anyway.
            rep = sock.recv_pyobj()

            # Only 'newtask' replies carry a job; anything else means idle.
            try:
                rep_type = rep['rep_type']
            except Exception as err:
                log.info(str(err))
                time.sleep(random.uniform(0.8, 1.2))
                continue
            if rep_type != 'newtask':
                time.sleep(random.uniform(0.8, 1.2))
                continue

            # Reset job_id each round so a malformed reply can never be
            # reported under a previous job's id.
            job_id = None
            try:
                job_id = rep['id']
                job_task = rep['task']
                job_type = rep['type']
                cmd_timeout = rep['cmdtimeout']
                rundir = rep['rundir']
            except Exception as err:
                log.warning(str(err) + str(rep))
                if not job_id:
                    # No id to report against: just ask for work again.
                    time.sleep(random.uniform(0.8, 1.2))
                    continue
                res = ['99', str(err)]
            else:
                log.warning('start new job ' + str(rep))
                # A rundir of 'None' (string) or None means: use dirpath.
                if rundir is None or rundir == 'None':
                    rundir = dirpath
                try:
                    if job_type == 's':
                        # Script job: download from the file server, then run
                        # it with the given command prefix.
                        script = {'script_name': job_task, 'script_env': rep['env']}
                        res = exec_script(rep['fileserver'], script, dirpath,
                                          rundir=rundir, timeout=cmd_timeout)
                    else:
                        # Any other job type is run as a shell command.
                        res = exec_shell(job_task, rundir=rundir, timeout=cmd_timeout)
                except Exception as err:
                    # Report the failure so the master still gets a result
                    # for this job id, instead of dropping it or crashing.
                    log.warning(str(err))
                    res = ['99', str(err)]

            # req_type 'report' tells the master this is a job result.
            report = {'id': job_id, 'code': res[0], 'info': res[1], 'ip': mip, 'req_type': 'report'}
            try:
                acked = _master_report(sock, poller, report, reply_timeout_ms)
            except Exception as err:
                log.warning(str(err))
                acked = False
            if not acked:
                log.warning('job report was not acknowledged by master, reconnecting: ' + str(report))
                context, sock, poller = _master_reconnect(context, sock, sock_file)
                continue
            log.info(str({'id': job_id, 'code': res[0], 'info': res[1], 'ip': mip}))
            time.sleep(random.uniform(0.8, 1.2))
    finally:
        sock.close()
        context.term()

  

  

def ip_check(ip):
    """Return True if *ip* is a dotted-quad IPv4 address like '192.168.4.194'.

    Each of the four fields must be non-empty, consist of ASCII digits only
    and be at most 255.  Leading zeros are accepted (e.g. '010').
    """
    parts = ip.split('.')
    # Check ASCII digits explicitly: str.isdigit() also accepts characters
    # such as '²' that int() rejects.
    return len(parts) == 4 and all(
        part and all(ch in '0123456789' for ch in part) and int(part) <= 255
        for part in parts)

  

  

class Daemon:
    """Generic UNIX daemon controlled through a pidfile (start/stop/restart).

    Subclasses override ``_run`` with the daemon's main loop.

    :param pidfile: file holding the daemon's pid.  Use an absolute path:
        the daemon changes its working directory to ``/`` before writing it.
    :param sock_file: master endpoint, stored for subclasses.
    :param homedir: client home directory, stored for subclasses.
    :param mip: this client's IP address, stored for subclasses.
    :param stdin: file the daemon's stdin is redirected from.
    :param stdout: file the daemon's stdout is appended to.
    :param stderr: file the daemon's stderr is appended to.
    """

    def __init__(self, pidfile, sock_file, homedir, mip, stdin='/dev/null', stdout='/dev/null',
                 stderr='/dev/null'):
        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr
        self.pidfile = pidfile
        self.homedir = homedir
        self.sock_file = sock_file
        self.mip = mip

    def _daemonize(self):
        """Detach via the classic double fork, redirect stdio, write the pidfile.

        Returns only in the final (grandchild) process.  The intermediate
        processes exit with status 0, or status 1 if a fork fails.
        """
        # First fork: the parent returns control to the caller's shell.
        try:
            pid = os.fork()
            if pid > 0:
                sys.exit(0)
        except OSError as e:
            sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
            sys.exit(1)
        # New session: detach from the controlling terminal.
        os.setsid()
        # Change the working directory to the filesystem root.
        os.chdir("/")
        # NOTE(review): umask 0 makes the pidfile, the log files and anything
        # the executed jobs create world-writable unless a mode is given
        # explicitly; consider 0o022.
        os.umask(0)
        # Second fork: the session leader exits, so the daemon can never
        # reacquire a controlling terminal.
        try:
            pid = os.fork()
            if pid > 0:
                sys.exit(0)
        except OSError as e:
            sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
            sys.exit(1)
        sys.stdout.flush()
        sys.stderr.flush()
        # Redirect the standard streams.  Only the descriptors are needed, so
        # each Python file object is closed right after dup2().
        with open(self.stdin, 'r') as si:
            os.dup2(si.fileno(), sys.stdin.fileno())
        with open(self.stdout, 'a+') as so:
            os.dup2(so.fileno(), sys.stdout.fileno())
        with open(self.stderr, 'a+') as se:
            os.dup2(se.fileno(), sys.stderr.fileno())
        # Remove the pidfile on normal interpreter exit.  atexit handlers do
        # not run when the process dies from an unhandled SIGTERM (which is
        # how stop() ends it), so stop() also removes the pidfile itself.
        atexit.register(self.delpid)
        with open(self.pidfile, 'w+') as pf:
            pf.write("%s\n" % os.getpid())

    def _read_pid(self):
        """Return the positive pid stored in the pidfile, or None.

        None covers a missing or unreadable file, non-numeric content and
        non-positive values.  0 or -1 must never reach os.kill(), which would
        signal a whole process group or every process.
        """
        try:
            with open(self.pidfile, 'r') as pf:
                pid = int(pf.read().strip())
        except (IOError, ValueError):
            return None
        return pid if pid > 0 else None

    @staticmethod
    def _pid_running(pid):
        """Return True if a process with *pid* exists (signal 0 probe)."""
        import errno
        try:
            os.kill(pid, 0)
        except OSError as err:
            # EPERM: the process exists but belongs to another user.
            return err.errno == errno.EPERM
        return True

    def delpid(self):
        """Remove the pidfile; a file that is already gone is not an error."""
        import errno
        try:
            os.remove(self.pidfile)
        except OSError as err:
            if err.errno != errno.ENOENT:
                raise

    def start(self):
        """Daemonize and run ``_run`` unless a live daemon already owns the pidfile.

        Exits with status 1 if the pid in the pidfile belongs to a running
        process.  A stale pidfile (left behind by a daemon killed with
        SIGTERM) is ignored and overwritten.
        """
        pid = self._read_pid()
        if pid and self._pid_running(pid):
            message = "pidfile %s already exist. Daemon already running?\n"
            sys.stderr.write(message % self.pidfile)
            sys.exit(1)
        self._daemonize()
        self._run()

    def stop(self):
        """Send SIGTERM to the daemon until it is gone, then remove the pidfile.

        Prints a warning and returns normally when there is no usable pid,
        so restart() still works when the daemon was not running.  Exits
        with status 1 on any other kill() failure.
        """
        import errno
        pid = self._read_pid()
        if not pid:
            message = "pidfile %s does not exist. Daemon not running?\n"
            sys.stderr.write(message % self.pidfile)
            return  # not an error in a restart
        # Keep signalling until kill() fails with ESRCH (process gone).
        try:
            while 1:
                os.kill(pid, SIGTERM)
                time.sleep(0.1)
        except OSError as err:
            if err.errno == errno.ESRCH:
                if os.path.exists(self.pidfile):
                    os.remove(self.pidfile)
            else:
                print(str(err))
                sys.exit(1)

    def restart(self):
        """Stop the daemon (if running) and start it again."""
        self.stop()
        self.start()

    def _run(self):
        """Main loop of the daemon; subclasses override this."""
        pass

  

#重写Daemon的_run函数,实现自己的Daemon进程

class MyDaemon(Daemon):
    """Client daemon whose main loop is ioloop(), talking to the master."""

    def _run(self):
        # Hand the settings captured in Daemon.__init__ to the polling loop;
        # self.stdout doubles as the log file path.
        ioloop(sock_file=self.sock_file, homedir=self.homedir,
               mip=self.mip, stdout=self.stdout)

  

#定义主函数,建立相关运行目录和定义日志路径等

def main():
    """Command-line entry point: ``<prog> start|stop|restart``.

    The current directory becomes the client's home.  ``log/`` and ``run/``
    are created under it, and the daemon is controlled through
    ``run/oaos_client.pid``.

    Exit codes: 0 after the command completes, 2 for bad usage, 3 when the
    IP address of the master-facing interface cannot be determined.
    """
    # Validate the command line first, so that a typo neither creates
    # directories nor requires the network interface to exist.
    if len(sys.argv) != 2:
        print("usage: %s start|stop|restart" % sys.argv[0])
        sys.exit(2)
    command = sys.argv[1]
    if command not in ('start', 'stop', 'restart'):
        print("Unknown command")
        sys.exit(2)

    homedir = os.getcwd()
    for i in ('log', 'run'):
        path = homedir + '/' + i
        if not os.path.exists(path):
            os.makedirs(path, 0o755)
    stdout = homedir + '/log' + '/oaos_client.log'
    stderr = homedir + '/log' + '/oaos_client.err'
    pidfile = homedir + '/run' + '/oaos_client.pid'
    # TCP endpoint of the master.
    sock_file = "tcp://192.168.4.194:7777"
    # Local interface whose IPv4 address identifies this client to the master.
    ifname = 'eth0'
    mip = None
    # 'stop' only signals the pid from the pidfile; it must keep working
    # even when the interface is missing.
    if command != 'stop':
        try:
            mip = get_ip_address(ifname)
        except Exception as err:
            print(err)
            sys.exit(3)
    daemon = MyDaemon(pidfile, sock_file, homedir, mip, stdout=stdout, stderr=stderr)
    actions = {'start': daemon.start, 'stop': daemon.stop, 'restart': daemon.restart}
    actions[command]()
    sys.exit(0)

  

  

# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()
相关文章
相关标签/搜索