
Android 7.0 MessageQueue Explained

程序员文章站 2024-03-05 10:40:12

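Android's message handling relies heavily on Handler. Every Handler has an associated Looper, which keeps taking messages out of its MessageQueue and dispatching them.

I had always assumed that MessageQueue was a Java-layer abstraction, but in fact the main part of MessageQueue lives in the native layer.
I am not very familiar with what MessageQueue does in the native layer, so I take this opportunity to analyze it.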

I. Creating the MessageQueue

When we need a Looper, we call Looper's prepare function:

public static void prepare() {
    prepare(true);
}

private static void prepare(boolean quitAllowed) {
    if (sThreadLocal.get() != null) {
        throw new RuntimeException("Only one Looper may be created per thread");
    }
    // sThreadLocal is thread-local storage; each thread has at most one Looper
    sThreadLocal.set(new Looper(quitAllowed));
}

private Looper(boolean quitAllowed) {
    // Create the MessageQueue
    mQueue = new MessageQueue(quitAllowed);
    mThread = Thread.currentThread();
}

1 NativeMessageQueue

Let us look at the MessageQueue constructor:

MessageQueue(boolean quitAllowed) {
    mQuitAllowed = quitAllowed;
    // mPtr is a long; it stores the address of the native-layer object
    mPtr = nativeInit();
}

The MessageQueue constructor calls straight into a native function. Its implementation is in android_os_MessageQueue.cpp:

static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
    // The native-layer counterpart of MessageQueue
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    ............
    // The pointer is converted to a jlong and kept in the Java layer; when Java calls back
    // into native code, the jlong is cast back to a pointer so the queue can be operated on
    return reinterpret_cast<jlong>(nativeMessageQueue);
}
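The pointer-as-long handle pattern used by nativeInit can be tried outside of JNI. The following is a minimal sketch, assuming `std::int64_t` as a stand-in for `jlong`; `FakeNativeQueue`, `createQueue`, `fromHandle`, `wakeQueue` and `destroyQueue` are hypothetical names for illustration, not AOSP functions.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for NativeMessageQueue; not the AOSP class.
struct FakeNativeQueue {
    int pendingWakeups = 0;
    void wake() { ++pendingWakeups; }
};

// JNI's jlong is a 64-bit signed integer; int64_t plays that role here.
using Handle = std::int64_t;

// "nativeInit": allocate the native object and hand its address out as an integer.
Handle createQueue() {
    FakeNativeQueue* queue = new FakeNativeQueue();
    return reinterpret_cast<Handle>(queue);
}

// Every later native call turns the integer back into a pointer first.
FakeNativeQueue* fromHandle(Handle handle) {
    return reinterpret_cast<FakeNativeQueue*>(handle);
}

// "nativeWake": operate on the object identified by the handle.
void wakeQueue(Handle handle) {
    fromHandle(handle)->wake();
}

// The owner of the handle is responsible for releasing the native object.
void destroyQueue(Handle handle) {
    delete fromHandle(handle);
}
```

The caller only ever stores the integer, which is why the Java field can be a plain `long`; the cost is that the native side must trust the value it is handed.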

Next, the NativeMessageQueue constructor:

NativeMessageQueue::NativeMessageQueue() :
        mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
    // Get the native-layer Looper of this thread, creating it if needed; it is also unique per thread
    mLooper = Looper::getForThread();
    if (mLooper == NULL) {
        mLooper = new Looper(false);
        Looper::setForThread(mLooper);
    }
}
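The get-or-create step in this constructor can be sketched in isolation. The example below is an illustration only: it uses C++11 `thread_local` as a stand-in for whatever thread-local storage sits behind `Looper::getForThread` and `Looper::setForThread`, and `MiniLooper` and `obtainLooper` are hypothetical names.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical stand-in for the native Looper; not the AOSP class.
struct MiniLooper {
    explicit MiniLooper(bool allowNonCallbacks) : allowNonCallbacks(allowNonCallbacks) {}
    bool allowNonCallbacks;
};

namespace {
// One slot per thread; empty until a looper is installed on that thread.
thread_local std::shared_ptr<MiniLooper> tLooper;
}  // namespace

std::shared_ptr<MiniLooper> getForThread() {
    return tLooper;
}

void setForThread(std::shared_ptr<MiniLooper> looper) {
    tLooper = std::move(looper);
}

// Mirrors the constructor logic: reuse the thread's looper, or create and install one.
std::shared_ptr<MiniLooper> obtainLooper() {
    std::shared_ptr<MiniLooper> looper = getForThread();
    if (looper == nullptr) {
        looper = std::make_shared<MiniLooper>(false);
        setForThread(looper);
    }
    return looper;
}
```

Calling `obtainLooper()` twice on the same thread yields the same object, which is the property the constructor relies on.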

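As the code shows, both the native layer and the Java layer have a Looper object, and both operate on a MessageQueue. MessageQueue has separate storage structures in the Java layer and the native layer, holding Java-layer and native-layer messages respectively.

2 The native-layer Looper

Here is the constructor of the native-layer Looper: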

Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
        mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
    // Create the eventfd that is later used to wake up the Looper
    mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    .......
    rebuildEpollLocked();
}

When the Looper inside the native MessageQueue is initialized, it also calls rebuildEpollLocked. Let us follow it:

void Looper::rebuildEpollLocked() {
    // Close old epoll instance if we have one.
    if (mEpollFd >= 0) {
        close(mEpollFd);
    }

    // Allocate the new epoll instance and register the wake pipe.
    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
    ............
    struct epoll_event eventItem;
    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mWakeEventFd;
    // Have mEpollFd watch mWakeEventFd for incoming data
    int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
    ...........
    for (size_t i = 0; i < mRequests.size(); i++) {
        const Request& request = mRequests.valueAt(i);
        struct epoll_event eventItem;
        request.initEventItem(&eventItem);
        // Also watch the fd of every registered Request
        int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);
        ............
    }
}

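From the native Looper we can see that the native layer relies on epoll to drive event handling. Keep this rough impression in mind for now; the details are analyzed later.

II. Using the MessageQueue

1 Writing messages
On Android, messages can be written to a MessageQueue from the Java layer as well as from the native layer. We look at each flow in turn.

1.1 Writing messages from the Java layer
Writing a message to the MessageQueue from the Java layer goes through enqueueMessage: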

boolean enqueueMessage(Message msg, long when) {
    if (msg.target == null) {
        throw new IllegalArgumentException("Message must have a target.");
    }
    if (msg.isInUse()) {
        throw new IllegalStateException(msg + " This message is already in use.");
    }

    synchronized (this) {
        if (mQuitting) {
            .....
            return false;
        }

        msg.markInUse();
        msg.when = when;
        Message p = mMessages;
        boolean needWake;
        if (p == null || when == 0 || when < p.when) {
            // New head, wake up the event queue if blocked.
            msg.next = p;
            mMessages = msg;
            // Inserted at the head: if the MessageQueue is currently blocked, it must be woken up
            needWake = mBlocked;
        } else {
            // Inserted within the middle of the queue.  Usually we don't have to wake
            // up the event queue unless there is a barrier at the head of the queue
            // and the message is the earliest asynchronous message in the queue.
            needWake = mBlocked && p.target == null && msg.isAsynchronous();
            Message prev;
            for (;;) {
                prev = p;
                p = p.next;
                if (p == null || when < p.when) {
                    break;
                }
                // An earlier asynchronous message is already queued, so msg is not the first one: no wake-up
                if (needWake && p.isAsynchronous()) {
                    needWake = false;
                }
            }
            msg.next = p; // invariant: p == prev.next
            prev.next = msg;
        }
        // We can assume mPtr != 0 because mQuitting is false.
        if (needWake) {
            nativeWake(mPtr);
        }
    }
    return true;
}

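The code above is fairly simple: it inserts the new Message into the existing queue according to its execution time (when), and then calls nativeWake if needed.

Let us follow nativeWake, which ends up in NativeMessageQueue::wake: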

void NativeMessageQueue::wake() {
    mLooper->wake();
}

void Looper::wake() {
    uint64_t inc = 1;
    // Simply write to mWakeEventFd
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
    .............
}

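When the native Looper was initialized, we noted that it uses epoll to drive events, and that the epoll instance it creates watches mWakeEventFd.
When messages are taken out of the MessageQueue and nothing has arrived yet, the thread waits in epoll; so when the Java layer writes a message, the write to mWakeEventFd wakes up the waiting MessageQueue.
We come back to this point later, when describing how messages are retrieved from the MessageQueue.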
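The wake-up mechanism described here (an eventfd registered with epoll, written by the producer and drained by the waiter) can be reproduced in a few lines on Linux. This is a minimal sketch, not the AOSP code: the class `Waker` and its methods are hypothetical names, and error handling is reduced to the bare minimum.

```cpp
#include <cassert>
#include <cerrno>
#include <cstdint>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/types.h>
#include <unistd.h>

// Hypothetical minimal waker: one eventfd watched by one epoll instance.
class Waker {
public:
    Waker() {
        // Error checks are omitted in this sketch.
        wakeFd_ = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
        epollFd_ = epoll_create1(EPOLL_CLOEXEC);
        epoll_event item{};
        item.events = EPOLLIN;
        item.data.fd = wakeFd_;
        epoll_ctl(epollFd_, EPOLL_CTL_ADD, wakeFd_, &item);
    }
    ~Waker() {
        close(epollFd_);
        close(wakeFd_);
    }
    Waker(const Waker&) = delete;
    Waker& operator=(const Waker&) = delete;

    // Producer side: add 1 to the eventfd counter, retrying if interrupted.
    bool wake() {
        std::uint64_t inc = 1;
        ssize_t written;
        do {
            written = write(wakeFd_, &inc, sizeof(inc));
        } while (written < 0 && errno == EINTR);
        return written == static_cast<ssize_t>(sizeof(inc));
    }

    // Consumer side: wait up to timeoutMillis (-1 waits forever, 0 returns at once).
    // Returns true if woken through the eventfd, false on timeout or error.
    bool waitForWake(int timeoutMillis) {
        epoll_event events[4];
        int count = epoll_wait(epollFd_, events, 4, timeoutMillis);
        for (int i = 0; i < count; i++) {
            if (events[i].data.fd == wakeFd_ && (events[i].events & EPOLLIN)) {
                // Reading resets the counter to zero, so the next wait blocks again.
                std::uint64_t counter = 0;
                ssize_t ignored = read(wakeFd_, &counter, sizeof(counter));
                (void)ignored;
                return true;
            }
        }
        return false;
    }

private:
    int wakeFd_ = -1;
    int epollFd_ = -1;
};
```

In a real loop, one thread would sit in `waitForWake(-1)` while another calls `wake()` after enqueueing work; several `wake()` calls before the next wait collapse into a single wake-up because the eventfd counter is drained in one read.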

1.2 Writing messages from the native layer
Writing a message from the native layer goes through the native Looper's sendMessage function:

void Looper::sendMessage(const sp<MessageHandler>& handler, const Message& message) {
    nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
    sendMessageAtTime(now, handler, message);
}

void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
        const Message& message) {
    size_t i = 0;
    {
        AutoMutex _l(mLock);

        // Here too, messages are inserted in time order
        size_t messageCount = mMessageEnvelopes.size();
        while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
            i += 1;
        }

        // Wrap the message in a MessageEnvelope object
        MessageEnvelope messageEnvelope(uptime, handler, message);
        mMessageEnvelopes.insertAt(messageEnvelope, i, 1);

        // Optimization: If the Looper is currently sending a message, then we can skip
        // the call to wake() because the next thing the Looper will do after processing
        // messages is to decide when the next wakeup time should be.  In fact, it does
        // not even matter whether this code is running on the Looper thread.
        if (mSendingMessage) {
            return;
        }
    }
    // Wake the poll loop only when we enqueue a new message at the head.
    if (i == 0) {
        // Inserted at the head of the queue: use wake() to wake up epoll, as in the Java case
        wake();
    }
}
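The ordering rule in sendMessageAtTime can be checked with a small stand-alone model. The sketch below assumes a `std::vector` in place of the AOSP container; `Envelope`, `insertByUptime` and `needsWake` are hypothetical names introduced for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified envelope: a due time plus a label.
struct Envelope {
    std::int64_t uptime;
    std::string what;
};

// Inserts the envelope so that the queue stays sorted by uptime.
// Envelopes with equal uptime keep their arrival order (the new one goes last).
// Returns the index at which the envelope was inserted.
std::size_t insertByUptime(std::vector<Envelope>& queue, Envelope envelope) {
    std::size_t i = 0;
    while (i < queue.size() && envelope.uptime >= queue[i].uptime) {
        i += 1;
    }
    queue.insert(queue.begin() + static_cast<std::ptrdiff_t>(i), std::move(envelope));
    return i;
}

// Only a new head changes the earliest due time, so only then is a wake-up needed.
bool needsWake(std::size_t insertedIndex) {
    return insertedIndex == 0;
}
```

Because the comparison is `>=`, a message with the same due time as an existing one lands behind it, which preserves first-in, first-out order among equal timestamps.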

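That covers the main flow of adding messages to the MessageQueue. Next, we look at how messages are taken out of it.

2 Retrieving messages
Once the Java-layer Looper object calls loop, it starts retrieving messages from the MessageQueue: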

public static void loop() {
    final Looper me = myLooper();
    .......
    for (;;) {
        Message msg = queue.next(); // might block
        .......
        try {
            // Dispatch the message to its target Handler
            msg.target.dispatchMessage(msg);
        }........
    }
}

Here is MessageQueue's next function:

Message next() {
    // mPtr holds the pointer to the NativeMessageQueue
    final long ptr = mPtr;
    .......
    int pendingIdleHandlerCount = -1; // -1 only during first iteration
    int nextPollTimeoutMillis = 0;

    for (;;) {
        if (nextPollTimeoutMillis != 0) {
            // Calls into native code and ends up in IPCThreadState's talkWithDriver,
            // which writes pending data to the Binder driver (or performs one read).
            // It is done here because the thread may be about to block for a long time;
            // flushing first ensures that pending object references are released.
            Binder.flushPendingCommands();
        }

        // Handle native-layer events; this is where the thread blocks in epoll
        nativePollOnce(ptr, nextPollTimeoutMillis);

        synchronized (this) {
            final long now = SystemClock.uptimeMillis();
            Message prevMsg = null;
            Message msg = mMessages;
            // If the head is a sync barrier (target == null), skip the synchronous messages
            // behind it and look for the next asynchronous message
            if (msg != null && msg.target == null) {
                // Stalled by a barrier.  Find the next asynchronous message in the queue.
                do {
                    prevMsg = msg;
                    msg = msg.next;
                } while (msg != null && !msg.isAsynchronous());
            }

            if (msg != null) {
                if (now < msg.when) {
                    // Next message is not ready.  Set a timeout to wake up when it is ready.
                    nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                } else {
                    // Got a message.
                    mBlocked = false;
                    // Unlink msg from the list
                    if (prevMsg != null) {
                        prevMsg.next = msg.next;
                    } else {
                        mMessages = msg.next;
                    }
                    msg.next = null;
                    if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                    msg.markInUse();
                    return msg;
                }
            } else {
                // No more messages.
                nextPollTimeoutMillis = -1;
            }

            // Process the quit message now that all pending messages have been handled.
            if (mQuitting) {
                dispose();
                return null;
            }

            // MessageQueue provides the IdleHandler interface: when the queue has nothing
            // to process, the registered IdleHandlers are called to do some work

            // pendingIdleHandlerCount is the number of IdleHandlers to run; it starts at -1
            if (pendingIdleHandlerCount < 0
                    && (mMessages == null || now < mMessages.when)) {
                // mIdleHandlers is empty by default; entries are only added through addIdleHandler
                pendingIdleHandlerCount = mIdleHandlers.size();
            }

            if (pendingIdleHandlerCount <= 0) {
                // No idle handlers to run.  Loop and wait some more.
                mBlocked = true;
                continue;
            }

            // Copy the IdleHandlers to run into mPendingIdleHandlers
            if (mPendingIdleHandlers == null) {
                mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
            }
            // ArrayList.toArray(T[]) reuses the array and avoids an allocation on each call;
            // for a method called as often as MessageQueue.next, every saving counts
            mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
        }

        for (int i = 0; i < pendingIdleHandlerCount; i++) {
            final IdleHandler idler = mPendingIdleHandlers[i];
            mPendingIdleHandlers[i] = null; // release the reference to the handler

            boolean keep = false;
            try {
                // Run the implementation's queueIdle(); its return value decides whether the handler is kept
                keep = idler.queueIdle();
            } catch (Throwable t) {
                Log.wtf(TAG, "IdleHandler threw exception", t);
            }

            if (!keep) {
                synchronized (this) {
                    mIdleHandlers.remove(idler);
                }
            }
        }
        pendingIdleHandlerCount = 0;
        nextPollTimeoutMillis = 0;
    }
}

[Figure: flow of retrieving a message from the MessageQueue in the Java layer]

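The overall retrieval process is roughly as shown in the figure above.
In the Java layer, besides taking messages out of the MessageQueue, the Looper also runs the functions defined by IdleHandlers while the queue is idle.

2.1 nativePollOnce
The only open question now is how nativePollOnce handles native-layer data. Here is the corresponding native function: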

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
    // As expected, when Java calls into the native MessageQueue, the long ptr is cast back to a pointer
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    mPollObj = pollObj;
    // In the end this enters the native Looper's pollOnce
    mLooper->pollOnce(timeoutMillis);
    mPollObj = NULL;
    mPollEnv = NULL;

    if (mExceptionObj) {
        .........
    }
}

Here is the native Looper's pollOnce function:

// timeoutMillis is the wait timeout: -1 means wait indefinitely until an event arrives; 0 means do not wait
// outFd is NULL in this call; it receives the file descriptor on which the event occurred
// outEvents is NULL in this call; it receives the events that occurred on outFd (readable, writable, error, hang-up)
// outData is NULL in this call; it receives the context data, i.e. the data argument passed in when the fd was registered
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        // Handle pending responses; we do not look into what a Response contains for now
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            int ident = response.request.ident;
            if (ident >= 0) {
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;

                if (outFd != NULL) *outFd = fd;
                if (outEvents != NULL) *outEvents = events;
                if (outData != NULL) *outData = data;
                return ident;
            }
        }

        // Act on the result of the previous pollInner call
        if (result != 0) {
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = 0;
            if (outData != NULL) *outData = NULL;
            return result;
        }

        // pollInner does the real work
        result = pollInner(timeoutMillis);
    }
}

Now the pollInner function:

int Looper::pollInner(int timeoutMillis) {
    // Adjust the timeout based on when the next message is due.
    // timeoutMillis is the wait time requested by the Java layer;
    // the native layer tracks when its own next message is due (mNextMessageUptime);
    // the smaller of the two wait times is used
    if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
        if (messageTimeoutMillis >= 0
                && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
            timeoutMillis = messageTimeoutMillis;
        }
    }

    int result = POLL_WAKE;
    // pollInner starts by clearing the responses
    mResponses.clear();
    mResponseIndex = 0;

    // We are about to idle.
    mPolling = true;

    // Use epoll to wait for events on the descriptors watched by mEpollFd
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

    // No longer idling.
    mPolling = false;

    // Acquire lock.
    mLock.lock();

    // Calling rebuildEpollLocked again lets the epoll instance watch the fds of newly added Requests
    if (mEpollRebuildRequired) {
        mEpollRebuildRequired = false;
        rebuildEpollLocked();
        goto Done;
    }

    // Check for poll error.
    if (eventCount < 0) {
        if (errno == EINTR) {
            goto Done;
        }
        ......
        result = POLL_ERROR;
        goto Done;
    }

    // Check for poll timeout.
    if (eventCount == 0) {
        result = POLL_TIMEOUT;
        goto Done;
    }

    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeEventFd) {
            if (epollEvents & EPOLLIN) {
                // As analyzed earlier, when the Java or native layer enqueues a message,
                // mWakeEventFd is written to in order to wake up epoll;
                // awoken() reads and clears the data on mWakeEventFd
                awoken();
            } else {
                .........
            }
        } else {
            // epoll also watches the fds of registered Requests
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                // Store a Response for this fd
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                ..........
            }
        }
    }

Done: ;

    // Invoke pending message callbacks.
    mNextMessageUptime = LLONG_MAX;
    // Process the native-layer messages
    while (mMessageEnvelopes.size() != 0) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        if (messageEnvelope.uptime <= now) {
            // Remove the envelope from the list.
            // We keep a strong reference to the handler until the call to handleMessage
            // finishes.  Then we drop it so that the handler can be deleted *before*
            // we reacquire our lock.
            {
                sp<MessageHandler> handler = messageEnvelope.handler;
                Message message = messageEnvelope.message;
                mMessageEnvelopes.removeAt(0);
                mSendingMessage = true;
                mLock.unlock();

                // Handle the native message
                handler->handleMessage(message);
            }
            mLock.lock();
            mSendingMessage = false;
            result = POLL_CALLBACK;
        } else {
            // The last message left at the head of the queue determines the next wakeup time.
            mNextMessageUptime = messageEnvelope.uptime;
            break;
        }
    }

    // Release lock.
    mLock.unlock();

    // Process the responses that carry a callback
    for (size_t i = 0; i < mResponses.size(); i++) {
        Response& response = mResponses.editItemAt(i);
        if (response.request.ident == POLL_CALLBACK) {
            int fd = response.request.fd;
            int events = response.events;
            void* data = response.request.data;

            // Invoke the response's callback; a return value of 0 unregisters the fd
            int callbackResult = response.request.callback->handleEvent(fd, events, data);
            if (callbackResult == 0) {
                removeFd(fd, response.request.seq);
            }

            response.request.callback.clear();
            result = POLL_CALLBACK;
        }
    }
    return result;
}

[Figure: processing flow inside nativePollOnce]

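Frankly, the native code is hard to follow, and this one function does a lot.
As the figure above shows, nativePollOnce uses epoll to wait for incoming data, then processes the native messages and the native responses.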
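The timeout adjustment at the top of pollInner is easy to get wrong, so here it is isolated as a pure function. This is a sketch under stated assumptions: `toMillisDelay` is a hypothetical helper modelled on the role of `toMillisecondTimeoutDelay` (nanoseconds to milliseconds, rounded up), and `kNoMessage` stands in for the LLONG_MAX sentinel.

```cpp
#include <cassert>
#include <climits>
#include <cstdint>
#include <limits>

// Sentinel meaning "no native message is queued" (LLONG_MAX in the excerpt above).
const std::int64_t kNoMessage = std::numeric_limits<std::int64_t>::max();

// Hypothetical helper: delay from nowNs to whenNs in milliseconds, rounded up.
// Returns 0 if the time has already passed and -1 if the delay does not fit in an int.
int toMillisDelay(std::int64_t nowNs, std::int64_t whenNs) {
    if (whenNs <= nowNs) {
        return 0;
    }
    std::int64_t delayNs = whenNs - nowNs;
    if (delayNs > (static_cast<std::int64_t>(INT_MAX) - 1) * 1000000LL) {
        return -1;
    }
    return static_cast<int>((delayNs + 999999LL) / 1000000LL);
}

// Picks the timeout to pass to epoll_wait.
// timeoutMillis: what the caller asked for (-1 = wait forever, 0 = do not wait).
// nextMessageUptimeNs: due time of the earliest native message, or kNoMessage.
int effectiveTimeout(int timeoutMillis, std::int64_t nowNs, std::int64_t nextMessageUptimeNs) {
    if (timeoutMillis != 0 && nextMessageUptimeNs != kNoMessage) {
        int messageTimeoutMillis = toMillisDelay(nowNs, nextMessageUptimeNs);
        if (messageTimeoutMillis >= 0
                && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
            timeoutMillis = messageTimeoutMillis;
        }
    }
    return timeoutMillis;
}
```

The effect is that an infinite Java-side wait (-1) is still cut short when a native message is due earlier, while a request not to wait (0) is never lengthened.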

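Finally, we look at how a request is added in the native layer.

3 Adding monitoring requests
Adding a request in the native layer goes through the Looper's addFd interface: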

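// fd is the file descriptor to watch
// ident is an identifier for the event: when no callback is given, pollOnce returns it (it must be >= 0); with a callback it is POLL_CALLBACK
// events are the events to watch for: one or more of EVENT_INPUT, EVENT_OUTPUT, EVENT_ERROR and EVENT_HANGUP
// callback is the function called when an event occurs
// data is the argument passed to the callback
int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) {
    return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : NULL, data);
}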

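Combined with the native polling code above, we can see roughly what addFd is for: it makes the native Looper watch the newly added fd for the specified events.
When one of those events occurs, a Response is built from the Request, which carries the callback and its argument.
When the native Looper processes the Response, it can then invoke the callback.

Here is the actual code: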

int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
    ........
    {
        AutoMutex _l(mLock);

        // Build a Request from the arguments
        Request request;
        request.fd = fd;
        request.ident = ident;
        request.events = events;
        request.seq = mNextRequestSeq++;
        request.callback = callback;
        request.data = data;
        if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1

        struct epoll_event eventItem;
        request.initEventItem(&eventItem);

        // Check whether a Request has already been registered for this fd
        ssize_t requestIndex = mRequests.indexOfKey(fd);
        if (requestIndex < 0) {
            // Add a new fd for mEpollFd to watch
            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
            .......
            mRequests.add(fd, request);
        } else {
            // Modify the events watched for the existing fd
            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);
            if (epollResult < 0) {
                if (errno == ENOENT) {
                    // Tolerate ENOENT because it means that an older file descriptor was
                    // closed before its callback was unregistered and meanwhile a new
                    // file descriptor with the same number has been created and is now
                    // being registered for the first time.
                    epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
                    .......
                    // The fd had to be re-added, so schedule an epoll rebuild, which
                    // registers all watched fds again from scratch
                    scheduleEpollRebuildLocked();
                }
            }
            mRequests.replaceValueAt(requestIndex, request);
        }
    }
    return 1;
}

How registered requests are processed was already covered in the discussion of pollInner, so it is not repeated here.
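The register-then-dispatch cycle of addFd and pollInner can be condensed into a small Linux-only model. The sketch below is not the AOSP implementation: `FdWatcher` and its methods are hypothetical names, it watches only EPOLLIN, and it keeps the one convention visible in pollInner, namely that a callback returning 0 unregisters its fd.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <sys/epoll.h>
#include <sys/types.h>
#include <unistd.h>
#include <utility>

// Hypothetical minimal fd watcher built directly on epoll.
class FdWatcher {
public:
    // The callback returns 0 to be unregistered, non-zero to keep watching.
    using Callback = std::function<int(int fd, std::uint32_t events)>;

    FdWatcher() : epollFd_(epoll_create1(EPOLL_CLOEXEC)) {}
    ~FdWatcher() { close(epollFd_); }
    FdWatcher(const FdWatcher&) = delete;
    FdWatcher& operator=(const FdWatcher&) = delete;

    // Registers fd for readability, or updates the registration if fd is already known.
    bool addFd(int fd, Callback callback) {
        epoll_event item{};
        item.events = EPOLLIN;
        item.data.fd = fd;
        int op = callbacks_.count(fd) ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
        if (epoll_ctl(epollFd_, op, fd, &item) < 0) {
            return false;
        }
        callbacks_[fd] = std::move(callback);
        return true;
    }

    bool removeFd(int fd) {
        if (callbacks_.erase(fd) == 0) {
            return false;
        }
        epoll_ctl(epollFd_, EPOLL_CTL_DEL, fd, nullptr);
        return true;
    }

    // Waits up to timeoutMillis and runs the callback of every ready fd.
    // Returns the number of callbacks invoked.
    int pollOnce(int timeoutMillis) {
        epoll_event events[16];
        int count = epoll_wait(epollFd_, events, 16, timeoutMillis);
        int invoked = 0;
        for (int i = 0; i < count; i++) {
            int fd = events[i].data.fd;
            auto it = callbacks_.find(fd);
            if (it == callbacks_.end()) {
                continue;
            }
            Callback callback = it->second;  // copy: the entry may be erased below
            invoked++;
            if (callback(fd, events[i].events) == 0) {
                removeFd(fd);
            }
        }
        return invoked;
    }

private:
    int epollFd_;
    std::map<int, Callback> callbacks_;
};
```

A typical use is to register the read end of a pipe or socket and call `pollOnce` in a loop; unlike the real Looper, this model runs callbacks immediately instead of queuing Response objects first.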

III. Summary

1 Flow summary

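[Figure: overall MessageQueue flow across the Java and native layers; red lines mark the message-retrieval path, blue marks the function calls]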

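The complete MessageQueue flow spans a Java part and a native part, and the figure shows that the native layer carries much of the weight. With the figure in mind, let us recap the whole processing flow:
1. When the Java layer creates a Looper object, it creates the Java-layer MessageQueue; when that MessageQueue is initialized, it uses a native function to create the native-layer MessageQueue.

2. After the native-layer MessageQueue is initialized, it creates the corresponding native Looper object. When the native Looper is initialized, it creates an epollFd and a wakeEventFd. The epollFd is the epoll instance used for waiting; initially it watches only the wakeEventFd.

3. The red lines in the figure show the direction of the processing logic when the Looper takes messages from the MessageQueue.
3.1. When the Java-layer Looper starts looping, it first calls into the native Looper through a JNI function to perform pollOnce.

3.2. Once the native Looper is running, it waits on the epollFd. When the wait times out or an event arrives on a watched descriptor, the native Looper can start processing events.

3.3. In the native layer, the native Looper first processes the messages in the native MessageQueue, and then invokes the callbacks of the Responses.

3.4. Only after the native-layer events of the current iteration have been handled does processing of the Java-layer MessageQueue begin. If the MessageQueue has no message to process and has IdleHandlers registered, the functions defined by those IdleHandlers are called.

The blue parts of the figure are the corresponding function calls:
In the Java layer:
MessageQueue's addIdleHandler adds an IdleHandler to the MessageQueue;
MessageQueue's enqueueMessage adds a message to the MessageQueue; when necessary, it uses a native function to write to the native layer's wakeEventFd in order to wake up the wait on the epollFd.

In the native layer:
Looper::sendMessage adds a message to the native MessageQueue; likewise, when necessary it writes to the native layer's wakeEventFd to wake up the wait on the epollFd;
Looper::addFd registers a monitoring request with the native Looper. A request contains the fd to watch, the events to watch for, the corresponding callback, and so on, and its fd becomes one of the descriptors watched by the epollFd. When a watched fd sees one of its events, the wait on the epollFd is woken up, and a Response is generated and added to the response list to await processing. Once the Response is processed, the corresponding callback is invoked.

2 Caveats
MessageQueue has separate storage structures in the Java layer and the native layer, and messages can be added to each independently. In terms of processing order, native-layer messages are handled first, then the Responses generated in the native layer, and only then the Java-layer messages.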
