flink添加黑名单需求时报错
程序员文章站
2022-05-09 15:10:09
...
报错:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1640)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:678)
at com.atguigu.marketanalysis.AdStatisticsByGeo$.main(AdStatisticsByGeo.scala:60)
at com.atguigu.marketanalysis.AdStatisticsByGeo.main(AdStatisticsByGeo.scala)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.lambda$getJobExecutionResult$2(PerJobMiniClusterFactory.java:175)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
... 4 more
Caused by: java.lang.IndexOutOfBoundsException: 3
at com.atguigu.marketanalysis.BlackListWarning.productElement(AdStatisticsByGeo.scala:25)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:96)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:639)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:627)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:718)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:736)
at org.apache.flink.streaming.api.operators.KeyedProcessOperator$ContextImpl.output(KeyedProcessOperator.java:132)
at com.atguigu.marketanalysis.FilterBlackListUser.processElement(AdStatisticsByGeo.scala:83)
at com.atguigu.marketanalysis.FilterBlackListUser.processElement(AdStatisticsByGeo.scala:64)
at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
at java.lang.Thread.run(Thread.java:748)
Process finished with exit code 1
代码:
package com.atguigu.marketanalysis
import java.sql.Timestamp
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
/**
* Project: com.atguigu.marketanalysis
* Created by Bright on 2020-09-08 18:58
* Description: 在省份分组统计count的基础上加上黑名单机制
*
*/
//todo 样例类
case class AdClickLog(userId: String,
adId: String,
province: String,
city: String,
timestamp: Long)
case class AdViewCountByProvince(windowEnd: String,
province: String,
count: Long)
case class BlackListWarning(userId: String, adId: String, msg: String)
object AdStatisticsByGeo {
def main(args: Array[String]): Unit = {
//todo 环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//todo 读取数据并转换成样例类
val dataStream: DataStream[AdClickLog] = env
.readTextFile("D:\\code\\UserBehaviorAnalysis\\MarketAnalusis\\src\\main\\resources\\AdClickLog.csv")
.map(line => {
val arr: Array[String] = line.split(",")
AdClickLog(arr(0), arr(1), arr(2), arr(3), arr(4).toLong)
})
.assignAscendingTimestamps(_.timestamp * 1000L)
val filteredStream: DataStream[AdClickLog] = dataStream
.keyBy(data => (data.userId, data.adId))
.process(new FilterBlackListUser(100))
//todo 根据省份分组,开窗聚合
val adCountStream: DataStream[AdViewCountByProvince] = filteredStream
.keyBy(_.province)
.timeWindow(Time.minutes(60), Time.seconds(5))
.aggregate(new AdCountAgg(), new AdCountResult())
adCountStream.print("result")
filteredStream.getSideOutput(new OutputTag[AdClickLog]("black-list")).print("black list")
env.execute("ad click count job")
}
}
class AdCountAgg() extends AggregateFunction[AdClickLog, Long, Long] {
override def createAccumulator(): Long = 0L
override def add(value: AdClickLog, accumulator: Long): Long = accumulator + 1
override def getResult(accumulator: Long): Long = accumulator
override def merge(a: Long, b: Long): Long = a + b
}
class AdCountResult() extends WindowFunction[Long, AdViewCountByProvince, String, TimeWindow] {
override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[AdViewCountByProvince]): Unit = {
val end = new Timestamp(window.getEnd).toString
out.collect(AdViewCountByProvince(end, key, input.head))
}
}
class FilterBlackListUser(maxCount: Int) extends KeyedProcessFunction[(String, String), AdClickLog, AdClickLog] {
//定义需要的状态
lazy val countState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("count", classOf[Long]))
lazy val resetTimerTsState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("reset-time-ts", classOf[Long]))
lazy val isInBlackListState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-in-blacklist", classOf[Boolean]))
override def processElement(value: AdClickLog, ctx: KeyedProcessFunction[(String, String), AdClickLog, AdClickLog]#Context, out: Collector[AdClickLog]): Unit = {
val curCount: Long = countState.value()
if (curCount == 0) {
val ts = (ctx.timerService().currentProcessingTime() / (1000 * 60 * 60 * 24) + 1) / (24 * 3600 * 1000) - 8 * 3600 * 1000
ctx.timerService().registerProcessingTimeTimer(ts)
resetTimerTsState.update(ts)
}
if (curCount >= maxCount) {
if (!isInBlackListState.value()) {
ctx.output(new OutputTag[BlackListWarning]("black-list"),
BlackListWarning(value.userId, value.adId, s"click over $maxCount times today"))
isInBlackListState.update(true)
}
return
}
out.collect(value)
countState.update(curCount + 1)
}
}
还是写错 没有逻辑上太大的问题
发生错误的时候一定先看提醒 点击可以点击的提醒行 就点击看看给的提示
真不行就一行行地看吧 不过 一般情况是不需要如此的