Spark-SQL with Multiple Hive Metastores, Multiple Backends, and Multiple Databases

SparkSQL supports connecting to several kinds of Metastore at the same time, including Atlas2 (PB) and Hive 0.12+. Users can operate on tables from multiple Metastores within a single SQL statement.

Configuring a Metastore

Configure conf/hive-site.xml the same way you normally would.

For example, to configure access to mysql:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>mysql.metastore.zk.server</name>
    <value>zk.weilx.com:2183</value>
  </property>
  <property>
    <name>mysql.metastore.zk.path</name>
    <value>/biglog/metaspark</value>
  </property>
  <property>
    <name>hive.metastore.type</name>
    <value>mysql</value>
  </property>
  <property>
    <name>mysql.identity.user</name>
    <value>test</value>
  </property>
</configuration>

Run spark-sql:

$ ./bin/spark-sql
spark-sql> show databases;
OK
default
mysql
Time taken: 2.301 seconds, Fetched 5 row(s)

As you can see, the mysql metastore is now accessible.
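
To double-check, you can switch into the new database and list its tables. A minimal sketch (the table list depends on what this metastore actually holds):

spark-sql> use mysql;
spark-sql> show tables;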

Adding a Metastore

To add a new metastore, create an additional configuration file under conf/, e.g. hive-site2.xml (the file name is arbitrary), and configure the new metastore in it. Sample contents:

<configuration>
  <!-- JDBC connection details for the backing metastore database -->
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://10.xx.xx.xx/hive13?createDatabaseIfNotExist=true</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>test</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>test</value>
  </property>
  <!-- Declare this metastore as a Hive-type metastore -->
  <property>
    <name>hive.metastore.type</name>
    <value>hive</value>
  </property>
</configuration>

Then start the spark-sql client:

# Register the new configuration file with the session
spark-sql> set metaclient.config.hive2=hive-site2.xml;
Time taken: 0.104 seconds
  
# Assign the alias default2 to the hive2.default database
spark-sql> set metaclient.alias.default2=hive2.default;
Time taken: 0.109 seconds
 
# Switch to the default2 database
spark-sql> use default2;
spark-sql> show tables;
ares_test       false
inserttest      false
people  false
src     false
srczzz  false
Time taken: 0.433 seconds, Fetched 5 row(s)
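
Tables in the aliased database can also be referenced by qualified name, without switching the current database. A minimal sketch using the src table listed above (any of the listed tables would do):

spark-sql> select count(*) from default2.src;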

The author's (费元星) full hive-site.xml configuration:

<configuration>
  <!-- A configuration without explanations is worthless. by feiyuanxing -->
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>hdfs://IP:port/app/ns/df</value>
    <description>location of default database for the warehouse</description>
  </property>
  <property>
    <name>hive.exec.scratchdir</name>
    <value>hdfs://IP:port/app/ns/df/tmp/hive-${user.name}</value>
    <description>Scratch space for Hive jobs</description>
  </property>
  <property>
    <name>hive.security.authorization.enabled</name>
    <value>false</value>
  </property>
  <property>
    <name>hive.metastore.client.connect.retry.delay</name>
    <value>-1</value>
  </property>
  <property>
    <name>hive.cli.print.current.db</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.metastore.thrift.framed.transport.enabled</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.metastore.use.combined</name>
    <value>true</value>
  </property>

  <!-- mysql metastore connection -->
  <property>
    <name>metaclient.config.mysql</name>
    <value>hive-site-mysql.xml</value>
  </property>
  <property>
    <name>metaclient.alias.mysql</name>
    <value>mysql.mysql</value>
  </property>

  <!-- hive metastore connection -->
  <property>
    <name>metaclient.config.hive</name>
    <value>hive-site-hive.xml</value>
  </property>
  <property>
    <name>metaclient.alias.hive</name>
    <value>hive.hive</value>
  </property>
</configuration>
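
With this combined configuration, both backends should show up as databases under their aliases as soon as the client starts. A hedged sketch of the expected listing (actual output depends on the metastores behind hive-site-mysql.xml and hive-site-hive.xml):

spark-sql> show databases;
OK
default
hive
mysql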

Cross-Metastore Operations

With the two configuration steps above, the system now has two metastores, mysql and hive2, and by aliasing the default database in hive2 as default2 we have avoided the naming conflict. Tables from both databases can now be used in the same query. For example:

select T1.event_id, T1.event_time from default.test_table T1 join default2.test_table2 T2 on T1.event_id = T2.event_id;
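
The result of a cross-metastore join can be materialized like any other query result. A hedged sketch using CREATE TABLE AS SELECT, assuming the local default database is writable (joined_events is a made-up name for illustration):

create table default.joined_events as
select T1.event_id, T1.event_time
from default.test_table T1
join default2.test_table2 T2 on T1.event_id = T2.event_id;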

In the article "使用Spark SQL读取Hive上的数据" (Reading Data on Hive with Spark SQL) on this blog, I described how to read Hive data through Spark. Sometimes, however, creating a SQLContext instance fails with an exception like the following:

java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
    at org.apache.spark.sql.hive.client.ClientWrapper.<init>(ClientWrapper.scala:171)
    at org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveContext.scala:162)
    at org.apache.spark.sql.hive.HiveContext.executionHive(HiveContext.scala:160)
    at org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:391)
    at org.apache.spark.sql.SQLContext$$anonfun$5.apply(SQLContext.scala:235)
    at org.apache.spark.sql.SQLContext$$anonfun$5.apply(SQLContext.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at org.apache.spark.sql.SQLContext.<init>(SQLContext.scala:234)
    at org.apache.spark.sql.hive.HiveContext.<init>(HiveContext.scala:72)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.spark.repl.SparkILoop.createSQLContext(SparkILoop.scala:1028)
    at $iwC$$iwC.<init>(<console>:9)
    at $iwC.<init>(<console>:18)
    at <init>(<console>:20)
    at .<init>(<console>:24)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
    at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:132)
    at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:124)
    at org.apache.spark.repl.SparkIMain.beQuietDuring(SparkIMain.scala:324)
    at org.apache.spark.repl.SparkILoopInit$class.initializeSpark(SparkILoopInit.scala:124)
    at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:64)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1$$anonfun$apply$mcZ$sp$5.apply$mcV$sp(SparkILoop.scala:974)
    at org.apache.spark.repl.SparkILoopInit$class.runThunks(SparkILoopInit.scala:159)
    at org.apache.spark.repl.SparkILoop.runThunks(SparkILoop.scala:64)
    at org.apache.spark.repl.SparkILoopInit$class.postInitialization(SparkILoopInit.scala:108)
    at org.apache.spark.repl.SparkILoop.postInitialization(SparkILoop.scala:64)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:991)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
    at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1523)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:86)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:132)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)
    at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3005)
    at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3024)
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:503)
    ... 64 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1521)
    ... 70 more
Caused by: MetaException(message:Version information not found in metastore. )
    at org.apache.hadoop.hive.metastore.ObjectStore.checkSchema(ObjectStore.java:6664)
    at org.apache.hadoop.hive.metastore.ObjectStore.verifySchema(ObjectStore.java:6645)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.hive.metastore.RawStoreProxy.invoke(RawStoreProxy.java:114)
    at com.sun.proxy.$Proxy14.verifySchema(Unknown Source)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:572)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:620)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:461)
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:66)
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:72)
    at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:5762)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:199)
    at org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.<init>(SessionHiveMetaStoreClient.java:74)
    ... 75 more

This happens because creating a SQLContext requires that the Hive version Spark was compiled against matches the Hive version recorded in the metastore. The check is controlled by the hive.metastore.schema.verification parameter, which defaults to true; to disable the verification, configure:

<property>
  <name>hive.metastore.schema.verification</name>
  <value>false</value>
  <description>
    Enforce metastore schema version consistency.
    True: Verify that version information stored in metastore matches with one from Hive jars. Also disable automatic
          schema migration attempt. Users are required to manually migrate schema after Hive upgrade which ensures
          proper metastore schema migration. (Default)
    False: Warn if the version information stored in metastore doesn't match with one from in Hive jars.
  </description>
</property>

Then restart Spark; the SQLContext instance can now be created.
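
If you would rather not edit hive-site.xml, the same switch can usually be passed at launch time instead. A hedged sketch (--hiveconf is accepted by the spark-sql CLI, and spark.hadoop.* properties are forwarded into the Hadoop/Hive configuration):

$ ./bin/spark-sql --hiveconf hive.metastore.schema.verification=false
# or, for other entry points:
$ ./bin/spark-shell --conf spark.hadoop.hive.metastore.schema.verification=false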
