Spring + Kafka + Spring MVC Demo

Part 1. Download (official site: http://kafka.apache.org/downloads.html)

Download any release archive and extract it.

Part 2. Build the demo: Spring + Spring MVC + Kafka

Install ZooKeeper before you start development.

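1. Start the servers. ZooKeeper must be running before the Kafka broker starts, because Kafka topics are registered in ZooKeeper.

Start ZooKeeper: in the bin directory of the ZooKeeper installation, double-click zkServer.bat (the standard Apache ZooKeeper distribution names this script zkServer.cmd).

Start Kafka: from the Kafka installation directory, run .\bin\windows\kafka-server-start.bat .\config\server.properties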
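
Both servers need to be up before the application starts. As a quick sanity check, a sketch like the following tests whether anything is listening on the default ports used in this post (2181 for ZooKeeper, 9092 for Kafka). The class name PortCheck and the 500 ms timeout are assumptions made for illustration and are not part of the original demo. An open port only shows that some process is listening, not that the server is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of the original demo):
// checks whether a TCP port accepts connections.
final class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("ZooKeeper (2181) listening: " + isOpen("localhost", 2181, 500));
        System.out.println("Kafka (9092) listening: " + isOpen("localhost", 9092, 500));
    }
}
```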


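2. Create the topic: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testweb

On Windows, the equivalent script is bin\windows\kafka-topics.bat. Note that the configuration later in this post uses the topic name test rather than testweb; with the broker default auto.create.topics.enable=true, that topic is created automatically on first use.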


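3. Add the Maven dependencies. The list was copied straight from the author's demo project, so it includes dependencies that Kafka itself does not need.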

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.1</version>
    <exclusions>
        <exclusion>
            <artifactId>jmxri</artifactId>
            <groupId>com.sun.jmx</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jms</artifactId>
            <groupId>javax.jms</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jmxtools</artifactId>
            <groupId>com.sun.jdmk</groupId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.1.1.RELEASE</version>
</dependency>


<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
</dependency>

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-web</artifactId>
    <version>4.3.10.RELEASE</version>
</dependency>

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-webmvc</artifactId>
    <version>4.3.14.RELEASE</version>
</dependency>

<dependency>
    <groupId>org.freemarker</groupId>
    <artifactId>freemarker</artifactId>
    <version>2.3.22</version>
</dependency>

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context-support</artifactId>
    <version>4.1.6.RELEASE</version>
</dependency>

<dependency>
    <groupId>com.github.miemiedev</groupId>
    <artifactId>mybatis-paginator</artifactId>
    <version>1.2.15</version>
</dependency>

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.8.9</version>
</dependency>
<dependency>
    <groupId>carhouse-test</groupId>
    <artifactId>carhouse-test-api</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>dubbo</artifactId>
    <version>2.5.3</version>
    <exclusions>
        <exclusion>
            <artifactId>spring</artifactId>
            <groupId>org.springframework</groupId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.10</version>
</dependency>

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.7.5.RELEASE</version>
</dependency>

4. Producer configuration (kafka-producer.xml)

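<!-- Basic producer settings -->
<bean id="producerProperties" class="java.util.HashMap">
    <constructor-arg>
        <map>
            <!-- Kafka broker addresses; this may be a cluster -->
            <entry key="bootstrap.servers" value="localhost:9092,localhost:9093,localhost:9094" />
            <!-- Number of times a failed send is retried. Retries can cause the broker to receive duplicate messages. The default is 0 in the client versions used here -->
            <entry key="retries" value="10" />
            <!-- Maximum size, in bytes, of a batch of records for one partition (not a message count). The default is 16384 -->
            <entry key="batch.size" value="1638" />
            <!-- How long the producer waits for more records before sending a batch; the default is 0 ms. A batch that fills up for any topic partition is sent without waiting -->
            <entry key="linger.ms" value="1" />
            <!-- Total memory, in bytes, that the producer may use to buffer records. If records are produced faster than they can be sent to the broker, the producer blocks or throws an exception -->
            <entry key="buffer.memory" value="33554432" />
            <!-- Number of acknowledgments the producer requires from the server before it considers a send complete; "all" waits for every in-sync replica -->
            <entry key="acks" value="all" />
            <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
            <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
        </map>
    </constructor-arg>
</bean>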
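
The same settings can also be assembled in plain Java, which may be convenient when they have to be computed or overridden at runtime. The following is a minimal sketch: the class name ProducerSettings is hypothetical, and the values simply mirror the producerProperties bean. A map like this has the shape that the DefaultKafkaProducerFactory constructor accepts.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder class (not part of the original demo):
// builds the same settings as the producerProperties bean.
final class ProducerSettings {

    static Map<String, Object> build() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        props.put("retries", 10);                // resend attempts after a failed send
        props.put("batch.size", 1638);           // bytes per partition batch
        props.put("linger.ms", 1);               // ms to wait for a batch to fill
        props.put("buffer.memory", 33554432L);   // 32 MiB of buffering in total
        props.put("acks", "all");                // wait for all in-sync replicas
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Print each setting so the result can be compared with the XML bean
        build().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```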

<!-- Create the producerFactory bean required by KafkaTemplate -->
<bean id="producerFactory"
      class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
    <constructor-arg>
        <ref bean="producerProperties" />
    </constructor-arg>
</bean>

<!-- Create the KafkaTemplate bean. To send messages, inject this bean and call its send methods -->
<bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg ref="producerFactory" />
    <!-- Default topic, used by sendDefault() -->
    <property name="defaultTopic" value="test" />
</bean>

5. Consumer configuration (kafka-consumer.xml)

<bean id="consumerProperties" class="java.util.HashMap">
    <constructor-arg>
        <map>
            <!-- Kafka broker address -->
            <entry key="bootstrap.servers" value="localhost:9092" />
            <!-- Consumer group ID; consumers with the same group.id belong to the same group -->
            <entry key="group.id" value="order-beta" />
            <!-- If true, the consumer periodically commits its current offsets in the background. After a failure and restart, consumption resumes from the last committed offset. With bootstrap.servers (the new consumer API), offsets are stored in Kafka, not in ZooKeeper -->
            <entry key="enable.auto.commit" value="true" />
            <!-- Timeout used to detect consumer failures. If the broker receives no heartbeat within this time, the consumer is removed from the group and a rebalance is triggered -->
            <entry key="session.timeout.ms" value="15000" />
            <entry key="key.deserializer"
                   value="org.apache.kafka.common.serialization.StringDeserializer" />
            <entry key="value.deserializer"
                   value="org.apache.kafka.common.serialization.StringDeserializer" />
        </map>
    </constructor-arg>
</bean>
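
If the consumer settings are kept in a .properties file instead of XML, they have to be converted into a map before they can be handed to a consumer factory. The sketch below shows one way to do that with the standard library only. The class name ConsumerSettings and both method names are hypothetical; the values mirror the consumerProperties bean. Trimming is included because values read from files often carry stray whitespace.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper (not part of the original demo).
final class ConsumerSettings {

    // Copies every entry into a map, trimming surrounding whitespace from keys and values.
    static Map<String, Object> fromProperties(Properties source) {
        Map<String, Object> props = new HashMap<>();
        for (String name : source.stringPropertyNames()) {
            props.put(name.trim(), source.getProperty(name).trim());
        }
        return props;
    }

    // Stand-in for a loaded .properties file, with the same values as the XML bean.
    static Properties demoDefaults() {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "localhost:9092");
        p.setProperty("group.id", "order-beta");
        p.setProperty("enable.auto.commit", "true");
        p.setProperty("session.timeout.ms", "15000 "); // stray space, removed by fromProperties
        p.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        p.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return p;
    }

    public static void main(String[] args) {
        fromProperties(demoDefaults())
                .forEach((key, value) -> System.out.println(key + " = [" + value + "]"));
    }
}
```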

<!-- Bean for the concrete listener class -->
<bean id="messageListernerConsumerService" class="mq.kafka.KafkaConsumerListener" />

<!-- Create the consumerFactory bean -->
<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
    <constructor-arg>
        <ref bean="consumerProperties"/>
    </constructor-arg>
</bean>

<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
    <constructor-arg value="test"/>
    <property name="messageListener" ref="messageListernerConsumerService"/>
</bean>

<bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
    <constructor-arg ref="consumerFactory"/>
    <constructor-arg ref="containerProperties"/>
</bean>
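
For projects that prefer Java configuration over XML, the consumer beans could be declared roughly as follows. This is a sketch, not the author's setup: it assumes spring-kafka 1.1.x on the classpath (where ContainerProperties lives in org.springframework.kafka.listener.config) and the KafkaConsumerListener class shown later in this post. The class name KafkaConsumerConfig is hypothetical.

```java
package mq.kafka;

import java.util.HashMap;
import java.util.Map;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;

// Hypothetical Java-config counterpart of kafka-consumer.xml
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "order-beta");
        props.put("enable.auto.commit", "true");
        props.put("session.timeout.ms", "15000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public KafkaMessageListenerContainer<String, String> messageListenerContainer() {
        // Listen on the "test" topic and hand each record to the demo listener
        ContainerProperties containerProperties = new ContainerProperties("test");
        containerProperties.setMessageListener(new KafkaConsumerListener());
        return new KafkaMessageListenerContainer<>(consumerFactory(), containerProperties);
    }
}
```

The container is a lifecycle bean, so in this sketch it is expected to start when the application context starts, without an explicit init method.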

6. Configuration in spring-mvc.xml

<!-- Expose service addresses through the ZooKeeper registry -->
<dubbo:application name="test-provider"/>
<dubbo:registry protocol="zookeeper" address="zookeeper://127.0.0.1:2181"/>
<!-- Import the Kafka configuration files; adjust the paths to match your own project layout -->
<import resource="classpath:./kafka/kafka-producer.xml"/>
<import resource="classpath:./kafka/kafka-consumer.xml"/>
<context:annotation-config />


7. Usage

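Sending the simplest possible message:

// Resolved by field name first, then by type, so the KafkaTemplate bean from kafka-producer.xml is injected.
// The key type is String to match the StringSerializer configured for keys.
@Resource
private KafkaTemplate<String, String> kafkaTemplate;

@RequestMapping(value = "/hello.do")
public void hello() {
    // Sends the value to the template's defaultTopic ("test") with no key
    kafkaTemplate.sendDefault("test it");
}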

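Consuming messages:

package mq.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;

// Registered as the messageListener of the container defined in kafka-consumer.xml.
// Key and value types are String to match the configured StringDeserializer.
public class KafkaConsumerListener implements MessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> consumerRecord) {
        // Print the payload of each record received from the topic
        System.out.println(consumerRecord.value());
    }
}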

After a request to /hello.do, the console prints "test it".


If anything here is wrong, corrections are welcome.
