1、前言java
前段时间用Hive JDBC出现了阻塞问题,客户端一直处于等待状态,为了解决该问题,花了几天研究了Hive源码,因而准备写成系列博文,与你们分享其中的心得。有不当之处,请你们更正。web
2、背景算法
咱们生活在一个数据的时代,咱们在经意与不经意间留下了数据。带着手机出门,不经意间咱们留下了计步数据和活动轨迹数据,咱们网购商品,不经意间留下我的偏好数据,发个朋友圈,留下了各类图片和文字数据……咱们每一个人都是数据贡献者。编程
数据的暴增给数据的存储、分析和查询带来了不少难点,因而谷歌提出了不少解决方案,并发表了三篇意义重大的论文Google File System、Google MapReduce、Google Bigtable,基于这三篇论文,便产生了Hadoop和HBase。Hadoop是基于GFS和MapReduce理论的开源实现,解决大文件的存储与分析问题,HBase是基于Bigtable理论的开源实现,解决海量数据的实时检索。服务器
MapReduce对于Hadoop而言,既是算法模型,也是框架。要使用MapReduce必须先把业务转化为适合该算法模型,并基于该框架编程。对于框架,笔者这样认为,框架提升了开发效率的同时,也限定了人的思惟范围和编程范围。房子是一个框架结构,住在房子里的人,活动范围只限于活动范围之间,开发框架亦是如此,Struts2是MVC框架,开发人员只能按照它所限定的模式开发,这未尝不是一种禁锢呢?做为一个软件架构者,了解框架实现的细节,才能走出禁锢。这也是笔者坚持看各类框架源码的缘由之一。多线程
编写出好的MapReduce程序自己不是垂手可得的事,全部就有了Hive,它能把Sql指令,转化为MapReduce任务,让开发人员用Sql的方式去使用Hadoop的运算能力。咱们在使用这些工具的同时,也不得不赞叹这些杰出科学家奇思妙想。架构
3、分享重点并发
HiveServer2服务启动过程负载均衡
4、源码框架
一、开一段程序,入口确定是main方法
public static void main(String[] args) { //设置加载配置 HiveConf.setLoadHiveServer2Config(true); try { ServerOptionsProcessor oproc = new ServerOptionsProcessor( "hiveserver2"); ServerOptionsProcessorResponse oprocResponse = oproc.parse(args); String initLog4jMessage = LogUtils.initHiveLog4j(); LOG.debug(initLog4jMessage); HiveStringUtils .startupShutdownMessage(HiveServer2.class, args, LOG); LOG.debug(oproc.getDebugMessage().toString()); //这里即是要启动服务了 oprocResponse.getServerOptionsExecutor().execute(); } catch (LogInitializationException e) { LOG.error("Error initializing log: " + e.getMessage(), e); System.exit(-1); } }
二、ServerOptionsExecutor接口有三个实现类
DeregisterOptionExecutor:将HiveServer2实例从zookeeper中移除
HelpOptionExecutor:打印help参数
ServerOptionsExecutor:启动HiveServer2服务
这里很显然,咱们应该看ServerOptionsExecutor类
static class StartOptionExecutor implements ServerOptionsExecutor { @Override public void execute() { try { //启动服务 startHiveServer2(); } catch (Throwable t) { LOG.fatal("Error starting HiveServer2", t); System.exit(-1); } } }
三、startHiveServer2方法,重要的地方给出了必要的注释
private static void startHiveServer2() throws Throwable { long attempts = 0, maxAttempts = 1; while (true) { LOG.info("Starting HiveServer2"); HiveConf hiveConf = new HiveConf(); maxAttempts = hiveConf .getLongVar(HiveConf.ConfVars.HIVE_SERVER2_MAX_START_ATTEMPTS); HiveServer2 server = null; try { server = new HiveServer2(); //初始化 server.init(hiveConf); //启动 server.start(); //省略了一些代码 break; } catch (Throwable throwable) { throwable.printStackTrace(); //出现异常就中止服务 if (server != null) { try { server.stop(); } catch (Throwable t) { LOG.info( "Exception caught when calling stop of HiveServer2 before retrying start", t); } finally { server = null; } } //若是抛出异常,并尝试启动超过了配置的最大尝试次数,抛出错误,启动失败 if (++attempts >= maxAttempts) { throw new Error("Max start attempts " + maxAttempts + " exhausted", throwable); } else { //60秒后再次尝试启动 LOG.warn("Error starting HiveServer2 on attempt " + attempts + ", will retry in 60 seconds", throwable); try { Thread.sleep(60L * 1000L); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } } }
四、接下来咱们看看server的初始化作了哪些事情
@Override public synchronized void init(HiveConf hiveConf) { //实例化clientService实例,该实例用于把用户请求转化并传递给Driver cliService = new CLIService(this); addService(cliService); if (isHTTPTransportMode(hiveConf)) { thriftCLIService = new ThriftHttpCLIService(cliService); } else {//默认状况是Thrift二进制服务 thriftCLIService = new ThriftBinaryCLIService(cliService); } //添加进服务列表 addService(thriftCLIService); super.init(hiveConf); // 省略获取配置信息…… // 启动web UI,改web UI用于查看正在运行的Hive任务,默认端口10002 try { //省略大量获取的配置的代码 。。。。。。 webServer = builder.build(); //添加查询运行任务数据的servlet webServer.addServlet("query_page", "/query_page", QueryProfileServlet.class); } } } catch (IOException ie) { throw new ServiceException(ie); } // Add a shutdown hook for catching SIGTERM & SIGINT final HiveServer2 hiveServer2 = this; //添加钩子 Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { hiveServer2.stop(); } }); }
初始化主要是获取了一些配置参数,而且告诉程序,要启动这些服务CLIService、thriftCLIService 、webServer
五、下面是重头戏,启动服务了,紧接着第3步中server.start()看下来,start方法
@Override public synchronized void start() { super.start();//调用父类start方法 if (webServer != null) { try { webServer.start();//启动web服务 LOG.info("Web UI has started on port " + webServer.getPort()); } catch (Exception e) { LOG.error("Error starting Web UI: ", e); throw new ServiceException(e); } } }
start方法,作了两件事情,就是调用父类start方法,启动web服务
六、下面我跟进去看看,父类CompositeService中定义的start方法
@Override public synchronized void start() { int i = 0; try { for (int n = serviceList.size(); i < n; i++) { Service service = serviceList.get(i); service.start(); } super.start(); } catch (Throwable e) { stop(i); throw new ServiceException("Failed to Start " + getName(), e); } }
这里仅仅是把第4步中添加的服务列表中的服务进行了启动,下面咱们咱们说服务中的重点,HiveServer2怎么提供远程服务,请看ThriftBinaryCLIService
七、ThriftBinaryCLIService中的run方法
@Override public void run() { try { // 定义处理请求的线程池 String threadPoolName = "HiveServer2-Handler-Pool"; ExecutorService executorService = new ThreadPoolExecutor( minWorkerThreads, maxWorkerThreads, workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryWithGarbageCleanup(threadPoolName)); //省略定义传输协议、配置参数代码 ........ int maxMessageSize = hiveConf .getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE); int requestTimeout = (int) hiveConf.getTimeVar( HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT, TimeUnit.SECONDS); int beBackoffSlotLength = (int) hiveConf .getTimeVar( HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH, TimeUnit.MILLISECONDS); TThreadPoolServer.Args sargs = new TThreadPoolServer.Args( serverSocket) .processorFactory(processorFactory) .transportFactory(transportFactory) .protocolFactory(new TBinaryProtocol.Factory()) .inputProtocolFactory( new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize)) .requestTimeout(requestTimeout) .requestTimeoutUnit(TimeUnit.SECONDS) .beBackoffSlotLength(beBackoffSlotLength) .beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS) .executorService(executorService); // TCP Server server = new TThreadPoolServer(sargs); server.setServerEventHandler(serverEventHandler); server.serve(); String msg = "Started " + ThriftBinaryCLIService.class.getSimpleName() + " on port " + portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads"; LOG.info(msg); } catch (Throwable t) { LOG.fatal("Error starting HiveServer2: could not start " + ThriftBinaryCLIService.class.getSimpleName(), t); System.exit(-1); } }
说明:这里的ExecutorService线程池的定义中用了SynchronousQueue队列,该队列的能够认为是长度为1的阻塞队列,当线程池满,而且没有空闲线程,便会阻塞。TThreadPoolServer的特色是,客户端只要不从服务器上断开链接,就会一直占据服务器的一个线程,因此出现了本文中开头出现的阻塞问题,解决办法,若是服务器内存容许,能够适当加大线程池长度,或者增长hive节点,在配合负载均衡。
八、特别说明
Thrift是RPC界的利器,Facebook的杰做,能够轻松实现夸语言的服务调用,支持的语言有C++, Java, Go,Python, PHP, Haskell, C#, JavaScript, Node.js等等
(1)、模型接口
服务的调用接口以及接口参数model、返回值model
(2)、Tprotocol协议定义
将数据(model)编码 、解码 。
( 3)、Ttramsport传输层定义
编码后的数据传输(简单socket、http)
(5)、Tserver服务类型
服务的Tserver类型,实现了几种rpc调用(单线程、多线程、非阻塞IO)
5、后记
本文中的描述了HiveServer2的启动过程当中,启动的服务,那么客户端是如何与服务端交互的呢?为何经过JDBC的方式就能够去Hive上执行任务了呢?Hive JDBC除了实现JDBC标准接口外,还作了哪些事情呢?敬请期待《Hive源码剖析之Hive JDBC》
快乐源于分享。
此博客乃做者原创, 转载请注明出处