是什么?java
1) Apache Kafka 是一个消息队列(生产者消费者模式)node
2) Apache Kafka 目标:构建企业中统一的、高通量、低延时的消息平台。mysql
3) 大多的是消息队列(消息中间件)都是基于JMS标准实现的,Apache Kafka 相似于JMS的实现。git
有什么用?github
1) 做为缓冲,来异构、解耦系统。 redis
消息持久化(Kafka 基于文件系统来存储和缓存消息)。算法
高吞吐量(Kafka 将数据写到磁盘,可是在底层采用了零拷贝技术,因此速度比较快)。spring
高扩展性(Kafka 依赖ZooKeeper来对集群进行协调管理,同时在机器扩展时无需将整个集群停机)。sql
多客户端支持(Kafka 核心模块用Scala 语言开发,但提供多种开发语言接入,包括Java,Python等)。shell
安全机制(支持代理与ZooKeeper 链接身份验证,客户端读、写权限认证)。
数据备份(Kafka 能够为每一个主题指定副本数,对数据进行持久化备份)。
轻量级(Kafka 的实例是无状态的,同时集群自己几乎不须要生产者和消费者的状态信息)。
消息压缩(Kafka 支持Gzip, Snappy 、LZ4 这3 种压缩方式,把多条消息压缩成MessageSet)。
解耦,异构
Kafka Cluster:由多个服务器组成。每一个服务器单独的名字broker(掮客)。
Kafka Producer:生产者、负责生产数据。
Kafka consumer:消费者、负责消费数据。
Kafka Topic: 主题,一类消息的名称。存储数据时将一类数据存放在某个topci下,消费数据也是消费同样。
ZooKeeper:Kafka的元数据都是存放在zookeeper中。
建立一个topic(主题):
bin/kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic order
启动一个生产者,用来生产数据 :
bin/kafka-console-producer.sh --broker-list kafka01:9092 --topic order
启动给一个消费者,消费数据:
bin/kafka-console-consumer.sh --zookeeper zk01:2181 --from-beginning --topic order
JavaAPI操做Kafka所须要的依赖:
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.1</version>
生产者相关操做:
//建立Properties配置参数对象,并设置参数
Properties props = new Properties();
props.put("bootstrap.servers", "node01:9092");
props.put("acks", "all");
//建立一个KafkaProducer,Kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 1000; i++) {
// 发送数据 ,须要一个producerRecord对象,最少参数 String topic, V value
kafkaProducer.send(new ProducerRecord<String, String>("order", "订单信息!"+i));
Thread.sleep(100);
}
}
消费者相关操做:
// 一、建立配置参数对象,并链接集群
Properties props = new Properties();
props.put("bootstrap.servers", "node01:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
//二、建立Kafka的消费者对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
//三、订阅一个主题,订阅主题需传入List格式
kafkaConsumer.subscribe(Arrays.asList("order"));
//四、使用死循环不停拉取数据
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : consumerRecords) {
System.out.println("消费的数据为:" + record.value());
}
}
topic相关操做:
因为主题的元数据信息是注册在 ZooKeeper 相应节点之中,因此对主题的操做实质是对ZooKeeper中记录主题元数据信息相关路径的操做。Kafka将对ZooKeeper的相关操做封装成一 个ZkUtils 类,井封装了一个AdrninUtils类调用ZkClient类的相关方法以实现对 Kafka 元数据 的操做,包括对主题、代理、消费者等相关元数据的操做。对主题操做的相关API调用较简单,相应操做都是经过调用AdminUtils类的方法来完成的。
建立topic(通常经常使用方法一):
方法一:
//参数:zookeeper的地址,session超时时间,链接超时时间,是否启用zookeeper安全机制
zkUtils = ZkUtils.apply("node01:2181", 30000, 3000, JaasUtils.isZkSecurityEnabled());
方法二:
//参数:zkUtils,topic名称,partition数量,副本数量,参数,机架感知模式
AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties(), AdminUtils.createTopic$default$6());
删除topic:
//参数:zkUtils,topic名称,partition数量,副本数量,参数,机架感知模式
AdminUtils.deleteTopic(zkUtils, topicName);
判断是否存在:
AdminUtils.topicExists(zkUtils, topicName);
分区:当数据量很是大的时候,一个服务器存放不了,就将数据分红两个或者多个部分,存放在多台服务器上。每一个服务器上的数据,叫作一个分区。
副本:当数据只保存一份时,有丢失风险。为了更好的容错和容灾,将数据拷贝几份,保存到其余机器上。
设置分区和副本的方法:
控制台上:--replication-factor 1 --partitions 3
API代码:AdminUtils.createTopic(zkUtils, topicName, 3, 1, new Properties(),
AdminUtils.createTopic$default$6());
生产者消息不丢失机制:
发送消息的同步和异步模式:
同步模式:生产者重试3次,若是尚未响应,就报错。生产者等待10S,若是broker没有给出ack响应,就认为失败。
异步模式:先将数据保存在生产者端的buffer中。Buffer大小是2万条。发送一批数据的大小是500条。知足数据阈值或者数量阈值其中的一个条件就能够发送数据。
消息确认的三个状态:
0状态:生产者只负责发送数据,无论Kafka的broker是否接收到数据;
1状态:某个partition的leader收到数据给出响应;
-1状态:某个partition的全部副本都收到数据后给出响应
Borker端消息不丢失机制:
消费者端消息不丢失:
若是有一个外部存储可以记录每一个consumer消费partition的offset值。就不会形成数据丢失,只会有重复消费的可能。而在Kafka0.8之后,offset值能够存放到Kafka内置的topic中。
Kafka做为消息中间件,只负责消息的临时存储,并非永久存储,须要删除过时的数据;
若是一个partition中有10T数据,是如何存放的?是存放在一个文件中,仍是存放在多个文件中?
Kafka时采用存储到多个文件中的方式。由于若是将全部数据都存放在一个文件中,须要删除过时数据的时候,就比较麻烦。由于文件有日期属性,删除过时数据,只须要根据文件的日期属性删除就好。
Kafka的数据是存储在/export/data/kafka(能够本身设置)目录下,存储时是将数据划分为一个个的segment段,在segment段中有两个核心的文件,一个是log,一个是index。当log文件等于1G时,新的会写入到下一个segment中。
在Kafka中进行消息查询时,首先会查找segment中的index索引文件,index索引文件是以起始来命名的,根据查询索引文件能很快的定位到具体文件。
当根据index索引文件定位到须要查询的具体文件时,就会去查找log文件,在该文件中按顺序查找到目标文件
kafka在数据生产的时候,有一个数据分发策略。默认的状况使用DefaultPartitioner.class类。若是用户制定了partition,生产就不会调用DefaultPartitioner.partition()方法。
当用户指定key,就会使用hash算法来肯定发往那个patition。若是key一直不变,同一个key算出来的hash值是个固定值。若是是固定值,这种hash取模就没有意义。
例:Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions
还能够指定将数据发往哪一个partition。当ProducerRecord 的构造参数中有partition的时候,就能够发送到对应partition上。
例:public ProducerRecord(String topic, Integer partition, K key, V value)
若是既没有指定partition,也没有key的状况下,那就使用轮询的方式发送数据。
一个partition只能被一个组中的成员消费。因此若是消费组中有多于partition数量的消费者,那么必定会有消费者没法消费数据。若是消费组中的消费组小于partition,那么消费的数据就不完整,会形成错误。
概述:Spring对kafka作了支持,以便简化咱们的开发工做,官网:https://spring.io/projects/spring-kafka
依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.0.6.RELEASE</version>
</dependency>
第一步:编写application-kafka-producer.xml文件
第二步:编写java代码 TestSpringKafkaProducer
//类上方添加以下注释 @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(value = {"classpath:application-kafka-producer.xml"}) //注入一个KafkaTemplate对象 @Autowired private KafkaTemplate kafkaTemplate; //使用注入的对象发送数据到Kafka(发送的数据能够是对象,会自动进行json转换) kafkaTemplate.sendDefault(order);
第一步:编写application-kafka-consumer.xml文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <bean id="consumerConfig" class="java.util.HashMap"> <constructor-arg> <map> <entry key="bootstrap.servers" value="node01:9092,node02:9092"/> <entry key="group.id" value="my-group-spring-spring-3"/> <entry key="client.id" value="my-test-client-spring-3"/> <entry key="enable.auto.commit" value="true"/> <entry key="auto.commit.interval.ms" value="1000"/> <!--反序列化器,这里要注意设置的是字符串的反序列化--> <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/> <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/> </map> </constructor-arg> </bean> <!-- 定义消费者的工厂 --> <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"> <constructor-arg ref="consumerConfig"/> </bean> <!--定义消息监听器,用于接收消息--> <bean id="myMessageListener" class="cn.itcast.kafka.MyMessageListener"/> <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties"> <!--设置消费的topic,这里能够指定多个topic--> <constructor-arg value="my-kafka-topic" type="java.lang.String[]"/> <property name="messageListener" ref="myMessageListener"/> </bean> <!--建立Listener容器--> <bean class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="start"> <constructor-arg index="0" ref="consumerFactory"/> <constructor-arg index="1" ref="containerProperties"/> </bean> </beans>
第二步:建立一个类,注入上述配置文件便可接收
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(value = {"classpath:application-kafka-consumer.xml"}) public class TestSpringKafkaConsumer { @Test public void testConsumer() { } }
Kafka Manager 由 yahoo 公司开发,该工具能够方便查看集群 主题分布状况,同时支持对 多个集群的管理、分区平衡以及建立主题等操做。
Kafka和Kafka-manager的详细安装和部署,请详看其安装部署文件。
启动Kafka-manager:
cd /export/servers/kafka-manager-1.3.3.17/bin ./kafka-manager -Dconfig.file=../conf/application.conf
官网:http://storm.apache.org/ 源码:https://github.com/apache/storm
Storm是一个开源免费的分布式实时计算系统,Storm能够轻松的处理无界的数据流。
Storm只负责数据的计算,不负责数据的存储。
2013年先后,阿里巴巴基于storm框架,使用java语言开发了相似的流式计算框架佳做,Jstorm。2016年年末阿里巴巴将源码贡献给了Apache storm,两个项目开始合并,新的项目名字叫作storm2.x。阿里巴巴团队专一flink开发。
架构说明:
在集群架构中,用户提交到任务到storm,交由nimbus处理。
nimbus经过zookeeper进行查找supervisor的状况,而后选择supervisor进行执行任务。
supervisor会启动一个woker进程,在worker进程中启动线程进行执行具体的业务逻辑。
Spout:Spout继承BaseRichSpout,其中有三个方法:open(用来进行初始化),nextTuple(storm框架会不断调用该方法进行执行,向下游发送数据),declareOutputFields(定义向下游发送的数据的名称,定义的名称和发送的数据的顺序要一致)。
Bolt:Bolt继承BaseRichBolt,其中有三个方法:prepare(初始化操做,只会执行一次),execute(storm框架会不断调用该方法进行执行,处理业务逻辑,向下游发送数据),declareOutputFields(定义向下游发送的数据的名称,定义的名称和发送的数据的顺序要一致)。
Tuple:一次消息传递的基本单元。原本应该是一个key-value的map,可是因为各个组件间传递的tuple的字段名称已经事先定义好,因此tuple中只要按序填入各个value就好了,因此就是一个value list.
Stream:源源不断传递的tuple就组成了stream。
Topology:Storm中运行的一个实时应用程序,由于各个组件间的消息流动造成逻辑上的一个拓扑结构。
建立Topology的步骤:
第一步,建立一个TopologyBuilder;
第二步,向TopologyBuilder中设置Spout和Bolt,而且进行链接;
第三步,topologyBuilder.createTopology()获得Topology对象;
第四步,提交到Storm进行运行(本地模式:localCluster.submitTopology(),集群模式:
StormSubmitter.submitTopology());
由于频繁切换本地和集群模式太麻烦了,当进行Topology优化后,本地和集群就能够相同代码,当启动时,能够根据main方法中args作出判断,若是args没有参数,说明是本地模式,若是有参数,说明是集群模式。
优化以后,本地模式和集群模式可使用相同的代码,不要进行切换,并且在集群模式下,能够对Topology进行自定义名称。
因为worker和supervisor分离,当worker宕机,但supervisor能运行时,supervisor会尝试从新启动worker
若是supervisor也宕机了,那nimbus会从新分配其它的supervisor进行执行
若是只有supervisor宕机,但其下面的worker没有宕机时,那worker会正常工做,不会有影响
若是supervisor及其下面的worker宕机,首先会将分配给该机器的任务暂停,而且nimbus会从新分配机器来执行该节点上的任务
因此须要借助外部的监控手段来保障supervisor的高可用
当nimbus宕机时,不会影响任务的执行,将影响的是,任务的提交,不能再向集群提交任何任务
因此须要配置nimbus的高可用:能够配置、启动多个nimbus来保障;也能够须要借助外部的监控手段来保障nimbus的高可用
有些时候咱们须要将Storm计算完的数据持久化到数据库,因此须要在Storm中整合JDBC进行持久化。
官方文档:https://github.com/apache/storm/blob/master/docs/storm-jdbc.md
引入storm-jdbc和mysql驱动依赖
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.34</version> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-jdbc</artifactId> <version>1.1.1</version> </dependency>
使用JdbcInsertBolt进行整合的步骤
public static IRichBolt build() { //第一步:定义数据库链接信息,包括数据库驱动和数据库地址,用户名,密码等 Map<String, Object> hikariConfigMap = new HashMap<String, Object>(); hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource"); hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost:3306/storm"); hikariConfigMap.put("dataSource.user", "root"); hikariConfigMap.put("dataSource.password", "root"); ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap); // 第二步:定义表名,以及定义字段的映射,这里指定的是tupe中的字段名称,用于获取数据 String tableName = "tb_wordcount"; List<Column> columnSchema = Lists.newArrayList( new Column("word", Types.VARCHAR), new Column("count", Types.INTEGER)); // 第三步:定义jdbc的映射器 JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema); //第四步: 定义插入数据的Bolt,而且指定了插入的sql语句 JdbcInsertBolt wordCountBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper) .withInsertQuery("INSERT INTO `tb_wordcount` VALUES (NULL, ?, ?, NOW())") .withQueryTimeoutSecs(30); //第五步:将结果进行返回 return wordCountBolt; }
//整合到TopologyBuilder中 topologyBuilder.setBolt("JdbcBolt",JdbcBoltBuilder.build()).shuffleGrouping("WordBolt");
Storm和Redis整合是很是经常使用的场景,Storm也支持了Redis的支持。
官方文档:https://github.com/apache/storm/blob/master/docs/storm-redis.md
引入依赖
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-redis</artifactId> <version>1.1.1</version> </dependency>
建立WordCountStoreMapper
public class WordCountStoreMapper implements RedisStoreMapper { private RedisDataTypeDescription redisDataTypeDescription; public WordCountStoreMapper() { // 定义Redis中的数据类型 this.redisDataTypeDescription = new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.STRING); } @Override public RedisDataTypeDescription getDataTypeDescription() { return this.redisDataTypeDescription; } @Override public String getKeyFromTuple(ITuple iTuple) { // 生成redis中的key String word = iTuple.getStringByField("word"); return "wordCount:" + word; } @Override public String getValueFromTuple(ITuple iTuple) { // 存储到redis中的值 Integer count = iTuple.getIntegerByField("count"); return String.valueOf(count); } }
整合到Topology中
//建立Redis的链接参数 JedisPoolConfig poolConfig = new JedisPoolConfig.Builder() .setHost("node01").setPort(6379).build(); //根据Redis的链接参数和Redis的Mapper类,建立Redis的RedisStoreBolt对象,并将该对象整合到Topology中 topologyBuilder.setBolt("RedistBolt", new RedisStoreBolt(poolConfig, new WordCountStoreMapper())).localOrShuffleGrouping("WordCountBolt");
Storm与Kafka的整合也是很是常见的,经常用于数据读取,因此咱们更多的是要关注Spout。
官方文档:https://github.com/apache/storm/blob/master/docs/storm-kafka-client.md
引入依赖
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka-client</artifactId> <version>1.1.1</version> </dependency>
整合KafkaSpout到Topology
//已知存在kafka的一个topic,该topic有3个分区,2个副本 //第一步,定义TopologyBuilder对象,用于构建拓扑 TopologyBuilder topologyBuilder = new TopologyBuilder(); //第二步,设置spout和bolt,建立kafka的Spout,并链接到kafka的模板topic上,并设置消费者id KafkaSpoutConfig.Builder<String, String> kafkaSpoutBuilder = KafkaSpoutConfig.builder("node01:9092", "kafka-storm-topic"); kafkaSpoutBuilder.setGroupId("kafka-storm-topic-consumer-groupid"); //设置消费者组id // 这里设置Spout的并行度为3,缘由是建立topic时,指定的partition为3 topologyBuilder.setSpout("kafka_spout", new KafkaSpout<>(kafkaSpoutBuilder.build()),3); //设置一个bolt,并指定从kafka中获取数据的spout为这个bolt的上游 topologyBuilder.setBolt("SplitSentenceBolt", new SplitSentenceBolt()).localOrShuffleGrouping("kafka_spout");
当上游为Kafka的Spout时,下游Bolt获取值的方法
经过KafkaSpout向下游发送的Tupe的方式是这样的:collect.emti();
因此,咱们须要经过value获取值:String sentence = input.getStringByField("value");
在Storm中,当Spout、Bolt是多个时,那上游的Spout或Bolt不知道将数据传入下游的哪一个Bolt时,就须要流分组
在Storm中,提供了8种流分组方式,其中3中常用
随机分组(Shuffle grouping)
随机分发tuple到Bolt的任务,保证每一个任务得到相等数量的tuple。 跨服务器通讯,浪费网络资源,尽可能不用
本地或随机分组(Local or shuffle grouping)(重点掌握,经常使用)
优先将数据发送到本地的Task,节约网络通讯的资源
部分关键字分组(Partial Key grouping)
字段分组(Fields Grouping )(了解,仅用于WordCount案例)
在不一样的worker间进行数据传输时,会产生通讯(相同主机的进程通讯,不一样主机需经过网络通讯)
Disruptor是一个Queue。Disruptor是实现了“队列”的功能,并且是一个有界队列。而队列的应用场景天然就是“生产者-消费者”模型。
Disruptor的核心有3个:第一,维护生产者序号;第二,消费者序号;第三,数组的长度。
缘由一:Disruptor 没有使用锁机制;
缘由二:Disruptor 有一个相似中间数组的组件,生产者往这个组件中放入数据,消费者取出数据
Storm的消息不丢失机制核心是acker。当Spout或Bolt处理完数据后,会标记状态到acker中,而后经过异或算法,计算出是否有失败的状况。结果为0就表示执行成功,结果不为0就表示失败。
当结果为0时,acker会通知Spout执行成功,从而调用Spout的ack()方法,此时能够在ack()方法中执行操做。
当结果不为0时,acker会通知Spout失败,从而调用它的fail()方法,此时能够在该方法中从新执行。
Spout实现:在Spout中需实现ack和fail方法,而且在Spout中发送消息时须要发送消息id(惟一,实现以下)
// 生成消息id,而且把数据存放到messages的map中,若是执行失败能够在fail方法中调用messages中的数据从新执行 String msgId = UUID.randomUUID().toString(); messages.put(msgId, sentence); //向下游输出 this.collector.emit(new Values(sentence),msgId);
Bolt中的实现:若是执行成功就调用ack方法,若是执行失败就调用fail方法
public void execute(Tuple input) { // 经过Tuple的getValueByField获取上游传递的数据,其中"sentence"是定义的字段名称 String sentence = input.getStringByField("sentence"); // 将获取的数据进行处理 // 向下游输出数据,需加入锚点 for (String word : words) { // 注意这里,须要将原始数据input传入,在这里称之为锚点,意思是将新的数据和原有数据进行关联 this.collector.emit(input,new Values(word)); } // 若是处理成功,执行ack方法,若是处理失败,执行fail方法,并将传入的input传入该方法中 this.collector.ack(input); this.collector.fail(input); }
从上看出,若是执行失败需手动调用方法,很不方便,因此咱们能够继承BaseBasicBolt类,这样当执行失败时就不须要手动调用方法了,只须要抛出FailedException便可。请注意:就是继承BaseBasicBolt类,在Spout中仍是需重写ack和fail方法,这是当执行成功或失败后会调用的方法。
好处一:若是执行失败,不须要手动调用fail和ack方法,只须要使用try...catch包裹执行代码,而后抛出FailedException异常便可。
好处二:向下游输出数据时,不须要加入锚点,只须要将目标数据加入。
好处三:能够在Topology驱动类中设置acker(消息不丢失机制)是否启动,设置方法以下
Config.setNumAckers(conf, ackerParal);
首先在Spout中需将发送到下游的数据进行记录(能够记录到一个Map中)。若是该数据执行成功,能够将Map中的该数据删除(this.msgData.remove(msgId);)。若是该数据执行失败,能够在fail方法中从新执行该数据,就是将Map中对应的数据取出,而后从新发送到下游执行。
若是执行失败,最好设置从新执行的限制条件:需限制失败的重试次数,若是重试次数超出,需将该数据记录下来(能够保存到数据库等本地文件中),同时删除内存中的数据(Map集合中的该数据);在进行重试时,须要有停顿,不能失败后立刻执行(有多是网络问题),须要有计数,不能无限执行。
为了解决信息过载和用户无明确需求的问题,找到用户感兴趣的物品,才有了个性化推荐系统。
推荐系统普遍存在于各种网站中,做为一个应用为用户提供个性化的推荐。它须要一些用户的历史数据,通常由三个部分组成:基础数据、推荐算法系统、前台展现。
基础数据包括不少维度,包括用户的访问、浏览、下单、收藏,用户的历史订单信息,评价信息等不少信息;
推荐算法系统主要是根据不一样的推荐诉求由多个算法组成的推荐模型;
前台展现主要是对客户端系统进行响应,返回相关的推荐信息以供展现。
迄今为止,在个性化推荐系统中,协同过滤技术是应用最成功的技术。目前国内外有许多大型网站应用这项技术为用户更加智能(个性化、千人千面)的推荐内容。
核心思想:协同过滤通常是在海量的用户中发掘出一小部分和你品位比较相似的,在协同过滤中,这些用户成为邻居,而后根据他们喜欢的其余东西组织成一个排序的目彔做为推荐给你。
问题:如何肯定一个用户是丌是和你有类似的品位?如何将邻居们的喜爱组织成一个排序的目彔?
在社交项目中,如微信、QQ,显然选择基于用户推荐比较好,由于推荐每每都是和人相关的。
如:在QQ登陆后,会有提示,好友的好友多是你认识的,推荐给你添加好友。
在电商项目中,用户的数量远大于商品数量,因此基于商品的推荐的复杂度要低,并且也比较合理。
其实,在实际的推荐系统中,每每不单是使用一种推荐,而是会多种推荐混合使用。
因此,选择基于用户仍是基于商品的推荐,和应用场景有很大的关系。
不管是基于用户仍是基于商品的推荐,都是须要找到类似的用户或者商品,才能作推荐,因此,类似度算法就变得很是重要了。
常见的类似度算法有:
欧几里德距离算法(Euclidean Distance)
皮尔逊类似度算法(Pearson Correlation Coefficient)
基于夹角余弦类似度算法(Consine Similarity)
基于Tanimoto系数类似度(Tanimoto Coefficient)
经过类似度计算,能够计算出邻居,问题来了,咱们若是选取出几个邻居做为参考,进行推荐呢?
一般有2种方式:
固定数量的邻居:K-neighborhoods
基于类似度门槛的邻居:Threshold-based neighborhoods
Mahout使用了Taste来提升协同过滤算法的实现,它是一个基于Java实现的可扩展的,高效的推荐引擎。Taste既实现了最基本的基于用户的和基于内容的推荐算法,同时也提供了扩展接口,使用户能够方便的定义和实现本身的推荐算法。同时,Taste不只仅只适用于Java应用程序,它能够做为内部服务器的一个组件以HTTP和Web Service的形式向外界提供推荐的逻辑。Taste的设计使它能知足企业对推荐引擎在性能、灵活性和可扩展性等方面的要求。
2008年成为Lucene的子项目,Lucene做为搜索引擎项目,存在不少文本数据分析和挖掘的需求(例如文本重复判断,文本自动分类等等),致使Lucene项目中部分开发者转向机器学习领域研究算法,最终这些机器学习算法造成最初的Mahout
吸取开源协同过滤算法项目Taste
2010年成为Apache顶级项目
DataModel 是用户喜爱信息的抽象接口,它的具体实现支持从任意类型的数据源抽取用户喜爱信息。Taste 默认提供 JDBCDataModel 和 FileDataModel,分别支持从数据库和文件中读取用户的喜爱信息。
DataModel接口的部分实现: org.apache.mahout.cf.taste.impl.model.GenericDataModel org.apache.mahout.cf.taste.impl.model.GenericBooleanPrefDataModel org.apache.mahout.cf.taste.impl.model.PlusAnonymousUserDataModel org.apache.mahout.cf.taste.impl.model.file.FileDataModel org.apache.mahout.cf.taste.impl.model.hbase.HBaseDataModel
UserSimilarity 和 ItemSimilarity 。UserSimilarity 用于定义两个用户间的类似度,它是基于协同过滤的推荐引擎的核心部分,能够用来计算用户的“邻居”,这里咱们将与当前用户口味类似的用户称为他的邻居。ItemSimilarity 相似的,计算Item之间的类似度。
UserSimilarity 和 ItemSimilarity 类似度实现有如下几种: CityBlockSimilarity :基于Manhattan距离类似度 EuclideanDistanceSimilarity :基于欧几里德距离计算类似度 LogLikelihoodSimilarity :基于对数似然比的类似度 PearsonCorrelationSimilarity :基于皮尔逊相关系数计算类似度 SpearmanCorrelationSimilarity :基于皮尔斯曼相关系数类似度 TanimotoCoefficientSimilarity :基于谷本系数计算类似度 UncenteredCosineSimilarity :计算 Cosine 类似度
UserNeighborhood 用于基于用户类似度的推荐方法中,推荐的内容是基于找到与当前用户喜爱类似的邻居用户的方式产生的。UserNeighborhood 定义了肯定邻居用户的方法,具体实现通常是基于 UserSimilarity 计算获得的。
UserNeighborhood 主要实现有两种: NearestNUserNeighborhood:对每一个用户取固定数量N个最近邻居 ThresholdUserNeighborhood:对每一个用户基于必定的限制,取落在类似度限制之内的全部用户为邻居
Recommender 是推荐引擎的抽象接口,Taste 中的核心组件。程序中,为它提供一个 DataModel,它能够计算出对不一样用户的推荐内容。实际应用中,主要使用它的实现类 GenericUserBasedRecommender 或者 GenericItemBasedRecommender,分别实现基于用户类似度的推荐引擎或者基于内容的推荐引擎。
Recommender分为如下几种实现: GenericUserBasedRecommender:基于用户的推荐引擎 GenericBooleanPrefUserBasedRecommender:基于用户的无偏好值推荐引擎 GenericItemBasedRecommender:基于物品的推荐引擎 GenericBooleanPrefItemBasedRecommender:基于物品的无偏好值推荐引擎
RecommenderEvaluator有如下几种实现: AverageAbsoluteDifferenceRecommenderEvaluator :计算平均差值 RMSRecommenderEvaluator :计算均方根差
@Test public void testBaseUser() throws Exception { String fileName = "user_item.data"; File file = FileUtils.toFile(TestMahout.class.getClassLoader().getResource(fileName)); // 第一步,定义数据模型 DataModel dataModel = new FileDataModel(file); // 第二步,定义相识度,这里使用的欧几里得 UserSimilarity userSimilarity = new EuclideanDistanceSimilarity(dataModel); // 第三步,定义最近邻域,这里使用的是固定数量的邻居 UserNeighborhood userNeighborhood = new NearestNUserNeighborhood(10, userSimilarity, dataModel); long[] longs = userNeighborhood.getUserNeighborhood(1); for (long aLong : longs) { System.out.println(aLong); } // 第四步,定义推荐器,这里使用的是基于用户的推荐 Recommender recommender = new GenericUserBasedRecommender(dataModel, userNeighborhood, userSimilarity); LongPrimitiveIterator userIDs = dataModel.getUserIDs(); while (userIDs.hasNext()) { Long userId = userIDs.next(); List<RecommendedItem> recommendedItemList = recommender.recommend(userId, 4); StringBuffer sb = new StringBuffer(); for (RecommendedItem item : recommendedItemList) { sb.append(item.getItemID() + "|"+item.getValue()+","); } System.out.println(userId + "-->" + sb); } }
@Test public void testBaseItem() throws Exception { String fileName = "user_item.data"; File file = FileUtils.toFile(TestMahout.class.getClassLoader().getResource(fileName)); // 第一步,定义数据模型 DataModel dataModel = new FileDataModel(file); // 第二步,定义相识度,这里使用的欧几里得 ItemSimilarity itemSimilarity = new EuclideanDistanceSimilarity(dataModel); // 第三步,定义推荐器,这里使用的是基于用户的推荐 Recommender recommender = new GenericItemBasedRecommender(dataModel, itemSimilarity); LongPrimitiveIterator userIDs = dataModel.getUserIDs(); while (userIDs.hasNext()) { Long userId = userIDs.next(); List<RecommendedItem> recommendedItemList = recommender.recommend(userId, 2); StringBuffer sb = new StringBuffer(); for (RecommendedItem item : recommendedItemList) { sb.append(item.getItemID() + "|"+item.getValue()+","); } System.out.println(userId + "-->" + sb); } }
@Test public void testBaseItem2() throws Exception { String fileName = "user_item.data"; File file = FileUtils.toFile(TestMahout.class.getClassLoader().getResource(fileName)); // 第一步,定义数据模型 DataModel dataModel = new FileDataModel(file); // 第二步,定义相识度,这里使用的欧几里得 ItemSimilarity itemSimilarity = new EuclideanDistanceSimilarity(dataModel); // 第三步,定义推荐器,这里使用的是基于用户的推荐 GenericItemBasedRecommender recommender = new GenericItemBasedRecommender(dataModel, itemSimilarity); LongPrimitiveIterator userIDs = dataModel.getUserIDs(); //用户1,正在浏览103商品,进行推荐 Long userId = 1L; Long itemId = 103L; List<RecommendedItem> recommendedItemList = recommender.recommendedBecause(userId,itemId,2); StringBuffer sb = new StringBuffer(); for (RecommendedItem item : recommendedItemList) { sb.append(item.getItemID() + "|"+item.getValue()+","); } System.out.println(userId + "-->" + sb); }
提交到hadoop运行,须要有2步操做:
第一步,将须要计算的数据上传到hdfs
第二步,经过hadoop执行mahout-examples-0.13.0-job.jar中的RecommenderJob类
第三步,在输出结果文件中查看结果
hadoop jar mahout-examples-0.13.0-job.jar org.apache.mahout.cf.taste.hadoop.item.RecommenderJob --input /user_item.data --output /cc -s SIMILARITY_EUCLIDEAN_DISTANCE
参数说明:
--input(path)(-i): 存储用户偏好数据的目录,该目录下能够包含一个或多个存储用户偏好数据的文本文件; --output(path)(-o): 结算结果的输出目录 --numRecommendations (integer): 为每一个用户推荐的item数量,默认为10 --usersFile (path): 指定一个包含了一个或多个存储userID的文件路径,仅为该路径下全部文件包含的userID作推荐计算 (该选项可选) --itemsFile (path): 指定一个包含了一个或多个存储itemID的文件路径,仅为该路径下全部文件包含的itemID作推荐计算 (该选项可选) --filterFile (path): 指定一个路径,该路径下的文件包含了[userID,itemID]值对,userID和itemID用逗号分隔。计算结果将不会为user推荐[userID,itemID]值对中包含的item (该选项可选) --booleanData (boolean): 若是输入数据不包含偏好数值,则将该参数设置为true,默认为false --maxPrefsPerUser (integer): 在最后计算推荐结果的阶段,针对每个user使用的偏好数据的最大数量,默认为10 --minPrefsPerUser (integer): 在类似度计算中,忽略全部偏好数据量少于该值的用户,默认为1 --maxSimilaritiesPerItem (integer): 针对每一个item的类似度最大值,默认为100 --maxPrefsPerUserInItemSimilarity (integer): 在item类似度计算阶段,针对每一个用户考虑的偏好数据最大数量,默认为1000 --similarityClassname (classname)(-s): 向量类似度计算类 ([SIMILARITY_COOCCURRENCE, SIMILARITY_LOGLIKELIHOOD, SIMILARITY_TANIMOTO_COEFFICIEN T, SIMILARITY_CITY_BLOCK, SIMILARITY_COSINE, SIMILARITY_PEARSON_CORRELATION , SIMILARITY_EUCLIDEAN_DISTANCE] ) outputPathForSimilarityMatrix:SimilarityMatrix输出目录 --randomSeed:随机种子 –sequencefileOutput:序列文件输出路径 --tempDir (path): 存储临时文件的目录,默认为当前用户的home目录下的temp目录 --startPhase --endPhase --threshold (double): 忽略类似度低于该阀值的item对
补充网站:http://www.javashuo.com/article/p-gzuteolz-mz.html
HBase是一个使用Java语言实现的,构建于Hadoop分布式文件系统(HDFS)上的分布式数据库
。
Hbase是参考谷歌的BigTable的论文开发实现的,Hadoop 生态系统引入了Bigtable的大部分功能。
海量存储:Hbase单表能够有百亿行,百万列,相对计较传统关系型数据库而言,存储能力很是强悍。
列式存储:建立表时,无需指定具体的列,根据数据的插入动态插入;能够针对列进行权限控制和读取。
多版本:能够为数据添加版本信息,如用户信息的logo变动历史。
稀疏性:为空的列不占用实际存储空间。
高扩展、高可用性:底层基于HDFS,高可用和扩展性获得的了保障。
表(table):用于存储管理数据,具备稀疏的、面向列的特色。HBase中的每一张表,就是所谓的大表(Bigtable)。
行键(RowKey):相似于MySQL中的主键,HBase根据行键来快速检索数据,一个行键对应一条记录。与MySQL主键不一样的是,HBase的行键是自然固有的,每一行数据都存在行键。
列族(簇)(ColumnFamily):是列的集合。列族在表定义时须要指定,而列在插入数据时动态指定。列中的数据都是以二进制形式存在,没有数据类型。在物理存储结构上,每一个表中的每一个列族单独以一个文件存储(参见图1.2)。一个表能够有多个列族。
时间戳(TimeStamp):是列的一个属性,是一个64位整数。由行键和列肯定的单元格,能够存储多个数据,每一个数据含有时间戳属性,数据具备版本特性。可根据版本(VERSIONS)或时间戳来指定查询历史版本数据,若是都不指定,则默认返回最新版本的数据。
全局架构:
有此能够看出,Hbase须要依赖于ZooKeeper和HDFS。
Zookeeper
保证任什么时候候,集群中只有一个running master,避免单点问题;
存贮全部Region的寻址入口,包括-ROOT-表地址、HMaster地址;
实时监控Region Server的状态,将Region server的上线和下线信息,实时通知给Master;
存储Hbase的schema,包括有哪些table,每一个table有哪些column family。
Master
能够启动多个HMaster,经过Zookeeper的Master Election机制保证总有一个Master运行。
RegionServer
HBase中最核心的模块,主要负责响应用户I/O请求,向HDFS文件系统中读写数据。
维护Master分配给它的region,处理对这些region的IO请求;
负责切分在运行过程当中变得过大的region。
HDFS
负责存储数据。
经过hbase shell命令进行命令行模式进行操做。
#指定表名,列族名 create 'user' , 'user_info', 'login_info' #建立表 -表名 -列族1名 -列族2名 list #查看全部的表 describe 'user' #查看该表的具体信息
put 'user', '1001', 'user_info:name','张三' put 'user', '1001', 'user_info:address', '上海' put 'user', '1001', 'login_info:user_name', 'zhangsan' put 'user', '1001', 'login_info:password', '123456' put 'user', '1002', 'user_info:name','李四' put 'user', '1002', 'user_info:address', '北京' put 'user', '1002', 'login_info:user_name', 'lisi' put 'user', '1002', 'login_info:password', '123456'
Hbase只支持2种查询数据,单行查询,全表查询。
get 'user', '1001' #查询所有数据 scan 'user' #查询一条数据 scan 'user', {LIMIT => 1}
#删除一行中的一列数据 delete 'user','1002', 'user_info:name' #删除一行数据 deleteall 'user','1002' #清空表 truncate 'user'
#修改用1001的密码为888888,直接put覆盖便可 put 'user', '1001', 'login_info:password', '888888' #删除列族 alter 'user' , {NAME=>'user_info', METHOD => 'delete'} #增长列族 alter 'user', 'user_info', {NAME => 'user_info_2' , VERSIONS => 5}
#删除表以前先要禁用表,再删除 disable 'user' drop 'user'