At work we deployed a project with uWSGI configured to run multiple processes, and the Python code used the kafka-python module as a producer, continuously emitting data. Shortly after going live, almost every produced message failed with KafkaTimeoutError, and the Kafka broker showed no incoming messages at all.
So I read the kafka-python source and found that after send() is called the message is not transmitted immediately; it is placed in a local buffer. When creating a KafkaProducer instance, the buffer_memory option sets the size of this buffer (32 MB by default), and once the buffer fills up, KafkaTimeoutError is raised. So my initial diagnosis was one of two causes:
1. The producer's messages were never sent out at all, or
2. Sending was too slow relative to how fast messages were being produced.
Since the Kafka broker had received no messages whatsoever, I ruled out the second cause: the producer's messages were simply not being sent. I then wrote a standalone script using the same configuration and found that the broker did receive its messages, confirming the problem was in my producer. Googling led me to this issue: https://github.com/dpkp/kafka-python/issues/721. The reporter's situation was much like mine, and the maintainer replied:
You cannot share producer instances across processes, only threads. I expect that is why the master process pattern is failing.
Second, producer.send() is async but is not guaranteed to deliver if you close the producer abruptly. In your final example I suspect that your producer instances are so short-lived that they are being reaped before flushing all pending messages. To guarantee delivery (or exception) call producer.send().get(timeout) or producer.flush() . otherwise you'll need to figure out how to get a producer instance per-uwsgi-thread and have it shared across requests (you would still want to flush before thread shutdown to guarantee no messages are dropped)
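The "send is async, flush guarantees delivery" point can be illustrated without a broker. Below is a toy model, not the real kafka-python API: send() only enqueues into a buffer, a background thread delivers later, and flush() blocks until the buffer drains. With the real library the equivalent calls are producer.send(...).get(timeout) or producer.flush().

```python
import queue
import threading
import time


class ToyAsyncProducer:
    """A toy stand-in for an async producer (NOT kafka-python's API):
    send() only buffers; a background thread delivers later, so messages
    are lost if the process dies before the buffer drains."""

    def __init__(self):
        self._buffer = queue.Queue()
        self.delivered = []
        threading.Thread(target=self._drain, daemon=True).start()

    def _drain(self):
        while True:
            msg = self._buffer.get()
            time.sleep(0.05)            # simulated network latency
            self.delivered.append(msg)
            self._buffer.task_done()

    def send(self, msg):
        self._buffer.put(msg)           # returns immediately: async

    def flush(self):
        self._buffer.join()             # block until everything is delivered


p = ToyAsyncProducer()
p.send("hello")
print(p.delivered)   # [] -- send() already returned, nothing delivered yet
p.flush()
print(p.delivered)   # ['hello'] -- flush() guaranteed delivery
```

If the process exited right after send() and before flush(), the message would vanish, which is exactly the failure mode the maintainer describes.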
大致上说明了两点:缓存
1 多进程共享同一个生产者实例有问题服务器
2 send方法是异步的,当执行完send后当即关闭生产者实例的话可能会致使发送失败。app
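Why does sharing across processes break? A KafkaProducer relies on a background sender thread (and open sockets), and fork() clones neither into the child: only the forking thread survives. This pure-Python, Unix-only sketch (the sleeping thread stands in for kafka-python's internal sender thread) shows the effect:

```python
import os
import threading
import time

# A background worker thread, playing the role of kafka-python's
# internal sender thread that actually ships buffered messages.
sender = threading.Thread(target=lambda: time.sleep(60), daemon=True)
sender.start()


def thread_alive_in_child():
    """fork() and report whether the child still sees the thread running."""
    r, w = os.pipe()
    pid = os.fork()
    if pid == 0:  # child: only the thread that called fork() survives
        os.write(w, b"1" if sender.is_alive() else b"0")
        os._exit(0)
    os.waitpid(pid, 0)
    return os.read(r, 1) == b"1"


print(sender.is_alive())          # True  -- the parent still has the thread
print(thread_alive_in_child())    # False -- the child does not
```

A producer created in the uWSGI master before fork() is in exactly the child's position: its send() keeps buffering, but the thread that would flush the buffer no longer exists, so the buffer fills and KafkaTimeoutError follows.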
I hadn't made the second mistake, so, feeling rather pleased with myself, I kept reading the comments:
Aha, thanks! After looking more closely at uWSGI options I discovered the lazy-apps option, which causes each worker to load the entire app itself. This seems to have resolved my issue.
The reporter said this solved his problem, so I looked into uWSGI's lazy-apps option and found this article: https://uwsgi-docs-zh.readthedocs.io/zh_CN/latest/articles/TheArtOfGracefulReloading.html#preforking-vs-lazy-apps-vs-lazy, which says:
By default, uWSGI loads the entire application in the first process, and then, after the app is loaded, fork()s itself multiple times.
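With lazy-apps enabled, each worker imports the application itself after fork(), so nothing (producer included) is shared across processes. A minimal uwsgi.ini sketch; the module name and process count are placeholders:

```ini
[uwsgi]
module = app:app        ; placeholder: your WSGI module
master = true
processes = 4
; load the application inside each worker instead of in the
; master before fork(), so no objects are shared across processes
lazy-apps = true
```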
Looking at my own code, I had indeed created the producer instance before the app was created, which meant the instance was shared between the master process and its forked children. With the problem finally understood, there are two fixes:
1. Enable lazy-apps, and the problem goes away.
2. Keep lazy-apps off and solve it at the code level:
```python
# producer.py
import json

from kafka import KafkaProducer


class Single(object):
    """Singleton base class."""

    def __new__(cls, *args, **kwargs):
        if not hasattr(cls, "_instance"):
            cls._instance = super().__new__(cls)
            if hasattr(cls, "initialize"):
                cls._instance.initialize(*args, **kwargs)
        return cls._instance


class MsgQueue(Single):
    """
    Made a singleton because uwsgi + kafka-python misbehaves under multiple
    processes; each process should own its own KafkaProducer instance.
    Initializing the app does not create the producer: it is created lazily,
    at first use (i.e. after fork, inside the worker).
    See: https://github.com/dpkp/kafka-python/issues/721
    """
    app = None

    def initialize(self):
        self.producer = KafkaProducer(
            bootstrap_servers=self.app.config["MQ_URI"],
            api_version=self.app.config["KAFKA_API_VERSION"],
        )

    @classmethod
    def init_app(cls, app):
        cls.app = app

    def send(self, topic, data):
        """Serialize data to JSON and send it to the given topic."""
        data = json.dumps(data, ensure_ascii=True)
        self.producer.send(topic, data.encode())
```

```python
# app.py
from producer import MsgQueue
...
MsgQueue.init_app(app)
```

```python
# wherever business logic needs the producer
from producer import MsgQueue
...
MsgQueue().send(topic, msg)
```
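The Single base class can be exercised on its own, without Kafka. A minimal check (Counter is a made-up subclass for illustration) that repeated construction returns one shared, initialized instance:

```python
class Single(object):
    """Singleton base: the first construction creates and initializes the
    instance; every later construction returns that same object."""

    def __new__(cls, *args, **kwargs):
        if not hasattr(cls, "_instance"):
            cls._instance = super().__new__(cls)
            if hasattr(cls, "initialize"):
                cls._instance.initialize(*args, **kwargs)
        return cls._instance


class Counter(Single):
    def initialize(self):
        self.n = 0


a, b = Counter(), Counter()
a.n += 1
print(a is b, b.n)   # True 1
```

Because the instance is only created on the first call, each forked uWSGI worker builds its own producer the first time a request touches MsgQueue(), which is exactly the per-process behavior we want.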