Spark Streaming算子开发实例
spark streaming算子开发实例
transform算子开发
transform操作应用在dstream上时,可以用于执行任意的rdd到rdd的转换操作,还可以用于实现dstream api中所没有提供的操作,比如说,dstreamapi中并没有提供将一个dstream中的每个batch,与一个特定的rdd进行join的操作,dstream中的join算子只能join其他dstream,但是我们自己就可以使用transform操作来实现该功能。
实例:黑名单用户实时过滤
package streamingdemo import org.apache.log4j.{level, logger} import org.apache.spark.sparkconf import org.apache.spark.streaming.{seconds, streamingcontext} /** * 实时黑名单过滤 */ object transformdemo { def main(args: array[string]): unit = { //设置日志级别 logger.getlogger("org").setlevel(level.warn) val conf = new sparkconf() .setappname(this.getclass.getsimplename) .setmaster("local[2]") val ssc = new streamingcontext(conf, seconds(2)) //创建一个黑名单的rdd val blackrdd = ssc.sparkcontext.parallelize(array(("zs", true), ("lisi", true))) //通过socket从nc中获取数据 val linesdstream = ssc.sockettextstream("hadoop01", 6666) /** * 过滤黑名单用户发言 * zs sb sb sb sb * lisi fuck fuck fuck * jack hello */ linesdstream .map(x => { val info = x.split(" ") (info(0), info.tolist.tail.mkstring(" ")) }) .transform(rdd => { //transform是一个rdd->rdd的操作,所以返回值必须是rdd /** * 经过leftouterjoin操作之后,产生的结果如下: * (zs,(sb sb sb sb),some(true))) * (lisi,(fuck fuck fuck),some(true))) * (jack,(hello,none)) */ val joinrdd = rdd.leftouterjoin(blackrdd) //如果是some(true)的,说明就是黑名单用户,如果是none的,说明不在黑名单内,把非黑名单的用户保留下来 val filterrdd = joinrdd.filter(x => x._2._2.isempty) filterrdd }) .map(x=>(x._1,x._2._1)).print() ssc.start() ssc.awaittermination() } }
测试
启动nc,传入用户及其发言信息
可以看到程序实时的过滤掉了在黑名单里的用户发言
updatestatebykey算子开发
updatestatebykey算子可以保持任意状态,同时不断有新的信息进行更新,这个算子可以为每个key维护一份state,并持续不断的更新state。对于每个batch来说,spark都会为每个之前已经存在的key去应用一次state更新函数,无论这个key在batch中是否有新的值,如果state更新函数返回的值是none,那么这个key对应的state就会被删除;对于新出现的key也会执行state更新函数。
要使用该算子,必须进行两个步骤
- 定义state——state可以是任意的数据类型
- 定义state更新函数——用一个函数指定如何使用之前的状态,以及从输入流中获取新值更新状态
注意:updatestatebykey操作,要求必须开启checkpoint机制
实例:基于缓存的实时wordcount
package streamingdemo import org.apache.log4j.{level, logger} import org.apache.spark.sparkconf import org.apache.spark.streaming.{seconds, streamingcontext} /** * 基于缓存的实时wordcount,在全局范围内统计单词出现次数 */ object updatestatebykeydemo { def main(args: array[string]): unit = { //设置日志级别 logger.getlogger("org").setlevel(level.warn) /** * 如果没有启用安全认证或者从kerberos获取的用户为null,那么获取hadoop_user_name环境变量, * 并将它的值作为hadoop执行用户设置hadoop username * 这里实验了一下在没有启用安全认证的情况下,就算不显式添加,也会自动获取我的用户名 */ //system.setproperty("hadoop_user_name","setsuna") val conf = new sparkconf() .setappname(this.getclass.getsimplename) .setmaster("local[2]") val ssc = new streamingcontext(conf, seconds(2)) //设置checkpoint存放的路径 ssc.checkpoint("hdfs://hadoop01:9000/checkpoint") //创建输入dstream val linedstream = ssc.sockettextstream("hadoop01", 6666) val worddstream = linedstream.flatmap(_.split(" ")) val pairsdstream = worddstream.map((_, 1)) /** * state:代表之前的状态值 * values:代表当前batch中key对应的values值 */ val resultdstream = pairsdstream.updatestatebykey((values: seq[int], state: option[int]) => { //当state为none,表示没有对这个单词做统计,则返回0值给计数器count var count = state.getorelse(0) //遍历values,累加新出现的单词的value值 for (value <- values) { count += value } //返回key对应的新state,即单词的出现次数 option(count) }) //在控制台输出 resultdstream.print() ssc.start() ssc.awaittermination() } }
测试
开启nc,输入单词
控制台实时输出的结果
window滑动窗口算子开发
spark streaming提供了滑动窗口操作的支持,可以对一个滑动窗口内的数据执行计算操作
在滑动窗口中,包含批处理间隔、窗口间隔、滑动间隔
- 对于窗口操作而言,在其窗口内部会有n个批处理数据
- 批处理数据的大小由窗口间隔决定,而窗口间隔指的就是窗口的持续时间,也就是窗口的长度
- 滑动时间间隔指的是经过多长时间窗口滑动一次,形成新的窗口,滑动间隔默认情况下和批处理时间间隔的相同
注意:滑动时间间隔和窗口时间间隔的大小一定得设置为批处理间隔的整数倍
用一个官方的图来作为说明
批处理间隔是1个时间单位,窗口间隔是3个时间单位,滑动间隔是2个时间单位。对于初始的窗口time1-time3,只有窗口间隔满足了才触发数据的处理。所以滑动窗口操作都必须指定两个参数,窗口长度和滑动时间间隔。在spark streaming中对滑动窗口的支持是比storm更加完善的。
window滑动算子操作
算子 | 描述 |
window() | 对每个滑动窗口的数据执行自定义的计算 |
countbywindow() | 对每个滑动窗口的数据执行count操作 |
reducebywindow() | 对每个滑动窗口的数据执行reduce操作 |
reducebykeyandwindow() | 对每个滑动窗口的数据执行reducebykey操作 |
countbyvalueandwindow() | 对每个滑动窗口的数据执行countbyvalue操作 |
reducebykeyandwindow算子开发
实例:在线热点搜索词实时滑动统计
每隔2秒钟,统计最近5秒钟的搜索词中排名最靠前的3个搜索词以及出现次数
package streamingdemo import org.apache.log4j.{level, logger} import org.apache.spark.sparkconf import org.apache.spark.streaming.{seconds, streamingcontext} /** * 需求:每隔2秒钟,统计最近5秒钟的搜索词中排名最靠前的3个搜索词以及出现次数 */ object reducebykeyandwindowdemo { def main(args: array[string]): unit = { //设置日志级别 logger.getlogger("org").setlevel(level.warn) //基础配置 val conf = new sparkconf() .setappname(this.getclass.getsimplename) .setmaster("local[2]") //批处理间隔设置为1s val ssc = new streamingcontext(conf, seconds(1)) val linesdstream = ssc.sockettextstream("hadoop01", 6666) linesdstream .flatmap(_.split(" ")) //根据空格来做分词 .map((_, 1)) //返回(word,1) .reducebykeyandwindow( //定义窗口如何计算的函数 //x代表的是聚合后的结果,y代表的是这个key对应的下一个需要聚合的值 (x: int, y: int) => x + y, //窗口长度为5秒 seconds(5), //窗口时间间隔为2秒 seconds(2) ) .transform(rdd => { //transform算子对rdd做处理,转换为另一个rdd //根据key的出现次数来进行排序,然后降序排列,获取最靠前的3个搜索词 val info: array[(string, int)] = rdd.sortby(_._2, false).take(3) //将array转换为resultrdd val resultrdd = ssc.sparkcontext.parallelize(info) resultrdd }) .map(x => s"${x._1}出现的次数是:${x._2}") .print() ssc.start() ssc.awaittermination() } }
测试结果
dstream output操作概览
spark streaming允许dstream的数据输出到外部系统,dsteram中的所有计算,都是由output操作触发的,foreachrdd输出操作,也必须在里面对rdd执行action操作,才能触发对每一个batch的计算逻辑。
转换 | 描述 |
print() | 在driver中打印出dstream中数据的前10个元素。主要用于测试,或者是不需要执行什么output操作时,用于简单触发一下job。 |
saveastextfiles(prefix, [suffix]) |
将dstream中的内容以文本的形式保存为文本文件,其中每次批处理间隔内产生的文件以prefix-time_in_ms[.suffix]的方式命名。 |
saveasobjectfiles(prefix , [suffix]) |
将dstream中的内容按对象序列化并且以sequencefile的格式保存。其中每次批处理间隔内产生的文件以prefix-time_in_ms[.suffix]的方式命名。 |
saveashadoopfiles(pref ix, [suffix]) |
将dstream中的内容以文本的形式保存为hadoop文件,其中每次批处理间隔内产生的文件 以prefix-time_in_ms[.suffix]的方式命名。 |
foreachrdd(func) | 最基本的输出操作,将func函数应用于dstream中的rdd上,这个操作会输出数据到外部系 统,比如保存rdd到文件或者网络数据库等。需要注意的是func函数是在运行该streaming 应用的driver进程里执行的。 |
foreachrdd算子开发
foreachrdd是最常用的output操作,可以遍历dstream中的每个产生的rdd并进行处理,然后将每个rdd中的数据写入外部存储,如文件、数据库、缓存等,通常在其中针对rdd执行action操作,比如foreach
使用foreachrdd操作数据库
通常在foreachrdd中都会创建一个connection,比如jdbc connection,然后通过connection将数据写入外部存储
误区一:在rdd的foreach操作外部创建connection
dstream.foreachrdd { rdd => val connection=createnewconnection() rdd.foreach { record => connection.send(record) } }
这种方式是错误的,这样的方式会导致connection对象被序列化后被传输到每一个task上,但是connection对象是不支持序列化的,所以也就无法被传输
误区二:在rdd的foreach操作内部创建connection
dstream.foreachrdd { rdd => rdd.foreach { record => val connection = createnewconnection() connection.send(record) connection.close() } }
这种方式虽然是可以的,但是执行效率会很低,因为它会导致对rdd中的每一条数据都创建一个connection对象,通常connection对象的创建都是很消耗性能的
合理的方式
- 第一种:使用rdd的foreachpartition操作,并且在该操作内部创建connection对象,这样就相当于为rdd的每个partition创建一个connection对象,节省了很多资源
- 第二种:自己手动封装一个静态连接池,使用rdd的foreachpartition操作,并且在该操作内部从静态连接池中,通过静态方法获取到一个连接,连接使用完之后再放回连接池中。这样的话,可以在多个rdd的partition之间复用连接了
实例:实时全局统计wordcount,并将结果保存到mysql数据库中
mysql数据库建表语句如下
create table wordcount ( word varchar(100) character set utf8 not null, count int(10) not null, primary key (word) ) engine=innodb default charset=latin1;
在idea中添加mysql-connector-java-5.1.40-bin.jar
代码如下
连接池的代码,其实一开始有想过用静态块来写个池子直接获取,但是如果考虑到池子宽度不够用的问题,这样的方式其实更好,一开始,实例化一个连接池出来,被调用获取连接,当连接全部都被获取了的时候,池子空了,就再实例化一个池子出来
package streamingdemo import java.sql.{connection, drivermanager, sqlexception} import java.util object jdbcmanager { var connectionque: java.util.linkedlist[connection] = null /** * 从数据库连接池中获取连接对象 * @return */ def getconnection(): connection = { synchronized({ try { //如果连接池是空的,那么就实例化一个connection类型的链表 if (connectionque == null) { connectionque = new util.linkedlist[connection]() for (i <- 0 until (10)) { //生成10个连接,并配置相关信息 val connection = drivermanager.getconnection( "jdbc:mysql://hadoop01:3306/test?characterencoding=utf-8", "root", "root") //将连接push进连接池 connectionque.push(connection) } } } catch { //捕获异常并输出 case e: sqlexception => e.printstacktrace() } //如果连接池不为空,则返回表头元素,并将它在链表里删除 return connectionque.poll() }) } /** * 当连接对象用完后,需要调用这个方法归还连接 * @param connection */ def returnconnection(connection: connection) = { //插入元素 connectionque.push(connection) } def main(args: array[string]): unit = { //main方法测试 getconnection() println(connectionque.size()) } }
wordcount代码
package streamingdemo import org.apache.log4j.{level, logger} import org.apache.spark.{sparkconf, streaming} import org.apache.spark.streaming.{seconds, streamingcontext} object foreachrdddemo { def main(args: array[string]): unit = { //设置日志级别,避免info信息过多 logger.getlogger("org").setlevel(level.warn) //设置hadoop的用户,不加也可以 system.setproperty("hadoop_user_name", "setsuna") //spark基本配置 val conf = new sparkconf() .setappname(this.getclass.getsimplename) .setmaster("local[2]") val ssc = new streamingcontext(conf, streaming.seconds(2)) //因为要使用updatestatebykey,所以需要使用checkpoint ssc.checkpoint("hdfs://hadoop01:9000/checkpoint") //设置socket,跟nc配置的一样 val linesdstream = ssc.sockettextstream("hadoop01", 6666) val wordcountdstream = linesdstream .flatmap(_.split(" ")) //根据空格做分词 .map((_, 1)) //生成(word,1) .updatestatebykey((values: seq[int], state: option[int]) => { //实时更新状态信息 var count = state.getorelse(0) for (value <- values) { count += value } option(count) }) wordcountdstream.foreachrdd(rdd => { if (!rdd.isempty()) { rdd.foreachpartition(part => { //从连接池中获取连接 val connection = jdbcmanager.getconnection() part.foreach(data => { val sql = //往wordcount表中插入wordcount信息,on duplicate key update子句是有则更新无则插入 s"insert into wordcount (word,count) " + s"values ('${data._1}',${data._2}) on duplicate key update count=${data._2}" //使用preparestatement来使用sql语句 val pstmt = connection.preparestatement(sql) pstmt.executeupdate() }) //在连接处提交完数据后,归还连接到连接池 jdbcmanager.returnconnection(connection) }) } }) ssc.start() ssc.awaittermination() } }
打开nc,输入数据
在另一个终端对wordcount的结果进行查询,可以发现是实时发生变化的
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。