The Python Road, Part 1: PySpark

Downloading and installing pyspark (Spark SQL) with pip

pip install pyspark (you may hit installation timeouts here; if so, add the --timeout parameter):

pip --default-timeout=100 install -U pyspark
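If the timeouts keep happening, the same setting can also be made permanent in pip's configuration file (~/.pip/pip.conf on Linux/macOS, %APPDATA%\pip\pip.ini on Windows). A minimal sketch of that file:

[global]
timeout = 100

After this, a plain pip install pyspark picks up the longer timeout without needing the flag each time.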

Below is some code I wrote. It runs without problems, but at the moment I don't know how to get at the values inside the RDD and the DataFrame (a few common ways are sketched after the code below).

from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext, Row, DataFrame
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

appname = "myappname"
master = "local"
myconf = SparkConf().setAppName(appname).setMaster(master)
sc = SparkContext(conf=myconf)
hc = HiveContext(sc)

# Build a table: parallelize a list and convert each line to a row.
# A schema can be applied with applySchema or inferred with inferSchema;
# inferSchema was deprecated in 1.5 and replaced by createDataFrame.
datas = ["1 b 28", "3 c 30", "2 d 29"]
source = sc.parallelize(datas)
splits = source.map(lambda line: line.split(" "))
# Use plain tuples so the values line up positionally with the schema below
# (kwargs-style Rows sort their fields alphabetically in older PySpark).
rows = splits.map(lambda words: (int(words[0]), words[1], int(words[2])))

# A standalone Row for reference; its fields can be read by index or by name.
myrows = Row(id="a", name="zhangkun", age="28")
# print(myrows[0])
# print(myrows[1])
# print(myrows[2])

# Declare the schema explicitly and register the DataFrame as a table.
fields = []
fields.append(StructField("id", IntegerType(), True))
fields.append(StructField("name", StringType(), True))
fields.append(StructField("age", IntegerType(), True))
schema = StructType(fields)
people = hc.createDataFrame(rows, schema)  # before 1.5 this was inferSchema
# people.printSchema()
people.registerTempTable("people")  # deprecated in 2.0 in favor of createOrReplaceTempView

# SQL can now be run over the registered table.
results = hc.sql("select * from people")
for row in results.collect():  # collect() brings the rows back to the driver
    print(row)
sc.stop()

Suddenly a new task has come in: deploying a big-data distributed platform with CDH, including installation of the following components: Hadoop, HBase, Hive, Kafka, and Spark. The work above is on hold for now; I'll come back to it when it's needed. Mainly my own fundamentals are still weak, and I need to study more of the basics.
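As for the earlier question of how to pull values out of an RDD or a DataFrame, here is a minimal sketch of the usual accessors. It assumes the sc, rows, and results objects from the code above; all calls are standard PySpark API:

# RDD side: collect() / take(n) / first() materialize data on the driver.
print(rows.collect())   # all elements as a Python list
print(rows.take(2))     # the first two elements
print(rows.first())     # a single element

# DataFrame side: the same verbs return Row objects, whose fields
# can be read by attribute, by name, or by position.
for row in results.collect():
    print(row.id, row["name"], row[2])

results.show()          # pretty-print the first rows to stdout
ids = [r.id for r in results.select("id").collect()]  # plain Python list of values

Note that collect() pulls the entire dataset back to the driver, so on large data prefer take() or show() for inspection.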