Flume: receiving HTTP requests and writing the data to Kafka

Flume receives HTTP requests and writes the data to Kafka, and Spark consumes the data from Kafka. This is a classic data-collection pipeline.
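This post only covers the Flume side. For context, here is a minimal sketch of what the Spark consumer could look like (assuming the spark-streaming-kafka-0-10 integration, and the topic mytopic and broker localhost:9092 from the config below; the class name is made up):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class KafkaConsumerDemo {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("flume-kafka-demo").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        // consumer settings for the broker the Flume sink writes to
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "flume-demo");
        kafkaParams.put("auto.offset.reset", "latest");

        JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(
                        Collections.singletonList("mytopic"), kafkaParams));

        // just print the message bodies written by the Flume Kafka sink
        stream.map(ConsumerRecord::value).print();

        jssc.start();
        jssc.awaitTermination();
    }
}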

Straight to the Flume configuration:

source : http

channel : file

sink : kafka

xx :~/software/flume1.8/conf$ cat http-file-kafka.conf 
# example.conf: A single-node Flume configuration
##########
# data example
# use a POST request, select raw, data example : [{"body" : "{'xx':'xxxxx1'}"}]
# or just use the official request demo:
# [{
#   "headers" : {
#     "timestamp" : "434324343",
#     "host" : "random_host.example.com",
#     "topic" : "venn"          # if the headers contain a topic, it replaces the default topic
#   },
#   "body" : "random_body"      # the body is the message sent to the channel
# }]

# Name the components on this agent
agent1.sources = s1
agent1.sinks = k1
agent1.channels = c1

# Describe/configure the source
agent1.sources.s1.type = http
# bind to all interfaces; localhost would only accept local requests
agent1.sources.s1.bind = 0.0.0.0
# the HTTP port
agent1.sources.s1.port = 8084
# the built-in handler for HTTP requests
agent1.sources.s1.handler = org.apache.flume.source.http.JSONHandler

# Describe the sink
agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# default topic
agent1.sinks.k1.kafka.topic = mytopic
# kafka host and port
agent1.sinks.k1.kafka.bootstrap.servers = localhost:9092
agent1.sinks.k1.kafka.flumeBatchSize = 20
agent1.sinks.k1.kafka.producer.acks = 1
agent1.sinks.k1.kafka.producer.linger.ms = 1
# compression
agent1.sinks.k1.kafka.producer.compression.type = snappy

# Use a channel which buffers events on disk
agent1.channels.c1.type = file
# these two must be configured and set fairly large, otherwise a full
# channel throws errors and the HTTP source returns 503 (channel full)
#agent1.channels.c1.capacity = 1000
#agent1.channels.c1.transactionCapacity = 100
agent1.channels.c1.checkpointDir = /opt/flume/checkpoint
agent1.channels.c1.dataDirs = /opt/flume/channel

# Bind the source and sink to the channel
agent1.sources.s1.channels = c1
agent1.sinks.k1.channel = c1
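If the channel fills up and the HTTP source starts answering 503, uncomment the two capacity lines above and raise them. The values below are only illustrative (they happen to match the file channel defaults), not tuned numbers:

agent1.channels.c1.capacity = 1000000
agent1.channels.c1.transactionCapacity = 10000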

With the Flume configuration in place, start Flume:

./bin/flume-ng agent -n agent1 -c conf -f conf/http-file-kafka.conf -Dflume.root.logger=INFO,console

Once it is up, you can send HTTP requests.

The HTTP request format looks like this:

[{
  "headers" : {
    "timestamp" : "434324343",
    "host" : "random_host.example.com",
    "topic" : "xxx"
  },
  "body" : "random_body"
},
{
  "headers" : {
    "namenode" : "namenode.example.com",
    "datanode" : "random_datanode.example.com"
  },
  "body" : "really_random_body"
}]

Note: if the HTTP request headers contain a topic, it replaces the topic from the config file.
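A quick way to smoke-test the source by hand is curl; the topic value venn0 and the body here are just examples:

curl -X POST http://localhost:8084 \
     -H 'Content-Type: application/json' \
     -d '[{"headers" : {"topic" : "venn0"}, "body" : "hello from curl"}]'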

The official Flume docs say that Flume 1.8.0 only supports Kafka 0.9.x, and no longer supports Kafka 0.8.x (I have not tested this).

Next is the program that generates the data (sending requests by hand is too tedious):

package com.venn.http;

import com.venn.entity.User;
import com.google.gson.Gson;
import org.apache.flume.event.JSONEvent;

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/**
 * Created by venn on 19-1-17.
 */
public class HttpDemo {

    private static String urlStr = "http://localhost:8084";
    private static Random random = new Random();

    public static void main(String[] args) throws InterruptedException {
        while (true) {
            String message = new User().toString();
            send(message);
            // Thread.sleep(1);
        }
    }

    public static void send(String message) {
        System.out.println("send message : " + message);
        try {
            // open the connection
            URL url = new URL(urlStr);
            HttpURLConnection connection = (HttpURLConnection) url.openConnection();
            connection.setDoOutput(true);
            connection.setDoInput(true);
            connection.setRequestMethod("POST");
            connection.setUseCaches(false);
            connection.setInstanceFollowRedirects(true);
            connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
            connection.connect();

            // POST body: a JSON array of Flume events
            DataOutputStream out = new DataOutputStream(connection.getOutputStream());
            JSONEvent jsonEvent = new JSONEvent();
            Map<String, String> header = new HashMap<>();
            header.put("timestamp", String.valueOf(System.currentTimeMillis()));
            header.put("host", "venn");
            // a topic header overrides the default topic of the Kafka sink
            header.put("topic", "venn" + random.nextInt(4));
            jsonEvent.setBody(message.getBytes());
            jsonEvent.setHeaders(header);
            Gson gson = new Gson();
            List<JSONEvent> list = new ArrayList<>();
            list.add(jsonEvent);
            out.writeBytes(gson.toJson(list));
            out.flush();
            out.close();

            // read the response (the JSONHandler does not return a body)
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(connection.getInputStream(), "utf-8"));
            int code = connection.getResponseCode();
            String line;
            StringBuilder sb = new StringBuilder();
            while ((line = reader.readLine()) != null) {
                sb.append(line);
            }
            System.out.println("code : " + code + ", message : " + sb);
            reader.close();
            // disconnect
            connection.disconnect();
        } catch (MalformedURLException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
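The com.venn.entity.User class above is the author's own test entity and is not shown in the post; a hypothetical stand-in whose toString() produces a small random JSON message could be as simple as:

package com.venn.entity;

import java.util.Random;
import java.util.UUID;

// hypothetical stand-in for the real entity: toString() yields a small JSON message
public class User {

    private static final Random RANDOM = new Random();

    private final String id = UUID.randomUUID().toString();
    private final int age = RANDOM.nextInt(100);

    @Override
    public String toString() {
        return "{\"id\":\"" + id + "\",\"age\":" + age + "}";
    }
}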

Done.

Sending the data:

The data Kafka received:

 

Note: because a topic parameter was put into the headers, the received data actually lands in different Kafka topics.
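To verify, tail one of the topics with the Kafka console consumer (topic names venn0 through venn3 follow the pattern the sender generates; this assumes a Kafka version recent enough to accept --bootstrap-server):

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic venn0 --from-beginning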
