Kafka is a distributed message broker originally developed at LinkedIn.
First, grab the latest release from the official download page; at the time of writing the latest version is 2.3.0 (released June 25, 2019).
❯ wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz
❯ tar -xzf kafka_2.12-2.3.0.tgz
❯ cd kafka_2.12-2.3.0
Kafka requires ZooKeeper, the coordination service that Hadoop and HBase also rely on to coordinate distributed applications. So we first need a running ZooKeeper server; the Kafka distribution ships a convenience script to quickly spin up a single-node ZooKeeper instance:
❯ bin/zookeeper-server-start.sh config/zookeeper.properties # run in terminal 1 or inside tmux
Then start the Kafka server (OpenJDK should already be installed):
❯ bin/kafka-server-start.sh config/server.properties
Now the Kafka server is up. Let's first get a feel for creating a topic and producing and consuming messages in the terminal with the bundled scripts:
# Create a topic called "strconv" with one partition and one replica
❯ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic strconv
# List all topics
❯ bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
strconv
test
# Start a producer and type two messages interactively
❯ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic strconv
>Message 1
>Message 2
# Start a consumer from the beginning; it dumps the messages to stdout
❯ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic strconv --from-beginning
Message 1
Message 2
I won't go on to demonstrate multi-broker clusters and other usage here; see the official documentation for details.
Next, let's write producers and consumers in Go. There are currently two mainstream Go clients, and we'll try each in turn.
As an aside: although LinkedIn open-sourced Kafka, the company's core language is Java and it uses very little Go, so it maintains no Go client of its own.
confluent-kafka-go is the open-source Go client from Confluent. It is in fact a Go wrapper around the C/C++ client librdkafka. Install it first:
❯ brew install librdkafka pkg-config
❯ go get -u gopkg.in/confluentinc/confluent-kafka-go.v1/kafka
The examples directory in the project contains plenty of samples to reference. I'll write an example modeled after them; first, the producer:
package main

import (
	"fmt"
	"os"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
	if len(os.Args) != 3 {
		fmt.Fprintf(os.Stderr, "Usage: %s <broker> <topic>\n",
			os.Args[0])
		os.Exit(1)
	}

	broker := os.Args[1]
	topic := os.Args[2]

	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})
	if err != nil {
		fmt.Printf("Failed to create producer: %s\n", err)
		os.Exit(1)
	}
	fmt.Printf("Created Producer %v\n", p)

	// The delivery report for each Produce call arrives on this channel.
	deliveryChan := make(chan kafka.Event)

	value := "Hello Go!"
	err = p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte(value),
		Headers:        []kafka.Header{{Key: "myTestHeader", Value: []byte("header values are binary")}},
	}, deliveryChan)

	// Block until the delivery report comes back.
	e := <-deliveryChan
	m := e.(*kafka.Message)

	if m.TopicPartition.Error != nil {
		fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
	} else {
		fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
			*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
	}

	close(deliveryChan)
}
The example uses os.Args to read positional arguments from the command line; confluent_producer.go takes two of them: the broker and the topic. A single Produce call publishes one message with the value "Hello Go!" plus a header whose key is myTestHeader and whose value is "header values are binary". Finally, the resulting Message tells us whether delivery succeeded; on success the partition and the offset are printed.
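As a side note, the dedicated deliveryChan makes this example effectively synchronous. If you pass nil as the delivery channel instead, reports are routed to the producer's Events() channel, which suits fire-and-forget publishing. Here is a minimal sketch of that pattern, reusing the p and topic variables from the producer above:

	// Handle delivery reports asynchronously from the Events() channel.
	go func() {
		for ev := range p.Events() {
			if m, ok := ev.(*kafka.Message); ok {
				if m.TopicPartition.Error != nil {
					fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
				} else {
					fmt.Printf("Delivered to %v\n", m.TopicPartition)
				}
			}
		}
	}()

	// With a nil delivery channel, the report goes to p.Events().
	p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte("Hello again!"),
	}, nil)

	// Wait up to 15 seconds for outstanding deliveries before exiting.
	p.Flush(15 * 1000)

Now for the consumer: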
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
	if len(os.Args) < 4 {
		fmt.Fprintf(os.Stderr, "Usage: %s <broker> <group> <topics..>\n",
			os.Args[0])
		os.Exit(1)
	}

	broker := os.Args[1]
	group := os.Args[2]
	topics := os.Args[3:]

	// Trap Ctrl+C / SIGTERM so we can shut down cleanly.
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":     broker,
		"broker.address.family": "v4",
		"group.id":              group,
		"session.timeout.ms":    6000,
		"auto.offset.reset":     "earliest"})
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
		os.Exit(1)
	}
	fmt.Printf("Created Consumer %v\n", c)

	err = c.SubscribeTopics(topics, nil)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to subscribe: %s\n", err)
		os.Exit(1)
	}

	run := true
	for run {
		select {
		case sig := <-sigchan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			run = false
		default:
			// Poll with a 100ms timeout; nil means nothing happened.
			ev := c.Poll(100)
			if ev == nil {
				continue
			}
			switch e := ev.(type) {
			case *kafka.Message:
				fmt.Printf("%% Message on %s:\n%s\n",
					e.TopicPartition, string(e.Value))
				if e.Headers != nil {
					fmt.Printf("%% Headers: %v\n", e.Headers)
				}
			case kafka.Error:
				// Errors are generally informational; give up only
				// when all brokers are down.
				fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
				if e.Code() == kafka.ErrAllBrokersDown {
					run = false
				}
			default:
				fmt.Printf("Ignored %v\n", e)
			}
		}
	}

	fmt.Printf("Closing consumer\n")
	c.Close()
}
The consumer takes three arguments: the broker address, a GroupID, and topic names. The GroupID scopes the consumer group: consumers sharing a GroupID form one group and coordinate to consume all partitions of the subscribed topics, so subscribing with a brand-new GroupID re-reads every partition's messages from the start. First, produce two messages:
❯ go run confluent_producer.go localhost:9092 strconv
Created Producer rdkafka#producer-1
Delivered message to topic strconv [0] at offset 0
❯ go run confluent_producer.go localhost:9092 strconv
Created Producer rdkafka#producer-1
Delivered message to topic strconv [0] at offset 1
Each run publishes one message. Since strconv has only one partition, the output always shows [0], while the offset increments from 0. Next, start the consumers:
# terminal 1
❯ go run confluent_consumer.go localhost:9092 1 strconv
Created Consumer rdkafka#consumer-1
% Message on strconv[0]@0:
Hello Go!
% Headers: [myTestHeader="header values are binary"]
% Message on strconv[0]@1:
Hello Go!
% Headers: [myTestHeader="header values are binary"]
Ignored OffsetsCommitted (<nil>, [strconv[0]@2])
# terminal 2
❯ go run confluent_consumer.go localhost:9092 2 strconv
Created Consumer rdkafka#consumer-1
% Message on strconv[0]@0:
Hello Go!
% Headers: [myTestHeader="header values are binary"]
% Message on strconv[0]@1:
Hello Go!
% Headers: [myTestHeader="header values are binary"]
Ignored OffsetsCommitted (<nil>, [strconv[0]@2])
In the two terminals the GroupIDs differ, so each consumer got the full set of messages (two each).
One more note: in all the code in this article, producers take two arguments (broker address and topic name) and consumers take three (broker address, GroupID, and topic name).
sarama is Shopify's open-source Go client. As before, the first step is installing it:
❯ go get -u github.com/Shopify/sarama
To demonstrate multi-partition messages, this time create a new topic (called sarama) with two partitions:
❯ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic sarama
Then the producer:
package main

import (
	"fmt"
	"log"
	"math/rand"
	"os"
	"os/signal"
	"strconv"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	if len(os.Args) != 3 {
		fmt.Fprintf(os.Stderr, "Usage: %s <broker> <topic>\n",
			os.Args[0])
		os.Exit(1)
	}

	broker := os.Args[1]
	topic := os.Args[2]

	config := sarama.NewConfig()
	config.Producer.Retry.Max = 5
	// Wait for all in-sync replicas to ack each message.
	config.Producer.RequiredAcks = sarama.WaitForAll

	producer, err := sarama.NewAsyncProducer([]string{broker}, config)
	if err != nil {
		panic(err)
	}
	defer func() {
		if err := producer.Close(); err != nil {
			panic(err)
		}
	}()

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	chars := []byte("ABCDEFGHIJKLMNOPQRSTUVWXYZ")
	var enqueued, errors int
	doneCh := make(chan struct{})
	go func() {
		for {
			time.Sleep(1 * time.Second)

			// Build a random 4-letter payload, keyed by the current timestamp.
			buf := make([]byte, 4)
			for i := 0; i < 4; i++ {
				buf[i] = chars[rand.Intn(len(chars))]
			}
			strTime := strconv.Itoa(int(time.Now().Unix()))
			msg := &sarama.ProducerMessage{
				Topic: topic,
				Key:   sarama.StringEncoder(strTime),
				Value: sarama.StringEncoder(buf),
			}

			select {
			case producer.Input() <- msg:
				enqueued++
				fmt.Printf("Produce message: %s\n", buf)
			case err := <-producer.Errors():
				errors++
				fmt.Println("Failed to produce message:", err)
			case <-signals:
				doneCh <- struct{}{}
			}
		}
	}()

	<-doneCh
	log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors)
}
sarama ships two producers, AsyncProducer and SyncProducer; this example uses the asynchronous one. Every second it publishes a message keyed by the current timestamp, with four random characters as the value, through the producer.Input() channel.
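For comparison, a blocking send that returns the partition and offset can be done with SyncProducer. Here is a minimal sketch under the same broker/topic argument convention; note that SyncProducer requires Producer.Return.Successes to be set to true:

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/Shopify/sarama"
)

func main() {
	broker, topic := os.Args[1], os.Args[2]

	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	// SyncProducer needs success reports returned to it.
	config.Producer.Return.Successes = true

	producer, err := sarama.NewSyncProducer([]string{broker}, config)
	if err != nil {
		log.Fatalln(err)
	}
	defer producer.Close()

	// SendMessage blocks until the broker acks and reports the placement.
	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder("Hello Go!"),
	})
	if err != nil {
		log.Fatalln("Failed to send message:", err)
	}
	fmt.Printf("Delivered to partition %d at offset %d\n", partition, offset)
}

The trade-off is throughput: each SendMessage waits for the broker's ack before returning.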
The consumer program again uses a consumer group:
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"

	"github.com/Shopify/sarama"
)

// Consumer implements sarama's consumer-group handler contract.
type Consumer struct {
	ready chan bool
}

// Setup runs at the beginning of a new session, before ConsumeClaim.
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
	close(consumer.ready) // signal that the consumer is ready
	return nil
}

// Cleanup runs at the end of a session, once all ConsumeClaim goroutines exit.
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim processes the messages of one claimed partition.
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		log.Printf("Message claimed: key = %s, value = %v, topic = %s, partition = %v, offset = %v",
			string(message.Key), string(message.Value), message.Topic, message.Partition, message.Offset)
		session.MarkMessage(message, "")
	}
	return nil
}

func main() {
	if len(os.Args) < 4 {
		fmt.Fprintf(os.Stderr, "Usage: %s <broker> <group> <topics..>\n",
			os.Args[0])
		os.Exit(1)
	}

	broker := os.Args[1]
	group := os.Args[2]
	topics := os.Args[3:]

	version, err := sarama.ParseKafkaVersion("2.3.0")
	if err != nil {
		log.Panicf("Error parsing Kafka version: %v", err)
	}
	config := sarama.NewConfig()
	config.Version = version

	consumer := Consumer{
		ready: make(chan bool),
	}

	ctx, cancel := context.WithCancel(context.Background())
	client, err := sarama.NewConsumerGroup([]string{broker}, group, config)
	if err != nil {
		log.Panicf("Error creating consumer group client: %v", err)
	}

	wg := &sync.WaitGroup{}
	wg.Add(1) // must happen before the goroutine starts, not inside it
	go func() {
		defer wg.Done()
		for {
			// Consume blocks for the whole session and returns on
			// rebalance, so call it in a loop.
			if err := client.Consume(ctx, topics, &consumer); err != nil {
				log.Panicf("Error from consumer: %v", err)
			}
			if ctx.Err() != nil {
				return
			}
			consumer.ready = make(chan bool)
		}
	}()

	<-consumer.ready // wait until the consumer has been set up
	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-ctx.Done():
		log.Println("terminating: context cancelled")
	case <-sigterm:
		log.Println("terminating: via signal")
	}

	cancel()
	wg.Wait()
	if err = client.Close(); err != nil {
		log.Panicf("Error closing client: %v", err)
	}
}
The consumer side is a bit more involved. It starts by declaring a Consumer struct with Setup, Cleanup, and ConsumeClaim methods, all of which are required for processing. In consumer-group mode you also need sarama.ParseKafkaVersion("2.3.0") to specify the Kafka version. Signal handling is wired in as well: when the program is terminated, it reports the cause, printing terminating: via signal if it was triggered by Ctrl+C or the like.
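The reason those three methods are required is that sarama's consumer-group client accepts any value implementing its ConsumerGroupHandler interface, which looks like this:

	// From the sarama package: the contract the Consumer struct above satisfies.
	type ConsumerGroupHandler interface {
		// Setup is run at the beginning of a new session, before ConsumeClaim.
		Setup(ConsumerGroupSession) error
		// Cleanup is run at the end of a session, once all ConsumeClaim
		// goroutines have exited.
		Cleanup(ConsumerGroupSession) error
		// ConsumeClaim must start a consumer loop over the claim's Messages().
		ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
	}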
The consumption loop itself runs in a goroutine, and sync.WaitGroup ensures the goroutine has finished before the client is closed (note that wg.Add(1) must run before the goroutine starts, as in the code above, or wg.Wait() could return too early). Also note that the context created with context.WithCancel returns a cancel function, which must be called at the end; otherwise the signal cannot terminate the program.
Give it a run:
❯ go run sarama_producer.go localhost:9092 sarama
Produce message: BZGB
Produce message: CTCU
Produce message: SJFB
Produce message: DNJO
Produce message: EZQL
Produce message: JZPF
Produce message: SBZR
Produce message: FDZD
❯ go run sarama_consumer.go localhost:9092 1 sarama
2019/07/19 13:01:07 Message claimed: key = 1563512466, value = BZGB, topic = sarama, partition = 0, offset = 0
2019/07/19 13:01:07 Message claimed: key = 1563512467, value = CTCU, topic = sarama, partition = 1, offset = 0
2019/07/19 13:01:08 Message claimed: key = 1563512468, value = SJFB, topic = sarama, partition = 0, offset = 1
2019/07/19 13:01:09 Message claimed: key = 1563512469, value = DNJO, topic = sarama, partition = 1, offset = 1
2019/07/19 13:01:10 Message claimed: key = 1563512470, value = EZQL, topic = sarama, partition = 1, offset = 2
2019/07/19 13:01:11 Message claimed: key = 1563512471, value = JZPF, topic = sarama, partition = 0, offset = 2
2019/07/19 13:01:12 Message claimed: key = 1563512472, value = SBZR, topic = sarama, partition = 1, offset = 3
2019/07/19 13:01:13 Message claimed: key = 1563512473, value = FDZD, topic = sarama, partition = 0, offset = 3
You can see the messages are distributed fairly evenly across partitions 0 and 1. You can also subscribe in another terminal with a different GroupID, e.g. go run sarama_consumer.go localhost:9092 2 sarama (GroupID 2).
If several terminals share the same GroupID, each process consumes only the partitions bound to it, with no duplicate consumption. For example: with two terminals running go run sarama_consumer.go localhost:9092 1 sarama, terminal A consumes partition 0 and terminal B consumes partition 1. With three terminals, since there are only two partitions, terminal C is bound to no partition and consumes nothing.
Sarama's documentation and API both leave something to be desired; I don't quite understand why it has so many more stars than confluent-kafka-go, perhaps simply because Sarama was created earlier.
I've decided to use confluent-kafka-go in future projects. If you have experience running either in production, please leave a comment and let me know~
Original post: strconv.com/posts/use-k…
The complete code can be found at this address.