RocketMQ写入数据报错RemotingTooMuchRequestException: sendDefaultImpl call timeout
程序员文章站
2022-07-14 23:04:51
...
1.视界
2.背景
想做一个spark structured streaming 写入到RocketMQ的测试
/**
* 背景:要写一个Spark Structured Streaming 写入到 RocketMQ的实现
* 该代码是从 https://github.com/lccbiluox2/rocketmq-externals.git 下载的代码
* 测试点:测试Spark 写入到rocketMQ
* 1,1,1
*/
@Test
def telnetToRocketMqTest220(): Unit ={
val host = "localhost"
val port = "9997"
logInfo(String.format("监听主机:%s,端口:%s", host, port))
val spark = SparkSession
.builder
.appName("testRocketMQ")
.master("local[4]")
.getOrCreate()
val map = spark.conf.getAll.mkString("\n")
logInfo(String.format("系统参数如下(jar生效测试,名称没有):\n %s ", map))
// Create DataFrame representing the stream of input lines from connection to localhost:9999
val lines = spark.readStream
.format("socket")
.option("host", host)
.option("port", port)
.load()
lines.printSchema()
val lineData = lines.selectExpr("CAST(value AS STRING)").as(Encoders.STRING)
import spark.implicits._
val df = lineData.map(x => {
val values = x.split(",")
val id = values(0)
val name = values(1)
val age = values(2)
new Person(id, name, age)
})
df.printSchema()
df.createOrReplaceTempView("person")
val lastData = spark.sql("select * from person");
var valueDF = lastData.selectExpr("to_json(struct(*)) AS body")
var streamReader = valueDF.writeStream
.format("org.apache.spark.sql.rocketmq.RocketMQSourceProvider")
.outputMode(OutputMode.Append())
.option("checkpointLocation","/data/spark/checkpoint")
.option("nameServer", "localhost:9876")
.option("topic", "TopicTest")
.trigger(Trigger.ProcessingTime(10000L, TimeUnit.MILLISECONDS))
.queryName("sinkTest")
val query = streamReader.start()
query.awaitTermination()
}
然后执行的时候不报错
19/10/09 15:53:57.174 NettyClientSelector_1 INFO RocketmqRemoting: closeChannel: close the connection to remote address[] result: true
但是日志中有这样的一句。
然后我打开控制台消费TopicTest中的数据,结果发现一直消费不到。
怀疑是没写入数据,于是我使用默认的程序写入
aaa@qq.com rocketmq-all-4.5.0-bin-release$ export NAMESRV_ADDR=localhost:9876
aaa@qq.com rocketmq-all-4.5.0-bin-release$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
结果报错了,写入不进去
6:20:20.955 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[] result: true
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:635)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1280)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1226)
at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:283)
at org.apache.rocketmq.example.quickstart.Producer.main(Producer.java:67)
16:20:24.963 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[] result: true
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:635)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1280)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1226)
at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:283)
at org.apache.rocketmq.example.quickstart.Producer.main(Producer.java:67)
16:20:28.968 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[] result: true
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:635)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1280)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1226)16:20:32.979 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[] result: true
3.环境
此时环境为
配置文件
# 绑定的主机
namesrvAddr = localhost:9876
brokerIP1 = localhost
mqnamesrv启动中
aaa@qq.com rocketmq-all-4.5.0-bin-release$ sh bin/mqnamesrv
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
Java HotSpot(TM) 64-Bit Server VM warning: Cannot open file /dev/shm/rmq_srv_gc.log due to No such file or directory
The Name Server boot success. serializeType=JSON
broker启动中
aaa@qq.com rocketmq-all-4.5.0-bin-release$ sh bin/mqbroker -n localhost:9876
Java HotSpot(TM) 64-Bit Server VM warning: Cannot open file /dev/shm/mq_gc_pid41069.log due to No such file or directory
The broker[lcc, 172.16.2.2:10911] boot success. serializeType=JSON and name server is localhost:9876
4. 修改IP
参考 : https://blog.csdn.net/lw5885799/article/details/88646051
配置文件
# 绑定的主机
namesrvAddr = 192.168.43.73:9876
brokerIP1 = 192.168.43.73
启动
aaa@qq.com rocketmq-all-4.5.0-bin-release$ sh bin/mqnamesrv -n 192.168.43.73:9876 -c conf/broker.conf autoCreateTopicEnable=true
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
Java HotSpot(TM) 64-Bit Server VM warning: Cannot open file /dev/shm/rmq_srv_gc.log due to No such file or directory
load config properties file OK, conf/broker.conf
The Name Server boot success. serializeType=JSON
aaa@qq.com rocketmq-all-4.5.0-bin-release$ sh bin/mqbroker -n 192.168.43.73:9876
Java HotSpot(TM) 64-Bit Server VM warning: Cannot open file /dev/shm/mq_gc_pid55588.log due to No such file or directory
The broker[lcc, 172.16.2.2:10911] boot success. serializeType=JSON and name server is 192.168.43.73:9876
即使是指定了IP,也是不可以的。
5.关闭VIP
broker中加入如下配置
com.rocketmq.sendMessageWithVIPChannel = false
报错:https://blog.csdn.net/qq_21383435/article/details/102468720