hive on spark
hive on Spark
- 一. 配置
- 二. hive三种模式
- 三. spark sql 远程连接(thriftserver -- beeline)
- 四 .spark on hive(HiveServer 2)
- 五. Spark Thrift Server配置 & 运行
- 1、$SPARK_HOME/conf/hive-site.xml(与前面的配置相同)(node2)
- 2、启动hive metastore服务(node3 或 node1)
- 3、启动Spark ThriftServer(node2)/home/xdl/spark-2.3.3-bin-hadoop2.7/conf start-thriftserver.sh
- 4、检查日志(node2)在node2中查看日志如下所示:
- 5、检查进程(node2)
- 6、检查端口(缺省端口号是10000,可配置)
- beeline连接hiveserver2报错。
- SparkSQL通过jdbc访问hive
- 六.UDF
- 七. UDAF
- 八. 从MySQL读取数据
一. 配置
把hive-site.xml复制到spark/conf下
把MySQL驱动复制到spark/lib
保证每个节点都有hive,spark
Metadata即元数据:元数据包含用Hive创建的database、table等的元信息。元数据存储在关系型数据库中。如Derby、MySQL等。
Metastore的作用是:客户端连接metastore服务,metastore再去连接MySQL数据库来存取元数据。
有了metastore服务,就可以有多个客户端同时连接,而且这些客户端不需要知道MySQL数据库的用户名和密码,只需要连接 metastore 服务即可。
hive中对metastore的配置包含3部分:
metastore database
metastore server
metastore client
二. hive三种模式
1、内嵌Derby方式
这个是Hive默认的启动模式,一般用于单元测试,这种存储方式有一个缺点:在同一时间只能有一个进程连接使用数据库。
当 hive-site.xml没有配置第三方库时自动使用derby库
执行初始化命令:schematool -dbType derby -initSchema
查看初始化后的信息: schematool -dbType derby -info
配置完成后就可在shell中以CLI的方式访问hive 进行操作验证。
2.Local方式
以本地Mysql数据库为例:创建好用户:hive;database:hive。
配置文件 hive-site.xml 中jdbc URL、驱动、用户名、密码等属性值配置如下:
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://localhost/hive?createDatabaseIfNotExist=true</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>Driver class name for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>hive</value>
<description>username to use against metastore database</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>hive</value>
<description>password to use against metastore database</description>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<!-- base hdfs path -->
<value>/user/hive/warehouse</value>
<description>location of default database for the warehouse</description>
</property>
注意: 需要把mysql的驱动包copy到目录 <HIVE_HOME>/lib 中
如果是第一次需要执行初始化命令:
schematool -dbType mysql -initSchema
3.Remote方式 (远程模式)
仅连接远程的mysql并不能称之为“远程模式”,是否远程指的是 metastore 和 hive 服务是否在同一进程内;
以Mysql数据库为例:创建好用户:hive;database:hive_meta。Remote方式需要分别配置服务端和客户端的配置文件:
服务端的 hive-site.xml 中jdbc URL、驱动、用户名、密码等属性值配置和上面相同:
客户端的 hive-site.xml 中jdbc URL、驱动、用户名、密码等属性值配置和上面相同 ,
再加上thrift配置找服务端
<!-- thrift://<host_name>:<port> 默认端口是9083 -->
<property>
<name>hive.metastore.uris</name>
<value>thrift://master:9083,thrift://slaver1:9083</value>
<description>Thrift uri for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>
hive metastore 服务端启动命令:
服务端口可以不写会找配置文件的默认9083
1) hive --service metastore -p <port_num>
2)hive --service metastore &
注意客户端中的端口配置需要和启动监听的端口一致。
客户端启动
输入hive
如果不加端口默认启动:hive --service metastore,则默认监听端口是:9083 ,注意客户端中的端口配置需要和启动监听的端口一致。服务端启动正常后,客户端就可以执行hive操作了。
注意:
客户端中配置hive.metastore.uris,如 thrift://master:9083。如果有多个metastore服务器,将URL之间用逗号分隔(不能有空格)
写多个是为了当前面的宕机了会自动配置后面的uris
优先第一个当第一个没有宕机其他的客户端都连接第一个
确认metastore服务启动:
netstat -an | grep 9083
lsof –i:9083
小结:
hive metastore可以配置多个实例,防止单点故障;(推荐)
配置了metastore,启动hive的时候,本地client端就无需实例化hive的metastore,启动速度会加快;
三. spark sql 远程连接(thriftserver – beeline)
ThriftServer是一个JDBC/ODBC接口,用户可以通过JDBC/ODBC连接ThriftServer来访问SparkSQL的数据。ThriftServer在启动的时候,会启动了一个sparkSQL的应用程序,而通过JDBC/ODBC连接进来的客户端共同分享这个sparkSQL应用程序的资源,也就是说不同的用户之间可以共享数据;ThriftServer启动时还开启一个侦听器,等待JDBC客户端的连接和提交查询。所以,在配置ThriftServer的时候,至少要配置ThriftServer的主机名和端口,如果要使用hive数据的话,还要提供hive metastore的uris。
注意: 集群模式必须保证每个节点都有metastore_db
metastore_db:在哪启动就在那生成 和 sparkwarehouse
/start-thriftserver.sh --master 类型
--hiveconf hive.server2.thrift.port=11000//端口号可改
--conf "hive.metastore.warehouse.dir=
hdfs://master:9000/user/hive/warehouse"
启动后启动beeline
bin/beeline --hiveconf hive.server2.thrift.port=11000
--conf hive.metastore.warehouse.dir=
hdfs://master:9000/user/hive/warehouse"
beeline启动后连接thriftserver 注意:别忘把驱动包复制到spark/lib下
!connect jdbc:hive2://localhost:11000
!quit //退出
!help //获取帮助
thriftserver和普通的spark-shell/spark-sql的区别?
spark-shell,spark-sql都是一个spark application
thriftserver,不管你启动多少个客户端(beeline/code),永远都是一个spark application
解决了一个数据共享的问题,多个客户端可以共享数据;
beeline : 使用它可以实现一个节点多个打开spark-sql
web UI 4040查看job
在spark shell 中操作hdfs 上的数据是很方便的,但是操作也未免过于繁琐,幸好spark 还想用户提供另外两种操作 spark sql 的方式
2 .spark-sql
启动方式比较简单但一个节点只能启动一个
/start-thriftserver.sh --master 类型
--conf "hive.metastore.warehouse.dir=
hdfs://master:9000/user/hive/warehouse"
3.scala操作spark-SQL
spark.sql(“SQL语句”)
四 .spark on hive(HiveServer 2)
1. HiveServer 2
HiveServer2(HS2)是一个服务端接口,使远程客户端可以执行对Hive的查询并返回结果。目前基于Thrift RPC的实现是 HiveServer 的改进版本,并支持多客户端并发和身份验证;
HiveServer、HiveServer2都是基于Thrift的。由于HiveServer不能处理多于一个客户端的并发请求,因此在Hive-0.11.0版本中重写了HiveServer代码得到了HiveServer2。
HiveServer2支持多客户端的并发和认证,为开放API客户端如JDBC、ODBC提供了更好的支持。
- 正常的hive仅允许使用HiveQL执行查询、更新等操作,并且该方式比较笨拙单一。Hive提供了轻客户端的实现,通过HiveServer或者HiveServer2,客户端可以在不启动CLI的情况下对Hive中的数据进行操作,两者都允许远程客户端使用多种编程语言如Java、Python向Hive提交请求,取回结果;
- 可以实现远程访问;
- 可以通过命令链接多个hive;
生产环境中使用Hive,建议使用HiveServer2来提供服务,好处很多:
- 在应用端不用部署Hadoop和Hive客户端;
- 相比hive-cli方式,HiveServer2不用直接将HDFS和Metastore暴露给用户;
- 有安全认证机制,并且支持自定义权限校验;
- 有HA机制,解决应用端的并发和负载均衡问题;
- JDBC方式,可以使用任何语言,方便与应用进行数据交互;
- 从2.0开始,HiveServer2提供了WEB UI。
2. beline
beeline是从 Hive 0.11版本引入的,是Hive新的命令行客户端工具;Hive客户端工具后续将使用beeline 替代HiveCLI ,并且后续版本将也会废弃HiveCLI 客户端工具;
beeline方式相当于瘦客户端模式,采用JDBC方式借助于Hive Thrift服务访问Hive数据仓库;
从Hive 0.14版本开始,Beeline使用HiveServer2工作时,它会从HiveServer2输出日志信息到STDERR;
Beeline 要与 HiveServer2 配合使用;需要启动 HiverServer2 (在node3);
hive --service hiveserver2 &
Hiveserver2 &
使用Beeline:
启动beeline;
!connect jdbc:hive2://node3:10000
jdbc:hive2://:/
默认用户名(spark)、密码不验证(hive.server2.authentication缺省值为NONE)
执行SQL命令
检查端口:lsof –i:10000
备注:启动 hiveserver2 后立即检查,看不见任何信息;Beeline连接后检查才能看见。
退出beeline命令行则是!quit, 很多命令都是前面需要加一个感叹号, 但对于登录了后的DDL,DML,则直接运行SQL语句即可,语句后带上一个分号,然后回车执行;
Beeline和其他工具有一些不同,执行查询都是正常的SQL输入,但是如果是一些管理的命令,比如进行连接,中断,退出,执行Beeline命令需要带上“!”,不需要终止符。如:
1、!connect url –连接不同的Hive2服务器
2、!exit –退出shell
3、!help –显示全部命令列表
备注:
beeline在我的机器上可能有两个:$ HIVE_HOME/bin、$SPARK_HOME/bin
3. Spark Thrift Server
Thrift JDBC/ODBC Server (简称 Spark Thrift Server 或者 STS)是Spark SQL的Apache Hive HiveServer2的端口,通过这个端口可以允许用户使用JDBC/ODBC端口协议来执行SQL查询;
通过使用STS,用户可以用使用其他的BI工具,比如Tableau来连接Spark进行基于大数据量的报表制作;
Thrift Server在启动的时候,启动了一个SparkSQL的应用程序,而通过JDBC/ODBC连接进来的客户端共同分享SparkSQL应用程序的资源;
Thrift Server启动时还开启一个侦听器,等待JDBC客户端的连接和提交查询;
在配置Thrift Server的时候,通常要配置Thrift Server的主机名和端口,如果要使用hive数据的话,还要提供hive metastore的uris;
五. Spark Thrift Server配置 & 运行
1、$SPARK_HOME/conf/hive-site.xml(与前面的配置相同)(node2)
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://node3:9083,thrift://node1:9083</value>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
</configuration>
备注:仅配置了hive.metastore.uris的信息,其他均采用默认配置
2、启动hive metastore服务(node3 或 node1)
hive --service metastore&
3、启动Spark ThriftServer(node2)/home/xdl/spark-2.3.3-bin-hadoop2.7/conf start-thriftserver.sh
4、检查日志(node2)在node2中查看日志如下所示:
5、检查进程(node2)
9678:启动了SparkSubmit
9741:根据当前配置,在node1、node2、node3上均启动了executor
6、检查端口(缺省端口号是10000,可配置)
beeline连接hiveserver2报错。
错误:Error: Could not open client transport with JDBC Uri: jdbc:hive2://s1:10000/hive: Failed to open new session: java.lang.RuntimeException: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.authorize.AuthorizationException): User: xxx is not allowed to impersonate anonymous (state=08S01,code=0)
解决方案
在hadoop的配置文件core-site.xml增加如下配置,重启hdfs,其中“xxx”是连接beeline的用户,将“xxx”替换成自己的用户名即可
- 表示可通过超级代理“xxx”操作hadoop的用户、用户组和主机
hadoop.proxyuser.xxx.hosts
*
Spark Thrift Server配置 & 运行
Beeline执行连接到RDBMS:
!connect jdbc:mysql://master:3306/metastore hive hive
show databases;
SparkSQL通过jdbc访问hive
import java.sql.DriverManager
object SparkSQLThriftServer {
def main(args: Array[String]): Unit = {
// 添加驱动
val driver = "org.apache.hive.jdbc.HiveDriver"
Class.forName(driver)
// 获取connection
val (url, username, password) = ("jdbc:hive2://master:10000", "lyb", "lyb")
val connection= DriverManager.getConnection(url, username, password)
val sql = "SELECT count(*) as mycount FROM test1.test"
// 获取statement
val statement= connection.prepareStatement(sql)
// 获取结果
val res = statement.executeQuery()
while(res.next()){
println(s"${res.getString("mycount")}")
}
// 关闭资源
res.close()
statement.close()
connection.close()
}
}
六.UDF
UDF: 自定义函数。函数的输入、输出都是一条数据记录,类似于Spark SQL中普通的数学或字符串函数,从实现上看就是普通的Scala函数;
为了解决一些复杂的计算,并在SQL函数与Scala函数之间左右逢源
UDF的参数视为数据表的某个列;
书写规范:
1.注册版
- import spark.implicits._
- def funName(参数:类型)={函数体} //自定义函数
- spark.udf.register(“fun1”, funName _ )
fun1 :是sql中要用的函数
funName _ :自定义的函数名+空格+下划线
// 注册函数
4)val x=spark.sql(“select id, fun1(colname) from tbName ”)
2.非注册版
- import org.apache.spark.sql.functions._
import spark.implicits._ - val fun2=udf((参数:类型,length:Int)=>参数.length>length)
- val getData=DataFrame类型数据.filter(fun2($ ”参数”,lit(10)))
$ : 可以接收的数据会当成Column对象($符号来包裹一个字符串表示一个Column)
当不用注册时要有udf包住自定义函数—>udf函数
3. 案例
import org.apache.spark.sql.{Row, SparkSession}
object UDFDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("UDFDemo")
.master("local[*]")
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
val data = List(("scala", "author1"), ("spark", "author2"), ("hadoop", "author3"), ("hive", "author4"), ("strom", "author5"), ("kafka", "author6"))
val df = spark.createDataFrame(data).toDF("title", "author")
df.createTempView("books")
// 定义函数并注册
def len1(bookTitle: String):Int = bookTitle.length
spark.udf.register("len1", len1 _)
// UDF可以在select语句、where语句等多处使用
spark.sql("select title, author, len1(title) from books").show
spark.sql("select title, author from books where len1(title)>5").show
// UDF可以在DataFrame、Dataset的API中使用
import spark.implicits._
df.filter("len1(title)>5").show
// 不能通过编译
//df.filter(len1($"title")>5).show
// 能通过编译,但不能执行
//df.select("len1(title)").show
// 不能通过编译
//df.select(len1($"title")).show
// 如果要在DSL语法中使用$符号包裹字符串表示一个Column,需要用udf方法来接收函数。这种函数无需注册
import org.apache.spark.sql.functions._
val len2 = udf((bookTitle: String) => bookTitle.length)
df.filter(len2($"title")>5).show
df.select(len2($"title")).show
// 不使用UDF
df.map{case Row(title: String, author: String) => (title, author, title.length)}.show
spark.stop()
}
}
七. UDAF
UDAF :用户自定义聚合函数。函数本身作用于数据集合,能够在聚合操作的基础上进行自定义操作(多条数据输入,一条数据输出);类似于在group by之后使用的sum、avg等函数
abstract class UserDefinedAggregateFunction extends Serializable{
def inputSchema : StructType
//inputSchema用于定义与DataFrame列有关的输入样式
def bufferSchema : StructType
//bufferSchema用于定义存储聚合运算时产生的中间数据结果的Schema;
def dataType : DataFrame
//dataType标明了UDAF函数的返回值类型;
def deterministic : Boolean
//deterministic是一个布尔值,用以标记针对给定的一组输入,UDAF是否总是生成相同的结果;
def initialize ( buffer : MutableAggregationBuffer) : Unit
//initialize对聚合运算中间结果的初始化;
def update ( buffer : MutableAggregationBuffer , input :Row) :Unit
//update函数的第一个参数为bufferSchema中两个Field的索引,默认以0开始;
UDAF的核心计算都发生在update函数中;
update函数的第二个参数input: Row对应的并非DataFrame的行,
而是被inputSchema投影了的行;
def merge (buffer1 : MutableAggregationBuffer , buffer2 : Row):Unit
//merge函数负责合并两个聚合运算的buffer,再将其存储到MutableAggregationBuffer中;
def evluate ( buffer :Row ): Any
//evaluate函数完成对聚合Buffer值的运算,得到最终的结果
}
普通的UDF不支持数据的聚合运算。如当要对销售数据执行年度同比计算,就需要对当年和上一年的销量分别求和,然后再利用同比公式进行计算。
书写UDAF 先继承UserDefinedAggregateFunction接口
在重写他的方法
def update ( buffer : MutableAggregationBuffer , input :Row) :Unit
// UDAF的核心计算都发生在update函数中。
// 扫描每行数据,都会调用一次update,输入buffer(缓存中间结果)、input(这一行的输入值)
// update函数的第一个参数为bufferSchema中两个Field的索引,默认以0开始
// update函数的第二个参数input: Row对应的是被inputSchema投影了的行。
// 本例中每一个input就应该只有两个Field的值,input(0)代表销量,input(1)代表销售日期
案例
class YearOnYearBasis extends UserDefinedAggregateFunction {
// UDAF与DataFrame列有关的输入样式
override def inputSchema: StructType
= new StructType()
.add("sales", DoubleType)
.add("saledate", StringType)
// UDAF函数的返回值类型
override def dataType: DataType = DoubleType
// 缓存中间结果
override def bufferSchema: StructType
= new StructType()
.add("year2014", DoubleType)
.add("year2015", DoubleType)
// 布尔值,用以标记针对给定的一组输入,UDAF是否总是生成相同的结果。通常用true
override def deterministic: Boolean = true
// initialize就是对聚合运算中间结果的初始化
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0.0
buffer(1) = 0.0
}
// UDAF的核心计算都发生在update函数中。
// 扫描每行数据,都会调用一次update,输入buffer(缓存中间结果)、input(这一行的输入值)
// update函数的第一个参数为bufferSchema中两个Field的索引,默认以0开始
// update函数的第二个参数input: Row对应的是被inputSchema投影了的行。
// 本例中每一个input就应该只有两个Field的值,input(0)代表销量,input(1)代表销售日期
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val salenumber = input.getAs[Double](0)
input.getString(1).take(4) match {
case "2014" => buffer(0) = buffer.getAs[Double](0) + salenumber
case "2015" => buffer(1) = buffer.getAs[Double](1) + salenumber
case _ => println("ERROR!")
}
}
// 合并两个分区的buffer1、buffer2,将最终结果保存在buffer1中
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
buffer1(1) = buffer1.getDouble(1) + buffer2.getDouble(1)
}
// 取出buffer(缓存的值)进行运算,得到最终结果
override def evaluate(buffer: Row): Double = {
println(s"evaluate : ${buffer.getDouble(0)}, ${buffer.getDouble(1)}")
if (buffer.getDouble(0) == 0.0) 0.0
else (buffer.getDouble(1) - buffer.getDouble(0)) / buffer.getDouble(0)
}
}
object UDAFDemo {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.WARN)
val spark = SparkSession.builder()
.appName(s"${this.getClass.getCanonicalName}")
.master("local[*]")
.getOrCreate()
val sales = Seq(
(1, "Widget Co", 1000.00, 0.00, "AZ", "2014-01-02"),
(2, "Acme Widgets", 2000.00, 500.00, "CA", "2014-02-01"),
(3, "Widgetry", 1000.00, 200.00, "CA", "2015-01-11"),
(4, "Widgets R Us", 2000.00, 0.0, "CA", "2015-02-19"),
(5, "Ye Olde Widgete", 3000.00, 0.0, "MA", "2015-02-28") )
val salesDF = spark.createDataFrame(sales).toDF("id", "name", "sales", "discount", "state", "saleDate")
salesDF.createTempView("sales")
val yearOnYear = new YearOnYearBasis
spark.udf.register("yearOnYear", yearOnYear)
spark.sql("select yearOnYear(sales, saleDate) as yearOnYear from sales").show()
spark.stop()
}
}
八. 从MySQL读取数据
// 读取数据库中的数据
val jdbcDF = spark.read.format("jdbc").
option("url", "jdbc:mysql://localhost:3306/spark").
option("driver","com.mysql.jdbc.Driver").
option("dbtable", "student").
option("user", "hive").
option("password", "hive").load()
jdbcDF.show
jdbcDF.printSchema
备注:
1、将jdbc驱动拷贝到$SPARK_HOME/jars目录下,是最简单的做法;
2、明白每一个参数的意思,一个参数不对整个结果出不来;
3、从数据库从读大量的数据进行分析,不推荐;读取少量的数据是可以接受的,也是常见的做法。
上一篇: hive支持update、delete