欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  移动技术

Android6.0 消息机制原理解析

程序员文章站 2024-03-06 13:50:56
消息都是存放在一个消息队列中去,而消息循环线程就是围绕这个消息队列进入一个无限循环的,直到线程退出。如果队列中有消息,消息循环线程就会把它取出来,并分发给相应的handle...

消息都是存放在一个消息队列中去,而消息循环线程就是围绕这个消息队列进入一个无限循环的,直到线程退出。如果队列中有消息,消息循环线程就会把它取出来,并分发给相应的handler进行处理;如果队列中没有消息,消息循环线程就会进入空闲等待状态,等待下一个消息的到来。在编写android应用程序时,当程序执行的任务比较繁重时,为了不阻塞ui主线程而导致anr的发生,我们通常的做法的创建一个子线程来完成特定的任务。在创建子线程时,有两种选择,一种通过创建thread对象来创建一个无消息循环的子线程;还有一种就是创建一个带有消息循环的子线程,而创建带有消息循环的子线程由于两种实现方法,一种是直接利用android给我们封装好的handlerthread类来直接生成一个带有消息循环的线程对象,另一种方法是在实现线程的run()方法内使用以下方式启动一个消息循环: 

一、消息机制使用 

通常消息都是有一个消息线程和一个handler组成,下面我们看powermanagerservice中的一个消息handler:        

 mhandlerthread = new servicethread(tag,
        process.thread_priority_display, false /*allowio*/);
    mhandlerthread.start();
    mhandler = new powermanagerhandler(mhandlerthread.getlooper()); 

这里的servicethread就是一个handlerthread,创建handler的时候,必须把handlerthread的looper传进去,否则就是默认当前线程的looper。 

而每个handler,大致如下:

   private final class powermanagerhandler extends handler {
    public powermanagerhandler(looper looper) {
      super(looper, null, true /*async*/);
    }

    @override
    public void handlemessage(message msg) {
      switch (msg.what) {
        case msg_user_activity_timeout:
          handleuseractivitytimeout();
          break;
        case msg_sandman:
          handlesandman();
          break;
        case msg_screen_brightness_boost_timeout:
          handlescreenbrightnessboosttimeout();
          break;
        case msg_check_wake_lock_acquire_timeout:
          checkwakelockaquiretoolong();
          message m = mhandler.obtainmessage(msg_check_wake_lock_acquire_timeout);
          m.setasynchronous(true);
          mhandler.sendmessagedelayed(m, wake_lock_acquire_too_long_timeout);
          break;
      }
    }
  }

二、消息机制原理
那我们先来看下handlerthread的主函数run函数: 

 public void run() {
    mtid = process.mytid();
    looper.prepare();
    synchronized (this) {
      mlooper = looper.mylooper();//赋值后notifyall,主要是getlooper函数返回的是mlooper
      notifyall();
    }
    process.setthreadpriority(mpriority);
    onlooperprepared();
    looper.loop();
    mtid = -1;
  }

再来看看lopper的prepare函数,最后新建了一个looper对象,并且放在线程的局部变量中。

public static void prepare() {
    prepare(true);
  }

  private static void prepare(boolean quitallowed) {
    if (sthreadlocal.get() != null) {
      throw new runtimeexception("only one looper may be created per thread");
    }
    sthreadlocal.set(new looper(quitallowed));
  } 

looper的构造函数中创建了messagequeue

   private looper(boolean quitallowed) {
    mqueue = new messagequeue(quitallowed);
    mthread = thread.currentthread();
  } 

我们再来看下messagequeue的构造函数,其中nativeinit是一个native方法,并且把返回值保存在mptr显然是用long型变量保存的指针

messagequeue(boolean quitallowed) {
    mquitallowed = quitallowed;
    mptr = nativeinit();
  } 

native函数中主要创建了nativemessagequeue对象,并且把指针变量返回了。

 static jlong android_os_messagequeue_nativeinit(jnienv* env, jclass clazz) {
  nativemessagequeue* nativemessagequeue = new nativemessagequeue();
  if (!nativemessagequeue) {
    jnithrowruntimeexception(env, "unable to allocate native queue");
    return 0;
  }

  nativemessagequeue->incstrong(env);
  return reinterpret_cast<jlong>(nativemessagequeue);
} 

nativemessagequeue构造函数就是获取mlooper,如果没有就是新建一个looper 

nativemessagequeue::nativemessagequeue() :
    mpollenv(null), mpollobj(null), mexceptionobj(null) {
  mlooper = looper::getforthread();
  if (mlooper == null) {
    mlooper = new looper(false);
    looper::setforthread(mlooper);
  }
}

然后我们再看下looper的构造函数,显示调用了eventfd创建了一个fd,eventfd它的主要是用于进程或者线程间的通信,我们可以看下这篇博客eventfd介绍

 looper::looper(bool allownoncallbacks) :
    mallownoncallbacks(allownoncallbacks), msendingmessage(false),
    mpolling(false), mepollfd(-1), mepollrebuildrequired(false),
    mnextrequestseq(0), mresponseindex(0), mnextmessageuptime(llong_max) {
  mwakeeventfd = eventfd(0, efd_nonblock);
  log_always_fatal_if(mwakeeventfd < 0, "could not make wake event fd. errno=%d", errno);

  automutex _l(mlock);
  rebuildepolllocked();
}

2.1 c层创建epoll 

我们再来看下rebuildepolllocked函数,创建了epoll,并且把mwakeeventfd加入epoll,而且把mrequests的fd也加入epoll

 void looper::rebuildepolllocked() {
  // close old epoll instance if we have one.
  if (mepollfd >= 0) {
#if debug_callbacks
    alogd("%p ~ rebuildepolllocked - rebuilding epoll set", this);
#endif
    close(mepollfd);
  }

  // allocate the new epoll instance and register the wake pipe.
  mepollfd = epoll_create(epoll_size_hint);
  log_always_fatal_if(mepollfd < 0, "could not create epoll instance. errno=%d", errno);

  struct epoll_event eventitem;
  memset(& eventitem, 0, sizeof(epoll_event)); // zero out unused members of data field union
  eventitem.events = epollin;
  eventitem.data.fd = mwakeeventfd;
  int result = epoll_ctl(mepollfd, epoll_ctl_add, mwakeeventfd, & eventitem);
  log_always_fatal_if(result != 0, "could not add wake event fd to epoll instance. errno=%d",
      errno);

  for (size_t i = 0; i < mrequests.size(); i++) {
    const request& request = mrequests.valueat(i);
    struct epoll_event eventitem;
    request.initeventitem(&eventitem);

    int epollresult = epoll_ctl(mepollfd, epoll_ctl_add, request.fd, & eventitem);
    if (epollresult < 0) {
      aloge("error adding epoll events for fd %d while rebuilding epoll set, errno=%d",
          request.fd, errno);
    }
  }
} 

继续回到handlerthread的run函数,我们继续分析looper的loop函数

public void run() {
    mtid = process.mytid();
    looper.prepare();
    synchronized (this) {
      mlooper = looper.mylooper();
      notifyall();
    }
    process.setthreadpriority(mpriority);
    onlooperprepared();
    looper.loop();
    mtid = -1;
  } 

我们看看looper的loop函数:

public static void loop() {
    final looper me = mylooper();
    if (me == null) {
      throw new runtimeexception("no looper; looper.prepare() wasn't called on this thread.");
    }
    final messagequeue queue = me.mqueue;//得到looper的mqueue

    // make sure the identity of this thread is that of the local process,
    // and keep track of what that identity token actually is.
    binder.clearcallingidentity();
    final long ident = binder.clearcallingidentity();

    for (;;) {
      message msg = queue.next(); // might block这个函数会阻塞,阻塞主要是epoll_wait
      if (msg == null) {
        // no message indicates that the message queue is quitting.
        return;
      }

      // this must be in a local variable, in case a ui event sets the logger
      printer logging = me.mlogging;//自己打的打印
      if (logging != null) {
        logging.println(">>>>> dispatching to " + msg.target + " " +
            msg.callback + ": " + msg.what);
      }

      msg.target.dispatchmessage(msg);

      if (logging != null) {
        logging.println("<<<<< finished to " + msg.target + " " + msg.callback);
      }

      // make sure that during the course of dispatching the
      // identity of the thread wasn't corrupted.
      final long newident = binder.clearcallingidentity();
      if (ident != newident) {
        log.wtf(tag, "thread identity changed from 0x"
            + long.tohexstring(ident) + " to 0x"
            + long.tohexstring(newident) + " while dispatching to "
            + msg.target.getclass().getname() + " "
            + msg.callback + " what=" + msg.what);
      }

      msg.recycleunchecked();
    }
  }

messagequeue类的next函数主要是调用了nativepollonce函数,后面就是从消息队列中取出一个message

message next() {
    // return here if the message loop has already quit and been disposed.
    // this can happen if the application tries to restart a looper after quit
    // which is not supported.
    final long ptr = mptr;//之前保留的指针
    if (ptr == 0) {
      return null;
    }

    int pendingidlehandlercount = -1; // -1 only during first iteration
    int nextpolltimeoutmillis = 0;
    for (;;) {
      if (nextpolltimeoutmillis != 0) {
        binder.flushpendingcommands();
      }

      nativepollonce(ptr, nextpolltimeoutmillis); 

下面我们主要看下nativepollonce这个native函数,把之前的指针强制转换成nativemessagequeue,然后调用其pollonce函数

static void android_os_messagequeue_nativepollonce(jnienv* env, jobject obj,
    jlong ptr, jint timeoutmillis) {
  nativemessagequeue* nativemessagequeue = reinterpret_cast<nativemessagequeue*>(ptr);
  nativemessagequeue->pollonce(env, obj, timeoutmillis);
}

2.2 c层epoll_wait阻塞 

pollonce函数,这个函数前面的while一般都没有只是处理了indent大于0的情况,这种情况一般没有,所以我们可以直接看pollinner函数

 int looper::pollonce(int timeoutmillis, int* outfd, int* outevents, void** outdata) {
  int result = 0;
  for (;;) {
    while (mresponseindex < mresponses.size()) {
      const response& response = mresponses.itemat(mresponseindex++);
      int ident = response.request.ident;
      if (ident >= 0) {
        int fd = response.request.fd;
        int events = response.events;
        void* data = response.request.data;
#if debug_poll_and_wake
        alogd("%p ~ pollonce - returning signalled identifier %d: "
            "fd=%d, events=0x%x, data=%p",
            this, ident, fd, events, data);
#endif
        if (outfd != null) *outfd = fd;
        if (outevents != null) *outevents = events;
        if (outdata != null) *outdata = data;
        return ident;
      }
    }

    if (result != 0) {
#if debug_poll_and_wake
      alogd("%p ~ pollonce - returning result %d", this, result);
#endif
      if (outfd != null) *outfd = 0;
      if (outevents != null) *outevents = 0;
      if (outdata != null) *outdata = null;
      return result;
    }

    result = pollinner(timeoutmillis);
  }
} 

pollinner函数主要就是调用epoll_wait阻塞,并且java层会计算每次阻塞的时间传到c层,等待有mwakeeventfd或者之前addfd的fd有事件过来,才会epoll_wait返回。 

int looper::pollinner(int timeoutmillis) {
#if debug_poll_and_wake
  alogd("%p ~ pollonce - waiting: timeoutmillis=%d", this, timeoutmillis);
#endif

  // adjust the timeout based on when the next message is due.
  if (timeoutmillis != 0 && mnextmessageuptime != llong_max) {
    nsecs_t now = systemtime(system_time_monotonic);
    int messagetimeoutmillis = tomillisecondtimeoutdelay(now, mnextmessageuptime);
    if (messagetimeoutmillis >= 0
        && (timeoutmillis < 0 || messagetimeoutmillis < timeoutmillis)) {
      timeoutmillis = messagetimeoutmillis;
    }
#if debug_poll_and_wake
    alogd("%p ~ pollonce - next message in %" prid64 "ns, adjusted timeout: timeoutmillis=%d",
        this, mnextmessageuptime - now, timeoutmillis);
#endif
  }

  // poll.
  int result = poll_wake;
  mresponses.clear();//清空mresponses
  mresponseindex = 0;

  // we are about to idle.
  mpolling = true;

  struct epoll_event eventitems[epoll_max_events];
  int eventcount = epoll_wait(mepollfd, eventitems, epoll_max_events, timeoutmillis);//epoll_wait主要线程阻塞在这,这个阻塞的时间也是有java层传过来的

  // no longer idling.
  mpolling = false;

  // acquire lock.
  mlock.lock();

  // rebuild epoll set if needed.
  if (mepollrebuildrequired) {
    mepollrebuildrequired = false;
    rebuildepolllocked();
    goto done;
  }

  // check for poll error.
  if (eventcount < 0) {
    if (errno == eintr) {
      goto done;
    }
    alogw("poll failed with an unexpected error, errno=%d", errno);
    result = poll_error;
    goto done;
  }

  // check for poll timeout.
  if (eventcount == 0) {
#if debug_poll_and_wake
    alogd("%p ~ pollonce - timeout", this);
#endif
    result = poll_timeout;
    goto done;
  }

  // handle all events.
#if debug_poll_and_wake
  alogd("%p ~ pollonce - handling events from %d fds", this, eventcount);
#endif

  for (int i = 0; i < eventcount; i++) {
    int fd = eventitems[i].data.fd;
    uint32_t epollevents = eventitems[i].events;
    if (fd == mwakeeventfd) {//通知唤醒线程的事件
      if (epollevents & epollin) {
        awoken();
      } else {
        alogw("ignoring unexpected epoll events 0x%x on wake event fd.", epollevents);
      }
    } else {
      ssize_t requestindex = mrequests.indexofkey(fd);//之前addfd的事件
      if (requestindex >= 0) {
        int events = 0;
        if (epollevents & epollin) events |= event_input;
        if (epollevents & epollout) events |= event_output;
        if (epollevents & epollerr) events |= event_error;
        if (epollevents & epollhup) events |= event_hangup;
        pushresponse(events, mrequests.valueat(requestindex));//放在mresponses中
      } else {
        alogw("ignoring unexpected epoll events 0x%x on fd %d that is "
            "no longer registered.", epollevents, fd);
      }
    }
  }
done: ;

  // invoke pending message callbacks.
  mnextmessageuptime = llong_max;
  while (mmessageenvelopes.size() != 0) {// 这块主要是c层的消息,java层的消息是自己管理的
    nsecs_t now = systemtime(system_time_monotonic);
    const messageenvelope& messageenvelope = mmessageenvelopes.itemat(0);
    if (messageenvelope.uptime <= now) {
      // remove the envelope from the list.
      // we keep a strong reference to the handler until the call to handlemessage
      // finishes. then we drop it so that the handler can be deleted *before*
      // we reacquire our lock.
      { // obtain handler
        sp<messagehandler> handler = messageenvelope.handler;
        message message = messageenvelope.message;
        mmessageenvelopes.removeat(0);
        msendingmessage = true;
        mlock.unlock();

#if debug_poll_and_wake || debug_callbacks
        alogd("%p ~ pollonce - sending message: handler=%p, what=%d",
            this, handler.get(), message.what);
#endif
        handler->handlemessage(message);
      } // release handler

      mlock.lock();
      msendingmessage = false;
      result = poll_callback;
    } else {
      // the last message left at the head of the queue determines the next wakeup time.
      mnextmessageuptime = messageenvelope.uptime;
      break;
    }
  }

  // release lock.
  mlock.unlock();

  // invoke all response callbacks.
  for (size_t i = 0; i < mresponses.size(); i++) {//这是之前addfd的事件的处理,主要是遍历mresponses,然后调用其回调
    response& response = mresponses.edititemat(i);
    if (response.request.ident == poll_callback) {
      int fd = response.request.fd;
      int events = response.events;
      void* data = response.request.data;
#if debug_poll_and_wake || debug_callbacks
      alogd("%p ~ pollonce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
          this, response.request.callback.get(), fd, events, data);
#endif
      // invoke the callback. note that the file descriptor may be closed by
      // the callback (and potentially even reused) before the function returns so
      // we need to be a little careful when removing the file descriptor afterwards.
      int callbackresult = response.request.callback->handleevent(fd, events, data);
      if (callbackresult == 0) {
        removefd(fd, response.request.seq);
      }

      // clear the callback reference in the response structure promptly because we
      // will not clear the response vector itself until the next poll.
      response.request.callback.clear();
      result = poll_callback;
    }
  }
  return result;
} 

继续分析looper的loop函数,可以增加自己的打印来调试代码,之前调用message的target的dispatchmessage来分配消息

     for (;;) {
      message msg = queue.next(); // might block
      if (msg == null) {
        // no message indicates that the message queue is quitting.
        return;
      }

      // this must be in a local variable, in case a ui event sets the logger
      printer logging = me.mlogging;//自己的打印
      if (logging != null) {
        logging.println(">>>>> dispatching to " + msg.target + " " +
            msg.callback + ": " + msg.what);
      }

      msg.target.dispatchmessage(msg);

      if (logging != null) {
        logging.println("<<<<< finished to " + msg.target + " " + msg.callback);
      }

      // make sure that during the course of dispatching the
      // identity of the thread wasn't corrupted.
      final long newident = binder.clearcallingidentity();
      if (ident != newident) {
        log.wtf(tag, "thread identity changed from 0x"
            + long.tohexstring(ident) + " to 0x"
            + long.tohexstring(newident) + " while dispatching to "
            + msg.target.getclass().getname() + " "
            + msg.callback + " what=" + msg.what);
      }

      msg.recycleunchecked();
    }
  }

2.3 增加调试打印 

我们先来看自己添加打印,可以通过lopper的setmessagelogging函数来打印

public void setmessagelogging(@nullable printer printer) {
    mlogging = printer;
  } 
printer就是一个interface
 
public interface printer {
  /**
   * write a line of text to the output. there is no need to terminate
   * the given string with a newline.
   */
  void println(string x);
}

2.4 java层消息分发处理 

再来看消息的分发,先是调用handler的obtainmessage函数               

 message msg = mhandler.obtainmessage(msg_check_wake_lock_acquire_timeout);
 msg.setasynchronous(true);
 mhandler.sendmessagedelayed(msg, wake_lock_acquire_too_long_timeout); 

先看obtainmessage调用了message的obtain函数

public final message obtainmessage(int what)
  {
    return message.obtain(this, what);
  } 

message的obtain函数就是新建一个message,然后其target就是设置成其handler

public static message obtain(handler h, int what) {
    message m = obtain();//就是新建一个message
    m.target = h;
    m.what = what;

    return m;
  }
 

我们再联系之前分发消息 

msg.target.dispatchmessage(msg);最后就是调用handler的dispatchmessage函数,最后在handler中,最后会根据不同的情况对消息进行处理。

   public void dispatchmessage(message msg) {
    if (msg.callback != null) {
      handlecallback(msg);//这种就是用post形式发送,带runnable的
    } else {
      if (mcallback != null) {//这种是handler传参的时候就是传入了mcallback回调了
        if (mcallback.handlemessage(msg)) {
          return;
        }
      }
      handlemessage(msg);//最后就是在自己实现的handlemessage处理
    }
  }

2.3 java层 消息发送 

我们再看下java层的消息发送,主要也是调用handler的sendmessage post之类函数,最终都会调用下面这个函数

   public boolean sendmessageattime(message msg, long uptimemillis) {
    messagequeue queue = mqueue;
    if (queue == null) {
      runtimeexception e = new runtimeexception(
          this + " sendmessageattime() called with no mqueue");
      log.w("looper", e.getmessage(), e);
      return false;
    }
    return enqueuemessage(queue, msg, uptimemillis);
  } 

我们再来看java层发送消息最终都会调用enqueuemessage函数

private boolean enqueuemessage(messagequeue queue, message msg, long uptimemillis) {
    msg.target = this;
    if (masynchronous) {
      msg.setasynchronous(true);
    }
    return queue.enqueuemessage(msg, uptimemillis);
  } 

最终在enqueuemessage中,把消息加入消息队列,然后需要的话就调用c层的nativewake函数

boolean enqueuemessage(message msg, long when) {
    if (msg.target == null) {
      throw new illegalargumentexception("message must have a target.");
    }
    if (msg.isinuse()) {
      throw new illegalstateexception(msg + " this message is already in use.");
    }

    synchronized (this) {
      if (mquitting) {
        illegalstateexception e = new illegalstateexception(
            msg.target + " sending message to a handler on a dead thread");
        log.w(tag, e.getmessage(), e);
        msg.recycle();
        return false;
      }

      msg.markinuse();
      msg.when = when;
      message p = mmessages;
      boolean needwake;
      if (p == null || when == 0 || when < p.when) {
        // new head, wake up the event queue if blocked.
        msg.next = p;
        mmessages = msg;
        needwake = mblocked;
      } else {
        // inserted within the middle of the queue. usually we don't have to wake
        // up the event queue unless there is a barrier at the head of the queue
        // and the message is the earliest asynchronous message in the queue.
        needwake = mblocked && p.target == null && msg.isasynchronous();
        message prev;
        for (;;) {
          prev = p;
          p = p.next;
          if (p == null || when < p.when) {
            break;
          }
          if (needwake && p.isasynchronous()) {
            needwake = false;
          }
        }
        msg.next = p; // invariant: p == prev.next
        prev.next = msg;
      }

      // we can assume mptr != 0 because mquitting is false.
      if (needwake) {
        nativewake(mptr);
      }
    }
    return true;
  } 

我们看下这个native方法,最后也是调用了looper的wake函数

 static void android_os_messagequeue_nativewake(jnienv* env, jclass clazz, jlong ptr) {
  nativemessagequeue* nativemessagequeue = reinterpret_cast<nativemessagequeue*>(ptr);
  nativemessagequeue->wake();
}
void nativemessagequeue::wake() {
  mlooper->wake();
} 

looper类的wake,函数只是往mwakeeventfd中写了一些内容,这个fd只是通知而已,类似pipe,最后会把epoll_wait唤醒,线程就不阻塞了继续先发送c层消息,然后处理之前addfd的事件,然后处理java层的消息。 

void looper::wake() {
#if debug_poll_and_wake
  alogd("%p ~ wake", this);
#endif

  uint64_t inc = 1;
  ssize_t nwrite = temp_failure_retry(write(mwakeeventfd, &inc, sizeof(uint64_t)));
  if (nwrite != sizeof(uint64_t)) {
    if (errno != eagain) {
      alogw("could not write wake signal, errno=%d", errno);
    }
  }
}

2.4 c层发送消息 

在c层也是可以发送消息的,主要是调用looper的sendmessageattime函数,参数有有一个handler是一个回调,我们把消息放在mmessageenvelopes中。

 void looper::sendmessageattime(nsecs_t uptime, const sp<messagehandler>& handler,
    const message& message) {
#if debug_callbacks
  alogd("%p ~ sendmessageattime - uptime=%" prid64 ", handler=%p, what=%d",
      this, uptime, handler.get(), message.what);
#endif

  size_t i = 0;
  { // acquire lock
    automutex _l(mlock);

    size_t messagecount = mmessageenvelopes.size();
    while (i < messagecount && uptime >= mmessageenvelopes.itemat(i).uptime) {
      i += 1;
    }

    messageenvelope messageenvelope(uptime, handler, message);
    mmessageenvelopes.insertat(messageenvelope, i, 1);

    // optimization: if the looper is currently sending a message, then we can skip
    // the call to wake() because the next thing the looper will do after processing
    // messages is to decide when the next wakeup time should be. in fact, it does
    // not even matter whether this code is running on the looper thread.
    if (msendingmessage) {
      return;
    }
  } // release lock

  // wake the poll loop only when we enqueue a new message at the head.
  if (i == 0) {
    wake();
  }
} 

当在pollonce中,在epoll_wait之后,会遍历mmessageenvelopes中的消息,然后调用其handler的handlemessage函数

   while (mmessageenvelopes.size() != 0) {
    nsecs_t now = systemtime(system_time_monotonic);
    const messageenvelope& messageenvelope = mmessageenvelopes.itemat(0);
    if (messageenvelope.uptime <= now) {
      // remove the envelope from the list.
      // we keep a strong reference to the handler until the call to handlemessage
      // finishes. then we drop it so that the handler can be deleted *before*
      // we reacquire our lock.
      { // obtain handler
        sp<messagehandler> handler = messageenvelope.handler;
        message message = messageenvelope.message;
        mmessageenvelopes.removeat(0);
        msendingmessage = true;
        mlock.unlock();

#if debug_poll_and_wake || debug_callbacks
        alogd("%p ~ pollonce - sending message: handler=%p, what=%d",
            this, handler.get(), message.what);
#endif
        handler->handlemessage(message);
      } // release handler

      mlock.lock();
      msendingmessage = false;
      result = poll_callback;
    } else {
      // the last message left at the head of the queue determines the next wakeup time.
      mnextmessageuptime = messageenvelope.uptime;
      break;
    }
  } 

有一个looper_test.cpp文件,里面介绍了很多looper的使用方法,我们来看下

   sp<stubmessagehandler> handler = new stubmessagehandler();
  mlooper->sendmessageattime(now + ms2ns(100), handler, message(msg_test1)); 
stubmessagehandler继承messagehandler就必须实现handlemessage方法
 
class stubmessagehandler : public messagehandler {
public:
  vector<message> messages;

  virtual void handlemessage(const message& message) {
    messages.push(message);
  }
}; 

我们再顺便看下message和messagehandler类

 struct message {
  message() : what(0) { }
  message(int what) : what(what) { }

  /* the message type. (interpretation is left up to the handler) */
  int what;
};


/**
 * interface for a looper message handler.
 *
 * the looper holds a strong reference to the message handler whenever it has
 * a message to deliver to it. make sure to call looper::removemessages
 * to remove any pending messages destined for the handler so that the handler
 * can be destroyed.
 */
class messagehandler : public virtual refbase {
protected:
  virtual ~messagehandler() { }

public:
  /**
   * handles a message.
   */
  virtual void handlemessage(const message& message) = 0;
};

2.5 c层addfd 

我们也可以在looper.cpp的addfd中增加fd放入线程epoll中,当fd有数据来我们也可以处理相应的数据,下面我们先来看下addfd函数,我们注意其中有一个callback回调

 int looper::addfd(int fd, int ident, int events, looper_callbackfunc callback, void* data) {
  return addfd(fd, ident, events, callback ? new simpleloopercallback(callback) : null, data);
}

int looper::addfd(int fd, int ident, int events, const sp<loopercallback>& callback, void* data) {
#if debug_callbacks
  alogd("%p ~ addfd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,
      events, callback.get(), data);
#endif

  if (!callback.get()) {
    if (! mallownoncallbacks) {
      aloge("invalid attempt to set null callback but not allowed for this looper.");
      return -1;
    }

    if (ident < 0) {
      aloge("invalid attempt to set null callback with ident < 0.");
      return -1;
    }
  } else {
    ident = poll_callback;
  }

  { // acquire lock
    automutex _l(mlock);

    request request;
    request.fd = fd;
    request.ident = ident;
    request.events = events;
    request.seq = mnextrequestseq++;
    request.callback = callback;
    request.data = data;
    if (mnextrequestseq == -1) mnextrequestseq = 0; // reserve sequence number -1

    struct epoll_event eventitem;
    request.initeventitem(&eventitem);

    ssize_t requestindex = mrequests.indexofkey(fd);
    if (requestindex < 0) {
      int epollresult = epoll_ctl(mepollfd, epoll_ctl_add, fd, & eventitem);//加入epoll
      if (epollresult < 0) {
        aloge("error adding epoll events for fd %d, errno=%d", fd, errno);
        return -1;
      }
      mrequests.add(fd, request);//放入mrequests中
    } else {
      int epollresult = epoll_ctl(mepollfd, epoll_ctl_mod, fd, & eventitem);//更新
      if (epollresult < 0) {
        if (errno == enoent) {
          // tolerate enoent because it means that an older file descriptor was
          // closed before its callback was unregistered and meanwhile a new
          // file descriptor with the same number has been created and is now
          // being registered for the first time. this error may occur naturally
          // when a callback has the side-effect of closing the file descriptor
          // before returning and unregistering itself. callback sequence number
          // checks further ensure that the race is benign.
          //
          // unfortunately due to kernel limitations we need to rebuild the epoll
          // set from scratch because it may contain an old file handle that we are
          // now unable to remove since its file descriptor is no longer valid.
          // no such problem would have occurred if we were using the poll system
          // call instead, but that approach carries others disadvantages.
#if debug_callbacks
          alogd("%p ~ addfd - epoll_ctl_mod failed due to file descriptor "
              "being recycled, falling back on epoll_ctl_add, errno=%d",
              this, errno);
#endif
          epollresult = epoll_ctl(mepollfd, epoll_ctl_add, fd, & eventitem);
          if (epollresult < 0) {
            aloge("error modifying or adding epoll events for fd %d, errno=%d",
                fd, errno);
            return -1;
          }
          scheduleepollrebuildlocked();
        } else {
          aloge("error modifying epoll events for fd %d, errno=%d", fd, errno);
          return -1;
        }
      }
      mrequests.replacevalueat(requestindex, request);
    }
  } // release lock
  return 1;
} 

在pollonce函数中,我们先寻找mrequests中匹配的fd,然后在pushresponse中新建一个response,然后把response和request匹配起来。

     } else {
      ssize_t requestindex = mrequests.indexofkey(fd);
      if (requestindex >= 0) {
        int events = 0;
        if (epollevents & epollin) events |= event_input;
        if (epollevents & epollout) events |= event_output;
        if (epollevents & epollerr) events |= event_error;
        if (epollevents & epollhup) events |= event_hangup;
        pushresponse(events, mrequests.valueat(requestindex));
      } else {
        alogw("ignoring unexpected epoll events 0x%x on fd %d that is "
            "no longer registered.", epollevents, fd);
      }
    } 

下面我们就会遍历mresponses中的response,然后调用其request中的回调

   for (size_t i = 0; i < mresponses.size(); i++) {
    response& response = mresponses.edititemat(i);
    if (response.request.ident == poll_callback) {
      int fd = response.request.fd;
      int events = response.events;
      void* data = response.request.data;
#if debug_poll_and_wake || debug_callbacks
      alogd("%p ~ pollonce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
          this, response.request.callback.get(), fd, events, data);
#endif
      // invoke the callback. note that the file descriptor may be closed by
      // the callback (and potentially even reused) before the function returns so
      // we need to be a little careful when removing the file descriptor afterwards.
      int callbackresult = response.request.callback->handleevent(fd, events, data);
      if (callbackresult == 0) {
        removefd(fd, response.request.seq);
      }

      // clear the callback reference in the response structure promptly because we
      // will not clear the response vector itself until the next poll.
      response.request.callback.clear();
      result = poll_callback;
    }
  } 

同样我们再来看看looper_test.cpp是如何使用的?

   pipe pipe;
  stubcallbackhandler handler(true);

  handler.setcallback(mlooper, pipe.receivefd, looper::event_input); 

我们看下handler的setcallback函数

class callbackhandler {
public:
  void setcallback(const sp<looper>& looper, int fd, int events) {
    looper->addfd(fd, 0, events, statichandler, this);//就是调用了looper的addfd函数,并且回调
  }

protected:
  virtual ~callbackhandler() { }

  virtual int handler(int fd, int events) = 0;

private:
  static int statichandler(int fd, int events, void* data) {//这个就是回调函数
    return static_cast<callbackhandler*>(data)->handler(fd, events);
  }
};

class stubcallbackhandler : public callbackhandler {
public:
  int nextresult;
  int callbackcount;

  int fd;
  int events;

  stubcallbackhandler(int nextresult) : nextresult(nextresult),
      callbackcount(0), fd(-1), events(-1) {
  }

protected:
  virtual int handler(int fd, int events) {//这个是通过回调函数再调到这里的
    callbackcount += 1;
    this->fd = fd;
    this->events = events;
    return nextresult;
  }
}; 

我们结合looper的addfd一起来看,当callback是有的,我们新建一个simpleloopercallback

 int looper::addfd(int fd, int ident, int events, looper_callbackfunc callback, void* data) {
  return addfd(fd, ident, events, callback ? new simpleloopercallback(callback) : null, data);
} 

这里的looper_callbackfunc是一个typedef
typedef int (*looper_callbackfunc)(int fd, int events, void* data);

我们再来看simpleloopercallback

 class simpleloopercallback : public loopercallback {
protected:
  virtual ~simpleloopercallback();

public:
  simpleloopercallback(looper_callbackfunc callback);
  virtual int handleevent(int fd, int events, void* data);

private:
  looper_callbackfunc mcallback;
};simpleloopercallback::simpleloopercallback(looper_callbackfunc callback) :
    mcallback(callback) {
}

simpleloopercallback::~simpleloopercallback() {
}

int simpleloopercallback::handleevent(int fd, int events, void* data) {
  return mcallback(fd, events, data);
} 

最后我们是调用callback->handleevent(fd, events, data),而callback就是simpleloopercallback,这里的data,之前传进来的就是callbackhandler 的this指针
 因此最后就是调用了statichandler,而data->handler,就是this->handler,最后是虚函数就调用到了stubcallbackhandler 的handler函数中了。 

当然我们也可以不用这么复杂,直接使用第二个addfd函数,当然callback我们需要自己定义一个类来实现loopercallback类就行了,这样就简单多了。
 int addfd(int fd, int ident, int events, const sp<loopercallback>& callback, void* data);

2.6 java层addfd 

一直以为只能在c层的looper中才能addfd,原来在java层也通过jni做了这个功能。 

我们可以在messagequeue中的addonfiledescriptoreventlistener来实现这个功能

   public void addonfiledescriptoreventlistener(@nonnull filedescriptor fd,
      @onfiledescriptoreventlistener.events int events,
      @nonnull onfiledescriptoreventlistener listener) {
    if (fd == null) {
      throw new illegalargumentexception("fd must not be null");
    }
    if (listener == null) {
      throw new illegalargumentexception("listener must not be null");
    }

    synchronized (this) {
      updateonfiledescriptoreventlistenerlocked(fd, events, listener);
    }
  }

我们再来看看onfiledescriptoreventlistener 这个回调

   public interface onfiledescriptoreventlistener {
    public static final int event_input = 1 << 0;
    public static final int event_output = 1 << 1;
    public static final int event_error = 1 << 2;

    /** @hide */
    @retention(retentionpolicy.source)
    @intdef(flag=true, value={event_input, event_output, event_error})
    public @interface events {}


    @events int onfiledescriptorevents(@nonnull filedescriptor fd, @events int events);
  }

接着调用了updateonfiledescriptoreventlistenerlocked函数

 private void updateonfiledescriptoreventlistenerlocked(filedescriptor fd, int events,
      onfiledescriptoreventlistener listener) {
    final int fdnum = fd.getint$();

    int index = -1;
    filedescriptorrecord record = null;
    if (mfiledescriptorrecords != null) {
      index = mfiledescriptorrecords.indexofkey(fdnum);
      if (index >= 0) {
        record = mfiledescriptorrecords.valueat(index);
        if (record != null && record.mevents == events) {
          return;
        }
      }
    }

    if (events != 0) {
      events |= onfiledescriptoreventlistener.event_error;
      if (record == null) {
        if (mfiledescriptorrecords == null) {
          mfiledescriptorrecords = new sparsearray<filedescriptorrecord>();
        }
        record = new filedescriptorrecord(fd, events, listener);//fd保存在filedescriptorrecord对象
        mfiledescriptorrecords.put(fdnum, record);//mfiledescriptorrecords然后保存在
      } else {
        record.mlistener = listener;
        record.mevents = events;
        record.mseq += 1;
      }
      nativesetfiledescriptorevents(mptr, fdnum, events);//调用native函数
    } else if (record != null) {
      record.mevents = 0;
      mfiledescriptorrecords.removeat(index);
    }
  } 

native最后调用了nativemessagequeue的setfiledescriptorevents函数 

static void android_os_messagequeue_nativesetfiledescriptorevents(jnienv* env, jclass clazz,
    jlong ptr, jint fd, jint events) {
  nativemessagequeue* nativemessagequeue = reinterpret_cast<nativemessagequeue*>(ptr);
  nativemessagequeue->setfiledescriptorevents(fd, events);
}

setfiledescriptorevents函数,这个addfd就是调用的第二个addfd,因此我们可以肯定nativemessagequeue继承了loopercallback

 void nativemessagequeue::setfiledescriptorevents(int fd, int events) {
  if (events) {
    int looperevents = 0;
    if (events & callback_event_input) {
      looperevents |= looper::event_input;
    }
    if (events & callback_event_output) {
      looperevents |= looper::event_output;
    }
    mlooper->addfd(fd, looper::poll_callback, looperevents, this,
        reinterpret_cast<void*>(events));
  } else {
    mlooper->removefd(fd);
  }
}

果然是,需要实现handleevent函数

 class nativemessagequeue : public messagequeue, public loopercallback {
public:
  nativemessagequeue();
  virtual ~nativemessagequeue();

  virtual void raiseexception(jnienv* env, const char* msg, jthrowable exceptionobj);

  void pollonce(jnienv* env, jobject obj, int timeoutmillis);
  void wake();
  void setfiledescriptorevents(int fd, int events);

  virtual int handleevent(int fd, int events, void* data);

handleevent就是在looper中epoll_wait之后,当我们增加的fd有数据就会调用这个函数

 int nativemessagequeue::handleevent(int fd, int looperevents, void* data) {
  int events = 0;
  if (looperevents & looper::event_input) {
    events |= callback_event_input;
  }
  if (looperevents & looper::event_output) {
    events |= callback_event_output;
  }
  if (looperevents & (looper::event_error | looper::event_hangup | looper::event_invalid)) {
    events |= callback_event_error;
  }
  int oldwatchedevents = reinterpret_cast<intptr_t>(data);
  int newwatchedevents = mpollenv->callintmethod(mpollobj,
      gmessagequeueclassinfo.dispatchevents, fd, events); //调用回调
  if (!newwatchedevents) {
    return 0; // unregister the fd
  }
  if (newwatchedevents != oldwatchedevents) {
    setfiledescriptorevents(fd, newwatchedevents);
  }
  return 1;
}

最后在java的messagequeue中的dispatchevents就是在jni层反调过来的,然后调用之前注册的回调函数

// called from native code.
  private int dispatchevents(int fd, int events) {
    // get the file descriptor record and any state that might change.
    final filedescriptorrecord record;
    final int oldwatchedevents;
    final onfiledescriptoreventlistener listener;
    final int seq;
    synchronized (this) {
      record = mfiledescriptorrecords.get(fd);//通过fd得到filedescriptorrecord 
      if (record == null) {
        return 0; // spurious, no listener registered
      }

      oldwatchedevents = record.mevents;
      events &= oldwatchedevents; // filter events based on current watched set
      if (events == 0) {
        return oldwatchedevents; // spurious, watched events changed
      }

      listener = record.mlistener;
      seq = record.mseq;
    }

    // invoke the listener outside of the lock.
    int newwatchedevents = listener.onfiledescriptorevents(//listener回调
        record.mdescriptor, events);
    if (newwatchedevents != 0) {
      newwatchedevents |= onfiledescriptoreventlistener.event_error;
    }

    // update the file descriptor record if the listener changed the set of
    // events to watch and the listener itself hasn't been updated since.
    if (newwatchedevents != oldwatchedevents) {
      synchronized (this) {
        int index = mfiledescriptorrecords.indexofkey(fd);
        if (index >= 0 && mfiledescriptorrecords.valueat(index) == record
            && record.mseq == seq) {
          record.mevents = newwatchedevents;
          if (newwatchedevents == 0) {
            mfiledescriptorrecords.removeat(index);
          }
        }
      }
    }

    // return the new set of events to watch for native code to take care of.
    return newwatchedevents;
  }

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。