muduo库总结(一)源码粗读
前言
我相信用linux进行后端开发的C++程序员应该对muduo库都比较熟悉,muduo库利用了epoll+线程池
对网络IO模块进行了封装,实现了高并发、高性能的网络库,通过简单的几个类能快速的建立起网络模块,非常的方便。今天对muduo库进行一个简单的源码分析。
简介
muduo 是一个基于 Reactor 模式的现代 C++ 网络库,它采用非阻塞 IO 模型,基于事件驱动和回调,原生支持多核多线程,适合编写 Linux 服务端多线程网络应用程序。
源码
我们从muduo库的一个使用案例中出发,一步一步来读取源码:案例:muduo_test.cpp
#include <muduo/base/Logging.h>
#include <muduo/base/Timestamp.h>
#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/TcpConnection.h>
using namespace muduo;
using namespace muduo::net;
void onConnection(const TcpConnectionPtr &conn)
{
LOG_INFO << "EchoServer - " << conn->peerAddress().toIpPort() << " -> "
<< conn->localAddress().toIpPort() << " is "
<< (conn->connected() ? "UP" : "DOWN");
}
void onMessage(const TcpConnectionPtr &conn,
Buffer *buf,
Timestamp time)
{
muduo::string msg(buf->retrieveAllAsString());
LOG_INFO << conn->name() << " echo " << msg.size() << " bytes, "
<< "data received at " << time.toString();
conn->send(msg);
}
int main(int argc, const char *argv[])
{
EventLoop loop;
//地址初始化
InetAddress addr("127.0.0.1", 8988);
TcpServer server(&loop, addr, "echo");
server.setConnectionCallback(&onConnection);
server.setMessageCallback(&onMessage);
server.start();
loop.loop();
return 0;
}
根据这个,我们看一下步骤:
实例化 EventLoop
EventLoop::EventLoop()
: looping_(false),
quit_(false),
eventHandling_(false),
callingPendingFunctors_(false),
iteration_(0),
threadId_(CurrentThread::tid()),
//确定使用那种IO线程
poller_(Poller::newDefaultPoller(this)),
timerQueue_(new TimerQueue(this)),
wakeupFd_(createEventfd()),
//通道初始化,//事件分发器
wakeupChannel_(new Channel(this, wakeupFd_)),
currentActiveChannel_(NULL)
{
LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_;
if (t_loopInThisThread)
{
LOG_FATAL << "Another EventLoop " << t_loopInThisThread
<< " exists in this thread " << threadId_;
}
else
{
t_loopInThisThread = this;
}
//设置readCallback_回调(read)
wakeupChannel_->setReadCallback(
std::bind(&EventLoop::handleRead, this));
// we are always reading the wakeupfd
wakeupChannel_->enableReading();
}
这里面很重要的是poller_(Poller::newDefaultPoller(this))
这句,这一句就决定了了,我们后面使用哪种IO,比如:在linux平台下,我们一般使用的是EPOLL;看一下源码:
static Poller* newDefaultPoller(EventLoop* loop);
会发现这个是一个静态函数,顺着找到这个静态函数的实现:
Poller* Poller::newDefaultPoller(EventLoop* loop)
{
if (::getenv("MUDUO_USE_POLL"))
{
return new PollPoller(loop);
}
else
{
return new EPollPoller(loop);
}
}
我们选择EPollPoller
来看:
EPollPoller::EPollPoller(EventLoop* loop)
: Poller(loop),
epollfd_(::epoll_create1(EPOLL_CLOEXEC)),
events_(kInitEventListSize)
{
if (epollfd_ < 0)
{
LOG_SYSFATAL << "EPollPoller::EPollPoller";
}
}
其实很惊喜的会发现了epoll_create1
函数,是不是有种似曾相识的感觉;没错,这个就是我们最常见的EPOLL的实现,epoll_create
-> epoll_ctl
-> epoll_wait
;一会发现,mudo也是这么做的;
注册----监控-----回调
在EventLoop实例化中是由一个比较重要的是回调函数的设置,没注意的话,后面可能找不到这个函数:
wakeupChannel_->setReadCallback(
std::bind(&EventLoop::handleRead, this));
对Channel
对象中readCallback_
进行赋值,至于这个的作用,后续代码中说明。
到这,基本流程的第一步就已经完成了,我们大概 总结一下EventLoop都做了些什么?
- 确定使用的IO是哪种,Epoll还是poll
- 创建事件描述符
- 事件分发器channel
- 设置事件分发器的read回调
InetAddress 地址初始化
这个就不多说了,主要就是一些地址、ip的操作,之前编写基础通信的时候也是经历过的。
TcpServer 网络库
TcpServer::TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const string& nameArg,
Option option)
: loop_(CHECK_NOTNULL(loop)),
ipPort_(listenAddr.toIpPort()),
name_(nameArg),
acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),
threadPool_(new EventLoopThreadPool(loop, name_)),
connectionCallback_(defaultConnectionCallback),
messageCallback_(defaultMessageCallback),
nextConnId_(1)
{
//设置connectionCallback_
acceptor_->setNewConnectionCallback(
std::bind(&TcpServer::newConnection, this, _1, _2));
}
在TcpServer
构造函数中,主要注意这些函数
acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),
看一下构造函数:
Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport)
: loop_(loop),
acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())),
acceptChannel_(loop, acceptSocket_.fd()),
listenning_(false),
idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC))
{
assert(idleFd_ >= 0);
acceptSocket_.setReuseAddr(true);
acceptSocket_.setReusePort(reuseport);
acceptSocket_.bindAddress(listenAddr);
//Channel设置回调,当sockfd可读时掉用设置的回调
acceptChannel_.setReadCallback(
std::bind(&Acceptor::handleRead, this));
}
这一段代码就是在网络编程中的设置地址、端口这些,主要看一下handleRead
函数:
void Acceptor::handleRead()
{
//判断是否在IO线程
loop_->assertInLoopThread();
//客户的地址
InetAddress peerAddr;
//FIXME loop until no more
//获得连接的描述符
int connfd = acceptSocket_.accept(&peerAddr);
if (connfd >= 0)
{
// string hostport = peerAddr.toIpPort();
// LOG_TRACE << "Accepts of " << hostport;
if (newConnectionCallback_)
{
//TcpServer注册的,创建新的con,并且加入TcpServer的ConnectionMap中。
newConnectionCallback_(connfd, peerAddr);
}
else
{
sockets::close(connfd);
}
}
else
{
LOG_SYSERR << "in Acceptor::handleRead";
// Read the section named "The special problem of
// accept()ing when you can't" in libev's doc.
// By Marc Lehmann, author of libev.
if (errno == EMFILE)
{
::close(idleFd_);
idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
::close(idleFd_);
idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
}
}
}
这里就是创建一个新的连接做的一系列操作,其中主要是这个:
newConnectionCallback_(connfd, peerAddr);
这个函数是在TcpServer
中进行回调操作的,看一下代码:
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
loop_->assertInLoopThread();
EventLoop* ioLoop = threadPool_->getNextLoop();
char buf[64];
snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
++nextConnId_;
string connName = name_ + buf;
LOG_INFO << "TcpServer::newConnection [" << name_
<< "] - new connection [" << connName
<< "] from " << peerAddr.toIpPort();
InetAddress localAddr(sockets::getLocalAddr(sockfd));
// FIXME poll with zero timeout to double confirm the new connection
// FIXME use make_shared if necessary
//后面循环回调函数handleXXXXX
TcpConnectionPtr conn(new TcpConnection(ioLoop,
connName,
sockfd,
localAddr,
peerAddr));
connections_[connName] = conn;////将新构建的con加入server的map中
conn->setConnectionCallback(connectionCallback_);//muduo默认的
conn->setMessageCallback(messageCallback_);//muduo默认的
conn->setWriteCompleteCallback(writeCompleteCallback_);
conn->setCloseCallback(
std::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe
ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));//在某个线程池的loop中加入这个con
}
看一下主要函数:
TcpConnectionPtr conn(new TcpConnection(ioLoop,
connName,
sockfd,
localAddr,
peerAddr));
这里面的设置函数会在loop中使用
TcpConnection::TcpConnection(EventLoop* loop,
const string& nameArg,
int sockfd,
const InetAddress& localAddr,
const InetAddress& peerAddr)
: loop_(CHECK_NOTNULL(loop)),
name_(nameArg),
state_(kConnecting),
reading_(true),
socket_(new Socket(sockfd)),
channel_(new Channel(loop, sockfd)),
localAddr_(localAddr),
peerAddr_(peerAddr),
highWaterMark_(64*1024*1024)
{
channel_->setReadCallback(
std::bind(&TcpConnection::handleRead, this, _1));
channel_->setWriteCallback(
std::bind(&TcpConnection::handleWrite, this));
channel_->setCloseCallback(
std::bind(&TcpConnection::handleClose, this));
channel_->setErrorCallback(
std::bind(&TcpConnection::handleError, this));
LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this
<< " fd=" << sockfd;
socket_->setKeepAlive(true);
}
这里面设置了大量回调函数,handleRead、handleWrite、handleClose、handleError,后面用到可以再这里面找到;
回到TcpServer
中,看一下刚才需要的connectionCallback_
就在这里:
acceptor_->setNewConnectionCallback(
std::bind(&TcpServer::newConnection, this, _1, _2));
loop 函数
这个函数就是整套代码的核心所在,其实里面最核心的就是epoll_wait
函数,话不多说,看代码
void EventLoop::loop()
{
assert(!looping_);
assertInLoopThread();
looping_ = true;
quit_ = false; // FIXME: what if someone calls quit() before loop() ?
LOG_TRACE << "EventLoop " << this << " start looping";
while (!quit_)
{
//将活动线程队列置空
activeChannels_.clear();
//获得活动文件描述符的数量,并且获得活动的channel队列
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
//增加Poll次数
++iteration_;
if (Logger::logLevel() <= Logger::TRACE)
{
printActiveChannels();
}
// TODO sort channel by priority
//事件处理状态
eventHandling_ = true;
for (Channel* channel : activeChannels_)
{
//获得当前活动的事件
currentActiveChannel_ = channel;
//处理事件,传递一个poll的阻塞时间
currentActiveChannel_->handleEvent(pollReturnTime_);
}
//将当前活动事件置为空
currentActiveChannel_ = NULL;
//退出事件处理状态
eventHandling_ = false;
//处理用户在其他线程注册给IO线程的事件
doPendingFunctors();
}
LOG_TRACE << "EventLoop " << this << " stop looping";
//退出LOOPING状态
looping_ = false;
}
调用的这个函数
Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
LOG_TRACE << "fd total count " << channels_.size();
int numEvents = ::epoll_wait(epollfd_,
&*events_.begin(),
static_cast<int>(events_.size()),
timeoutMs);
int savedErrno = errno;
Timestamp now(Timestamp::now());
if (numEvents > 0)
{
LOG_TRACE << numEvents << " events happened";
fillActiveChannels(numEvents, activeChannels);
if (implicit_cast<size_t>(numEvents) == events_.size())
{
events_.resize(events_.size()*2);
}
}
else if (numEvents == 0)
{
LOG_TRACE << "nothing happened";
}
else
{
// error happens, log uncommon ones
if (savedErrno != EINTR)
{
errno = savedErrno;
LOG_SYSERR << "EPollPoller::poll()";
}
}
return now;
}
总结
到此为止,我只是大概的粗读了一下muduo库的源码,还有很多模块的设计以及处理方式值得进一步研究,后面附一张网上的流程图: