【python】spark+kafka使用

网上用python写spark+kafka的资料好少啊 本身记录一点踩到的坑~html

 

spark+kafka介绍的官方网址:http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.htmlpython

python的pyspark库函数文档:http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html?highlight=kafkautils.createdirectstream#pyspark.streaming.kafka.KafkaUtils.createDirectStreamapache

上面两个是最重要的资料,大多数问题能够经过仔细研读上面两个文档获得答案json

 

官网上说了,spark和kafka连用有两种方式:接收器形式  以及 直连形式api

 

1、 接收器形式

优势:支持kafka的group.id设置,支持用kafka api查询offset,若是数据断掉后,能够经过group.id轻松找到上一次失败的位置app

缺点:分布式

1.失败处理复杂。因为kafka队列信息由kafka本身记录,当spark消费了数据可是处理中出错时会致使数据丢失。为了不数据丢失就必须开启Write Ahead Logs,把spark接收到的数据都存储到分布式文件系统中,好比HDFS,而后失败时从存储的记录中找到失败的消息。这致使同一批数据被kafka和spark存储了2次。形成数据冗余。函数

2.若是有多个地方都想获取同一个kafka队列的数据,必须创建多个流,没法用一个流并行处理。spa

该方法是比较老的一种方式,并不太被推荐。code

 

2、直连形式

优势:

1. 不需两次存储数据,直连形式时,spark本身管理偏移信息,再也不使用kafka的offset信息。因此spark能够自行处理失败状况,不要再次存储数据。spark保证数据传输时Exactly-once。

2.只需创建一个流就能够并行的在多个地方使用流中的数据

缺点:

不支持kafka的group,不支持经过kafka api查询offset信息!!!!

在链接后spark会根据fromOffsets参数设置起始offset,默认是从最新的数据开始的。也就是说,必须本身记录spark消耗的offset位置。不然在两次脚本启动中间的数据都会丢失。

 

我选用的是直连形式,我处理offset的方法是将spark消费的offset信息实时记录到文件中。在启动脚本时经过记录的文件来找到起始位置。

#!/usr/bin/python
# coding=utf-8
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
import time
import os
import json
broker_list = "xxxx"
topic_name = "xxxx"
timer = 5
offsetRanges = []


def store_offset_ranges(rdd):
    global offsetRanges
    offsetRanges = rdd.offsetRanges()
    return rdd


def save_offset_ranges(rdd):
    root_path = os.path.dirname(os.path.realpath(__file__))
    record_path = os.path.join(root_path, "offset.txt")
    data = dict()
    f = open(record_path, "w")
    for o in offsetRanges:
        data = {"topic": o.topic, "partition": o.partition, "fromOffset": o.fromOffset, "untilOffset": o.untilOffset}
    f.write(json.dumps(data))
    f.close()


def deal_data(rdd):
    data = rdd.collect()
    for d in data:
        # do something
        pass

def save_by_spark_streaming():
    root_path = os.path.dirname(os.path.realpath(__file__))
    record_path = os.path.join(root_path, "offset.txt")
    from_offsets = {}
    # 获取已有的offset,没有记录文件时则用默认值即最大值
    if os.path.exists(record_path):
        f = open(record_path, "r")
        offset_data = json.loads(f.read())
        f.close()
        if offset_data["topic"] != topic_name:
            raise Exception("the topic name in offset.txt is incorrect")

        topic_partion = TopicAndPartition(offset_data["topic"], offset_data["partition"])
        from_offsets = {topic_partion: long(offset_data["untilOffset"])}  # 注意设置起始offset时的方法
        print "start from offsets: %s" % from_offsets

    sc = SparkContext(appName="Realtime-Analytics-Engine")
    ssc = StreamingContext(sc, int(timer))

    kvs = KafkaUtils.createDirectStream(ssc=ssc, topics=[topic_name], fromOffsets=from_offsets,
                                        kafkaParams={"metadata.broker.list": broker_list})
    kvs.foreachRDD(lambda rec: deal_data(rec))
    kvs.transform(store_offset_ranges).foreachRDD(save_offset_ranges)

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()


if __name__ == '__main__':
    save_by_spark_streaming()

 

运行:

正常状况下,只要输入下面的语句就能够运行了

spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 spark_kafka.py

然而,个人老是报错,找不到依赖包,说各类库不认识。因此我只好用--jars来手动指定包的位置了..................

spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 --jars /root/.ivy2/jars/org.apache.kafka_kafka_2.11-0.8.2.1.jar,/root/.ivy2/jars/com.yammer.metrics_metrics-core-2.2.0.jar spark_kafka.py

 

 

 

吐槽:

我就踩在直连形式不支持offset的坑上了..... 开始官方文档没仔细看,就瞄了一眼说是直连形式好,就豪不犹豫的用了。结果个人脚本不稳定,各类断,而后中间数据就各类丢啊.......

还有官网上竟然彻底没有对fromOffsets这个参数的说明,我找了很久很久才弄清楚这个参数怎么拼出来啊.................

相关文章
相关标签/搜索