欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

最简Android推送客户端模型 博客分类: android android推送

程序员文章站 2024-02-23 11:55:04
...

     最简单的android推送client模型,特点:读写使用同一线程,逻辑非常简单;断线重连;心跳;发消息等。要求不高的基本可以凑合用了呵呵。

 

package com.example.nbctts;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Iterator;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import android.app.Activity;
import android.os.Bundle;
import android.os.Handler;
import android.os.Message;
import android.view.View;
import android.view.View.OnClickListener;
import android.widget.Button;
import android.widget.EditText;
import android.widget.TextView;

import com.example.nbctts.model.ITask;
import com.example.nbctts.model.RcvMsg;
import com.example.nbctts.model.SendMsg;
import com.example.nbctts.model.Status;
import com.google.gson.Gson;

/*
 * 1.Timer控制任务投放节奏
 * 2.用单线程池控制任务线程顺序执行
 * 3.用socket.setSoSocketTime(timeout)及SocketTimeoutException控制阻塞读时间
 */
public class MainActivity extends Activity {

	EditText msgEdt;
	Button sendBtn;
	TextView serverTxt;

	protected final ExecutorService singleThreadExecutor = Executors
			.newSingleThreadExecutor();
	protected final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();// 任务队列
	protected Socket socket = null;
	protected Socket rcvSocket = null;
	protected BufferedReader mReader;// BufferedWriter 用于推送消息
	protected BufferedWriter mWriter;// BufferedReader 用于接收消息
	protected Gson gson = new Gson();
	protected Timer timer = new Timer();

	/*
	 * 队列中是否已有该类型的任务
	 */
	private boolean isTaskExist(int msgType) {
		Iterator<Runnable> it = queue.iterator();
		while (it.hasNext()) {
			ITask task = (ITask) it.next();
			if (task.getID() == msgType) {
				return true;
			}
		}
		return false;
	}

	/*
	 * 添加到队列,除了发送消息任务以外,其它不重复添加到队列
	 */
	private void addQueue(Runnable task) {
		int id = ((ITask) task).getID();
		try {
			if (id == Status.MSG_H) {
				queue.put(task);
			} else {
				if (!isTaskExist(id)) {
					queue.put(task);
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/*
	 * 线程消息处理,提交任务到待处理队列
	 */
	public Handler myHandler = new Handler() {
		@Override
		public void handleMessage(Message msg) {
			HandlerMsg m = (HandlerMsg) msg.obj;
			switch (msg.what) {
			case Status.CONN_H:
				serverTxt.append(m.content + "\n");
				// 连接不成功,自动重连
				if (!m.successFlag) {
					timer.schedule(new TimerTask() {
						@Override
						public void run() {
							addQueue(new connTask());
						}
					}, Status.getConnIntervel());
				}
				break;
			case Status.LOGIN_H:
				serverTxt.append(m.content + "\n");
				break;
			case Status.SVRMSG_H:
				serverTxt.append(m.content + "\n");
				break;
			case Status.HEARTBEAT_H:
				// 暂不处理
				break;
			case Status.MSG_H:
				// 暂不处理
			case Status.LOGOUT_H:
				// 暂不处理
				break;
			default:
				break;
			}
		}
	};

	@Override
	protected void onCreate(Bundle savedInstanceState) {
		super.onCreate(savedInstanceState);
		setContentView(R.layout.activity_main);
		msgEdt = (EditText) findViewById(R.id.msgEdt);
		sendBtn = (Button) findViewById(R.id.sendBtn);
		serverTxt = (TextView) findViewById(R.id.serverTxt);

		// 启动任务处理线程
		new TaskThread().start();
		addQueue(new connTask());
		// 定时读推送消息/心跳,这里可以分开:)
		timer.schedule(new TimerTask() {
			@Override
			public void run() {
				addQueue(new RcvTask());
				addQueue(new HeartBeatTask());
			}
		}, 1000, Status.heartBeat);

		sendBtn.setOnClickListener(new OnClickListener() {
			@Override
			public void onClick(View v) {
				String str = msgEdt.getText().toString();
				serverTxt.append("client:" + str + "\n");
				addQueue(new sendTask(str));
			}
		});
	}

	/*
	 * 处理的消息
	 */
	class HandlerMsg {
		public HandlerMsg(boolean successFlag, String content) {
			this.successFlag = successFlag;
			this.content = content;
		}

		private boolean successFlag;
		private String content;
	}

	/*
	 * 任务处理线程
	 */
	class TaskThread extends Thread {
		@Override
		public void run() {
			while (true) {
				if (!queue.isEmpty()) {
					try {
						singleThreadExecutor.execute(queue.take());
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
		}
	}

	/*
	 * 1.连接服务器任务
	 */
	class connTask implements Runnable, ITask {
		@Override
		public void run() {
			if (!Status.isConnected) {
				socket = new Socket();
				try {
					socket.setKeepAlive(true);
					socket.setSoTimeout(Status.readBlock);
					socket.connect(
							new InetSocketAddress("169.169.43.56", 9999),
							Status.connTimeout);
					mReader = new BufferedReader(new InputStreamReader(
							socket.getInputStream(), "utf-8"));
					mWriter = new BufferedWriter(new OutputStreamWriter(
							socket.getOutputStream(), "utf-8"));
					Status.isConnected = true;
					Status.reConNum = 0;
					Message msg = new Message();
					msg.what = Status.CONN_H;
					msg.obj = new HandlerMsg(true, "connTask:已连接到服务器");
					myHandler.sendMessage(msg);
					// 连接成功,则尝试登录
					queue.add(new loginTask());
				} catch (IOException e) {
					Status.isConnected = false;
					Status.isLogin = false;
					Status.reConNum++;
					Message msg = new Message();
					msg.what = Status.CONN_H;
					msg.obj = new HandlerMsg(false, "connTask:无法连接到服务器");
					myHandler.sendMessage(msg);
				}
			}
		}

		@Override
		public int getID() {
			return Status.CONN_H;
		}
	}

	/*
	 * 2.登录到服务器任务
	 */
	class loginTask implements Runnable, ITask {
		@Override
		public void run() {
			try {
				if (Status.isConnected) {
					SendMsg sm = new SendMsg();
					sm.setOpt("LOGIN");
					sm.setUser(Status.username);
					sm.setPassword(Status.password);
					String s = gson.toJson(sm, SendMsg.class);
					mWriter.write(s + "\n");
					mWriter.flush();
					// 服务器回复
					String rs = mReader.readLine().trim();
					RcvMsg rm = gson.fromJson(rs, RcvMsg.class);
					if (rm.getContent().equals("ok")) {
						Status.isLogin = true;
						Message msg = new Message();
						msg.what = Status.LOGIN_H;
						msg.obj = new HandlerMsg(true, "loginTask:已登录到服务器");
						myHandler.sendMessage(msg);
					} else {
						Status.isLogin = false;
						Message msg = new Message();
						msg.what = Status.LOGIN_H;
						msg.obj = new HandlerMsg(false, "loginTask:登录到服务器失败:"
								+ rm.getContent());
						myHandler.sendMessage(msg);
					}
				}
			} catch (SocketTimeoutException e) {
				e.printStackTrace();
			} catch (IOException e) {
				Status.isConnected = false;
				Status.isLogin = false;
				Message msg = new Message();
				msg.what = Status.CONN_H;
				msg.obj = new HandlerMsg(false, "loginTask:无法连接到服务器");
				myHandler.sendMessage(msg);
			}
		}

		@Override
		public int getID() {
			return Status.LOGIN_H;
		}
	}

	/*
	 * 3.接收服务器消息任务
	 */
	class RcvTask implements Runnable, ITask {
		@Override
		public void run() {
			if (Status.isConnected && Status.isLogin) {
				try {
					String line = "";
					while ((line = mReader.readLine()) != null) {					
						RcvMsg rm = gson.fromJson(line, RcvMsg.class);
						// 目前只接收推来的消息
						if (rm != null && rm.getOpt().equals("SVRMSG")) {
							Message msg = new Message();
							msg.what = Status.SVRMSG_H;
							msg.obj = new HandlerMsg(true, rm.getContent());
							myHandler.sendMessage(msg);
						}
					}
				} catch (SocketTimeoutException e) {
					e.printStackTrace();
				} catch (IOException e) {
					Status.isConnected = false;
					Message msg = new Message();
					msg.what = Status.CONN_H;
					msg.obj = new HandlerMsg(false, "RcvTask:无法连接到服务器");
					myHandler.sendMessage(msg);
				}
			}
		}

		@Override
		public int getID() {
			return Status.SVRMSG_H;
		}
	}

	/*
	 * 4.心跳
	 */
	class HeartBeatTask implements Runnable, ITask {
		@Override
		public void run() {
			if (Status.isConnected && Status.isLogin) {
				try {
					SendMsg sm = new SendMsg();
					sm.setOpt("HEARTBEAT");
					sm.setUser(Status.username);
					sm.setPassword(Status.password);
					String s = gson.toJson(sm, SendMsg.class);
					mWriter.write(s + "\n");
					mWriter.flush();
				} catch (IOException e) {
					Status.isConnected = false;
					Message msg = new Message();
					msg.what = Status.CONN_H;
					msg.obj = new HandlerMsg(false, "HeartBeatTask:无法连接到服务器");
					myHandler.sendMessage(msg);
				}
			}
		}

		@Override
		public int getID() {
			return Status.HEARTBEAT_H;
		}
	}

	/*
	 * 5.发送消息
	 */
	class sendTask implements Runnable, ITask {
		final String str;

		public sendTask(String str) {
			this.str = str;
		}

		@Override
		public void run() {
			if (Status.isConnected && Status.isLogin) {
				try {
					SendMsg sm = new SendMsg();
					sm.setOpt("MSG");
					sm.setUser(Status.username);
					sm.setPassword(Status.password);
					sm.setTargetUser("");// 暂时这样
					sm.setContent(str);
					String s = gson.toJson(sm, SendMsg.class);
					mWriter.write(s + "\n");
					mWriter.flush();
				} catch (IOException e) {
					Status.isConnected = false;
					Message msg = new Message();
					msg.what = Status.CONN_H;
					msg.obj = new HandlerMsg(false, "sendTask:无法连接到服务器");
					myHandler.sendMessage(msg);
				}
			} else {
				// 发送消息时如果未连接,马上重连
				Status.reConNum = 0;
				addQueue(new connTask());
			}
		}

		@Override
		public int getID() {
			return Status.MSG_H;
		}
	}

}

 

 

界面不贴,很简单一个输入框输消息内容,一个按钮发消息,一个文本框记录log及服务端推过来的消息,只是一个测试demo。

 

经测试:断线重连,连接后自动登录,心跳,发消息都没问题,就是接收服务端推送时有延时,这个没办法,毕竟是间隔读,其实也可以把间隔搞小一点。

一般推送对实时性要求不是很高吧,所以基本上可以用。

其实用BlockingQueue,搞成和asmack那样也可以,我这个也算种思路么,哈哈,记录之。

 

相关标签: android 推送