Spark SQL on Hive: Configuration and Hands-On Practice
Spark SQL official guide: http://spark.apache.org/docs/latest/sql-programming-guide.html#starting-point-sqlcontext
First, configure Hive so that its metadata is stored in MySQL; see http://kevin12.iteye.com/blog/2280777 for reference.
Then configure Spark SQL.
1. Configure hive-site.xml
On master1, create a hive-site.xml file in the /usr/local/spark/spark-1.6.0-bin-hadoop2.6/conf directory with the following content:
<configuration>
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://master1:9083</value>
        <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
    </property>
</configuration>
This property can be found in $HIVE_HOME/conf/hive-default.xml.template; by default its value is empty.
2. Configure the MySQL driver
Copy the MySQL driver $HIVE_HOME/lib/mysql-connector-java-5.1.35-bin.jar to $SPARK_HOME/lib/.
Note:
My Spark environment was previously configured with ZooKeeper for HA. Since HA is not being practiced here, remove the HA configuration; otherwise the ZooKeeper cluster must be started before spark-shell and similar tools can be launched on a single node.
Because ZooKeeper is installed on master1, worker1, and worker2, remove SPARK_DAEMON_JAVA_OPTS from $SPARK_HOME/conf/spark-env.sh on those three nodes, and uncomment the SPARK_MASTER_IP parameter that was commented out when HA was configured:
export SPARK_MASTER_IP=master1
#export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=master1:2181,worker1:2181,worker2:2181 -Dspark.deploy.zookeeper.dir=/spark"
3. Start the Hive metastore as a background process
Write its log to /root/logs/hive; create the directory first if it does not exist.
root@master1:~/logs# hive --service metastore >> /root/logs/hive/metastore.log 2>&1 &
Because hive.metastore.uris is configured above, the Hive metastore service must be running before ./spark-shell --master spark://master1:7077 or ./spark-sql --master spark://master1:7077 can be executed.
In $SPARK_HOME/bin, run ./spark-shell --master spark://master1:7077.
Then run the following commands in order:
scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
scala> hiveContext.sql("show databases").collect.foreach(println)
scala> hiveContext.sql("use testdb").collect.foreach(println)
scala> hiveContext.sql("show tables").collect.foreach(println)
res5: org.apache.spark.sql.DataFrame = [tableName: string, isTemporary: boolean]
scala> hiveContext.sql("show tables").collect.foreach(println)
[student,false]
[student2,false]
[student3,false]
[student4,false]
[tbsogou,false]
[tmp_pre_hour_seach_info,false]
scala> hiveContext.sql("select d,count(*)cnt from tbsogou group by d ").collect.foreach(println)
When Hive and Spark SQL queries run at the same time and no resources are free, Spark SQL prints the message below and keeps waiting; once resources are released it acquires them and runs the query:
16/03/25 20:13:04 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
16/03/25 20:13:19 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
16/03/25 20:13:34 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
First upload everything under $SPARK_HOME/examples/src/main/resources/ to the HDFS directory /library/examples/src/main/resources/ for later use.
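With the resource files on HDFS, the examples from the official guide below can also be run against HDFS instead of the local file system. A minimal sketch, assuming the upload above succeeded and the cluster's default file system points at this HDFS instance:
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
scala> // Read people.json from the HDFS directory populated above rather than the local examples folder.
scala> val people = sqlContext.read.json("/library/examples/src/main/resources/people.json")
scala> people.show()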
Starting Point: SQLContext
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

Creating DataFrames
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()

DataFrame Operations
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create the DataFrame
val df = sqlContext.read.json("examples/src/main/resources/people.json")

// Show the content of the DataFrame
df.show()
// age  name
// null Michael
// 30   Andy
// 19   Justin

// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// name
// Michael
// Andy
// Justin

// Select everybody, but increment the age by 1
df.select(df("name"), df("age") + 1).show()
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// Select people older than 21
df.filter(df("age") > 21).show()
// age name
// 30  Andy

// Count people by age
df.groupBy("age").count().show()
// age  count
// null 1
// 19   1
// 30   1
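Besides the DataFrame DSL shown above, the same data can be queried with plain SQL by registering the DataFrame as a temporary table. A short sketch using the standard Spark 1.6 API, continuing from the df and sqlContext defined above:
// Register the DataFrame as a temporary table and query it with SQL.
df.registerTempTable("people")
sqlContext.sql("SELECT name, age FROM people WHERE age > 21").show()
// Only Andy (age 30) satisfies the filter, matching the df.filter example above.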