大屏幕上实时跳动的营业额,背后的技术结构。前端
现现在,咱们来到了数据时代,数据信息化与咱们的生活与工做息息相关。此篇文章简述利用大数据框架,实时处理数据的流程与相关框架的介绍,主要包括:java
- 数据实时处理的概念和意义
- 数据实时处理能作什么
- 数据实时处理架构简介
- 数据实时处理代码演示
什么是数据实时处理呢?我我的对数据实时处理的理解为:数据从生成->实时采集->实时缓存存储->(准)实时计算->实时落地->实时展现->实时分析。这一个流程线下来,处理数据的速度在秒级甚至毫秒级。node
数据实时处理有什么意义呢?咱们获得数据能够进行数据分析,利用数据统计方法,从错综复杂的数据关系中梳理出事物的联系,好比发展趋势、影响因素、因果关系等。甚至创建一些BI,对一些数据的有用信息进行可视化呈现,并造成数据故事。算法
何为数据的实时计算?咱们从数据源端拿到数据,可能不尽如人意,咱们想对获得的数据进行 ETL 操做、或者进行关联等等,那么咱们就会用到数据的实时计算。目前主流的实时计算框架有 spark,storm,flink 等。sql
数据的实时落地,意思是将咱们的源数据或者计算好的数据进行实时的存储。在大数据领域,推荐使用 HDFS,ES 等进行存储。数据库
咱们拿到了数据,要会用数据的价值。数据的价值体如今数据中相互关联关系,或与历史关联,或能预测将来。咱们实时获得数据,不只可以利用前端框架进行实时展现,还能够对其中的一些数据进行算法训练,预测将来走势等。apache
example:bootstrap
淘宝双 11 大屏,每一年的双 11 是淘宝粉丝疯狂的日子。马云会在双 11 的当天在阿里总部竖起一面大的电子屏幕,展现淘宝这一天的成绩。例如成交额,访问人数,订单量,下单量,成交量等等。这个电子大屏的背后,就是用到的咱们所说的数据的实时处理。首先,阿里的服务器遍及全国各地,这些服务器收集PC端、手机端等日志,上报到服务器,在服务上部署数据采集工具。接下来,因为数据量庞大,须要作数据的缓存缓冲处理。下一步,对原始日志进行实时的计算,好比筛选出上面所述的各个指标。最后,经过接口或者其余形式,进行前端屏幕的实时展现。缓存
接下来是咱们介绍的重点,先放一张数据流程图:前端框架
下面将分别简单的介绍下各个组件:
flume 是一个分布式的数据收集系统,具备高可靠、高可用、事务管理、失败重启、聚合和传输等功能。数据处理速度快,彻底能够用于生产环境。
flume 的核心概念有:event,agent,source,channel,sink
flume 的数据流由事件 (event) 贯穿始终。event 是 flume 的基本数据单位,它携带日志数据而且携带数据的头信息,这些 event 由 agent 外部的 source 生成,当 source 捕获事件后会进行特定的格式化,而后 source 会把事件推入 channel 中。能够把 channel 看做是一个缓冲区,它将保存事件直到 sink 处理完该事件。sink 负责持久化日志或者把事件推向另外一个 source。
flume 的核心是 agent。agent 是一个 java 进程,运行在日志收集端,经过 agent 接收日志,而后暂存起来,再发送到目的地。 每台机器运行一个 agent。 agent 里面能够包含多个 source,channel,sink。
source 是数据的收集端,负责将数据捕获后进行特殊的格式化,将数据封装到 event 里,而后将事件推入 channel 中。flume 提供了不少内置的 source,支持 avro,log4j,syslog 等等。若是内置的 source 没法知足环境的需求,flume 还支持自定义 source。
channel 是链接 source 和 sink 的组件,你们能够将它看作一个数据的缓冲区(数据队列),它能够将事件暂存到内存中也能够持久化到本地磁盘上, 直到 sink 处理完该事件。两个较为经常使用的 channel,MemoryChannel 和 FileChannel。
sink 从 channel 中取出事件,而后将数据发到别处,能够向文件系统、数据库、hadoop、kafka,也能够是其余 agent 的 source。
口述抽象,上两张官网贴图:
单个 agent 收集数据流程图
Kafka 是一个高吞吐量的分布式发布-订阅消息系统。企业中通常使用 kafka 作消息中间件,作缓冲缓存处理。须要 zookeeper 分布式协调组件管理。
kafka 的设计目标:
kafka 核心概念
贴两张官网图
prodecer-broker-consumer
spark 是一个分布式的计算框架,是我目前认为最火的计算框架。
spark,是一种"one stack to rulethem all"的大数据计算框架,指望使用一个技术栈就完美地解决大数据领域的各类计算任务。apache 官方,对 spark 的定义是:通用的大数据快速处理引擎(一“栈”式)。
贴个spark架构图
须要搭建 flume 集群,kafka 集群,es 集群,zookeeper 集群,因为本例 spark 是在本地模式运行,因此无需搭建 spark 集群。
搭建好集群后,根据集群组件直接的整合关系,配置好配置文件。其中主要的配置为 flume 的配置,以下图:
建立好 es 对应的表,表有三个字段,对应代码里面的 case class(代码随后贴上)。
代码以下:
package run
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.Logger
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.spark.rdd.EsSpark
/** * @author wangjx * 测试kafka数据进行统计 kafka自身维护offset(建议使用自定义维护方式维护偏移量) */
object SparkStreamingAutoOffsetKafka {
//定义样例类 与es表对应
case class people(name:String,country:String,age:Int)
def main(args: Array[String]): Unit = {
val logger = Logger.getLogger(this.getClass);
//spark 配置
val conf = new SparkConf().setAppName("SparkStreamingAutoOffsetKafka").setMaster("local[2]")
conf.set("es.index.auto.create","true")
conf.set("es.nodes","127.0.0.1")
conf.set("es.port","9200")
//spark streaming实时计算初始化 定义每10秒一个批次 准实时处理 企业通常都是准实时 好比每隔10秒统计近1分钟的数据等等
val ssc = new StreamingContext(conf, Seconds(10))
val spark = SparkSession.builder()
.config(conf)
.getOrCreate()
spark.sparkContext.setLogLevel("WARN");
//设置kafka参数
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "x:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "exactly-once",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
//kafka主题
val topic = Set("kafka8")
//从kafka获取数据
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topic, kafkaParams)
)
//具体的业务逻辑
val kafkaValue: DStream[String] = stream.flatMap(line=>Some(line.value()))
val peopleStream = kafkaValue
.map(_.split(":"))
//造成people样例对象
.map(m=>people(m(0),m(1),m(2).toInt))
//存入ES
peopleStream.foreachRDD(rdd =>{
EsSpark.saveToEs(rdd, "people/man")
})
//启动程序入口
ssc.start()
ssc.awaitTermination()
}
}
复制代码
文 / 皮蛋二哥
一直觉得只要保持低调,就没人知道其实我是一名做家。
——迷同样的皮蛋二哥
编 / 荧声
本文已由做者受权发布,版权属于创宇前端。欢迎注明出处转载本文。本文连接:knownsec-fed.com/2018-08-31-…
想要看到更多来自知道创宇开发一线的分享,请搜索关注咱们的微信公众号:创宇前端(KnownsecFED)。欢迎留言讨论,咱们会尽量回复。
感谢您的阅读。