欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RocketMQ写入数据报错RemotingTooMuchRequestException: sendDefaultImpl call timeout

程序员文章站 2022-07-14 23:04:51
...

1.视界

RocketMQ写入数据报错RemotingTooMuchRequestException: sendDefaultImpl call timeout

2.背景

想做一个spark structured streaming 写入到RocketMQ的测试

/**
    * 背景:要写一个Spark Structured Streaming 写入到 RocketMQ的实现
    *      该代码是从 https://github.com/lccbiluox2/rocketmq-externals.git 下载的代码
    * 测试点:测试Spark 写入到rocketMQ
    * 1,1,1
    */
  @Test
  def telnetToRocketMqTest220(): Unit ={

    val host = "localhost"
    val port = "9997"

    logInfo(String.format("监听主机:%s,端口:%s", host, port))

    val spark = SparkSession
      .builder
      .appName("testRocketMQ")
      .master("local[4]")
      .getOrCreate()

    val map = spark.conf.getAll.mkString("\n")
    logInfo(String.format("系统参数如下(jar生效测试,名称没有):\n %s ", map))


    // Create DataFrame representing the stream of input lines from connection to localhost:9999
    val lines = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .load()

    lines.printSchema()

    val lineData =  lines.selectExpr("CAST(value AS STRING)").as(Encoders.STRING)
    import spark.implicits._

    val df = lineData.map(x => {
      val values = x.split(",")
      val id = values(0)
      val name = values(1)
      val age = values(2)
      new Person(id, name, age)
    })

    df.printSchema()
    df.createOrReplaceTempView("person")

    val lastData = spark.sql("select * from person");
    var valueDF = lastData.selectExpr("to_json(struct(*)) AS body")

    var streamReader = valueDF.writeStream
      .format("org.apache.spark.sql.rocketmq.RocketMQSourceProvider")
      .outputMode(OutputMode.Append())
      .option("checkpointLocation","/data/spark/checkpoint")
      .option("nameServer", "localhost:9876")
      .option("topic", "TopicTest")
      .trigger(Trigger.ProcessingTime(10000L, TimeUnit.MILLISECONDS))
      .queryName("sinkTest")

    val query = streamReader.start()
    query.awaitTermination()

  }

然后执行的时候不报错

19/10/09 15:53:57.174 NettyClientSelector_1 INFO RocketmqRemoting: closeChannel: close the connection to remote address[] result: true

但是日志中有这样的一句。
然后我打开控制台消费TopicTest中的数据,结果发现一直消费不到。
怀疑是没写入数据,于是我使用默认的程序写入

aaa@qq.com rocketmq-all-4.5.0-bin-release$ export NAMESRV_ADDR=localhost:9876
aaa@qq.com rocketmq-all-4.5.0-bin-release$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

结果报错了,写入不进去

6:20:20.955 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[] result: true
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:635)
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1280)
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1226)
	at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:283)
	at org.apache.rocketmq.example.quickstart.Producer.main(Producer.java:67)
16:20:24.963 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[] result: true
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:635)
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1280)
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1226)
	at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:283)
	at org.apache.rocketmq.example.quickstart.Producer.main(Producer.java:67)
16:20:28.968 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[] result: true
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:635)
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1280)
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1226)16:20:32.979 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[] result: true

3.环境

此时环境为
配置文件

# 绑定的主机
namesrvAddr = localhost:9876
brokerIP1 = localhost

mqnamesrv启动中

aaa@qq.com rocketmq-all-4.5.0-bin-release$ sh bin/mqnamesrv
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
Java HotSpot(TM) 64-Bit Server VM warning: Cannot open file /dev/shm/rmq_srv_gc.log due to No such file or directory

The Name Server boot success. serializeType=JSON

broker启动中

aaa@qq.com rocketmq-all-4.5.0-bin-release$ sh bin/mqbroker -n localhost:9876
Java HotSpot(TM) 64-Bit Server VM warning: Cannot open file /dev/shm/mq_gc_pid41069.log due to No such file or directory

The broker[lcc, 172.16.2.2:10911] boot success. serializeType=JSON and name server is localhost:9876

4. 修改IP

参考 : https://blog.csdn.net/lw5885799/article/details/88646051
配置文件

# 绑定的主机
namesrvAddr = 192.168.43.73:9876
brokerIP1 = 192.168.43.73

启动

aaa@qq.com rocketmq-all-4.5.0-bin-release$ sh bin/mqnamesrv  -n 192.168.43.73:9876  -c conf/broker.conf autoCreateTopicEnable=true
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
Java HotSpot(TM) 64-Bit Server VM warning: Cannot open file /dev/shm/rmq_srv_gc.log due to No such file or directory

load config properties file OK, conf/broker.conf
The Name Server boot success. serializeType=JSON




aaa@qq.com rocketmq-all-4.5.0-bin-release$ sh bin/mqbroker -n 192.168.43.73:9876
Java HotSpot(TM) 64-Bit Server VM warning: Cannot open file /dev/shm/mq_gc_pid55588.log due to No such file or directory

The broker[lcc, 172.16.2.2:10911] boot success. serializeType=JSON and name server is 192.168.43.73:9876

即使是指定了IP,也是不可以的。

5.关闭VIP

broker中加入如下配置

com.rocketmq.sendMessageWithVIPChannel = false

报错:https://blog.csdn.net/qq_21383435/article/details/102468720