Android6.0 消息机制原理解析
消息都是存放在一个消息队列中的,而消息循环线程就是围绕这个消息队列进入一个无限循环的,直到线程退出。如果队列中有消息,消息循环线程就会把它取出来,并分发给相应的handler进行处理;如果队列中没有消息,消息循环线程就会进入空闲等待状态,等待下一个消息的到来。在编写android应用程序时,当程序执行的任务比较繁重时,为了不阻塞ui主线程而导致anr的发生,我们通常的做法是创建一个子线程来完成特定的任务。在创建子线程时,有两种选择,一种是通过创建thread对象来创建一个无消息循环的子线程;还有一种就是创建一个带有消息循环的子线程。而创建带有消息循环的子线程有两种实现方法,一种是直接利用android给我们封装好的handlerthread类来直接生成一个带有消息循环的线程对象,另一种方法是在实现线程的run()方法内依次调用looper.prepare()和looper.loop()来启动一个消息循环。
一、消息机制使用
通常消息机制都是由一个消息线程和一个handler组成,下面我们看powermanagerservice中的一个消息handler:
mhandlerthread = new servicethread(tag, process.thread_priority_display, false /*allowio*/); mhandlerthread.start(); mhandler = new powermanagerhandler(mhandlerthread.getlooper());
这里的servicethread就是一个handlerthread,创建handler的时候,必须把handlerthread的looper传进去,否则就是默认当前线程的looper。
而每个handler,大致如下:
private final class powermanagerhandler extends handler { public powermanagerhandler(looper looper) { super(looper, null, true /*async*/); } @override public void handlemessage(message msg) { switch (msg.what) { case msg_user_activity_timeout: handleuseractivitytimeout(); break; case msg_sandman: handlesandman(); break; case msg_screen_brightness_boost_timeout: handlescreenbrightnessboosttimeout(); break; case msg_check_wake_lock_acquire_timeout: checkwakelockaquiretoolong(); message m = mhandler.obtainmessage(msg_check_wake_lock_acquire_timeout); m.setasynchronous(true); mhandler.sendmessagedelayed(m, wake_lock_acquire_too_long_timeout); break; } } }
二、消息机制原理
那我们先来看下handlerthread的主函数run函数:
public void run() { mtid = process.mytid(); looper.prepare(); synchronized (this) { mlooper = looper.mylooper();//赋值后notifyall,主要是getlooper函数返回的是mlooper notifyall(); } process.setthreadpriority(mpriority); onlooperprepared(); looper.loop(); mtid = -1; }
再来看看looper的prepare函数,最后新建了一个looper对象,并且放在线程的局部变量中。
public static void prepare() { prepare(true); } private static void prepare(boolean quitallowed) { if (sthreadlocal.get() != null) { throw new runtimeexception("only one looper may be created per thread"); } sthreadlocal.set(new looper(quitallowed)); }
looper的构造函数中创建了messagequeue
private looper(boolean quitallowed) { mqueue = new messagequeue(quitallowed); mthread = thread.currentthread(); }
我们再来看下messagequeue的构造函数,其中nativeinit是一个native方法,并且把返回值保存在mptr中,mptr显然是一个用long型变量保存的指针
messagequeue(boolean quitallowed) { mquitallowed = quitallowed; mptr = nativeinit(); }
native函数中主要创建了nativemessagequeue对象,并且把指针变量返回了。
static jlong android_os_messagequeue_nativeinit(jnienv* env, jclass clazz) { nativemessagequeue* nativemessagequeue = new nativemessagequeue(); if (!nativemessagequeue) { jnithrowruntimeexception(env, "unable to allocate native queue"); return 0; } nativemessagequeue->incstrong(env); return reinterpret_cast<jlong>(nativemessagequeue); }
nativemessagequeue构造函数就是获取mlooper,如果没有就是新建一个looper
nativemessagequeue::nativemessagequeue() : mpollenv(null), mpollobj(null), mexceptionobj(null) { mlooper = looper::getforthread(); if (mlooper == null) { mlooper = new looper(false); looper::setforthread(mlooper); } }
然后我们再看下looper的构造函数,其中显式调用了eventfd创建了一个fd,eventfd主要用于进程或者线程间的通信,我们可以看下这篇博客eventfd介绍
looper::looper(bool allownoncallbacks) : mallownoncallbacks(allownoncallbacks), msendingmessage(false), mpolling(false), mepollfd(-1), mepollrebuildrequired(false), mnextrequestseq(0), mresponseindex(0), mnextmessageuptime(llong_max) { mwakeeventfd = eventfd(0, efd_nonblock); log_always_fatal_if(mwakeeventfd < 0, "could not make wake event fd. errno=%d", errno); automutex _l(mlock); rebuildepolllocked(); }
2.1 c层创建epoll
我们再来看下rebuildepolllocked函数,创建了epoll,并且把mwakeeventfd加入epoll,而且把mrequests的fd也加入epoll
void looper::rebuildepolllocked() { // close old epoll instance if we have one. if (mepollfd >= 0) { #if debug_callbacks alogd("%p ~ rebuildepolllocked - rebuilding epoll set", this); #endif close(mepollfd); } // allocate the new epoll instance and register the wake pipe. mepollfd = epoll_create(epoll_size_hint); log_always_fatal_if(mepollfd < 0, "could not create epoll instance. errno=%d", errno); struct epoll_event eventitem; memset(& eventitem, 0, sizeof(epoll_event)); // zero out unused members of data field union eventitem.events = epollin; eventitem.data.fd = mwakeeventfd; int result = epoll_ctl(mepollfd, epoll_ctl_add, mwakeeventfd, & eventitem); log_always_fatal_if(result != 0, "could not add wake event fd to epoll instance. errno=%d", errno); for (size_t i = 0; i < mrequests.size(); i++) { const request& request = mrequests.valueat(i); struct epoll_event eventitem; request.initeventitem(&eventitem); int epollresult = epoll_ctl(mepollfd, epoll_ctl_add, request.fd, & eventitem); if (epollresult < 0) { aloge("error adding epoll events for fd %d while rebuilding epoll set, errno=%d", request.fd, errno); } } }
继续回到handlerthread的run函数,我们继续分析looper的loop函数
public void run() { mtid = process.mytid(); looper.prepare(); synchronized (this) { mlooper = looper.mylooper(); notifyall(); } process.setthreadpriority(mpriority); onlooperprepared(); looper.loop(); mtid = -1; }
我们看看looper的loop函数:
public static void loop() { final looper me = mylooper(); if (me == null) { throw new runtimeexception("no looper; looper.prepare() wasn't called on this thread."); } final messagequeue queue = me.mqueue;//得到looper的mqueue // make sure the identity of this thread is that of the local process, // and keep track of what that identity token actually is. binder.clearcallingidentity(); final long ident = binder.clearcallingidentity(); for (;;) { message msg = queue.next(); // might block这个函数会阻塞,阻塞主要是epoll_wait if (msg == null) { // no message indicates that the message queue is quitting. return; } // this must be in a local variable, in case a ui event sets the logger printer logging = me.mlogging;//自己打的打印 if (logging != null) { logging.println(">>>>> dispatching to " + msg.target + " " + msg.callback + ": " + msg.what); } msg.target.dispatchmessage(msg); if (logging != null) { logging.println("<<<<< finished to " + msg.target + " " + msg.callback); } // make sure that during the course of dispatching the // identity of the thread wasn't corrupted. final long newident = binder.clearcallingidentity(); if (ident != newident) { log.wtf(tag, "thread identity changed from 0x" + long.tohexstring(ident) + " to 0x" + long.tohexstring(newident) + " while dispatching to " + msg.target.getclass().getname() + " " + msg.callback + " what=" + msg.what); } msg.recycleunchecked(); } }
messagequeue类的next函数主要是调用了nativepollonce函数,后面就是从消息队列中取出一个message
message next() { // return here if the message loop has already quit and been disposed. // this can happen if the application tries to restart a looper after quit // which is not supported. final long ptr = mptr;//之前保留的指针 if (ptr == 0) { return null; } int pendingidlehandlercount = -1; // -1 only during first iteration int nextpolltimeoutmillis = 0; for (;;) { if (nextpolltimeoutmillis != 0) { binder.flushpendingcommands(); } nativepollonce(ptr, nextpolltimeoutmillis);
下面我们主要看下nativepollonce这个native函数,把之前的指针强制转换成nativemessagequeue,然后调用其pollonce函数
static void android_os_messagequeue_nativepollonce(jnienv* env, jobject obj, jlong ptr, jint timeoutmillis) { nativemessagequeue* nativemessagequeue = reinterpret_cast<nativemessagequeue*>(ptr); nativemessagequeue->pollonce(env, obj, timeoutmillis); }
2.2 c层epoll_wait阻塞
pollonce函数,这个函数前面的while循环一般不会执行,它只处理ident大于0的情况,而这种情况一般没有,所以我们可以直接看pollinner函数
int looper::pollonce(int timeoutmillis, int* outfd, int* outevents, void** outdata) { int result = 0; for (;;) { while (mresponseindex < mresponses.size()) { const response& response = mresponses.itemat(mresponseindex++); int ident = response.request.ident; if (ident >= 0) { int fd = response.request.fd; int events = response.events; void* data = response.request.data; #if debug_poll_and_wake alogd("%p ~ pollonce - returning signalled identifier %d: " "fd=%d, events=0x%x, data=%p", this, ident, fd, events, data); #endif if (outfd != null) *outfd = fd; if (outevents != null) *outevents = events; if (outdata != null) *outdata = data; return ident; } } if (result != 0) { #if debug_poll_and_wake alogd("%p ~ pollonce - returning result %d", this, result); #endif if (outfd != null) *outfd = 0; if (outevents != null) *outevents = 0; if (outdata != null) *outdata = null; return result; } result = pollinner(timeoutmillis); } }
pollinner函数主要就是调用epoll_wait阻塞,并且java层会计算每次阻塞的时间传到c层,等待有mwakeeventfd或者之前addfd的fd有事件过来,才会epoll_wait返回。
int looper::pollinner(int timeoutmillis) { #if debug_poll_and_wake alogd("%p ~ pollonce - waiting: timeoutmillis=%d", this, timeoutmillis); #endif // adjust the timeout based on when the next message is due. if (timeoutmillis != 0 && mnextmessageuptime != llong_max) { nsecs_t now = systemtime(system_time_monotonic); int messagetimeoutmillis = tomillisecondtimeoutdelay(now, mnextmessageuptime); if (messagetimeoutmillis >= 0 && (timeoutmillis < 0 || messagetimeoutmillis < timeoutmillis)) { timeoutmillis = messagetimeoutmillis; } #if debug_poll_and_wake alogd("%p ~ pollonce - next message in %" prid64 "ns, adjusted timeout: timeoutmillis=%d", this, mnextmessageuptime - now, timeoutmillis); #endif } // poll. int result = poll_wake; mresponses.clear();//清空mresponses mresponseindex = 0; // we are about to idle. mpolling = true; struct epoll_event eventitems[epoll_max_events]; int eventcount = epoll_wait(mepollfd, eventitems, epoll_max_events, timeoutmillis);//epoll_wait主要线程阻塞在这,这个阻塞的时间也是有java层传过来的 // no longer idling. mpolling = false; // acquire lock. mlock.lock(); // rebuild epoll set if needed. if (mepollrebuildrequired) { mepollrebuildrequired = false; rebuildepolllocked(); goto done; } // check for poll error. if (eventcount < 0) { if (errno == eintr) { goto done; } alogw("poll failed with an unexpected error, errno=%d", errno); result = poll_error; goto done; } // check for poll timeout. if (eventcount == 0) { #if debug_poll_and_wake alogd("%p ~ pollonce - timeout", this); #endif result = poll_timeout; goto done; } // handle all events. 
#if debug_poll_and_wake alogd("%p ~ pollonce - handling events from %d fds", this, eventcount); #endif for (int i = 0; i < eventcount; i++) { int fd = eventitems[i].data.fd; uint32_t epollevents = eventitems[i].events; if (fd == mwakeeventfd) {//通知唤醒线程的事件 if (epollevents & epollin) { awoken(); } else { alogw("ignoring unexpected epoll events 0x%x on wake event fd.", epollevents); } } else { ssize_t requestindex = mrequests.indexofkey(fd);//之前addfd的事件 if (requestindex >= 0) { int events = 0; if (epollevents & epollin) events |= event_input; if (epollevents & epollout) events |= event_output; if (epollevents & epollerr) events |= event_error; if (epollevents & epollhup) events |= event_hangup; pushresponse(events, mrequests.valueat(requestindex));//放在mresponses中 } else { alogw("ignoring unexpected epoll events 0x%x on fd %d that is " "no longer registered.", epollevents, fd); } } } done: ; // invoke pending message callbacks. mnextmessageuptime = llong_max; while (mmessageenvelopes.size() != 0) {// 这块主要是c层的消息,java层的消息是自己管理的 nsecs_t now = systemtime(system_time_monotonic); const messageenvelope& messageenvelope = mmessageenvelopes.itemat(0); if (messageenvelope.uptime <= now) { // remove the envelope from the list. // we keep a strong reference to the handler until the call to handlemessage // finishes. then we drop it so that the handler can be deleted *before* // we reacquire our lock. { // obtain handler sp<messagehandler> handler = messageenvelope.handler; message message = messageenvelope.message; mmessageenvelopes.removeat(0); msendingmessage = true; mlock.unlock(); #if debug_poll_and_wake || debug_callbacks alogd("%p ~ pollonce - sending message: handler=%p, what=%d", this, handler.get(), message.what); #endif handler->handlemessage(message); } // release handler mlock.lock(); msendingmessage = false; result = poll_callback; } else { // the last message left at the head of the queue determines the next wakeup time. 
mnextmessageuptime = messageenvelope.uptime; break; } } // release lock. mlock.unlock(); // invoke all response callbacks. for (size_t i = 0; i < mresponses.size(); i++) {//这是之前addfd的事件的处理,主要是遍历mresponses,然后调用其回调 response& response = mresponses.edititemat(i); if (response.request.ident == poll_callback) { int fd = response.request.fd; int events = response.events; void* data = response.request.data; #if debug_poll_and_wake || debug_callbacks alogd("%p ~ pollonce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p", this, response.request.callback.get(), fd, events, data); #endif // invoke the callback. note that the file descriptor may be closed by // the callback (and potentially even reused) before the function returns so // we need to be a little careful when removing the file descriptor afterwards. int callbackresult = response.request.callback->handleevent(fd, events, data); if (callbackresult == 0) { removefd(fd, response.request.seq); } // clear the callback reference in the response structure promptly because we // will not clear the response vector itself until the next poll. response.request.callback.clear(); result = poll_callback; } } return result; }
继续分析looper的loop函数,可以增加自己的打印来调试代码,之前调用message的target的dispatchmessage来分配消息
for (;;) { message msg = queue.next(); // might block if (msg == null) { // no message indicates that the message queue is quitting. return; } // this must be in a local variable, in case a ui event sets the logger printer logging = me.mlogging;//自己的打印 if (logging != null) { logging.println(">>>>> dispatching to " + msg.target + " " + msg.callback + ": " + msg.what); } msg.target.dispatchmessage(msg); if (logging != null) { logging.println("<<<<< finished to " + msg.target + " " + msg.callback); } // make sure that during the course of dispatching the // identity of the thread wasn't corrupted. final long newident = binder.clearcallingidentity(); if (ident != newident) { log.wtf(tag, "thread identity changed from 0x" + long.tohexstring(ident) + " to 0x" + long.tohexstring(newident) + " while dispatching to " + msg.target.getclass().getname() + " " + msg.callback + " what=" + msg.what); } msg.recycleunchecked(); } }
2.3 增加调试打印
我们先来看自己添加打印,可以通过looper的setmessagelogging函数来设置打印
public void setmessagelogging(@nullable printer printer) { mlogging = printer; } printer就是一个interface public interface printer { /** * write a line of text to the output. there is no need to terminate * the given string with a newline. */ void println(string x); }
2.4 java层消息分发处理
再来看消息的分发,先是调用handler的obtainmessage函数
message msg = mhandler.obtainmessage(msg_check_wake_lock_acquire_timeout); msg.setasynchronous(true); mhandler.sendmessagedelayed(msg, wake_lock_acquire_too_long_timeout);
先看obtainmessage调用了message的obtain函数
public final message obtainmessage(int what) { return message.obtain(this, what); }
message的obtain函数就是新建一个message,然后其target就是设置成其handler
public static message obtain(handler h, int what) { message m = obtain();//就是新建一个message m.target = h; m.what = what; return m; }
我们再联系之前分发消息
msg.target.dispatchmessage(msg);最后就是调用handler的dispatchmessage函数,在handler中会根据不同的情况对消息进行处理。
public void dispatchmessage(message msg) { if (msg.callback != null) { handlecallback(msg);//这种就是用post形式发送,带runnable的 } else { if (mcallback != null) {//这种是handler传参的时候就是传入了mcallback回调了 if (mcallback.handlemessage(msg)) { return; } } handlemessage(msg);//最后就是在自己实现的handlemessage处理 } }
2.5 java层消息发送
我们再看下java层的消息发送,主要也是调用handler的sendmessage post之类函数,最终都会调用下面这个函数
public boolean sendmessageattime(message msg, long uptimemillis) { messagequeue queue = mqueue; if (queue == null) { runtimeexception e = new runtimeexception( this + " sendmessageattime() called with no mqueue"); log.w("looper", e.getmessage(), e); return false; } return enqueuemessage(queue, msg, uptimemillis); }
我们再来看java层发送消息最终都会调用enqueuemessage函数
private boolean enqueuemessage(messagequeue queue, message msg, long uptimemillis) { msg.target = this; if (masynchronous) { msg.setasynchronous(true); } return queue.enqueuemessage(msg, uptimemillis); }
最终在enqueuemessage中,把消息加入消息队列,然后需要的话就调用c层的nativewake函数
boolean enqueuemessage(message msg, long when) { if (msg.target == null) { throw new illegalargumentexception("message must have a target."); } if (msg.isinuse()) { throw new illegalstateexception(msg + " this message is already in use."); } synchronized (this) { if (mquitting) { illegalstateexception e = new illegalstateexception( msg.target + " sending message to a handler on a dead thread"); log.w(tag, e.getmessage(), e); msg.recycle(); return false; } msg.markinuse(); msg.when = when; message p = mmessages; boolean needwake; if (p == null || when == 0 || when < p.when) { // new head, wake up the event queue if blocked. msg.next = p; mmessages = msg; needwake = mblocked; } else { // inserted within the middle of the queue. usually we don't have to wake // up the event queue unless there is a barrier at the head of the queue // and the message is the earliest asynchronous message in the queue. needwake = mblocked && p.target == null && msg.isasynchronous(); message prev; for (;;) { prev = p; p = p.next; if (p == null || when < p.when) { break; } if (needwake && p.isasynchronous()) { needwake = false; } } msg.next = p; // invariant: p == prev.next prev.next = msg; } // we can assume mptr != 0 because mquitting is false. if (needwake) { nativewake(mptr); } } return true; }
我们看下这个native方法,最后也是调用了looper的wake函数
static void android_os_messagequeue_nativewake(jnienv* env, jclass clazz, jlong ptr) { nativemessagequeue* nativemessagequeue = reinterpret_cast<nativemessagequeue*>(ptr); nativemessagequeue->wake(); } void nativemessagequeue::wake() { mlooper->wake(); }
looper类的wake函数只是往mwakeeventfd中写了一些内容,这个fd只是用于通知而已,类似pipe,最后会把epoll_wait唤醒,线程就不阻塞了,继续先处理c层消息,然后处理之前addfd的事件,最后处理java层的消息。
void looper::wake() { #if debug_poll_and_wake alogd("%p ~ wake", this); #endif uint64_t inc = 1; ssize_t nwrite = temp_failure_retry(write(mwakeeventfd, &inc, sizeof(uint64_t))); if (nwrite != sizeof(uint64_t)) { if (errno != eagain) { alogw("could not write wake signal, errno=%d", errno); } } }
2.6 c层发送消息
在c层也是可以发送消息的,主要是调用looper的sendmessageattime函数,参数中有一个handler作为回调,我们把消息放在mmessageenvelopes中。
void looper::sendmessageattime(nsecs_t uptime, const sp<messagehandler>& handler, const message& message) { #if debug_callbacks alogd("%p ~ sendmessageattime - uptime=%" prid64 ", handler=%p, what=%d", this, uptime, handler.get(), message.what); #endif size_t i = 0; { // acquire lock automutex _l(mlock); size_t messagecount = mmessageenvelopes.size(); while (i < messagecount && uptime >= mmessageenvelopes.itemat(i).uptime) { i += 1; } messageenvelope messageenvelope(uptime, handler, message); mmessageenvelopes.insertat(messageenvelope, i, 1); // optimization: if the looper is currently sending a message, then we can skip // the call to wake() because the next thing the looper will do after processing // messages is to decide when the next wakeup time should be. in fact, it does // not even matter whether this code is running on the looper thread. if (msendingmessage) { return; } } // release lock // wake the poll loop only when we enqueue a new message at the head. if (i == 0) { wake(); } }
当在pollonce中,在epoll_wait之后,会遍历mmessageenvelopes中的消息,然后调用其handler的handlemessage函数
while (mmessageenvelopes.size() != 0) { nsecs_t now = systemtime(system_time_monotonic); const messageenvelope& messageenvelope = mmessageenvelopes.itemat(0); if (messageenvelope.uptime <= now) { // remove the envelope from the list. // we keep a strong reference to the handler until the call to handlemessage // finishes. then we drop it so that the handler can be deleted *before* // we reacquire our lock. { // obtain handler sp<messagehandler> handler = messageenvelope.handler; message message = messageenvelope.message; mmessageenvelopes.removeat(0); msendingmessage = true; mlock.unlock(); #if debug_poll_and_wake || debug_callbacks alogd("%p ~ pollonce - sending message: handler=%p, what=%d", this, handler.get(), message.what); #endif handler->handlemessage(message); } // release handler mlock.lock(); msendingmessage = false; result = poll_callback; } else { // the last message left at the head of the queue determines the next wakeup time. mnextmessageuptime = messageenvelope.uptime; break; } }
有一个looper_test.cpp文件,里面介绍了很多looper的使用方法,我们来看下
sp<stubmessagehandler> handler = new stubmessagehandler(); mlooper->sendmessageattime(now + ms2ns(100), handler, message(msg_test1)); stubmessagehandler继承messagehandler就必须实现handlemessage方法 class stubmessagehandler : public messagehandler { public: vector<message> messages; virtual void handlemessage(const message& message) { messages.push(message); } };
我们再顺便看下message和messagehandler类
struct message { message() : what(0) { } message(int what) : what(what) { } /* the message type. (interpretation is left up to the handler) */ int what; }; /** * interface for a looper message handler. * * the looper holds a strong reference to the message handler whenever it has * a message to deliver to it. make sure to call looper::removemessages * to remove any pending messages destined for the handler so that the handler * can be destroyed. */ class messagehandler : public virtual refbase { protected: virtual ~messagehandler() { } public: /** * handles a message. */ virtual void handlemessage(const message& message) = 0; };
2.7 c层addfd
我们也可以在looper.cpp的addfd中增加fd放入线程epoll中,当fd有数据来我们也可以处理相应的数据,下面我们先来看下addfd函数,我们注意其中有一个callback回调
int looper::addfd(int fd, int ident, int events, looper_callbackfunc callback, void* data) { return addfd(fd, ident, events, callback ? new simpleloopercallback(callback) : null, data); } int looper::addfd(int fd, int ident, int events, const sp<loopercallback>& callback, void* data) { #if debug_callbacks alogd("%p ~ addfd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident, events, callback.get(), data); #endif if (!callback.get()) { if (! mallownoncallbacks) { aloge("invalid attempt to set null callback but not allowed for this looper."); return -1; } if (ident < 0) { aloge("invalid attempt to set null callback with ident < 0."); return -1; } } else { ident = poll_callback; } { // acquire lock automutex _l(mlock); request request; request.fd = fd; request.ident = ident; request.events = events; request.seq = mnextrequestseq++; request.callback = callback; request.data = data; if (mnextrequestseq == -1) mnextrequestseq = 0; // reserve sequence number -1 struct epoll_event eventitem; request.initeventitem(&eventitem); ssize_t requestindex = mrequests.indexofkey(fd); if (requestindex < 0) { int epollresult = epoll_ctl(mepollfd, epoll_ctl_add, fd, & eventitem);//加入epoll if (epollresult < 0) { aloge("error adding epoll events for fd %d, errno=%d", fd, errno); return -1; } mrequests.add(fd, request);//放入mrequests中 } else { int epollresult = epoll_ctl(mepollfd, epoll_ctl_mod, fd, & eventitem);//更新 if (epollresult < 0) { if (errno == enoent) { // tolerate enoent because it means that an older file descriptor was // closed before its callback was unregistered and meanwhile a new // file descriptor with the same number has been created and is now // being registered for the first time. this error may occur naturally // when a callback has the side-effect of closing the file descriptor // before returning and unregistering itself. callback sequence number // checks further ensure that the race is benign. 
// // unfortunately due to kernel limitations we need to rebuild the epoll // set from scratch because it may contain an old file handle that we are // now unable to remove since its file descriptor is no longer valid. // no such problem would have occurred if we were using the poll system // call instead, but that approach carries others disadvantages. #if debug_callbacks alogd("%p ~ addfd - epoll_ctl_mod failed due to file descriptor " "being recycled, falling back on epoll_ctl_add, errno=%d", this, errno); #endif epollresult = epoll_ctl(mepollfd, epoll_ctl_add, fd, & eventitem); if (epollresult < 0) { aloge("error modifying or adding epoll events for fd %d, errno=%d", fd, errno); return -1; } scheduleepollrebuildlocked(); } else { aloge("error modifying epoll events for fd %d, errno=%d", fd, errno); return -1; } } mrequests.replacevalueat(requestindex, request); } } // release lock return 1; }
在pollonce函数中,我们先寻找mrequests中匹配的fd,然后在pushresponse中新建一个response,然后把response和request匹配起来。
} else { ssize_t requestindex = mrequests.indexofkey(fd); if (requestindex >= 0) { int events = 0; if (epollevents & epollin) events |= event_input; if (epollevents & epollout) events |= event_output; if (epollevents & epollerr) events |= event_error; if (epollevents & epollhup) events |= event_hangup; pushresponse(events, mrequests.valueat(requestindex)); } else { alogw("ignoring unexpected epoll events 0x%x on fd %d that is " "no longer registered.", epollevents, fd); } }
下面我们就会遍历mresponses中的response,然后调用其request中的回调
for (size_t i = 0; i < mresponses.size(); i++) { response& response = mresponses.edititemat(i); if (response.request.ident == poll_callback) { int fd = response.request.fd; int events = response.events; void* data = response.request.data; #if debug_poll_and_wake || debug_callbacks alogd("%p ~ pollonce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p", this, response.request.callback.get(), fd, events, data); #endif // invoke the callback. note that the file descriptor may be closed by // the callback (and potentially even reused) before the function returns so // we need to be a little careful when removing the file descriptor afterwards. int callbackresult = response.request.callback->handleevent(fd, events, data); if (callbackresult == 0) { removefd(fd, response.request.seq); } // clear the callback reference in the response structure promptly because we // will not clear the response vector itself until the next poll. response.request.callback.clear(); result = poll_callback; } }
同样我们再来看看looper_test.cpp是如何使用的?
pipe pipe; stubcallbackhandler handler(true); handler.setcallback(mlooper, pipe.receivefd, looper::event_input);
我们看下handler的setcallback函数
class callbackhandler { public: void setcallback(const sp<looper>& looper, int fd, int events) { looper->addfd(fd, 0, events, statichandler, this);//就是调用了looper的addfd函数,并且回调 } protected: virtual ~callbackhandler() { } virtual int handler(int fd, int events) = 0; private: static int statichandler(int fd, int events, void* data) {//这个就是回调函数 return static_cast<callbackhandler*>(data)->handler(fd, events); } }; class stubcallbackhandler : public callbackhandler { public: int nextresult; int callbackcount; int fd; int events; stubcallbackhandler(int nextresult) : nextresult(nextresult), callbackcount(0), fd(-1), events(-1) { } protected: virtual int handler(int fd, int events) {//这个是通过回调函数再调到这里的 callbackcount += 1; this->fd = fd; this->events = events; return nextresult; } };
我们结合looper的addfd一起来看,当callback是有的,我们新建一个simpleloopercallback
int looper::addfd(int fd, int ident, int events, looper_callbackfunc callback, void* data) { return addfd(fd, ident, events, callback ? new simpleloopercallback(callback) : null, data); }
这里的looper_callbackfunc是一个typedef
typedef int (*looper_callbackfunc)(int fd, int events, void* data);
我们再来看simpleloopercallback
class simpleloopercallback : public loopercallback { protected: virtual ~simpleloopercallback(); public: simpleloopercallback(looper_callbackfunc callback); virtual int handleevent(int fd, int events, void* data); private: looper_callbackfunc mcallback; };simpleloopercallback::simpleloopercallback(looper_callbackfunc callback) : mcallback(callback) { } simpleloopercallback::~simpleloopercallback() { } int simpleloopercallback::handleevent(int fd, int events, void* data) { return mcallback(fd, events, data); }
最后我们是调用callback->handleevent(fd, events, data),而callback就是simpleloopercallback,这里的data,之前传进来的就是callbackhandler 的this指针
因此最后就是调用了statichandler,而data->handler,就是this->handler,最后是虚函数就调用到了stubcallbackhandler 的handler函数中了。
当然我们也可以不用这么复杂,直接使用第二个addfd函数,当然callback我们需要自己定义一个类来实现loopercallback类就行了,这样就简单多了。
int addfd(int fd, int ident, int events, const sp<loopercallback>& callback, void* data);
2.8 java层addfd
一直以为只能在c层的looper中才能addfd,原来在java层也通过jni做了这个功能。
我们可以在messagequeue中的addonfiledescriptoreventlistener来实现这个功能
public void addonfiledescriptoreventlistener(@nonnull filedescriptor fd, @onfiledescriptoreventlistener.events int events, @nonnull onfiledescriptoreventlistener listener) { if (fd == null) { throw new illegalargumentexception("fd must not be null"); } if (listener == null) { throw new illegalargumentexception("listener must not be null"); } synchronized (this) { updateonfiledescriptoreventlistenerlocked(fd, events, listener); } }
我们再来看看onfiledescriptoreventlistener 这个回调
public interface onfiledescriptoreventlistener { public static final int event_input = 1 << 0; public static final int event_output = 1 << 1; public static final int event_error = 1 << 2; /** @hide */ @retention(retentionpolicy.source) @intdef(flag=true, value={event_input, event_output, event_error}) public @interface events {} @events int onfiledescriptorevents(@nonnull filedescriptor fd, @events int events); }
接着调用了updateonfiledescriptoreventlistenerlocked函数
private void updateonfiledescriptoreventlistenerlocked(filedescriptor fd, int events, onfiledescriptoreventlistener listener) { final int fdnum = fd.getint$(); int index = -1; filedescriptorrecord record = null; if (mfiledescriptorrecords != null) { index = mfiledescriptorrecords.indexofkey(fdnum); if (index >= 0) { record = mfiledescriptorrecords.valueat(index); if (record != null && record.mevents == events) { return; } } } if (events != 0) { events |= onfiledescriptoreventlistener.event_error; if (record == null) { if (mfiledescriptorrecords == null) { mfiledescriptorrecords = new sparsearray<filedescriptorrecord>(); } record = new filedescriptorrecord(fd, events, listener);//fd保存在filedescriptorrecord对象 mfiledescriptorrecords.put(fdnum, record);//mfiledescriptorrecords然后保存在 } else { record.mlistener = listener; record.mevents = events; record.mseq += 1; } nativesetfiledescriptorevents(mptr, fdnum, events);//调用native函数 } else if (record != null) { record.mevents = 0; mfiledescriptorrecords.removeat(index); } }
native最后调用了nativemessagequeue的setfiledescriptorevents函数
static void android_os_messagequeue_nativesetfiledescriptorevents(jnienv* env, jclass clazz, jlong ptr, jint fd, jint events) { nativemessagequeue* nativemessagequeue = reinterpret_cast<nativemessagequeue*>(ptr); nativemessagequeue->setfiledescriptorevents(fd, events); }
setfiledescriptorevents函数,这个addfd就是调用的第二个addfd,因此我们可以肯定nativemessagequeue继承了loopercallback
void nativemessagequeue::setfiledescriptorevents(int fd, int events) { if (events) { int looperevents = 0; if (events & callback_event_input) { looperevents |= looper::event_input; } if (events & callback_event_output) { looperevents |= looper::event_output; } mlooper->addfd(fd, looper::poll_callback, looperevents, this, reinterpret_cast<void*>(events)); } else { mlooper->removefd(fd); } }
果然是,需要实现handleevent函数
class nativemessagequeue : public messagequeue, public loopercallback { public: nativemessagequeue(); virtual ~nativemessagequeue(); virtual void raiseexception(jnienv* env, const char* msg, jthrowable exceptionobj); void pollonce(jnienv* env, jobject obj, int timeoutmillis); void wake(); void setfiledescriptorevents(int fd, int events); virtual int handleevent(int fd, int events, void* data);
handleevent就是在looper中epoll_wait之后,当我们增加的fd有数据就会调用这个函数
int nativemessagequeue::handleevent(int fd, int looperevents, void* data) { int events = 0; if (looperevents & looper::event_input) { events |= callback_event_input; } if (looperevents & looper::event_output) { events |= callback_event_output; } if (looperevents & (looper::event_error | looper::event_hangup | looper::event_invalid)) { events |= callback_event_error; } int oldwatchedevents = reinterpret_cast<intptr_t>(data); int newwatchedevents = mpollenv->callintmethod(mpollobj, gmessagequeueclassinfo.dispatchevents, fd, events); //调用回调 if (!newwatchedevents) { return 0; // unregister the fd } if (newwatchedevents != oldwatchedevents) { setfiledescriptorevents(fd, newwatchedevents); } return 1; }
最后在java的messagequeue中的dispatchevents就是在jni层反调过来的,然后调用之前注册的回调函数
// called from native code. private int dispatchevents(int fd, int events) { // get the file descriptor record and any state that might change. final filedescriptorrecord record; final int oldwatchedevents; final onfiledescriptoreventlistener listener; final int seq; synchronized (this) { record = mfiledescriptorrecords.get(fd);//通过fd得到filedescriptorrecord if (record == null) { return 0; // spurious, no listener registered } oldwatchedevents = record.mevents; events &= oldwatchedevents; // filter events based on current watched set if (events == 0) { return oldwatchedevents; // spurious, watched events changed } listener = record.mlistener; seq = record.mseq; } // invoke the listener outside of the lock. int newwatchedevents = listener.onfiledescriptorevents(//listener回调 record.mdescriptor, events); if (newwatchedevents != 0) { newwatchedevents |= onfiledescriptoreventlistener.event_error; } // update the file descriptor record if the listener changed the set of // events to watch and the listener itself hasn't been updated since. if (newwatchedevents != oldwatchedevents) { synchronized (this) { int index = mfiledescriptorrecords.indexofkey(fd); if (index >= 0 && mfiledescriptorrecords.valueat(index) == record && record.mseq == seq) { record.mevents = newwatchedevents; if (newwatchedevents == 0) { mfiledescriptorrecords.removeat(index); } } } } // return the new set of events to watch for native code to take care of. return newwatchedevents; }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。