Spark 数据ETL及部分代码示例

问题导读:




1.数据如何处理?
2.从数据中如何提取有用的特征?
3.有哪些衍生特征?









数据处理以及转化


python

一、当咱们完成了一些对数据集的探索和分析,咱们知道了一些关于用户数据以及电影数据的特征,接下来咱们该作些什么呢?


 

二、为了让原始数据可以在机器学习算法中变得有用,咱们首先须要清理以及在提取有用的特征值以前使用各类方法尽量地转化它。其中的转化和特征提取步骤是紧密链接的,并且在一些状况下,特定的转化就是一种特征值提取的过程。


 

三、咱们已经看过了在电影数据集中须要清理数据的例子。一般,现实的数据集包含坏的数据、丢失的数据以及异常值。理想状况下,咱们能够纠正错误的数据;可是,这一般都是不可能的。由于不少数据集来源于那些不可以重复的集合操做。丢失的数据以及异常值也是很常见的,它们能够用相似于坏数据的处理方法处理。总的来讲,归结为如下普遍的处理方法:


 

过滤掉或者移除坏数据以及丢失的数据:


 

有时候这是不可避免的;然而这也意味着丢失掉大部分坏的或丢失的记录。


 

填充坏掉或者丢失的数据:


 

咱们能够尽力地依据剩下的数据来给坏掉的或者丢失的数据赋值。好比赋给0值、平均值、中位数、附近的值或者类似值等方法。选择正确的方法一般是一件棘手的任务,这取决于数据、状况和本身的经验。


 

应用成熟的技术到异常值:


 

异常值的主要问题在于它们的值多是正确的,尽管它们是极端值。它们也有多是错误的。因此很难知道咱们处理的是哪一种状况。异常值也能够被移除或者填充。不过幸运的是,有是统计技术(如稳健回归)来处理异常值和极端值。


 

转化潜在的异常值:


 

另外一个处理异常值或者极端值得方法是转化。例如对数或者高斯内核转化,计算出潜在的异常值,或者显示大范围的潜在数据。这些类型的转换抑制了变量大尺度变化的影响并将非线性关系转化为一个线性的。



填充坏的或丢失的数据:

程序员

咱们以前已经见过过滤坏数据的例子了。咱们接着以前的代码,下面的代码段对坏数据应用了填充的方法,经过赋给数据点以相等于year中值的值。



 

[Python]   纯文本查看   复制代码
?
1
2
3
years_pre_processed = movie_fields. map ( lambda fields: fields[ 2 ]). 
map ( lambda x: convert_year(x)).collect() 
years_pre_processed_array = np.array(years_pre_processed) 





 

首先,咱们将在选择全部的发布年限后计算year的平均值和中位数,除了那些坏的数据。以后使用numpy函数,从years_pre_processed_array中查找坏数据的索引(参考以前咱们赋予1900给数据点)。最后,咱们使用这个索引来赋予中值给坏的数据:

[Python]   纯文本查看   复制代码
?
01
02
03
04
05
06
07
08
09
10
mean_year = np.mean(years_pre_processed_array[years_pre_processed_ 
array! = 1900 ]) 
median_year = np.median(years_pre_processed_array[years_pre_processed_ 
array! = 1900 ]) 
index_bad_data = np.where(years_pre_processed_array = = 1900 )[ 0 ][ 0
years_pre_processed_array[index_bad_data] = median_year 
print "Mean year of release: %d" % mean_year 
print "Median year of release: %d" % median_year 
print "Index of '1900' after assigning median: %s" % np.where(years_ 
pre_processed_array = = 1900 )[ 0 ]


 

打印结果应该相似于以下:

[Python]   纯文本查看   复制代码
?
1
2
3
Mean year of release: 1989 
Median year of release: 1995 
Index of '1900' after assigning median: []



 

在这里咱们计算出year的平均值和中位数,从输出结果中咱们能够看出,year的中位数由于year的倾斜分布要比平均值高许多。尽管直接决定使用一个精确的值去填充数据不是常见的作法,可是因为数据的倾斜,使用中位数去赋值是一种可行的方法。




从数据中提取有用的特征

算法

一、当咱们完成了对数据初始的处理和清洗,咱们就能够准备从数据中提取一些实际有用的特征,这些特征数据能够用于之后的机器学习模型中的训练。


 

二、特征数据是指咱们用于训练模型的一些变量。每行数据都有可能包含能够提取用于训练的样例。几乎全部的机器学习模型都是工做在以数字为技术的向量数据上。所以,咱们须要将粗糙的数据转化为数字。


特征数据能够分为如下几类:

express

数字特征


 

这类特征数据是指一些数值类型的数据。


 

分类特征


 

这类特征数据表明一些相同特性的,能够归为一类的一些数据。例如用户的性别、职位或者电影的类型。


 

文本特征


 

这类特征数据是从数据中的文本内容中派生出来的,例如电影名称,描述,以及评论。


 

其余特征


 

这类特征数据都会转化为以数字为表明的特征,例如图片,视频,音频均可以表示为数字数据的集合。地理位置能够表明为经度、纬度或者经纬度之差。



数字特征

编程

一、旧数字和提取的新的特征数值有什么区别呢?其实,在现实生活中,任何的数值数据均可以做为输入变量,但在机器学习模型中,咱们学习的是每一个特征的向量权重,例如监督学习模型。


 

二、所以,咱们须要使用那些有意义的特征数据,那些模型能够从特征值与目标数据之间学习关系的特征数据。例如,年龄就是一个合理的特征数据,好比年龄的增加和产出有着直接的关系,一样,身高也是能够直接使用的数值特征。



分类特征

数组

一、分类特征数据不能直接使用它们原有的粗糙的格式做为输入使用,由于它们不是数字。可是它们其中的一些衍生值能够做为输入的变量。好比以前所说的职位就能够有学生、程序员等。


 

二、这些分类变量只是名义上的变量,由于它们不存在变量值之间的顺序的概念。相反,当变量之间存顺序概念时,咱们会倾向于使用这些常见有序的变量。


 

三、为了把这些分类变量转化为数字表示,咱们可使用经常使用的方法,例如1-of-k编码。这种方法须要把那些名义上的变量转化为对机器学习任务有用的数据。常见那些粗糙格式的数据都会以名义上的变量形式编码为有意义的数据。


 

四、咱们假设这里有k个值能够供变量获取,若是咱们能够给每一个值都赋予1到k中的索引,而后咱们就可使用程度为k的二进制向量表示一个值了。初始的实体中,向量表示的二进制值都是0,当咱们赋予变量一个状态的时候,所对应的二进制向量中对应的索引值由0变成1。


 

例如,咱们先获取上面所说的职位的全部类别变量:

[Python]   纯文本查看   复制代码
?
1
2
3
all_occupations = user_fields. map ( lambda fields: fields[ 3 ]). 
distinct().collect() 
all_occupations.sort()



 

接着咱们能够赋给每一个可能的职位类别一个值(值得索引从零开始,由于在Python、Scala、Java数组中索引都是从0开始的)

[Python]   纯文本查看   复制代码
?
1
2
3
4
5
6
7
8
9
idx = 0 
all_occupations_dict = {} 
for o in all_occupations: 
     all_occupations_dict[o] = idx 
idx + = 1 
# try a few examples to see what "1-of-k" encoding is assigned 
print "Encoding of 'doctor': %d" % all_occupations_dict[ 'doctor'
print "Encoding of 'programmer': %d" % all_occupations_ 
dict [ 'programmer' ]


 

你将看到以下打印结果:

[Python]   纯文本查看   复制代码
?
1
2
Encoding of 'doctor' : 2 
Encoding of 'programmer' : 14


 

最后咱们能够对上面打印的结果中programmer进行编码,咱们能够首先建立一个长度为k(在这个案例中)的numpy数组而且值所有填0(咱们将使用numpy数组中的zeros函数建立这个数组)。


 

咱们将提取单词programmer的索引并赋予1给数组的这个索引:

[Python]   纯文本查看   复制代码
?
1
2
3
4
5
6
K = len (all_occupations_dict) 
binary_x = np.zeros(K) 
k_programmer = all_occupations_dict[ 'programmer'
binary_x[k_programmer] = 1 
print "Binary feature vector: %s" % binary_x 
print "Length of binary vector: %d" % K



 

上面结果将呈现给咱们长度为21的二进制特征的向量:

[Python]   纯文本查看   复制代码
?
1
2
3
Binary feature vector: [ 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 
                          0. 0. 1. 0. 0. 0. 0. 0. 0.
Length of binary vector: 21

衍生特征

app

一、一般会从一或多个可得到的变量中计算出衍生特征是颇有用的,咱们但愿那些衍生特征能够相比于原来粗糙格式的变量添加更多的信息。


 

二、例如,咱们能够计算全部电影评分数据中的用户平均评分,用户平均评分将提供针对用户截差的模型。咱们已经获取了粗糙的评分数据,而且建立了新的可让咱们学习更好模型的特征。


 

三、从粗糙数据中获取衍生特征数据的例子包括平均值、中位数、求和、最大值、最小值以及总数等。好比在电影数据中,咱们能够经过如今的年限减去电影发布年限得到电影的年龄。


 

四、一般,这些转化用来产生数值数据以便于更好的让模型去学习。


 

五、把数字特征值转化为分类特征值也很常见,好比



转化timestamps值为分类特征值


dom

为了演示怎样从数字特征值衍生为分类特征值,咱们将使用电影评分数据中的评分时间。这些时间都是Unix timestamps格式。咱们能够用Python的datetime模块去从timestamp中获取date和time,而后提取day中的hour。这将为每一个评分中day的hour成一个RDD。


 

咱们将须要一个函数去提取表明评分timestamp的datetime:

[Python]   纯文本查看   复制代码
?
1
2
3
def extract_datetime(ts): 
     import datetime 
     return datetime.datetime.fromtimestamp(ts)


 

咱们继续使用以前例子之中计算出的rating_data RDD


 

首先,咱们使用map转化提取timestamp列,把它转化为Python中int类型。对每一个timestamp应用extract_datetime方法,而后从结果datetime对象中提取hour:
[Python]   纯文本查看   复制代码
?
1
2
3
timestamps = rating_data. map ( lambda fields: int (fields[ 3 ])) 
hour_of_day = timestamps. map ( lambda ts: extract_datetime(ts).hour) 
hour_of_day.take( 5 )


 

若是咱们从结果RDD中获取前五条记录,咱们将看到如下输出结果:

[Python]   纯文本查看   复制代码
?
1
[ 17 , 21 , 9 , 7 , 7 ]


 

至此咱们已经将粗糙的时间数据转化为了评分数据中表明day中hour的分类特征数据


 

如今,咱们说的这种转化可能优势粗糙,也许咱们想更加贴切地定义转化。咱们能够将天天中的小时转化为表明天天时间中的块。例如咱们能够定义morning是从7 am到 11 am、lunch是从11 am到 1am等。使用这些块,咱们能够建立方法给天天中的时间赋值,下面将day中的hour做为输入:
[Python]   纯文本查看   复制代码
?
01
02
03
04
05
06
07
08
09
10
11
def assign_tod(hr): 
     times_of_day =
         'morning' : range ( 7 , 12 ), 
         'lunch' : range ( 12 , 14 ), 
         'afternoon' : range ( 14 , 18 ), 
         'evening' : range ( 18 , 23 ), 
         'night' : range ( 23 , 7
    
for k, v in times_of_day.iteritems(): 
     if hr in v: 
     return k


 

如今,咱们能够将assign_tod函数应用到存在于hour_of_day RDD中的每一个评分记录中的hour上。

[Python]   纯文本查看   复制代码
?
1
2
time_of_day = hour_of_day. map ( lambda hr: assign_tod(hr)) 
time_of_day.take( 5 )


 

若是咱们获取这个RDD的前5条记录,咱们将看到以下转化后的值:

[Python]   纯文本查看   复制代码
?
1
[ 'afternoon' , 'evening' , 'morning' , 'morning' , 'morning' ]



 

到此,咱们已经将timestamp变量转化为24小时格式的hours变量,以及自定义的天天中的时间值。所以咱们已经有了分类特征值,可使用以前介绍的1-of-k编码方法去生成二进制特征的向量。



文本特征值

机器学习

一、在某些状况下,文本特征值是以分类以及衍生特征存在的。咱们拿电影的描述信息做为例子。这里,粗糙的数据不能被直接使用,即便是做为分类特征,由于若是每一个文本都有值,那将会产生无限种可能组合的单词。咱们的模型几乎不会出现两种相同特征,就算有那么学习效率也不会高。所以,咱们但愿将原始文本变成一种更适合机器学习的形式。


 

二、有不少的方法能够处理文本,并且天然语言领域处理致力于处理、呈现和模型化文本内容。咱们将介绍简单和标准的方法来实现文本特征提取,这个方法就是词袋模型表示。


 

三、词袋模型将文本块视为单词的集合以及可能存在的数字,词袋方法的处理以下:


 

标记:首先,一些形式的标记用于将文本分割为标记的集合(通常是单词,数字等)。例如常见的空格标记,将文本按照每一个空格分隔,还有其余的一些标点和非字母数字的标记。


 

能够移除的中止词:通常咱们会移除文本中很是常见的词,例如”the”、”and”、”but”(这些都称为中止词)。


 

词干提取:接下来的操做包括词干提取,一种获取输入项,而后将其提取为其最基础的值。一个常见的例子就是复数编程单数,或者dogs变成dog。有不少方法能够实现词干提取,有不少文本处理库也包含各类词干提取算法。


 

向量化:最后一步是将处理项转化为向量表示形式。最简单的形式也许就是二进制的向量表示形式,若是一个处理项包含在文本中,咱们就给它赋值为1,若是没有就赋值为0。本质上是咱们以前提到的分类的1-of-k编码。相似1-of-k编码,这里须要一个字典将这些项映射为一个个索引。也许你会想到,这里可能存在几百万单独项。所以,使用稀疏向量表示形式是很是严格的,只在那些处理项已被保存的状况下使用,这样能够节省内存、磁盘空间以及处理时间。



简单文本特征提取

函数

咱们使用电影评分数据中的电影名称演示以二进制向量方法提取文本特征值。


 

首先咱们建立函数去除每部电影的发布年限,仅留下电影名称。


 

电影数据示例:

[Python]   纯文本查看   复制代码
?
1
1 |Toy Story ( 1995 )| 01 - Jan - 1995 ||[url = http: / / us.imdb.com / M / title - exact?Toy % 20Story % 20 ]http: / / us.imdb.com / M / title - exact?Toy % 20Story % 20 [ / url]( 1995 )| 0 | 0 | 0 | 1 | 1 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0



 

咱们将使用Python的 regular expression模块re,去搜索出存在于电影名称列的电影发布年限。当咱们匹配到这个regular expression,咱们将只提取出电影名称,示例:

[Python]   纯文本查看   复制代码
?
01
02
03
04
05
06
07
08
09
10
def extract_title(raw): 
     import re 
     # this regular expression finds the non-word (numbers) between 
     parentheses 
     grps = re.search( "(\w+)" , raw) 
     if grps: 
     # we take only the title part, and strip the trailingwhite spacefrom the remaining text, below 
         return raw[:grps.start()].strip() 
     else
         return raw



 

接下来,咱们将从movie_fields RDD中提取出粗糙的电影名称:

[Python]   纯文本查看   复制代码
?
1
2
/ / 包含电影发布年限,格式:Toy Story ( 1995
raw_titles = movie_fields. map ( lambda fields: fields[ 1 ])


 

而后咱们经过下面的代码提取5条记录测试extract_title函数的功能:

[Python]   纯文本查看   复制代码
?
1
2
for raw_title in raw_titles.take( 5 ): 
     print extract_title(raw_title)



 

经过打印结果咱们能够验证函数执行状况,打印结果示例:

[Python]   纯文本查看   复制代码
?
1
2
3
4
5
Toy Story 
GoldenEye 
Four Rooms 
Get Shorty 
Copycat


 

咱们将应用函数以及标记模式来提取电影名称为单个元素,下面咱们使用简单地空格标记来分离电影名称。

[Python]   纯文本查看   复制代码
?
1
2
3
4
5
movie_titles = raw_titles. map ( lambda m: extract_title(m)) 
# next we tokenize the titles into terms. We'll use simple whitespace 
tokenization 
title_terms = movie_titles. map ( lambda t: t.split( " " )) 
print title_terms.take( 5 )


 

打印结果:

[Python]   纯文本查看   复制代码
?
1
[[u 'Toy' , u 'Story' ], [u 'GoldenEye' ], [u 'Four' , u 'Rooms' ], [u 'Get' ,u 'Shorty' ], [u 'Copycat' ]]



 

如今咱们能够看出电影名称以及被按照空格分离为单个的标记了。


 

为了给每一项赋值一个向量的索引,咱们须要建立词典,将每一项都映射到一个整数索引。


 

首先,咱们将使用Spark的flatMap函数来扩张title_terms RDD中每条记录的list字符串,转化为每条记录都是一项的名为all_terms的RDD。


 

咱们获取全部的惟一项,而后赋值索引,就像以前的对职位操做的1-of-k编码。

[Python]   纯文本查看   复制代码
?
01
02
03
04
05
06
07
08
09
10
# next we would like to collect all the possible terms, in order to 
build out dictionary of term < - > index mappings 
all_terms = title_terms.flatMap( lambda x: x).distinct().collect() 
# create a new dictionary to hold the terms, and assign the "1-of-k" 
indexes 
idx = 0 
all_terms_dict = {} 
for term in all_terms: 
     all_terms_dict[term] = idx 
idx + = 1



 

咱们打印出惟一项的总数来测试咱们的map功能是否正常工做:

[Python]   纯文本查看   复制代码
?
1
2
3
print "Total number of terms: %d" % len (all_terms_dict) 
print "Index of term 'Dead': %d" % all_terms_dict[ 'Dead'
print "Index of term 'Rooms': %d" % all_terms_dict[ 'Rooms' ]


 

打印结果:

[Python]   纯文本查看   复制代码
?
1
2
3
Total number of terms: 2645 
Index of term 'Dead' : 147 
Index of term 'Rooms' : 1963



 

咱们也可使用Spark的zipWithIndex函数来更加有效地实现上面的结果,这个函数获取values的RDD而后经过索引合并它们而且建立一个新的key-value对RDD,这个新的RDD的key就是惟一项,value是这个项的字典索引。咱们经过使用collectAsMap函数来将这个key-value RDD做为Python字典方法传入driver。

[Python]   纯文本查看   复制代码
?
1
2
3
4
all_terms_dict2 = title_terms.flatMap( lambda x: x).distinct(). 
zipWithIndex().collectAsMap() 
print "Index of term 'Dead': %d" % all_terms_dict2[ 'Dead'
print "Index of term 'Rooms': %d" % all_terms_dict2[ 'Rooms' ]



 

打印结果:

[Python]   纯文本查看   复制代码
?
1
2
Index of term 'Dead' : 147 
Index of term 'Rooms' : 1963


 

最后一步是建立一个函数将惟一项的集合转化为一个稀疏的向量表示形式。为了达到效果,咱们将建立一个空的,有一行以及和字典中惟一项总数的列的稀疏矩阵。而后咱们将经过输入列表中的每一项来检查这一项是否存在于咱们的惟一项字典中。若是是,咱们将给这个字典中对应的这个惟一项的索引赋值为1。


 

[Python]   纯文本查看   复制代码
?
01
02
03
04
05
06
07
08
09
10
11
12
# this function takes a list of terms and encodes it as a scipy sparse 
vector using an approach 
# similar to the 1-of-k encoding 
def create_vector(terms, term_dict): 
     from scipy import sparse as sp 
         num_terms = len (term_dict) 
         x = sp.csc_matrix(( 1 , num_terms)) 
         for t in terms: 
相关文章
相关标签/搜索