Hive源码剖析之HiveServer2服务启动过程

1、前言java

    前段时间用Hive JDBC出现了阻塞问题,客户端一直处于等待状态,为了解决该问题,花了几天研究了Hive源码,因而准备写成系列博文,与你们分享其中的心得。有不当之处,请你们更正。web

2、背景算法

    咱们生活在一个数据的时代,咱们在经意与不经意间留下了数据。带着手机出门,不经意间咱们留下了计步数据和活动轨迹数据,咱们网购商品,不经意间留下我的偏好数据,发个朋友圈,留下了各类图片和文字数据……咱们每一个人都是数据贡献者。编程

    数据的暴增给数据的存储、分析和查询带来了不少难点,因而谷歌提出了不少解决方案,并发表了三篇意义重大的论文Google File System、Google MapReduce、Google Bigtable,基于这三篇论文,便产生了Hadoop和HBase。Hadoop是基于GFS和MapReduce理论的开源实现,解决大文件的存储与分析问题,HBase是基于Bigtable理论的开源实现,解决海量数据的实时检索。服务器

    MapReduce对于Hadoop而言,既是算法模型,也是框架。要使用MapReduce必须先把业务转化为适合该算法模型,并基于该框架编程。对于框架,笔者这样认为,框架提升了开发效率的同时,也限定了人的思惟范围和编程范围。房子是一个框架结构,住在房子里的人,活动范围只限于活动范围之间,开发框架亦是如此,Struts2是MVC框架,开发人员只能按照它所限定的模式开发,这未尝不是一种禁锢呢?做为一个软件架构者,了解框架实现的细节,才能走出禁锢。这也是笔者坚持看各类框架源码的缘由之一。多线程

    编写出好的MapReduce程序自己不是垂手可得的事,全部就有了Hive,它能把Sql指令,转化为MapReduce任务,让开发人员用Sql的方式去使用Hadoop的运算能力。咱们在使用这些工具的同时,也不得不赞叹这些杰出科学家奇思妙想。架构

3、分享重点并发

    HiveServer2服务启动过程负载均衡

4、源码框架

    一、开一段程序,入口确定是main方法

public static void main(String[] args) {
    //设置加载配置
	HiveConf.setLoadHiveServer2Config(true);
	try {
		ServerOptionsProcessor oproc = new ServerOptionsProcessor(
				"hiveserver2");
		ServerOptionsProcessorResponse oprocResponse = oproc.parse(args);
		String initLog4jMessage = LogUtils.initHiveLog4j();
		LOG.debug(initLog4jMessage);
		HiveStringUtils
				.startupShutdownMessage(HiveServer2.class, args, LOG);
		LOG.debug(oproc.getDebugMessage().toString());
        //这里即是要启动服务了
		oprocResponse.getServerOptionsExecutor().execute();
	} catch (LogInitializationException e) {
		LOG.error("Error initializing log: " + e.getMessage(), e);
		System.exit(-1);
	}
}

    二、ServerOptionsExecutor接口有三个实现类

    DeregisterOptionExecutor:将HiveServer2实例从zookeeper中移除

    HelpOptionExecutor:打印help参数

    ServerOptionsExecutor:启动HiveServer2服务

    这里很显然,咱们应该看ServerOptionsExecutor类

static class StartOptionExecutor implements ServerOptionsExecutor {
	@Override
	public void execute() {
		try {
            //启动服务
			startHiveServer2();
		} catch (Throwable t) {
			LOG.fatal("Error starting HiveServer2", t);
			System.exit(-1);
		}
	}
}

    三、startHiveServer2方法,重要的地方给出了必要的注释

private static void startHiveServer2() throws Throwable {
	long attempts = 0, maxAttempts = 1;
	while (true) {
		LOG.info("Starting HiveServer2");
		HiveConf hiveConf = new HiveConf();
		maxAttempts = hiveConf
				.getLongVar(HiveConf.ConfVars.HIVE_SERVER2_MAX_START_ATTEMPTS);
		HiveServer2 server = null;
		try {
			server = new HiveServer2();
            //初始化
			server.init(hiveConf);
            //启动
			server.start();
           //省略了一些代码
			break;
		} catch (Throwable throwable) {
			throwable.printStackTrace();
            //出现异常就中止服务
			if (server != null) {
				try {
					server.stop();
				} catch (Throwable t) {
					LOG.info(
							"Exception caught when calling stop of HiveServer2 before retrying start",
							t);
				} finally {
					server = null;
				}
			}
            //若是抛出异常,并尝试启动超过了配置的最大尝试次数,抛出错误,启动失败
			if (++attempts >= maxAttempts) {
				throw new Error("Max start attempts " + maxAttempts
						+ " exhausted", throwable);
			} else {
                //60秒后再次尝试启动
				LOG.warn("Error starting HiveServer2 on attempt "
						+ attempts + ", will retry in 60 seconds",
						throwable);
				try {
					Thread.sleep(60L * 1000L);
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt();
				}
			}
		}
	}
}

    四、接下来咱们看看server的初始化作了哪些事情

@Override
public synchronized void init(HiveConf hiveConf) {
    //实例化clientService实例,该实例用于把用户请求转化并传递给Driver
	cliService = new CLIService(this);
	addService(cliService);
	if (isHTTPTransportMode(hiveConf)) {
		thriftCLIService = new ThriftHttpCLIService(cliService);
	} else {//默认状况是Thrift二进制服务
		thriftCLIService = new ThriftBinaryCLIService(cliService);
	}
    //添加进服务列表
	addService(thriftCLIService);
	super.init(hiveConf);
	// 省略获取配置信息……
	
	// 启动web UI,改web UI用于查看正在运行的Hive任务,默认端口10002
	try {
		        //省略大量获取的配置的代码
                。。。。。。
				webServer = builder.build();
                //添加查询运行任务数据的servlet
				webServer.addServlet("query_page", "/query_page",
						QueryProfileServlet.class);
			}
		}
	} catch (IOException ie) {
		throw new ServiceException(ie);
	}
	// Add a shutdown hook for catching SIGTERM & SIGINT
	final HiveServer2 hiveServer2 = this;
    //添加钩子
	Runtime.getRuntime().addShutdownHook(new Thread() {
		@Override
		public void run() {
			hiveServer2.stop();
		}
	});
}

    初始化主要是获取了一些配置参数,而且告诉程序,要启动这些服务CLIService、thriftCLIService 、webServer

    五、下面是重头戏,启动服务了,紧接着第3步中server.start()看下来,start方法

@Override
public synchronized void start() {
	super.start();//调用父类start方法
	if (webServer != null) {
		try {
			webServer.start();//启动web服务
			LOG.info("Web UI has started on port " + webServer.getPort());
		} catch (Exception e) {
			LOG.error("Error starting Web UI: ", e);
			throw new ServiceException(e);
		}
	}
}

    start方法,作了两件事情,就是调用父类start方法,启动web服务

    六、下面我跟进去看看,父类CompositeService中定义的start方法

@Override
public synchronized void start() {
	int i = 0;
	try {
		for (int n = serviceList.size(); i < n; i++) {
			Service service = serviceList.get(i);
			service.start();
		}
		super.start();
	} catch (Throwable e) {
		stop(i);
		throw new ServiceException("Failed to Start " + getName(), e);
	}
}

这里仅仅是把第4步中添加的服务列表中的服务进行了启动,下面咱们咱们说服务中的重点,HiveServer2怎么提供远程服务,请看ThriftBinaryCLIService

    七、ThriftBinaryCLIService中的run方法

@Override
public void run() {
	try {
		// 定义处理请求的线程池
		String threadPoolName = "HiveServer2-Handler-Pool";
		ExecutorService executorService = new ThreadPoolExecutor(
				minWorkerThreads, maxWorkerThreads, workerKeepAliveTime,
				TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
				new ThreadFactoryWithGarbageCleanup(threadPoolName));


		//省略定义传输协议、配置参数代码
         ........
		int maxMessageSize = hiveConf
				.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE);
		int requestTimeout = (int) hiveConf.getTimeVar(
				HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT,
				TimeUnit.SECONDS);
		int beBackoffSlotLength = (int) hiveConf
				.getTimeVar(
						HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH,
						TimeUnit.MILLISECONDS);
		TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(
				serverSocket)
				.processorFactory(processorFactory)
				.transportFactory(transportFactory)
				.protocolFactory(new TBinaryProtocol.Factory())
				.inputProtocolFactory(
						new TBinaryProtocol.Factory(true, true,
								maxMessageSize, maxMessageSize))
				.requestTimeout(requestTimeout)
				.requestTimeoutUnit(TimeUnit.SECONDS)
				.beBackoffSlotLength(beBackoffSlotLength)
				.beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS)
				.executorService(executorService);

		// TCP Server
		server = new TThreadPoolServer(sargs);
		server.setServerEventHandler(serverEventHandler);
		server.serve();
		String msg = "Started "
				+ ThriftBinaryCLIService.class.getSimpleName()
				+ " on port " + portNum + " with " + minWorkerThreads
				+ "..." + maxWorkerThreads + " worker threads";
		LOG.info(msg);
	} catch (Throwable t) {
		LOG.fatal("Error starting HiveServer2: could not start "
				+ ThriftBinaryCLIService.class.getSimpleName(), t);
		System.exit(-1);
	}
}

    说明:这里的ExecutorService线程池的定义中用了SynchronousQueue队列,该队列的能够认为是长度为1的阻塞队列,当线程池满,而且没有空闲线程,便会阻塞。TThreadPoolServer的特色是,客户端只要不从服务器上断开链接,就会一直占据服务器的一个线程,因此出现了本文中开头出现的阻塞问题,解决办法,若是服务器内存容许,能够适当加大线程池长度,或者增长hive节点,在配合负载均衡。

    八、特别说明

    Thrift是RPC界的利器,Facebook的杰做,能够轻松实现夸语言的服务调用,支持的语言有C++, Java, Go,Python, PHP, Haskell, C#,  JavaScript, Node.js等等

   (1)、模型接口
       服务的调用接口以及接口参数model、返回值model 
   (2)、Tprotocol协议定义 
         将数据(model)编码 、解码 。 
  ( 3)、Ttramsport传输层定义
        编码后的数据传输(简单socket、http) 
   (5)、Tserver服务类型
        服务的Tserver类型,实现了几种rpc调用(单线程、多线程、非阻塞IO)

5、后记

    本文中的描述了HiveServer2的启动过程当中,启动的服务,那么客户端是如何与服务端交互的呢?为何经过JDBC的方式就能够去Hive上执行任务了呢?Hive JDBC除了实现JDBC标准接口外,还作了哪些事情呢?敬请期待《Hive源码剖析之Hive JDBC》

快乐源于分享。

此博客乃做者原创, 转载请注明出处

相关文章
相关标签/搜索