【大数据分析经常使用算法】九、马尔科夫模型

简介

这一章节咱们讲解马尔科夫模型。给定一组随机变量(如顾客最近的交易日期),马尔科夫模型只根据前一个状态(前一个最近交易日期)的分部指示该变量最近的分布。html

一、马尔科夫链基本原理

令 $$ S = {S_1,S_2,...,S_n} $$ 是一个有限的状态集,咱们但愿获得以下的结果: $$ P(S_n|S_{n-1},S_{n-2},...,S_{1}) \eqsim P(S_n|S_{n-1}) $$ 这个近似公式代表了一阶马尔科夫性质(markov property):系统在时间t+1的状态只与系统在时间t的状态有关。java

那么,若是咱们在严谨一点,将这个关联状态再往前面推一个时间点,获得的就是二阶马尔科夫模型: $$ P(S_n|S_{n-1},S_{n-2},...,S_{1}) \eqsim P(S_n|S_{n-1},S_{n-2}) $$c++

下面使用马尔科夫假设描述联合几率: $$ P(S_1,S_2,...,S_n) = \prod_{i=1}^n P(S_i,S_{i-1}) $$算法

总结apache

  • 若是一个随机序列的分布仅由其当前状态肯定,则具备markov性质。具备这个性质的随机过程称为马尔科夫随机过程(markov random process)。api

  • 对于可观察的状态序列(即状态由数据可知),能够获得一个马尔科夫链模型(markov chain model,MCM),咱们可使用这个模型来作一些预测。数组

  • 对于不可观察状态,会获得一个隐式马尔科夫模型(hidden markov model,HMM)ruby

接下来咱们给出将要用到的马尔科夫链的形式化表示服务器

有限的状态空间(state space) $$ S={S_1,S_2,...S_n} $$app

转移几率(transition properties)

函数$f:S \times S \to R$:转化结果为一个N阶方阵,其中N是状态的数量,其中:

  • $0 \le f(a,b) \le 1$:即转移几率的值在0到1之间;
  • $\sum_{b \in S} f(a,b) = 1$:即同一个状态转向当前全部可能的状态的几率和为1;

初始分布(initial distribution)

函数$g:S \times R$,其中:

  • $0 \le g(a) \le 1$
  • $\sum_{a\in S}g(a) = 1$

马尔科夫链是S中的一个随机过程:

  • 时间0时,这个链的初始状态用分布函数$g$建立;
  • 若是时间t时马尔科夫链的状态为$a$,则时间$t+1$时,对于各个$b \in S$,其状态为b的几率为$f(a,b)$

二、马尔科夫状态表的运用

假设咱们已经获得了一张马尔科夫状态表。以下表所示的城市天气变化表:

今每天气\明每天气 晴天(sunny) 有雨(rainy) 多云(cloudy) 有雾(foggy)
晴天(sunny) 0.6 0.1 0.2 0.1
有雨(rainy) 0.5 0.2 0.2 0.1
多云(cloudy) 0.1 0.7 0.1 0.1
有雾(foggy) 0.0 0.3 0.4 0.3

咱们假设:

  • 城市天气只包含四种可能,即晴天(sunny)、有雨(rainy)、多云(cloudy)和有雾(foggy);
  • 一天中天气不会变化;

咱们来解释这张表的含义,其实咱们彻底能够以4阶方阵的形式表示,但为了清晰说明,将其以表格展现。

其中每个数字表明:当前的天气为XX时,明天的天气为XX的几率。例如,第一行第一列的数据0.6,就表明的是当前天气为晴天时,明每天气为晴天的几率为0.6.

咱们就能够用马尔科夫性质的思想回答下面的问题:

若是今天的天气状态是晴天,那么明天是多云并且后天有雾的几率是多大?根据马尔科夫链的思想,明天是多余的几率只与今天有关,后天有雾的几率只与明天有关,所以:

P = 0.2(第一行第三列数据) * 0.1(第三行第四列)
  = 0.02

可见,咱们的重点在于由数据推导出马尔科夫状态矩阵的过程。

二、分析客户交易数据

假设咱们有这样的一些交易数据:

55FRL8G23B,1381907878,2013-01-09,129
9SX0DJG9L4,1381907879,2013-01-09,34
0ANMD0T113,1381907880,2013-01-09,144
W1F4412PA8,1381907881,2013-01-09,26
22Z10EAYC3,1381907882,2013-01-09,167
3R56N17P1H,1381907883,2013-01-09,25
LK0P6K3DE4,1381907884,2013-01-09,25
3A4S4BMPMU,1381907885,2013-01-09,113
OF4CEO2814,1381907886,2013-01-09,138
9ICNYOFS41,1381907887,2013-01-09,79
N4EOB264U6,1381907888,2013-01-09,108
3C0WARCKYJ,1381907889,2013-01-09,204
PTT3BI00AZ,1381907890,2013-01-09,27
14UHHAVQ2Q,1381907891,2013-01-09,73
Z9GFE6TDKF,1381907892,2013-01-09,32
...

数据意义为(交易ID,顾客ID,交易日期,交易金额)。

2.一、生成状态序列

这一节的目标是将交易序列(transcation sequence)转化为一个状态序列(state sequence)。咱们的应用的目的是为了生成如下的输出 $$ customerId(Date_1,Amount_1);(Date_2,Amount_2),...,(Date_n,Amount_n) $$ 其中日期按增序排序。

咱们要把这个输出转化为一个状态序列,以下所示: $$ customerId,State_1,State_2,...,State_n $$

咱们还须要将(purchase-date,amount)对的有序序列转换为一组马尔科夫链状态。状态由一个代号(两个字母)表示,各个字母的定义以下所示:

上一次交易后通过的时间 与前次交易相比的交易额
S:小 L:显著小于
M:中 E:基本相同
L:大 G:显著大于

所以,咱们能够获得9种状态

状态名 上一次交易后的通过时间与其次交易相比的交易额
SL 小:显著小于
SE 小:基本相同
SG 小:显著大于
ML 中:显著小于
ME 中:基本相同
MG 中:显著大于
LL 大:显著小于
LE 大:基本相同
LG 大:显著大于

能够看到,咱们的马尔科夫链模型有9个状态(9*9矩阵)。

转换过程可使用一些小脚本实现,可是若是数据量过大,仍是须要一个map过程。

2.二、使用MapReduce生成马尔科夫状态转移矩阵

三、Spark实现

3.一、主类

下面是spark的主任务类。下一章节会列出所使用到的辅助类。

package com.sunrun.movieshow.algorithm.markov;

import com.sunrun.movieshow.algorithm.common.SparkHelper;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.io.Serializable;
import java.util.*;
public class SparkMarkov {
    public static void main(String[] args) {
        JavaSparkContext sc = SparkHelper.getSparkContext("markov");

        // 1.归约到不一样的分区
        JavaRDD<String> rdd = sc.textFile("data/markov").coalesce(8);

        // 2.获得(顾客ID,(交易时间,交易额))
        JavaPairRDD<String, Tuple2<Long, Integer>> pairRDD = rdd.mapToPair(line -> {
            // tokens[0] = customer-id
            // tokens[1] = transaction-id
            // tokens[2] = purchase-date
            // tokens[3] = amount
            String[] tokens = line.split(",");
            if (tokens.length != 4) {
                return null;
            } else {
                long date = 0;
                try {
                    date = DateUtil.getDateAsMilliSeconds(tokens[2]);
                }
                catch(Exception e) {
                    // 会有异常数据
                }
                int amount = Integer.parseInt(tokens[3]);
                Tuple2<Long, Integer> V = new Tuple2<>(date, amount);
                return new Tuple2<>(tokens[0], V);
            }
        });
        /**
         * (V31E55G4FI,(1356969600000,123))
         * (301UNH7I2F,(1356969600000,148))
         * (PP2KVIR4LD,(1356969600000,163))
         * (AC57MM3WNV,(1356969600000,188))
         * ...
         */

        // 4.依据交易ID进行分组 - 用户信息在最后的状态链中是没用的,但他能够限定一些状态切换的内在关系,
        // 这在不少算法过程当中都是隐含条件,须要经验判断。
        JavaPairRDD<String, Iterable<Tuple2<Long, Integer>>> customerRDD = pairRDD.groupByKey();
        /**
         * (V31E55G4FI,(1356969600000,123),1356969600000,148)....)
         * ...
         */

        // 5.建立马尔科夫状态序列 (c_id,<(time1,amount),(time2,amount)>) => (String)
        JavaPairRDD<String, List<String>> stateSequence =
                customerRDD.mapValues((Iterable<Tuple2<Long,Integer>> dateAndAmount) -> {
                    List<Tuple2<Long,Integer>> list = toList(dateAndAmount);
                    Collections.sort(list, TupleComparatorAscending.INSTANCE);
                    // now convert sorted list (be date) into a "state sequence"
                    List<String> stateSequence1 = toStateSequence(list);
                    return stateSequence1;
                });

        /**
         * stateSequence.saveAsTextFile("out/" + UUID.randomUUID());
         * 顾客id,与其相关的状态序列
         * (K40E0LA5DL,[LL, MG, SG, SL, SG, MG, SL, SG, SL, SG, LL])
         * (ICF0KFGK12,[SG, SL, SE, SG, LL, LG])
         * (4N0B1U5HVG,[SG, ML, MG, SG, SL, SG, SL, SG, ML])
         * (3KJR1907D9,[SG, SL, ML, SG, ML, LG])
         * (47620I9LOD,[LG, SL, ML, MG, SG, SL, SG, SL, SG])
         */

        // 6.接下来,咱们将状态序列以窗口为2的方式依次移动,生成一个个形如((LL, MG),1)的对
        JavaPairRDD<Tuple2<String, String>, Integer> model = stateSequence.flatMapToPair(s -> {
            // 输出形式((fromState,toState),times)
            ArrayList<Tuple2<Tuple2<String, String>, Integer>> mapperOutput = new ArrayList<>();
            List<String> states = s._2;

            if (states == null) {
                return Collections.emptyIterator();
            } else {
                for (int i = 0; i < states.size() - 1; i++) {
                    String fromState = states.get(i);
                    String toState = states.get(i + 1);
                    Tuple2<String, String> k = new Tuple2<>(fromState, toState);
                    mapperOutput.add(new Tuple2<>(k, 1));
                }
            }
            return mapperOutput.iterator();
        });
        /**
         * model.saveAsTextFile("out/model");
         * ((LG,LL),1)
         * ((LL,MG),1)
         * ((MG,SG),1)
         * ((SG,SL),1)
         * ((SL,ME),1)
         * ((ME,MG),1)
         *
         */

        // 7.咱们须要将这些结果组合归约,将相同的状态序列进行合并,即从(f,t),1) => ((f,t),3000)的形式。
        JavaPairRDD<Tuple2<String, String>, Integer> markovModel = model.reduceByKey((a, b) -> a + b);

        /**
         * markovModel.saveAsTextFile("/out/markov");
         * ((LL,SL),993)
         * ((MG,LL),1859)
         * ((LE,ME),25)
         * ((SL,ME),1490)
         * ((LG,ME),153)
         * ((ML,LG),3991)
         * ((ME,ME),58)
         */

        // 8.咱们格式化一下输出,将其形式转变为(f,t,times)的形式
        JavaRDD<String> markovFormatModel = markovModel.map(t -> t._1._1 + "\t" + t._1._2 + "\t" + t._2);

        // 9.将结果存储到HDFS服务器,固然也能够存储到本地,这时候解析类就要使用本地文件系统的API
        String markovFormatModelStorePath = "hdfs://10.21.1.24:9000/markov/";
        markovFormatModel.saveAsTextFile(markovFormatModelStorePath);

        // 9.将最终结果进行转换,生成马尔科夫几率模型
        MarkovTableBuilder.transport(markovFormatModelStorePath);
        /** 这是一个state阶的方阵,A(ij)表示状态i到状态j的转化几率
         * 0.03318,0.02441,0.6608,0.1373,0.007937,0.02398,0.06456,0.009522,0.03832
         * 0.3842,0.02532,0.2709,0.1260,0.009590,0.01331,0.1251,0.01083,0.03477
         * 0.6403,0.02881,0.05017,0.1487,0.01338,0.008602,0.08359,0.01446,0.01196
         * 0.02081,0.002368,0.7035,0.01170,0.004638,0.1518,0.03901,0.005933,0.06030
         * 0.4309,0.02140,0.2657,0.1277,0.009697,0.02006,0.08159,0.009530,0.03344
         * 0.1847,0.01816,0.5316,0.1303,0.007175,0.02351,0.06265,0.008136,0.03379
         * 0.01847,0.004597,0.6115,0.02068,0.003091,0.1346,0.06828,0.01424,0.1245
         * 0.2250,0.01900,0.2427,0.06291,0.006335,0.03182,0.2757,0.03801,0.09847
         * 0.2819,0.01944,0.1954,0.07990,0.008015,0.03202,0.2612,0.04427,0.07781
         */

    }



    // 将迭代器转化为数组
    static List<Tuple2<Long,Integer>> toList(Iterable<Tuple2<Long,Integer>> iterable) {
        List<Tuple2<Long,Integer>> list = new ArrayList<Tuple2<Long,Integer>>();
        for (Tuple2<Long,Integer> element: iterable) {
            list.add(element);
        }
        return list;
    }

    // 按时间进行排序
    static class TupleComparatorAscending implements Comparator<Tuple2<Long, Integer>>, Serializable {
        final static TupleComparatorAscending INSTANCE = new TupleComparatorAscending();
        @Override
        public int compare(Tuple2<Long, Integer> t1, Tuple2<Long, Integer> t2) {
            // return -t1._1.compareTo(t2._1);     // sorts RDD elements descending
            return t1._1.compareTo(t2._1);         // sorts RDD elements ascending
        }
    }

    // 将一个有序的交易序列(List<Tuple2<Date,Amount>>)转化为状态序列(List<String>),其中各个元素分别表示一个马尔科夫状态。
    static List<String> toStateSequence(List<Tuple2<Long,Integer>> list){
        // 没有足够的数据
        if(list.size() < 2){
            return null;
        }

        List<String> stateSequence = new ArrayList<>();
        // == 两两配对计算结果
        Tuple2<Long, Integer> prior = list.get(0);
        for (int i = 1; i < list.size(); i++) {
            Tuple2<Long, Integer> current = list.get(i);

            // === 计算时间差(天),因为数据是以ms计数的,所以须要转化为天(1d = 24*60*60*1000=86400000ms)
            long dayDiff = (current._1 - prior._1) / 86400000;

            // === 获取交易额信息
            int priorAmount = prior._2;
            int currentAmount = current._2;

            // === 根据业务规则转化为字母表示
            // ==== 处理时间关系
            String dd = null;
            if(dayDiff < 30){
                dd = "S";
            }else if(dayDiff < 60){
                dd = "M";
            }else {
                dd = "L";
            }

            // ==== 处理金额关系: 使用两次交易额的比重
            String ad = null;
            if(priorAmount < 0.9 * currentAmount){
                ad = "L";
            }else if(priorAmount < 1.1 * currentAmount){
                ad = "E";
            }else{
                ad = "G";
            }

            // === 组合结果
            String element = dd + ad;
            stateSequence.add(element);
            // 大小为2的窗口前进一格
            prior = current;
        }
        return stateSequence;
    }
}

3.二、辅助类

日期处理类

package com.sunrun.movieshow.algorithm.markov;

import java.text.SimpleDateFormat;
import java.util.Date;

public class DateUtil {

	static final String DATE_FORMAT = "yyyy-MM-dd";
	static final SimpleDateFormat SIMPLE_DATE_FORMAT = 
	   new SimpleDateFormat(DATE_FORMAT);

    /**
     *  Returns the Date from a given dateAsString
     */
	public static Date getDate(String dateAsString)  {
        try {
        	return SIMPLE_DATE_FORMAT.parse(dateAsString);
        }
        catch(Exception e) {
        	return null;
        }
	}

    /**
     *  Returns the number of milliseconds since January 1, 1970, 
     *  00:00:00 GMT represented by this Date object.
     */
	public static long getDateAsMilliSeconds(Date date) throws Exception {
        return date.getTime();
	}
	
	
    /**
     *  Returns the number of milliseconds since January 1, 1970, 
     *  00:00:00 GMT represented by this Date object.
     */
	public static long getDateAsMilliSeconds(String dateAsString) throws Exception {
		Date date = getDate(dateAsString);	
        return date.getTime();
	}
	
	public static String getDateAsString(long timestamp) {
        return SIMPLE_DATE_FORMAT.format(timestamp);
	}	
	
}

二、输入输出工具类

package com.sunrun.movieshow.algorithm.markov;

import java.io.InputStream;
import java.io.OutputStream;
import java.io.BufferedReader;
//
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.util.LineReader;

/**
 * This class provides convenient methods for accessing 
 * some Input/Output methods.
 *
 * @author Mahmoud Parsian (mahmoud.parsian@yahoo.com)
 *
 */
public class InputOutputUtil {

    public static void close(LineReader reader) {
        if (reader == null) {
            return;
        }
        //
        try {
            reader.close();
        } 
        catch (Exception ignore) {
        }
    }

    public static void close(OutputStream stream) {
        if (stream == null) {
            return;
        }
        //
        try {
            stream.close();
        } 
        catch (Exception ignore) {
        }
    }

    public static void close(InputStream stream) {
        if (stream == null) {
            return;
        }
        //
        try {
            stream.close();
        } 
        catch (Exception ignore) {
        }
    }

    public static void close(FSDataInputStream stream) {
        if (stream == null) {
            return;
        }
        //
        try {
            stream.close();
        } 
        catch (Exception ignore) {
        }
    }

    public static void close(BufferedReader reader) {
        if (reader == null) {
            return;
        }
        //
        try {
            reader.close();
        } 
        catch (Exception ignore) {
        }
    }

}

三、马尔科夫状态处理类

package com.sunrun.movieshow.algorithm.markov;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * 输入数据生成马尔科夫状态表
 */
public class MarkovTableBuilder {
    // 状态的数量
    private int numberOfState;

    private int scale;

    // 状态表
    private double[][] table = null;

    // 状态序列
    private Map<String, Integer> states = null;


    public MarkovTableBuilder(int numberOfState, int scale){
        this(numberOfState);
        this.scale = scale;
    }

    private MarkovTableBuilder(int numberOfState){
        this.numberOfState = numberOfState;
        table = new double[numberOfState][numberOfState];
        initStates();
    }

    // 初始化state状态
    private void initStates(){
        states = new HashMap<String, Integer>();
        states.put("SL", 0);
        states.put("SE", 1);
        states.put("SG", 2);
        states.put("ML", 3);
        states.put("ME", 4);
        states.put("MG", 5);
        states.put("LL", 6);
        states.put("LE", 7);
        states.put("LG", 8);
    }

    // 将状态信息添加到状态表
    public void add(StateItem item){
        // 获取该状态对应的角标
        int row = states.get(item.fromState);
        int column = states.get(item.toState);
        table[row][column] = item.count;
    }

    public void normalize() {
        //
        // 拉普拉斯校订:通常经过将全部的计数+1来进行。see: http://cs.nyu.edu/faculty/davise/ai/bayesText.html
        for (int r = 0; r < numberOfState; r++) {
            boolean gotZeroCount = false;
            for (int c = 0; c < numberOfState; c++) {
                if(table[r][c] == 0) {
                    gotZeroCount = true;
                    break;
                }
            }
            if (gotZeroCount) {
                for (int c = 0; c < numberOfState; c++) {
                    table[r][c] += 1;
                }
            }
        }
        // normalize
        for (int r = 0; r < numberOfState; r++) {
            double rowSum = getRowSum(r);
            for (int c = 0; c < numberOfState; c++) {
                table[r][c] = table[r][c] / rowSum;
            }
        }
    }

    // 获取rowNumber行的累加结果
    public double getRowSum(int rowNumber) {
        double sum = 0.0;
        for (int column = 0; column < numberOfState; column++) {
            sum += table[rowNumber][column];
        }
        return sum;
    }

    /**
     * 存储状态表:做为示例,只是将结果输出便可。
     */
    private void persist() {
        for (int row = 0; row < numberOfState; row++) {
            StringBuilder builder = new StringBuilder();
            for (int column = 0; column < numberOfState; column++) {
                double element = table[row][column];
                builder.append(String.format("%.4g", element));
                if (column < (numberOfState - 1)) {
                    builder.append(",");
                }
            }
            System.out.println(builder.toString());
        }
    }



    public static void transport(String markovFormatDataPath){
        List<StateItem> items = ReadDataFromHDFS.readDirectory(markovFormatDataPath);
        MarkovTableBuilder markovTableBuilder = new MarkovTableBuilder(9);
        for (StateItem item : items) {
            markovTableBuilder.add(item);
        }

        // 归一化数据
        markovTableBuilder.normalize();

        // 存储马尔科夫状态表
        markovTableBuilder.persist();
        
    }
}

四、从文件系统读取数据的处理类

package com.sunrun.movieshow.algorithm.markov;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

/**
 * Class containing a number of utility methods for manipulating 
 * Hadoop's SequenceFiles.
 *
 *
 * @author Mahmoud Parsian
 *
 */
public class ReadDataFromHDFS {

	private ReadDataFromHDFS() {
	}
	
	public static List<StateItem> readDirectory(String path) {
		return ReadDataFromHDFS.readDirectory(new Path(path));
	}
	
	public static List<StateItem> readDirectory(Path path) {
		FileSystem fs;
		try {
			fs = path.getFileSystem(new Configuration());
		} catch (IOException e) {
            System.out.println("Unable to access the hadoop file system!");
			throw new RuntimeException("Unable to access the hadoop file system!");
		}
		List<StateItem> list = new ArrayList<StateItem>();
		try {
			FileStatus[] stat = fs.listStatus(path);
			for (int i = 0; i < stat.length; ++i) {
				if (stat[i].getPath().getName().startsWith("part")) {
					List<StateItem> pairs = readFile(stat[i].getPath(), fs);
					list.addAll(pairs);
				}
			}
		} 
		catch (IOException e) {
            System.out.println("Unable to access the hadoop file system!");
			throw new RuntimeException("Error reading the hadoop file system!");
		}

		return list;		
	}	

	@SuppressWarnings("unchecked")
	public static List<StateItem> readFile(Path path, FileSystem fs) {
		List<StateItem> list = new ArrayList<StateItem>();
		FSDataInputStream stream = null;
		BufferedReader reader = null;
		try {
			stream = fs.open(path);
			reader = new BufferedReader(new InputStreamReader(stream));
			String line;
			while ((line = reader.readLine()) != null) {
				// line = <fromState><,><toState><TAB><count>
				String[] tokens = line.split("\t"); // TAB separator
				if (tokens.length == 3) {
					StateItem item = new StateItem(tokens[0], tokens[1], Integer.parseInt(tokens[2]));
					list.add(item);
				}
			}		
		}
		catch (IOException e) {
            System.out.println("readFileIntoCoxRegressionItem() failed!");
			throw new RuntimeException("readFileIntoCoxRegressionItem() failed!");
		}
		finally {
			InputOutputUtil.close(reader);
			InputOutputUtil.close(stream);
		}
			
		return list;
	}
	

	
	
	public static void main(String[] args) throws Exception {
		String path =  "hdfs://10.21.1.24:9000/markov/";
		List<StateItem> list = readDirectory(path);
        System.out.println("list="+list.toString());
	}
		
}

五、马尔科夫状态实体类

package com.sunrun.movieshow.algorithm.markov;

/**
 * TableItem represents an item of a Markov State Transition Model 
 * as a Tuple3<fromSate, toState, count>
 *
 */
public class StateItem  {
	String fromState;
	String toState;
	int count;
	
	public StateItem(String fromState, String toState, int count) {
		this.fromState = fromState;
		this.toState = toState;
		this.count = count;
	}
	
	/**
	 * for debugging ONLY
	 */
	public String toString() {
		return "{"+fromState+"," +toState+","+count+"}";
	}
}

四、使用最终的马尔科夫表进行预测

前面我已经在代码注释中拿出了最后的分析结果:

SL SE SG ML ME MG LL LE LG
SL 0.03318 0.02441 0.6608 0.1373 0.007937 0.02398 0.06456 0.009522 0.03832
SE 0.3842 0.02532 0.2709 0.1260 0.009590 0.01331 0.1251 0.01083 0.03477
SG 0.6403 0.02881 0.05017 0.1487 0.01338 0.008602 0.08359 0.01446 0.01196
ML 0.02081 0.002368 0.7035 0.01170 0.004638 0.1518 0.03901 0.005933 0.06030
ME 0.4309 0.02140 0.2657 0.1277 0.009697 0.02006 0.08159 0.009530 0.03344
MG 0.1847 0.01816 0.5316 0.1303 0.007175 0.02351 0.06265 0.008136 0.03379
LL 0.01847 0.004597 0.6115 0.02068 0.003091 0.1346 0.06828 0.01424 0.1245
LE 0.2250 0.01900 0.2427 0.06291 0.006335 0.03182 0.2757 0.03801 0.09847
LG 0.2819 0.01944 0.1954 0.07990 0.008015 0.03202 0.2612 0.04427 0.07781

参考

数据算法 —— Mahmoud Parsian(很喜欢该做者写的这本书,很是详细)

附录

一、ruby脚本

mark_plan.rb

#!/usr/bin/ruby

require '../lib/util.rb'      
require 'Date'

xaction_file = ARGV[0]
model_file = ARGV[1]
userXactions = {}
model = []
states = ["SL", "SE", "SG", "ML", "ME", "MG", "LL", "LE", "LG"]

# read all xactions
File.open(xaction_file, "r").each_line do |line|
	items =  line.split(",")
	custID = items[0]
	if (userXactions.key? custID)
		hist = userXactions[custID]
	else 
		hist = []
		userXactions[custID] = hist
	end	
	hist << items[2, items.size - 2]
	
end

#read model
File.open(model_file, "r").each_line do |line|
	items =  line.split(",")
	#puts "#{line}"
	row = []
	items.each do |item|
		row << item.to_i
		#puts "#{item}"
	end	
	model << row
	#puts "#{row}"
end

# marketing time
userXactions.each do |cid, xactions|
	seq = []
	last_date = Date.parse "2000-01-01"
	xactions.each_with_index do |xaction, index|	
		if (index > 0) 
			prevXaction = xactions[index-1]
			prDate = Date.parse prevXaction[0]
			prAmt = prevXaction[1].to_i
			date = Date.parse xaction[0]
			last_date = date
			amt = xaction[1].to_i
			daysDiff = date - prDate
			amtDiff = amt - prAmt
			
			if (daysDiff < 30)
				dd = "S"
			elsif (daysDiff < 60)
				dd = "M"
			else
				dd = "L"
			end
			
			if (prAmt < 0.9 * amt)
				ad = "L"
			elsif (prAmt < 1.1 * amt) 
				ad = "E"
			else 
				ad = "G"
			end
			seq << (dd + ad)
		end
	end
	
	if (!seq.empty?)
		last = seq[-1]
		row_index = states.index(last)
		row = model[row_index]
		max_col = row.max
		col_index = row.index(max_col) 
		next_state = states[col_index]
		
		if (next_state.start_with?("S"))
			next_date = last_date + 15
		elsif (next_state.start_with?("M"))
			next_date = last_date + 45
		else
			next_date = last_date + 90
		end
		
		#puts "#{cid}, #{last},  #{row_index}, #{max_col}, #{col_index}, #{next_state}, #{last_date}, #{next_date}"
		puts "#{cid}, #{next_date}"
	end
end

buy_xaction.rb

#!/usr/bin/ruby

require '../lib/util.rb'      
require 'Date'

custCount = ARGV[0].to_i
daysCount = ARGV[1].to_i
visitorPercent = ARGV[2].to_f

custIDs = []
xactionHist = {}

# transition probability matrix

idGen = IdGenerator.new
1.upto custCount do
	custIDs << idGen.generate(10)
end

xid = Time.now().to_i

date = Date.parse "2013-01-01"
1.upto daysCount do 
	numXaction = visitorPercent * custCount
	factor = 85 + rand(30)
	numXaction = (numXaction * factor) / 100
	
	1.upto numXaction do
		custID = custIDs[rand(custIDs.size)]
		if (xactionHist.key? custID)
			hist = xactionHist[custID]
			lastXaction = hist[-1]
			lastDate = lastXaction[0]
			lastAmt = lastXaction[1]
			numDays = date - lastDate
			if (numDays < 30) 
				amount = lastAmt < 40 ? 50 + rand(20) - 10  : 30 + rand(10) - 5
			elsif (numDays < 60)
				amount = lastAmt < 80 ? 100 + rand(40) - 20 : 60 + rand(20) - 10
			else 
				amount = lastAmt < 150 ? 180 + rand(60) - 30 :  120 + rand(40) - 20
			end						
		else
			hist = []
			xactionHist[custID] = hist	
			amount = 40 + rand(180)
		end
		xaction = []
		xaction << date
		xaction << amount
		hist << xaction
		xid = xid + 1
		puts "#{custID},#{xid},#{date},#{amount}"
		
	end

	date = date.next
end
相关文章
相关标签/搜索