博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark(九) -- SparkSQL API编程
阅读量:6349 次
发布时间:2019-06-22

本文共 3514 字,大约阅读时间需要 11 分钟。

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/45957991

本文测试的Spark版本是1.3.1

Text文本文件测试

一个简单的person.txt文件内容为:

JChubby,13Looky,14LL,15

分别是Name和Age

在Idea中新建Object,原始代码如下:

object  TextFile{
def main(args:Array[String]){ }}

SparkSQL编程模型:

第一步:

需要一个SQLContext对象,该对象是SparkSQL操作的入口
而构建一个SQLContext对象需要一个SparkContext

第二步:

构建好入口对象之后,要引入隐式转换的方法,作用是将读取到的各种文件转换成DataFrame,DataFrame是SparkSQL上进行统一操作的数据类型

第三步:

根据数据的格式,构建一个样例类。作用是提供将读取到的各种各样的数据类型隐式转换成一个统一的数据格式,方便编程

第四步:

使用SQLContext对象读取文件,并将其转换成DataFrame

第五步:

对数据进行相关操作。
1.DataFrame自带的操作方式。DataFrame提供了很多操作数据的方法,如where,select等

2.DSL方式。DSL其实使用的也是DataFrame提供的方法,但是在操作属性时可以方便的使用’ + 属性名的方式进行操作

3.将数据注册成表,通过SQL语句操作

object  TextFile{    def main(args:Array[String]){        //第一步        //构建SparkContext对象,主要要使用new调用构造方法,否则就变成使用样例类的Apply方法了        val sc = new SparkContext()        //构建SQLContext对象        val sqlContext = new SQLContext(sc)        //第二步        import sqlContext.implicits._        //第三步        case Person(name:String,age:Int)        //第四步,textFile从指定路径读取文件如果是集群模式要写hdfs文件地址;通过两个map操作将读取到的文件转换成Person类的对象,每一行对应一个Person对象;toDF将其转换成DataFrame        val people = sc.textFile("文件路径").map(_.split(",")).map{
case (name,age) => Person(name,age.toInt)}.toDF() //第五步 //DataFrame方法 println("------------------------DataFrame------------------------------------") //赛选出age>10的记录,然后只选择name属性,show方法将其输出 people.where(people("age") > 10).select(people("name")).show() //DSL println("---------------------------DSL---------------------------------") people.where('age > 10).select('name).show() //SQL println("-----------------------------SQL-------------------------------") //将people注册成people表 people.registerTempTable("people") //使用sqlContext的sql方法来写SQL语句 //查询返回的是RDD,所以对其进行collect操作,之后循环打印 sqlContext.sql("select name from people where age > 10").collect.foreach(println) //保存为parquet文件,之后的parquet演示会用到 people.saveAsParquet("保存的路径") }}

parquet格式文件测试:

val sc = new SparkContext()    val sql = new SQLContext(sc)    import sql.implicits._    val parquet = sql.parquetFile(args(0))    println("------------------------DataFrame------------------------------------")    println(parquet.where(parquet("age") > 10).select(parquet("name")).show())    println("---------------------------DSL---------------------------------")    println(parquet.where('age > 10).select('name).show())    println("-----------------------------SQL-------------------------------")    parquet.registerTempTable("parquet")    sql.sql("select name from parquet where age > 10").map(p => "name:" + p(0)).collect().foreach(println)

Json格式测试:

val sc = new SparkContext()    val sql = new SQLContext(sc)    import sql.implicits._    val json = sql.jsonFile(args(0))    println("------------------------DataFrame------------------------------------")    println(json.where(json("age") > 10).select(json("name")).show())    println("---------------------------DSL---------------------------------")    println(json.where('age > 10).select('name).show())    println("-----------------------------SQL-------------------------------")    json.registerTempTable("json")    sql.sql("select name from json where age > 10").map(p => "name:" + p(0)).collect().foreach(println)

可以看到上面的代码几乎和读取文本文件的一模一样,只不顾sc在读取文件的时候使用了parquetFile/jsonFile方法,而之后的操作是一摸一样的

由于parquet和json数据读取进来就是一个可操作的格式并且会自动转换成DataFrame,所以省去了case class的定义步骤和toDF的操作

以上为SparkSQL API的简单使用

你可能感兴趣的文章
[LeetCode] Spiral Matrix 解题报告
查看>>
60906磁悬浮动力系统应用研究与模型搭建
查看>>
指纹获取 Fingerprint2
查看>>
面试题目3:智能指针
查看>>
取消凭证分解 (取消公司下的多个利润中心)
查看>>
flask ORM: Flask-SQLAlchemy【单表】增删改查
查看>>
vim 常用指令
查看>>
nodejs 获取自己的ip
查看>>
Nest.js 处理错误
查看>>
你好,C++(16)用表达式表达我们的设计意图——4.1 用操作符对数据进行运算...
查看>>
18.3 redis 的安装
查看>>
jdbc 简单连接
查看>>
Activiti 实战篇 小试牛刀
查看>>
java中的Static class
查看>>
Xshell 连接CentOS服务器解密
查看>>
[工具类]视频音频格式转换
查看>>
GNS3与抓包工具Wireshark的关联
查看>>
groovy-语句
查看>>
VIM寄存器使用
查看>>
Java VisualVM远程监控JVM
查看>>