Fabric源码分析之三启动流程之Orderer分析
一、排序节点的启动
本来是想把Peer和Orderer放到一起,结果发现内容太多了,不得不拆开。其实明白了Peer的流程,Orderer也就差不多了,不同的是由于功能的不同,启动的服务肯定有所不同,诸如通信、数据库等。但到了底层,其生成的方式应该类似,下面就看一下相关的排序节点的源码。
二、启动流程
如果觉得Peer的启动入口函数有点简单,那么Orderer更简单:
1、启动
// main is the entry point of the orderer binary; it does nothing except
// delegate to server.Main.
func main() {
server.Main()
}
所以就是想在这儿偷懒也没办法了,否则这分析就没法分析了,只好跳到调用的server.Main里看看
// Main parses the command line, handles the "version" command, loads the
// orderer configuration, performs the one-time initialization steps
// (logging, local MSP, configuration dump) and finally hands control to Start.
func Main() {
	// kingpin is the command-line parser used by the orderer (the peer uses cobra).
	command := kingpin.MustParse(app.Parse(os.Args[1:]))

	// The "version" command only prints the build metadata and returns.
	if command == version.FullCommand() {
		fmt.Println(metadata.GetVersionInfo())
		return
	}

	// Load the configuration data (the .yaml file).
	cfg, loadErr := localconfig.Load()
	if loadErr != nil {
		logger.Error("failed to parse config: ", loadErr)
		os.Exit(1)
	}

	initializeLogging()     // set up logging
	initializeLocalMsp(cfg) // set up the local MSP
	prettyPrintStruct(cfg)  // dump the effective configuration

	// Run the requested command with the loaded configuration.
	Start(command, cfg)
}
看代码,仍然是挺简单的,所以这时要提高警惕了,当代码一直简单下来,会误导你的大脑分析也简单化。可是,这是不可能的,Orderer可是整个区块链系统中最重要的一部分——出块可是它来做的。
初始化日志和格式化打印暂时不关心,初始化本地MSP(包括签名证书啥的),其实就是调用mgmt.go中的loadLocaMSP,然后Setup就可以了。这个放到后面MSP相关来分析。
那么,只有最后一个函数Start,是让人觉得有干货的地方。先把代码晾一下:
// Start wires up and runs the orderer: it validates the bootstrap block,
// starts the operations subsystem, creates the ledger factory, prepares the
// cluster client/dialer and replicator, builds the gRPC server(s), creates the
// multichannel registrar and the AtomicBroadcast server, installs the SIGTERM
// handler, and finally blocks in grpcServer.Start().
//
// cmd is the parsed command and conf is the loaded orderer configuration.
func Start(cmd string, conf *localconfig.TopLevel) {
// The parsed command and the configuration are passed in by Main.
// Extract and validate the bootstrap (genesis) block.
bootstrapBlock := extractBootstrapBlock(conf)
if err := ValidateBootstrapBlock(bootstrapBlock); err != nil {
logger.Panicf("Failed validating bootstrap block: %v", err)
}
// Build the operations subsystem from the operations and metrics configuration.
opsSystem := newOperationsSystem(conf.Operations, conf.Metrics)
err := opsSystem.Start() // start listening
if err != nil {
logger.Panicf("failed to initialize operations subsystem: %s", err)
}
defer opsSystem.Stop()
metricsProvider := opsSystem.Provider
// Create the ledger factory (the second return value, the ledger directory, is not needed here).
lf, _ := createLedgerFactory(conf, metricsProvider)
sysChanLastConfigBlock := extractSysChanLastConfig(lf, bootstrapBlock)
clusterBootBlock := selectClusterBootBlock(bootstrapBlock, sysChanLastConfigBlock)
// Determine whether this orderer runs a cluster-type consensus.
clusterType := isClusterType(clusterBootBlock)
// Create the local MSP signer instance (an empty object).
signer := localmsp.NewSigner()
// Initialize the cluster client configuration: signing, private key, certificates, TLS, etc.
clusterClientConfig := initializeClusterClientConfig(conf, clusterType, bootstrapBlock)
clusterDialer := &cluster.PredicateDialer{
ClientConfig: clusterClientConfig,
}
// Create the replicator, which is used in cluster mode.
r := createReplicator(lf, bootstrapBlock, conf, clusterClientConfig.SecOpts, signer)
// Only clusters that are equipped with a recent config block can replicate.
if clusterType && conf.General.GenesisMethod == "file" {
r.replicateIfNeeded(bootstrapBlock)
}
// Create a logging observer backed by the metrics provider and install it globally.
logObserver := floggingmetrics.NewObserver(metricsProvider)
flogging.Global.SetObserver(logObserver)
// Build the gRPC server configuration from the loaded configuration (including the CA certificate material).
serverConfig := initializeServerConfig(conf, metricsProvider)
// Author's note: SecureOptions and KeepaliveOptions hold the TLS key pair, the
// client/server certificates, and the keepalive interval/timeout settings.
grpcServer := initializeGrpcServer(conf, serverConfig)
// Credential (certificate) support.
caSupport := &comm.CredentialSupport{
AppRootCAsByChain: make(map[string]comm.CertificateBundle),
OrdererRootCAsByChainAndOrg: make(comm.OrgRootCAs),
ClientRootCAs: serverConfig.SecOpts.ClientRootCAs,
}
// By default the cluster service shares the general gRPC server and its configuration.
clusterServerConfig := serverConfig
clusterGRPCServer := grpcServer
if clusterType {
clusterServerConfig, clusterGRPCServer = configureClusterListener(conf, serverConfig, grpcServer, ioutil.ReadFile)
}
var servers = []*comm.GRPCServer{grpcServer}
// If we have a separate gRPC server for the cluster, we need to update its TLS
// CA certificate pool too.
// Handle the separate gRPC server used by the cluster.
if clusterGRPCServer != grpcServer {
servers = append(servers, clusterGRPCServer)
}
// TLS callback: updates the TLS client CA certificates for each channel.
tlsCallback := func(bundle *channelconfig.Bundle) {
// only need to do this if mutual TLS is required or if the orderer node is part of a cluster
if grpcServer.MutualTLSRequired() || clusterType {
logger.Debug("Executing callback to update root CAs")
updateTrustedRoots(caSupport, bundle, servers...)
if clusterType {
updateClusterDialer(caSupport, clusterDialer, clusterClientConfig.SecOpts.ServerRootCAs)
}
}
}
// Create a new signature header.
sigHdr, err := signer.NewSignatureHeader()
if err != nil {
logger.Panicf("Failed creating a signature header: %v", err)
}
expirationLogger := flogging.MustGetLogger("certmonitor")
crypto.TrackExpiration(
serverConfig.SecOpts.UseTLS,
serverConfig.SecOpts.Certificate,
[][]byte{clusterClientConfig.SecOpts.Certificate},
sigHdr.Creator,
expirationLogger.Warnf, // This can be used to piggyback a metric event in the future
time.Now(),
time.AfterFunc)
// Initialize the multichannel registrar (author's note: this creates the data
// storage paths, including the index database and the per-channel block data).
manager := initializeMultichannelRegistrar(clusterBootBlock, r, clusterDialer, clusterServerConfig, clusterGRPCServer, conf, signer, metricsProvider, opsSystem, lf, tlsCallback)
// Compute the mutual-TLS flag.
mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert
// NoExpirationChecks is passed to NewServer as its expirationCheckDisabled argument.
expiration := conf.General.Authentication.NoExpirationChecks
// Create the orderer's AtomicBroadcast server.
server := NewServer(manager, metricsProvider, &conf.Debug, conf.General.Authentication.TimeWindow, mutualTLS, expiration)
logger.Infof("Starting %s", metadata.GetVersionInfo())
// On SIGTERM, stop the general gRPC server and, if distinct, the cluster gRPC server.
go handleSignals(addPlatformSignals(map[os.Signal]func(){
syscall.SIGTERM: func() {
grpcServer.Stop()
if clusterGRPCServer != grpcServer {
clusterGRPCServer.Stop()
}
},
}))
if clusterGRPCServer != grpcServer {
logger.Info("Starting cluster listener on", clusterGRPCServer.Address())
// Start the separate cluster gRPC server in its own goroutine.
go clusterGRPCServer.Start()
}
// Initialize the profiling service (the peer has similar code).
initializeProfilingService(conf)
// Register the ordering (AtomicBroadcast) service with the gRPC server.
ab.RegisterAtomicBroadcastServer(grpcServer.Server(), server)
logger.Info("Beginning to serve requests")
// Start the gRPC server and begin serving requests.
grpcServer.Start()
}
已经应该对大段代码免疫了。这一大块代码里面还隐藏着很多的小模块,下面继续分析帐本部分:
// extractBootstrapBlock returns the bootstrap (genesis) block selected by
// conf.General.GenesisMethod: "provisional" generates it from the configured
// genesis profile, "file" reads it from the configured genesis file. Any other
// method is reported through logger.Panic.
func extractBootstrapBlock(conf *localconfig.TopLevel) *cb.Block {
	method := conf.General.GenesisMethod

	if method == "provisional" {
		profile := genesisconfig.Load(conf.General.GenesisProfile)
		return encoder.New(profile).GenesisBlockForChannel(conf.General.SystemChannel)
	}
	if method == "file" {
		return file.New(conf.General.GenesisFile).GenesisBlock()
	}

	logger.Panic("Unknown genesis method:", method)
	return nil
}
// New returns a bootstrap.Helper backed by the genesis block file at fileName.
// The file is expected to be a genesis block produced by the tooling; it is
// only read when GenesisBlock is called.
func New(fileName string) bootstrap.Helper {
	helper := &fileBootstrapper{GenesisBlockFile: fileName}
	return helper
}
// Block is the protobuf message for a block. Per its field tags it carries
// three sub-messages: header (field 1), data (field 2) and metadata (field 3).
// The XXX_ fields are protobuf bookkeeping and are excluded from JSON output.
type Block struct {
Header *BlockHeader `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"`
Data *BlockData `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
Metadata *BlockMetadata `protobuf:"bytes,3,opt,name=metadata,proto3" json:"metadata,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
// GenesisBlock reads the file named by b.GenesisBlockFile and unmarshals its
// contents into a cb.Block. It panics if the file cannot be read or if the
// contents cannot be unmarshalled.
func (b *fileBootstrapper) GenesisBlock() *cb.Block {
	raw, readErr := ioutil.ReadFile(b.GenesisBlockFile)
	if readErr != nil {
		panic(errors.Errorf("unable to bootstrap orderer. Error reading genesis block file: %v", readErr))
	}

	block := &cb.Block{}
	if decodeErr := proto.Unmarshal(raw, block); decodeErr != nil {
		panic(errors.Errorf("unable to bootstrap orderer. Error unmarshalling genesis block: %v", decodeErr))
	}
	return block
}
// createLedgerFactory builds the ledger factory selected by
// conf.General.LedgerType and returns it together with the ledger directory.
//
//   - "file": file-based ledger rooted at the ledger directory; the
//     fsblkstorage.ChainsDir sub-directory is created as well.
//   - "json": JSON ledger rooted at the ledger directory.
//   - "ram" or anything else: in-memory ledger; the returned directory is "".
//
// For "file" and "json" the directory is conf.FileLedger.Location, or a
// temporary directory created from conf.FileLedger.Prefix when Location is empty.
func createLedgerFactory(conf *config.TopLevel, metricsProvider metrics.Provider) (blockledger.Factory, string) {
	ledgerType := conf.General.LedgerType

	// "ram" and every unrecognized type use the in-memory ledger.
	if ledgerType != "file" && ledgerType != "json" {
		return ramledger.New(int(conf.RAMLedger.HistorySize)), ""
	}

	dir := conf.FileLedger.Location
	if dir == "" {
		dir = createTempDir(conf.FileLedger.Prefix)
	}
	logger.Debug("Ledger dir:", dir)

	if ledgerType == "json" {
		return jsonledger.New(dir), dir
	}

	// Each channel keeps its own ledger in its own sub-directory.
	factory := fileledger.New(dir, metricsProvider)
	// The file-based ledger stores the blocks for each channel
	// in a fsblkstorage.ChainsDir sub-directory that we have
	// to create separately. Otherwise a later call to the ledger
	// Factory's ChainIDs will fail (dir won't exist).
	createSubDir(dir, fsblkstorage.ChainsDir)
	return factory, dir
}
// NewProvider constructs an FsBlockstoreProvider. It opens a leveldb provider
// at conf.getIndexDir() and creates one stats instance at provider level that
// is later handed to every block store the provider opens.
func NewProvider(conf *Conf, indexConfig *blkstorage.IndexConfig, metricsProvider metrics.Provider) blkstorage.BlockStoreProvider {
	dbConf := &leveldbhelper.Conf{DBPath: conf.getIndexDir()}
	indexDBProvider := leveldbhelper.NewProvider(dbConf)
	providerStats := newStats(metricsProvider)
	return &FsBlockstoreProvider{conf, indexConfig, indexDBProvider, providerStats}
}
再看一下多通道的注册初始化:
// initializeMultichannelRegistrar creates the multichannel registrar, makes
// sure the system channel ledger exists (bootstrapping it from the genesis
// block when the ledger factory reports no channels), registers the solo and
// kafka consenters, adds the etcdraft consenter when bootstrapBlock is of
// cluster type, and initializes the registrar with those consenters.
func initializeMultichannelRegistrar(
	bootstrapBlock *cb.Block,
	ri *replicationInitiator,
	clusterDialer *cluster.PredicateDialer,
	srvConf comm.ServerConfig,
	srv *comm.GRPCServer,
	conf *localconfig.TopLevel,
	signer crypto.LocalSigner,
	metricsProvider metrics.Provider,
	healthChecker healthChecker,
	lf blockledger.Factory,
	callbacks ...channelconfig.BundleActor,
) *multichannel.Registrar {
	// The genesis block is what provides the system channel ID.
	genesisBlock := extractBootstrapBlock(conf)

	// Bootstrap only when no channel exists yet.
	if existing := lf.ChainIDs(); len(existing) > 0 {
		logger.Info("Not bootstrapping because of existing channels")
	} else {
		initializeBootstrapChannel(genesisBlock, lf)
	}

	consenters := make(map[string]consensus.Consenter)

	// The registrar receives the ledger factory, signer, metrics and callbacks.
	registrar := multichannel.NewRegistrar(*conf, lf, signer, metricsProvider, callbacks...)

	consenters["solo"] = solo.New()

	kafkaConsenter, kafkaMetrics := kafka.New(conf.Kafka, metricsProvider, healthChecker)
	consenters["kafka"] = kafkaConsenter
	// Note, we pass a 'nil' channel here, we could pass a channel that
	// closes if we wished to cleanup this routine on exit.
	go kafkaMetrics.PollGoMetricsUntilStop(time.Minute, nil)

	// Raft is only wired in for cluster-type bootstrap blocks.
	if isClusterType(bootstrapBlock) {
		initializeEtcdraftConsenter(consenters, conf, lf, clusterDialer, bootstrapBlock, ri, srvConf, srv, registrar, metricsProvider)
	}

	registrar.Initialize(consenters)
	return registrar
}
// initializeBootstrapChannel creates the system channel ledger from the
// genesis block: it extracts the channel ID from the block, gets or creates
// the ledger for that ID, and appends the genesis block to it. Any failure is
// reported through logger.Fatal.
func initializeBootstrapChannel(genesisBlock *cb.Block, lf blockledger.Factory) {
	// The channel ID is taken from the genesis block itself.
	channelID, idErr := utils.GetChainIDFromBlock(genesisBlock)
	if idErr != nil {
		logger.Fatal("Failed to parse channel ID from genesis block:", idErr)
	}

	// Obtain (or create) the ledger of that channel.
	sysLedger, ledgerErr := lf.GetOrCreate(channelID)
	if ledgerErr != nil {
		logger.Fatal("Failed to create the system channel:", ledgerErr)
	}

	// Author's trace of the append path for the file ledger:
	// gl.Append(genesisBlock) => fsBlockStore.AddBlock(block *common.Block)
	// => blockfileMgr.addBlock(block *common.Block).
	// The block is serialized with protobuf, the size decides whether a new
	// block file is started (the 64M limit mentioned earlier), the length and
	// the serialized bytes are written to the block file, and then the index
	// database, the checkpoint and the blockchain info are updated.
	appendErr := sysLedger.Append(genesisBlock)
	if appendErr != nil {
		logger.Fatal("Could not write genesis block to ledger:", appendErr)
	}
}
其实可以发现,通过解析创世块,通过其得到通道ID是很多初始化函数都调用的,这个有点意思。GetOrCreate接口有四个继承实现的地方,都在common/ledger/blockledger下面的四个目录内。它调用blkstorageProvider.OpenBlockStore然后再调用common/ledger/blkstorage/fs_blockstore_provider.go中的OpenBlockStore。即:
// GetOrCreate returns the ledger for chainID. A ledger already present in the
// factory's cache is returned as is; otherwise the block store for chainID is
// opened, wrapped in a new file ledger, cached and returned. The whole
// operation runs under the factory mutex.
func (flf *fileLedgerFactory) GetOrCreate(chainID string) (blockledger.ReadWriter, error) {
	flf.mutex.Lock()
	defer flf.mutex.Unlock()

	// Serve from the cache when possible.
	if cached, found := flf.ledgers[chainID]; found {
		return cached, nil
	}

	// Nothing cached: open the block store for this channel.
	store, err := flf.blkstorageProvider.OpenBlockStore(chainID)
	if err != nil {
		return nil, err
	}

	fresh := NewFileLedger(store)
	flf.ledgers[chainID] = fresh // remember the ledger wrapping the opened block store
	return fresh, nil
}
// DBHandle pairs a database name with a pointer to a DB.
// NOTE(review): presumably dbName namespaces keys inside a shared DB — confirm
// against the leveldbhelper implementation, which is not shown here.
type DBHandle struct {
dbName string
db *DB
}
// OpenBlockStore obtains the leveldb handle for ledgerid from the provider's
// leveldb provider and builds a file-system block store on top of it. The
// returned error is always nil.
func (p *FsBlockstoreProvider) OpenBlockStore(ledgerid string) (blkstorage.BlockStore, error) {
	handle := p.leveldbProvider.GetDBHandle(ledgerid)
	store := newFsBlockStore(ledgerid, p.conf, p.indexConfig, handle, p.stats)
	return store, nil
}
// fsBlockStore is the file-system block store for a single ledger: it holds
// the ledger id, the storage configuration, the block file manager and the
// per-ledger stats.
type fsBlockStore struct {
id string
conf *Conf
// Block file manager created by newBlockfileMgr.
// NOTE(review): the original note here read "the event registration mentioned earlier" — unclear what it refers to.
fileMgr *blockfileMgr
stats *ledgerStats
}
// blockfileMgr manages the block files of one channel. The field notes below
// are translated from the article author's annotations.
type blockfileMgr struct {
// Directory of the channel ID.
rootDir string
// Configuration contents: the order_data directory and the maximum block file size (64M by default).
conf *Conf
// LevelDB handle obtained from the channel ID, i.e. the channel ID and the LevelDB handle are bound together.
db *leveldbhelper.DBHandle
// Blockchain index; implements the index interface.
index index
// Block checkpoint information.
cpInfo *checkpointInfo
// Condition variable used to synchronize access to the checkpoint information.
cpInfoCond *sync.Cond
// Actual path and handle of the current block file.
currentFileWriter *blockfileWriter
// Brief blockchain information: height, hash, and hash of the previous block.
bcInfo atomic.Value
}
// newFsBlockStore builds the block store for ledger id: it creates the block
// file manager, creates the per-ledger stats, seeds the blockchain height stat
// from the manager's current blockchain info, and returns the assembled store.
func newFsBlockStore(id string, conf *Conf, indexConfig *blkstorage.IndexConfig,
	dbHandle *leveldbhelper.DBHandle, stats *stats) *fsBlockStore {
	mgr := newBlockfileMgr(id, conf, indexConfig, dbHandle)

	// Create ledgerStats and initialize the blockchain_height stat.
	perLedger := stats.ledgerStats(id)
	chainInfo := mgr.getBlockchainInfo()
	perLedger.updateBlockchainHeight(chainInfo.Height)

	return &fsBlockStore{
		id:      id,
		conf:    conf,
		fileMgr: mgr,
		stats:   perLedger,
	}
}
// newBlockIndex is called by newBlockfileMgr to create the block index on top
// of the given leveldb handle. It records which attributes are to be indexed
// and rejects configurations that enable IndexableAttrTxValidationCode or
// IndexableAttrBlockTxID without IndexableAttrTxID.
func newBlockIndex(indexConfig *blkstorage.IndexConfig, db *leveldbhelper.DBHandle) (*blockIndex, error) {
	attrs := indexConfig.AttrsToIndex
	logger.Debugf("newBlockIndex() - indexItems:[%s]", attrs)

	enabled := make(map[blkstorage.IndexableAttr]bool)
	for i := range attrs {
		enabled[attrs[i]] = true
	}

	// This dependency is needed because the index 'IndexableAttrTxID' is used for detecting the duplicate txid
	// and the results are reused in the other two indexes. Ideally, all three indexes should be merged into one
	// for efficiency purpose - [FAB-10587]
	needsTxID := enabled[blkstorage.IndexableAttrTxValidationCode] || enabled[blkstorage.IndexableAttrBlockTxID]
	if needsTxID && !enabled[blkstorage.IndexableAttrTxID] {
		return nil, errors.Errorf("dependent index [%s] is not enabled for [%s] or [%s]",
			blkstorage.IndexableAttrTxID, blkstorage.IndexableAttrTxValidationCode, blkstorage.IndexableAttrBlockTxID)
	}

	return &blockIndex{enabled, db}, nil
}
其实从上面的代码中就可以看出拿到通道ID后就可以拿到相应的帐本,就可以构建ChainSupport,而有了它,就可以将相关的对象以KV形式存储起来。下面是它的定义:
// ChainSupport bundles the resources that back a single channel. The field
// notes below are translated from the article author's annotations.
type ChainSupport struct {
// Ledger resources: the channel configuration resources (configResources type)
// together with the block ledger object (FileLedger type).
*ledgerResources
// Filters channel messages. By default there are four filters:
// - EmptyRejectRule: rejects empty messages
// - expirationRejectRule: rejects signers whose identity certificate has expired
// - MaxBytesRule: checks the maximum message size (98MB by default)
// - sigFilter: verifies that the message signature satisfies the ChannelWriters (/Channel/Writers) write policy
msgprocessor.Processor
// Builds new blocks and commits them to the block file; creates new channels
// and updates channel configuration. On initialization it records the latest
// block number (lastBlock), the channel config sequence (lastConfigSeq), the
// latest config block number (lastConfigBlockNum), the multichannel Registrar
// (used to create new application channels) and the associated chain support
// object (used to update channel configuration).
*BlockWriter
// The consensus/ordering service orders transactions, places them in the
// pending message list, and performs block cutting, channel management, etc.
consensus.Chain
// Message cutter.
// It reads the Orderer configuration of the channel: consensus type, batch
// timeout, maximum block size in bytes, channel restrictions (such as the
// number of channels), and so on.
// A cutter (receiver type) is created from that configuration; following the
// cutting rules it hands the locally pending messages to the block writer,
// which builds a new block and commits it to the ledger block file.
cutter blockcutter.Receiver
// Local signer.
crypto.LocalSigner
}
// Illustrative fragment (not a complete function): how a chain support object
// is created, registered and started.
chain := newChainSupport( // build the chain support object for the application channel
r,
ledgerResources,
consenters,
signer)
r.chains[chainID] = chain // register the chain support object with the multichannel registrar
chain.start() // start the chain support object
2、广播服务
广播服务分为以下几种:
正常的交易信息,新增通道信息,更新通道信息,看下面的代码:
// NewServer creates the AtomicBroadcast server of the orderer. It builds a
// deliver handler and a broadcast handler that both use the multichannel
// registrar r, and stores the debug settings and the registrar on the server.
//
// timeWindow, mutualTLS and expirationCheckDisabled are forwarded unchanged to
// deliver.NewHandler; metricsProvider feeds both handlers' metrics.
func NewServer(
	r *multichannel.Registrar,
	metricsProvider metrics.Provider,
	debug *localconfig.Debug,
	timeWindow time.Duration,
	mutualTLS bool,
	expirationCheckDisabled bool,
) ab.AtomicBroadcastServer {
	deliverHandler := deliver.NewHandler(
		deliverSupport{Registrar: r},
		timeWindow,
		mutualTLS,
		deliver.NewMetrics(metricsProvider),
		expirationCheckDisabled,
	)

	broadcastHandler := &broadcast.Handler{
		SupportRegistrar: broadcastSupport{Registrar: r},
		Metrics:          broadcast.NewMetrics(metricsProvider),
	}

	return &server{
		dh:        deliverHandler,
		bh:        broadcastHandler,
		debug:     debug,
		Registrar: r,
	}
}
//接收客户端消息
// Broadcast receives messages from a client stream. It wraps the stream in a
// message tracer and passes it to the broadcast handler. A panic raised while
// handling the stream is recovered and logged together with the stack trace.
func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error {
	logger.Debugf("Starting new Broadcast handler")
	defer func() {
		if cause := recover(); cause != nil {
			logger.Criticalf("Broadcast client triggered panic: %s\n%s", cause, debug.Stack())
		}
		logger.Debugf("Closing Broadcast stream")
	}()

	tracedStream := &broadcastMsgTracer{
		AtomicBroadcast_BroadcastServer: srv,
		msgTracer: msgTracer{
			debug:    s.debug,
			function: "Broadcast",
		},
	}
	return s.bh.Handle(tracedStream)
}
//处理消息-类似于比特币或EOS的消息处理中心
// Handle runs the broadcast loop for one client stream: it receives a message,
// processes it (validation and ordering), and sends the response back.
//
// The loop ends with nil on EOF, with the receive error on any other receive
// failure, and with the result of Send as soon as a response with a
// non-success status has been sent or Send itself fails.
func (bh *Handler) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
	addr := util.ExtractRemoteAddress(srv.Context())
	logger.Debugf("Starting new broadcast loop for %s", addr)
	for {
		msg, recvErr := srv.Recv()
		switch {
		case recvErr == io.EOF:
			logger.Debugf("Received EOF from %s, hangup", addr)
			return nil
		case recvErr != nil:
			logger.Warningf("Error reading from %s: %s", addr, recvErr)
			return recvErr
		}

		// Validate the message and submit it for ordering.
		resp := bh.ProcessMessage(msg, addr)
		sendErr := srv.Send(resp)
		if resp.Status != cb.Status_SUCCESS {
			return sendErr
		}
		if sendErr != nil {
			logger.Warningf("Error sending to %s: %s", addr, sendErr)
			return sendErr
		}
	}
}
3、分发服务
分发服务包括以下几个部分:
接收客户端发来的请求,解析客户端消息.看下面的代码:
// Deliver serves block requests from a client stream. It builds a policy
// checker that resolves the requested channel and applies a signature filter,
// wraps the stream in a message tracer and a response sender, and passes
// everything to the deliver handler. A panic raised while handling the stream
// is recovered and logged together with the stack trace.
func (s *server) Deliver(srv ab.AtomicBroadcast_DeliverServer) error {
	logger.Debugf("Starting new Deliver handler")
	defer func() {
		if cause := recover(); cause != nil {
			logger.Criticalf("Deliver client triggered panic: %s\n%s", cause, debug.Stack())
		}
		logger.Debugf("Closing Deliver stream")
	}()

	checkPolicy := func(env *cb.Envelope, channelID string) error {
		chain := s.GetChain(channelID)
		if chain == nil {
			return errors.Errorf("channel %s not found", channelID)
		}
		// In maintenance mode, we typically require the signature of /Channel/Orderer/Readers.
		// This will block Deliver requests from peers (which normally satisfy /Channel/Readers).
		sf := msgprocessor.NewSigFilter(policies.ChannelReaders, policies.ChannelOrdererReaders, chain)
		return sf.Apply(env)
	}

	tracedReceiver := &deliverMsgTracer{
		Receiver: srv,
		msgTracer: msgTracer{
			debug:    s.debug,
			function: "Deliver",
		},
	}
	sender := &responseSender{
		AtomicBroadcast_DeliverServer: srv,
	}

	deliverServer := &deliver.Server{
		PolicyChecker:  deliver.PolicyCheckerFunc(checkPolicy),
		Receiver:       tracedReceiver,
		ResponseSender: sender,
	}
	return s.dh.Handle(srv.Context(), deliverServer)
}
// Handle runs the deliver loop for one client stream: it receives a seek
// envelope, delivers the requested blocks, and then sends the resulting status.
// The opened/closed stream counters are incremented on entry and on return.
//
// The loop ends with nil on EOF, with the receive error on any other receive
// failure, with the error from deliverBlocks, and with the result of
// SendStatusResponse once a non-success status has been sent or the send fails.
func (h *Handler) Handle(ctx context.Context, srv *Server) error {
	addr := util.ExtractRemoteAddress(ctx)
	logger.Debugf("Starting new deliver loop for %s", addr)
	h.Metrics.StreamsOpened.Add(1)
	defer h.Metrics.StreamsClosed.Add(1)
	for {
		logger.Debugf("Attempting to read seek info message from %s", addr)
		envelope, recvErr := srv.Recv()
		switch {
		case recvErr == io.EOF:
			logger.Debugf("Received EOF from %s, hangup", addr)
			return nil
		case recvErr != nil:
			logger.Warningf("Error reading from %s: %s", addr, recvErr)
			return recvErr
		}

		// Deliver the requested blocks.
		status, deliverErr := h.deliverBlocks(ctx, srv, envelope)
		if deliverErr != nil {
			return deliverErr
		}

		sendErr := srv.SendStatusResponse(status)
		if status != cb.Status_SUCCESS {
			return sendErr
		}
		if sendErr != nil {
			logger.Warningf("Error sending to %s: %s", addr, sendErr)
			return sendErr
		}

		logger.Debugf("Waiting for new SeekInfo from %s", addr)
	}
}
通过分析客户端的消息,获取帐本上的相关区块数据并回复给客户端,如果尚未得到请求的区块,则一直阻塞直到得到相关请求信息。
4、共识相关
这里主要是共识的服务启动和相关的排序切割等,看代码:
// initializeMultichannelRegistrar (shown earlier) calls registrar.Initialize(consenters).
//
// Initialize stores the consenters on the registrar and then walks every chain
// known to the ledger factory. For each one it loads the ledger, reads its
// config transaction, builds the ledger resources and creates a chain support
// object. A chain that has a consortiums config is treated as the system
// channel (only one may exist); its start is deferred until Initialize
// returns. Every other chain is started immediately.
// The "......" line marks code the article omitted.
func (r *Registrar) Initialize(consenters map[string]consensus.Consenter) {
r.consenters = consenters
existingChains := r.ledgerFactory.ChainIDs()
for _, chainID := range existingChains {
rl, err := r.ledgerFactory.GetOrCreate(chainID)
if err != nil {
logger.Panicf("Ledger factory reported chainID %s but could not retrieve it: %s", chainID, err)
}
configTx := configTx(rl)
if configTx == nil {
logger.Panic("Programming error, configTx should never be nil here")
}
ledgerResources := r.newLedgerResources(configTx)
// From here on chainID is the ID reported by the config validator (it shadows the loop variable).
chainID := ledgerResources.ConfigtxValidator().ChainID()
// Having a consortiums config identifies the system channel.
if _, ok := ledgerResources.ConsortiumsConfig(); ok {
if r.systemChannelID != "" {
logger.Panicf("There appear to be two system chains %s and %s", r.systemChannelID, chainID)
}
chain := newChainSupport(
r,
ledgerResources,
r.consenters,
r.signer,
r.blockcutterMetrics,
)
r.templator = msgprocessor.NewDefaultTemplator(chain)
chain.Processor = msgprocessor.NewSystemChannel(chain, r.templator, msgprocessor.CreateSystemChannelFilters(r, chain, r.config))
// Retrieve genesis block to log its hash. See FAB-5450 for the purpose
iter, pos := rl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Oldest{Oldest: &ab.SeekOldest{}}})
defer iter.Close()
if pos != uint64(0) {
logger.Panicf("Error iterating over system channel: '%s', expected position 0, got %d", chainID, pos)
}
genesisBlock, status := iter.Next()
if status != cb.Status_SUCCESS {
logger.Panicf("Error reading genesis block of system channel '%s'", chainID)
}
logger.Infof("Starting system channel '%s' with genesis block hash %x and orderer type %s",
chainID, genesisBlock.Header.Hash(), chain.SharedConfig().ConsensusType())
r.chains[chainID] = chain
r.systemChannelID = chainID
r.systemChannel = chain
// We delay starting this chain, as it might try to copy and replace the chains map via newChain before the map is fully built
defer chain.start()
} else {
logger.Debugf("Starting chain: %s", chainID)
chain := newChainSupport(
r,
ledgerResources,
r.consenters,
r.signer,
r.blockcutterMetrics,
)
r.chains[chainID] = chain
// Start consensus for this chain. There are several consensus implementations,
// all under the orderer/consensus directory; each provides its own start logic.
chain.start()
}
}
......
}
// start starts the chain support by calling Start on its embedded
// consensus.Chain.
func (cs *ChainSupport) start() {
cs.Chain.Start()
}
// Start launches the chain in the background: the actual work is done by
// startThread, which runs in its own goroutine, so Start returns immediately.
func (chain *chainImpl) Start() {
// This is where the service is actually started.
go startThread(chain)
}
// startThread is the body of the goroutine launched by chainImpl.Start. It
// first tries to create the Kafka topic for the channel; a failure there is
// only logged. After further setup (omitted by the article, see the "......"
// line) it calls processMessagesToBlocks.
func startThread(chain *chainImpl) {
var err error
// Create topic if it does not exist (requires Kafka v0.10.1.0)
err = setupTopicForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.consenter.topicDetail(), chain.channel)
if err != nil {
// log for now and fallback to auto create topics setting for broker
logger.Infof("[channel: %s]: failed to create Kafka topic = %s", chain.channel.topic(), err)
}
......
chain.processMessagesToBlocks() // Keep up to date with the channel
}
// Raft.
//
// initializeEtcdraftConsenter registers the "etcdraft" consenter in consenters.
// It resolves the system channel name from bootstrapBlock, obtains the system
// channel ledger from lf, builds an inactiveChainReplicator driven by a ticker,
// installs that replicator as the channel lister of ri, starts it in a
// goroutine, and finally creates the etcdraft consenter.
// Failing to resolve the system channel name or its ledger panics through
// ri.logger.
func initializeEtcdraftConsenter(
consenters map[string]consensus.Consenter,
conf *localconfig.TopLevel,
lf blockledger.Factory,
clusterDialer *cluster.PredicateDialer,
bootstrapBlock *cb.Block,
ri *replicationInitiator,
srvConf comm.ServerConfig,
srv *comm.GRPCServer,
registrar *multichannel.Registrar,
metricsProvider metrics.Provider,
) {
// Fall back to the default refresh interval when none is configured.
replicationRefreshInterval := conf.General.Cluster.ReplicationBackgroundRefreshInterval
if replicationRefreshInterval == 0 {
replicationRefreshInterval = defaultReplicationBackgroundRefreshInterval
}
systemChannelName, err := utils.GetChainIDFromBlock(bootstrapBlock)
if err != nil {
ri.logger.Panicf("Failed extracting system channel name from bootstrap block: %v", err)
}
systemLedger, err := lf.GetOrCreate(systemChannelName)
if err != nil {
ri.logger.Panicf("Failed obtaining system channel (%s) ledger: %v", systemChannelName, err)
}
// getConfigBlock returns multichannel.ConfigBlock(systemLedger) each time it is called.
getConfigBlock := func() *cb.Block {
return multichannel.ConfigBlock(systemLedger)
}
// The ticker is driven by a duration series built from the initial and the configured refresh interval.
exponentialSleep := exponentialDurationSeries(replicationBackgroundInitialRefreshInterval, replicationRefreshInterval)
ticker := newTicker(exponentialSleep)
icr := &inactiveChainReplicator{
logger: logger,
scheduleChan: ticker.C,
quitChan: make(chan struct{}),
replicator: ri,
chains2CreationCallbacks: make(map[string]chainCreation),
retrieveLastSysChannelConfigBlock: getConfigBlock,
registerChain: ri.registerChain,
}
// Use the inactiveChainReplicator as a channel lister, since it has knowledge
// of all inactive chains.
// This is to prevent us pulling the entire system chain when attempting to enumerate
// the channels in the system.
ri.channelLister = icr
go icr.run()
raftConsenter := etcdraft.New(clusterDialer, conf, srvConf, srv, registrar, icr, metricsProvider)
consenters["etcdraft"] = raftConsenter
}
下面就是排序和切割:
// Ordered adds msg to the pending batch and returns every batch that had to be
// cut as a result, plus whether messages are still pending afterwards.
//
//   - A message larger than PreferredMaxBytes is isolated: the pending batch
//     (if any) is cut first, then the message is returned as its own batch.
//   - If adding the message would push the pending batch past
//     PreferredMaxBytes, the pending batch is cut before the message is queued.
//   - If the pending batch reaches MaxMessageCount after queueing, it is cut.
//
// It panics (through logger.Panicf) when the orderer config is unavailable.
func (r *receiver) Ordered(msg *cb.Envelope) (messageBatches [][]*cb.Envelope, pending bool) {
	if len(r.pendingBatch) == 0 {
		// We are beginning a new batch, mark the time
		r.PendingBatchStartTime = time.Now()
	}

	cfg, found := r.sharedConfigFetcher.OrdererConfig()
	if !found {
		logger.Panicf("Could not retrieve orderer config to query batch parameters, block cutting is not possible")
	}
	limits := cfg.BatchSize()
	size := messageSizeBytes(msg)

	// Oversized message: it travels alone.
	if size > limits.PreferredMaxBytes {
		logger.Debugf("The current message, with %v bytes, is larger than the preferred batch size of %v bytes and will be isolated.", size, limits.PreferredMaxBytes)

		// cut pending batch, if it has any messages
		if len(r.pendingBatch) > 0 {
			messageBatches = append(messageBatches, r.Cut())
		}

		// create new batch with single message
		messageBatches = append(messageBatches, []*cb.Envelope{msg})

		// Record that this batch took no time to fill
		r.Metrics.BlockFillDuration.With("channel", r.ChannelID).Observe(0)

		return messageBatches, pending
	}

	// The message fits on its own, but may not fit next to what is pending.
	if r.pendingBatchSizeBytes+size > limits.PreferredMaxBytes {
		logger.Debugf("The current message, with %v bytes, will overflow the pending batch of %v bytes.", size, r.pendingBatchSizeBytes)
		logger.Debugf("Pending batch would overflow if current message is added, cutting batch now.")
		full := r.Cut()
		r.PendingBatchStartTime = time.Now()
		messageBatches = append(messageBatches, full)
	}

	logger.Debugf("Enqueuing message into batch")
	r.pendingBatch = append(r.pendingBatch, msg)
	r.pendingBatchSizeBytes += size
	pending = true

	if uint32(len(r.pendingBatch)) >= limits.MaxMessageCount {
		logger.Debugf("Batch size met, cutting batch")
		messageBatches = append(messageBatches, r.Cut())
		pending = false
	}

	return messageBatches, pending
}
// Cut hands back the batch accumulated so far and resets the receiver so that
// a new batch can begin. Before resetting, it records how long the batch took
// to fill (time since PendingBatchStartTime, in seconds) in the
// BlockFillDuration metric for this channel.
func (r *receiver) Cut() []*cb.Envelope {
	fillDuration := r.Metrics.BlockFillDuration.With("channel", r.ChannelID)
	fillDuration.Observe(time.Since(r.PendingBatchStartTime).Seconds())

	// Reset the bookkeeping for the next batch.
	r.PendingBatchStartTime = time.Time{}
	cut := r.pendingBatch
	r.pendingBatch, r.pendingBatchSizeBytes = nil, 0
	return cut
}
代码真心的不少,需要认真的分析。
三、总结
Orderer节点要比Peer复杂一些,但基本上也是按套路在走。麻烦在于细节太多,很多细小的部分可能把握不太准确,还需要推敲。