1、Spark SQL 是一个用来处理结构化数据的spark组件。它提供了一个叫做DataFrames的可编程抽象数据模型,并且可被视为一个分布式的SQL查询引擎。DataFrame是由“命名列”(类似关系表的字段定义)所组织起来的一个分布式数据集合。你可以把它看成是一个关系型数据库的表。
DataFrame可以通过多种来源创建:结构化数据文件,hive的表,外部数据库,或者RDDs,
Spark SQL 的功能是通过 SQLContext 类来使用的,而创建 SQLContext 是通过 SparkContext 创建的。可以使用SQLContext 或 RDD 来创建 DataFrames
2、 SparkSQL引入了一种新的RDD——SchemaRDD, SchemaRDD很象传统数据库中的表, SchemaRDD除了可以和RDD一样操作外,还可以通过registerTempTable注册成临时表,然后通过SQL语句进行操作。
- 通过定义case class,使用反射推断Schema(case class方式)
- 通过可编程接口,定义Schema,并应用到RDD上(applySchema 方式)
前者使用简单、代码简洁,适用于已知Schema的源数据上;后者使用较为复杂,但可以在程序运行过程中实行,适用于未知Schema的RDD上。
对于case class方式,首先要定义case class,在RDD的transform过程中使用case class可以隐式转化成SchemaRDD,然后再使用registerTempTable注册成表。注册成表后就可以在sqlContext对表进行 操作,如select 、insert、join等
- 从源RDD创建rowRDD
- 创建与rowRDD匹配的Schema
- 将Schema通过applySchema应用到rowRDD
3、Spark SQL如何使用
首先,利用sqlContext从外部数据源加载数据为DataFrame
然后,利用DataFrame上丰富的api进行查询、转换
最后,将结果进行展现或存储为各种外部数据形式
如图所示:

//示例源文件
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
//1导入数据源
val df = sqlContext.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
//// df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
df.show() // 输出数据源内容
//// +----+-------+
//// | age| name|
//// +----+-------+
//// |null|Michael|
//// | 30| Andy|
//// | 19| Justin|
//// +----+-------+
//-----DataFrames 处理结构化数据的一些基本操作
df.select("name").show() // 只显示 "name" 列
//// +-------+
//// | name|
//// +-------+
//// |Michael|
//// | Andy|
//// | Justin|
//// +-------+
df.select(df("name"), df("age") + 1).show() // 将 "age" 加 1
//// +-------+---------+
//// | name|(age + 1)|
//// +-------+---------+
//// |Michael| null|
//// | Andy| 31|
//// | Justin| 20|
//// +-------+---------+
df.filter(df("age") > 21).show() # 条件语句
//// +---+----+
//// |age|name|
//// +---+----+
//// | 30|Andy|
//// +---+----+
df.groupBy("age").count().show() // groupBy 操作
//// +----+-----+
//// | age|count|
//// +----+-----+
//// |null| 1|
//// | 19| 1|
//// | 30| 1|
//// +----+-----+
//-------使用 SQL 语句来进行操作---
df.registerTempTable("people") // 将 DataFrame 注册为临时表 people
val result = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19") // 执行 SQL 查询
result.show() // 输出结果
//// +------+---+
//// | name|age|
//// +------+---+
//// |Justin| 19|
//// +------+---+
4、与Hive结合
spark可以通过读取hive的元数据来兼容hive,读取hive的表数据,然后在spark引擎中进行sql统计分析,效率高于hive, 配置如下:
将 $HIVE_HOME/conf/hive-site.xml copy或者软链 到 $SPARK_HOME/conf/ 将 $HIVE_HOME/conf/core-site.xml copy或者软链 到 $SPARK_HOME/conf/ 将 $HIVE_HOME/conf/hdfs-site.xml copy或者软链 到 $SPARK_HOME/conf/
运行:
$ ./bin/spark-sql -master spark://master:7077 --jars /data/mylib/mysql-connector-java-5.1.12.jar #使用spark-sql
spark-sql> show tables;
default
$ ./bin/spark-shell -master spark://master:7077 --jars /data/mylib/mysql-connector-java-5.1.12.jar #使用spark-shell
scala> sqlContext.sql("show tables").collect().foreach(println)
default
Spark sql类似于hive,可以支持sql语法来对海量数据进行分析查询,跟hive不同的是,hive执行sql任务的底层运算引擎采用mapreduce运算框架,而sparksql执行sql任务的运算引擎是spark core,从而充分利用spark内存计算及DAG模型的优势,大幅提升海量数据的分析查询速度
5 使用Scala编写,并提交运行
/*SparkSqlDemo.scala*/
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql._
//define case class
case class Person(name:String,age:Int)
object SparkSqlDemo {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Simple SparkSqlDemo")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
//person.txt 格式为
/*zhangsan,14
lisi,27
wangwu,31
*/
//Convert user data RDD to a DataFrame and register it as a temp table
//// 用数据集文本文件创建一个Customer对象的DataFrame
val dfpeople=sc.textFile("hdfs://hd1:9000/test/person.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt)).toDF()
// 将DataFrame注册为一个表
dfpeople.registerTempTable("tempTable")
// 显示DataFrame的内容
dfpeople.show()
// 打印DF模式
dfpeople.printSchema()
// 选择客户名称列
dfpeople.select("name").show()
// 选择客户名称和age列
dfpeople.select("name", "age").show()
// 根据age选择客户
//dfpeople.filter(dfCustomers("age").equalTo(11)).show()
// 根据age统计客户数量
//dfCustomers.groupBy("age").count().show()
var result = sqlContext.sql("SELECT * FROM tempTable")
result.collect().foreach(println)
}
}
提交编译
注意Spark内嵌Scala的版本,和Scala的版本,最好保持一样,否则会出错。
$ scalac -classpath $CLASSPATH:/data/app/spark-1.5.2/lib/*:/data/app/scala-2.10.4/lib/* SparkSqlDemo.scala
打包
jar -cvf sql.jar SparkSqlDemo*.class Person*
执行程序
$ data/app/spark-1.5.2/bin/spark-submit --class SparkSqlDemo --master spark://hd1:7077 sql.jar

