因为项目中必须使用 kafka 来做为消息组件,因此使用 kafka 有一段时间了。不得不感叹 kafka 是一个至关优秀的消息系统。下面直接对使用过程作一总结,但愿对你们有用。git
kafka 的简单搭建咱们使用 docker 进行,方便快捷单节点。生产环境不推荐这样的单节点 kafka 部署。github
网上不少教程,安装也简单,不做为重点赘述。docker
将如下内容直接复制到新建空文件docker-compose.yml
中。c#
version: "3" services: zookeeper: image: wurstmeister/zookeeper ports: - "2181:2181" kafka: image: wurstmeister/kafka depends_on: [zookeeper] ports: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: localhost KAFKA_CREATE_TOPICS: "test" KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 volumes: - /var/run/docker.sock:/var/run/docker.sock
在docker-compose.yml
文件的目录下执行如下命令:服务器
docker-compose build # 打包 docker-compose up # 启动, 添加 -d 能够后台启动。
看到日志输出:异步
Creating network "desktop_default" with the default driver Creating desktop_zookeeper_1 ... done Creating desktop_kafka_1 ... done Attaching to desktop_zookeeper_1, desktop_kafka_1 zookeeper_1 | ZooKeeper JMX enabled by default zookeeper_1 | Using config: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg zookeeper_1 | 2020-05-17 03:34:31,794 [myid:] - INFO [main:QuorumPeerConfig@136] - Reading configuration from: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg ... zookeeper_1 | 2020-05-17 03:34:31,872 [myid:] - INFO [main:ZooKeeperServer@836] - tickTime set to 2000 ... kafka_1 | Excluding KAFKA_VERSION from broker config
没有错误输出说明部署成功。async
在 github 上可以找到好几个 c#可使用的 kafka 客户端。你们能够去搜一下,本文就只说明rdkafka-dotnet和confluent-kafka-dotnet。tcp
咱们生产环境中就使用的该客户端。在该项目 github 首页中能够看到:函数
var config = new Config() { GroupId = "example-csharp-consumer" }; using (var consumer = new EventConsumer(config, "127.0.0.1:9092")) { consumer.OnMessage += (obj, msg) => { //... }; }
没错,使用它的缘由就是它提供了EventConsumer,能够直接异步订阅消息。总体上来讲该客户端很是的稳定,性能优良。使用过程当中比较难缠的就是它的配置,比较不直观。它基于librdkafka(C/C++)实现,配置 Config 类中显式配置比较少,大多数是经过字典配置的,好比:性能
var config = new Config(); config["auto.offset.reset"] = "earliest";//配置首次消息偏移位置为最先
这对于新手来讲并非很友好,很难想到去这样配置。固然若是有 librdkafka 的使用经验会好不少。大多数配置在 librdkafka 项目的CONFIGURATION。
还有一个须要注意的是 Broker 的版本支持Broker version support: >=0.8,也在 librdkafka 项目中能够找到。
confluent-kafka-dotnet 是 rdkafka-dotnet(好几年没有维护了)的官方后续版本。推荐使用 confluent-kafka-dotnet,由于配置相对友好,更加全面。好比:
var conf = new ConsumerConfig { AutoOffsetReset = AutoOffsetReset.Earliest//显式强类型赋值配置 };
对于 EventConsumer 怎么办呢?在项目变动记录中已经明确提出移除了 OnMessage 多播委托,而 EventConsumer,也就不存在了。但这不难,咱们能够参照基项目写一个:
public class EventConsumer<TKey, TValue> : IDisposable { private Task _consumerTask; private CancellationTokenSource _consumerCts; public IConsumer<TKey, TValue> Consumer { get; } public ConsumerBuilder<TKey, TValue> Builder { get; set; } public EventConsumer(IEnumerable<KeyValuePair<string, string>> config) { Builder = new ConsumerBuilder<TKey, TValue>(config); Consumer = Builder.Build(); } public event EventHandler<ConsumeResult<TKey, TValue>> OnConsumeResult; public event EventHandler<ConsumeException> OnConsumeException; public void Start() { if (Consumer.Subscription?.Any() != true) { throw new InvalidOperationException("Subscribe first using the Consumer.Subscribe() function"); } if (_consumerTask != null) { return; } _consumerCts = new CancellationTokenSource(); var ct = _consumerCts.Token; _consumerTask = Task.Factory.StartNew(() => { while (!ct.IsCancellationRequested) { try { var cr = Consumer.Consume(TimeSpan.FromSeconds(1)); if (cr == null) continue; OnConsumeResult?.Invoke(this, cr); } catch (ConsumeException e) { OnConsumeException?.Invoke(this, e); } } }, ct, TaskCreationOptions.LongRunning, TaskScheduler.Default); } public async Task Stop() { if (_consumerCts == null || _consumerTask == null) return; _consumerCts.Cancel(); try { await _consumerTask; } finally { _consumerTask = null; _consumerCts = null; } } public void Dispose() { if (_consumerTask != null) { Stop().Wait(); } Consumer?.Dispose(); } }
使用测试:
static async Task Main(string[] args) { Console.WriteLine("Hello World!"); var conf = new ConsumerConfig { GroupId = "test-consumer-group", BootstrapServers = "localhost:9092", AutoOffsetReset = AutoOffsetReset.Earliest, }; var eventConsumer = new EventConsumer<Ignore, string>(conf); eventConsumer.Consumer.Subscribe(new[] {"test"}); eventConsumer.OnConsumeResult += (sen, cr) => { Console.WriteLine($"Receive '{cr.Message.Value}' from '{cr.TopicPartitionOffset}'"); }; do { var line = Console.ReadLine(); switch (line) { case "stop": eventConsumer.Stop(); break; case "start": eventConsumer.Start(); break; } } while (true); }
!!!如下讨论都是对confluent-kafka-dotnet。
因为用户终端也使用了 kafka 客户端订阅消息。若是终端长时间没有上线,而且消息过时时间也较长,服务端会存有大量消息。终端一上线就会读取到大量的堆积消息,很容易就把内存耗尽了。考虑到客户端不是长期在线的场景,无需不间断的处理全部消息,服务端才适合这个角色(:。因此客户端只需每次从登陆时的最新点开始读取就能够了,历史性统计就交给服务器去作。
最便捷的方法是每次客户端链接都使用新的groupid,用时间或者guid撒盐。但这样会使服务端记录大量的group信息(若是终端不少m个,而且终端断开链接重连的次数也会不少随机n次,那么也是m*n个group信息),势必对服务端性能形成影响。
另外一种方法是在保持groupid不变的状况下,修改消费偏移。那如何去设置位置偏移为最新点呢?
在配置中存在一个让新手容易产生误解的配置项AutoOffsetReset.Latest自动偏移到最新位置。当你兴冲冲的准备大干一番时发现只有首次建立GroupId时会起做用,当 groupid 已经存在 kafka 记录中时它就无论用了。
咱们可以在IConsumer<TKey, TValue>
中找到该 commit 方法,它有三个重载:
1. 无参函数。就是提交当前客户端`IConsumer<TKey, TValue>.Assignment`记录的偏移。 2. 参数ConsumeResult<TKey, TValue>。一次仅提交一个偏移。固然配置中默认设置为自动提交(`conf.EnableAutoCommit = true;`),无需手动提交。 3. 参数IEnumerable<TopicPartitionOffset> offsets。直接提交到某一个位置。TopicPartitionOffset有三个决定性属性:话题topic、分区:partition、偏移offset。
第三个函数就是咱们想要的,咱们只需获得对应参数TopicPartitionOffset的值就能够。
topic 是咱们惟一能够肯定的。在IConsumer<TKey, TValue>.Assignment
中能够获得 topic 和 partition。但遗憾的是它只有不会当即有值。咱们只能主动去服务端获取,在IAdminClient
中找到了可获取该信息的方法,因此咱们作一扩展:
public static IEnumerable<TopicPartition> GetTopicPartitions(ConsumerConfig config, string topic, TimeSpan timeout) { using var adv = new AdminClientBuilder(config).Build(); var topPns = adv.GetTopicPartition(topic, timeout); return topPns; } public static IEnumerable<TopicPartition> GetTopicPartition(this AdminClient client, string topic, TimeSpan timeout) { var mta = client.GetMetadata(timeout); var topicPartitions = mta.Topics .Where(t => topic == t.Topic) .SelectMany(t => t.Partitions.Select(tt => new TopicPartition(t.Topic, tt.PartitionId))) .ToList(); return topicPartitions; }
咱们还差 offset 的值,经过IConsumer<TKey, TValue>.QueryWatermarkOffsets
方法能够查到当前水位,而其中 High 水位就是最新偏移。
如今咱们能够完成咱们的任务了吗?问题再次出现,虽然客户端表现得从最新点消费了,可是在此以前的卡顿和相似与内存溢出让人不得心安。Commit 仍是消费了全部消息:(,只不过暗搓搓的进行。在全部消息消费期间读取全部未消费,而后拼命提交。客户端哪有这么大的内存和性能呢。最终,找到一个和第三个 commit 方法同样接受参数的方法Assign
,一试果真灵验。
public static void AssignOffsetToHighWatermark<TKey, TValue>(this IConsumer<TKey, TValue> consumer, TopicPartition partition, TimeSpan timeout) { var water = consumer.QueryWatermarkOffsets(partition, timeout); if (water == null || water.High == 0) return; var offset = new TopicPartitionOffset(partition.Topic, partition.Partition, water.High); consumer.Assign(offset); }
最终的使用示例:
//... var topicPartitions = ConsumerEx.GetTopicPartitions(conf, "test", TimeSpan.FromSeconds(5)); topicPartitions?.ToList().ForEach(t => { eventConsumer.Consumer.AssignOffsetToHighWatermark(t, TimeSpan.FromSeconds(5)); }); eventConsumer.Start();//在消费事件开始以前就能够进行偏移设置 //...
请注意,若是您关闭了自动提交功能,而且不主动提交任何偏移信息,那么服务端对该 group 的偏移记录将一直不变,Assign 函数并不会改变任何服务的偏移记录。
这一圈下来整个 kafka 的基本消费流程也就搞清楚了。kafka 消费者须要对消费的消息进行提交。事实上,每一个消息体里都有偏移信息。不提交对于服务端来讲就是客户端没有处理过该消息,将不会更改已消费偏移。以此来保证消息消费的可靠性。这和 tcp 中三次握手有殊途同归之妙。
服务端保存着每个 groupid 对应的已经提交偏移Committed Offset
。固然客户端不提交它是不会变动的(不考虑直接操做服务端的形式)。
客户端保存本身的当前偏移Current Offset
,能够经过Assign
和Commit
进行更改,两者区别是Commit
将连同提交到服务端对应的偏移中进行更改,而Assign
仅改变客户端偏移,这一更改记录在IConsumer<TKey, TValue>.Assignment
中,首次启动时客户端异步向服务端请求Committed Offset
来对其赋值。这就是在 3.2 节中咱们没有当即获得该值的的缘由,该值将在可能在几秒中后被赋值,因此写了一个主动获取的方法GetTopicPartition
。客户端下一次消费将根据IConsumer<TKey, TValue>.Assignment
进行。
使用AdminClientBuilder.GetMetadata
函数能够获得对应话题的元数据,包括:topic、partition、Brokers 等。
使用IConsumer<TKey, TValue>.QueryWatermarkOffsets
函数能够获得当前服务端的水位,low 为最先的偏移(可能不是 0,考虑消息过时被删除的状况),high 为最新的偏移。