从.net parallel角度解读spark

对于我这样一个一直工做在.net平台上的developer来说,Hadoop,Spark,HBase等这些大数据名词比较陌生,对于分布式计算,.net上也有相似的Parallel(我说的不是HDInsight), 这篇文章是我尝试从.net上的Parallel类库的角度去讲述什么是spark。算法

 

咱们先从C#的一个烂大街的例子(不是Helloworld),统计一篇文章单词出现的频率。sql

下面C#代码是利用.net Parallel来写的统计单词出现频率。apache

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6 
 7 namespace WordCountDemo
 8 {
 9     using System.IO;
10     using System.Threading;
11     class Program
12     {
13         /// <summary>
14         /// 咱们以计算一篇文章中单词的个数为例子
15         /// (计算文章单词个数的demo简直就是各类大数据计算的HelloWorld)。
16         /// 
17         /// WordCountFlow是数单词程序
18         /// WordCountDetail对WordCountFlow函数每一行进行拆解并作了详细解释。
19         /// </summary>
20         /// <param name="args"></param>
21         static void Main(string[] args)
22         {
23             string filePath = @"D:\BigDataSoftware\spark-2.1.0-bin-hadoop2.7\README.md";
24 
25             WordCountFlow(filePath);
26             Console.WriteLine("----------------------");
27             WordCountDetail(filePath);            
28         }
29 
30         /// <summary>
31         /// 数单词的程序流程
32         /// </summary>
33         /// <param name="filePath"></param>
34         static void WordCountFlow(string filePath)
35         {
36             File.ReadAllLines(filePath).AsParallel()
37                 .SelectMany(t => t.Split(' '))
38                 .Select(t => new { word = t, tag = 1 })
39                 .GroupBy(t => t.word).Select(t => new { word = t.Key, count = t.Select(p => p.tag).Aggregate((a, b) => a + b) })
40                 // 若是对Aggregate函数不熟悉,上面代码等同于下行
41                 //.GroupBy(t => t.word).Select(t => new { word = t.Key, count = t.Sum(p => p.tag) });
42                 .ForAll(t => Console.WriteLine($"ParationId:{Thread.CurrentThread.ManagedThreadId}   ({t.word}-{t.count})"));
43         }
44 
45         /// <summary>
46         /// 数单词程序流程的详细解释
47         /// </summary>
48         /// <param name="filePath"></param>
49         static void WordCountDetail(string filePath)
50         {
51             // 读取整篇文章,文章每一行将做为一个string存储到数组lines
52             string[] lines = File.ReadAllLines(filePath);
53             // AsParallel()是Parallel类库的核心方法,具体的意思是将string[] lines这个数组分割成几个分区(Partition)。
54             // 假设这篇文章有500行,那么这个方法会会把string[500]-lines分解成 (string[120] partitionA), 
55             // (string[180] partitionB), (string[150] partitionC),(...) 等几个Partition
56             // .net runtime将当前程序的负载(主要是cpu使用状况)状况为依据的分区算法来肯定到底要分红几个Partition,
57             // 咱们能够大概认为cpu有几个逻辑核(不许确),就会被分解成几个Partition。
58             // 后续的计算中.net runtime将会针对每个partition申请一个单独的线程来处理.
59             // 好比:partitionA由001号线程处理,partitionB由002号线程处理。。。
60             ParallelQuery<string> parallelLines = lines.AsParallel();
61             // linesA,linesB,linesC...数组中存储的每一行根据空格分割成单词,结果仍然是存放在ParallelQuery<string>这种分块的结构中
62             // 下面带有****的注释,若是对函数式编程没有了解,能够直接忽略。
63             // ****若是对函数式编程有所了解,会知道lambda天生lazy的,若是下面这行代码打个断点,当debug到这行代码的时候,
64             // ****鼠标移动到parallelWords上时,咱们不会看到每个单词,
65             // ****runtime并无真正将每一行分解成单词,这行代码仅仅是一种计算逻辑。
66             ParallelQuery<string> parallelWords = parallelLines.SelectMany(t => t.Split(' '));
67             // 将每个单子加上标记1,这行代码返回的类型为ParallelQuery<var>,var为runtime自动判断,此处var的类型的实际应该为 
68             // class 匿名类型
69             // { 
70             //        public word {get;set;}
71             //        public tag {get;set}
72             //}    
73             var wordparis = parallelWords.Select(t => new { word = t, tag = 1 });
74             // 根据单词进行分组,同一个分组中的单词个数求和,相似于以下sql  select word,count(tag) from wordparis group by word
75             // 注意,此处一样的单词可能分布在不一样的分区中,好比英语中常见的"the",可能partitionA中有3个"the",partitionB中有2个“the",
76             // 可是partitionA和partitionB分别被不一样的线程处理,若是runtime足够聪明的话,他应该先计算partitionA的the的个数(the,3),
77             // 而后计算partitionB的the的个数(the,2),最后将整个partition合并而且从新分割(shuffle),在作后续的计算
78             // shuffle后partition的分区和以前partition里面的数据会不一样。
79             // 此处wordcountParis的类型为
80             // class 匿名类型
81             // { 
82             //        public word {get;set;}
83             //        public count {get;set}
84             //}
85             var wordcountParis = wordparis.GroupBy(t => t.word).Select(t => new { word = t.Key, count = t.Select(p => p.tag).Aggregate((a, b) => a + b) });
86             // 打印结果。因为线程执行的乱序,能够看到输出的partitionId也是乱序。
87             wordcountParis.ForAll(t => Console.WriteLine($"ParationId:{Thread.CurrentThread.ManagedThreadId}   ({t.word}-{t.count})"));
88         }
89     }
90 }

   程序运行结果编程

  

 

  经过上面的c#的例子,咱们看到parallel如何将一篇文章分解成多个Partition来而且在不一样Partition上进行并行计算的,在计算过程当中,可能须要"shuffle",须要对原来的Partition进行从新洗牌。c#

  咱们假设,若是这个程序运行在集群上,这些Partition分布在不一样的机器上,这样就能够利用多台机器的力量而非一台机器多个线程的力量去作计算了,yeah!,你猜对了,这就是spark,下面的scala的wordCountFlow函数是在spark上统计单词出现频率的函数,与c#的WordCountFlow同样,也是五行代码,而且这五行代码的逻辑也彻底相同。只不过spark将数据分布在不一样的机器上,而且让机器进行计算,固然,如你所想,某些状况下须要shuffle,不一样机器上的数据将会被汇聚并从新分割成新的分区。虽然Spark中的partition和net parallel中的partition并不彻底对应(spark中的一台机器上可能有多个paratition) ,shuffle也是spark的专用词汇,但基本的原理是相似的。数组

package wordCountExample

import org.apache.spark.{SparkConf, SparkContext, TaskContext}

/**
  * Created by StevenChennet on 2017/3/10.
  */
object WordCount {
  def main(args: Array[String]): Unit = {
    // 文件路径
    val filePath="D:\\BigDataSoftware\\spark-2.1.0-bin-hadoop2.7\\README.md"

    wordCountFlow(filePath)
  }
  def wordCountFlow(filePath:String ):Unit={
    // sparkContext对象使用一个SparkConf对象来构造
    // SparkConf主要进行一些设置,好比说local【*】表示尽可能开启更多线程并行处理
    // SparkContext是spark执行任务的核心对象
    // 下面五行代码与C#的WordCountFlow五行代码一一对应
    new SparkContext(new SparkConf().setAppName("WordCount").setMaster("local[*]")).textFile(filePath)
      .flatMap(_.split(" "))
      .map((_,1))
      .reduceByKey(_+_)
      .foreach(t=>println( s"Partition: ${ TaskContext.getPartitionId() }  (${t._1}}-${t._2}})"))
  }
}

  据友情提醒,上面的Scala代码的lambda太难看了,我转换一下方式多线程

  

new SparkContext(new SparkConf().setAppName("WordCount").setMaster("local[*]")).textFile(filePath)
      .flatMap(line=>line.split(" "))
      .map(word=>(word,1))
      .reduceByKey((a,b)=>a+b)
      .foreach(t=>println( s"Partition: ${ TaskContext.getPartitionId() }  (${t._1}}-${t._2}})"))
  }

  

  程序运行结果分布式

  

  在net parallel中,若是某个线程在计算过程当中崩溃了,那可能致使整个程序都crash掉,若是是集群运算,由于一台宕机而让整个集群崩溃可不是一个好决策,spark能够在计算以前先对要计算的内容持久化,若是一台机器crash,能够将这台机器的计算任务拉到另一台机器上进行从新计算。函数式编程

相关文章
相关标签/搜索