欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

AndroidPN客户端的阻塞读写(2) 博客分类: android androidpn

程序员文章站 2024-02-23 11:51:10
...

      AndroidPN客户端分别启动了读和写线程之后,如果有某种写必须等待结果的又如何处理呢?比如客户端向服务端发个消息,要求等待服务端返回后再对本地逻辑做处理。想象一下客户端要求修改密码,PacketWriter把请求write出去后,成功或失败信息肯定是从PacketReader线程中获得,那么如何把二者关联起来呢,看看asmack的做法。

     PacketCollector类

public class PacketCollector {

    private PacketFilter packetFilter;
    private ArrayBlockingQueue<Packet> resultQueue;
    private Connection connection;
    private boolean cancelled = false;

    /**
     * Creates a new packet collector. If the packet filter is <tt>null</tt>, then
     * all packets will match this collector.
     *
     * @param conection the connection the collector is tied to.
     * @param packetFilter determines which packets will be returned by this collector.
     */
    protected PacketCollector(Connection conection, PacketFilter packetFilter) {
    	this(conection, packetFilter, SmackConfiguration.getPacketCollectorSize());
    }

    /**
     * Creates a new packet collector. If the packet filter is <tt>null</tt>, then
     * all packets will match this collector.
     *
     * @param conection the connection the collector is tied to.
     * @param packetFilter determines which packets will be returned by this collector.
     * @param maxSize the maximum number of packets that will be stored in the collector.
     */
    protected PacketCollector(Connection conection, PacketFilter packetFilter, int maxSize) {
        this.connection = conection;
        this.packetFilter = packetFilter;
        this.resultQueue = new ArrayBlockingQueue<Packet>(maxSize);
    }

    /**
     * Explicitly cancels the packet collector so that no more results are
     * queued up. Once a packet collector has been cancelled, it cannot be
     * re-enabled. Instead, a new packet collector must be created.
     */
    public void cancel() {
        // If the packet collector has already been cancelled, do nothing.
        if (!cancelled) {
            cancelled = true;
            connection.removePacketCollector(this);
        }
    }

    /**
     * Returns the packet filter associated with this packet collector. The packet
     * filter is used to determine what packets are queued as results.
     *
     * @return the packet filter.
     */
    public PacketFilter getPacketFilter() {
        return packetFilter;
    }

    /**
     * Polls to see if a packet is currently available and returns it, or
     * immediately returns <tt>null</tt> if no packets are currently in the
     * result queue.
     *
     * @return the next packet result, or <tt>null</tt> if there are no more
     *      results.
     */
    public Packet pollResult() {
    	return resultQueue.poll();
    }

    /**
     * Returns the next available packet. The method call will block (not return)
     * until a packet is available.
     *
     * @return the next available packet.
     */
    public Packet nextResult() {
        try {
			return resultQueue.take();
		}
		catch (InterruptedException e) {
			throw new RuntimeException(e);
		}
    }

    /**
     * Returns the next available packet. The method call will block (not return)
     * until a packet is available or the <tt>timeout</tt> has elapased. If the
     * timeout elapses without a result, <tt>null</tt> will be returned.
     *
     * @param timeout the amount of time to wait for the next packet (in milleseconds).
     * @return the next available packet.
     */
    public Packet nextResult(long timeout) {
    	try {
			return resultQueue.poll(timeout, TimeUnit.MILLISECONDS);
		}
		catch (InterruptedException e) {
			throw new RuntimeException(e);
		}
    }

    /**
     * Processes a packet to see if it meets the criteria for this packet collector.
     * If so, the packet is added to the result queue.
     *
     * @param packet the packet to process.
     */
    protected void processPacket(Packet packet) {
        if (packet == null) {
            return;
        }
        
        if (packetFilter == null || packetFilter.accept(packet)) {
        	while (!resultQueue.offer(packet)) {
        		// Since we know the queue is full, this poll should never actually block.
        		resultQueue.poll();
        	}
        }
    }
}

 这个类相当赞,最喜欢这种简单的东西了,我觉得我可以照抄呵呵,基本上不需要看其它代码就能猜asmack是怎么干的了,看看流程

 

 

1.首先,如果有需要,比如要修改密码了,就将PacketCollect注册到connection上

   /**
     * Creates a new packet collector for this connection. A packet filter determines
     * which packets will be accumulated by the collector. A PacketCollector is
     * more suitable to use than a {@link PacketListener} when you need to wait for
     * a specific result.
     * 
     * @param packetFilter the packet filter to use.
     * @return a new packet collector.
     */
    public PacketCollector createPacketCollector(PacketFilter packetFilter) {
        PacketCollector collector = new PacketCollector(this, packetFilter);
        // Add the collector to the list of active collectors.
        collectors.add(collector);
        return collector;
    }

 PacketFilter是个过滤器,对于修改密码而言,我只对服务端返回的密码修改结果感兴趣,其它无视,那么它只将相关的接收到的packet放进PacketCollector的ArrayBlockingQueue<Packet>中。注意下processPacket这个方法,队列满了就开始丢弃之前的packet.

 

 

2.发起修改密码请求到服务端,其实就是通过PacketWriter线程write一个packet给服务器。

 

3.坐等服务端返回,可用nextResult()方法,是一直阻塞,还是阻塞一定时间,你自己来定,一般应该是阻塞几秒就差不多了。

 

4.最后无论有没服务端回应,cancel掉这个PacketCollector,即从connection中将之删除。

 

来看一段AccounterManager中源码:

    public void createAccount(String username, String password, Map<String, String> attributes)
            throws XMPPException
    {
        if (!supportsAccountCreation()) {
            throw new XMPPException("Server does not support account creation.");
        }
        Registration reg = new Registration();
        reg.setType(IQ.Type.SET);
        reg.setTo(connection.getServiceName());
        attributes.put("username",username);
        attributes.put("password",password);
        reg.setAttributes(attributes);
        PacketFilter filter = new AndFilter(new PacketIDFilter(reg.getPacketID()),
                new PacketTypeFilter(IQ.class));
        PacketCollector collector = connection.createPacketCollector(filter);
        connection.sendPacket(reg);
        IQ result = (IQ)collector.nextResult(SmackConfiguration.getPacketReplyTimeout());
        // Stop queuing results
        collector.cancel();
        if (result == null) {
            throw new XMPPException("No response from server.");
        }
        else if (result.getType() == IQ.Type.ERROR) {
            throw new XMPPException(result.getError());
        }
    }

 这个过程还是相当清楚的。

总结下,asmack写线程和读线程之间,通过分发指定消息类型到指定的同步队列,利用同步队列的阻塞,来实现发出消息和收到消息的关联。

 

相关标签: androidpn