github.com/little-bit-shy/docker-hadoop (project repository)
Skip this step if Docker is already installed.
bash docker.install
Skip this step if Docker Compose is already installed.
bash compose.install
systemctl start docker
First, prepare three servers for later use: zookeeper1, zookeeper2, and zookeeper3.
Edit zookeeper/instances.yml to configure the cluster information; it is used to generate the hosts and myid files.
bash zookeeper.sh
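To sanity-check the ensemble once the containers are up, you can query each node with ZooKeeper's stat four-letter command; a minimal sketch, assuming port 2181 is published on every host and the four-letter commands are enabled:

```bash
# One node should report "Mode: leader", the other two "Mode: follower"
echo stat | nc zookeeper1 2181 | grep Mode
echo stat | nc zookeeper2 2181 | grep Mode
echo stat | nc zookeeper3 2181 | grep Mode
```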
First, prepare three servers for later use: hadoop1, hadoop2, and hadoop3.
A default private/public key pair is provided under ./hadoop/ssh; it is recommended to delete these default keys and replace them with your own.
See the hadoop-key.sh script for the key-generation procedure.
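As a rough illustration of what generating such a key pair looks like (not necessarily the exact contents of hadoop-key.sh), you could use ssh-keygen; the output path below is an assumption:

```bash
# Generate a passphrase-less RSA key pair for the cluster (path is an assumption;
# adjust it to whatever hadoop-key.sh and the images actually expect)
ssh-keygen -t rsa -b 4096 -N "" -f ./hadoop/ssh/id_rsa
# id_rsa is the private key; id_rsa.pub is the public key to distribute
```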
Edit hadoop/instances.yml to configure the cluster information; it is used to generate the hosts file.
bash hadoop.sh
`Note: the download links may become invalid, so replace them when necessary. If the downloads are too slow, you can fetch the archives with a download tool such as Thunder (Xunlei), rename the files as expected by the startup script, and put them into /hadoop/tar/.`
On hadoop1, start all JournalNodes: hadoop-daemons.sh start journalnode
On hadoop1, format the NameNode data: hdfs namenode -format
On hadoop1, start the NameNode: hadoop-daemon.sh start namenode
On hadoop2, synchronize the NameNode metadata: hdfs namenode -bootstrapStandby
On hadoop2, start the NameNode: hadoop-daemon.sh start namenode
On hadoop1, initialize ZKFC: hdfs zkfc -formatZK
On hadoop1, stop all services: stop-dfs.sh
On hadoop1, start everything: start-all.sh
At this point, the fully distributed Hadoop 2.x HDFS HA cluster is set up.
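Once everything is running, it is worth confirming that the two NameNodes really took the active/standby roles. A minimal check; the service IDs nn1 and nn2 are assumptions and must match dfs.ha.namenodes.* in your hdfs-site.xml:

```bash
# List the Hadoop daemons running on the current node
jps
# Query the HA state of each NameNode (service IDs are assumptions)
hdfs haadmin -getServiceState nn1   # expect one "active" ...
hdfs haadmin -getServiceState nn2   # ... and one "standby"
# Overall HDFS capacity and DataNode report
hdfs dfsadmin -report
```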
There is no need to repeat the one-off steps from the first startup.
On hadoop1, start everything: start-all.sh
Edit hive/hive-site.xml to configure MySQL as the backing store for the Hive metastore.
On hadoop1, initialize the metastore schema: schematool -initSchema -dbType mysql
On hadoop1, start HiveServer2: hiveserver2 &
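To verify that HiveServer2 accepts connections, you can attach with Beeline; a minimal sketch, assuming HiveServer2 listens on hadoop1:10000 and the hadoop user is allowed to connect:

```bash
# Connect over JDBC and run a trivial query
beeline -u jdbc:hive2://hadoop1:10000 -n hadoop -e "SHOW DATABASES;"
```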
```sql
-- Example: create tables
CREATE TABLE IF NOT EXISTS test (
  id int,
  uid int,
  title string,
  name string,
  status int,
  time timestamp
) COMMENT 'description'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' LINES TERMINATED BY '\n'
STORED AS TEXTFILE;

CREATE TABLE IF NOT EXISTS test_out (
  name string,
  count int,
  time date
) COMMENT 'description'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' LINES TERMINATED BY '\n'
STORED AS TEXTFILE;

-- Aggregate the data and write the result into the output table
INSERT INTO TABLE test_out (name, count, time)
SELECT name, count(1), to_date(time) FROM test GROUP BY name, to_date(time);

INSERT OVERWRITE TABLE test_out
SELECT name, count(1), to_date(time) FROM test GROUP BY name, to_date(time);
```
```python
#!/usr/bin/python
# -*- coding: UTF-8 -*-
from pyhive import hive
import commands

# hive
HOST="hadoop"
PORT="10000"
USERNAME="hadoop"
DATABASE="default"
# mysql
MYSQL_HOST="192.168.253.129"
MYSQL_PORT="3306"
MYSQL_USERNAME="root"
MYSQL_PASSWORD="123456"
MYSQL_DATABASE="test"

######################## Data synchronization Mysql to Hive ########################
print '\033[1;32mStart data synchronization!!\033[0m'
(status, output) = commands.getstatusoutput("sqoop import \
--driver com.mysql.jdbc.Driver \
--connect jdbc:mysql://" + MYSQL_HOST + ":" + MYSQL_PORT + "/" + MYSQL_DATABASE + " \
--username " + MYSQL_USERNAME + " \
--password " + MYSQL_PASSWORD + " \
--table test \
--check-column time \
--incremental lastmodified \
--last-value '2018-08-09 15:30:29' \
--merge-key id \
--fields-terminated-by '\001' \
--lines-terminated-by '\n' \
--num-mappers 1 \
--target-dir /user/hive/warehouse/test \
--hive-drop-import-delims")
if status != 0:
    print '\033[1;31mData synchronization failure!!\033[0m'
    print output
    exit()
else:
    print '\033[1;32mData synchronization successful!!\033[0m'

######################## Data statistics Hive to Hive ########################
print '\033[1;32mStart data statistics!!\033[0m'
conn = hive.Connection(host=HOST, port=PORT, username=USERNAME, database=DATABASE)
cursor = conn.cursor()
cursor.execute("INSERT OVERWRITE TABLE test_out SELECT name,count(1),to_date(time) FROM test GROUP BY name,to_date(time)")
print '\033[1;32mData statistics successful!!\033[0m'
#cursor.execute("SELECT * FROM test")
#for result in cursor.fetchall():
#    print(result[2])

######################## Data synchronization Hive to Mysql ########################
print '\033[1;32mStart data synchronization!!\033[0m'
(status, output) = commands.getstatusoutput("sqoop export \
--driver com.mysql.jdbc.Driver \
--connect 'jdbc:mysql://" + MYSQL_HOST + ":" + MYSQL_PORT + "/" + MYSQL_DATABASE + "?useUnicode=true&characterEncoding=utf-8' \
--username " + MYSQL_USERNAME + " \
--password " + MYSQL_PASSWORD + " \
--table test_out \
--num-mappers 1 \
--export-dir /user/hive/warehouse/test_out \
--fields-terminated-by '\001' \
--lines-terminated-by '\n'")
if status != 0:
    print '\033[1;31mData synchronization failure!!\033[0m'
    print output
    exit()
else:
    print '\033[1;32mData synchronization successful!!\033[0m'
```
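A hedged note on running the script above: it targets Python 2 (it uses the commands module and print statements) and needs the PyHive client libraries plus sqoop on the PATH; the package list and the file name hive_sync.py below are assumptions for a typical PyHive-over-Thrift setup:

```bash
# Install the client libraries the script imports (package set is an assumption)
pip install pyhive thrift sasl thrift_sasl
# Run the synchronization script manually (hive_sync.py is a hypothetical file name)
python hive_sync.py
```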
On hadoop1, start HBase: start-hbase.sh
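To confirm that HBase came up correctly, you can open the HBase shell and ask for the cluster status; a minimal check:

```bash
# Print cluster status and list existing tables, then exit
hbase shell <<'EOF'
status
list
EOF
```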
```bash
/usr/local/sqoop/bin/sqoop \
list-databases \
--connect jdbc:mysql://hadoop001:3306/ \
--username root \
--password 123456
```
```bash
# Full-table import from MySQL into Hive
bin/sqoop import \
--driver com.mysql.jdbc.Driver \
--connect jdbc:mysql://hadoop001:3306/hadoop \
--username root \
--password 123456 \
--table test \
--fields-terminated-by '\001' \
--lines-terminated-by '\n' \
--delete-target-dir \
--num-mappers 1 \
--hive-import \
--hive-database default \
--hive-table test \
--direct

# Incremental (lastmodified) import from MySQL into Hive
bin/sqoop import \
--driver com.mysql.jdbc.Driver \
--connect jdbc:mysql://hadoop001:3306/hadoop \
--username root \
--password 123456 \
--table test \
--check-column time \
--incremental lastmodified \
--last-value '2018-08-09 15:30:29' \
--merge-key id \
--fields-terminated-by '\001' \
--lines-terminated-by '\n' \
--num-mappers 1 \
--target-dir /user/hive/warehouse/test \
--hive-drop-import-delims
# --hive-delims-replacement '-'
```
```bash
# Create an incremental-update job
bin/sqoop job --create test -- \
import \
--driver com.mysql.jdbc.Driver \
--connect jdbc:mysql://hadoop001:3306/hadoop \
--username root \
--password 123456 \
--table test \
--check-column time \
--incremental lastmodified \
--last-value '2018-08-09 15:30:29' \
--merge-key id \
--fields-terminated-by '\001' \
--lines-terminated-by '\n' \
--num-mappers 1 \
--target-dir /user/hive/warehouse/test
```
Run the job:
bin/sqoop job --exec test
After running the job again, you can see that the data has been updated.
Inspect the job:
```bash
bin/sqoop job --show test
Job: test
Tool: import
Options:
----------------------------
verbose = false
hcatalog.drop.and.create.table = false
# sqoop records and updates last-value for you automatically, which makes incremental updates quite simple
incremental.last.value = 2018-08-10 03:51:47.0
db.connect.string = jdbc:mysql://hadoop001:3306/hadoop
codegen.output.delimiters.escape = 0
codegen.output.delimiters.enclose.required = false
codegen.input.delimiters.field = 0
mainframe.input.dataset.type = p
split.limit = null
hbase.create.table = false
db.require.password = false
skip.dist.cache = false
hdfs.append.dir = false
db.table = test
codegen.input.delimiters.escape = 0
db.password = 123456
accumulo.create.table = false
import.fetch.size = null
codegen.input.delimiters.enclose.required = false
db.username = root
reset.onemapper = false
codegen.output.delimiters.record = 10
import.max.inline.lob.size = 16777216
sqoop.throwOnError = false
hbase.bulk.load.enabled = false
hcatalog.create.table = false
db.clear.staging.table = false
incremental.col = time
codegen.input.delimiters.record = 0
enable.compression = false
hive.overwrite.table = false
hive.import = false
codegen.input.delimiters.enclose = 0
accumulo.batch.size = 10240000
hive.drop.delims = false
customtool.options.jsonmap = {}
codegen.output.delimiters.enclose = 0
hdfs.delete-target.dir = false
codegen.output.dir = .
codegen.auto.compile.dir = true
relaxed.isolation = false
mapreduce.num.mappers = 1
accumulo.max.latency = 5000
import.direct.split.size = 0
sqlconnection.metadata.transaction.isolation.level = 2
codegen.output.delimiters.field = 9
export.new.update = UpdateOnly
incremental.mode = DateLastModified
hdfs.file.format = TextFile
sqoop.oracle.escaping.disabled = true
codegen.compile.dir = /tmp/sqoop-hadoop/compile/028365970856b88aa0aa91435ff172e5
direct.import = false
temporary.dirRoot = _sqoop
hdfs.target.dir = /user/hive/warehouse/test
hive.fail.table.exists = false
merge.key.col = id
jdbc.driver.class = com.mysql.jdbc.Driver
db.batch = false
```
In practice, you can combine sqoop job with a scheduler such as crontab to implement this kind of workflow, for example as shown below.
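For example, a crontab entry could run the saved job every night; the schedule and the log path below are assumptions:

```bash
# Edit the crontab of the user that owns the Sqoop installation
crontab -e
# Then add a line like this: run the incremental job every day at 02:00
# and append the output to a log file (path is an assumption)
0 2 * * * /usr/local/sqoop/bin/sqoop job --exec test >> /var/log/sqoop-test.log 2>&1
```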
```bash
bin/sqoop export \
--driver com.mysql.jdbc.Driver \
--connect "jdbc:mysql://hadoop001:3306/hadoop?useUnicode=true&characterEncoding=utf-8" \
--username root \
--password 123456 \
--table test_out \
--num-mappers 1 \
--export-dir /user/hive/warehouse/test_out \
--fields-terminated-by '\001' \
--lines-terminated-by '\n'
```
kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
A quick guide to Kafka usage
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
kafka-topics.sh --zookeeper localhost:2181 --describe --topic test
kafka-topics.sh --zookeeper localhost:2181 -alter --partitions 3 --topic test
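To push and read a few test messages, the console producer and consumer shipped with Kafka can be used; the broker address localhost:9092 is an assumption that depends on your server.properties:

```bash
# Produce messages to the test topic (type lines, Ctrl+C to stop)
kafka-console-producer.sh --broker-list localhost:9092 --topic test
# In another terminal, consume the topic from the beginning
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
```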
On hadoop1, start Spark: ${SPARK_HOME}/sbin/start-all.sh
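To check that the standalone cluster accepts applications, you can submit the bundled SparkPi example; a minimal sketch in which the master URL and the examples jar path are assumptions that depend on the installed Spark version:

```bash
# Submit the SparkPi example to the standalone master (URL and jar path are assumptions)
${SPARK_HOME}/bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://hadoop1:7077 \
  ${SPARK_HOME}/examples/jars/spark-examples_*.jar 100
```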
On hadoop1, start the MapReduce JobHistory server: mr-jobhistory-daemon.sh start historyserver
On hadoop1, start Kylin: kylin.sh start
The initial username and password are ADMIN/KYLIN.