欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spark SQL on hive配置和实战 博客分类: SparkHiveHadoop sparkhive 

程序员文章站 2024-03-22 15:08:40
...
spark sql 官网:http://spark.apache.org/docs/latest/sql-programming-guide.html#starting-point-sqlcontext

首先要配置好hive,保存元数据到mysql中,参考:http://kevin12.iteye.com/blog/2280777

然后配置Spark SQL,
1.配置hive-site.xml
在master1上的/usr/local/spark/spark-1.6.0-bin-hadoop2.6/conf目录创建hive-site.xml文件,内容如下:
<configuration>
<property>
    <name>hive.metastore.uris</name>
    <value>thrift://master1:9083</value>
    <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
  </property>
</configuration>

这个配置信息从hive的目录$HIVE_HOME/conf/hive-default.xml.template文件中找到的,默认情况下的value是空值;
2.配置驱动
将$HIVE_HOME/lib/mysql-connector-java-5.1.35-bin.jar 中的mysql驱动拷贝到$SPARK_HOME/lib/下面即可。

注意:
因为之前我spark环境配置了Zookeeper,做HA,现在不在练习HA,将HA的配置去掉。如果不去掉,必须启动Zookeeper集群才可以单独一台节点上启动spark-shell等;
因为我在master1、worker1、worker2上安装了Zookeeper,所以,要将这三台节点上的$SPARK_HOME/conf/spark-env.sh文件中的SPARK_DAEMON_JAVA_OPTS去掉,并且将配置HA是注释的SPARK_MASTER_IP参数的注释去掉;
export SPARK_MASTER_IP=master1
#export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=master1:2181,worker1:2181,worker2:2181 -Dspark.deploy.zookeeper.dir=/spark"

3.启动hive的metastore后台进程
将日志打印到/root/logs/hive目录下,如果目录不存在则先创建
root@master1:~/logs# hive --service metastore >> /root/logs/hive/metastore.log 2>& 1&
因为配置了上面的hive.metastore.uris,所以必须启动hive的service metastore后台进程才可以执行./spark-shell --master spark://master1:7077和./spark-sql --master spark://master1:7077命令。
在$SPARK_HOME/bin下执行 ./spark-shell --master spark://master1:7077命令。
然后依次执行下面的命令
scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
scala> hiveContext.sql("show databases").collect.foreach(println)
scala> hiveContext.sql("use testdb").collect.foreach(println)
scala> hiveContext.sql("show tables").collect.foreach(println)
res5: org.apache.spark.sql.DataFrame = [tableName: string, isTemporary: boolean]
scala> hiveContext.sql("show tables").collect.foreach(println)
[student,false]
[student2,false]
[student3,false]
[student4,false]
[tbsogou,false]
[tmp_pre_hour_seach_info,false]
scala> hiveContext.sql("select d,count(*)cnt from tbsogou group by d ").collect.foreach(println)
当同时用hive和spark sql运行查询时,如果没有资源spark sql会打印下面的语句,一直等待资源释放,获取资源后会执行spark sql进行查询;
16/03/25 20:13:04 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
16/03/25 20:13:19 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
16/03/25 20:13:34 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources


先将$SPARK_HOME下面的examples/src/main/resources/中的所有文件都上传到hdfs的/library/examples/src/main/resources/目录中以备后用。

Starting Point: SQLContext

valsc:SparkContext// An existing SparkContext.valsqlContext=neworg.apache.spark.sql.SQLContext(sc)// this is used to implicitly convert an RDD to a DataFrame.importsqlContext.implicits._
Creating DataFrames

valsc:SparkContext// An existing SparkContext.valsqlContext=neworg.apache.spark.sql.SQLContext(sc)
valdf=sqlContext.read.json("examples/src/main/resources/people.json")// Displays the content of the DataFrame to stdoutdf.show()
DataFrame Operations

val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create the DataFrame
val df = sqlContext.read.json("examples/src/main/resources/people.json")

// Show the content of the DataFrame
df.show()
// age  name
// null Michael
// 30   Andy
// 19   Justin

// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// name
// Michael
// Andy
// Justin

// Select everybody, but increment the age by 1
df.select(df("name"), df("age") + 1).show()
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// Select people older than 21
df.filter(df("age") > 21).show()
// age name
// 30  Andy

// Count people by age
df.groupBy("age").count().show()
// age  count
// null 1
// 19   1
// 30   1


相关标签: spark hive