kafka下载地址 :http://kafka.apache.org/downloads
zookeeper下载地址:https://zookeeper.apache.org/
jdk下载地址:https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
html
一、首先 使用tar命令对jdk进行解压 tar -zxvf tar -zxvf jdk-8u181-linux-x64.tar.gz 目录下面会多出一个jdk1.8.0_181 进入里面去 使用pwd命令查看绝对路径 而且复制找个路径 最后进行jdk环境变量的配置 编辑 vim /etc/profile文件 在文件后面加上: export JAVA_HOME=(刚才pwd命令看到的路径) export CLASSPATH=.:${JAVA_HOME}/jre/lib/rt.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/ lib/ tools.jar export PATH=$PATH:${JAVA_HOME}/bin 最后使用source /etc/profile 刷新文件 使用java -version 查看环境变量是否配置成功 二、成功以后进行zookeeper的安装 使用 tar -zxvf zookeeper-3.4.12.tar.gz 接下下载好的zookeeper安装包 将zookeeper下的/conf/zookeeper.example更名成zoo.cfg 使用mv 和cp命令均可以 而后vim这个文件 加上下面两行 dataLogDir=/tmp/zookeeper-log #日志路径 quorumListenOnAllIPs=true #在阿里云的服务器上保证外网能够访问到 刚开始没设置这个折腾了很久 三、最后,安装kafka 使用 tar -zxvf kafka_2.12-1.1.1.tgz 解压下载好的kafka cd 到解压后的文件里面去 编辑配置文件 vim config/server.properties 加上下面几行 listeners=PLAINTEXT://:9092 advertised.host.name=阿里云服务器公网ip # advertised.port=9092 将zookeeper.connect的值改成阿里云的公网ip
首先cd到zookeeper的bin目录下 使用 ./zkServer.sh start 启动zookeeper 再cd到kafka的bin目录下 使用 ./kafka-server-start.sh ../config/server.properties 启动kafka 新建一个会话或者打开一个新的终端 这时候使用jps命令 能够看到 Kafka和QuorumPeerMain表示启动所有成功,下面建立一个主题 cd到kafka的bin目录下面,执行 ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 -- partitions 1 --topic Hello-world 输出Created topic "Hello-world". 表示topic建立成功 使用./kafka-topics.sh --list --zookeeper localhost:2181 查看主题的列表 输出里面会含有Hello-world 下面进行消息的生产和消费 先启动生产者 ./kafka-console-producer.sh --broker-list 阿里云公网ip:9092 --topic Hello- world 会出现一个 > 相似于交互界面 这时候就能够生产消息了 启动消费者 ./kafka-console-consumer.sh --zookeeper 阿里云公网ip:2181 --topic Hello- world --from-beginning 这时候当生产者生产消息的时候 消费者这边就能够看到了
首先、新建一个Maven工程(此处再也不多描述),在pom文件中加入kafka的依赖 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.2.0</version> </dependency> 新建一个KafkaProducerDemo和KafkaConsumerDemo类(名字能够自定义): 话很少说 上代码 KafkaProducerDemo类: public class KafkaProducerDemo { public static void main(String[] args) { //建立properties文件 Properties properties = new Properties(); //设置kafka服务器地址 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "阿里云公网ip:9092"); //设置key进行序列化 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //设置value进行序列化 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //建立消息生产者 KafkaProducer<String,String> producer = new KafkaProducer<>(properties); //建立消息实体 制定主题、key、value ProducerRecord<String,String> record = new ProducerRecord<>("Hello-world","haha","from java client"); //发送消息 producer.send(record); System.out.println("消息发送成功"); //关闭生产者 producer.close(); } } KafkaConsumerDemo类: public class KafkaConsumerDemo { public static void main(String[] args) { //新建配置文件 Properties properties = new Properties(); //设置kafka服务器地址 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"阿里云公网ip:9092"); //设置key的反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); //设置value的反序列化 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); //设置groupid properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test"); //建立消费者对象 KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties); //订阅主题 consumer.subscribe(Arrays.asList("Hello-world")); while (true) { //消费消息 ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.println("消息的主题是:" + record.topic()+",消息的key是:" + record.key()+",消息的value是:"+record.value()); } } } 上面就是链接kafka远程服务器代码
可是上述过程作完以后仍是不能正确运行、这个地方折腾了很久、最后在哪里看到解决的办法记不大清了
就是要阿里云服务器服务安全设置里面加个规则 将2181和9092端口开放就能够,可是我中间也使用命令的方式
关闭了防火墙、没什么用,不知道什么鬼。 搞得我头皮发麻java