欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

PhxPaxos源码分析——Paxos算法实现

程序员文章站 2024-03-19 22:10:40
...

可以进入我的博客查看原文。

这篇主要来分析Paxos算法实现的部分,我想这应该也是读者最感兴趣的。在看这篇文章之前,如果之前对Paxos算法没有了解的童鞋可以看下这篇文章:Paxos算法原理与推导,相信了解Paxos算法后再来通过源码看算法实现应该会很酸爽。

Paxos算法中最重要的两个角色是ProposerAcceptor。当然Leaner也很重要,特别是在PhxPaxos的实现中,Leaner具有重要的功能。但是因为《Paxos Made Simple》论文中主要还是Proposer和Acceptor,因此这篇文章还是以这两个角色为主,通过源码来回顾论文中Paxos算法的过程,同时也看看工程实现和论文的描述有什么区别。

这里先贴出Paxos算法的过程,方便大家对照接下来的工程实现。

  • Prepare阶段:

    (a) Proposer选择一个提案编号N,然后向半数以上的Acceptor发送编号为N的Prepare请求。

    (b) 如果一个Acceptor收到一个编号为N的Prepare请求,且N大于该Acceptor已经响应过的所有Prepare请求的编号,那么它就会将它已经接受过的编号最大的提案(如果有的话)作为响应反馈给Proposer,同时该Acceptor承诺不再接受任何编号小于N的提案。

  • Accept阶段:

    (a) 如果Proposer收到半数以上Acceptor对其发出的编号为N的Prepare请求的响应,那么它就会发送一个针对[N,V]提案的Accept请求给半数以上的Acceptor。注意:V就是收到的响应中编号最大的提案的value,如果响应中不包含任何提案,那么V就由Proposer自己决定。

    (b) 如果Acceptor收到一个针对编号为N的提案的Accept请求,只要该Acceptor没有对编号大于N的Prepare请求做出过响应,它就接受该提案。

Proposer

因为Proposer需要维护或者说记录一些状态信息,包括自己的提案编号ProposalID、提出的Value、其他Proposer提出的最大的提案编号HighestOtherProposalID、Acceptor已经接受过的编号最大的提案的值等,因此这里专门有一个ProposerState类来管理这些信息。同样Acceptor也有一个AcceptorState类来管理Acceptor相关的信息。

先来看下ProposerState的定义:

class ProposerState
{
public:
    ProposerState(const Config * poConfig);
    ~ProposerState();

    void Init();

    void SetStartProposalID(const uint64_t llProposalID);

    void NewPrepare();

    void AddPreAcceptValue(const BallotNumber & oOtherPreAcceptBallot, const std::string & sOtherPreAcceptValue);

    /////////////////////////

    const uint64_t GetProposalID();

    const std::string & GetValue();

    void SetValue(const std::string & sValue);

    void SetOtherProposalID(const uint64_t llOtherProposalID);

    void ResetHighestOtherPreAcceptBallot();

public:
    uint64_t m_llProposalID;
    uint64_t m_llHighestOtherProposalID;
    std::string m_sValue;

    BallotNumber m_oHighestOtherPreAcceptBallot;

    Config * m_poConfig;
};复制代码

基本都是对这些信息的set跟get,很容易理解。直接来看Proposer类的定义:

class Proposer : public Base
{
public:
    Proposer(
            const Config * poConfig, 
            const MsgTransport * poMsgTransport,
            const Instance * poInstance,
            const Learner * poLearner,
            const IOLoop * poIOLoop);
    ~Proposer();

    //设置起始的ProposalID
    void SetStartProposalID(const uint64_t llProposalID);

    //初始化新的一轮Paxos过程,每一轮叫做一个Paxos Instance,每一轮确定一个值
    virtual void InitForNewPaxosInstance();

    //Proposer发起提案的入口函数。参数sValue即Proposer自己想提出的value,当然最终提出的value不一定是这个,需要根据Acceptor再Prepare阶段的回复来确定
    int NewValue(const std::string & sValue);

    //判断Proposer是否处于Prepare阶段或Accept阶段
    bool IsWorking();

    /////////////////////////////

    //对应Paxos算法中的Prepare阶段
    void Prepare(const bool bNeedNewBallot = true);

    //Prepare阶段等待Acceptor的回复,统计投票并确定是否进入Accept阶段
    void OnPrepareReply(const PaxosMsg & oPaxosMsg);

    //Prepare阶段被拒绝
    void OnExpiredPrepareReply(const PaxosMsg & oPaxosMsg);

    //对应Paxos算法中的Accept阶段
    void Accept();

    //Accept阶段等待Acceptor的回复,统计投票并确定值(Value)是否被选定(Chosen)
    void OnAcceptReply(const PaxosMsg & oPaxosMsg);

    //Accept阶段被拒绝
    void OnExpiredAcceptReply(const PaxosMsg & oPaxosMsg);

    //Prepare阶段超时
    void OnPrepareTimeout();

    //Accept阶段超时
    void OnAcceptTimeout();

    //退出Prepare阶段
    void ExitPrepare();

    //退出Accept阶段
    void ExitAccept();

    //取消跳过Prepare阶段,也就是必须先Prepare阶段再Accept阶段
    void CancelSkipPrepare();

    /////////////////////////////

    void AddPrepareTimer(const int iTimeoutMs = 0);

    void AddAcceptTimer(const int iTimeoutMs = 0);

public:
    ProposerState m_oProposerState;
    MsgCounter m_oMsgCounter;
    Learner * m_poLearner;

    bool m_bIsPreparing;
    bool m_bIsAccepting;

    IOLoop * m_poIOLoop;

    uint32_t m_iPrepareTimerID;
    int m_iLastPrepareTimeoutMs;
    uint32_t m_iAcceptTimerID;
    int m_iLastAcceptTimeoutMs;
    uint64_t m_llTimeoutInstanceID;

    bool m_bCanSkipPrepare;

    bool m_bWasRejectBySomeone;

    TimeStat m_oTimeStat;
};复制代码

NewValue

下面就从NewValue方法入手:

int Proposer :: NewValue(const std::string & sValue)
{
    BP->GetProposerBP()->NewProposal(sValue);

    if (m_oProposerState.GetValue().size() == 0)
    {
        m_oProposerState.SetValue(sValue);
    }

    m_iLastPrepareTimeoutMs = START_PREPARE_TIMEOUTMS;
    m_iLastAcceptTimeoutMs = START_ACCEPT_TIMEOUTMS;

    //如果可以跳过Prepare阶段并且没有被Acceptor拒绝过,则直接进入Accept阶段
    if (m_bCanSkipPrepare && !m_bWasRejectBySomeone)
    {
        BP->GetProposerBP()->NewProposalSkipPrepare();

        PLGHead("skip prepare, directly start accept");
        Accept();
    }

    //否则先进入Prepare阶段
    else
    {
        //if not reject by someone, no need to increase ballot
        Prepare(m_bWasRejectBySomeone);
    }

    return 0;
}复制代码

这里可以直接进入Accept阶段的前提是该Proposer已经发起过Prepare请求且得到半数以上的同意(即通过了Prepare阶段),并且没有被任何Acceptor拒绝(说明没有Acceptor响应过比该Proposer的提案编号更高的提案)。那么,什么情况下可以跳过Prepare请求呢,这里应该对应的是选出一个master的情况?相当于raft里的leader?

Prepare

接下来直接看Prepare阶段:

void Proposer :: Prepare(const bool bNeedNewBallot)
{
    PLGHead("START Now.InstanceID %lu MyNodeID %lu State.ProposalID %lu State.ValueLen %zu",
            GetInstanceID(), m_poConfig->GetMyNodeID(), m_oProposerState.GetProposalID(),
            m_oProposerState.GetValue().size());

    BP->GetProposerBP()->Prepare();
    m_oTimeStat.Point();

    ExitAccept();

    //表明Proposer正处于Prepare阶段
    m_bIsPreparing = true;

    //不能跳过Prepare阶段
    m_bCanSkipPrepare = false;

    //目前还未被任意一个Acceptor拒绝
    m_bWasRejectBySomeone = false;

    m_oProposerState.ResetHighestOtherPreAcceptBallot();

    //如果需要产生新的投票,就调用NewPrepare产生新的ProposalID,新的ProposalID为当前已知的最大ProposalID+1
    if (bNeedNewBallot)
    {
        m_oProposerState.NewPrepare();
    }

    PaxosMsg oPaxosMsg;

    //设置Prepare消息的各个字段
    oPaxosMsg.set_msgtype(MsgType_PaxosPrepare);
    oPaxosMsg.set_instanceid(GetInstanceID());
    oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
    oPaxosMsg.set_proposalid(m_oProposerState.GetProposalID());

    //MsgCount是专门用来统计票数的,根据计算的结果确定是否通过Prepare阶段或者Accept阶段
    m_oMsgCounter.StartNewRound();

    //Prepare超时定时器
    AddPrepareTimer();

    PLGHead("END OK");

    //将Prepare消息发送到各个节点
    BroadcastMessage(oPaxosMsg);
}复制代码

Proposer在Prepare阶段主要做了这么几件事:

  1. 重置各个状态位,表明当前正处于Prepare阶段。
  2. 获取提案编号ProposalID。当bNeedNewBallot为true时需要将ProposalID+1。否则沿用之前的ProposalID。bNeedNewBallot是在NewValue中调用Prepare方法时传入的m_bWasRejectBySomeone参数。也就是如果之前没有被任何Acceptor拒绝(说明还没有明确出现更大的ProposalID),则不需要获取新的ProposalID。对应的场景是Prepare阶段超时了,在超时时间内没有收到过半Acceptor同意的消息,因此需要重新执行Prepare阶段,此时只需要沿用原来的ProposalID即可。
  3. 发送Prepare请求。该请求PaxosMsg是Protocol Buffer定义的一个message,包含MsgType、InstanceID、NodeID、ProposalID等字段。在BroadcastMessage(oPaxosMsg)中还会将oPaxosMsg序列化后才发送出去。

PaxosMsg的定义如下,Prepare和Accept阶段Proposer和Acceptor的所有消息都用PaxosMsg来表示:

message PaxosMsg
{
    required int32 MsgType = 1;
    optional uint64 InstanceID = 2;
    optional uint64 NodeID = 3;
    optional uint64 ProposalID = 4;
    optional uint64 ProposalNodeID = 5;
    optional bytes Value = 6;
    optional uint64 PreAcceptID = 7;
    optional uint64 PreAcceptNodeID = 8;
    optional uint64 RejectByPromiseID = 9;
    optional uint64 NowInstanceID = 10;
    optional uint64 MinChosenInstanceID = 11;
    optional uint32 LastChecksum = 12;
    optional uint32 Flag = 13;
    optional bytes SystemVariables = 14;
    optional bytes MasterVariables = 15;
};复制代码

OnPrepareReply

Proposer发出Prepare请求后就开始等待Acceptor的回复。当Proposer所在节点收到PaxosPrepareReply消息后,就会调用Proposer的OnPrepareReply(oPaxosMsg),其中oPaxosMsg是Acceptor回复的消息。

void Proposer :: OnPrepareReply(const PaxosMsg & oPaxosMsg)
{
    PLGHead("START Msg.ProposalID %lu State.ProposalID %lu Msg.from_nodeid %lu RejectByPromiseID %lu",
            oPaxosMsg.proposalid(), m_oProposerState.GetProposalID(), 
            oPaxosMsg.nodeid(), oPaxosMsg.rejectbypromiseid());

    BP->GetProposerBP()->OnPrepareReply();

    //如果Proposer不是在Prepare阶段,则忽略该消息
    if (!m_bIsPreparing)
    {
        BP->GetProposerBP()->OnPrepareReplyButNotPreparing();
        //PLGErr("Not preparing, skip this msg");
        return;
    }

    //如果ProposalID不同,也忽略
    if (oPaxosMsg.proposalid() != m_oProposerState.GetProposalID())
    {
        BP->GetProposerBP()->OnPrepareReplyNotSameProposalIDMsg();
        //PLGErr("ProposalID not same, skip this msg");
        return;
    }

    //加入一个收到的消息,用于MsgCounter统计
    m_oMsgCounter.AddReceive(oPaxosMsg.nodeid());

    //如果该消息不是拒绝,即Acceptor同意本次Prepare请求
    if (oPaxosMsg.rejectbypromiseid() == 0)
    {
        BallotNumber oBallot(oPaxosMsg.preacceptid(), oPaxosMsg.preacceptnodeid());
        PLGDebug("[Promise] PreAcceptedID %lu PreAcceptedNodeID %lu ValueSize %zu", 
                oPaxosMsg.preacceptid(), oPaxosMsg.preacceptnodeid(), oPaxosMsg.value().size());
        //加入MsgCounter用于统计投票
        m_oMsgCounter.AddPromiseOrAccept(oPaxosMsg.nodeid());
        //将Acceptor返回的它接受过的编号最大的提案记录下来(如果有的话),用于确定Accept阶段的Value
        m_oProposerState.AddPreAcceptValue(oBallot, oPaxosMsg.value());
    }

    //Acceptor拒绝了Prepare请求
    else
    {
        PLGDebug("[Reject] RejectByPromiseID %lu", oPaxosMsg.rejectbypromiseid());

        //同样也要记录到MsgCounter用于统计投票
        m_oMsgCounter.AddReject(oPaxosMsg.nodeid());

        //记录被Acceptor拒绝过,待会儿如果重新进入Prepare阶段的话就需要获取更大的ProposalID
        m_bWasRejectBySomeone = true;

        //记录下别的Proposer提出的更大的ProposalID。这样重新发起Prepare请求时才知道需要用多大的ProposalID
        m_oProposerState.SetOtherProposalID(oPaxosMsg.rejectbypromiseid());
    }


    //本次Prepare请求通过了。也就是得到了半数以上Acceptor的同意
    if (m_oMsgCounter.IsPassedOnThisRound())
    {
        int iUseTimeMs = m_oTimeStat.Point();
        BP->GetProposerBP()->PreparePass(iUseTimeMs);
        PLGImp("[Pass] start accept, usetime %dms", iUseTimeMs);
        m_bCanSkipPrepare = true;

        //进入Accept阶段
        Accept();
    }

    //本次Prepare请求没有通过
    else if (m_oMsgCounter.IsRejectedOnThisRound()
            || m_oMsgCounter.IsAllReceiveOnThisRound())
    {
        BP->GetProposerBP()->PrepareNotPass();
        PLGImp("[Not Pass] wait 30ms and restart prepare");

         //随机等待一段时间后重新发起Prepare请求
        AddPrepareTimer(OtherUtils::FastRand() % 30 + 10);
    }

    PLGHead("END");
}复制代码

该阶段Proposer主要做了以下事情:

  1. 判断消息是否有效。包括ProposalID是否相同,自身是否处于Prepare阶段等。因为网络是不可靠的,有些消息可能延迟很久,等收到的时候已经不需要了,所以需要做这些判断。

  2. 将收到的消息加入MsgCounter用于统计。

  3. 根据收到的消息更新自身状态。包括Acceptor承诺过的ProposalID,以及Acceptor接受过的编号最大的提案等。

  4. 根据MsgCounter统计的Acceptor投票结果决定是进入Acceptor阶段还是重新发起Prepare请求。这里如果判断需要重新发起Prepare请求的话,也不是立即进行,而是等待一段随机的时间,这样做的好处是减少不同Proposer之间的冲突,采取的策略跟raft中leader选举冲突时在一段随机的选举超时时间后重新发起选举的做法类似。

注:这里跟Paxos算法中提案编号对应的并不是ProposalID,而是BallotNumber。BallotNumber由ProposalID和NodeID组成。还实现了运算符重载。如果ProposalID大,则BallotNumber(即提案编号)大。在ProposalID相同的情况下,NodeID大的BallotNumber大。

Accept

接下来Proposer就进入Accept阶段:

void Proposer :: Accept()
{
    PLGHead("START ProposalID %lu ValueSize %zu ValueLen %zu", 
            m_oProposerState.GetProposalID(), m_oProposerState.GetValue().size(), m_oProposerState.GetValue().size());

    BP->GetProposerBP()->Accept();
    m_oTimeStat.Point();

    ExitPrepare();
    m_bIsAccepting = true;

    //设置Accept请求的消息内容
    PaxosMsg oPaxosMsg;
    oPaxosMsg.set_msgtype(MsgType_PaxosAccept);
    oPaxosMsg.set_instanceid(GetInstanceID());
    oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
    oPaxosMsg.set_proposalid(m_oProposerState.GetProposalID());
    oPaxosMsg.set_value(m_oProposerState.GetValue());
    oPaxosMsg.set_lastchecksum(GetLastChecksum());

    m_oMsgCounter.StartNewRound();

    AddAcceptTimer();

    PLGHead("END");

    //发给各个节点
    BroadcastMessage(oPaxosMsg, BroadcastMessage_Type_RunSelf_Final);
}复制代码

Accept请求中PaxosMsg里的Value是这样确定的:如果Prepare阶段有Acceptor的回复中带有提案值,则该Value为所有的Acceptor的回复中,编号最大的提案的值。否则就是Proposer在最初调用NewValue时传入的值。

OnAcceptReply

void Proposer :: OnAcceptReply(const PaxosMsg & oPaxosMsg)
{
    PLGHead("START Msg.ProposalID %lu State.ProposalID %lu Msg.from_nodeid %lu RejectByPromiseID %lu",
            oPaxosMsg.proposalid(), m_oProposerState.GetProposalID(), 
            oPaxosMsg.nodeid(), oPaxosMsg.rejectbypromiseid());

    BP->GetProposerBP()->OnAcceptReply();

    if (!m_bIsAccepting)
    {
        //PLGErr("Not proposing, skip this msg");
        BP->GetProposerBP()->OnAcceptReplyButNotAccepting();
        return;
    }

    if (oPaxosMsg.proposalid() != m_oProposerState.GetProposalID())
    {
        //PLGErr("ProposalID not same, skip this msg");
        BP->GetProposerBP()->OnAcceptReplyNotSameProposalIDMsg();
        return;
    }

    m_oMsgCounter.AddReceive(oPaxosMsg.nodeid());

    if (oPaxosMsg.rejectbypromiseid() == 0)
    {
        PLGDebug("[Accept]");
        m_oMsgCounter.AddPromiseOrAccept(oPaxosMsg.nodeid());
    }
    else
    {
        PLGDebug("[Reject]");
        m_oMsgCounter.AddReject(oPaxosMsg.nodeid());

        m_bWasRejectBySomeone = true;

        m_oProposerState.SetOtherProposalID(oPaxosMsg.rejectbypromiseid());
    }

    if (m_oMsgCounter.IsPassedOnThisRound())
    {
        int iUseTimeMs = m_oTimeStat.Point();
        BP->GetProposerBP()->AcceptPass(iUseTimeMs);
        PLGImp("[Pass] Start send learn, usetime %dms", iUseTimeMs);
        ExitAccept();

        //让Leaner学习被选定(Chosen)的值
        m_poLearner->ProposerSendSuccess(GetInstanceID(), m_oProposerState.GetProposalID());
    }
    else if (m_oMsgCounter.IsRejectedOnThisRound()
            || m_oMsgCounter.IsAllReceiveOnThisRound())
    {
        BP->GetProposerBP()->AcceptNotPass();
        PLGImp("[Not pass] wait 30ms and Restart prepare");
        AddAcceptTimer(OtherUtils::FastRand() % 30 + 10);
    }

    PLGHead("END");
}复制代码

这里跟OnPrepareReply的过程基本一致,因此就不加太多注释了。比较大的区别在于最后如果过半的Acceptor接受了该Accept请求,则说明该Value被选定(Chosen)了,就发送消息,让每个节点上的Learner学习该Value。因为Leaner不是本文的重点,这里就不详细介绍了。

Acceptor

Acceptor的逻辑比Proposer更简单。同样先看它的定义:

class Acceptor : public Base
{
public:
    Acceptor(
            const Config * poConfig, 
            const MsgTransport * poMsgTransport, 
            const Instance * poInstance,
            const LogStorage * poLogStorage);
    ~Acceptor();

    virtual void InitForNewPaxosInstance();

    int Init();

    AcceptorState * GetAcceptorState();

    //Prepare阶段回复Prepare请求
    int OnPrepare(const PaxosMsg & oPaxosMsg);

    //Accept阶段回复Accept请求
    void OnAccept(const PaxosMsg & oPaxosMsg);

//private:
    AcceptorState m_oAcceptorState;
};复制代码

OnPrepare

OnPrepare用于处理收到的Prepare请求,逻辑如下:

int Acceptor :: OnPrepare(const PaxosMsg & oPaxosMsg)
{
    PLGHead("START Msg.InstanceID %lu Msg.from_nodeid %lu Msg.ProposalID %lu",
            oPaxosMsg.instanceid(), oPaxosMsg.nodeid(), oPaxosMsg.proposalid());

    BP->GetAcceptorBP()->OnPrepare();

    PaxosMsg oReplyPaxosMsg;
    oReplyPaxosMsg.set_instanceid(GetInstanceID());
    oReplyPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
    oReplyPaxosMsg.set_proposalid(oPaxosMsg.proposalid());
    oReplyPaxosMsg.set_msgtype(MsgType_PaxosPrepareReply);

    //构造接收到的Prepare请求里的提案编号
    BallotNumber oBallot(oPaxosMsg.proposalid(), oPaxosMsg.nodeid());

    //提案编号大于承诺过的提案编号
    if (oBallot >= m_oAcceptorState.GetPromiseBallot())
    {
        PLGDebug("[Promise] State.PromiseID %lu State.PromiseNodeID %lu "
                "State.PreAcceptedID %lu State.PreAcceptedNodeID %lu",
                m_oAcceptorState.GetPromiseBallot().m_llProposalID, 
                m_oAcceptorState.GetPromiseBallot().m_llNodeID,
                m_oAcceptorState.GetAcceptedBallot().m_llProposalID,
                m_oAcceptorState.GetAcceptedBallot().m_llNodeID);

        //返回之前接受过的提案的编号
        oReplyPaxosMsg.set_preacceptid(m_oAcceptorState.GetAcceptedBallot().m_llProposalID);
        oReplyPaxosMsg.set_preacceptnodeid(m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
        //如果接受过的提案编号大于0(<=0说明没有接受过提案),则设置接受过的提案的Value
        if (m_oAcceptorState.GetAcceptedBallot().m_llProposalID > 0)
        {
            oReplyPaxosMsg.set_value(m_oAcceptorState.GetAcceptedValue());
        }

        //更新承诺的提案编号为新的提案编号(因为新的提案编号更大)
        m_oAcceptorState.SetPromiseBallot(oBallot);

        //信息持久化
        int ret = m_oAcceptorState.Persist(GetInstanceID(), GetLastChecksum());
        if (ret != 0)
        {
            BP->GetAcceptorBP()->OnPreparePersistFail();
            PLGErr("Persist fail, Now.InstanceID %lu ret %d",
                    GetInstanceID(), ret);

            return -1;
        }

        BP->GetAcceptorBP()->OnPreparePass();
    }

    //提案编号小于承诺过的提案编号,需要拒绝
    else
    {
        BP->GetAcceptorBP()->OnPrepareReject();

        PLGDebug("[Reject] State.PromiseID %lu State.PromiseNodeID %lu", 
                m_oAcceptorState.GetPromiseBallot().m_llProposalID, 
                m_oAcceptorState.GetPromiseBallot().m_llNodeID);

        //拒绝该Prepare请求,并返回承诺过的ProposalID      
        oReplyPaxosMsg.set_rejectbypromiseid(m_oAcceptorState.GetPromiseBallot().m_llProposalID);
    }

    nodeid_t iReplyNodeID = oPaxosMsg.nodeid();

    PLGHead("END Now.InstanceID %lu ReplyNodeID %lu",
            GetInstanceID(), oPaxosMsg.nodeid());;

    //向发出Prepare请求的Proposer回复消息
    SendMessage(iReplyNodeID, oReplyPaxosMsg);

    return 0;
}复制代码

OnAccept

再来看看OnAccept:

void Acceptor :: OnAccept(const PaxosMsg & oPaxosMsg)
{
    PLGHead("START Msg.InstanceID %lu Msg.from_nodeid %lu Msg.ProposalID %lu Msg.ValueLen %zu",
            oPaxosMsg.instanceid(), oPaxosMsg.nodeid(), oPaxosMsg.proposalid(), oPaxosMsg.value().size());

    BP->GetAcceptorBP()->OnAccept();

    PaxosMsg oReplyPaxosMsg;
    oReplyPaxosMsg.set_instanceid(GetInstanceID());
    oReplyPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
    oReplyPaxosMsg.set_proposalid(oPaxosMsg.proposalid());
    oReplyPaxosMsg.set_msgtype(MsgType_PaxosAcceptReply);

    BallotNumber oBallot(oPaxosMsg.proposalid(), oPaxosMsg.nodeid());

    //提案编号不小于承诺过的提案编号(注意:这里是“>=”,而再OnPrepare中是“>”,可以先思考下为什么),需要接受该提案
    if (oBallot >= m_oAcceptorState.GetPromiseBallot())
    {
        PLGDebug("[Promise] State.PromiseID %lu State.PromiseNodeID %lu "
                "State.PreAcceptedID %lu State.PreAcceptedNodeID %lu",
                m_oAcceptorState.GetPromiseBallot().m_llProposalID, 
                m_oAcceptorState.GetPromiseBallot().m_llNodeID,
                m_oAcceptorState.GetAcceptedBallot().m_llProposalID,
                m_oAcceptorState.GetAcceptedBallot().m_llNodeID);

        //更新承诺的提案编号;接受的提案编号、提案值
        m_oAcceptorState.SetPromiseBallot(oBallot);
        m_oAcceptorState.SetAcceptedBallot(oBallot);
        m_oAcceptorState.SetAcceptedValue(oPaxosMsg.value());

        //信息持久化
        int ret = m_oAcceptorState.Persist(GetInstanceID(), GetLastChecksum());
        if (ret != 0)
        {
            BP->GetAcceptorBP()->OnAcceptPersistFail();

            PLGErr("Persist fail, Now.InstanceID %lu ret %d",
                    GetInstanceID(), ret);

            return;
        }

        BP->GetAcceptorBP()->OnAcceptPass();
    }

    //需要拒绝该提案
    else
    {
        BP->GetAcceptorBP()->OnAcceptReject();

        PLGDebug("[Reject] State.PromiseID %lu State.PromiseNodeID %lu", 
                m_oAcceptorState.GetPromiseBallot().m_llProposalID, 
                m_oAcceptorState.GetPromiseBallot().m_llNodeID);

        //拒绝的消息中附上承诺过的ProposalID
        oReplyPaxosMsg.set_rejectbypromiseid(m_oAcceptorState.GetPromiseBallot().m_llProposalID);
    }

    nodeid_t iReplyNodeID = oPaxosMsg.nodeid();

    PLGHead("END Now.InstanceID %lu ReplyNodeID %lu",
            GetInstanceID(), oPaxosMsg.nodeid());

    //将响应发送给Proposer
    SendMessage(iReplyNodeID, oReplyPaxosMsg);
}复制代码

结语

通过阅读源码可以发现,整个PhxPaxos完全基于Lamport的《Paxos Made Simple》进行工程化,没有进行任何算法变种。这对于学习Paxos算法的人来说真的是一笔宝贵的财富,所以如果对Paxos算法感兴趣,应该深入地去阅读PhxPaxos的源码,相信看完后大家对Paxos会有更深的理解。同时我们也发现,在工程实现上还是有很多细节需要注意,这比单纯理解算法要难得多。

可以进入我的博客查看原文。

欢迎关注公众号: FullStackPlan 获取更多干货