登录 |  注册 |  繁體中文


Spark SQL详解

分类: Spark 颜色:橙色 默认  字号: 阅读(2627) | 评论(0)

1、Spark SQL 是一个用来处理结构化数据的spark组件。它提供了一个叫做DataFrames的可编程抽象数据模型,并且可被视为一个分布式的SQL查询引擎。DataFrame是由“命名列”(类似关系表的字段定义)所组织起来的一个分布式数据集合。你可以把它看成是一个关系型数据库的表。
DataFrame可以通过多种来源创建:结构化数据文件,hive的表,外部数据库,或者RDDs, 

Spark SQL 的功能是通过 SQLContext 类来使用的,而创建 SQLContext 是通过 SparkContext 创建的。可以使用SQLContext 或 RDD 来创建 DataFrames

2、 SparkSQL引入了一种新的RDD——SchemaRDD, SchemaRDD很象传统数据库中的表, SchemaRDD除了可以和RDD一样操作外,还可以通过registerTempTable注册成临时表,然后通过SQL语句进行操作。

 Spark提供了两种方式将RDD转换成SchemaRDD:
  • 通过定义case class,使用反射推断Schema(case class方式)
  • 通过可编程接口,定义Schema,并应用到RDD上(applySchema 方式)

前者使用简单、代码简洁,适用于已知Schema的源数据上;后者使用较为复杂,但可以在程序运行过程中实行,适用于未知Schema的RDD上。

对于case class方式,首先要定义case class,在RDD的transform过程中使用case class可以隐式转化成SchemaRDD,然后再使用registerTempTable注册成表。注册成表后就可以在sqlContext对表进行 操作,如select 、insert、join等

applySchema 方式比较复杂,通常有3步过程:
  • 从源RDD创建rowRDD
  • 创建与rowRDD匹配的Schema
  • 将Schema通过applySchema应用到rowRDD

3、Spark SQL如何使用

首先,利用sqlContext从外部数据源加载数据为DataFrame
然后,利用DataFrame上丰富的api进行查询、转换
最后,将结果进行展现或存储为各种外部数据形式
如图所示:

 
3、DataFrames简单示例
//示例源文件
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

//1导入数据源
val df = sqlContext.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
//// df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
 
df.show()     // 输出数据源内容
//// +----+-------+
//// | age|   name|
//// +----+-------+
//// |null|Michael|
//// |  30|   Andy|
//// |  19| Justin|
//// +----+-------+

//-----DataFrames 处理结构化数据的一些基本操作

df.select("name").show()    // 只显示 "name" 列
//// +-------+
//// |   name|
//// +-------+
//// |Michael|
//// |   Andy|
//// | Justin|
//// +-------+
 
df.select(df("name"), df("age") + 1).show()   // 将 "age" 加 1
//// +-------+---------+
//// |   name|(age + 1)|
//// +-------+---------+
//// |Michael|     null|
//// |   Andy|       31|
//// | Justin|       20|
//// +-------+---------+
 
df.filter(df("age") > 21).show()     # 条件语句
//// +---+----+
//// |age|name|
//// +---+----+
//// | 30|Andy|
//// +---+----+
 
df.groupBy("age").count().show()   // groupBy 操作
//// +----+-----+
//// | age|count|
//// +----+-----+
//// |null|    1|
//// |  19|    1|
//// |  30|    1|
//// +----+-----+

//-------使用 SQL 语句来进行操作---

df.registerTempTable("people")     // 将 DataFrame 注册为临时表 people
val result = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")  // 执行 SQL 查询
result.show()        // 输出结果
//// +------+---+
//// |  name|age|
//// +------+---+
//// |Justin| 19|
//// +------+---+

4、与Hive结合

spark可以通过读取hive的元数据来兼容hive,读取hive的表数据,然后在spark引擎中进行sql统计分析,效率高于hive, 配置如下

将 $HIVE_HOME/conf/hive-site.xml copy或者软链 到 $SPARK_HOME/conf/ 
将 $HIVE_HOME/conf/core-site.xml copy或者软链 到 $SPARK_HOME/conf/  
将 $HIVE_HOME/conf/hdfs-site.xml copy或者软链 到 $SPARK_HOME/conf/

运行:

$ ./bin/spark-sql -master spark://master:7077 --jars /data/mylib/mysql-connector-java-5.1.12.jar #使用spark-sql
spark-sql> show tables;
default

$ ./bin/spark-shell -master spark://master:7077 --jars /data/mylib/mysql-connector-java-5.1.12.jar #使用spark-shell
scala> sqlContext.sql("show tables").collect().foreach(println)
default

Spark sql类似于hive,可以支持sql语法来对海量数据进行分析查询,跟hive不同的是,hive执行sql任务的底层运算引擎采用mapreduce运算框架,而sparksql执行sql任务的运算引擎是spark core,从而充分利用spark内存计算及DAG模型的优势,大幅提升海量数据的分析查询速度

5 使用Scala编写,并提交运行

	/*SparkSqlDemo.scala*/ 
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql._

//define case class
case class Person(name:String,age:Int)  

object SparkSqlDemo {
  def main(args: Array[String]) {
     
	val conf = new SparkConf().setAppName("Simple SparkSqlDemo")
	val sc = new SparkContext(conf)    
	val sqlContext = new org.apache.spark.sql.SQLContext(sc)
	// this is used to implicitly convert an RDD to a DataFrame.	
	import sqlContext.implicits._	
    
	//person.txt 格式为  
	/*zhangsan,14
	  lisi,27
	  wangwu,31
	*/
    //Convert user data RDD to a DataFrame and register it as a temp table
	//// 用数据集文本文件创建一个Customer对象的DataFrame
	val dfpeople=sc.textFile("hdfs://hd1:9000/test/person.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt)).toDF()
    // 将DataFrame注册为一个表
	dfpeople.registerTempTable("tempTable")

    
	// 显示DataFrame的内容
	dfpeople.show()

	// 打印DF模式
	dfpeople.printSchema()

	// 选择客户名称列
	dfpeople.select("name").show()

	// 选择客户名称和age列
	dfpeople.select("name", "age").show()

	// 根据age选择客户
	//dfpeople.filter(dfCustomers("age").equalTo(11)).show()

	// 根据age统计客户数量
	//dfCustomers.groupBy("age").count().show()
    
	var result = sqlContext.sql("SELECT * FROM tempTable")
    result.collect().foreach(println)
  }
}

提交编译

注意Spark内嵌Scala的版本,和Scala的版本,最好保持一样,否则会出错。

$ scalac  -classpath $CLASSPATH:/data/app/spark-1.5.2/lib/*:/data/app/scala-2.10.4/lib/*  SparkSqlDemo.scala 

打包

jar -cvf sql.jar SparkSqlDemo*.class Person*

执行程序

$ data/app/spark-1.5.2/bin/spark-submit --class SparkSqlDemo --master  spark://hd1:7077 sql.jar 




姓 名: *
邮 箱:
内 容: *
验证码: 点击刷新 *   

回到顶部