架构、分布式、日志队列,标题本身都看着唬人,其实就是一个日志收集的功能,只不过中间加了一个Kafka作消息队列罢了。java
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它能够处理消费者规模的网站中的全部动做流数据。 这种动做(网页浏览,搜索和其余用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据一般是因为吞吐量的要求而经过处理日志和日志聚合来解决。git
Kafka是一种高吞吐量的分布式发布订阅消息系统,有以下特性:redis
发布和订阅消息流,这个功能相似于消息队列,这也是kafka归类为消息队列框架的缘由spring
以容错的方式记录消息流,kafka以文件的方式来存储消息流apache
能够再消息发布的时候进行处理json
在系统或应用程序之间构建可靠的用于传输实时数据的管道,消息队列功能服务器
构建实时的流数据处理程序来变换或处理数据流,数据处理功能网络
Linux、JDK、Zookeepersession
wget https://archive.apache.org/dist/kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz
tar -zxvf kafka_2.11-0.10.0.1.tgz cd kafka_2.11-0.10.0.1
bin 启动,中止等命令 config 配置文件 libs 类库
#########################参数解释############################## broker.id=0 #当前机器在集群中的惟一标识,和zookeeper的myid性质同样 port=9092 #当前kafka对外提供服务的端口默认是9092 host.name=192.168.1.170 #这个参数默认是关闭的 num.network.threads=3 #这个是borker进行网络处理的线程数 num.io.threads=8 #这个是borker进行I/O处理的线程数 log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录能够配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,若是配置多个目录,新建立的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个 socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一会儿就发送的,先回存储到缓冲区了到达必定的大小后在发送,能提升性能 socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达必定大小后在序列化到磁盘 socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小 num.partitions=1 #默认的分区数,一个topic默认1个分区数 log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天 message.max.byte=5242880 #消息保存的最大值5M default.replication.factor=2 #kafka保存消息的副本数,若是一个副本失效了,另外一个还能够继续提供服务 replica.fetch.max.bytes=5242880 #取消息的最大直接数 log.segment.bytes=1073741824 #这个参数是:由于kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件 log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过时的消息若是有,删除 log.cleaner.enable=false #是否启用log压缩,通常不用启用,启用的话能够提升性能 zookeeper.connect=192.168.1.180:12181,192.168.1.181:12181,192.168.1.182:1218 #设置zookeeper的链接端口、若是非集群配置一个地址便可 #########################参数解释##############################
启动kafka以前要启动相应的zookeeper集群、自行安装,这里不作说明。数据结构
#进入到kafka的bin目录 ./kafka-server-start.sh -daemon ../config/server.properties
spring-boot、elasticsearch、kafka
<!-- kafka 消息队列 --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.1.1.RELEASE</version> </dependency>
import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; /** * 生产者 * 建立者 科帮网 * 建立时间 2018年2月4日 */ @Configuration @EnableKafka public class KafkaProducerConfig { @Value("${kafka.producer.servers}") private String servers; @Value("${kafka.producer.retries}") private int retries; @Value("${kafka.producer.batch.size}") private int batchSize; @Value("${kafka.producer.linger}") private int linger; @Value("${kafka.producer.buffer.memory}") private int bufferMemory; public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.RETRIES_CONFIG, retries); props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); props.put(ProducerConfig.LINGER_MS_CONFIG, linger); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<String, String>(producerFactory()); } }
mport java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; /** * 消费者 * 建立者 科帮网 * 建立时间 2018年2月4日 */ @Configuration @EnableKafka public class KafkaConsumerConfig { @Value("${kafka.consumer.servers}") private String servers; @Value("${kafka.consumer.enable.auto.commit}") private boolean enableAutoCommit; @Value("${kafka.consumer.session.timeout}") private String sessionTimeout; @Value("${kafka.consumer.auto.commit.interval}") private String autoCommitInterval; @Value("${kafka.consumer.group.id}") private String groupId; @Value("${kafka.consumer.auto.offset.reset}") private String autoOffsetReset; @Value("${kafka.consumer.concurrency}") private int concurrency; @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(concurrency); factory.getContainerProperties().setPollTimeout(1500); return factory; } public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); return propsMap; } @Bean public Listener listener() { return new Listener(); } }
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import com.itstyle.es.common.utils.JsonMapper; import com.itstyle.es.log.entity.SysLogs; import com.itstyle.es.log.repository.ElasticLogRepository; /** * 扫描监听 * 建立者 科帮网 * 建立时间 2018年2月4日 */ @Component public class Listener { protected final Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired private ElasticLogRepository elasticLogRepository; @KafkaListener(topics = {"itstyle"}) public void listen(ConsumerRecord<?, ?> record) { logger.info("kafka的key: " + record.key()); logger.info("kafka的value: " + record.value()); if(record.key().equals("itstyle_log")){ try { SysLogs log = JsonMapper.fromJsonString(record.value().toString(), SysLogs.class); logger.info("kafka保存日志: " + log.getUsername()); elasticLogRepository.save(log); } catch (Exception e) { e.printStackTrace(); } } } }
/** * kafka 日志队列测试接口 */ @GetMapping(value="kafkaLog") public @ResponseBody String kafkaLog() { SysLogs log = new SysLogs(); log.setUsername("红薯"); log.setOperation("开源中国社区"); log.setMethod("com.itstyle.es.log.controller.kafkaLog()"); log.setIp("192.168.1.80"); log.setGmtCreate(new Timestamp(new Date().getTime())); log.setExceptionDetail("开源中国社区"); log.setParams("{'name':'码云','type':'开源'}"); log.setDeviceType((short)1); log.setPlatFrom((short)1); log.setLogType((short)1); log.setDeviceType((short)1); log.setId((long)200000); log.setUserId((long)1); log.setTime((long)1); //模拟日志队列实现 String json = JsonMapper.toJsonString(log); kafkaTemplate.send("itstyle", "itstyle_log",json); return "success"; }
以前简单的介绍过,JavaWeb项目架构之Redis分布式日志队列,有小伙伴们聊到, Redis PUB/SUB没有任何可靠性保障,也不会持久化。固然了,原项目中仅仅是记录日志,并非十分重要的信息,能够有必定程度上的丢失
Kafka与Redis PUB/SUB之间最大的区别在于Kafka是一个完整的分布式发布订阅消息系统,而Redis PUB/SUB只是一个组件而已。