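Background: the Kafka client used here (pykafka) has four producer modes: synchronous, multithreaded, gevent-based async, and rdkafka-based async. When it is combined with Celery and gevent, however, some of these modes fail.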
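For reference, here is a minimal sketch of how each of the four modes is selected. It assumes the pykafka calls that appear in the snippets below (get_sync_producer(), get_producer(use_rdkafka=...)) plus the use_greenlets flag of KafkaClient for the gevent mode. PRODUCER_MODES and make_producer are hypothetical names, so check the parameters against your installed pykafka version.

```python
# Hypothetical helper mapping each of the four modes to the pykafka calls that select it.
# Assumes KafkaClient(use_greenlets=...), Topic.get_sync_producer() and
# Topic.get_producer(use_rdkafka=...); verify against your pykafka version.

# mode -> (KafkaClient kwargs, Topic method name, producer kwargs)
PRODUCER_MODES = {
    "sync": ({}, "get_sync_producer", {}),                    # produce() waits for each delivery
    "threaded": ({}, "get_producer", {}),                     # default async producer, worker threads
    "gevent": ({"use_greenlets": True}, "get_producer", {}),  # async producer running on greenlets
    "rdkafka": ({}, "get_producer", {"use_rdkafka": True}),   # async producer backed by librdkafka
}


def make_producer(mode, hosts, topic_name):
    """Build a producer for the given mode (needs pykafka and a reachable broker)."""
    # Imported lazily so the mode table can be inspected without pykafka installed.
    from pykafka import KafkaClient

    client_kwargs, factory_name, producer_kwargs = PRODUCER_MODES[mode]
    client = KafkaClient(hosts=hosts, **client_kwargs)
    topic = client.topics[topic_name]
    return getattr(topic, factory_name)(**producer_kwargs)
```

With the variables from the original code, the recommended mode would then be created with make_producer("threaded", broker_list, topic_name).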
Below are the results of running my code.
Conclusion: use the multithreaded mode!
With the synchronous producer, the data was sent successfully:
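```python
import json
import logging

from pykafka import KafkaClient

LOGGER = logging.getLogger(__name__)
# broker_list and topic_name are defined elsewhere in the original code


def send_data_kafka(data):
    try:
        client = KafkaClient(hosts=broker_list)
        topic = client.topics[topic_name]
        # synchronous producer
        with topic.get_sync_producer() as producer:
            for d in data:
                print("send data")
                msg = json.dumps(d).encode("utf-8")  # pykafka expects bytes
                producer.produce(msg)
            producer.stop()  # redundant: leaving the with block also calls stop()
    except Exception as e:
        LOGGER.exception("error in send_data_kafka")
        print(e)
```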
With the rdkafka async producer, only one "send data" line was printed, and then the thread hung:
```python
def send_data_kafka(data):
    try:
        client = KafkaClient(hosts=broker_list)
        topic = client.topics[topic_name]
        # async producer backed by librdkafka
        with topic.get_producer(use_rdkafka=True) as producer:
            for d in data:
                print("send data")
                msg = json.dumps(d).encode("utf-8")
                producer.produce(msg)
            producer.stop()
    except Exception as e:
        LOGGER.exception("error in send_data_kafka")
        print(e)
```
With the multithreaded producer, all of the data was produced normally:
```python
def send_data_kafka(data):
    try:
        client = KafkaClient(hosts=broker_list)
        topic = client.topics[topic_name]
        # default async producer, which works on background threads
        with topic.get_producer() as producer:
            for d in data:
                print("send data")
                msg = json.dumps(d).encode("utf-8")
                producer.produce(msg)
            producer.stop()
    except Exception as e:
        LOGGER.exception("error in send_data_kafka")
        print(e)
```
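Without a with block (the producer is created once at module level), using the rdkafka async producer: every "send data" line was printed, and then it hung: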
```python
client = KafkaClient(hosts=broker_list)
topic = client.topics[topic_name]
producer = topic.get_producer(use_rdkafka=True)  # async, uses librdkafka: the fastest option


def send_data_kafka(data):
    try:
        for d in data:
            print("send data")
            msg = json.dumps(d).encode("utf-8")
            producer.produce(msg)
        # note: this stops the shared module-level producer, so later calls cannot reuse it
        producer.stop()
    except Exception as e:
        LOGGER.exception("error in send_data_kafka")
        print(e)
```
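In the module-level variant, send_data_kafka calls stop() on the shared producer at the end of every call, so the producer is already stopped after the first batch. A minimal sketch of one alternative, assuming the producer should live for the whole worker process: create it once on first use, reuse it for every batch, and stop it only once at interpreter exit via atexit. This does not explain or fix the rdkafka hang itself. get_shared_producer, send_batch, factory and FakeProducer are hypothetical names; in real code factory would wrap something like topic.get_producer().

```python
import atexit
import json
import threading

_producer = None
_lock = threading.Lock()


def get_shared_producer(factory):
    """Create the producer once per process and register a single stop() at exit."""
    global _producer
    with _lock:
        if _producer is None:
            _producer = factory()
            atexit.register(_producer.stop)  # stop once, at interpreter shutdown
    return _producer


def send_batch(data, factory):
    """Serialize each item as JSON bytes and hand it to the shared producer (no stop() per call)."""
    producer = get_shared_producer(factory)
    for d in data:
        producer.produce(json.dumps(d).encode("utf-8"))


class FakeProducer(object):
    """Stand-in for a pykafka producer, used only to exercise the functions above."""

    def __init__(self):
        self.sent = []
        self.stopped = False

    def produce(self, msg):
        self.sent.append(msg)

    def stop(self):
        self.stopped = True
```

Calling send_batch twice with the same factory creates only one FakeProducer, and both batches go through it.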