Distributed Messaging System: Kafka

Preface

As message middleware, Kafka provides asynchronous delivery of messages.
Functionally it is similar to JMS: producers put messages into a queue, and consumers fetch them from the queue.
The implementation, however, is completely different.

Kafka Architecture Diagrams

[Figure: overall Kafka message flow between Producers, Brokers/Topics, and Consumers]

1. A Producer sends messages by connecting to a Broker.
2. A Producer sends messages to a specific Topic.
3. Messages sent to a Topic can be handled by different Brokers.
4. A Consumer connects to a Broker and listens on the Topics it subscribes to.
5. Brokers are managed through Zookeeper and notify one another of their status.
6. If the Broker a Producer or Consumer connects to does not host the relevant Topic, the message is automatically routed to the Broker that does, and the Producer or Consumer remembers that Broker for subsequent requests.
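Point 6 above amounts to client-side metadata caching. A minimal sketch of the idea in plain Java (the class and the map-based lookup are illustrative assumptions, not Kafka's actual client code): the first lookup for a topic consults cluster metadata, and the answer is remembered for subsequent requests.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of how a client "remembers" which broker hosts a topic.
public class BrokerCache {
    private final Map<String, String> topicToBroker = new HashMap<>();
    private final Map<String, List<String>> clusterMetadata; // topic -> brokers hosting it

    public BrokerCache(Map<String, List<String>> clusterMetadata) {
        this.clusterMetadata = clusterMetadata;
    }

    // First lookup consults cluster metadata (in real Kafka, a metadata
    // request answered via Zookeeper-managed state); later lookups hit
    // the local cache without another round trip.
    public String brokerFor(String topic) {
        return topicToBroker.computeIfAbsent(topic,
                t -> clusterMetadata.get(t).get(0));
    }
}
```

The real client additionally refreshes this cache when leadership moves between brokers; the sketch only shows the remember-on-first-use behaviour described above.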

[Figure: Topic partitions and message offsets]

1. After being sent, messages are appended in order to each Partition.
2. How long messages are retained on a Partition is configurable.
3. Each consumer can start consuming from a different position in the queue, and can consume the same messages repeatedly.
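The behaviour in points 1–3 can be modelled in a few lines (an illustrative in-memory sketch of my own, not Kafka code): a partition is an append-only log addressed by offset, and each consumer keeps its own position, so reading an earlier offset again simply replays the message.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal model of one partition: an append-only log addressed by offset.
public class PartitionLog {
    private final List<String> records = new ArrayList<>();

    // Append a record and return the offset it was assigned.
    public long append(String record) {
        records.add(record);
        return records.size() - 1;
    }

    // Consumers read by offset; nothing is removed on read, so any
    // consumer can re-read old offsets independently of the others.
    public String read(long offset) {
        return records.get((int) offset);
    }
}
```

Two consumers holding offsets 0 and 1 against the same log read independently, which is exactly why Kafka consumers can rewind and re-consume.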

Installing Kafka

Download:

curl -L -O http://mirror.bit.edu.cn/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz

Edit the configuration:

vi kafka/config/server.properties

Log directory
log.dirs=/tmp/kafka/logs

Heap allocation — note this is an environment variable set before starting the broker, not a server.properties entry (below 256 MB the broker can run out of memory)
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"

Register with Zookeeper
zookeeper.connect=106.15.205.155:2181

Data cleanup (to keep the disk from filling up):

vi config/server.properties

Retention by age
log.retention.hours=168

By data size — this is the maximum size of a single segment file; the log rolls to a new segment once it is reached, and only whole, closed segments past the retention limits are deleted
log.segment.bytes=1073741824
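As a concrete illustration of the time-based rule, here is a simplified check (my own sketch, not broker code) for whether a segment has outlived log.retention.hours; the real broker inspects record timestamps inside each segment rather than a single last-write time.

```java
import java.util.concurrent.TimeUnit;

public class RetentionCheck {
    // Mirrors log.retention.hours=168: a segment whose last write is older
    // than the retention window becomes eligible for deletion. Simplified:
    // Kafka actually looks at the largest record timestamp in the segment.
    static boolean expired(long segmentLastWriteMs, long nowMs, long retentionHours) {
        return nowMs - segmentLastWriteMs > TimeUnit.HOURS.toMillis(retentionHours);
    }
}
```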

Start the broker:

bin/kafka-server-start.sh config/server.properties &

Stop the broker:

bin/kafka-server-stop.sh

Create a topic:

bin/kafka-topics.sh --create --zookeeper 106.15.205.155:2181 --replication-factor 1 --partitions 1 --topic test

List all topics:

bin/kafka-topics.sh --list --zookeeper 106.15.205.155:2181

Send messages with the console producer:

bin/kafka-console-producer.sh --broker-list 106.15.205.155:9092 --topic test

Receive messages with the console consumer:

bin/kafka-console-consumer.sh --zookeeper 106.15.205.155:2181 --topic test --from-beginning

Delete a topic:

bin/kafka-topics.sh --delete --zookeeper 106.15.205.155:2181 --topic test

Code in Practice

Operating Kafka with Spring

Add the Maven dependency:

<!--kafka-->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.3.0.RELEASE</version>
</dependency>

Message producer

Configuration class:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {
    private String bootstrapServers = "IP:9092";

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Message-sending class:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaSender {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send() {
        kafkaTemplate.send("topic_1", "luxiaotao");
    }
}

Message receiver
Configuration class:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    private String bootstrapServers = "IP:9092";

    // The consumer group id is a separate concept from the topic name;
    // the original reused the topic name here, kept for compatibility.
    private String groupId = "topic_1";

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Message-receiving class:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaReceive {
    @KafkaListener(topics = {"topic_1"})
    public void receive(String content) {
        System.out.println("=============================" + content + "============================");
    }
}

That covers the basic installation and use of Kafka. Thanks for reading.
