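Android 7.0 MessageQueue Explained
Message handling in Android relies heavily on Handler. Every Handler has a corresponding Looper, which keeps taking messages out of its MessageQueue and processing them.
I had always assumed MessageQueue was purely a Java-layer abstraction, but in fact a major part of it lives in the native layer.
I was not very familiar with what MessageQueue does in the native layer, so I am taking this opportunity to analyze it.
I. Creating the MessageQueue
When we want to use a Looper, we call Looper.prepare():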
```java
public static void prepare() {
    prepare(true);
}

private static void prepare(boolean quitAllowed) {
    if (sThreadLocal.get() != null) {
        throw new RuntimeException("Only one Looper may be created per thread");
    }
    // sThreadLocal is thread-local storage: each thread has at most one Looper
    sThreadLocal.set(new Looper(quitAllowed));
}

private Looper(boolean quitAllowed) {
    // Create the MessageQueue
    mQueue = new MessageQueue(quitAllowed);
    mThread = Thread.currentThread();
}
```
1 NativeMessageQueue
Let's look at the MessageQueue constructor:
```java
MessageQueue(boolean quitAllowed) {
    mQuitAllowed = quitAllowed;
    // mPtr is a long that holds the address of the native NativeMessageQueue
    mPtr = nativeInit();
}
```
The MessageQueue constructor calls a native function right away. Here is its implementation in android_os_MessageQueue.cpp:
```cpp
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
    // The native-layer counterpart of MessageQueue
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    // ............
    // Cast the pointer to a jlong so the Java layer can store it; later native calls
    // cast the long back into a pointer to operate on the queue
    return reinterpret_cast<jlong>(nativeMessageQueue);
}
```
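To make the handle idea concrete, here is a minimal C++ sketch of the pointer-to-integer round trip, assuming a 64-bit `jlong`-like type (`int64_t` here). `FakeNativeQueue`, `createQueueHandle`, `fromHandle` and `destroyQueueHandle` are hypothetical names, not AOSP code; the destroy function only stands in for the fact that the native object must be released explicitly when the Java queue goes away.

```cpp
#include <cstdint>

// Hypothetical stand-in for NativeMessageQueue; only used to show the handle round trip.
struct FakeNativeQueue {
    int pendingCount = 0;
};

// Like nativeInit: allocate a native object and hand its address to the
// managed side as a 64-bit integer.
int64_t createQueueHandle() {
    return reinterpret_cast<int64_t>(new FakeNativeQueue());
}

// Like the other native entry points: turn the integer back into a pointer.
FakeNativeQueue* fromHandle(int64_t handle) {
    return reinterpret_cast<FakeNativeQueue*>(handle);
}

// The managed side never frees the object itself, so an explicit release is needed.
void destroyQueueHandle(int64_t handle) {
    delete fromHandle(handle);
}
```

The Java side never dereferences the value; it only passes it back into native methods, which is why a plain `long` field is enough.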
Following into the NativeMessageQueue constructor:
```cpp
NativeMessageQueue::NativeMessageQueue() :
        mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
    // Create a native Looper; it is also unique per thread
    mLooper = Looper::getForThread();
    if (mLooper == NULL) {
        mLooper = new Looper(false);
        Looper::setForThread(mLooper);
    }
}
```
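Judging from the code, both the Java layer and the native layer have their own Looper object, and both operate on the MessageQueue. MessageQueue keeps separate storage in each layer: Java-layer messages live in the Java MessageQueue's mMessages list, and native-layer messages live in the native Looper's mMessageEnvelopes.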
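The "one Looper per thread" rule on both sides can be illustrated with a small C++ sketch. It assumes `thread_local` as the per-thread slot (the native Looper in AOSP uses pthread thread-specific storage instead), and `FakeLooper`, `obtainLooper` and `loopersArePerThread` are hypothetical names that only mirror the shape of the NativeMessageQueue constructor above.

```cpp
#include <memory>
#include <thread>

// Hypothetical stand-in for the native Looper; only its identity matters here.
struct FakeLooper {
    bool allowNonCallbacks;
    explicit FakeLooper(bool allow) : allowNonCallbacks(allow) {}
};

// One slot per thread, in the spirit of Looper::getForThread/setForThread.
thread_local std::shared_ptr<FakeLooper> tlsLooper;

std::shared_ptr<FakeLooper> getForThread() { return tlsLooper; }
void setForThread(std::shared_ptr<FakeLooper> looper) { tlsLooper = std::move(looper); }

// Same shape as the NativeMessageQueue constructor: reuse the thread's Looper or create one.
std::shared_ptr<FakeLooper> obtainLooper() {
    std::shared_ptr<FakeLooper> looper = getForThread();
    if (looper == nullptr) {
        looper = std::make_shared<FakeLooper>(false);
        setForThread(looper);
    }
    return looper;
}

// True if two calls on one thread share a Looper while another thread gets its own.
bool loopersArePerThread() {
    std::shared_ptr<FakeLooper> a = obtainLooper();
    std::shared_ptr<FakeLooper> b = obtainLooper();
    std::shared_ptr<FakeLooper> other;
    std::thread t([&other] { other = obtainLooper(); });
    t.join();
    return a == b && other != nullptr && a != other;
}
```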
2 The native Looper
Let's look at the native Looper's constructor:
```cpp
Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
        mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
    // Create an eventfd here (used for wake-ups)
    mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    // .......
    rebuildEpollLocked();
}
```
When the native Looper used by the MessageQueue is initialized, it also calls rebuildEpollLocked. Let's follow it:
```cpp
void Looper::rebuildEpollLocked() {
    // Close old epoll instance if we have one.
    if (mEpollFd >= 0) {
        close(mEpollFd);
    }

    // Allocate the new epoll instance and register the wake pipe.
    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
    // ............
    struct epoll_event eventItem;
    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mWakeEventFd;
    // Watch mWakeEventFd on mEpollFd for incoming data
    int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
    // ...........

    for (size_t i = 0; i < mRequests.size(); i++) {
        const Request& request = mRequests.valueAt(i);
        struct epoll_event eventItem;
        request.initEventItem(&eventItem);

        // Watch the fd of each Request for incoming data
        int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);
        // ............
    }
}
```
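From the native Looper we can see that the native layer relies on epoll to drive event handling. Let's keep this rough picture in mind for now; it is analyzed in detail later.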
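As a rough illustration of the mechanism, not AOSP code, here is a Linux-only C++ sketch that registers an eventfd with epoll and shows that a single 8-byte write makes `epoll_wait` return. The function name `wakeThroughEpoll` is hypothetical, and it uses `epoll_create1` rather than the older `epoll_create(EPOLL_SIZE_HINT)` seen above.

```cpp
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include <cstdint>

// Returns 1 when epoll reports exactly the eventfd after one wake, otherwise -1.
int wakeThroughEpoll() {
    int result = -1;
    int wakeFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    int epollFd = epoll_create1(EPOLL_CLOEXEC);
    if (wakeFd >= 0 && epollFd >= 0) {
        epoll_event item{};
        item.events = EPOLLIN;      // readable means "someone called wake()"
        item.data.fd = wakeFd;
        epoll_event ready[4];
        uint64_t inc = 1;
        if (epoll_ctl(epollFd, EPOLL_CTL_ADD, wakeFd, &item) == 0 &&
            epoll_wait(epollFd, ready, 4, 0) == 0 &&        // nothing written yet
            write(wakeFd, &inc, sizeof(inc)) == static_cast<ssize_t>(sizeof(inc))) {
            // After the write (the equivalent of Looper::wake()), epoll_wait returns at once.
            int n = epoll_wait(epollFd, ready, 4, 1000);
            if (n == 1 && ready[0].data.fd == wakeFd) result = n;
        }
    }
    if (epollFd >= 0) close(epollFd);
    if (wakeFd >= 0) close(wakeFd);
    return result;
}
```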
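II. Using the MessageQueue
1 Writing messages
In Android, messages can be written to the MessageQueue from the Java layer as well as from the native layer. Let's look at each flow in turn.
1.1 Writing messages from the Java layer
The Java layer writes messages into the MessageQueue through enqueueMessage: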
```java
boolean enqueueMessage(Message msg, long when) {
    if (msg.target == null) {
        throw new IllegalArgumentException("Message must have a target.");
    }
    if (msg.isInUse()) {
        throw new IllegalStateException(msg + " This message is already in use.");
    }

    synchronized (this) {
        if (mQuitting) {
            // .....
            return false;
        }

        msg.markInUse();
        msg.when = when;
        Message p = mMessages;
        boolean needWake;
        if (p == null || when == 0 || when < p.when) {
            // New head, wake up the event queue if blocked.
            msg.next = p;
            mMessages = msg;
            // Inserted at the head: if the MessageQueue was blocked, it must now be woken
            needWake = mBlocked;
        } else {
            // Inserted within the middle of the queue.  Usually we don't have to wake
            // up the event queue unless there is a barrier at the head of the queue
            // and the message is the earliest asynchronous message in the queue.
            needWake = mBlocked && p.target == null && msg.isAsynchronous();
            Message prev;
            for (;;) {
                prev = p;
                p = p.next;
                if (p == null || when < p.when) {
                    break;
                }
                // An earlier asynchronous message exists, so this one is not the first: no wake needed
                if (needWake && p.isAsynchronous()) {
                    needWake = false;
                }
            }
            msg.next = p; // invariant: p == prev.next
            prev.next = msg;
        }

        // We can assume mPtr != 0 because mQuitting is false.
        if (needWake) {
            nativeWake(mPtr);
        }
    }
    return true;
}
```
The code above is fairly simple: it inserts the new Message into the existing queue ordered by execution time, then calls nativeWake when a wake-up is needed.
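The insertion and wake decision can be modeled outside Android with a simplified C++ sketch. The `Msg` and `Queue` types are hypothetical; `hasTarget == false` stands in for a sync barrier (`target == null`), and `blocked` stands in for `mBlocked`. Pooling, locking and the `mQuitting` check are left out.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical, simplified model of the Java Message list.
struct Msg {
    int64_t when;
    bool hasTarget = true;   // false models a sync barrier (target == null)
    bool async = false;
    Msg* next = nullptr;
};

struct Queue {
    Msg* head = nullptr;
    bool blocked = false;    // stands in for mBlocked

    // Same decision structure as enqueueMessage; returns whether a wake is needed.
    bool enqueue(Msg* msg) {
        Msg* p = head;
        bool needWake;
        if (p == nullptr || msg->when == 0 || msg->when < p->when) {
            msg->next = p;   // new head
            head = msg;
            needWake = blocked;
        } else {
            // Wake only if a barrier sits at the head and this is the earliest async message.
            needWake = blocked && !p->hasTarget && msg->async;
            Msg* prev = nullptr;
            for (;;) {
                prev = p;
                p = p->next;
                if (p == nullptr || msg->when < p->when) break;
                if (needWake && p->async) needWake = false;
            }
            msg->next = p;
            prev->next = msg;
        }
        return needWake;
    }

    std::vector<int64_t> whens() const {
        std::vector<int64_t> out;
        for (Msg* m = head; m != nullptr; m = m->next) out.push_back(m->when);
        return out;
    }
};
```

Messages with equal `when` values end up in arrival order, because the scan only stops at a strictly later `when`.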
Let's follow nativeWake:
```cpp
void NativeMessageQueue::wake() {
    mLooper->wake();
}

void Looper::wake() {
    uint64_t inc = 1;
    // Simply write to mWakeEventFd
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
    // .............
}
```
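When discussing the native Looper's initialization, we noted that it uses epoll to drive events, and the epoll instance it creates watches mWakeEventFd.
In fact, when messages are taken out of the MessageQueue and none is available, the thread waits in epoll; so when the Java layer writes a message, the write to mWakeEventFd wakes up the waiting MessageQueue.
We will come back to this when we cover extracting messages from the MessageQueue.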
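One detail worth seeing in isolation is why repeated wake-ups are cheap: an eventfd keeps a 64-bit counter that each `write` adds to, and a single `read` returns the sum and resets it to zero. The sketch below assumes that `awoken()` performs one such read, as the comment in pollInner describes; `coalescedWakeups` is a hypothetical helper.

```cpp
#include <sys/eventfd.h>
#include <unistd.h>
#include <cerrno>
#include <cstdint>

// Writes `wakes` times, then drains once. Returns the drained counter value
// (or -1 on error) and reports whether a second read would block.
int64_t coalescedWakeups(int wakes, bool* secondReadWouldBlock) {
    int fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (fd < 0) return -1;
    uint64_t inc = 1;
    for (int i = 0; i < wakes; ++i) {
        // Each write adds 1 to the counter, like one call to Looper::wake()
        if (write(fd, &inc, sizeof(inc)) != static_cast<ssize_t>(sizeof(inc))) {
            close(fd);
            return -1;
        }
    }
    uint64_t counter = 0;
    ssize_t n = read(fd, &counter, sizeof(counter));   // drains the counter to 0
    uint64_t again = 0;
    ssize_t n2 = read(fd, &again, sizeof(again));      // nothing left: EAGAIN
    *secondReadWouldBlock = (n2 < 0 && errno == EAGAIN);
    close(fd);
    return n == static_cast<ssize_t>(sizeof(counter)) ? static_cast<int64_t>(counter) : -1;
}
```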
1.2 Writing messages from the native layer
The native layer writes messages through the native Looper's sendMessage function:
```cpp
void Looper::sendMessage(const sp<MessageHandler>& handler, const Message& message) {
    nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
    sendMessageAtTime(now, handler, message);
}

void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
        const Message& message) {
    size_t i = 0;
    {
        AutoMutex _l(mLock);

        // Likewise, insert in time order
        size_t messageCount = mMessageEnvelopes.size();
        while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
            i += 1;
        }

        // Wrap the Message in a MessageEnvelope
        MessageEnvelope messageEnvelope(uptime, handler, message);
        mMessageEnvelopes.insertAt(messageEnvelope, i, 1);

        // Optimization: If the Looper is currently sending a message, then we can skip
        // the call to wake() because the next thing the Looper will do after processing
        // messages is to decide when the next wakeup time should be.  In fact, it does
        // not even matter whether this code is running on the Looper thread.
        if (mSendingMessage) {
            return;
        }
    }

    // Wake the poll loop only when we enqueue a new message at the head.
    if (i == 0) {
        // Inserted at the head: again use wake() to wake up epoll
        wake();
    }
}
```
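The placement rule in sendMessageAtTime can be extracted into a small C++ sketch. The `Envelope` struct and `insertEnvelope` function are hypothetical simplifications of MessageEnvelope and the real method (no handler, no lock); the boolean result answers "should wake() be called?".

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical model of a native envelope: only the due time and a tag.
struct Envelope {
    int64_t uptime;
    int what;
};

// Skip every envelope whose uptime is <= the new one, so equal times stay FIFO.
// Returns true when the caller should wake the poll loop.
bool insertEnvelope(std::vector<Envelope>& envelopes, Envelope env, bool sendingMessage) {
    size_t i = 0;
    while (i < envelopes.size() && env.uptime >= envelopes[i].uptime) {
        i += 1;
    }
    envelopes.insert(envelopes.begin() + static_cast<std::ptrdiff_t>(i), env);
    if (sendingMessage) {
        return false;   // the looper recomputes its next wakeup after dispatching anyway
    }
    return i == 0;      // only a new head changes the next wakeup time
}
```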
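That is the main flow for adding messages to the MessageQueue. Next, let's look at how messages are taken out of it.
2 Extracting messages
When the Java Looper's loop() is called, it starts extracting messages through the MessageQueue: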
```java
public static void loop() {
    final Looper me = myLooper();
    // .......
    for (;;) {
        Message msg = queue.next(); // might block
        // .......
        try {
            // Call the Message's handler to process it
            msg.target.dispatchMessage(msg);
        } // ........
    }
}
```
Here is MessageQueue's next function:
```java
Message next() {
    // mPtr holds the pointer to the NativeMessageQueue
    final long ptr = mPtr;
    // .......
    int pendingIdleHandlerCount = -1; // -1 only during first iteration
    int nextPollTimeoutMillis = 0;
    for (;;) {
        if (nextPollTimeoutMillis != 0) {
            // Calls into native code, ultimately IPCThreadState::talkWithDriver, which writes
            // pending commands to the binder driver (or reads once). Flushing before a
            // potentially long block lets pending object references be released early.
            Binder.flushPendingCommands();
        }

        // Process native-layer data; this may block in epoll
        nativePollOnce(ptr, nextPollTimeoutMillis);

        synchronized (this) {
            final long now = SystemClock.uptimeMillis();
            Message prevMsg = null;
            Message msg = mMessages;
            // If the head is a sync barrier (target == null), skip ahead to the next
            // asynchronous message; synchronous messages behind the barrier are held back
            if (msg != null && msg.target == null) {
                // Stalled by a barrier.  Find the next asynchronous message in the queue.
                do {
                    prevMsg = msg;
                    msg = msg.next;
                } while (msg != null && !msg.isAsynchronous());
            }
            if (msg != null) {
                if (now < msg.when) {
                    // Next message is not ready.  Set a timeout to wake up when it is ready.
                    nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                } else {
                    // Got a message.
                    mBlocked = false;
                    // Unlink the message from the list
                    if (prevMsg != null) {
                        prevMsg.next = msg.next;
                    } else {
                        mMessages = msg.next;
                    }
                    msg.next = null;
                    if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                    msg.markInUse();
                    return msg;
                }
            } else {
                // No more messages.
                nextPollTimeoutMillis = -1;
            }

            // Process the quit message now that all pending messages have been handled.
            if (mQuitting) {
                dispose();
                return null;
            }

            // MessageQueue supports IdleHandler: when no message is ready, IdleHandlers get to do some work.
            // pendingIdleHandlerCount is the number of IdleHandlers to run; it starts at -1
            if (pendingIdleHandlerCount < 0
                    && (mMessages == null || now < mMessages.when)) {
                // mIdleHandlers is empty by default; addIdleHandler adds entries
                pendingIdleHandlerCount = mIdleHandlers.size();
            }
            if (pendingIdleHandlerCount <= 0) {
                // No idle handlers to run.  Loop and wait some more.
                mBlocked = true;
                continue;
            }

            // Copy the pending IdleHandlers into mPendingIdleHandlers
            if (mPendingIdleHandlers == null) {
                mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
            }
            // ArrayList.toArray(T[]) reuses the array and avoids an allocation on every pass;
            // next() is called very often, so every saving helps
            mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
        }

        for (int i = 0; i < pendingIdleHandlerCount; i++) {
            final IdleHandler idler = mPendingIdleHandlers[i];
            mPendingIdleHandlers[i] = null; // release the reference to the handler

            boolean keep = false;
            try {
                // Run the implementation's queueIdle(); its return value decides whether it stays registered
                keep = idler.queueIdle();
            } catch (Throwable t) {
                Log.wtf(TAG, "IdleHandler threw exception", t);
            }

            if (!keep) {
                synchronized (this) {
                    mIdleHandlers.remove(idler);
                }
            }
        }

        // Reset the count so the idle handlers do not run again in this next() call
        pendingIdleHandlerCount = 0;
        // An idle handler may have posted a message, so poll again without waiting
        nextPollTimeoutMillis = 0;
    }
}
```
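The overall extraction process is roughly as shown in the figure above.
As we can see, in the Java layer the Looper does more than take messages out of the MessageQueue: while the queue is idle, it also runs the callbacks defined by IdleHandlers.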
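The IdleHandler pass can be sketched in C++ as follows, assuming each handler carries a hypothetical `id` field so it can be removed from the registered list. `IdleHandler` and `runIdleHandlers` are illustrative names; like next(), the sketch works on a snapshot and removes a handler when it returns false or throws.

```cpp
#include <algorithm>
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical model of an IdleHandler.
struct IdleHandler {
    int id;                            // identity used for removal
    std::function<bool()> queueIdle;   // returns true to stay registered
};

// Runs every registered handler once and returns how many ran.
size_t runIdleHandlers(std::vector<IdleHandler>& registered) {
    std::vector<IdleHandler> pending = registered;   // snapshot, like mPendingIdleHandlers
    for (const IdleHandler& idler : pending) {
        bool keep = false;
        try {
            keep = idler.queueIdle();
        } catch (...) {
            // next() logs via Log.wtf; keep stays false, so the handler is removed
        }
        if (!keep) {
            registered.erase(std::remove_if(registered.begin(), registered.end(),
                                            [&idler](const IdleHandler& h) { return h.id == idler.id; }),
                             registered.end());
        }
    }
    return pending.size();
}
```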
2.1 nativePollOnce
The only remaining question is how nativePollOnce handles native-layer data. Here is the corresponding native function:
```cpp
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
    // As expected: the long ptr from the Java layer is cast back into the native pointer
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    mPollObj = pollObj;
    // Ends up in the native Looper's pollOnce
    mLooper->pollOnce(timeoutMillis);
    mPollObj = NULL;
    mPollEnv = NULL;

    if (mExceptionObj) {
        // .........
    }
}
```
Here is the native Looper's pollOnce:
```cpp
// timeoutMillis: how long to wait. -1 waits indefinitely until an event arrives; 0 does not wait
// outFd: NULL on this call path; otherwise receives the fd on which the event occurred
// outEvents: NULL on this call path; otherwise receives which events occurred on outFd (input, output, error, hang-up)
// outData: NULL on this call path; otherwise receives the context data, i.e. the data passed when the fd was registered
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        // Handle Responses; we ignore what a Response contains for now
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            int ident = response.request.ident;
            if (ident >= 0) {
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;
                if (outFd != NULL) *outFd = fd;
                if (outEvents != NULL) *outEvents = events;
                if (outData != NULL) *outData = data;
                return ident;
            }
        }

        // Act on the result of pollInner
        if (result != 0) {
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = 0;
            if (outData != NULL) *outData = NULL;
            return result;
        }

        // pollInner does the real work
        result = pollInner(timeoutMillis);
    }
}
```
Following into pollInner:
```cpp
int Looper::pollInner(int timeoutMillis) {
    // Adjust the timeout based on when the next message is due.
    // timeoutMillis is the wait time requested by the Java layer;
    // the native layer tracks when its next native message is due;
    // this simply picks the shorter of the two waits
    if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
        if (messageTimeoutMillis >= 0
                && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
            timeoutMillis = messageTimeoutMillis;
        }
    }

    int result = POLL_WAKE;
    // pollInner clears the Responses first
    mResponses.clear();
    mResponseIndex = 0;

    // We are about to idle.
    mPolling = true;

    // Use epoll to wait for events on the fds watched by mEpollFd
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

    // No longer idling.
    mPolling = false;

    // Acquire lock.
    mLock.lock();

    // Calling rebuildEpollLocked again lets the epoll fd watch the fds of newly added Requests
    if (mEpollRebuildRequired) {
        mEpollRebuildRequired = false;
        rebuildEpollLocked();
        goto Done;
    }

    // Check for poll error.
    if (eventCount < 0) {
        if (errno == EINTR) {
            goto Done;
        }
        // ......
        result = POLL_ERROR;
        goto Done;
    }

    // Check for poll timeout.
    if (eventCount == 0) {
        result = POLL_TIMEOUT;
        goto Done;
    }

    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeEventFd) {
            if (epollEvents & EPOLLIN) {
                // As analyzed above, writing to the Java or native queue writes to mWakeEventFd to wake epoll;
                // awoken() reads and clears the data on mWakeEventFd
                awoken();
            } else {
                // .........
            }
        } else {
            // epoll also watches the fds of the Requests
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                // Store a Response for this fd
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                // ..........
            }
        }
    }
Done: ;

    // Invoke pending message callbacks.
    mNextMessageUptime = LLONG_MAX;
    // Process native-layer Messages
    while (mMessageEnvelopes.size() != 0) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        if (messageEnvelope.uptime <= now) {
            // Remove the envelope from the list.
            // We keep a strong reference to the handler until the call to handleMessage
            // finishes.  Then we drop it so that the handler can be deleted *before*
            // we reacquire our lock.
            {
                sp<MessageHandler> handler = messageEnvelope.handler;
                Message message = messageEnvelope.message;
                mMessageEnvelopes.removeAt(0);
                mSendingMessage = true;
                mLock.unlock();

                // Handle the native Message
                handler->handleMessage(message);
            }

            mLock.lock();
            mSendingMessage = false;
            result = POLL_CALLBACK;
        } else {
            // The last message left at the head of the queue determines the next wakeup time.
            mNextMessageUptime = messageEnvelope.uptime;
            break;
        }
    }

    // Release lock.
    mLock.unlock();

    // Handle Responses that carry a callback
    for (size_t i = 0; i < mResponses.size(); i++) {
        Response& response = mResponses.editItemAt(i);
        if (response.request.ident == POLL_CALLBACK) {
            int fd = response.request.fd;
            int events = response.events;
            void* data = response.request.data;
            // Invoke the Response's callback
            int callbackResult = response.request.callback->handleEvent(fd, events, data);
            if (callbackResult == 0) {
                removeFd(fd, response.request.seq);
            }
            response.request.callback.clear();
            result = POLL_CALLBACK;
        }
    }
    return result;
}
```
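Frankly, the native code here is rather messy: this single function adjusts the timeout, waits in epoll, drains the wake fd, queues Responses, dispatches native Messages and runs fd callbacks.
As shown in the figure above, nativePollOnce uses epoll to wait for data to arrive, then processes native Messages and native Responses.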
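The timeout adjustment at the top of pollInner is small enough to isolate. In this C++ sketch, `effectiveTimeout` is a hypothetical function; it encodes "no pending native message" as a delay of -1 instead of comparing `mNextMessageUptime` against `LLONG_MAX`.

```cpp
// javaTimeoutMs follows the pollOnce convention: -1 waits forever, 0 does not wait.
// nextNativeMessageDelayMs is the delay until the head native message, or -1 if none.
// A non-zero Java timeout is shortened when a native message is due sooner.
int effectiveTimeout(int javaTimeoutMs, int nextNativeMessageDelayMs) {
    if (javaTimeoutMs != 0 && nextNativeMessageDelayMs >= 0 &&
        (javaTimeoutMs < 0 || nextNativeMessageDelayMs < javaTimeoutMs)) {
        return nextNativeMessageDelayMs;
    }
    return javaTimeoutMs;
}
```

A zero Java timeout is never extended, because the Java side asked for a non-blocking poll.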
Finally, let's see how a Request is added in the native layer.
3 Adding monitoring requests
The native layer adds a Request through the Looper's addFd interface:
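```cpp
// fd: the fd to monitor
// ident: identifier returned by pollOnce() when the fd fires without a callback; must be >= 0, or POLL_CALLBACK when a callback is supplied
// events: the events to monitor, one or more of EVENT_INPUT, EVENT_OUTPUT, EVENT_ERROR and EVENT_HANGUP
// callback: the function to call when an event occurs
// data: the argument passed to the callback
int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) {
    return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : NULL, data);
}
```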
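Combining this with the native polling logic above, we can see that the purpose of addFd is to have the native Looper watch the newly added fd for the specified events.
If a specified event occurs, a corresponding Response is built from the registered callback and its argument.
When the native Looper processes the Responses, it runs the corresponding callbacks.
Here is the actual code: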
```cpp
int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
    // ........
    {
        AutoMutex _l(mLock);

        // Build a Request from the arguments
        Request request;
        request.fd = fd;
        request.ident = ident;
        request.events = events;
        request.seq = mNextRequestSeq++;
        request.callback = callback;
        request.data = data;
        if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1

        struct epoll_event eventItem;
        request.initEventItem(&eventItem);

        // Check whether a Request already exists for this fd
        ssize_t requestIndex = mRequests.indexOfKey(fd);
        if (requestIndex < 0) {
            // Add a new fd to the set watched by mEpollFd
            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
            // .......
            mRequests.add(fd, request);
        } else {
            // Modify the events watched for the existing fd
            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);
            if (epollResult < 0) {
                if (errno == ENOENT) {
                    // Tolerate ENOENT because it means that an older file descriptor was
                    // closed before its callback was unregistered and meanwhile a new
                    // file descriptor with the same number has been created and is now
                    // being registered for the first time.
                    epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
                    // .......
                }
                // After re-adding on error, schedule an epoll rebuild so that
                // rebuildEpollLocked re-registers all monitored fds with the epoll fd
                scheduleEpollRebuildLocked();
            }
            mRequests.replaceValueAt(requestIndex, request);
        }
    }
    return 1;
}
```
How registered requests are handled was already covered in the discussion of pollInner above, so it is not repeated here.
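To tie addFd, pollInner and the Response list together, here is a self-contained, Linux-only C++ sketch. `MiniLooper`, `MiniRequest` and `MiniResponse` are hypothetical; the sketch always watches `EPOLLIN` and passes raw epoll event bits instead of translating them to `EVENT_INPUT` and friends. What it does show is the sequence: register with `EPOLL_CTL_ADD` or `EPOLL_CTL_MOD`, collect Responses after `epoll_wait`, then run callbacks and drop the fd when a callback returns 0.

```cpp
#include <sys/epoll.h>
#include <unistd.h>
#include <cstddef>
#include <map>
#include <vector>

// Callback shape modeled on Looper_callbackFunc: return 1 to stay registered, 0 to be removed.
using Callback = int (*)(int fd, int events, void* data);

struct MiniRequest { int fd; Callback callback; void* data; };
struct MiniResponse { int events; MiniRequest request; };   // raw epoll bits

class MiniLooper {
public:
    MiniLooper() : epollFd_(epoll_create1(EPOLL_CLOEXEC)) {}
    ~MiniLooper() { if (epollFd_ >= 0) close(epollFd_); }
    MiniLooper(const MiniLooper&) = delete;
    MiniLooper& operator=(const MiniLooper&) = delete;

    // EPOLL_CTL_ADD for a new fd, EPOLL_CTL_MOD when the fd is already registered.
    bool addFd(int fd, Callback callback, void* data) {
        epoll_event item{};
        item.events = EPOLLIN;
        item.data.fd = fd;
        int op = requests_.count(fd) ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
        if (epoll_ctl(epollFd_, op, fd, &item) < 0) return false;
        requests_[fd] = MiniRequest{fd, callback, data};
        return true;
    }

    void removeFd(int fd) {
        epoll_ctl(epollFd_, EPOLL_CTL_DEL, fd, nullptr);
        requests_.erase(fd);
    }

    size_t registeredCount() const { return requests_.size(); }

    // One poll: turn ready fds into Responses first, then run their callbacks.
    // Returns the number of callbacks that ran.
    int pollOnce(int timeoutMillis) {
        epoll_event ready[8];
        int n = epoll_wait(epollFd_, ready, 8, timeoutMillis);
        std::vector<MiniResponse> responses;
        for (int i = 0; i < n; ++i) {
            auto it = requests_.find(ready[i].data.fd);
            if (it != requests_.end()) {
                responses.push_back({static_cast<int>(ready[i].events), it->second});
            }
        }
        for (const MiniResponse& r : responses) {
            if (r.request.callback(r.request.fd, r.events, r.request.data) == 0) {
                removeFd(r.request.fd);
            }
        }
        return static_cast<int>(responses.size());
    }

private:
    int epollFd_;
    std::map<int, MiniRequest> requests_;   // keyed by fd, like mRequests
};
```

Collecting all Responses before running any callback mirrors pollInner, where callbacks run only after the lock has been released.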
III. Summary
1 Flow summary
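The MessageQueue flow involves both a Java part and a native part, and the figure shows that the native layer makes up a large share. Let's use the figure above to review the whole MessageQueue processing flow:
1. When the Java layer creates a Looper, it creates the Java MessageQueue; when the Java MessageQueue is initialized, it calls a native function to create the native message queue (NativeMessageQueue).
2. Once the native message queue is initialized, it creates the corresponding native Looper. When the native Looper is initialized, it creates epollFd and wakeEventFd. epollFd serves as the epoll instance, and initially it only watches wakeEventFd.
3. The red lines in the figure show the flow of logic when the Looper takes messages from the MessageQueue.
3.1. When the Java Looper starts looping, it first calls the native Looper's pollOnce through JNI.
3.2. Once the native Looper is running, it waits on epollFd. When the wait times out or an event arrives on a watched fd, the native Looper starts handling events.
3.3. In the native layer, the native Looper first processes the messages in the native message queue, then invokes the callbacks of the Responses.
3.4. Only after the native-layer events of this iteration are done does the Java MessageQueue's messages get processed. If no message is ready and the MessageQueue has IdleHandlers, the IdleHandlers' callbacks are invoked.
The blue parts of the figure are the corresponding function calls:
In the Java layer:
MessageQueue's addIdleHandler adds an IdleHandler to the MessageQueue;
MessageQueue's enqueueMessage adds a message to the MessageQueue; when necessary, it calls a native function that writes to the native wakeEventFd to wake epollFd.
In the native layer:
Looper::sendMessage adds a message to the native message queue; likewise, when necessary it writes to the native wakeEventFd to wake epollFd;
Looper::addFd registers a monitoring request with the native Looper. A request contains the fd to watch, the events to watch, the callback and so on, and its fd becomes one of the fds watched by epollFd. When the watched fd sees the requested event, epollFd is woken, and a corresponding Response is generated and added to the Response list to await processing. When the Response is processed, its callback is invoked.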
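2 Notes
MessageQueue keeps separate storage in the Java layer and the native layer, and messages can be added to each independently. In terms of processing order, native-layer Messages are handled first, then the Responses generated in the native layer, and finally the Java-layer Messages.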
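The ordering can be summarized with a toy C++ trace. `oneIteration` is hypothetical and only records names; it reflects that each Java next() call runs nativePollOnce first and returns at most one Java message, so due native messages and fd callbacks are handled before every Java message.

```cpp
#include <string>
#include <vector>

// Records what one Java next() call would process, in order.
std::vector<std::string> oneIteration(const std::vector<std::string>& nativeMessages,
                                      const std::vector<std::string>& responses,
                                      const std::vector<std::string>& javaMessages) {
    std::vector<std::string> trace;
    // Inside pollInner, after epoll_wait returns: native messages, then fd callbacks
    for (const std::string& m : nativeMessages) trace.push_back("native:" + m);
    for (const std::string& r : responses) trace.push_back("response:" + r);
    // Back in MessageQueue.next(): at most one Java message is returned per call
    if (!javaMessages.empty()) trace.push_back("java:" + javaMessages.front());
    return trace;
}
```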