欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

hive on spark

程序员文章站 2022-04-29 08:13:31
...

一. 配置

把hive-site.xml复制到spark/conf下
把MySQL驱动复制到spark/lib
保证每个节点都有hive,spark
Metadata即元数据:元数据包含用Hive创建的database、table等的元信息。元数据存储在关系型数据库中。如Derby、MySQL等。

Metastore的作用是:客户端连接metastore服务,metastore再去连接MySQL数据库来存取元数据。
有了metastore服务,就可以有多个客户端同时连接,而且这些客户端不需要知道MySQL数据库的用户名和密码,只需要连接 metastore 服务即可。
hive中对metastore的配置包含3部分:

metastore database
metastore server
metastore client

二. hive三种模式

hive on spark

1、内嵌Derby方式

这个是Hive默认的启动模式,一般用于单元测试,这种存储方式有一个缺点:在同一时间只能有一个进程连接使用数据库。
当 hive-site.xml没有配置第三方库时自动使用derby库
执行初始化命令:schematool -dbType derby -initSchema
查看初始化后的信息: schematool -dbType derby -info
配置完成后就可在shell中以CLI的方式访问hive 进行操作验证。

2.Local方式

以本地Mysql数据库为例:创建好用户:hive;database:hive。
配置文件 hive-site.xml 中jdbc URL、驱动、用户名、密码等属性值配置如下:

<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://localhost/hive?createDatabaseIfNotExist=true</value>
  <description>JDBC connect string for a JDBC metastore</description>
</property>
 
<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value>
  <description>Driver class name for a JDBC metastore</description>
</property>
 
<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>hive</value>
  <description>username to use against metastore database</description>
</property>
 
<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>hive</value>
  <description>password to use against metastore database</description>
</property>
 
<property>
  <name>hive.metastore.warehouse.dir</name>
  <!-- base hdfs path -->
  <value>/user/hive/warehouse</value>
  <description>location of default database for the warehouse</description>
</property>

注意: 需要把mysql的驱动包copy到目录 <HIVE_HOME>/lib 中
如果是第一次需要执行初始化命令:
schematool -dbType mysql -initSchema

3.Remote方式 (远程模式)

仅连接远程的mysql并不能称之为“远程模式”,是否远程指的是 metastore 和 hive 服务是否在同一进程内;
以Mysql数据库为例:创建好用户:hive;database:hive_meta。Remote方式需要分别配置服务端和客户端的配置文件:
服务端的 hive-site.xml 中jdbc URL、驱动、用户名、密码等属性值配置和上面相同:

客户端的 hive-site.xml 中jdbc URL、驱动、用户名、密码等属性值配置和上面相同 ,
再加上thrift配置找服务端

<!-- thrift://<host_name>:<port> 默认端口是9083 -->
<property>
  <name>hive.metastore.uris</name>
  <value>thrift://master:9083,thrift://slaver1:9083</value>
  <description>Thrift uri for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>

hive metastore 服务端启动命令:

服务端口可以不写会找配置文件的默认9083
1) hive --service metastore -p <port_num>
2)hive --service metastore &

注意客户端中的端口配置需要和启动监听的端口一致。
客户端启动

输入hive
如果不加端口默认启动:hive --service metastore,则默认监听端口是:9083 ,注意客户端中的端口配置需要和启动监听的端口一致。服务端启动正常后,客户端就可以执行hive操作了。

注意:
客户端中配置hive.metastore.uris,如 thrift://master:9083。如果有多个metastore服务器,将URL之间用逗号分隔(不能有空格)
写多个是为了当前面的宕机了会自动配置后面的uris
优先第一个当第一个没有宕机其他的客户端都连接第一个

确认metastore服务启动:
netstat -an | grep 9083
lsof –i:9083

小结:
hive metastore可以配置多个实例,防止单点故障;(推荐)
配置了metastore,启动hive的时候,本地client端就无需实例化hive的metastore,启动速度会加快;

三. spark sql 远程连接(thriftserver – beeline)

ThriftServer是一个JDBC/ODBC接口,用户可以通过JDBC/ODBC连接ThriftServer来访问SparkSQL的数据。ThriftServer在启动的时候,会启动了一个sparkSQL的应用程序,而通过JDBC/ODBC连接进来的客户端共同分享这个sparkSQL应用程序的资源,也就是说不同的用户之间可以共享数据;ThriftServer启动时还开启一个侦听器,等待JDBC客户端的连接和提交查询。所以,在配置ThriftServer的时候,至少要配置ThriftServer的主机名和端口,如果要使用hive数据的话,还要提供hive metastore的uris。

注意: 集群模式必须保证每个节点都有metastore_db
metastore_db:在哪启动就在那生成 和 sparkwarehouse
/start-thriftserver.sh --master 类型 
         --hiveconf hive.server2.thrift.port=11000//端口号可改
--conf "hive.metastore.warehouse.dir=
hdfs://master:9000/user/hive/warehouse"

hive on spark

启动后启动beeline
bin/beeline   --hiveconf hive.server2.thrift.port=11000 
--conf  hive.metastore.warehouse.dir=
hdfs://master:9000/user/hive/warehouse"

beeline启动后连接thriftserver 注意:别忘把驱动包复制到spark/lib下
!connect jdbc:hive2://localhost:11000	

hive on spark

!quit //退出
!help //获取帮助
thriftserver和普通的spark-shell/spark-sql的区别?
spark-shell,spark-sql都是一个spark application
thriftserver,不管你启动多少个客户端(beeline/code),永远都是一个spark application
解决了一个数据共享的问题,多个客户端可以共享数据;
beeline : 使用它可以实现一个节点多个打开spark-sql

web UI 4040查看job

hive on spark
在spark shell 中操作hdfs 上的数据是很方便的,但是操作也未免过于繁琐,幸好spark 还想用户提供另外两种操作 spark sql 的方式

2 .spark-sql

启动方式比较简单但一个节点只能启动一个

/start-thriftserver.sh --master 类型          
--conf "hive.metastore.warehouse.dir=
hdfs://master:9000/user/hive/warehouse"

hive on spark

3.scala操作spark-SQL

spark.sql(“SQL语句”)

四 .spark on hive(HiveServer 2)

1. HiveServer 2

hive on spark
HiveServer2(HS2)是一个服务端接口,使远程客户端可以执行对Hive的查询并返回结果。目前基于Thrift RPC的实现是 HiveServer 的改进版本,并支持多客户端并发和身份验证;

HiveServer、HiveServer2都是基于Thrift的。由于HiveServer不能处理多于一个客户端的并发请求,因此在Hive-0.11.0版本中重写了HiveServer代码得到了HiveServer2。

HiveServer2支持多客户端的并发和认证,为开放API客户端如JDBC、ODBC提供了更好的支持。

  1. 正常的hive仅允许使用HiveQL执行查询、更新等操作,并且该方式比较笨拙单一。Hive提供了轻客户端的实现,通过HiveServer或者HiveServer2,客户端可以在不启动CLI的情况下对Hive中的数据进行操作,两者都允许远程客户端使用多种编程语言如Java、Python向Hive提交请求,取回结果;
  2. 可以实现远程访问;
  3. 可以通过命令链接多个hive;

生产环境中使用Hive,建议使用HiveServer2来提供服务,好处很多:

  1. 在应用端不用部署Hadoop和Hive客户端;
  2. 相比hive-cli方式,HiveServer2不用直接将HDFS和Metastore暴露给用户;
  3. 有安全认证机制,并且支持自定义权限校验;
  4. 有HA机制,解决应用端的并发和负载均衡问题;
  5. JDBC方式,可以使用任何语言,方便与应用进行数据交互;
  6. 从2.0开始,HiveServer2提供了WEB UI。

2. beline

beeline是从 Hive 0.11版本引入的,是Hive新的命令行客户端工具;Hive客户端工具后续将使用beeline 替代HiveCLI ,并且后续版本将也会废弃HiveCLI 客户端工具;
beeline方式相当于瘦客户端模式,采用JDBC方式借助于Hive Thrift服务访问Hive数据仓库;
从Hive 0.14版本开始,Beeline使用HiveServer2工作时,它会从HiveServer2输出日志信息到STDERR;

Beeline 要与 HiveServer2 配合使用;需要启动 HiverServer2 (在node3);
hive --service hiveserver2 &
Hiveserver2 &

使用Beeline:
启动beeline;
!connect jdbc:hive2://node3:10000
jdbc:hive2://:/
默认用户名(spark)、密码不验证(hive.server2.authentication缺省值为NONE)
执行SQL命令

检查端口:lsof –i:10000
备注:启动 hiveserver2 后立即检查,看不见任何信息;Beeline连接后检查才能看见。

退出beeline命令行则是!quit, 很多命令都是前面需要加一个感叹号, 但对于登录了后的DDL,DML,则直接运行SQL语句即可,语句后带上一个分号,然后回车执行;

Beeline和其他工具有一些不同,执行查询都是正常的SQL输入,但是如果是一些管理的命令,比如进行连接,中断,退出,执行Beeline命令需要带上“!”,不需要终止符。如:
1、!connect url –连接不同的Hive2服务器
2、!exit –退出shell
3、!help –显示全部命令列表

备注:
beeline在我的机器上可能有两个:$ HIVE_HOME/bin、$SPARK_HOME/bin
hive on sparkhive on spark

3. Spark Thrift Server

Thrift JDBC/ODBC Server (简称 Spark Thrift Server 或者 STS)是Spark SQL的Apache Hive HiveServer2的端口,通过这个端口可以允许用户使用JDBC/ODBC端口协议来执行SQL查询;

通过使用STS,用户可以用使用其他的BI工具,比如Tableau来连接Spark进行基于大数据量的报表制作;

Thrift Server在启动的时候,启动了一个SparkSQL的应用程序,而通过JDBC/ODBC连接进来的客户端共同分享SparkSQL应用程序的资源;

Thrift Server启动时还开启一个侦听器,等待JDBC客户端的连接和提交查询;

在配置Thrift Server的时候,通常要配置Thrift Server的主机名和端口,如果要使用hive数据的话,还要提供hive metastore的uris;

五. Spark Thrift Server配置 & 运行

1、$SPARK_HOME/conf/hive-site.xml(与前面的配置相同)(node2)

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://node3:9083,thrift://node1:9083</value>
  </property>
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/user/hive/warehouse</value>
   </property>
</configuration>
备注:仅配置了hive.metastore.uris的信息,其他均采用默认配置

2、启动hive metastore服务(node3 或 node1)

hive --service metastore&

3、启动Spark ThriftServer(node2)/home/xdl/spark-2.3.3-bin-hadoop2.7/conf start-thriftserver.sh

hive on spark

4、检查日志(node2)在node2中查看日志如下所示:

hive on spark

5、检查进程(node2)

hive on spark
9678:启动了SparkSubmit
9741:根据当前配置,在node1、node2、node3上均启动了executor

6、检查端口(缺省端口号是10000,可配置)

hive on spark
hive on spark

beeline连接hiveserver2报错。

错误:Error: Could not open client transport with JDBC Uri: jdbc:hive2://s1:10000/hive: Failed to open new session: java.lang.RuntimeException: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.authorize.AuthorizationException): User: xxx is not allowed to impersonate anonymous (state=08S01,code=0)

解决方案
在hadoop的配置文件core-site.xml增加如下配置,重启hdfs,其中“xxx”是连接beeline的用户,将“xxx”替换成自己的用户名即可

  • 表示可通过超级代理“xxx”操作hadoop的用户、用户组和主机

    hadoop.proxyuser.xxx.hosts
    *
hadoop.proxyuser.xxx.groups * 参考: https://blog.csdn.net/jiangyonggang1/article/details/87261092

Spark Thrift Server配置 & 运行

Beeline执行连接到RDBMS:
!connect jdbc:mysql://master:3306/metastore hive hive
show databases;

SparkSQL通过jdbc访问hive

import java.sql.DriverManager

object SparkSQLThriftServer {
  def main(args: Array[String]): Unit = {
    // 添加驱动
    val driver =  "org.apache.hive.jdbc.HiveDriver"
    Class.forName(driver)

    // 获取connection
    val (url, username, password) = ("jdbc:hive2://master:10000", "lyb", "lyb")
    val connection=  DriverManager.getConnection(url, username, password)

    val sql =  "SELECT count(*) as mycount FROM test1.test"
    // 获取statement
    val statement= connection.prepareStatement(sql)

    // 获取结果
    val res = statement.executeQuery()
    while(res.next()){
      println(s"${res.getString("mycount")}")
    }

    // 关闭资源
    res.close()
    statement.close()
    connection.close()
  }
}

六.UDF

UDF: 自定义函数。函数的输入、输出都是一条数据记录,类似于Spark SQL中普通的数学或字符串函数,从实现上看就是普通的Scala函数;
为了解决一些复杂的计算,并在SQL函数与Scala函数之间左右逢源
UDF的参数视为数据表的某个列;
书写规范:

1.注册版

  1. import spark.implicits._
  2. def funName(参数:类型)={函数体} //自定义函数
  3. spark.udf.register(“fun1”, funName _ )
    fun1 :是sql中要用的函数
    funName _ :自定义的函数名+空格+下划线
    // 注册函数
    4)val x=spark.sql(“select id, fun1(colname) from tbName ”)

2.非注册版

  1. import org.apache.spark.sql.functions._
    import spark.implicits._
  2. val fun2=udf((参数:类型,length:Int)=>参数.length>length)
  3. val getData=DataFrame类型数据.filter(fun2($ ”参数”,lit(10)))
    $ : 可以接收的数据会当成Column对象($符号来包裹一个字符串表示一个Column)
    当不用注册时要有udf包住自定义函数—>udf函数

3. 案例

import org.apache.spark.sql.{Row, SparkSession}

object UDFDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("UDFDemo")
      .master("local[*]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    val data = List(("scala", "author1"), ("spark", "author2"), ("hadoop", "author3"), ("hive", "author4"), ("strom", "author5"), ("kafka", "author6"))
    val df = spark.createDataFrame(data).toDF("title", "author")
    df.createTempView("books")

    // 定义函数并注册
    def len1(bookTitle: String):Int = bookTitle.length
    spark.udf.register("len1", len1 _)
    // UDF可以在select语句、where语句等多处使用
    spark.sql("select title, author, len1(title) from books").show
    spark.sql("select title, author from books where len1(title)>5").show

    // UDF可以在DataFrame、Dataset的API中使用
    import spark.implicits._
    df.filter("len1(title)>5").show
    // 不能通过编译
    //df.filter(len1($"title")>5).show
    // 能通过编译,但不能执行
    //df.select("len1(title)").show
    // 不能通过编译
    //df.select(len1($"title")).show

    // 如果要在DSL语法中使用$符号包裹字符串表示一个Column,需要用udf方法来接收函数。这种函数无需注册
    import org.apache.spark.sql.functions._
    val len2 = udf((bookTitle: String) => bookTitle.length)
    df.filter(len2($"title")>5).show
    df.select(len2($"title")).show

    // 不使用UDF
    df.map{case Row(title: String, author: String) => (title, author, title.length)}.show

    spark.stop()
  }
}

七. UDAF

UDAF :用户自定义聚合函数。函数本身作用于数据集合,能够在聚合操作的基础上进行自定义操作(多条数据输入,一条数据输出);类似于在group by之后使用的sum、avg等函数
hive on spark
hive on spark

abstract class UserDefinedAggregateFunction extends Serializable{
def inputSchema : StructType
//inputSchema用于定义与DataFrame列有关的输入样式

def bufferSchema : StructType
//bufferSchema用于定义存储聚合运算时产生的中间数据结果的Schema;

def dataType : DataFrame
//dataType标明了UDAF函数的返回值类型;

def deterministic : Boolean
//deterministic是一个布尔值,用以标记针对给定的一组输入,UDAF是否总是生成相同的结果;

def initialize ( buffer : MutableAggregationBuffer) : Unit
//initialize对聚合运算中间结果的初始化;

def update ( buffer : MutableAggregationBuffer , input :Row) :Unit
//update函数的第一个参数为bufferSchema中两个Field的索引,默认以0开始;
UDAF的核心计算都发生在update函数中;
update函数的第二个参数input: Row对应的并非DataFrame的行,
而是被inputSchema投影了的行;

def merge (buffer1 : MutableAggregationBuffer , buffer2 : Row):Unit
//merge函数负责合并两个聚合运算的buffer,再将其存储到MutableAggregationBuffer中;

def evluate ( buffer :Row ): Any                       
//evaluate函数完成对聚合Buffer值的运算,得到最终的结果
 }

普通的UDF不支持数据的聚合运算。如当要对销售数据执行年度同比计算,就需要对当年和上一年的销量分别求和,然后再利用同比公式进行计算。
书写UDAF 先继承UserDefinedAggregateFunction接口
在重写他的方法
def update ( buffer : MutableAggregationBuffer , input :Row) :Unit
// UDAF的核心计算都发生在update函数中。
// 扫描每行数据,都会调用一次update,输入buffer(缓存中间结果)、input(这一行的输入值)
// update函数的第一个参数为bufferSchema中两个Field的索引,默认以0开始
// update函数的第二个参数input: Row对应的是被inputSchema投影了的行。
// 本例中每一个input就应该只有两个Field的值,input(0)代表销量,input(1)代表销售日期

案例

class YearOnYearBasis extends UserDefinedAggregateFunction {
  // UDAF与DataFrame列有关的输入样式
  override def inputSchema: StructType 
  				= new StructType()
				  .add("sales", DoubleType)
				  .add("saledate", StringType)

  // UDAF函数的返回值类型
  override def dataType: DataType = DoubleType

  // 缓存中间结果
  override def bufferSchema: StructType 
  					= new StructType()
  					.add("year2014", DoubleType)
  					.add("year2015", DoubleType)

  // 布尔值,用以标记针对给定的一组输入,UDAF是否总是生成相同的结果。通常用true
  override def deterministic: Boolean = true

  // initialize就是对聚合运算中间结果的初始化
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0.0
    buffer(1) = 0.0
  }

  // UDAF的核心计算都发生在update函数中。
  // 扫描每行数据,都会调用一次update,输入buffer(缓存中间结果)、input(这一行的输入值)
  // update函数的第一个参数为bufferSchema中两个Field的索引,默认以0开始
  // update函数的第二个参数input: Row对应的是被inputSchema投影了的行。
  // 本例中每一个input就应该只有两个Field的值,input(0)代表销量,input(1)代表销售日期
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {	  
    val salenumber = input.getAs[Double](0)
    input.getString(1).take(4) match {
      case "2014" => buffer(0) = buffer.getAs[Double](0) + salenumber
      case "2015" => buffer(1) = buffer.getAs[Double](1) + salenumber
      case _ => println("ERROR!")
    }
  }

  // 合并两个分区的buffer1、buffer2,将最终结果保存在buffer1中
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
    buffer1(1) = buffer1.getDouble(1) + buffer2.getDouble(1)
  }

  // 取出buffer(缓存的值)进行运算,得到最终结果
  override def evaluate(buffer: Row): Double = {
    println(s"evaluate : ${buffer.getDouble(0)}, ${buffer.getDouble(1)}")
    if (buffer.getDouble(0) == 0.0) 0.0
    else (buffer.getDouble(1) - buffer.getDouble(0)) / buffer.getDouble(0)
  }
}

object UDAFDemo {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    val spark = SparkSession.builder()
      .appName(s"${this.getClass.getCanonicalName}")
      .master("local[*]")
      .getOrCreate()

    val sales = Seq(
      (1, "Widget Co",        1000.00, 0.00,    "AZ", "2014-01-02"),
      (2, "Acme Widgets",     2000.00, 500.00,  "CA", "2014-02-01"),
      (3, "Widgetry",         1000.00, 200.00,  "CA", "2015-01-11"),
      (4, "Widgets R Us",     2000.00, 0.0,     "CA", "2015-02-19"),
      (5, "Ye Olde Widgete",  3000.00, 0.0,     "MA", "2015-02-28") )

    val salesDF = spark.createDataFrame(sales).toDF("id", "name", "sales", "discount", "state", "saleDate")
    salesDF.createTempView("sales")

    val yearOnYear = new YearOnYearBasis
    spark.udf.register("yearOnYear", yearOnYear)
    spark.sql("select yearOnYear(sales, saleDate) as yearOnYear from sales").show()

    spark.stop()
  }
}

hive on spark

八. 从MySQL读取数据

// 读取数据库中的数据
val jdbcDF = spark.read.format("jdbc").
				option("url", "jdbc:mysql://localhost:3306/spark").
				option("driver","com.mysql.jdbc.Driver").
				option("dbtable", "student").
				option("user", "hive").
				option("password", "hive").load()
jdbcDF.show
jdbcDF.printSchema

备注:
1、将jdbc驱动拷贝到$SPARK_HOME/jars目录下,是最简单的做法;
2、明白每一个参数的意思,一个参数不对整个结果出不来;
3、从数据库从读大量的数据进行分析,不推荐;读取少量的数据是可以接受的,也是常见的做法。
hive on sparkhive on sparkhive on sparkhive on sparkhive on sparkhive on spark