brew install kafka
安装kafka须要依赖zookeeper的,因此安装kafka的时候也会包含zookergit
server.properties中重要配置
- broker.id=0
- listeners=PLAINTEXT://:9092
- advertised.listeners=PLAINTEXT://127.0.0.1:9092
- log.dirs=/usr/local/var/lib/kafka-logs
zookeeper.properties重要配置
- dataDir=/usr/local/var/lib/zookeeper
- clientPort=2181
- maxClientCnxns=0
新建立终端启动zookeeper
- cd /usr/local/Cellar/kafka/2.1.0
- ./bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
- 打印台显示:INFO Reading configuration from: /usr/local/etc/kafka/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
- ...便是启动成功
新建立终端启动kafka(启动kafka以前必须先启动zookeeper)
- cd /usr/local/Cellar/kafka/2.1.0
- ./bin/kafka-server-start /usr/local/etc/kafka/server.properties
- 打印台显示:INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
- ...即启动成功
- 启动了kafka以后,zookeeper端会报一些Error:KeeperErrorCode = NoNode for /config/topics/test之类的错误,这个是没有问题的,这是由于kafka向zookeeper发送了关于该路径的一些请求信息,可是不存在,因此这是没有问题的
新建立终端
- cd /usr/local/Cellar/kafka/2.1.0
- 建立一个名为“test”的主题:./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
- 查看全部的topic:./bin/kafka-topics --list --zookeeper localhost:2181
- 查看某个topic的信息,好比test:./bin/kafka-topics --describe --zookeeper localhost:2181 --topic test
新建立一个终端,做为生产者,用于发送消息,每一行就是一条信息,将消息发送到kafka服务器
- cd /usr/local/Cellar/kafka/2.1.0
- ./bin/kafka-console-producer --broker-list localhost:9092 --topic test
- send one message
- send two message
新建立一个终端做为消费者,接受消息
- cd /usr/local/Cellar/kafka/2.1.0
- ./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
- send one message
- send two message(这些即是从生产者得到的消息)
import ( "fmt" "github.com/Shopify/sarama" ) func main() { config := sarama.NewConfig() // 等待服务器全部副本都保存成功后的响应 config.Producer.RequiredAcks = sarama.WaitForAll // 随机的分区类型:返回一个分区器,该分区器每次选择一个随机分区 config.Producer.Partitioner = sarama.NewRandomPartitioner // 是否等待成功和失败后的响应 config.Producer.Return.Successes = true // 使用给定代理地址和配置建立一个同步生产者 producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config) if err != nil { panic(err) } defer producer.Close() //构建发送的消息, msg := &sarama.ProducerMessage { //Topic: "test",//包含了消息的主题 Partition: int32(10),// Key: sarama.StringEncoder("key"),// } var value string var msgType string for { _, err := fmt.Scanf("%s", &value) if err != nil { break } fmt.Scanf("%s",&msgType) fmt.Println("msgType = ",msgType,",value = ",value) msg.Topic = msgType //将字符串转换为字节数组 msg.Value = sarama.ByteEncoder(value) //fmt.Println(value) //SendMessage:该方法是生产者生产给定的消息 //生产成功的时候返回该消息的分区和所在的偏移量 //生产失败的时候返回error partition, offset, err := producer.SendMessage(msg) if err != nil { fmt.Println("Send message Fail") } fmt.Printf("Partition = %d, offset=%d\n", partition, offset) } }
import ( "fmt" "github.com/Shopify/sarama" "sync" ) var ( wg sync.WaitGroup ) func main() { // 根据给定的代理地址和配置建立一个消费者 consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil) if err != nil { panic(err) } //Partitions(topic):该方法返回了该topic的全部分区id partitionList, err := consumer.Partitions("test") if err != nil { panic(err) } for partition := range partitionList { //ConsumePartition方法根据主题,分区和给定的偏移量建立建立了相应的分区消费者 //若是该分区消费者已经消费了该信息将会返回error //sarama.OffsetNewest:代表了为最新消息 pc, err := consumer.ConsumePartition("test", int32(partition), sarama.OffsetNewest) if err != nil { panic(err) } defer pc.AsyncClose() wg.Add(1) go func(sarama.PartitionConsumer) { defer wg.Done() //Messages()该方法返回一个消费消息类型的只读通道,由代理产生 for msg := range pc.Messages() { fmt.Printf("%s---Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Topic,msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) } }(pc) } wg.Wait() consumer.Close() }
服务解耦github
好比咱们发了一个帖子,除了写入数据库以外还有不少联动操做,好比给关注这个用户的人发送通知,推送到首页的时间线列表,若是用代码实现的话,发帖服务就要调用通知服务,时间线服务,这样的耦合很大,而且若是增长一个功能依赖发帖,除了要增长新功能外还要修改发帖代码。golang
解决方法:引入kafka,将发完贴的消息放入kafka消息队列中,对这个主题感兴趣的功能就本身去消费这个消息,那么发帖功能就可以彻底独立。同时即便发帖进程挂了,其余功能还可以使用,这样能够将bug隔离在最小范围内数据库
流量削峰apache
流量削峰在消息队列中也是经常使用场景,通常在秒杀或团购活动中使用比较普遍。当流量太大的时候达到服务器瓶颈的时候能够将事件放在kafka中,下游服务器当接收到消息的时候本身去消费,有效防止服务器被挤垮bootstrap
消息队列通常都内置了高效的通讯机制,所以也能够用在纯的消息通信中,好比客户端A跟客户端B都使用同一队列进行消息通信,客户端A,客户端B,客户端N都订阅了同一个主题进行消息发布和接受不了实现相似聊天室效果数组
参考代码服务器