Copyright notice: This technical column is a summary and distillation of the author's (Qin Kaixin) day-to-day work. The cases are drawn from real commercial environments, together with tuning advice and cluster capacity-planning guidance for production use. Please keep following this blog series. Email: 1120746959@qq.com; feel free to get in touch for academic exchange.
Download Anaconda3 (Linux version)
Anaconda3-5.3.1-Linux-x86_64.sh
Install Anaconda3
bash Anaconda3-5.3.1-Linux-x86_64.sh -b
Configure environment variables, including PYSPARK_DRIVER_PYTHON and PYSPARK_PYTHON
export SCALA_HOME=/usr/local/install/scala-2.11.8
export JAVA_HOME=/usr/lib/java/jdk1.8.0_45
export HADOOP_HOME=/usr/local/install/hadoop-2.7.3
export SPARK_HOME=/usr/local/install/spark-2.3.0-bin-hadoop2.7
export FLINK_HOME=/usr/local/install/flink-1.6.1
export ANACONDA_PATH=/root/anaconda3
export PYSPARK_DRIVER_PYTHON=$ANACONDA_PATH/bin/ipython
export PYSPARK_PYTHON=$ANACONDA_PATH/bin/python
export JRE_HOME=${JAVA_HOME}/jre
export CLASS_PATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=:${JAVA_HOME}/bin:${SCALA_HOME}/bin:${HADOOP_HOME}/bin:${SPARK_HOME}/bin:$PATH
export PATH=/root/anaconda3/bin:$PATH
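A quick way to confirm that PySpark actually picks up the Anaconda interpreter (a minimal sanity check, assuming the paths above) is to start pyspark and run:

import sys
print(sys.executable)                                      # driver interpreter, expected under /root/anaconda3
sc.parallelize([1]).map(lambda _: sys.version).first()     # Python version used on the executors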
Start Spark
Start the Jupyter Notebook server
For older versions (IPython-based notebook):
PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook --allow-root" pyspark
For newer versions (Jupyter):
PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook --allow-root" pyspark
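Once a notebook is open, PySpark has already created the SparkContext as sc; a minimal sanity check inside a cell:

sc.version      # Spark version, e.g. '2.3.0'
sc.pythonVer    # Python major.minor version used by PySpark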
Remote access to Jupyter
jupyter notebook --generate-config
vi ~/.jupyter/jupyter_notebook_config.py
c.NotebookApp.ip = '*'                # IPs allowed to access the server; '*' means any IP
c.NotebookApp.open_browser = False    # do not open a local browser on startup
c.NotebookApp.port = 12035            # port to listen on; any free port will do
c.NotebookApp.enable_mathjax = True   # enable MathJax
c.NotebookApp.allow_remote_access = True
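Because the server now accepts remote connections, it is worth protecting it with a password as well. A minimal sketch using the classic notebook package (the module path may differ in newer Jupyter releases):

from notebook.auth import passwd
print(passwd())    # prompts for a password and prints a hash such as 'sha1:...'
# then add the hash to jupyter_notebook_config.py:
# c.NotebookApp.password = 'sha1:...'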
The Jupyter Notebook development interface
lines=sc.textFile("/LICENSE")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
counts.count()
243
counts.first()
(' Apache License', 1)
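The pair RDD above counts identical lines. To peek at the most frequent lines, the counts can be re-sorted (a small extension, not part of the original example):

counts.map(lambda kv: (kv[1], kv[0])).sortByKey(ascending=False).take(5)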
Starting in Standalone mode
PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook --allow-root" MASTER=spark://SparkMaster:7077 pyspark
For comparison, the equivalent in the Scala spark-shell:
val intRDD=sc.parallelize(List(1,2,3))
intRDD.collect
Array[Int] = Array(1, 2, 3)
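Back in the Python notebook, it is easy to confirm that the session is attached to the standalone master (a minimal check; spark://SparkMaster:7077 is the master configured above):

print(sc.master)                # expected: spark://SparkMaster:7077
print(sc.defaultParallelism)    # defaults to the total number of cores on the workers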
Basic RDD operations in Python
#parallelize
intRDD=sc.parallelize([1,2,3])
intRDD.collect()
[1, 2, 3]
StringRDD=sc.parallelize(["Apple","Orange"])
StringRDD.collect()
['Apple', 'Orange']
# a named function
def addOne(x):
    return x+1
intRDD.map(addOne).collect()
# an anonymous (lambda) function
intRDD=sc.parallelize([1,2,3])
intRDD.map(lambda x:x+1).collect()
[2, 3, 4]
# filter
intRDD.filter(lambda x:1< x and x<5).collect()
[2, 3]
# filtering with the in operator
stringRDD =sc.parallelize(["apple","blue"])
stringRDD.filter(lambda x:"apple" in x).collect()
['apple']
#distinct
intRDD=sc.parallelize([1,2,3,2,7])
intRDD.distinct().collect()
[1, 2, 3, 7]
#randomSplit
sRDD=intRDD.randomSplit([0.4,0.6])
sRDD[0].collect()
[1, 2]
#groupBy
group=intRDD.groupBy(lambda x:"even" if(x%2==0) else "odd").collect()
print(group)
[('odd', <pyspark.resultiterable.ResultIterable object at 0x7f2186897978>), ('even', <pyspark.resultiterable.ResultIterable object at 0x7f21868978d0>)]
print (sorted(group[0][1]))
[1, 3, 7]
print (sorted(group[1][1]))
[2, 2]
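flatMap, which appears in the word-count example later, is worth contrasting with map here: each input element may produce zero or more output elements, and the results are flattened into one RDD (a short illustrative sketch):

stringRDD = sc.parallelize(["apple orange", "banana"])
stringRDD.map(lambda s: s.split(" ")).collect()       # [['apple', 'orange'], ['banana']]
stringRDD.flatMap(lambda s: s.split(" ")).collect()   # ['apple', 'orange', 'banana']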
Transformations across multiple RDDs in Python
intRDD1=sc.parallelize(["apple","blue"])
intRDD2=sc.parallelize([1,2])
intRDD3=sc.parallelize(["apple","blue"])
# union
intRDD1.union(intRDD2).union(intRDD3).collect()
['apple', 'blue', 1, 2, 'apple', 'blue']
# intersection
intRDD1=sc.parallelize([3,1,2,5,5])
intRDD2=sc.parallelize([5,6])
intRDD3=sc.parallelize([2,7])
intRDD1.intersection(intRDD2).collect()
[5]
intRDD1=sc.parallelize([3,1,2,5,5])
intRDD2=sc.parallelize([5,6])
intRDD3=sc.parallelize([2,7])
intRDD1.subtract(intRDD2).collect()
[2, 3, 1]
intRDD1.first()
intRDD1.take(3)
intRDD1.takeOrdered(3)
[1, 2, 3]
intRDD1.takeOrdered(3,lambda x:-x)
[5, 5, 3]
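Besides first/take/takeOrdered, numeric RDDs also support simple statistics actions; applied to the same intRDD1 ([3, 1, 2, 5, 5]), a brief sketch:

intRDD1.count()    # 5
intRDD1.sum()      # 16
intRDD1.max()      # 5
intRDD1.min()      # 1
intRDD1.stats()    # count, mean, stdev, max and min in a single pass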
Key-Value RDD transformations in Python
kvRDD1=sc.parallelize([(3,4),[3,6],[5,6],[1,2]])
kvRDD1.collect()
[(3, 4), [3, 6], [5, 6], [1, 2]]
kvRDD1.keys().collect()
[3, 3, 5, 1]
kvRDD1.values().collect()
[4, 6, 6, 2]
kvRDD1.filter(lambda keyvalue :keyvalue[0]<5).collect()
[(3, 4), [3, 6], [1, 2]]
kvRDD1.mapValues(lambda x:x*x).collect()
[(3, 16), (3, 36), (5, 36), (1, 4)]
kvRDD1.sortByKey(ascending=False).collect()
[[1, 2], (3, 4), [3, 6], [5, 6]]
kvRDD1.reduceByKey(lambda x,y:x+y).collect()
[(3, 10), (5, 6), (1, 2)]
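reduceByKey merges the values of each key with the given function, while groupByKey collects all values of a key into an iterable; on the same kvRDD1 (a brief sketch, output order may vary):

kvRDD1.groupByKey().mapValues(list).collect()
# e.g. [(1, [2]), (3, [4, 6]), (5, [6])]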
Transformations across multiple Key-Value RDDs in Python
#join
kvRDD1=sc.parallelize([(3,4),[3,6],[5,6],[1,2]])
kvRDD2=sc.parallelize([(3,8)])
kvRDD1.join(kvRDD2).collect()
[(3, (4, 8)), (3, (6, 8))]
# left outer join
kvRDD1.leftOuterJoin(kvRDD2).collect()
[(3, (4, 8)), (3, (6, 8)), (5, (6, None)), (1, (2, None))]
# right outer join
kvRDD1.rightOuterJoin(kvRDD2).collect()
[(3, (4, 8)), (3, (6, 8))]
# drop pairs whose key also appears in kvRDD2
kvRDD1.subtractByKey(kvRDD2).collect()
[(5, 6), (1, 2)]
kvRDD1.countByKey()
defaultdict(int, {3: 2, 5: 1, 1: 1})
# collectAsMap builds a dict; for the duplicate key 3, the later value 6 is kept
KV1=kvRDD1.collectAsMap()
{3: 6, 5: 6, 1: 2}
KV1[3]
6
kvRDD1.lookup(3)
[4, 6]
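A related operation is cogroup, which keeps the grouped values from both RDDs for every key instead of dropping or flattening them (a small sketch on the same RDDs; output order may vary):

[(k, (list(v1), list(v2))) for k, (v1, v2) in kvRDD1.cogroup(kvRDD2).collect()]
# e.g. [(1, ([2], [])), (3, ([4, 6], [8])), (5, ([6], []))]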
Broadcast variables in Python
kvFruit = sc.parallelize([(1,"apple"),(2,"orange"),(3,"banana"),(4,"grape")])
FruitMap=kvFruit.collectAsMap()
print(FruitMap)
# broadcast the dictionary to the executors
broadcastFruitMap=sc.broadcast(FruitMap)
print(broadcastFruitMap.value)
{1: 'apple', 2: 'orange', 3: 'banana', 4: 'grape'}
# look up values through the broadcast variable
fruitIds =sc.parallelize([2,4,3,1])
fruitNames =fruitIds.map(lambda x:broadcastFruitMap.value[x]).collect()
print ("水果名称" +str(fruitNames))
水果名称['orange', 'grape', 'banana', 'apple']
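For comparison, the same lookup also works without a broadcast variable, but then FruitMap is serialized into every task's closure, whereas the broadcast ships the dictionary to each executor only once. A short sketch (unpersist releases the cached copies when they are no longer needed):

fruitIds.map(lambda x: FruitMap[x]).collect()    # same result, but the dict is re-sent with every task
broadcastFruitMap.unpersist()                    # free the cached copies on the executors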
Accumulators in Python
intRDD=sc.parallelize([1,2,3])
total=sc.accumulator(0.0)
num=sc.accumulator(0)
intRDD.foreach(lambda i:[total.add(i),num.add(1)])
avg=total.value/num.value
print (str(total.value )+" "+ str(num.value) + " "+ str(avg))
6.0 3 2.0
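Two caveats apply to accumulators: their value can only be read on the driver, and updates made inside transformations such as map may be applied more than once if a task is retried, so they are most reliable inside actions like foreach, as used above. A minimal illustration:

print(total.value)    # fine: read on the driver
# intRDD.map(lambda i: total.value).collect()   # would fail: tasks cannot read an accumulator's value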
Persistence in Python
intRDD=sc.parallelize([1,2,3])
intRDD.persist()
intRDD.is_cached
# persist() with an explicit level needs StorageLevel imported, and the RDD has to be
# unpersisted first, because a storage level cannot be changed once it has been assigned
from pyspark import StorageLevel
intRDD.unpersist()
intRDD.persist(StorageLevel.MEMORY_AND_DISK)
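The assigned storage level can be inspected afterwards, and the cache released again with unpersist (a brief sketch):

intRDD.getStorageLevel()    # e.g. Disk Memory Serialized 1x Replicated
intRDD.unpersist()
intRDD.is_cached            # False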
A complete Python example: word count
textFile=sc.textFile("/LICENSE")
stringRDD = textFile.flatMap(lambda line:line.split(" ")).map(lambda x: (x,1)).reduceByKey(lambda x,y:x+y)
print(stringRDD.take(10))
stringRDD.saveAsTextFile("/pythonWordCount")
[('', 1445), ('Apache', 6), ('License', 9), ('Version', 2), ('2.0,', 1), ('January', 1), ('2004', 1), ('http://www.apache.org/licenses/', 1), ('TERMS', 2), ('AND', 3)]
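Two practical notes: saveAsTextFile fails if the target directory already exists, so /pythonWordCount has to be removed before re-running, and the most frequent words can be listed directly (a small extension of the example):

stringRDD.takeOrdered(10, key=lambda kv: -kv[1])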
By integrating the Python stack with the Spark big-data platform, we gain access to Python's most complete ecosystem for computation and visualization.
Qin Kaixin (秦凯新), Shenzhen, 201812132319