
Flink -- Distributed Cache

程序员文章站 2022-07-14 13:57:22

There are two related datasets. A: machine location data. B: machine status inspection data.


Machine location data A (small data volume). A location such as 机柜1编号1机器 means "rack 1, machine no. 1".

ip location
192.168.100.1 机柜1编号1机器
192.168.100.2 机柜1编号2机器
192.168.100.3 机柜1编号3机器
192.168.100.4 机柜1编号4机器
192.168.100.5 机柜1编号5机器
192.168.100.6 机柜1编号6机器
192.168.100.7 机柜1编号7机器
192.168.100.8 机柜1编号8机器
192.168.100.9 机柜1编号9机器
192.168.100.10 机柜1编号10机器

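Machine status inspection data B (very large data volume). The label is either 正常 (normal) or 异常 (abnormal).

time ip cpu_usage disk_usage memory_usage label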
2018-10-01 12:00:00 192.168.100.1 50 50 50 正常
2018-10-01 12:00:00 192.168.100.2 50 50 50 正常
2018-10-01 12:00:00 192.168.100.3 50 50 50 正常
2018-10-01 12:00:00 192.168.100.4 50 50 50 正常
2018-10-01 12:00:00 192.168.100.5 50 50 50 正常
2018-10-01 12:00:00 192.168.100.6 50 50 50 正常
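
Because the time field itself contains a space, splitting a row of B on whitespace gives seven tokens rather than six. The following is a minimal sketch of turning one such row into the six-field tuple that the program in this article works with. `StatusLine` and `parse` are hypothetical names chosen for illustration; they are not part of Flink or of the original program.

```scala
object StatusLine {
  // (time, ip, cpu usage, disk usage, memory usage, label)
  type Record = (String, String, Int, Int, Int, String)

  // Parses one whitespace-separated row of dataset B.
  // Returns None for blank rows, rows with the wrong number of fields
  // (for example the header row), and rows with non-numeric usage values.
  def parse(line: String): Option[Record] =
    line.trim.split("\\s+") match {
      case Array(date, clock, ip, cpu, disk, mem, label) =>
        scala.util.Try((s"$date $clock", ip, cpu.toInt, disk.toInt, mem.toInt, label)).toOption
      case _ => None
    }
}
```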

Requirement 1: find the location of every abnormal machine
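
Since A is small and B is very large, this requirement amounts to a lookup join: hold A in memory as a map, then attach a location to each abnormal record of B. The following is a minimal sketch of that logic using plain Scala collections, with no Flink involved. `LookupJoinSketch` and `abnormalLocations` are hypothetical names, and the sample records in `main` are made up for illustration.

```scala
object LookupJoinSketch {
  // (time, ip, cpu usage, disk usage, memory usage, label)
  type Record = (String, String, Int, Int, Int, String)

  // Returns (time, location) for every abnormal record.
  // An ip that is missing from the location map is reported as "--".
  def abnormalLocations(locations: Map[String, String], records: Seq[Record]): Seq[(String, String)] =
    records
      .filter(_._6 == "异常") // "异常" = abnormal
      .map(r => (r._1, locations.getOrElse(r._2, "--")))

  def main(args: Array[String]): Unit = {
    val locations = Map("192.168.100.1" -> "机柜1编号1机器")
    // Illustrative records only, not taken from real inspection data
    val records: Seq[Record] = Seq(
      ("2018-10-01 12:00:00", "192.168.100.1", 50, 50, 50, "正常"),
      ("2018-10-01 12:05:00", "192.168.100.1", 99, 50, 50, "异常"),
      ("2018-10-01 12:05:00", "10.0.0.1", 99, 50, 50, "异常")
    )
    abnormalLocations(locations, records).foreach(println)
  }
}
```

In a distributed job every parallel task needs its own copy of the map, which is the role the distributed cache file plays in the Flink program.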

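import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

import scala.collection.JavaConverters._

/**
  * step1: register a local or HDFS file as a distributed cache file
  * step2: read the cached file in the open() method
  */
object IPLocation {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Location file (dataset A): one "ip location" pair per line
    val path = "./data/iplocation"
    // step1: register a local or HDFS file under the name "iplocation"
    env.registerCachedFile(path, "iplocation")
    val dataSet = env.fromCollection(createData())
    dataSet.map(new RichMapFunction[(String, String, Int, Int, Int, String), (String, String, Int, Int, Int, String)] {
      // ip -> location, filled in open() before any record is mapped
      val iplocations = scala.collection.mutable.Map.empty[String, String]

      // step2: read the distributed cache file in open()
      override def open(parameters: Configuration): Unit = {
        val dcFile = getRuntimeContext().getDistributedCache().getFile("iplocation")
        val lines = FileUtils.readLines(dcFile, "UTF-8")
        for (ele <- lines.asScala) {
          // split into at most two fields and skip blank or malformed lines
          val ss = ele.trim.split("\\s+", 2)
          if (ss.length == 2) {
            iplocations(ss(0)) = ss(1)
          }
        }
      }

      // replace the ip field with its location, or "--" if the ip is unknown
      override def map(x: (String, String, Int, Int, Int, String)): (String, String, Int, Int, Int, String) = {
        val address = iplocations.getOrElse(x._2, "--")
        (x._1, address, x._3, x._4, x._5, x._6)
      }
    }).filter(x => {
      // keep only abnormal records ("异常" = abnormal)
      x._6.equals("异常")
    }).map(x => (x._1, x._2)).print()
  }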

  // Generates 10000 random records of dataset B for testing
  def createData(): Seq[(String, String, Int, Int, Int, String)] = {
    val ips = List[String](
      "192.168.100.1",
      "192.168.100.2",
      "192.168.100.3",
      "192.168.100.4",
      "192.168.100.5",
      "192.168.100.6",
      "192.168.100.7",
      "192.168.100.8",
      "192.168.100.9",
      "192.168.100.10"
    )
    // "正常" = normal, "异常" = abnormal
    val labels = List(
      "正常",
      "异常"
    )
    val r = scala.util.Random
    val list = (1 to 10000).map(_ => {
      val index = r.nextInt(10)
      val second = 10 + r.nextInt(50)
      // nextInt(10) yields 0..9, so the test data can contain month "00"
      val month = r.nextInt(10)
      val day = 10 + r.nextInt(18)
      val hour = r.nextInt(10)
      val minute = 10 + r.nextInt(50)
      val time = s"2019-0$month-$day 0$hour:$minute:$second"
      // indexes 8 and 9 become an unknown ip to exercise the "--" fallback
      val ip = if (index > 7) "xxxx" else ips(index)
      (time, ip, r.nextInt(101), r.nextInt(101), r.nextInt(101), labels(r.nextInt(2)))
    })
    list
  }

}
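
The parsing done in open() does not depend on Flink, so it can be pulled into a small helper and checked on its own. The following is a minimal sketch that uses only the Scala standard library instead of commons-io. `LocationFile`, `parse`, and `load` are hypothetical names, and UTF-8 is an assumption about the encoding of the location file.

```scala
import java.io.File
import scala.io.{Codec, Source}

object LocationFile {
  // Parses "ip location" lines into a map.
  // Blank lines and lines with only one field are skipped.
  def parse(lines: Iterator[String]): Map[String, String] =
    lines
      .map(_.trim.split("\\s+", 2))
      .collect { case Array(ip, location) => ip -> location }
      .toMap

  // Reads the whole file (assumed to be UTF-8) and closes it afterwards.
  def load(file: File): Map[String, String] = {
    val source = Source.fromFile(file)(Codec.UTF8)
    try parse(source.getLines()) finally source.close()
  }
}
```

Inside open(), a call such as `iplocations ++= LocationFile.load(dcFile)` could then stand in for the read-and-split loop.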

Result (sample from one run; the input is random, so the output differs between runs)

(2019-06-23 00:18:37,机柜1编号6机器)
(2019-06-14 04:36:16,--)
(2019-00-21 01:36:18,机柜1编号1机器)
(2019-04-24 02:57:30,机柜1编号2机器)
(2019-08-25 04:25:36,机柜1编号7机器)
(2019-08-23 03:49:28,机柜1编号3机器)
(2019-08-23 04:29:41,机柜1编号2机器)
(2019-03-23 00:12:33,机柜1编号3机器)
(2019-03-15 06:14:50,机柜1编号5机器)
(2019-00-17 06:42:55,机柜1编号8机器)
(2019-03-20 08:29:28,机柜1编号8机器)
(2019-00-15 00:26:37,机柜1编号7机器)
(2019-01-10 01:54:34,机柜1编号8机器)
(2019-01-10 02:56:43,机柜1编号5机器)
(2019-08-24 02:30:16,机柜1编号8机器)
(2019-04-11 04:57:17,--)
(2019-00-27 08:50:57,机柜1编号1机器)
(2019-04-14 09:34:15,机柜1编号6机器)
(2019-08-23 08:30:27,机柜1编号3机器)
(2019-01-27 03:50:28,机柜1编号2机器)
(2019-09-24 08:24:44,机柜1编号7机器)
(2019-02-16 07:19:12,机柜1编号6机器)
(2019-08-14 00:48:46,--)
(2019-00-27 08:53:40,机柜1编号7机器)
(2019-03-14 03:44:19,机柜1编号1机器)
(2019-00-15 04:16:18,机柜1编号2机器)
(2019-08-12 02:47:56,机柜1编号4机器)
(2019-07-21 08:16:53,机柜1编号4机器)
(2019-07-20 07:41:14,机柜1编号2机器)
(2019-05-27 08:32:25,机柜1编号2机器)
(2019-05-21 08:56:59,机柜1编号2机器)
....