Flink -- Distributed Cache
2022-07-14 13:57:22
We have two related datasets. A: machine location data; B: machine status inspection data.
Machine location data A (small data volume):

ip | location |
---|---|
192.168.100.1 | Rack1-Machine1 |
192.168.100.2 | Rack1-Machine2 |
192.168.100.3 | Rack1-Machine3 |
192.168.100.4 | Rack1-Machine4 |
192.168.100.5 | Rack1-Machine5 |
192.168.100.6 | Rack1-Machine6 |
192.168.100.7 | Rack1-Machine7 |
192.168.100.8 | Rack1-Machine8 |
192.168.100.9 | Rack1-Machine9 |
192.168.100.10 | Rack1-Machine10 |
… | … |
Machine status inspection data B (very large data volume):

time | ip | cpu usage (%) | disk usage (%) | memory usage (%) | label |
---|---|---|---|---|---|
2018-10-01 12:00:00 | 192.168.100.1 | 50 | 50 | 50 | normal |
2018-10-01 12:00:00 | 192.168.100.2 | 50 | 50 | 50 | normal |
2018-10-01 12:00:00 | 192.168.100.3 | 50 | 50 | 50 | normal |
2018-10-01 12:00:00 | 192.168.100.4 | 50 | 50 | 50 | normal |
2018-10-01 12:00:00 | 192.168.100.5 | 50 | 50 | 50 | normal |
2018-10-01 12:00:00 | 192.168.100.6 | 50 | 50 | 50 | normal |
… | … | … | … | … | … |
Requirement 1: find the locations of the abnormal machines. Since A is small and B is very large, shipping A to every task via Flink's distributed cache is cheaper than a full distributed join: register the file once on the client, then load it in each task's open() method.
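The cached file registered below is expected to be plain text with one machine per line, ip and location separated by whitespace, because the job splits each line on `\s`. A minimal sketch that writes such a file (the path matches the job; the rows are illustrative, and the location strings must contain no whitespace):

```scala
import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._

object WriteIpLocationFile {
  def main(args: Array[String]): Unit = {
    // Hypothetical sample rows: "ip location" per line; the location must
    // contain no whitespace because the job splits each line on \s.
    val lines = Seq(
      "192.168.100.1 Rack1-Machine1",
      "192.168.100.2 Rack1-Machine2"
    )
    val path = Paths.get("./data/iplocation")
    Files.createDirectories(path.getParent)
    Files.write(path, lines.asJava)
  }
}
```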
```scala
import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

/**
 * step 1: register a local/HDFS file as a distributed-cache file
 * step 2: read the cached file in the open() method
 */
object IPLocation {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val path = "./data/iplocation"

    // step 1: register a local/HDFS file under the name "iplocation"
    env.registerCachedFile(path, "iplocation")

    val dataSet = env.fromCollection(createData())

    dataSet.map(new RichMapFunction[(String, String, Int, Int, Int, String), (String, String, Int, Int, Int, String)] {

      // ip -> location, filled from the distributed cache in open()
      val iplocations = scala.collection.mutable.Map.empty[String, String]

      // step 2: open() runs once per task; read the cached file here
      override def open(parameters: Configuration): Unit = {
        val dcFile = getRuntimeContext.getDistributedCache.getFile("iplocation")
        val lines = FileUtils.readLines(dcFile)
        import scala.collection.JavaConverters._
        for (line <- lines.asScala) {
          val ss = line.split("\\s")
          iplocations(ss(0)) = ss(1)
        }
      }

      // replace the ip (field 2) with its location, "--" if unknown
      override def map(x: (String, String, Int, Int, Int, String)): (String, String, Int, Int, Int, String) = {
        val address = iplocations.getOrElse(x._2, "--")
        (x._1, address, x._3, x._4, x._5, x._6)
      }
    })
      .filter(_._6 == "abnormal") // keep only the abnormal machines
      .map(x => (x._1, x._2))     // (time, location)
      .print()
  }

  // generate 10,000 synthetic inspection records
  def createData(): Seq[(String, String, Int, Int, Int, String)] = {
    val ips = List(
      "192.168.100.1", "192.168.100.2", "192.168.100.3", "192.168.100.4", "192.168.100.5",
      "192.168.100.6", "192.168.100.7", "192.168.100.8", "192.168.100.9", "192.168.100.10"
    )
    val labels = List("normal", "abnormal")
    val r = scala.util.Random
    (1 to 10000).map { _ =>
      val index = r.nextInt(10)
      val second = 10 + r.nextInt(50)
      // synthetic timestamps only: the month is drawn from 0-9, so strings
      // like "2019-00-21" can appear in the output
      val month = r.nextInt(10)
      val day = 10 + r.nextInt(18)
      val hour = r.nextInt(10)
      val minute = 10 + r.nextInt(50)
      val time = s"2019-0$month-$day 0$hour:$minute:$second"
      // some records get an ip that is absent from the location file
      val ip = if (index > 7) "xxxx" else ips(index)
      (time, ip, r.nextInt(101), r.nextInt(101), r.nextInt(101), labels(r.nextInt(2)))
    }
  }
}
```
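Stripped of the Flink plumbing, the per-record work done in map and filter above is just a dictionary lookup followed by a label check. A self-contained sketch of that logic (the type alias and function name are illustrative, not part of the job above):

```scala
object EnrichAndFilter {
  // (time, ip, cpu, disk, mem, label)
  type Record = (String, String, Int, Int, Int, String)

  // Replace the ip with its location ("--" if unknown),
  // then keep only the abnormal rows as (time, location).
  def abnormalLocations(records: Seq[Record],
                        iplocations: Map[String, String]): Seq[(String, String)] =
    records
      .map(r => (r._1, iplocations.getOrElse(r._2, "--"), r._3, r._4, r._5, r._6))
      .filter(_._6 == "abnormal")
      .map(r => (r._1, r._2))
}
```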
Result (the data is random, so each run differs):

(2019-06-23 00:18:37,Rack1-Machine6)
(2019-06-14 04:36:16,--)
(2019-00-21 01:36:18,Rack1-Machine1)
(2019-04-24 02:57:30,Rack1-Machine2)
(2019-08-25 04:25:36,Rack1-Machine7)
(2019-08-23 03:49:28,Rack1-Machine3)
(2019-08-23 04:29:41,Rack1-Machine2)
(2019-03-23 00:12:33,Rack1-Machine3)
(2019-03-15 06:14:50,Rack1-Machine5)
(2019-00-17 06:42:55,Rack1-Machine8)
(2019-03-20 08:29:28,Rack1-Machine8)
(2019-00-15 00:26:37,Rack1-Machine7)
(2019-01-10 01:54:34,Rack1-Machine8)
(2019-01-10 02:56:43,Rack1-Machine5)
(2019-08-24 02:30:16,Rack1-Machine8)
(2019-04-11 04:57:17,--)
(2019-00-27 08:50:57,Rack1-Machine1)
(2019-04-14 09:34:15,Rack1-Machine6)
(2019-08-23 08:30:27,Rack1-Machine3)
(2019-01-27 03:50:28,Rack1-Machine2)
(2019-09-24 08:24:44,Rack1-Machine7)
(2019-02-16 07:19:12,Rack1-Machine6)
(2019-08-14 00:48:46,--)
(2019-00-27 08:53:40,Rack1-Machine7)
(2019-03-14 03:44:19,Rack1-Machine1)
(2019-00-15 04:16:18,Rack1-Machine2)
(2019-08-12 02:47:56,Rack1-Machine4)
(2019-07-21 08:16:53,Rack1-Machine4)
(2019-07-20 07:41:14,Rack1-Machine2)
(2019-05-27 08:32:25,Rack1-Machine2)
(2019-05-21 08:56:59,Rack1-Machine2)
....