这一章节咱们讲解马尔科夫模型。给定一组随机变量(如顾客最近的交易日期),马尔科夫模型只根据前一个状态(前一个最近交易日期)的分部指示该变量最近的分布。html
令 $$ S = {S_1,S_2,...,S_n} $$ 是一个有限的状态集,咱们但愿获得以下的结果: $$ P(S_n|S_{n-1},S_{n-2},...,S_{1}) \eqsim P(S_n|S_{n-1}) $$ 这个近似公式代表了一阶马尔科夫性质(markov property):系统在时间t+1的状态只与系统在时间t的状态有关。java
那么,若是咱们在严谨一点,将这个关联状态再往前面推一个时间点,获得的就是二阶马尔科夫模型: $$ P(S_n|S_{n-1},S_{n-2},...,S_{1}) \eqsim P(S_n|S_{n-1},S_{n-2}) $$c++
下面使用马尔科夫假设描述联合几率: $$ P(S_1,S_2,...,S_n) = \prod_{i=1}^n P(S_i,S_{i-1}) $$算法
总结:apache
若是一个随机序列的分布仅由其当前状态肯定,则具备markov性质。具备这个性质的随机过程称为马尔科夫随机过程(markov random process)。api
对于可观察的状态序列(即状态由数据可知),能够获得一个马尔科夫链模型(markov chain model,MCM),咱们可使用这个模型来作一些预测。数组
对于不可观察状态,会获得一个隐式马尔科夫模型(hidden markov model,HMM)ruby
接下来咱们给出将要用到的马尔科夫链的形式化表示服务器
有限的状态空间(state space) $$ S={S_1,S_2,...S_n} $$app
转移几率(transition properties)
函数$f:S \times S \to R$:转化结果为一个N阶方阵,其中N是状态的数量,其中:
初始分布(initial distribution)
函数$g:S \times R$,其中:
马尔科夫链是S中的一个随机过程:
假设咱们已经获得了一张马尔科夫状态表。以下表所示的城市天气变化表:
今每天气\明每天气 | 晴天(sunny) | 有雨(rainy) | 多云(cloudy) | 有雾(foggy) |
---|---|---|---|---|
晴天(sunny) | 0.6 | 0.1 | 0.2 | 0.1 |
有雨(rainy) | 0.5 | 0.2 | 0.2 | 0.1 |
多云(cloudy) | 0.1 | 0.7 | 0.1 | 0.1 |
有雾(foggy) | 0.0 | 0.3 | 0.4 | 0.3 |
咱们假设:
咱们来解释这张表的含义,其实咱们彻底能够以4阶方阵的形式表示,但为了清晰说明,将其以表格展现。
其中每个数字表明:当前的天气为XX时,明天的天气为XX的几率。例如,第一行第一列的数据0.6,就表明的是当前天气为晴天时,明每天气为晴天的几率为0.6.
咱们就能够用马尔科夫性质的思想回答下面的问题:
若是今天的天气状态是晴天,那么明天是多云并且后天有雾的几率是多大?根据马尔科夫链的思想,明天是多余的几率只与今天有关,后天有雾的几率只与明天有关,所以:
P = 0.2(第一行第三列数据) * 0.1(第三行第四列) = 0.02
可见,咱们的重点在于由数据推导出马尔科夫状态矩阵的过程。
假设咱们有这样的一些交易数据:
55FRL8G23B,1381907878,2013-01-09,129 9SX0DJG9L4,1381907879,2013-01-09,34 0ANMD0T113,1381907880,2013-01-09,144 W1F4412PA8,1381907881,2013-01-09,26 22Z10EAYC3,1381907882,2013-01-09,167 3R56N17P1H,1381907883,2013-01-09,25 LK0P6K3DE4,1381907884,2013-01-09,25 3A4S4BMPMU,1381907885,2013-01-09,113 OF4CEO2814,1381907886,2013-01-09,138 9ICNYOFS41,1381907887,2013-01-09,79 N4EOB264U6,1381907888,2013-01-09,108 3C0WARCKYJ,1381907889,2013-01-09,204 PTT3BI00AZ,1381907890,2013-01-09,27 14UHHAVQ2Q,1381907891,2013-01-09,73 Z9GFE6TDKF,1381907892,2013-01-09,32 ...
数据意义为(交易ID,顾客ID,交易日期,交易金额)。
这一节的目标是将交易序列(transcation sequence)转化为一个状态序列(state sequence)。咱们的应用的目的是为了生成如下的输出 $$ customerId(Date_1,Amount_1);(Date_2,Amount_2),...,(Date_n,Amount_n) $$ 其中日期按增序排序。
咱们要把这个输出转化为一个状态序列,以下所示: $$ customerId,State_1,State_2,...,State_n $$
咱们还须要将(purchase-date,amount)对的有序序列转换为一组马尔科夫链状态。状态由一个代号(两个字母)表示,各个字母的定义以下所示:
上一次交易后通过的时间 | 与前次交易相比的交易额 |
---|---|
S:小 | L:显著小于 |
M:中 | E:基本相同 |
L:大 | G:显著大于 |
所以,咱们能够获得9种状态
状态名 | 上一次交易后的通过时间与其次交易相比的交易额 |
---|---|
SL | 小:显著小于 |
SE | 小:基本相同 |
SG | 小:显著大于 |
ML | 中:显著小于 |
ME | 中:基本相同 |
MG | 中:显著大于 |
LL | 大:显著小于 |
LE | 大:基本相同 |
LG | 大:显著大于 |
能够看到,咱们的马尔科夫链模型有9个状态(9*9矩阵)。
转换过程可使用一些小脚本实现,可是若是数据量过大,仍是须要一个map过程。
下面是spark的主任务类。下一章节会列出所使用到的辅助类。
package com.sunrun.movieshow.algorithm.markov; import com.sunrun.movieshow.algorithm.common.SparkHelper; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; import java.io.Serializable; import java.util.*; public class SparkMarkov { public static void main(String[] args) { JavaSparkContext sc = SparkHelper.getSparkContext("markov"); // 1.归约到不一样的分区 JavaRDD<String> rdd = sc.textFile("data/markov").coalesce(8); // 2.获得(顾客ID,(交易时间,交易额)) JavaPairRDD<String, Tuple2<Long, Integer>> pairRDD = rdd.mapToPair(line -> { // tokens[0] = customer-id // tokens[1] = transaction-id // tokens[2] = purchase-date // tokens[3] = amount String[] tokens = line.split(","); if (tokens.length != 4) { return null; } else { long date = 0; try { date = DateUtil.getDateAsMilliSeconds(tokens[2]); } catch(Exception e) { // 会有异常数据 } int amount = Integer.parseInt(tokens[3]); Tuple2<Long, Integer> V = new Tuple2<>(date, amount); return new Tuple2<>(tokens[0], V); } }); /** * (V31E55G4FI,(1356969600000,123)) * (301UNH7I2F,(1356969600000,148)) * (PP2KVIR4LD,(1356969600000,163)) * (AC57MM3WNV,(1356969600000,188)) * ... */ // 4.依据交易ID进行分组 - 用户信息在最后的状态链中是没用的,但他能够限定一些状态切换的内在关系, // 这在不少算法过程当中都是隐含条件,须要经验判断。 JavaPairRDD<String, Iterable<Tuple2<Long, Integer>>> customerRDD = pairRDD.groupByKey(); /** * (V31E55G4FI,(1356969600000,123),1356969600000,148)....) * ... */ // 5.建立马尔科夫状态序列 (c_id,<(time1,amount),(time2,amount)>) => (String) JavaPairRDD<String, List<String>> stateSequence = customerRDD.mapValues((Iterable<Tuple2<Long,Integer>> dateAndAmount) -> { List<Tuple2<Long,Integer>> list = toList(dateAndAmount); Collections.sort(list, TupleComparatorAscending.INSTANCE); // now convert sorted list (be date) into a "state sequence" List<String> stateSequence1 = toStateSequence(list); return stateSequence1; }); /** * stateSequence.saveAsTextFile("out/" + UUID.randomUUID()); * 顾客id,与其相关的状态序列 * (K40E0LA5DL,[LL, MG, SG, SL, SG, MG, SL, SG, SL, SG, LL]) * (ICF0KFGK12,[SG, SL, SE, SG, LL, LG]) * (4N0B1U5HVG,[SG, ML, MG, SG, SL, SG, SL, SG, ML]) * (3KJR1907D9,[SG, SL, ML, SG, ML, LG]) * (47620I9LOD,[LG, SL, ML, MG, SG, SL, SG, SL, SG]) */ // 6.接下来,咱们将状态序列以窗口为2的方式依次移动,生成一个个形如((LL, MG),1)的对 JavaPairRDD<Tuple2<String, String>, Integer> model = stateSequence.flatMapToPair(s -> { // 输出形式((fromState,toState),times) ArrayList<Tuple2<Tuple2<String, String>, Integer>> mapperOutput = new ArrayList<>(); List<String> states = s._2; if (states == null) { return Collections.emptyIterator(); } else { for (int i = 0; i < states.size() - 1; i++) { String fromState = states.get(i); String toState = states.get(i + 1); Tuple2<String, String> k = new Tuple2<>(fromState, toState); mapperOutput.add(new Tuple2<>(k, 1)); } } return mapperOutput.iterator(); }); /** * model.saveAsTextFile("out/model"); * ((LG,LL),1) * ((LL,MG),1) * ((MG,SG),1) * ((SG,SL),1) * ((SL,ME),1) * ((ME,MG),1) * */ // 7.咱们须要将这些结果组合归约,将相同的状态序列进行合并,即从(f,t),1) => ((f,t),3000)的形式。 JavaPairRDD<Tuple2<String, String>, Integer> markovModel = model.reduceByKey((a, b) -> a + b); /** * markovModel.saveAsTextFile("/out/markov"); * ((LL,SL),993) * ((MG,LL),1859) * ((LE,ME),25) * ((SL,ME),1490) * ((LG,ME),153) * ((ML,LG),3991) * ((ME,ME),58) */ // 8.咱们格式化一下输出,将其形式转变为(f,t,times)的形式 JavaRDD<String> markovFormatModel = markovModel.map(t -> t._1._1 + "\t" + t._1._2 + "\t" + t._2); // 9.将结果存储到HDFS服务器,固然也能够存储到本地,这时候解析类就要使用本地文件系统的API String markovFormatModelStorePath = "hdfs://10.21.1.24:9000/markov/"; markovFormatModel.saveAsTextFile(markovFormatModelStorePath); // 9.将最终结果进行转换,生成马尔科夫几率模型 MarkovTableBuilder.transport(markovFormatModelStorePath); /** 这是一个state阶的方阵,A(ij)表示状态i到状态j的转化几率 * 0.03318,0.02441,0.6608,0.1373,0.007937,0.02398,0.06456,0.009522,0.03832 * 0.3842,0.02532,0.2709,0.1260,0.009590,0.01331,0.1251,0.01083,0.03477 * 0.6403,0.02881,0.05017,0.1487,0.01338,0.008602,0.08359,0.01446,0.01196 * 0.02081,0.002368,0.7035,0.01170,0.004638,0.1518,0.03901,0.005933,0.06030 * 0.4309,0.02140,0.2657,0.1277,0.009697,0.02006,0.08159,0.009530,0.03344 * 0.1847,0.01816,0.5316,0.1303,0.007175,0.02351,0.06265,0.008136,0.03379 * 0.01847,0.004597,0.6115,0.02068,0.003091,0.1346,0.06828,0.01424,0.1245 * 0.2250,0.01900,0.2427,0.06291,0.006335,0.03182,0.2757,0.03801,0.09847 * 0.2819,0.01944,0.1954,0.07990,0.008015,0.03202,0.2612,0.04427,0.07781 */ } // 将迭代器转化为数组 static List<Tuple2<Long,Integer>> toList(Iterable<Tuple2<Long,Integer>> iterable) { List<Tuple2<Long,Integer>> list = new ArrayList<Tuple2<Long,Integer>>(); for (Tuple2<Long,Integer> element: iterable) { list.add(element); } return list; } // 按时间进行排序 static class TupleComparatorAscending implements Comparator<Tuple2<Long, Integer>>, Serializable { final static TupleComparatorAscending INSTANCE = new TupleComparatorAscending(); @Override public int compare(Tuple2<Long, Integer> t1, Tuple2<Long, Integer> t2) { // return -t1._1.compareTo(t2._1); // sorts RDD elements descending return t1._1.compareTo(t2._1); // sorts RDD elements ascending } } // 将一个有序的交易序列(List<Tuple2<Date,Amount>>)转化为状态序列(List<String>),其中各个元素分别表示一个马尔科夫状态。 static List<String> toStateSequence(List<Tuple2<Long,Integer>> list){ // 没有足够的数据 if(list.size() < 2){ return null; } List<String> stateSequence = new ArrayList<>(); // == 两两配对计算结果 Tuple2<Long, Integer> prior = list.get(0); for (int i = 1; i < list.size(); i++) { Tuple2<Long, Integer> current = list.get(i); // === 计算时间差(天),因为数据是以ms计数的,所以须要转化为天(1d = 24*60*60*1000=86400000ms) long dayDiff = (current._1 - prior._1) / 86400000; // === 获取交易额信息 int priorAmount = prior._2; int currentAmount = current._2; // === 根据业务规则转化为字母表示 // ==== 处理时间关系 String dd = null; if(dayDiff < 30){ dd = "S"; }else if(dayDiff < 60){ dd = "M"; }else { dd = "L"; } // ==== 处理金额关系: 使用两次交易额的比重 String ad = null; if(priorAmount < 0.9 * currentAmount){ ad = "L"; }else if(priorAmount < 1.1 * currentAmount){ ad = "E"; }else{ ad = "G"; } // === 组合结果 String element = dd + ad; stateSequence.add(element); // 大小为2的窗口前进一格 prior = current; } return stateSequence; } }
日期处理类
package com.sunrun.movieshow.algorithm.markov; import java.text.SimpleDateFormat; import java.util.Date; public class DateUtil { static final String DATE_FORMAT = "yyyy-MM-dd"; static final SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat(DATE_FORMAT); /** * Returns the Date from a given dateAsString */ public static Date getDate(String dateAsString) { try { return SIMPLE_DATE_FORMAT.parse(dateAsString); } catch(Exception e) { return null; } } /** * Returns the number of milliseconds since January 1, 1970, * 00:00:00 GMT represented by this Date object. */ public static long getDateAsMilliSeconds(Date date) throws Exception { return date.getTime(); } /** * Returns the number of milliseconds since January 1, 1970, * 00:00:00 GMT represented by this Date object. */ public static long getDateAsMilliSeconds(String dateAsString) throws Exception { Date date = getDate(dateAsString); return date.getTime(); } public static String getDateAsString(long timestamp) { return SIMPLE_DATE_FORMAT.format(timestamp); } }
二、输入输出工具类
package com.sunrun.movieshow.algorithm.markov; import java.io.InputStream; import java.io.OutputStream; import java.io.BufferedReader; // import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.util.LineReader; /** * This class provides convenient methods for accessing * some Input/Output methods. * * @author Mahmoud Parsian (mahmoud.parsian@yahoo.com) * */ public class InputOutputUtil { public static void close(LineReader reader) { if (reader == null) { return; } // try { reader.close(); } catch (Exception ignore) { } } public static void close(OutputStream stream) { if (stream == null) { return; } // try { stream.close(); } catch (Exception ignore) { } } public static void close(InputStream stream) { if (stream == null) { return; } // try { stream.close(); } catch (Exception ignore) { } } public static void close(FSDataInputStream stream) { if (stream == null) { return; } // try { stream.close(); } catch (Exception ignore) { } } public static void close(BufferedReader reader) { if (reader == null) { return; } // try { reader.close(); } catch (Exception ignore) { } } }
三、马尔科夫状态处理类
package com.sunrun.movieshow.algorithm.markov; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** * 输入数据生成马尔科夫状态表 */ public class MarkovTableBuilder { // 状态的数量 private int numberOfState; private int scale; // 状态表 private double[][] table = null; // 状态序列 private Map<String, Integer> states = null; public MarkovTableBuilder(int numberOfState, int scale){ this(numberOfState); this.scale = scale; } private MarkovTableBuilder(int numberOfState){ this.numberOfState = numberOfState; table = new double[numberOfState][numberOfState]; initStates(); } // 初始化state状态 private void initStates(){ states = new HashMap<String, Integer>(); states.put("SL", 0); states.put("SE", 1); states.put("SG", 2); states.put("ML", 3); states.put("ME", 4); states.put("MG", 5); states.put("LL", 6); states.put("LE", 7); states.put("LG", 8); } // 将状态信息添加到状态表 public void add(StateItem item){ // 获取该状态对应的角标 int row = states.get(item.fromState); int column = states.get(item.toState); table[row][column] = item.count; } public void normalize() { // // 拉普拉斯校订:通常经过将全部的计数+1来进行。see: http://cs.nyu.edu/faculty/davise/ai/bayesText.html for (int r = 0; r < numberOfState; r++) { boolean gotZeroCount = false; for (int c = 0; c < numberOfState; c++) { if(table[r][c] == 0) { gotZeroCount = true; break; } } if (gotZeroCount) { for (int c = 0; c < numberOfState; c++) { table[r][c] += 1; } } } // normalize for (int r = 0; r < numberOfState; r++) { double rowSum = getRowSum(r); for (int c = 0; c < numberOfState; c++) { table[r][c] = table[r][c] / rowSum; } } } // 获取rowNumber行的累加结果 public double getRowSum(int rowNumber) { double sum = 0.0; for (int column = 0; column < numberOfState; column++) { sum += table[rowNumber][column]; } return sum; } /** * 存储状态表:做为示例,只是将结果输出便可。 */ private void persist() { for (int row = 0; row < numberOfState; row++) { StringBuilder builder = new StringBuilder(); for (int column = 0; column < numberOfState; column++) { double element = table[row][column]; builder.append(String.format("%.4g", element)); if (column < (numberOfState - 1)) { builder.append(","); } } System.out.println(builder.toString()); } } public static void transport(String markovFormatDataPath){ List<StateItem> items = ReadDataFromHDFS.readDirectory(markovFormatDataPath); MarkovTableBuilder markovTableBuilder = new MarkovTableBuilder(9); for (StateItem item : items) { markovTableBuilder.add(item); } // 归一化数据 markovTableBuilder.normalize(); // 存储马尔科夫状态表 markovTableBuilder.persist(); } }
四、从文件系统读取数据的处理类
package com.sunrun.movieshow.algorithm.markov; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.List; /** * Class containing a number of utility methods for manipulating * Hadoop's SequenceFiles. * * * @author Mahmoud Parsian * */ public class ReadDataFromHDFS { private ReadDataFromHDFS() { } public static List<StateItem> readDirectory(String path) { return ReadDataFromHDFS.readDirectory(new Path(path)); } public static List<StateItem> readDirectory(Path path) { FileSystem fs; try { fs = path.getFileSystem(new Configuration()); } catch (IOException e) { System.out.println("Unable to access the hadoop file system!"); throw new RuntimeException("Unable to access the hadoop file system!"); } List<StateItem> list = new ArrayList<StateItem>(); try { FileStatus[] stat = fs.listStatus(path); for (int i = 0; i < stat.length; ++i) { if (stat[i].getPath().getName().startsWith("part")) { List<StateItem> pairs = readFile(stat[i].getPath(), fs); list.addAll(pairs); } } } catch (IOException e) { System.out.println("Unable to access the hadoop file system!"); throw new RuntimeException("Error reading the hadoop file system!"); } return list; } @SuppressWarnings("unchecked") public static List<StateItem> readFile(Path path, FileSystem fs) { List<StateItem> list = new ArrayList<StateItem>(); FSDataInputStream stream = null; BufferedReader reader = null; try { stream = fs.open(path); reader = new BufferedReader(new InputStreamReader(stream)); String line; while ((line = reader.readLine()) != null) { // line = <fromState><,><toState><TAB><count> String[] tokens = line.split("\t"); // TAB separator if (tokens.length == 3) { StateItem item = new StateItem(tokens[0], tokens[1], Integer.parseInt(tokens[2])); list.add(item); } } } catch (IOException e) { System.out.println("readFileIntoCoxRegressionItem() failed!"); throw new RuntimeException("readFileIntoCoxRegressionItem() failed!"); } finally { InputOutputUtil.close(reader); InputOutputUtil.close(stream); } return list; } public static void main(String[] args) throws Exception { String path = "hdfs://10.21.1.24:9000/markov/"; List<StateItem> list = readDirectory(path); System.out.println("list="+list.toString()); } }
五、马尔科夫状态实体类
package com.sunrun.movieshow.algorithm.markov; /** * TableItem represents an item of a Markov State Transition Model * as a Tuple3<fromSate, toState, count> * */ public class StateItem { String fromState; String toState; int count; public StateItem(String fromState, String toState, int count) { this.fromState = fromState; this.toState = toState; this.count = count; } /** * for debugging ONLY */ public String toString() { return "{"+fromState+"," +toState+","+count+"}"; } }
前面我已经在代码注释中拿出了最后的分析结果:
SL | SE | SG | ML | ME | MG | LL | LE | LG | |
---|---|---|---|---|---|---|---|---|---|
SL | 0.03318 | 0.02441 | 0.6608 | 0.1373 | 0.007937 | 0.02398 | 0.06456 | 0.009522 | 0.03832 |
SE | 0.3842 | 0.02532 | 0.2709 | 0.1260 | 0.009590 | 0.01331 | 0.1251 | 0.01083 | 0.03477 |
SG | 0.6403 | 0.02881 | 0.05017 | 0.1487 | 0.01338 | 0.008602 | 0.08359 | 0.01446 | 0.01196 |
ML | 0.02081 | 0.002368 | 0.7035 | 0.01170 | 0.004638 | 0.1518 | 0.03901 | 0.005933 | 0.06030 |
ME | 0.4309 | 0.02140 | 0.2657 | 0.1277 | 0.009697 | 0.02006 | 0.08159 | 0.009530 | 0.03344 |
MG | 0.1847 | 0.01816 | 0.5316 | 0.1303 | 0.007175 | 0.02351 | 0.06265 | 0.008136 | 0.03379 |
LL | 0.01847 | 0.004597 | 0.6115 | 0.02068 | 0.003091 | 0.1346 | 0.06828 | 0.01424 | 0.1245 |
LE | 0.2250 | 0.01900 | 0.2427 | 0.06291 | 0.006335 | 0.03182 | 0.2757 | 0.03801 | 0.09847 |
LG | 0.2819 | 0.01944 | 0.1954 | 0.07990 | 0.008015 | 0.03202 | 0.2612 | 0.04427 | 0.07781 |
数据算法 —— Mahmoud Parsian(很喜欢该做者写的这本书,很是详细)
mark_plan.rb
#!/usr/bin/ruby require '../lib/util.rb' require 'Date' xaction_file = ARGV[0] model_file = ARGV[1] userXactions = {} model = [] states = ["SL", "SE", "SG", "ML", "ME", "MG", "LL", "LE", "LG"] # read all xactions File.open(xaction_file, "r").each_line do |line| items = line.split(",") custID = items[0] if (userXactions.key? custID) hist = userXactions[custID] else hist = [] userXactions[custID] = hist end hist << items[2, items.size - 2] end #read model File.open(model_file, "r").each_line do |line| items = line.split(",") #puts "#{line}" row = [] items.each do |item| row << item.to_i #puts "#{item}" end model << row #puts "#{row}" end # marketing time userXactions.each do |cid, xactions| seq = [] last_date = Date.parse "2000-01-01" xactions.each_with_index do |xaction, index| if (index > 0) prevXaction = xactions[index-1] prDate = Date.parse prevXaction[0] prAmt = prevXaction[1].to_i date = Date.parse xaction[0] last_date = date amt = xaction[1].to_i daysDiff = date - prDate amtDiff = amt - prAmt if (daysDiff < 30) dd = "S" elsif (daysDiff < 60) dd = "M" else dd = "L" end if (prAmt < 0.9 * amt) ad = "L" elsif (prAmt < 1.1 * amt) ad = "E" else ad = "G" end seq << (dd + ad) end end if (!seq.empty?) last = seq[-1] row_index = states.index(last) row = model[row_index] max_col = row.max col_index = row.index(max_col) next_state = states[col_index] if (next_state.start_with?("S")) next_date = last_date + 15 elsif (next_state.start_with?("M")) next_date = last_date + 45 else next_date = last_date + 90 end #puts "#{cid}, #{last}, #{row_index}, #{max_col}, #{col_index}, #{next_state}, #{last_date}, #{next_date}" puts "#{cid}, #{next_date}" end end
buy_xaction.rb
#!/usr/bin/ruby require '../lib/util.rb' require 'Date' custCount = ARGV[0].to_i daysCount = ARGV[1].to_i visitorPercent = ARGV[2].to_f custIDs = [] xactionHist = {} # transition probability matrix idGen = IdGenerator.new 1.upto custCount do custIDs << idGen.generate(10) end xid = Time.now().to_i date = Date.parse "2013-01-01" 1.upto daysCount do numXaction = visitorPercent * custCount factor = 85 + rand(30) numXaction = (numXaction * factor) / 100 1.upto numXaction do custID = custIDs[rand(custIDs.size)] if (xactionHist.key? custID) hist = xactionHist[custID] lastXaction = hist[-1] lastDate = lastXaction[0] lastAmt = lastXaction[1] numDays = date - lastDate if (numDays < 30) amount = lastAmt < 40 ? 50 + rand(20) - 10 : 30 + rand(10) - 5 elsif (numDays < 60) amount = lastAmt < 80 ? 100 + rand(40) - 20 : 60 + rand(20) - 10 else amount = lastAmt < 150 ? 180 + rand(60) - 30 : 120 + rand(40) - 20 end else hist = [] xactionHist[custID] = hist amount = 40 + rand(180) end xaction = [] xaction << date xaction << amount hist << xaction xid = xid + 1 puts "#{custID},#{xid},#{date},#{amount}" end date = date.next end