大数据day04 并发动态大数据基础机制
一、关于socket流阻塞的含义和wait-notify的用法
传统方式下,client和server之间是通过socket连接的,当client连接上server的时候,会创建一个线程,server是不知道client什么时候发消息的,所以一直等待,而且线程一直保持连接,这叫同步阻塞IO,是非常消耗性能的,慢速连接攻击大概是这个意思吧,长期占用着资源,却发送很少消息,这种对资源的不释放,最终结果就是server端不堪重负,最终挂掉。
为了解决上面的问题,就需要异步非阻塞IO,简称NIO。
wait和notify
- wait(),notify(),notifyAll()方法是Object的本地final方法,无法被重写。
- wait()使当前线程阻塞,notify()和notifyAll()使线程唤醒,这三个方法都要写在synchronized代码块里面,因为它们要拿到锁才能执行。
- 当线程执行wait()方法的时候,释放当前锁,让出CPU,进入等待状态。
- 当线程执行notify()方法和notifyAll()方法的时候,会唤醒一个或多个正在等待的线程,然后继续向下执行,直到执行完synchronized代码块或者再次遇到wait()方法,再次释放锁。
- wait()方法需要被try catch包裹,中断也可以使wait()等待的线程唤醒。
- notify和wait的顺序不能错,如果A线程先执行了notify方法,B线程后执行wait方法,B线程是无法被唤醒的。
- notify和notifyAll的区别就是notify只会唤醒一个线程,notifyAll会唤醒所有等待的线程,至于哪个线程第一个处理取决于操作系统。
二、内容大纲介绍
- 掌握多线程
- 掌握并发包下的队列
- 了解JMS
- 掌握JVM技术
- 掌握反射和动态代理
三、线程实现的两种方式
进程:操作系统会为进程在内存中分配一段独立的内存空间,彼此之间不会相互影响,可以负责当前应用程序的运行。当前这个进程负责调度当前程序中的所有运行细节。
线程:程序内部一个相对独立的空间,在进程的内部再次细分独立的空间,一个进程中至少有一个线程。
多线程:就是在一个进程里面同时开启多个线程,让多个线程同时去完成某些任务,目的是提高程序的运行效率。
多线程运行的原理:cpu在线程中做时间片的切换,其实不是同时运行的,只是我们感觉是同时运行的,cpu快速的在这些线程之间做切换,因为cpu的速度是很快的,所以我们感觉不到。
实现线程的两种方式:继承Thread类和实现Runnable接口,本质都是重写run()方法,要调用start()方法,而不是直接调用run()方法。如果调用了run()方法,只是一个普通的方法调用,不会开启新的线程。
public class MyThreadWithExtends extends Thread {
@Override
public void run() {
System.out.println("线程的run方法被调用……");
}
public static void main(String[] args) {
Thread thread = new MyThreadWithExtends();
thread.start();
}
}
public class MyThreadWithImpliment implements Runnable {
@Override
public void run() {
System.out.println("线程的run方法被调用……");
}
public static void main(String[] args) {
Thread thread = new Thread(new MyThreadWithImpliment());
thread.start();
}
}
四、synchronized同步代码块示例
被包裹在synchronized代码块中的代码,同一时间,只能有一个线程执行这段代码,synchronized后面跟的参数代表把谁锁住。下面的例子可以比作上厕所,必须拿到厕所的门才能上厕所,具体上厕所的方式可能不同,因为是同一把锁,所以synchronized中只能有一个线程在执行。
public class MySynchronized {
public static void main(String[] args) {
final MySynchronized mySynchronized = new MySynchronized();
new Thread() {
public void run() {
synchronized (mySynchronized) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thread1,start");
}
}
}.start();
new Thread() {
public void run() {
synchronized (mySynchronized) {
System.out.println("thread2,start");
}
}
}.start();
}
}
synchronized还可以修饰在方法上,当两个线程都调用这个方法的时候,同一时间只能有一个线程执行这个方法,另一个线程只能等待。
synchronized的缺陷:当一个线程获取了锁,其他线程只能等待线程释放锁,有两种情况:当前线程执行完成自动释放锁,另外一个是当前线程发生了异常,JVM会让线程释放锁。
五、ReentrantLock的方法示例
lock不是Java语言内置的,synchronized是Java语言的关键字,因此是内置性的,lock是一个类,通过这个类可以实现同步访问。
lock和synchronized有一点非常大的不同,synchronized不需要手动释放锁,当synchronized方法结束或者synchronized代码块结束,会自动释放锁的占用,lock必须手动释放,如果没有手动释放,就会出现死锁的情况。
lock是一个接口,它里面有lock()、tryLock()、tryLock(long time, TimeUnit unit)、lockInterruptibly()、unLock()这5个方法,ReentrantLock是唯一实现了Lock接口的类。ReentrantLock是可重入锁的意思。
lock()和tryLock()的使用方法:
package com.wsy;
import java.util.ArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Test {
private static ArrayList<Integer> arrayList = new ArrayList<Integer>();
static Lock lock = new ReentrantLock(); // 注意这个地方
public static void main(String[] args) {
new Thread() {
public void run() {
Thread thread = Thread.currentThread();
boolean tryLock = lock.tryLock();// 尝试获取锁
System.out.println(thread.getName() + " " + tryLock);
if (tryLock) {
try {
System.out.println(thread.getName() + "得到了锁");
for (int i = 0; i < 5; i++) {
arrayList.add(i);
}
} catch (Exception e) {
} finally {
System.out.println(thread.getName() + "释放了锁");
lock.unlock();
}
}
};
}.start();
new Thread() {
public void run() {
Thread thread = Thread.currentThread();
boolean tryLock = lock.tryLock();
System.out.println(thread.getName() + " " + tryLock);
if (tryLock) {
try {
System.out.println(thread.getName() + "得到了锁");
for (int i = 0; i < 5; i++) {
arrayList.add(i);
}
} catch (Exception e) {
} finally {
System.out.println(thread.getName() + "释放了锁");
lock.unlock();
}
}
};
}.start();
}
}
lockInterruptibly()的使用方法:
package com.wsy;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class MyInterruptibly {
private Lock lock = new ReentrantLock();
public static void main(String[] args) {
MyInterruptibly test = new MyInterruptibly();
MyThread thread0 = new MyThread(test);
MyThread thread1 = new MyThread(test);
thread0.start();
thread1.start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
thread1.interrupt();
System.out.println("=====================");
}
public void insert(Thread thread) throws InterruptedException {
lock.lockInterruptibly(); // 注意,如果需要正确中断等待锁的线程,必须将获取锁放在外面,然后将InterruptedException抛出
try {
System.out.println(thread.getName() + "得到了锁");
long startTime = System.currentTimeMillis();
for (;;) {
if (System.currentTimeMillis() - startTime >= Integer.MAX_VALUE)
break;
// 插入数据
}
} finally {
System.out.println(Thread.currentThread().getName() + "执行finally");
lock.unlock();
System.out.println(thread.getName() + "释放了锁");
}
}
}
class MyThread extends Thread {
private MyInterruptibly test = null;
public MyThread(MyInterruptibly test) {
this.test = test;
}
@Override
public void run() {
try {
test.insert(Thread.currentThread());
} catch (Exception e) {
System.out.println(Thread.currentThread().getName() + "被中断");
}
}
}
六、Lock和synchronized的一些区别和选择考虑的因素
ReadWriteLock是一个接口,用来处理读写中出现的多线程问题,通常,多个线程同时读是没有问题的,多个线程同时写就需要处理,不能让他们同时写。接口中有两个方法,分别是:readLock()和writeLock(),分别返回read锁和write锁。
使用synchronized来实现:读和写都在一个线程中操作,thread1必须在thread0读写完成后才能读写,导致读的时候不能多个线程同时读取,这是不符合逻辑的,我们只需要写的时候锁定。
package com.wsy;
// 一个线程又要读又要写,用synchronize来实现的话,读写操作都只能锁住后一个线程一个线程地进行
public class MySynchronizedReadWrite {
public static void main(String[] args) {
final MySynchronizedReadWrite test = new MySynchronizedReadWrite();
new Thread() {
public void run() {
test.get(Thread.currentThread());
};
}.start();
new Thread() {
public void run() {
test.get(Thread.currentThread());
};
}.start();
}
public synchronized void get(Thread thread) {
long start = System.currentTimeMillis();
int i = 0;
while (System.currentTimeMillis() - start <= 1) {
i++;
if (i % 4 == 0) {
System.out.println(thread.getName() + "正在进行写操作");
} else {
System.out.println(thread.getName() + "正在进行读操作");
}
}
System.out.println(thread.getName() + "读写操作完毕");
}
}
使用ReadWriteLock来实现:多运行几次,可以发现读操作可以交替出现,而写操作一定是单线程进行的。
package com.wsy;
import java.util.concurrent.locks.ReentrantReadWriteLock;
// 使用读写锁,可以实现读写分离锁定,读操作并发进行,写操作锁定单个线程
// 如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁。
// 如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或者读锁,则申请的线程会一直等待释放写锁。
public class MyReentrantReadWriteLock {
private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
public static void main(String[] args) {
final MyReentrantReadWriteLock test = new MyReentrantReadWriteLock();
new Thread() {
public void run() {
test.get(Thread.currentThread());
test.write(Thread.currentThread());
};
}.start();
new Thread() {
public void run() {
test.get(Thread.currentThread());
test.write(Thread.currentThread());
};
}.start();
}
// 读操作,用读锁来锁定
public void get(Thread thread) {
rwl.readLock().lock();
try {
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start <= 1) {
System.out.println(thread.getName() + "正在进行读操作");
}
System.out.println(thread.getName() + "读操作完毕");
} finally {
rwl.readLock().unlock();
}
}
// 写操作,用写锁来锁定
public void write(Thread thread) {
rwl.writeLock().lock();
try {
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start <= 1) {
System.out.println(thread.getName() + "正在进行写操作");
}
System.out.println(thread.getName() + "写操作完毕");
} finally {
rwl.writeLock().unlock();
}
}
}
Lock和synchronized的选择:
- Lock是一个接口,synchronized是Java中的关键字。
- synchronized在发生异常的情况下,会自动释放锁,因此一般不会导致死锁的情况(互相死锁除外),而Lock在发生异常的情况下,不会自动调用unlock()方法,所以有可能发生死锁。因此在使用Lock的时候,一定要在finally中释放锁。
- Lock可以让等待锁的线程中断,synchronized不行,使用synchronized时候,等待的线程会一直等待下去,不能够响应中断。
- 通过Lock可以知道有没有成功获取到锁,synchronized做不到。
- Lock可以提高多个线程的读操作效率。
从性能上讲,如果竞争不是很激烈,两者的差别不大,如果竞争非常激烈,Lock的性能要远高于synchronized的,因为Lock可以具体情况具体选择。
七、Java并发包中的线程池种类及其特性介绍
在多线程开发的时候,不会使用new Thread的方式,因为一个系统能承受的线程数量是有限的,无限的new下去是肯定不行的,所以这里我们接触到了线程池的概念。线程池固定线程的数量,对于不同的任务,只需要给它不同的Runnable对象即可,就可以执行任务了。
线程池的5种创建方法:
- SingleThreadExecutor:只有一个线程的线程池,因此所有的任务都是顺序执行的。
- CachedThreadPool:线程池中有很多线程需要同时执行,老的可用线程被新的任务触发重新执行,如果某线程超过60秒没有执行,将被终止并从线程池中删除。
- FixedThreadPool:拥有固定线程数量的线程池,如果没有任务执行,线程会一直等待,通常设置线程的数量=cpu的数量效率会比较高,可以使用代码int cpuNums = Runtime.getRuntime().availableProcessors();拿到cpu的核数量。
- ScheduledThreadPool:用来调度即将执行的任务的线程池。
- SingleThreadScheduledPool:只有一个线程,用来调度任务在指定时间执行。
八、并发包中各种线程池的用法及future获取任务返回结果的机制
线程池之Runnable的使用方法:
package com.wsy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolWithRunable {
// 通过线程池执行线程
public static void main(String[] args) {
// 创建一个线程池
ExecutorService pool = Executors.newCachedThreadPool();
for (int i = 1; i < 5; i++) {
pool.execute(new Runnable() {
@Override
public void run() {
System.out.println("thread name: " + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
pool.shutdown();
}
}
线程池之Callable的使用方法:
package com.wsy;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
// callable 跟runnable的区别:
// runnable的run方法不会有任何返回结果,所以主线程无法获得任务线程的返回值
// callable的call方法可以返回结果,但是主线程在获取时是被阻塞,需要等待任务线程返回才能拿到结果
public class ThreadPoolWithcallable {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService pool = Executors.newFixedThreadPool(4);
for (int i = 0; i < 10; i++) {
Future<String> submit = pool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("a");
Thread.sleep(5000);
return "b--" + Thread.currentThread().getName();
}
});
// 从Future中get结果,这个方法是会被阻塞的,一直要等到线程任务返回结果
System.out.println(submit.get());
}
pool.shutdown();
}
}
运行下面的代码之后,可以看到有些线程的运行结果可能并不能立刻返回,但是最终还是能通过句柄拿到返回结果。代码是使用了fixedPool来提交线程,还可以使用schedulerPool来提交线程,将下面相关的注释放开即可。
package com.wsy;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
public class TestPool {
public static void main(String[] args) throws Exception {
Future<?> submit = null;
Random random = new Random();
// 创建固定数量线程池
ExecutorService exec = Executors.newFixedThreadPool(4);
// 创建调度线程池
// ScheduledExecutorService exec = Executors.newScheduledThreadPool(4);
// 用来记录各线程的返回结果
ArrayList<Future<?>> results = new ArrayList<Future<?>>();
for (int i = 0; i < 10; i++) {
// fixedPool提交线程,runnable无返回值,callable有返回值
// submit = exec.submit(new TaskRunnable(i));
// submit = exec.submit(new TaskCallable(i));
// 对于schedulerPool来说,调用submit提交任务时,跟普通pool效果一致
submit = exec.submit(new TaskCallable1(i));
// 对于schedulerPool来说,调用schedule提交任务时,则可按延迟,按间隔时长来调度线程的运行
// submit = exec.schedule(new TaskCallable(i), random.nextInt(10), TimeUnit.SECONDS);
// 存储线程执行结果
// submit其实是一个句柄,放到results里面的不是真正的结果,因为有可能add的时候还没有获取到值,放进去句柄,当后面需要获取的时候,通过句柄拿值
// 如果前面的线程阻塞或者运算量大会延迟一小会儿,那么整体的运行是不会延迟的
results.add(submit);
}
// 打印结果
for (Future f : results) {
boolean done = f.isDone();
System.out.println(done ? "已完成" : "未完成");
// 从结果的打印顺序可以看到,即使未完成,也会阻塞等待
System.out.println("线程返回future结果: " + f.get());
}
exec.shutdown();
}
}
class TaskCallable implements Callable<String> {
private int s;
Random r = new Random();
// Callable可以传参数,可以有返回值
public TaskCallable(int s) {
this.s = s;
}
@Override
public String call() throws Exception {
String name = Thread.currentThread().getName();
long currentTimeMillis = System.currentTimeMillis();
System.out.println(name + " 启动时间:" + currentTimeMillis / 1000);
int rint = r.nextInt(3);
try {
Thread.sleep(rint * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name + " is working..." + s);
return s + "";
}
}
九、BlockingQueue的功能和使用示例
BlockingQueue主要是用来控制线程同步的工具。其中put和take是一对阻塞存取,add和poll的一对非阻塞存取。
插入:
add(Object object):把object添加到BlockingQueue中,如果BlockingQueue可以容纳,返回true,否则抛异常。
offer(Object object):如果可能的话,将object添加到BlockingQueue中,可以容纳返回true,不能容纳返回false。
put(Object object):把object放到BlockingQueue中,如果此时空间满了,线程被阻塞,等待空间释放后,再放进去。
读取:
poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,等待time长时间,还是取不到就返回false。
take():取走BlockingQueue里排在首位的对象,如果BlockingQueue中没有对象,那么线程会阻塞,直到BlockingQueue中有对象后为止。
其他:
int remainingCapacity():返回队列剩余的容量,在队列插入和取出的时候,获取数据可能不准确。
boolean remove(Object object):从队列移除元素,如果存在,移除一个或者多个,队列改变了返回true。
boolean contains(Object object):查看队列中是否包含元素object,存在返回true,否则返回false。
int drainTo(Collection<? super E> c):移除队列中所有可用元素,并将它们添加到给定的collection中。
int drainTo(Collection<? super E> c, int maxElements):和上面方法不同的就是指定了移除的数量。
BlockingQueue是一个接口,上面这些方法是在接口中定义的,它有4个实现类,常用的实现类有两个,分别是ArrayBlockingQueue和LinkedBlockingQueue。
ArrayBlockingQueue:一个数组支持的有界阻塞队列,规定大小的BlockingQueue,构造函数中必须有一个int参数。
LinkedBlockingQueue:大小不定的BlockingQueue,如果构造函数时候传了一个int参数,那么这个队列也是有大小的,如果不传,那么大小是由Integer.MAX_VALUE来决定。
生产者:
package com.wsy;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
public class TestBlockingQueueProducer implements Runnable {
BlockingQueue<String> queue;
Random random = new Random();
public TestBlockingQueueProducer(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
Thread.sleep(random.nextInt(10));
String task = Thread.currentThread().getName();
System.out.println(task + " put a product");
queue.put(task);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
消费者
package com.wsy;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
public class TestBlockingQueueConsumer implements Runnable {
BlockingQueue<String> queue;
Random random = new Random();
public TestBlockingQueueConsumer(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
Thread.sleep(random.nextInt(10));
System.out.println(Thread.currentThread().getName() + " trying...");
String temp = queue.take();// 如果队列为空,会阻塞当前线程
System.out.println(Thread.currentThread().getName() + " get " + temp);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
测试程序
package com.wsy;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class TestBlockingQueue {
public static void main(String[] args) {
// 不设置的话,LinkedBlockingQueue默认大小为Integer.MAX_VALUE
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);
// BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
TestBlockingQueueConsumer consumer = new TestBlockingQueueConsumer(queue);
TestBlockingQueueProducer producer = new TestBlockingQueueProducer(queue);
for (int i = 0; i < 3; i++) {
new Thread(producer, "Producer" + (i + 1)).start();
}
for (int i = 0; i < 5; i++) {
new Thread(consumer, "Consumer" + (i + 1)).start();
}
new Thread(producer, "Producer" + (4)).start();
}
}
通过上面的程序的运行现象可以看到,queue中有2个位置,当生产者生产的数量大于2的时候,就不能向队列中添加元素了,此时会阻塞,当消费者消费的时候,如果队列中是空的,那么也会阻塞,直到队列中有元素进来,或者程序被中断。
十、volatile的工作机制代码测试
先来看一个程序,猜猜它的运行结果是多少?10000?试试看吧!
package com.wsy;
public class VolatileTest {
public static volatile int num = 0;
public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 100; j++) {
num++;
}
}
}).start();
}
try {
// 保证让这上面的线程都运行完成
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(num);
}
}
发现运行结果是小于10000的,多运行几次再试试,还是小于10000的,因此,可以得出结论,volatile不能保证线程安全。问题就出在num++上了,因为num++不是一个原子性操作,num++是要分三步操作的,分别是读、改、写。将主内存中的num读取出来到工作内存,将工作内存中的数据自增1,将自增后的结果写到主内存中去。
一开始的时候,volatile修饰的变量num是主内存中存放着的,这时候,启动了100个线程,这些线程都去获取这个num变量,并把num的值从主内存拷贝到工作内存中,执行num++操作,将修改后的值传递到主内存中,在这3个步骤的间隙,如果正好有线程访问num变量的值,那么这个线程获取到的值就是更改之前的值,这个时候就出问题了,于是就会少一个++操作了。
要想保证线程安全,还是需要锁,因为锁可以保证互斥性和可见性,而volatile只能保证可见性。既然volatile不能保证线程安全,那岂不是很鸡肋?它的应用场景是怎么样的呢?
一个线程写,多个线程读,如果有多个线程写,那么就会像上面那样,出现错误数据,volatile能保证读的时候是最新的值。synchronized和volatile可以实现较低开销的读写锁。
概括来说就是:对变量的写操作不依赖于当前值;该变量没有包含在具有其他变量的不变式中。
十一、关于并发编程的一些总结
- 不使用线程池的缺点
并发线程很高的时候,就会大量的新建线程,开销是非常大的,资源的消耗量也很大,稳定性自然就降下来了。
- 指定执行策略
任务以什么顺序执行;有多少个任务并发执行;要多少个任务进入等待执行队列;系统过载,应该放弃哪些任务,如何通知到应用程序;一个任务的执行前后应该做什么处理。
- 线程池的类型
FixedThreadPool、CachedThreadPool、SingleThreadExecutor、ScheduledThreadPool。
- 线程池的饱和策略
除了CachedThreadPool之外,其他的线程池,当线程池满了之后,可以设置拒绝策略,比如可以设置ThreadPoolExecutor.setRejectExecutionHandler()方法设置一个拒绝任务的策略。
- 线程无依赖性
如果线程与线程之间有依赖性,有可能造成死锁或饥饿;如果调用者监视其他线程的完成情况,会影响并发量。
十二、ActiveMQ
JMS即Java消息服务(Java Message Service)。应用程序接口是一个Java平台中关于面向消息中间件的API,用于两个应用程序之间,或分布系统之间发布消息,进行异步通信。Java消息服务是一个与平台无关的API。
JMS是一种与厂商无关的API,用来访问消息收发系统消息,类似于JDBC可以使用相同的API来访问不同的数据库,JMS提供同样与厂商无关的访问方法,以访问消息收发服务。
JMS的两种模型:
点对点或队列模型:一个生产者向一个特定的队列发布消息,一个消费者从队列中取出消息。生产者不需要在消费者消费该消息期间处于运行状态,消费者也不需要在消息发送时处于运行状态。
发布者/订阅者模型:支持向一个特定的消息主题发布消息。0个或多个订阅者可能对接受来自特定消息主题的消息感兴趣。在这种模式下,发布者和订阅者彼此不知道对方,这种模式好比匿名公告板。发布者和订阅者之间存在时间依赖性,发布者需要建立一个订阅,方便用户能够订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者建立了持久的订阅,在那种情况下,在订阅者未连接的时候发布的消息将在订阅者重新连接时重新发布。
演示ActiveMQ
- 在ActiveMQ官网下载ActiveMQ到本地并解压(这里演示Windows版本的,Linux操作差不多)。
- 修改activemq.xml配置文件,将transportConnectors中的0.0.0.0改为localhost。
- 双击activemq.bat运行ActiveMQ。
- 浏览器登录http://localhost:8161/admin/,用户名和密码都是admin。
package com.wsy;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ProducerTest {
public static void main(String[] args) throws JMSException, Exception {
ProducerTool producer = new ProducerTool();
producer.produceMessage("Hello, world!");
producer.close();
}
}
class ProducerTool {
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private String subject = "myqueue";
private Destination destination = null;
private Connection connection = null;
private Session session = null;
private MessageProducer producer = null;
// 初始化
private void initialize() throws JMSException, Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
// 发送消息
public void produceMessage(String message) throws JMSException, Exception {
initialize();
TextMessage msg = session.createTextMessage(message);
connection.start();
System.out.println("Producer:->Sending message: " + message);
producer.send(msg);
System.out.println("Producer:->Message sent complete!");
}
// 关闭连接
public void close() throws JMSException {
System.out.println("Producer:->Closing connection");
if (producer != null) {
producer.close();
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
运行main方法,控制台输出如下内容,表示producer已经生产了一个message并发送到了队列中,然后关闭连接。
Producer:->Sending message: Hello, world!
Producer:->Message sent complete!
Producer:->Closing connection
我们去浏览器点击queue,可以看到有一个我们刚刚自己定义的queue,命名为myqueue这一行里面对应的数字就是队列中消息数量。再次运行Customer,console中可以收到刚刚producer发出的消息,再去浏览器查看,发现Queue的内容也发生了变动。
package com.wsy;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ConsumerTest implements Runnable {
static Thread t1 = null;
public static void main(String[] args) throws InterruptedException {
t1 = new Thread(new ConsumerTest());
t1.start();
while (true) {
if (!t1.isAlive()) {
t1 = new Thread(new ConsumerTest());
t1.start();
System.out.println("重新启动");
}
Thread.sleep(5000);
}
}
public void run() {
try {
ConsumerTool consumer = new ConsumerTool();
consumer.consumeMessage();
while (ConsumerTool.isconnection) {
;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
class ConsumerTool implements MessageListener, ExceptionListener {
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private String subject = "myqueue";
private Destination destination = null;
private Connection connection = null;
private Session session = null;
private MessageConsumer consumer = null;
private ActiveMQConnectionFactory connectionFactory = null;
public static Boolean isconnection = false;
// 初始化
private void initialize() throws JMSException {
connectionFactory = new ActiveMQConnectionFactory(user, password, url);
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
consumer = session.createConsumer(destination);
}
// 消费消息
public void consumeMessage() throws JMSException {
initialize();
connection.start();
consumer.setMessageListener(this);
connection.setExceptionListener(this);
System.out.println("Consumer:->Begin listening...");
isconnection = true;
// 开始监听
Message message = consumer.receive();
System.out.println(message.getJMSMessageID());
}
// 关闭连接
public void close() throws JMSException {
System.out.println("Consumer:->Closing connection");
if (consumer != null) {
consumer.close();
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
// 消息处理函数
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
String msg = txtMsg.getText();
System.out.println("Consumer:->Received: " + msg);
} else {
System.out.println("Consumer:->Received: " + message);
}
} catch (JMSException e) {
e.printStackTrace();
}
}
@Override
public void onException(JMSException exception) {
isconnection = false;
}
}
同样再来看看topic,先运行producer,后运行consumer,在浏览器看到的效果类似。
package com.wsy;
import java.util.Random;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ProducerTest {
public static void main(String[] args) throws JMSException, Exception {
ProducerTool producer = new ProducerTool();
Random random = new Random();
for (int i = 0; i < 20; i++) {
Thread.sleep(random.nextInt(10) * 1000);
producer.produceMessage("Hello, world!--" + i);
producer.close();
}
}
}
class ProducerTool {
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private String subject = "mytopic";
private Destination destination = null;
private Connection connection = null;
private Session session = null;
private MessageProducer producer = null;
// 初始化
private void initialize() throws JMSException, Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createTopic(subject);
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
// 发送消息
public void produceMessage(String message) throws JMSException, Exception {
initialize();
TextMessage msg = session.createTextMessage(message);
connection.start();
System.out.println("Producer:->Sending message: " + message);
producer.send(msg);
System.out.println("Producer:->Message sent complete!");
}
// 关闭连接
public void close() throws JMSException {
System.out.println("Producer:->Closing connection");
if (producer != null) {
producer.close();
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
package com.wsy;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ConsumerTest implements Runnable {
static Thread t1 = null;
public static void main(String[] args) throws InterruptedException {
t1 = new Thread(new ConsumerTest());
t1.setDaemon(false);
t1.start();
// 如果发生异常,则重启consumer
while (true) {
if (!t1.isAlive()) {
t1 = new Thread(new ConsumerTest());
t1.start();
System.out.println("重新启动");
}
Thread.sleep(5000);
}
}
public void run() {
try {
ConsumerTool consumer = new ConsumerTool();
consumer.consumeMessage();
while (ConsumerTool.isconnection) {
;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
class ConsumerTool implements MessageListener, ExceptionListener {
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private String subject = "mytopic";
private Destination destination = null;
private Connection connection = null;
private Session session = null;
private MessageConsumer consumer = null;
public static Boolean isconnection = false;
// 初始化
private void initialize() throws JMSException, Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createTopic(subject);
consumer = session.createConsumer(destination);
}
// 消费消息
public void consumeMessage() throws JMSException, Exception {
initialize();
connection.start();
consumer.setMessageListener(this);
connection.setExceptionListener(this);
isconnection = true;
System.out.println("Consumer:->Begin listening...");
// 开始监听
// Message message = consumer.receive();
}
// 关闭连接
public void close() throws JMSException {
System.out.println("Consumer:->Closing connection");
if (consumer != null) {
consumer.close();
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
// 消息处理函数
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
String msg = txtMsg.getText();
System.out.println("Consumer:->Received: " + msg);
} else {
System.out.println("Consumer:->Received: " + message);
}
} catch (JMSException e) {
e.printStackTrace();
}
}
public void onException(JMSException arg0) {
isconnection = false;
}
}
既然说了这么多,topic和queue的区别是什么呢?
Queue
点对点模式,不可用重复消费。当生产者生产了消息之后,消息被存储在queue中,消费者消费的时候,从queue中获取,已经消费的消息,被从queue中剔除,不能再次消费。当没有消费者的时候,queue一直保存消息,直到有消费者来消费。消息不是自动推送给消费者的,而是消费者从队列中获取的。
Topic
发布/订阅模式,可以重复消费。生产者生产了消息,发布到topic中,会有多个消费者(订阅)消费该消息。发布到topic中的消息会被消费者中的所有订阅者消费。这里的消费者分为两种,一种是长期订阅,一种类是普通订阅,一条消息发送到topic后,对于普通订阅的,如果此时处于非活跃状态,那么就错过了消息,对于长期订阅的,即使此时没有处于活跃状态,当它处于活跃状态时,它还会继续受到消息的。
十三、Java的反射实现API
使用反射可以动态加载类,调用私有的方法或者属性,动态的调用方法,可以说,反射将字符串作为输入参数,来获取类对象,或者执行对象的某些方法。
package com.wsy;
import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import org.junit.Before;
import org.junit.Test;
public class MyReflect {
public String className = null;
public Class personClass = null;
// 反射Person类
@Before
public void init() throws Exception {
className = "com.wsy.Person";
personClass = Class.forName(className);
}
// 获取某个class文件对象
@Test
public void getClassName() throws Exception {
System.out.println(personClass);
}
// 获取某个class文件对象的另一种方式
@Test
public void getClassName2() throws Exception {
System.out.println(Person.class);
}
// 创建一个class文件表示的实例对象,底层会调用空参数的构造方法
@Test
public void getNewInstance() throws Exception {
System.out.println(personClass.newInstance());
}
// 获取非私有的构造函数
@Test
public void getPublicConstructor() throws Exception {
Constructor constructor = personClass.getConstructor(Long.class, String.class);
Person person = (Person) constructor.newInstance(100L, "zhangsan");
System.out.println(person.getId());
System.out.println(person.getName());
}
// 获得私有的构造函数
@Test
public void getPrivateConstructor() throws Exception {
Constructor constructor = personClass.getDeclaredConstructor(String.class);
constructor.setAccessible(true);// 强制取消Java的权限检测
Person person2 = (Person) constructor.newInstance("zhangsan");
System.out.println("**" + person2.getName());
}
// 访问非私有的成员变量
@Test
public void getNotPrivateField() throws Exception {
Constructor constructor = personClass.getConstructor(Long.class, String.class);
Object obj = constructor.newInstance(100L, "zhangsan");
Field field = personClass.getField("name");
field.set(obj, "lisi");
System.out.println(field.get(obj));
}
// 访问私有的成员变量
@Test
public void getPrivateField() throws Exception {
Constructor constructor = personClass.getConstructor(Long.class);
Object obj = constructor.newInstance(100L);
Field field2 = personClass.getDeclaredField("id");
field2.setAccessible(true);// 强制取消Java的权限检测
field2.set(obj, 10000L);
System.out.println(field2.get(obj));
}
// 获取非私有的成员函数
@Test
public void getNotPrivateMethod() throws Exception {
System.out.println(personClass.getMethod("toString"));
Object obj = personClass.newInstance();// 获取空参的构造函数
Method toStringMethod = personClass.getMethod("toString");
Object object = toStringMethod.invoke(obj);
System.out.println(object);
}
// 获取私有的成员函数
@Test
public void getPrivateMethod() throws Exception {
Object obj = personClass.newInstance();// 获取空参的构造函数
Method method = personClass.getDeclaredMethod("getSomeThing");
method.setAccessible(true);
Object value = method.invoke(obj);
System.out.println(value);
}
@Test
public void otherMethod() throws Exception {
// 当前加载这个class文件的那个类加载器对象
System.out.println(personClass.getClassLoader());
// 获取某个类实现的所有接口
Class[] interfaces = personClass.getInterfaces();
for (Class class1 : interfaces) {
System.out.println(class1);
}
// 反射当前这个类的直接父类
System.out.println(personClass.getGenericSuperclass());
// getResourceAsStream这个方法可以获取到一个输入流,这个输入流会关联到name所表示的那个文件上。
// 不以‘/’开头时默认是从此类所在的包下取资源,以‘/’开头则是从ClassPath根下获取。其只是通过path构造一个绝对路径,最终还是由ClassLoader获取资源。
System.out.println(personClass.getResourceAsStream("/log4j.properties"));
System.out.println(personClass.getResourceAsStream("log4j.properties"));
// 判断当前的Class对象表示是否是数组
System.out.println(personClass.isArray());
System.out.println(new String[3].getClass().isArray());
// 判断当前的Class对象表示是否是枚举类
System.out.println(personClass.isEnum());
System.out.println(Class.forName("com.wsy.City").isEnum());
// 判断当前的Class对象表示是否是接口
System.out.println(personClass.isInterface());
System.out.println(Class.forName("com.wsy.TestInterface").isInterface());
}
}
class Person implements Serializable, TestInterface {
private Long id;
public String name;
public Person() {
this.id = 100L;
this.name = "afsdfasd";
}
public Person(Long id, String name) {
this.id = id;
this.name = name;
}
public Person(Long id) {
super();
this.id = id;
}
private Person(String name) {
super();
this.name = name + "=======";
}
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String toString() {
return "Person [id=" + id + ", name=" + name + "]";
}
private String getSomeThing() {
return "sdsadasdsasd";
}
private void testPrivate() {
System.out.println("this is a private method");
}
}
enum City {
}
interface TestInterface {
}
十四、动态代理的工作机制
说到动态代理,先举个例子,原来有一个方法用来返回商品的价格,现在需求变动了,增加了优惠券,所以返回的价格就需要在原来的基础上进行改动,不过这个方法在别的系统里面也用到了,不能直接改这个方法的代码,所以我们可以使用代理的方法解决,在调用这个方法的前面或者后面加上自己的逻辑,就可以实现优惠券的逻辑,而不改动原来的代码,这不就是开闭原则嘛,对扩展开放,对修改关闭,增强原来的方法。
动态代理的流程:
- 书写代理类和代理方法,在代理方法中实现代理Proxy.newProxyInstance。
- 代理中需要的参数分别是:被代理类的类加载器,被代理类的所有实现接口,句柄方法。
- 在句柄方法中有一个invoke()方法需要被复写,invoke()方法有3个参数:被代理的对象,被代理的方法,被代理方法需要的参数,这invoke()方法中,我们就可以对被代理方法进行增强。
- 获取代理类,强转成被代理的接口。
- 最后,可以像没被代理一样,调用接口的任何方法,方法被调用后,方法名和参数被传入代理类的invoke()方法中, 进行新业务的逻辑流程。
十五、动态代理的demo代码
原业务接口
package com.wsy.service;
// 这是一个业务的接口,这个接口中的业务就是返回衣服的价格
public interface IBoss {
int yifu(String size);
}
原业务实现类
package com.wsy.service.impl;
import com.wsy.service.IBoss;
// 实现了卖衣服的接口 自定义了自己的业务,卖裤子
public class Boss implements IBoss {
public int yifu(String size) {
System.err.println("天猫小强旗舰店,老板给客户发快递----衣服型号:" + size);
// 这件衣服的价钱,从数据库读取
return 50;
}
public void kuzi() {
System.err.println("天猫小强旗舰店,老板给客户发快递----裤子");
}
}
原业务调用
package com.wsy.action;
import org.junit.Test;
import com.wsy.service.IBoss;
import com.wsy.service.impl.Boss;
public class SaleAction {
// 不使用代理,直接调用方法 方法中规定什么业务,就只能调用什么业务,规定什么返回值,就只能输出什么返回值
@Test
public void saleByBossSelf() throws Exception {
IBoss boss = new Boss();
System.out.println("老板自营!");
int money = boss.yifu("xxl");// 老板自己卖衣服,不需要客服,结果就是没有聊天记录
System.out.println("衣服成交价:" + money);
}
}
代理类
package com.wsy.proxyclass;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
public class ProxyBoss {
// 对接口方法进行代理
public static <T> T getProxy(final int discountCoupon, final Class<?> interfaceClass, final Class<?> implementsClass) throws Exception {
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[] { interfaceClass }, new InvocationHandler() {
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Integer returnValue = (Integer) method.invoke(implementsClass.newInstance(), args);// 调用原始对象以后返回的值
return returnValue - discountCoupon;
}
});
}
}
新业务调用
package com.wsy.action;
import org.junit.Test;
import com.wsy.proxyclass.ProxyBoss;
import com.wsy.service.IBoss;
import com.wsy.service.impl.Boss;
// 什么是动态代理? 简单的写一个模板接口,剩下的个性化工作,好给动态代理来完成!
public class ProxySaleAction {
// 使用代理,在这个代理中,只代理了Boss的yifu方法 定制化业务,可以改变原接口的参数、返回值等
@Test
public void saleByProxy() throws Exception {
IBoss boss = ProxyBoss.getProxy(10, IBoss.class, Boss.class);// 将代理的方法实例化成接口
// IBoss boss = new Boss();// 将代理的方法实例化成接口
System.out.println("代理经营!");
int money = boss.yifu("xxl");// 调用接口的方法,实际上调用方式没有变
System.out.println("衣服成交价:" + money);
}
}
其实不要被代理类的写法吓坏,它的写法是比较死的,需要传什么参数就传什么参数,只是表面上看着比较陌生而已。看顺眼了就不觉得困难了。
十六、利用socket来进行远程过程调用
在大数据开发中,通常是部署集群的方式,所以多台机器之间访问的时候,需要使用socket通信,代码在下面,先运行服务端,后运行客户端,可以看到服务端始终在运行,客户端发送socket请求后,立刻接收到了服务端方法调用返回的数据。
业务接口
package com.wsy;
public interface IBusiness {
public int getPrice(String param);
}
业务实现类
package com.wsy;
public class TestBusiness implements IBusiness {
@Override
public int getPrice(String param) {
return param.equals("yifu") ? 10 : 20;
}
}
服务端
package com.wsy;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
public class TestServer {
public static void main(String[] args) throws Exception {
// 创建socket并绑定IP和端口号
ServerSocket serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress("localhost", 9898));
// 因为是服务端,所以需要一直在接受请求,这里使用while(true)来处理
while (true) {
// accept()方法是一个阻塞方法,等待请求,如果一直没有请求发过来,就一直阻塞着
Socket socket = serverSocket.accept();
// 因为业务逻辑可能处理的时间比较长,为了不影响后面的请求处理,这里使用线程的方法来处理请求,可以改成线程池,这里简写一下
new Thread(new TestServerTask(socket)).start();
}
}
}
socket处理线程
package com.wsy;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.lang.reflect.Method;
import java.net.Socket;
public class TestServerTask implements Runnable {
private Socket socket;
public TestServerTask(Socket socket) {
this.socket = socket;
}
// 接收客户端发送过来的socket信息,处理并返回
@Override
public void run() {
InputStream inputStream = null;
OutputStream outputStream = null;
BufferedReader bufferedReader = null;
PrintWriter printWriter = null;
try {
inputStream = socket.getInputStream();
outputStream = socket.getOutputStream();
// 读取输入流中的内容,并为下一步的反射做处理
bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
String request = bufferedReader.readLine();
String[] split = request.split(":");
String classname = split[0];
String methodName = split[1];
String methodParam = split[2];
// 通过反射的方式生成类对象,并调用方法
Class<?> className = Class.forName(classname);
System.out.println("calling class: " + className);
Object newInstance = className.newInstance();
Method method = className.getMethod(methodName, String.class);
System.out.println("calling method: " + method);
Object invoke = method.invoke(newInstance, methodParam);
System.out.println("results: " + (int) invoke);
// 将执行结果通过流返回给客户端
printWriter = new PrintWriter(new BufferedOutputStream(outputStream));
printWriter.println((int) invoke);
printWriter.flush();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭资源
try {
bufferedReader.close();
printWriter.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
客户端
package com.wsy;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;
public class TestClient {
public static void main(String[] args) throws Exception {
// 确定socket的IP和端口用来发送请求
Socket socket = new Socket("localhost", 9898);
OutputStream outputStream = socket.getOutputStream();
InputStream inputStream = socket.getInputStream();
// 通过socket发送请求参数到服务端
PrintWriter printWriter = new PrintWriter(new BufferedOutputStream(outputStream));
printWriter.println("com.wsy.TestBusiness:getPrice:yifu");
printWriter.flush();
// 接收服务端处理后的socket结果
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
String readLine = bufferedReader.readLine();
System.out.println("client get result: " + readLine);
socket.close();
}
}