A comprehensive, end-to-end Spark Streaming exercise [highly recommended]
Requirements
Prepare the data:
Open datas.txt and, in every line, replace "张三丰" in the ninth column with your own name; do not replace anyone else's name, then save the file. [A plain text search-and-replace is enough; a sketch follows this list.]
- Create a topic named RNGComment in Kafka with 3 partitions and 2 replicas.
- Write the data from datas.txt into Kafka at one record per second, partitioned by Index: records with Index less than or equal to 150920 and odd go to one partition, records with Index less than or equal to 150920 and even go to another partition, and records with Index greater than 150920 go to the last partition.
After producing to Kafka, check how many records landed in each partition with:
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list node01:9092,node02:9092,node03:9092 --topic RNGComment
- Consume the topic with Spark Streaming and compute the following:
Create a database named rng_comment in MySQL.
In rng_comment, create a table t_user_counts with columns id, username, count.
In rng_comment, create a table t_user_counts2 with the same columns id, username, count.
(The sample code below names these tables t_word_counts and t_word_counts2 and uses a word column in place of username.)
  - Filter the records containing "your own name" in real time and add each one to t_user_counts, one insert per record.
    id is the record's Index, username is the record's user_name, and count is the fixed value "1".
    Insert the records as they are; no aggregation is needed.
    Note: t_user_counts will therefore end up containing many identical-looking rows.
  - Count the total number of records containing "your own name" in real time and keep that total up to date in t_user_counts2 (this table holds only one row).
- Use Kettle to sync the data in t_user_counts to a local text file and submit that text file as the answer.
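Before moving on to the code, here is a minimal Scala sketch of the data-prep replacement, assuming datas.txt is tab-separated, the ninth column (index 8) holds the name, "你的名字" stands in for your own name, and the output file name is only an example (you can also overwrite datas.txt):

import java.io.PrintWriter
import scala.io.Source

// Minimal sketch: swap "张三丰" in the ninth column (index 8) for your own name.
object ReplaceName {
  def main(args: Array[String]): Unit = {
    val source = Source.fromFile("0424Data/datas.txt", "UTF-8")
    val replaced = source.getLines().map { line =>
      val cols = line.split("\t", -1)                                  // keep empty columns
      if (cols.length > 8 && cols(8) == "张三丰") cols(8) = "你的名字"   // placeholder for your own name
      cols.mkString("\t")
    }.toList
    source.close()
    val out = new PrintWriter("0424Data/datas_replaced.txt", "UTF-8")  // hypothetical output file
    try replaced.foreach(out.println) finally out.close()
  }
}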
Code
1. Create the RNGComment topic with 3 partitions and 2 replicas:
bin/kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --replication-factor 2 --partitions 3 --topic RNGComment
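To confirm the topic really has 3 partitions and 2 replicas, describe it (same ZooKeeper addresses as above):
bin/kafka-topics.sh --describe --zookeeper node01:2181,node02:2181,node03:2181 --topic RNGComment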
2. Kafka producer: write datas.txt into the topic, one record per second, partitioned by Index.
package code02;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.*;
import java.util.Properties;
public class Producer {
    public static void main(String[] args) throws IOException, InterruptedException {
        // 1. Configure the Kafka cluster
        Properties props = new Properties();
        // Kafka broker addresses
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        // Acknowledgement mode: wait for all in-sync replicas
        props.put("acks", "all");
        // Number of retries
        props.put("retries", 0);
        // Batch size in bytes
        props.put("batch.size", 16384);
        // Linger time before a batch is sent
        props.put("linger.ms", 1);
        // Total buffer memory for the producer
        props.put("buffer.memory", 33554432);
        // Key and value serializers
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Alternatively, route records with a custom partitioner (see the sketch below)
        //props.put("partitioner.class", "ProducerPartition");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        String line;
        int partition = 0;
        // read as UTF-8 so the Chinese names are not garbled
        BufferedReader bufferedReader = new BufferedReader(
                new InputStreamReader(new FileInputStream("0424Data/datas.txt"), "UTF-8"));
        while ((line = bufferedReader.readLine()) != null) {
            int index = Integer.parseInt(line.split("\t")[0]);
            if (index <= 150920 && index % 2 != 0) {        // odd and <= 150920
                partition = 0;
            } else if (index <= 150920 && index % 2 == 0) { // even and <= 150920
                partition = 1;
            } else {                                        // index > 150920
                partition = 2;
            }
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("RNGComment", partition, partition + "", line);
            System.out.println(line);
            kafkaProducer.send(record);
            // one record per second
            Thread.sleep(1000);
        }
        bufferedReader.close();
        kafkaProducer.close();
    }
}
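The commented-out partitioner.class setting hints at an alternative: keep the routing rule in a custom partitioner instead of passing the partition number with every record. A rough sketch in Scala (the class name ProducerPartition is taken from the commented line; it assumes the record value is the raw tab-separated line with Index in the first column):

package code02

import java.util
import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.Cluster

class ProducerPartition extends Partitioner {
  // Same routing rule as the producer above, applied to the record value.
  override def partition(topic: String, key: Any, keyBytes: Array[Byte],
                         value: Any, valueBytes: Array[Byte], cluster: Cluster): Int = {
    val index = value.toString.split("\t")(0).toInt
    if (index <= 150920 && index % 2 != 0) 0   // odd and <= 150920
    else if (index <= 150920) 1                // even and <= 150920
    else 2                                     // > 150920
  }
  override def close(): Unit = {}
  override def configure(configs: util.Map[String, _]): Unit = {}
}

To use it, set props.put("partitioner.class", "code02.ProducerPartition") and build each ProducerRecord without an explicit partition argument.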
3. MySQL table definitions
CREATE TABLE `t_word_counts` (
  `id` int(11) NOT NULL,
  `word` varchar(255) NOT NULL,
  `count` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- id comes from the record's Index, so no AUTO_INCREMENT is needed, and there is
-- no UNIQUE KEY on word because the same username is inserted many times.
--------------------------------------------------------
CREATE TABLE `t_word_counts2` (
  `id` int(11) NOT NULL,
  `word` varchar(255) NOT NULL,
  `count` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- t_word_counts2 holds a single row (id = 1) that is kept up to date with REPLACE INTO.
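The tables above live in the rng_comment database from the requirements; if it does not exist yet, create it first (utf8 to match the tables):

CREATE DATABASE IF NOT EXISTS rng_comment DEFAULT CHARACTER SET utf8;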
4. Spark Streaming job 1: insert each matching record into t_word_counts
package code02
import java.sql.DriverManager
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object SparkStreamingAndKafka {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Kafka").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Seconds(5))
    // checkpoint directory
    ssc.checkpoint("./wc")
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "SparkKafkaDemo",
      // earliest: resume from the committed offset if one exists, otherwise read from the beginning
      // latest: resume from the committed offset if one exists, otherwise read only newly produced data
      // none: resume from the committed offsets, or throw if any partition has no committed offset
      // latest is used here: start from the committed offset if there is one, otherwise from new data
      "auto.offset.reset" -> "latest",
      // false: disable auto commit; offsets are committed via the checkpoint or managed manually
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val result = KafkaUtils.createDirectStream[String, String](ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array("RNGComment"), kafkaParams))
    val url = "jdbc:mysql://node01:3306/rng_comment?characterEncoding=UTF-8"
    val user = "root"
    val password = "123456"
    // keep only the records containing your own name ("自己的姓名" is a placeholder), then split into columns
    val resultRdd = result.filter(x => x.value().contains("自己的姓名"))
      .map(x => x.value().replaceAll("\t\t", "\t").split("\t"))
    resultRdd.foreachRDD(rdd => {
      rdd.foreachPartition(iter => {
        // one connection and one prepared statement per partition
        val connection = DriverManager.getConnection(url, user, password)
        val sql =
          """
            |insert into t_word_counts
            |values (?,?,?)
            |""".stripMargin
        val statement = connection.prepareStatement(sql)
        iter.foreach(line => {
          statement.setInt(1, line(0).toInt) // id = Index
          statement.setString(2, line(7))    // username
          statement.setInt(3, 1)             // fixed count of 1
          statement.executeUpdate()
        })
        statement.close()
        connection.close()
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
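With the producer and this job both running, a quick sanity check on the MySQL side (the row count should keep growing, and every row should have count = 1):

SELECT COUNT(*) FROM t_word_counts;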
5. Spark Streaming job 2: maintain the running total in t_word_counts2
package code02
import java.sql.DriverManager
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object SparkStreamingAndKafka2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Kafka").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Seconds(5))
    // updateStateByKey needs a checkpoint directory to store the running totals
    ssc.checkpoint("./wc")
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      // a separate consumer group so this job also receives every record
      "group.id" -> "SparkKafkaDemo2",
      // earliest: resume from the committed offset if one exists, otherwise read from the beginning
      // latest: resume from the committed offset if one exists, otherwise read only newly produced data
      // none: resume from the committed offsets, or throw if any partition has no committed offset
      "auto.offset.reset" -> "latest",
      // false: disable auto commit; offsets are committed via the checkpoint or managed manually
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val result = KafkaUtils.createDirectStream[String, String](ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array("RNGComment"), kafkaParams))
    val url = "jdbc:mysql://node01:3306/rng_comment?characterEncoding=UTF-8"
    val user = "root"
    val password = "123456"
    // keep only the records containing your own name ("自己的姓名" is a placeholder), then split into columns
    val resultRdd = result.filter(x => x.value().contains("自己的姓名"))
      .map(x => x.value().replaceAll("\t\t", "\t").split("\t"))
    // running total per username (there is only one key: your own name)
    val value: DStream[(String, Int)] = resultRdd.map(x => (x(7), 1)).updateStateByKey(updateFunc)
    value.foreachRDD(rdd => {
      rdd.foreach(x => {
        // at most one record per batch, so a connection per record is acceptable here
        val connection = DriverManager.getConnection(url, user, password)
        val insertSQL =
          """
            |replace into t_word_counts2
            |values (1,?,?)
            |""".stripMargin
        val statement = connection.prepareStatement(insertSQL)
        statement.setString(1, x._1) // username
        statement.setInt(2, x._2)    // running total
        statement.executeUpdate()
        statement.close()
        connection.close()
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
  // add this batch's counts to the historical total
  def updateFunc(currentValues: Seq[Int], historyValue: Option[Int]): Option[Int] = {
    val count = currentValues.sum + historyValue.getOrElse(0)
    Some(count)
  }
}
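For the last requirement, Kettle can read t_word_counts with a Table input step and write it out with a Text file output step. If you just want to preview what that export will contain, a rough MySQL alternative is a direct dump (the output path is only an example, and the server's secure_file_priv setting must allow it):

SELECT `id`, `word`, `count`
INTO OUTFILE '/tmp/t_word_counts.txt'
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
FROM t_word_counts;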
Just run the jobs and you are done.