欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

fabricv1.0.3的kafka共识下交易”丢失“的问题

程序员文章站 2022-04-03 22:26:16
...

fabricv1.0.3的kafka共识下交易”丢失“的问题

此问题fabric在1.3release中已经解决,之前的版本还未修复。

背景

在实施项目时,发现在负载不大的时候,sdk的发送交易在特定情况会超时。

项目中sdk发交易的特点:

1、sdk是invoke为同步交易。

2、数据源为消息中间件且需要保证消息的顺序性,msg需要串行处理

起初以为是peer收到区块时,eventhub传输block event的时候出了问题,所以sdk会任务交易超时。分别查看peer和orderer日志后发现,peer收到包含该txid的区块收到已经延迟了30s左右。

问题分析

定位

首先,peer和orderer负载并没有很大,而且30s在fabric里面是一个很特殊的时间,是chaincode执行交易超时的时间。

仔细查看peer获取包含该txid的区块的时间,比sdk发出交易晚了刚刚好30s多一点,再查看orderer生成该block的时间与peer commit该区块的时间相差无几,由此可以确认问题的根源在于orderer切块”慢了“。

逻辑梳理

仔细查看kafka共识算法,其将满足条件的交易打包成一个区块以提高效率,这个条件或是超过一定条数,或是超过一定大小,以此满足在大量连续交易时,orderer可以合理的出块。

BatchSize:
        # Max Message Count: The maximum number of messages to permit in a batch
        MaxMessageCount: 10
        # Preferred Max Bytes: The preferred maximum number of bytes allowed for
        # the serialized messages in a batch. A message larger than the preferred
        # max bytes will result in a batch larger than preferred max bytes.
        PreferredMaxBytes: 512 KB

当交易数量不足MaxMessageCount,或者pending的消息不足PreferredMaxBytes时,设计者选择让定时切块的机制来兜底,默认2s会切出一个区块,当交易量很小且交易本身size也小的时候,常常就是2s生成一个区块。

如果2s必须生成一个区块,这样可能会导致orderer生成了一堆空的区块,那就得不偿失了。所以设计者决定只有在一种特定情况下,才会去触发定时器,向kafka发送定时切块消息。这种情况就是,共识验证了一笔交易,但是这个交易并没有导致切块,也就是说有交易pending。

func processRegular(regularMessage *ab.KafkaMessageRegular, support multichain.ConsenterSupport, timer *<-chan time.Time, receivedOffset int64, lastCutBlockNumber *uint64) error {
	...
	batches, committers, ok, pending := support.BlockCutter().Ordered(env)
	logger.Debugf("[channel: %s] Ordering results: items in batch = %d, ok = %v, pending = %v", support.ChainID(), len(batches), ok, pending)
	if ok && len(batches) == 0 && *timer == nil {
		*timer = time.After(support.SharedConfig().BatchTimeout())
		logger.Debugf("[channel: %s] Just began %s batch timer", support.ChainID(), support.SharedConfig().BatchTimeout().String())
		return nil
	}
	...
	if len(batches) > 0 {
    *timer = nil
	}
	return nil
}

如果新增了一笔交易,且没有直接出块(即len(batches) == 0),这笔交易确实需要依靠定时器才能保证交易及时出块(假如后续无任何的其他交易)。

但是,这并不能覆盖所有的情况。当pendingBatchSizeBytes+messageSizeBytes > PreferredMaxBytes时,Orderered返回的batches长度为1,却有pending消息还没出来。然而,因为batches长度不为0,无法触发timer,所以2秒之后pending的消息仍然会滞留,从sdk的视角去看,这笔交易是”丢失“了。当然,如果后续还有其他交易的话,可以将这种滞留消息顶出来,但是在单线程且同步invoke的时候,这问题会相当致命。

func (r *receiver) Ordered(msg *cb.Envelope) (messageBatches [][]*cb.Envelope, committerBatches [][]filter.Committer, validTx bool, pending bool) {
  ...
	messageWillOverflowBatchSizeBytes := r.pendingBatchSizeBytes+messageSizeBytes > r.sharedConfigManager.BatchSize().PreferredMaxBytes

	if messageWillOverflowBatchSizeBytes {
		logger.Debugf("The current message, with %v bytes, will overflow the pending batch of %v bytes.", messageSizeBytes, r.pendingBatchSizeBytes)
		logger.Debugf("Pending batch would overflow if current message is added, cutting batch now.")
		messageBatch, committerBatch := r.Cut()
		messageBatches = append(messageBatches, messageBatch)
		committerBatches = append(committerBatches, committerBatch)
	}
  ...
}

bug复现

为了复现pendingBatchSizeBytes+messageSizeBytes > PreferredMaxBytes的情况,保证每条交易的大小都超过PreferredMaxBytes的一半。

复现区块”丢失“的问题:

以官方的examples/e2e部署为例,做以下调整:

chaincode使用examples/chaincode/go/marbles02

configtx.yaml修改PreferredMaxBytes为4kb(因为marbels02的一条invoke大小刚好超过2k,这样连续两条invoke就可以复现len(batchs) == 1且有数据pending的情况 )
fabricv1.0.3的kafka共识下交易”丢失“的问题

步骤

#进入cli容器,连续发送两条invoke(必须在2s以内)
peer chaincode invoke -o orderer.example.com:7050 --tls --cafile $ORDERER_CA -C mychannel -n marbles -c '{"Args":["initMarble","marble1","blue","35","tom"]}'
peer chaincode invoke -o orderer.example.com:7050 --tls --cafile $ORDERER_CA -C mychannel -n marbles -c '{"Args":["initMarble","marble2","red","50","tom"]}'
#在两秒后执行如下query,发现只能查询到marble1
peer chaincode query -C mychannel -n marbles -c '{"Args":["readMarble","marble1"]}'
peer chaincode query -C mychannel -n marbles -c '{"Args":["readMarble","marble2"]}'
#再发送消息,再次查询,发现最后有一个交易无法查询到
peer chaincode invoke -o orderer.example.com:7050 --tls --cafile $ORDERER_CA -C mychannel -n marbles -c '{"Args":["initMarble","marble3","blue","70","tom"]}'
peer chaincode query -C mychannel -n marbles -c '{"Args":["readMarble","marble3"]}'

修复

func processRegular(regularMessage *ab.KafkaMessageRegular, support multichain.ConsenterSupport, timer *<-chan time.Time, receivedOffset int64, lastCutBlockNumber *uint64) error {
	...
	if len(batches) > 0 {
		if pending {
			*timer = time.After(support.SharedConfig().BatchTimeout())
		} else {
			*timer = nil
		}
	}
	return nil
}

就算本次交易已经切块,只要还有pending的消息,就重置定时器时间,因为pending交易可能会需要定时切块来出块。

问题的表现

这个bug导致的问题表现可能是:

1、chaincode event丢失

2、交易超时(即block event丢失)