最近不是一直在学习大数据框架和引用嘛(我是按照尚硅谷B站视频先学习过一遍路线,之后找准方向研究),除了本身手动利用Kafka和HDFS写一个简单的分布式文件传输(分布式课程开放性实验,恰好用上了所学的来练练手)之外,还学习这个学习路线一个项目,电信客服实战。在这个项目里面仍是学习到了很多内容,包括Java上不足的不少地方、Java工程开发上的要求和少数框架的复习。不排除本身太菜,啥都不知道,认为一些常见的东西不常见的状况(哭html
鉴于网上有不少相似的内容,这里我只是将我学习和复code的过程当中,学习到的知识和遇到问题的解决方案写下,以做记录和回顾。
1.个人手敲代码,2.老师源码,含数据和笔记(提取码:pfbv)
声明一下,没有任何广告意思,这种渠道是很容易找到而且也不少的,我只是刚好学习了这个,而且我以为还不错(求生欲极强java
项目主要是模拟电信中的信息部门,从生产环境中获取通话信息数据,根据业务需求存储和分析数据。node
统计天天、每个月以及每一年的每一个人的通话次数及时长。git
编写代码的流程分为四步:①数据生产,②数据消费,③数据分析,④数据展现。我在学习过程当中,没有学习数据展现部分。github
主要任务是利用contact.log(联系人文件)的数据,生成不一样联系人之间通话记录的流程。
这个Part,老师有句话我以为很在理,“大数据开发人员虽然无论数据怎么来的,怎么出去的,可是必须知道和了解这个过程才能按照需求code”。脑海中闪过中间件redis
在之前的编程学习过程当中,老是一古脑儿的猛写代码,虽然我自认为我在咱们宿舍已是模块化思想最为严重的了,可是从未接触到面向接口编程。这学期也学习了软件工程,(虽然咱们学得很水),这门课虽然不是在教咱们写代码,但倒是教咱们如何正确的作项目和写代码(晕。
面向接口编程也是如此,在这个项目中,了解了咱们的数据来源和需求后,第一步要作的是弄清楚须要的对象和须要的功能,即接口,在共同的模块中肯定好接口和接口的方法签名,接下来才是对接口模块的实现和实现业务。
在这个项目中,创建了一个ct-common模块做为公共模块,简单介绍几个:编程
接口或抽象类 | 描述 |
---|---|
Val | 通常数据都须要的实现的接口,只包括名称意义上的获取值value()方法 |
DataIn | 数据输入接口,功能有设置输入路径,读取数据,故存在setPath()和read()方法 |
Producer | 数据生产者接口,功能有获取输入信息,设置生产输出和生产,故存在setIn()和和setOut()和produce()方法 |
下图是ct-common的代码结构:数组
这个思想其实我在以前的编码过程当中就有点领悟了,之因此在这里提出,是由于在这个跟进过程当中,更加体会到Java编码就是各类对象组合调用的含义。或许是老师项目拉得太快,让我感受本身太菜,skrskr
我以前编码过程当中,也会不停的封装对象,但通常都是那些很明显的功能集成对象,更别说是对数据进行封装成数据集成对象了。换句话说,就是我以前封装的对象都是含有必定动做的(除了getter&setter)。可是对于一些对象之间传递的数据,若是每次都传相同的数据而且数量>1的话,最好的是封装成对象,提升扩展性。在业务须要增添一个数据传递的状况下,封装数据对象只须要更改对象的属性和对象的构成,不然每一个传递的地方(语句)都须要增添传递的数据变量。缓存
下面用一张图表示,在该项目中封装Calllog和Contact对象的效果:
若是不封装对象,若是联系人对象里面在加入一个new item(好比性别),那么几乎全部的地方都须要修改;反之,只须要在Contact类中增添new item属性和在Calllog中增添A.new&B.new属性,以及修改构造方法就能够了,同时在Producer过程当中,没有增添和修改过多代码。网络
在这个Part中主要仍是熟悉任务就能够完成,没遇到什么问题。若是不用上述的tricks那这不就是一个读入文件和写入文件的代码嘛(我一main方法就能搞定),可是用了以后感受就明显不一样,更加工程化,逻辑感更强。
主要操做是利用Flume和Kafka将收集不断生产的数据,而且将数据插入到HBase中。
主要是学到了一些新的知识,还有知识的简单运用,我并无深究这些新概念(估计得学到头秃)。
类加载器:类加载器是负责将多是网络上、也多是磁盘上的class文件加载到内存中。并为其生成对应的java.lang.class对象。
有三种类加载器,分别按照顺序是启动类加载器BootstrapClassLoader、扩展类加载器Extension ClassLoader和系统类加载器App ClassLoader。还存在一种双亲委派模型,简单的意思就是说当一个类加载器收到加载请求时,首先会向上层(父)类加载器发出加载请求。而且每个类加载器都是如此,因此每一个类加载器的请求都会被传递到最顶层的类加载器中,一开始我以为很麻烦,不过这确实能够避免类的重复加载。
在电信客服的项目中,类加载器被用于加载resource文件夹的配置文件。
Properties prop = new Properties(); // 利用类加载器获取配置文件 prop.load(Thread.currentThread().getContextClassLoader().getResourceAsStream("consumer.properties"));
ThreadLocal:这是一个线程内维护的存储变量数组。举个简单的比方,在Java运行的时候有多个线程,存在一个Map<K,V>,K就是每一个线程的Id,V则是每一个线程内存储的数据变量。
这是多线程相同的变量的访问冲突问题解决方法之一,是经过给每一个线程单独一份存储空间(牺牲空间)来解决访问冲突;而熟悉的Synchronized经过等待(牺牲时间)来解决访问冲突。同时ThreadLocal还具备线程隔离的做用,即A线程不能访问B线程的V。
在电信客服的项目中,ThreadLocal被用来持久化Connection和Admin链接。由于在HBase的DDL和DML操做中,不一样的操做都须要用到链接,因此将其和该线程进行绑定,加快获取的链接的速度和减小内存占用。固然也能够直接new 几个对象,最后统一关闭。
// 经过ThreadLocal保证同一个线程中能够不重复建立链接和Admin。 private ThreadLocal<Connection> connHolder = new ThreadLocal<Connection>(); private ThreadLocal<Admin> adminHolder = new ThreadLocal<Admin>(); private Connection getConnection() throws IOException { Connection conn = connHolder.get(); if (conn == null) { Configuration conf = HBaseConfiguration.create(); conn = ConnectionFactory.createConnection(conf); connHolder.set(conn); } return conn; } private Admin getAdmin() throws IOException { Admin admin = adminHolder.get(); if (admin == null) { getConnection(); admin = connHolder.get().getAdmin(); adminHolder.set(admin); } return admin; }
分区键的设计通常是机器数量。rowKey的设计基于表的分区数,而且知足长度原则(10~100KB便可,最好是8的倍数)、惟一性原则和散列性原则(负载均衡,防止出现数据热点)
本项目中共6个分区,故分区号为"0|"、"1|"、"2|"、"3|"、"4|"。举一个例子,3****的第二位不管是任何数字都会小于"|"(第二大的字符),因此"2|"<"3****"<"3|",故分到第四个分区。
设计好了分区键后,rowKey的设计主要是根据业务需求哪些数据须要汇集在一块儿方便查询,那就利用那些数据设计数据的分区号。
数据含有主叫用户(13312341234)、被叫用户(14443214321)、通话日期(20181010)和通话时长(0123)。业务要求咱们将常常须要统计一个用户在某一月内的通话记录,即主叫用户和通话日期中的年月是关键数据。根据这些数据计算分区号,保证同一用户在同一月的通话记录在HBase上是紧邻的(还有一个前提要求是rowkey还必须是分,分区号+主叫用户+通话日期+others,不然在一个分区上仍是有多是乱的)。下面是计算分区号的代码:
/** * 计算获得一条数据的分区编号 * * @param tel 数据的主叫电话 * @param date 数据的通话日期 * @return regionNum 分区编号 */ protected int genRegionNum(String tel, String date) { // 获取电话号码的随机部分 String userCode = tel.substring(tel.length() - 4); // 获取年月 String yearMonth = date.substring(0, 6); // 哈希 int userCodeHash = userCode.hashCode(); int yearMonthHash = yearMonth.hashCode(); // crc 循环冗余校验 int crc = Math.abs(userCodeHash ^ yearMonthHash); // 取余,保证分区号在分区键范围内 int regionNum = crc & ValueConstants.REGION_NUMS; return regionNum; }
例子:查询13312341234用户在201810的通话记录
startKey <- genRegionNum("13312341234","201810")+""+"13312341234"+"_"+"201810"
endKey <- genRegionNum("13312341234","201810")+""+"13312341234"+"_"+"201810"+"|"
电信客服中一般须要计算两个客户之间亲密度,计算的数据来源于二者的通话记录。举个例子,计算A和B的亲密度,那么须要A和B之间的通话记录,特别注意的是不只须要A call B的记录,还须要B call A的记录。
就比如MySQL中的触发器同样,MySQL的触发器有针对update、insert和delete的,还有before和after等等,协处理器也有相似的对应函数。好比,在本项目中,须要的是再插入一条数据后,协处理器被触发插入另一条“重复数据”,因此复写的方法是postPut。
设计具体逻辑是:根据插入的Put得到插入的数据信息,而后判断插入的标志位Flag是否是1,若是是1,则插入另一条重复数据。
下面是代码:
public class InsertCalleeCoprocessor extends BaseRegionObserver { /** * 这是HBase上的协处理器方法,在一次Put以后接下来的动做 * * @param e * @param put * @param edit * @param durability * @throws IOException */ @Override public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException { // 1. 获取表对象 Table table = e.getEnvironment().getTable(TableName.valueOf(Names.TABLE.getValue())); // 2. 构造Put // 在rowKey中存在不少数据信息,这一点就不具有普适性 String values = Bytes.toString(put.getRow()); String[] split = values.split("_"); String call1 = split[1]; String call2 = split[2]; String callTime = split[3]; String duration = split[4]; String flag = split[5]; // 在协处理器中也发生了Put操做,可是此时的Put不引起协处理器再次响应 // 必须得关闭表链接 if ("0".equals(flag)) { table.close(); return; } CoprocessorDao dao = new CoprocessorDao(); String rowKey = dao.genRegionNums(call2, callTime) + "_" + call2 + "_" + call1 + "_" + callTime + "_" + duration + "_" + "0"; Put calleePut = new Put(Bytes.toBytes(rowKey)); calleePut.addColumn(Bytes.toBytes(Names.CF_CALLEE.getValue()), Bytes.toBytes("call1"), Bytes.toBytes(call2)); calleePut.addColumn(Bytes.toBytes(Names.CF_CALLEE.getValue()), Bytes.toBytes("call2"), Bytes.toBytes(call1)); calleePut.addColumn(Bytes.toBytes(Names.CF_CALLEE.getValue()), Bytes.toBytes("callTime"), Bytes.toBytes(callTime)); calleePut.addColumn(Bytes.toBytes(Names.CF_CALLEE.getValue()), Bytes.toBytes("duration"), Bytes.toBytes(duration)); calleePut.addColumn(Bytes.toBytes(Names.CF_CALLEE.getValue()), Bytes.toBytes("flag"), Bytes.toBytes("0")); // 3. 插入Put table.put(calleePut); // 4. 关闭资源,不然内存会溢出 table.close(); } private class CoprocessorDao extends BaseDao { public int genRegionNums(String tel, String date) { return super.genRegionNum(tel, date); } } }
须要注意的问题:
这个流程是我学习最多的流程,除了复习这个大数据框架的API,更多的是对个人Java有了更多的拓展。除了上述提到的,还有一些注解,泛型和泛型的PECS原则等等。另外就是学习怎么一步步排除错误和寻找本身的(低级)错误的方法了,这种DeBug的方式对于我来讲很新鲜。
同时利用redis缓存数据,利用MapReduce将HBase中的数据提取到MySQL中。
出现的问题:MapReduce任务执行成功,可是MySQL中未插入数据,同时查看MapReduce8088端口,看不到日志,显示no log for container available。
问题分析:
1.观察MapReduce的任务,发现Reduce的确是正确输出了字节,可是MySQL没有插入数据,那只能多是编写的OutputFormat出现了问题。
2.no log for container available, 在网上查阅资料提示有多是内存不足的问题。
3.查看MapReduce的Reduce任务,发现是在nodemanager是在slave1上运行,而slave1只分配了2G内存。
4.kill slave1和slave2的nodemanager,只运行master的nodemanager,由于master我分配了4G内存。
5.查看日志成功,寻找错误。
6.发现是MySQL语句出现了语法错误????????(离谱,就**离谱)
7.修改MySQL语句,任务成功执行。
这是个人一个学习上手的大数据项目,虽然简单可是也学习很多。作这个项目的时候是考试周,也算是忙里偷闲完成了!主要是这个项目和咱们小队准备参加的服创大赛的项目很相似,也算是提早练练手,熟悉下基本的流程。不过咱们小队的项目最好仍是得上Spark和好的机器(虚拟机老拉跨,因此继续学习!!!