Blocking Reads and Writes in the AndroidPN Client (2)
2024-02-23 11:51:10
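Once the AndroidPN client has started its separate read and write threads, how do we handle a write that has to wait for a result? For example, the client sends a message to the server and must wait for the server's reply before running its local logic. Take a password change: after PacketWriter has written the request out, the success or failure message can only arrive on the PacketReader thread. So how are the two tied together? Let's look at how asmack does it.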
The PacketCollector class
public class PacketCollector {

    private PacketFilter packetFilter;
    private ArrayBlockingQueue<Packet> resultQueue;
    private Connection connection;
    private boolean cancelled = false;

    /**
     * Creates a new packet collector. If the packet filter is <tt>null</tt>, then
     * all packets will match this collector.
     *
     * @param conection the connection the collector is tied to.
     * @param packetFilter determines which packets will be returned by this collector.
     */
    protected PacketCollector(Connection conection, PacketFilter packetFilter) {
        this(conection, packetFilter, SmackConfiguration.getPacketCollectorSize());
    }

    /**
     * Creates a new packet collector. If the packet filter is <tt>null</tt>, then
     * all packets will match this collector.
     *
     * @param conection the connection the collector is tied to.
     * @param packetFilter determines which packets will be returned by this collector.
     * @param maxSize the maximum number of packets that will be stored in the collector.
     */
    protected PacketCollector(Connection conection, PacketFilter packetFilter, int maxSize) {
        this.connection = conection;
        this.packetFilter = packetFilter;
        this.resultQueue = new ArrayBlockingQueue<Packet>(maxSize);
    }

    /**
     * Explicitly cancels the packet collector so that no more results are
     * queued up. Once a packet collector has been cancelled, it cannot be
     * re-enabled. Instead, a new packet collector must be created.
     */
    public void cancel() {
        // If the packet collector has already been cancelled, do nothing.
        if (!cancelled) {
            cancelled = true;
            connection.removePacketCollector(this);
        }
    }

    /**
     * Returns the packet filter associated with this packet collector. The packet
     * filter is used to determine what packets are queued as results.
     *
     * @return the packet filter.
     */
    public PacketFilter getPacketFilter() {
        return packetFilter;
    }

    /**
     * Polls to see if a packet is currently available and returns it, or
     * immediately returns <tt>null</tt> if no packets are currently in the
     * result queue.
     *
     * @return the next packet result, or <tt>null</tt> if there are no more
     *      results.
     */
    public Packet pollResult() {
        return resultQueue.poll();
    }

    /**
     * Returns the next available packet. The method call will block (not return)
     * until a packet is available.
     *
     * @return the next available packet.
     */
    public Packet nextResult() {
        try {
            return resultQueue.take();
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the next available packet. The method call will block (not return)
     * until a packet is available or the <tt>timeout</tt> has elapased. If the
     * timeout elapses without a result, <tt>null</tt> will be returned.
     *
     * @param timeout the amount of time to wait for the next packet (in milleseconds).
     * @return the next available packet.
     */
    public Packet nextResult(long timeout) {
        try {
            return resultQueue.poll(timeout, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Processes a packet to see if it meets the criteria for this packet collector.
     * If so, the packet is added to the result queue.
     *
     * @param packet the packet to process.
     */
    protected void processPacket(Packet packet) {
        if (packet == null) {
            return;
        }

        if (packetFilter == null || packetFilter.accept(packet)) {
            while (!resultQueue.offer(packet)) {
                // Since we know the queue is full, this poll should never actually block.
                resultQueue.poll();
            }
        }
    }
}
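The loop in processPacket is worth isolating: when the bounded queue is full, it evicts the oldest entry until the new one fits. Below is a minimal sketch of that behavior using only java.util.concurrent. The class DropOldestDemo and the helper offerDroppingOldest are hypothetical names made up for this illustration; they are not part of asmack.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo class, not part of asmack.
class DropOldestDemo {

    // Same loop shape as processPacket: evict the head until offer() succeeds.
    static <T> void offerDroppingOldest(ArrayBlockingQueue<T> queue, T item) {
        while (!queue.offer(item)) {
            queue.poll(); // the queue is full, so this discards the oldest element
        }
    }

    // Pushes three items through a queue of capacity 2 and returns what is left.
    static List<String> demo() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        offerDroppingOldest(queue, "p1");
        offerDroppingOldest(queue, "p2");
        offerDroppingOldest(queue, "p3"); // capacity reached: "p1" is evicted
        return new ArrayList<>(queue);    // copies the queue from head to tail
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [p2, p3]
    }
}
```

The consequence for callers is that a collector left registered for a long time keeps only the most recent packets, so a slow consumer loses old results instead of blocking the reader thread.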
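This class is really nice. I love simple things like this, and I think I could copy it almost as is, haha. You can pretty much guess how asmack works without reading any other code. Let's walk through the flow.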
1. First, when the need arises (say, to change a password), register a PacketCollector on the connection.
/**
 * Creates a new packet collector for this connection. A packet filter determines
 * which packets will be accumulated by the collector. A PacketCollector is
 * more suitable to use than a {@link PacketListener} when you need to wait for
 * a specific result.
 *
 * @param packetFilter the packet filter to use.
 * @return a new packet collector.
 */
public PacketCollector createPacketCollector(PacketFilter packetFilter) {
    PacketCollector collector = new PacketCollector(this, packetFilter);
    // Add the collector to the list of active collectors.
    collectors.add(collector);
    return collector;
}
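PacketFilter is a filter. For a password change, I only care about the password-change result returned by the server and ignore everything else, so only the matching received packets are put into the PacketCollector's ArrayBlockingQueue<Packet>. Note the processPacket method: once the queue is full, it starts discarding the oldest packets.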
2. Send the password-change request to the server. This simply means writing a packet to the server through the PacketWriter thread.
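3. Wait for the server's reply using nextResult(). Whether to block indefinitely or only for a limited time is up to you; blocking for a few seconds is usually enough.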
4. Finally, whether or not the server replied, cancel the PacketCollector, which removes it from the connection.
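The four steps above can be sketched end to end without any XMPP dependency. This is only an illustration under stated assumptions: MiniCollector, the shared collector list, and the string messages of the form "id:payload" are hypothetical stand-ins for asmack's PacketCollector, Connection, and Packet, and a plain thread plays the role of the server plus the PacketReader thread.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Hypothetical, simplified stand-ins; none of these types come from asmack.
class CollectorFlowDemo {

    static class MiniCollector {
        private final List<MiniCollector> registry;
        private final Predicate<String> filter;
        private final ArrayBlockingQueue<String> results = new ArrayBlockingQueue<>(16);

        MiniCollector(List<MiniCollector> registry, Predicate<String> filter) {
            this.registry = registry;
            this.filter = filter;
        }

        // Called by the reader thread for every incoming message.
        void process(String message) {
            if (filter.test(message)) {
                results.offer(message);
            }
        }

        // Blocks for at most timeoutMillis; returns null on timeout or interruption.
        String nextResult(long timeoutMillis) {
            try {
                return results.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }

        void cancel() {
            registry.remove(this);
        }
    }

    static String requestAndWait(String requestId, long timeoutMillis) {
        List<MiniCollector> collectors = new CopyOnWriteArrayList<>();

        // Step 1: register the collector BEFORE sending, so the reply cannot be missed.
        MiniCollector collector =
                new MiniCollector(collectors, m -> m.startsWith(requestId + ":"));
        collectors.add(collector);

        // Step 2: "send" the request. This thread fakes the server and the reader
        // thread: it delivers one unrelated message and then the matching reply.
        Thread reader = new Thread(() -> {
            for (String incoming : new String[] {"other:noise", requestId + ":ok"}) {
                for (MiniCollector c : collectors) {
                    c.process(incoming);
                }
            }
        });
        reader.start();

        try {
            // Step 3: block the calling thread until the reply arrives or time runs out.
            return collector.nextResult(timeoutMillis);
        } finally {
            // Step 4: always unregister, with or without a reply.
            collector.cancel();
        }
    }

    public static void main(String[] args) {
        System.out.println(requestAndWait("pw-1", 2000)); // prints pw-1:ok
    }
}
```

Putting the cancel call in a finally block is a design choice of this sketch: it guarantees the collector is unregistered even if the wait ends with an exception.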
Here is a piece of the source from AccountManager:
public void createAccount(String username, String password, Map<String, String> attributes)
        throws XMPPException
{
    if (!supportsAccountCreation()) {
        throw new XMPPException("Server does not support account creation.");
    }
    Registration reg = new Registration();
    reg.setType(IQ.Type.SET);
    reg.setTo(connection.getServiceName());
    attributes.put("username",username);
    attributes.put("password",password);
    reg.setAttributes(attributes);
    PacketFilter filter = new AndFilter(new PacketIDFilter(reg.getPacketID()),
            new PacketTypeFilter(IQ.class));
    PacketCollector collector = connection.createPacketCollector(filter);
    connection.sendPacket(reg);
    IQ result = (IQ)collector.nextResult(SmackConfiguration.getPacketReplyTimeout());
    // Stop queuing results
    collector.cancel();
    if (result == null) {
        throw new XMPPException("No response from server.");
    }
    else if (result.getType() == IQ.Type.ERROR) {
        throw new XMPPException(result.getError());
    }
}
The flow here is quite clear.
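The correlation in createAccount rests on the combined filter: the reply must carry the same packet ID as the request and must be an IQ, which is also what makes the (IQ) cast safe. As a rough illustration, the same composition can be written with java.util.function.Predicate. The types Pkt, IqPkt, and MessagePkt and the helper methods are hypothetical and only mimic the roles of Packet, IQ, PacketIDFilter, PacketTypeFilter, and AndFilter.

```java
import java.util.function.Predicate;

// Hypothetical minimal model; these are not asmack classes.
class FilterCompositionDemo {

    static class Pkt {
        final String id;
        Pkt(String id) { this.id = id; }
    }

    static class IqPkt extends Pkt {
        IqPkt(String id) { super(id); }
    }

    static class MessagePkt extends Pkt {
        MessagePkt(String id) { super(id); }
    }

    // Plays the role of PacketIDFilter: matches on the packet ID only.
    static Predicate<Pkt> idFilter(String id) {
        return p -> id.equals(p.id);
    }

    // Plays the role of PacketTypeFilter: matches on the runtime type only.
    static Predicate<Pkt> typeFilter(Class<? extends Pkt> type) {
        return type::isInstance;
    }

    // Plays the role of AndFilter: both conditions must hold.
    static Predicate<Pkt> replyFilter(String id) {
        return idFilter(id).and(typeFilter(IqPkt.class));
    }

    public static void main(String[] args) {
        Predicate<Pkt> filter = replyFilter("a1");
        System.out.println(filter.test(new IqPkt("a1")));      // prints true
        System.out.println(filter.test(new MessagePkt("a1"))); // prints false
        System.out.println(filter.test(new IqPkt("b2")));      // prints false
    }
}
```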
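To sum up: between its write thread and read thread, asmack dispatches messages of a specified kind to a designated synchronized queue and relies on that queue's blocking behavior to tie each sent message to the reply received for it.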