A comprehensive Spark Streaming exercise! [Highly recommended]


Requirements

Prepare the data:
Open datas.txt and, in every line, replace the value "张三丰" in the ninth column with your own name (leave all other names unchanged), then save the file. [A plain text find-and-replace is fine.]

  1. Create a topic named RNGComment in Kafka with 3 partitions and 2 replicas.
  2. Write the records in datas.txt to Kafka, one record per second, partitioned by Index: records with Index <= 150920 and odd go to one partition, records with Index <= 150920 and even go to another partition, and records with Index > 150920 go to the last partition.
    After producing to Kafka, check how many records each partition holds with:
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list node01:9092,node02:9092,node03:9092 --topic RNGComment
  3. Consume from Kafka with Spark Streaming and compute the results.
    Create a database named rng_comment in MySQL.
    In rng_comment, create a table t_user_counts with columns id, username, count.
    In rng_comment, create a table t_user_counts2 with columns id, username, count.
  4. Filter out the records containing your own name in real time and insert them into t_user_counts, one insert per record.
    id is the record's Index, username is the record's user_name, and count is the fixed value "1".
    Insert the records directly; no aggregation is needed.
    Hint: t_user_counts will end up containing many duplicate rows.
  5. Count the total number of records containing your own name in real time and keep the result updated in t_user_counts2 (this table holds a single row).
  6. Use Kettle to sync the data in t_user_counts to a local text file and submit that file as the answer (a JDBC-based fallback sketch appears at the end of the Code section).

Code

1. Create the topic (3 partitions, 2 replicas):
bin/kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --replication-factor 2 --partitions 3 --topic RNGComment

2. Produce datas.txt to Kafka, one record per second, partitioned by Index:

package code02;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.*;
import java.util.Properties;

public class Producer {

    public static void main(String[] args) throws IOException, InterruptedException {

        //1. Configure the Kafka cluster
        Properties props = new Properties();
        //Kafka broker addresses
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        //Acknowledgement mode
        props.put("acks", "all");
        //Number of retries
        props.put("retries", 0);
        //Batch size in bytes
        props.put("batch.size", 16384);
        //How long to wait before sending a batch
        props.put("linger.ms", 1);
        //Buffer memory for batching
        props.put("buffer.memory", 33554432);
        //Key/value serializers
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        //Alternative: register a custom partitioner instead of choosing the partition per record
        //props.put("partitioner.class", "ProducerPartition");


        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        String line=null;
        int partition=0;
        BufferedReader bufferedReader = new BufferedReader(new FileReader(new File("0424Data/datas.txt")));
        while ((line=bufferedReader.readLine())!=null){
            // The first column is the Index; route the record to a partition per the requirement
            int index = Integer.parseInt(line.split("\t")[0]);
            if (index <= 150920 && index % 2 != 0) {
                partition = 0;      // Index <= 150920 and odd
            } else if (index <= 150920 && index % 2 == 0) {
                partition = 1;      // Index <= 150920 and even
            } else {
                partition = 2;      // Index > 150920
            }
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("RNGComment", partition, String.valueOf(partition), line);
            System.out.println(line);
            Thread.sleep(1000);
            kafkaProducer.send(record);
        }
            kafkaProducer.close();
    }
}
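
The commented-out partitioner.class setting above refers to a ProducerPartition class that the article never shows. As a rough sketch of that alternative (written in Scala here; the class name and routing rule are taken from the code above, everything else is an assumption), the Index-based routing could live in a custom Kafka Partitioner, so the explicit partition argument in ProducerRecord would no longer be needed:

import java.util
import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.Cluster

// Sketch only: routes by the Index column of the record value, mirroring the if/else in main()
class ProducerPartition extends Partitioner {
  override def partition(topic: String, key: AnyRef, keyBytes: Array[Byte],
                         value: AnyRef, valueBytes: Array[Byte], cluster: Cluster): Int = {
    val index = value.toString.split("\t")(0).toInt
    if (index <= 150920 && index % 2 != 0) 0      // Index <= 150920 and odd
    else if (index <= 150920) 1                   // Index <= 150920 and even
    else 2                                        // Index > 150920
  }

  override def close(): Unit = {}

  override def configure(configs: util.Map[String, _]): Unit = {}
}

If this route is taken, register the class through the partitioner.class property (with its fully qualified name) and build the ProducerRecord with only topic, key, and value.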

3. MySQL DDL (database rng_comment, tables named as in the requirements):

CREATE DATABASE IF NOT EXISTS rng_comment DEFAULT CHARACTER SET utf8;
USE rng_comment;

CREATE TABLE `t_user_counts` (
  `id` int(11) NOT NULL,
  `username` varchar(255) NOT NULL,
  `count` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
--------------------------------------------------------
CREATE TABLE `t_user_counts2` (
  `id` int(11) NOT NULL,
  `username` varchar(255) NOT NULL,
  `count` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
4. Filter records containing your own name in real time and insert each one into t_user_counts:

package code02

import java.sql.{Connection, DriverManager}

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}


object SparkStreamingAndKafka {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Kafka").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint("./wc")
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "SparkKafkaDemo",
      //earliest: if a partition has a committed offset, resume from it; otherwise consume from the beginning
      //latest:   if a partition has a committed offset, resume from it; otherwise consume only newly produced data
      //none:     resume from committed offsets if every partition has one; throw an exception if any partition has none
      //latest is used here: resume from the committed offset when there is one, otherwise start from new data
      "auto.offset.reset" -> "latest",
      //false turns off auto-commit; offsets are tracked via the checkpoint or committed manually by the programmer
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val result = KafkaUtils.createDirectStream[String, String](ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array("RNGComment"), kafkaParams))

    val url = "jdbc:mysql://node01:3306/rng_comment?characterEncoding=UTF-8"
    val user = "root"
    val password = "123456"

    // Keep only the records that contain your own name (the value substituted into column 9 of datas.txt),
    // collapse doubled tabs, then split each record into its fields
    val resultRdd = result.filter(x => x.value().contains("自己的姓名")).map(x => x.value().replaceAll("\t\t", "\t").split("\t"))
    resultRdd.foreachRDD(rdd => {
      rdd.foreachPartition(iter => {
        val connection = DriverManager.getConnection(url, user, password)
        val sql =
          """
            |insert into t_user_counts
            |values (?,?,?)
            |""".stripMargin
        iter.foreach(fields => {
          val statement = connection.prepareStatement(sql)
          statement.setInt(1, fields(0).toInt)   // id = Index
          statement.setString(2, fields(7))      // username = user_name column
          statement.setInt(3, 1)                 // count is the fixed value "1"
          statement.executeUpdate()
          statement.close()
        })
        connection.close()
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }

}
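
Opening and closing a PreparedStatement for every single record works, but it is chatty towards MySQL. As an optional variant (a sketch, not part of the original exercise), the foreachRDD block above could prepare one statement per partition and flush all inserts as a single JDBC batch, reusing the resultRdd, url, user, and password defined in the program:

    resultRdd.foreachRDD(rdd => {
      rdd.foreachPartition(iter => {
        val connection = DriverManager.getConnection(url, user, password)
        // One PreparedStatement per partition, flushed as one JDBC batch
        val statement = connection.prepareStatement(
          "insert into t_user_counts values (?,?,?)")
        iter.foreach(fields => {
          statement.setInt(1, fields(0).toInt)
          statement.setString(2, fields(7))
          statement.setInt(3, 1)
          statement.addBatch()
        })
        statement.executeBatch()
        statement.close()
        connection.close()
      })
    })

Batching pays off most when rewriteBatchedStatements=true is appended to the JDBC URL, so the MySQL driver rewrites the batch into multi-row inserts.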

5. Keep a running total of the matching records with updateStateByKey and upsert it into t_user_counts2:

package code02

import java.sql.{Connection, DriverManager}

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}


object SparkStreamingAndKafka2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Kafka").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Seconds(5))
    // updateStateByKey below requires a checkpoint directory
    ssc.checkpoint("./wc")
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "SparkKafkaDemo",
      //earliest: if a partition has a committed offset, resume from it; otherwise consume from the beginning
      //latest:   if a partition has a committed offset, resume from it; otherwise consume only newly produced data
      //none:     resume from committed offsets if every partition has one; throw an exception if any partition has none
      //latest is used here: resume from the committed offset when there is one, otherwise start from new data
      "auto.offset.reset" -> "latest",
      //false turns off auto-commit; offsets are tracked via the checkpoint or committed manually by the programmer
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val result = KafkaUtils.createDirectStream[String, String](ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array("RNGComment"), kafkaParams))

    val url = "jdbc:mysql://node01:3306/rng_comment?characterEncoding=UTF-8"
    val user = "root"
    val password = "123456"

    // Keep only the records that contain your own name (the value substituted into column 9 of datas.txt),
    // collapse doubled tabs, then split each record into its fields
    val resultRdd = result.filter(x => x.value().contains("自己的姓名")).map(x => x.value().replaceAll("\t\t", "\t").split("\t"))

    // Running total per username, carried across batches by updateStateByKey
    val value: DStream[(String, Int)] = resultRdd.map(x => (x(7), 1)).updateStateByKey(updateFunc)
    value.foreachRDD(rdd => {
      rdd.foreach(x => {
        val connection = DriverManager.getConnection(url, user, password)
        val insertSQL =
          """
            |replace into t_user_counts2
            |values (1,?,?)
            |""".stripMargin
        val statement = connection.prepareStatement(insertSQL)
        statement.setString(1, x._1)   // username
        statement.setInt(2, x._2)      // running total
        statement.executeUpdate()
        statement.close()
        connection.close()
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
  // Add the counts from the current batch to the accumulated history
  def updateFunc(currentValues: Seq[Int], historyValue: Option[Int]): Option[Int] = {
    val count = currentValues.sum + historyValue.getOrElse(0)
    Some(count)
  }
}
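
Step 6 asks for a Kettle job that dumps t_user_counts into a local text file. If Kettle is not at hand, a minimal JDBC fallback (my own sketch; the output file name t_user_counts.txt and the tab delimiter are assumptions) produces an equivalent file:

import java.io.PrintWriter
import java.sql.DriverManager

object ExportUserCounts {
  def main(args: Array[String]): Unit = {
    val connection = DriverManager.getConnection(
      "jdbc:mysql://node01:3306/rng_comment?characterEncoding=UTF-8", "root", "123456")
    val statement = connection.createStatement()
    val rs = statement.executeQuery("select id, username, `count` from t_user_counts")
    // One tab-separated line per row, mirroring what a Kettle text-file output step would produce
    val writer = new PrintWriter("t_user_counts.txt", "UTF-8")
    while (rs.next()) {
      writer.println(s"${rs.getInt("id")}\t${rs.getString("username")}\t${rs.getInt("count")}")
    }
    writer.close()
    rs.close()
    statement.close()
    connection.close()
  }
}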

Run the programs and you are done.
