欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

大数据day04 并发动态大数据基础机制

程序员文章站 2024-01-15 19:42:58
...

一、关于socket流阻塞的含义和wait-notify的用法

传统方式下,client和server之间是通过socket连接的,当client连接上server的时候,会创建一个线程,server是不知道client什么时候发消息的,所以一直等待,而且线程一直保持连接,这叫同步阻塞IO,是非常消耗性能的,慢速连接攻击大概是这个意思吧,长期占用着资源,却发送很少消息,这种对资源的不释放,最终结果就是server端不堪重负,最终挂掉。

为了解决上面的问题,就需要异步非阻塞IO,简称NIO。

wait和notify

  1. wait(),notify(),notifyAll()方法是Object的本地final方法,无法被重写。
  2. wait()使当前线程阻塞,notify()和notifyAll()使线程唤醒,这三个方法都要写在synchronized代码块里面,因为它们要拿到锁才能执行。
  3. 当线程执行wait()方法的时候,释放当前锁,让出CPU,进入等待状态。
  4. 当线程执行notify()方法和notifyAll()方法的时候,会唤醒一个或多个正在等待的线程,然后继续向下执行,直到执行完synchronized代码块或者再次遇到wait()方法,再次释放锁。
  5. wait()方法需要被try catch包裹,中断也可以使wait()等待的线程唤醒。
  6. notify和wait的顺序不能错,如果A线程先执行了notify方法,B线程后执行wait方法,B线程是无法被唤醒的。
  7. notify和notifyAll的区别就是notify只会唤醒一个线程,notifyAll会唤醒所有等待的线程,至于哪个线程第一个处理取决于操作系统。

二、内容大纲介绍

  1. 掌握多线程
  2. 掌握并发包下的队列
  3. 了解JMS
  4. 掌握JVM技术
  5. 掌握反射和动态代理

三、线程实现的两种方式

进程:操作系统会为进程在内存中分配一段独立的内存空间,彼此之间不会相互影响,可以负责当前应用程序的运行。当前这个进程负责调度当前程序中的所有运行细节。

线程:程序内部一个相对独立的空间,在进程的内部再次细分独立的空间,一个进程中至少有一个线程。

多线程:就是在一个进程里面同时开启多个线程,让多个线程同时去完成某些任务,目的是提高程序的运行效率。

多线程运行的原理:cpu在线程中做时间片的切换,其实不是同时运行的,只是我们感觉是同时运行的,cpu快速的在这些线程之间做切换,因为cpu的速度是很快的,所以我们感觉不到。

实现线程的两种方式:继承Thread类和实现Runnable接口,本质都是重写run()方法,要调用start()方法,而不是直接调用run()方法。如果调用了run()方法,只是一个普通的方法调用,不会开启新的线程。

public class MyThreadWithExtends extends Thread {
    @Override
    public void run() {
        System.out.println("线程的run方法被调用……");
    }

    public static void main(String[] args) {
        Thread thread = new MyThreadWithExtends();
        thread.start();
    }
}
public class MyThreadWithImpliment implements Runnable {
    @Override
    public void run() {
        System.out.println("线程的run方法被调用……");
    }

    public static void main(String[] args) {
        Thread thread = new Thread(new MyThreadWithImpliment());
        thread.start();
    }
}

四、synchronized同步代码块示例

被包裹在synchronized代码块中的代码,同一时间,只能有一个线程执行这段代码,synchronized后面跟的参数代表把谁锁住。下面的例子可以比作上厕所,必须拿到厕所的门才能上厕所,具体上厕所的方式可能不同,因为是同一把锁,所以synchronized中只能有一个线程在执行。

public class MySynchronized {
    public static void main(String[] args) {
        final MySynchronized mySynchronized = new MySynchronized();
        new Thread() {
            public void run() {
                synchronized (mySynchronized) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("thread1,start");
                }
            }
        }.start();
        new Thread() {
            public void run() {
                synchronized (mySynchronized) {
                    System.out.println("thread2,start");
                }
            }
        }.start();
    }
}

synchronized还可以修饰在方法上,当两个线程都调用这个方法的时候,同一时间只能有一个线程执行这个方法,另一个线程只能等待。

synchronized的缺陷:当一个线程获取了锁,其他线程只能等待线程释放锁,有两种情况:当前线程执行完成自动释放锁,另外一个是当前线程发生了异常,JVM会让线程释放锁。

五、ReentrantLock的方法示例

lock不是Java语言内置的,synchronized是Java语言的关键字,因此是内置性的,lock是一个类,通过这个类可以实现同步访问。

lock和synchronized有一点非常大的不同,synchronized不需要手动释放锁,当synchronized方法结束或者synchronized代码块结束,会自动释放锁的占用,lock必须手动释放,如果没有手动释放,就会出现死锁的情况。

lock是一个接口,它里面有lock()、tryLock()、tryLock(long time, TimeUnit unit)、lockInterruptibly()、unLock()这5个方法,ReentrantLock是唯一实现了Lock接口的类。ReentrantLock是可重入锁的意思。

lock()和tryLock()的使用方法:

package com.wsy;

import java.util.ArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Test {
    private static ArrayList<Integer> arrayList = new ArrayList<Integer>();
    static Lock lock = new ReentrantLock(); // 注意这个地方

    public static void main(String[] args) {
        new Thread() {
            public void run() {
                Thread thread = Thread.currentThread();
                boolean tryLock = lock.tryLock();// 尝试获取锁
                System.out.println(thread.getName() + " " + tryLock);
                if (tryLock) {
                    try {
                        System.out.println(thread.getName() + "得到了锁");
                        for (int i = 0; i < 5; i++) {
                            arrayList.add(i);
                        }
                    } catch (Exception e) {
                    } finally {
                        System.out.println(thread.getName() + "释放了锁");
                        lock.unlock();
                    }
                }
            };
        }.start();
        new Thread() {
            public void run() {
                Thread thread = Thread.currentThread();
                boolean tryLock = lock.tryLock();
                System.out.println(thread.getName() + " " + tryLock);
                if (tryLock) {
                    try {
                        System.out.println(thread.getName() + "得到了锁");
                        for (int i = 0; i < 5; i++) {
                            arrayList.add(i);
                        }
                    } catch (Exception e) {
                    } finally {
                        System.out.println(thread.getName() + "释放了锁");
                        lock.unlock();
                    }
                }

            };
        }.start();
    }
}

lockInterruptibly()的使用方法:

package com.wsy;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MyInterruptibly {
    private Lock lock = new ReentrantLock();

    public static void main(String[] args) {
        MyInterruptibly test = new MyInterruptibly();
        MyThread thread0 = new MyThread(test);
        MyThread thread1 = new MyThread(test);
        thread0.start();
        thread1.start();
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        thread1.interrupt();
        System.out.println("=====================");
    }

    public void insert(Thread thread) throws InterruptedException {
        lock.lockInterruptibly(); // 注意,如果需要正确中断等待锁的线程,必须将获取锁放在外面,然后将InterruptedException抛出
        try {
            System.out.println(thread.getName() + "得到了锁");
            long startTime = System.currentTimeMillis();
            for (;;) {
                if (System.currentTimeMillis() - startTime >= Integer.MAX_VALUE)
                    break;
                // 插入数据
            }
        } finally {
            System.out.println(Thread.currentThread().getName() + "执行finally");
            lock.unlock();
            System.out.println(thread.getName() + "释放了锁");
        }
    }
}

class MyThread extends Thread {
    private MyInterruptibly test = null;

    public MyThread(MyInterruptibly test) {
        this.test = test;
    }

    @Override
    public void run() {
        try {
            test.insert(Thread.currentThread());
        } catch (Exception e) {
            System.out.println(Thread.currentThread().getName() + "被中断");
        }
    }
}

六、Lock和synchronized的一些区别和选择考虑的因素

ReadWriteLock是一个接口,用来处理读写中出现的多线程问题,通常,多个线程同时读是没有问题的,多个线程同时写就需要处理,不能让他们同时写。接口中有两个方法,分别是:readLock()和writeLock(),分别返回read锁和write锁。

使用synchronized来实现:读和写都在一个线程中操作,thread1必须在thread0读写完成后才能读写,导致读的时候不能多个线程同时读取,这是不符合逻辑的,我们只需要写的时候锁定。

package com.wsy;

// 一个线程又要读又要写,用synchronize来实现的话,读写操作都只能锁住后一个线程一个线程地进行
public class MySynchronizedReadWrite {
    public static void main(String[] args) {
        final MySynchronizedReadWrite test = new MySynchronizedReadWrite();
        new Thread() {
            public void run() {
                test.get(Thread.currentThread());
            };
        }.start();
        new Thread() {
            public void run() {
                test.get(Thread.currentThread());
            };
        }.start();
    }

    public synchronized void get(Thread thread) {
        long start = System.currentTimeMillis();
        int i = 0;
        while (System.currentTimeMillis() - start <= 1) {
            i++;
            if (i % 4 == 0) {
                System.out.println(thread.getName() + "正在进行写操作");
            } else {
                System.out.println(thread.getName() + "正在进行读操作");
            }
        }
        System.out.println(thread.getName() + "读写操作完毕");
    }
}

使用ReadWriteLock来实现:多运行几次,可以发现读操作可以交替出现,而写操作一定是单线程进行的。

package com.wsy;

import java.util.concurrent.locks.ReentrantReadWriteLock;
// 使用读写锁,可以实现读写分离锁定,读操作并发进行,写操作锁定单个线程
// 如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁。
// 如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或者读锁,则申请的线程会一直等待释放写锁。

public class MyReentrantReadWriteLock {
    private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

    public static void main(String[] args) {
        final MyReentrantReadWriteLock test = new MyReentrantReadWriteLock();
        new Thread() {
            public void run() {
                test.get(Thread.currentThread());
                test.write(Thread.currentThread());
            };
        }.start();
        new Thread() {
            public void run() {
                test.get(Thread.currentThread());
                test.write(Thread.currentThread());
            };
        }.start();
    }

    // 读操作,用读锁来锁定
    public void get(Thread thread) {
        rwl.readLock().lock();
        try {
            long start = System.currentTimeMillis();

            while (System.currentTimeMillis() - start <= 1) {
                System.out.println(thread.getName() + "正在进行读操作");
            }
            System.out.println(thread.getName() + "读操作完毕");
        } finally {
            rwl.readLock().unlock();
        }
    }

    // 写操作,用写锁来锁定
    public void write(Thread thread) {
        rwl.writeLock().lock();
        try {
            long start = System.currentTimeMillis();

            while (System.currentTimeMillis() - start <= 1) {
                System.out.println(thread.getName() + "正在进行写操作");
            }
            System.out.println(thread.getName() + "写操作完毕");
        } finally {
            rwl.writeLock().unlock();
        }
    }
}

Lock和synchronized的选择:

  1. Lock是一个接口,synchronized是Java中的关键字。
  2. synchronized在发生异常的情况下,会自动释放锁,因此一般不会导致死锁的情况(互相死锁除外),而Lock在发生异常的情况下,不会自动调用unlock()方法,所以有可能发生死锁。因此在使用Lock的时候,一定要在finally中释放锁。
  3. Lock可以让等待锁的线程中断,synchronized不行,使用synchronized时候,等待的线程会一直等待下去,不能够响应中断。
  4. 通过Lock可以知道有没有成功获取到锁,synchronized做不到。
  5. Lock可以提高多个线程的读操作效率。

从性能上讲,如果竞争不是很激烈,两者的差别不大,如果竞争非常激烈,Lock的性能要远高于synchronized的,因为Lock可以具体情况具体选择。

七、Java并发包中的线程池种类及其特性介绍

在多线程开发的时候,不会使用new Thread的方式,因为一个系统能承受的线程数量是有限的,无限的new下去是肯定不行的,所以这里我们接触到了线程池的概念。线程池固定线程的数量,对于不同的任务,只需要给它不同的Runnable对象即可,就可以执行任务了。

线程池的5种创建方法:

  1. SingleThreadExecutor:只有一个线程的线程池,因此所有的任务都是顺序执行的。
  2. CachedThreadPool:线程池中有很多线程需要同时执行,老的可用线程被新的任务触发重新执行,如果某线程超过60秒没有执行,将被终止并从线程池中删除。
  3. FixedThreadPool:拥有固定线程数量的线程池,如果没有任务执行,线程会一直等待,通常设置线程的数量=cpu的数量效率会比较高,可以使用代码int cpuNums = Runtime.getRuntime().availableProcessors();拿到cpu的核数量。
  4. ScheduledThreadPool:用来调度即将执行的任务的线程池。
  5. SingleThreadScheduledPool:只有一个线程,用来调度任务在指定时间执行。

八、并发包中各种线程池的用法及future获取任务返回结果的机制

线程池之Runnable的使用方法:

package com.wsy;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolWithRunable {
    // 通过线程池执行线程
    public static void main(String[] args) {
        // 创建一个线程池
        ExecutorService pool = Executors.newCachedThreadPool();
        for (int i = 1; i < 5; i++) {
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("thread name: " + Thread.currentThread().getName());
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        pool.shutdown();
    }
}

线程池之Callable的使用方法:

package com.wsy;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// callable 跟runnable的区别:
// runnable的run方法不会有任何返回结果,所以主线程无法获得任务线程的返回值
// callable的call方法可以返回结果,但是主线程在获取时是被阻塞,需要等待任务线程返回才能拿到结果

public class ThreadPoolWithcallable {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 10; i++) {
            Future<String> submit = pool.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    System.out.println("a");
                    Thread.sleep(5000);
                    return "b--" + Thread.currentThread().getName();
                }
            });
            // 从Future中get结果,这个方法是会被阻塞的,一直要等到线程任务返回结果
            System.out.println(submit.get());
        }
        pool.shutdown();
    }
}

运行下面的代码之后,可以看到有些线程的运行结果可能并不能立刻返回,但是最终还是能通过句柄拿到返回结果。代码是使用了fixedPool来提交线程,还可以使用schedulerPool来提交线程,将下面相关的注释放开即可。

package com.wsy;

import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

public class TestPool {
    public static void main(String[] args) throws Exception {
        Future<?> submit = null;
        Random random = new Random();
        // 创建固定数量线程池
        ExecutorService exec = Executors.newFixedThreadPool(4);
        // 创建调度线程池
        // ScheduledExecutorService exec = Executors.newScheduledThreadPool(4);
        // 用来记录各线程的返回结果
        ArrayList<Future<?>> results = new ArrayList<Future<?>>();
        for (int i = 0; i < 10; i++) {
            // fixedPool提交线程,runnable无返回值,callable有返回值
            // submit = exec.submit(new TaskRunnable(i));
            // submit = exec.submit(new TaskCallable(i));
            // 对于schedulerPool来说,调用submit提交任务时,跟普通pool效果一致
            submit = exec.submit(new TaskCallable1(i));
            // 对于schedulerPool来说,调用schedule提交任务时,则可按延迟,按间隔时长来调度线程的运行
            // submit = exec.schedule(new TaskCallable(i), random.nextInt(10), TimeUnit.SECONDS);
            // 存储线程执行结果
            // submit其实是一个句柄,放到results里面的不是真正的结果,因为有可能add的时候还没有获取到值,放进去句柄,当后面需要获取的时候,通过句柄拿值
            // 如果前面的线程阻塞或者运算量大会延迟一小会儿,那么整体的运行是不会延迟的
            results.add(submit);
        }
        // 打印结果
        for (Future f : results) {
            boolean done = f.isDone();
            System.out.println(done ? "已完成" : "未完成");
            // 从结果的打印顺序可以看到,即使未完成,也会阻塞等待
            System.out.println("线程返回future结果: " + f.get());
        }
        exec.shutdown();
    }
}

class TaskCallable implements Callable<String> {
    private int s;
    Random r = new Random();

    // Callable可以传参数,可以有返回值
    public TaskCallable(int s) {
        this.s = s;
    }

    @Override
    public String call() throws Exception {
        String name = Thread.currentThread().getName();
        long currentTimeMillis = System.currentTimeMillis();
        System.out.println(name + " 启动时间:" + currentTimeMillis / 1000);
        int rint = r.nextInt(3);
        try {
            Thread.sleep(rint * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(name + " is working..." + s);
        return s + "";
    }
}

九、BlockingQueue的功能和使用示例

BlockingQueue主要是用来控制线程同步的工具。其中put和take是一对阻塞存取,add和poll的一对非阻塞存取。

插入:

add(Object object):把object添加到BlockingQueue中,如果BlockingQueue可以容纳,返回true,否则抛异常。

offer(Object object):如果可能的话,将object添加到BlockingQueue中,可以容纳返回true,不能容纳返回false。

put(Object object):把object放到BlockingQueue中,如果此时空间满了,线程被阻塞,等待空间释放后,再放进去。

读取:

poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,等待time长时间,还是取不到就返回false。

take():取走BlockingQueue里排在首位的对象,如果BlockingQueue中没有对象,那么线程会阻塞,直到BlockingQueue中有对象后为止。

其他:

int remainingCapacity():返回队列剩余的容量,在队列插入和取出的时候,获取数据可能不准确。

boolean remove(Object object):从队列移除元素,如果存在,移除一个或者多个,队列改变了返回true。

boolean contains(Object object):查看队列中是否包含元素object,存在返回true,否则返回false。

int drainTo(Collection<? super E> c):移除队列中所有可用元素,并将它们添加到给定的collection中。

int drainTo(Collection<? super E> c, int maxElements):和上面方法不同的就是指定了移除的数量。

BlockingQueue是一个接口,上面这些方法是在接口中定义的,它有4个实现类,常用的实现类有两个,分别是ArrayBlockingQueue和LinkedBlockingQueue。

ArrayBlockingQueue:一个数组支持的有界阻塞队列,规定大小的BlockingQueue,构造函数中必须有一个int参数。

LinkedBlockingQueue:大小不定的BlockingQueue,如果构造函数时候传了一个int参数,那么这个队列也是有大小的,如果不传,那么大小是由Integer.MAX_VALUE来决定。

生产者: 

package com.wsy;

import java.util.Random;
import java.util.concurrent.BlockingQueue;

public class TestBlockingQueueProducer implements Runnable {
    BlockingQueue<String> queue;
    Random random = new Random();

    public TestBlockingQueueProducer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(random.nextInt(10));
            String task = Thread.currentThread().getName();
            System.out.println(task + " put a product");
            queue.put(task);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

消费者

package com.wsy;

import java.util.Random;
import java.util.concurrent.BlockingQueue;

public class TestBlockingQueueConsumer implements Runnable {
    BlockingQueue<String> queue;
    Random random = new Random();

    public TestBlockingQueueConsumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(random.nextInt(10));
            System.out.println(Thread.currentThread().getName() + " trying...");
            String temp = queue.take();// 如果队列为空,会阻塞当前线程
            System.out.println(Thread.currentThread().getName() + " get " + temp);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

测试程序

package com.wsy;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class TestBlockingQueue {
    public static void main(String[] args) {
        // 不设置的话,LinkedBlockingQueue默认大小为Integer.MAX_VALUE
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);
        // BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
        TestBlockingQueueConsumer consumer = new TestBlockingQueueConsumer(queue);
        TestBlockingQueueProducer producer = new TestBlockingQueueProducer(queue);
        for (int i = 0; i < 3; i++) {
            new Thread(producer, "Producer" + (i + 1)).start();
        }
        for (int i = 0; i < 5; i++) {
            new Thread(consumer, "Consumer" + (i + 1)).start();
        }
        new Thread(producer, "Producer" + (4)).start();
    }
}

通过上面的程序的运行现象可以看到,queue中有2个位置,当生产者生产的数量大于2的时候,就不能向队列中添加元素了,此时会阻塞,当消费者消费的时候,如果队列中是空的,那么也会阻塞,直到队列中有元素进来,或者程序被中断。

十、volatile的工作机制代码测试

先来看一个程序,猜猜它的运行结果是多少?10000?试试看吧!

package com.wsy;

public class VolatileTest {
    public static volatile int num = 0;

    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < 100; j++) {
                        num++;
                    }
                }
            }).start();
        }
        try {
            // 保证让这上面的线程都运行完成
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(num);
    }
}

发现运行结果是小于10000的,多运行几次再试试,还是小于10000的,因此,可以得出结论,volatile不能保证线程安全。问题就出在num++上了,因为num++不是一个原子性操作,num++是要分三步操作的,分别是读、改、写。将主内存中的num读取出来到工作内存,将工作内存中的数据自增1,将自增后的结果写到主内存中去。

一开始的时候,volatile修饰的变量num是主内存中存放着的,这时候,启动了100个线程,这些线程都去获取这个num变量,并把num的值从主内存拷贝到工作内存中,执行num++操作,将修改后的值传递到主内存中,在这3个步骤的间隙,如果正好有线程访问num变量的值,那么这个线程获取到的值就是更改之前的值,这个时候就出问题了,于是就会少一个++操作了。

要想保证线程安全,还是需要锁,因为锁可以保证互斥性和可见性,而volatile只能保证可见性。既然volatile不能保证线程安全,那岂不是很鸡肋?它的应用场景是怎么样的呢?

一个线程写,多个线程读,如果有多个线程写,那么就会像上面那样,出现错误数据,volatile能保证读的时候是最新的值。synchronized和volatile可以实现较低开销的读写锁。

概括来说就是:对变量的写操作不依赖于当前值;该变量没有包含在具有其他变量的不变式中。

十一、关于并发编程的一些总结

  • 不使用线程池的缺点

并发线程很高的时候,就会大量的新建线程,开销是非常大的,资源的消耗量也很大,稳定性自然就降下来了。

  • 指定执行策略

任务以什么顺序执行;有多少个任务并发执行;要多少个任务进入等待执行队列;系统过载,应该放弃哪些任务,如何通知到应用程序;一个任务的执行前后应该做什么处理。

  • 线程池的类型

FixedThreadPool、CachedThreadPool、SingleThreadExecutor、ScheduledThreadPool。

  • 线程池的饱和策略

除了CachedThreadPool之外,其他的线程池,当线程池满了之后,可以设置拒绝策略,比如可以设置ThreadPoolExecutor.setRejectExecutionHandler()方法设置一个拒绝任务的策略。

  • 线程无依赖性

如果线程与线程之间有依赖性,有可能造成死锁或饥饿;如果调用者监视其他线程的完成情况,会影响并发量。

十二、ActiveMQ

JMS即Java消息服务(Java Message Service)。应用程序接口是一个Java平台中关于面向消息中间件的API,用于两个应用程序之间,或分布系统之间发布消息,进行异步通信。Java消息服务是一个与平台无关的API。

JMS是一种与厂商无关的API,用来访问消息收发系统消息,类似于JDBC可以使用相同的API来访问不同的数据库,JMS提供同样与厂商无关的访问方法,以访问消息收发服务。

JMS的两种模型:

点对点或队列模型:一个生产者向一个特定的队列发布消息,一个消费者从队列中取出消息。生产者不需要在消费者消费该消息期间处于运行状态,消费者也不需要在消息发送时处于运行状态。

发布者/订阅者模型:支持向一个特定的消息主题发布消息。0个或多个订阅者可能对接受来自特定消息主题的消息感兴趣。在这种模式下,发布者和订阅者彼此不知道对方,这种模式好比匿名公告板。发布者和订阅者之间存在时间依赖性,发布者需要建立一个订阅,方便用户能够订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者建立了持久的订阅,在那种情况下,在订阅者未连接的时候发布的消息将在订阅者重新连接时重新发布。

演示ActiveMQ

  1. ActiveMQ官网下载ActiveMQ到本地并解压(这里演示Windows版本的,Linux操作差不多)。
  2. 修改activemq.xml配置文件,将transportConnectors中的0.0.0.0改为localhost。
  3. 双击activemq.bat运行ActiveMQ。
  4. 浏览器登录http://localhost:8161/admin/,用户名和密码都是admin。
package com.wsy;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ProducerTest {
    public static void main(String[] args) throws JMSException, Exception {
        ProducerTool producer = new ProducerTool();
        producer.produceMessage("Hello, world!");
        producer.close();
    }
}

class ProducerTool {
    private String user = ActiveMQConnection.DEFAULT_USER;
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;
    private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    private String subject = "myqueue";
    private Destination destination = null;
    private Connection connection = null;
    private Session session = null;
    private MessageProducer producer = null;

    // 初始化
    private void initialize() throws JMSException, Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue(subject);
        producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    }

    // 发送消息
    public void produceMessage(String message) throws JMSException, Exception {
        initialize();
        TextMessage msg = session.createTextMessage(message);
        connection.start();
        System.out.println("Producer:->Sending message: " + message);
        producer.send(msg);
        System.out.println("Producer:->Message sent complete!");
    }

    // 关闭连接
    public void close() throws JMSException {
        System.out.println("Producer:->Closing connection");
        if (producer != null) {
            producer.close();
        }
        if (session != null) {
            session.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}

运行main方法,控制台输出如下内容,表示producer已经生产了一个message并发送到了队列中,然后关闭连接。

Producer:->Sending message: Hello, world!
Producer:->Message sent complete!
Producer:->Closing connection

我们去浏览器点击queue,可以看到有一个我们刚刚自己定义的queue,命名为myqueue这一行里面对应的数字就是队列中消息数量。再次运行Customer,console中可以收到刚刚producer发出的消息,再去浏览器查看,发现Queue的内容也发生了变动。

package com.wsy;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ConsumerTest implements Runnable {
    static Thread t1 = null;

    public static void main(String[] args) throws InterruptedException {
        t1 = new Thread(new ConsumerTest());
        t1.start();
        while (true) {
            if (!t1.isAlive()) {
                t1 = new Thread(new ConsumerTest());
                t1.start();
                System.out.println("重新启动");
            }
            Thread.sleep(5000);
        }
    }

    public void run() {
        try {
            ConsumerTool consumer = new ConsumerTool();
            consumer.consumeMessage();
            while (ConsumerTool.isconnection) {
                ;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

class ConsumerTool implements MessageListener, ExceptionListener {
    private String user = ActiveMQConnection.DEFAULT_USER;
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;
    private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    private String subject = "myqueue";
    private Destination destination = null;
    private Connection connection = null;
    private Session session = null;
    private MessageConsumer consumer = null;
    private ActiveMQConnectionFactory connectionFactory = null;
    public static Boolean isconnection = false;

    // 初始化
    private void initialize() throws JMSException {
        connectionFactory = new ActiveMQConnectionFactory(user, password, url);
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue(subject);
        consumer = session.createConsumer(destination);
    }

    // 消费消息
    public void consumeMessage() throws JMSException {
        initialize();
        connection.start();
        consumer.setMessageListener(this);
        connection.setExceptionListener(this);
        System.out.println("Consumer:->Begin listening...");
        isconnection = true;
        // 开始监听
        Message message = consumer.receive();
        System.out.println(message.getJMSMessageID());
    }

    // 关闭连接
    public void close() throws JMSException {
        System.out.println("Consumer:->Closing connection");
        if (consumer != null) {
            consumer.close();
        }
        if (session != null) {
            session.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    // 消息处理函数
    public void onMessage(Message message) {
        try {
            if (message instanceof TextMessage) {
                TextMessage txtMsg = (TextMessage) message;
                String msg = txtMsg.getText();
                System.out.println("Consumer:->Received: " + msg);
            } else {
                System.out.println("Consumer:->Received: " + message);
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void onException(JMSException exception) {
        isconnection = false;
    }
}

同样再来看看topic,先运行producer,后运行consumer,在浏览器看到的效果类似。

package com.wsy;

import java.util.Random;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ProducerTest {
    public static void main(String[] args) throws JMSException, Exception {
        ProducerTool producer = new ProducerTool();
        Random random = new Random();
        for (int i = 0; i < 20; i++) {
            Thread.sleep(random.nextInt(10) * 1000);
            producer.produceMessage("Hello, world!--" + i);
            producer.close();
        }
    }
}

class ProducerTool {
    private String user = ActiveMQConnection.DEFAULT_USER;
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;
    private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    private String subject = "mytopic";
    private Destination destination = null;
    private Connection connection = null;
    private Session session = null;
    private MessageProducer producer = null;

    // 初始化
    private void initialize() throws JMSException, Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createTopic(subject);
        producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    }

    // 发送消息
    public void produceMessage(String message) throws JMSException, Exception {
        initialize();
        TextMessage msg = session.createTextMessage(message);
        connection.start();
        System.out.println("Producer:->Sending message: " + message);
        producer.send(msg);
        System.out.println("Producer:->Message sent complete!");
    }

    // 关闭连接
    public void close() throws JMSException {
        System.out.println("Producer:->Closing connection");
        if (producer != null) {
            producer.close();
        }
        if (session != null) {
            session.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}
package com.wsy;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ConsumerTest implements Runnable {
    static Thread t1 = null;

    public static void main(String[] args) throws InterruptedException {
        t1 = new Thread(new ConsumerTest());
        t1.setDaemon(false);
        t1.start();
        // 如果发生异常,则重启consumer
        while (true) {
            if (!t1.isAlive()) {
                t1 = new Thread(new ConsumerTest());
                t1.start();
                System.out.println("重新启动");
            }
            Thread.sleep(5000);
        }
    }

    public void run() {
        try {
            ConsumerTool consumer = new ConsumerTool();
            consumer.consumeMessage();
            while (ConsumerTool.isconnection) {
                ;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

class ConsumerTool implements MessageListener, ExceptionListener {
    private String user = ActiveMQConnection.DEFAULT_USER;
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;
    private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    private String subject = "mytopic";
    private Destination destination = null;
    private Connection connection = null;
    private Session session = null;
    private MessageConsumer consumer = null;
    public static Boolean isconnection = false;

    // 初始化
    private void initialize() throws JMSException, Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createTopic(subject);
        consumer = session.createConsumer(destination);
    }

    // 消费消息
    public void consumeMessage() throws JMSException, Exception {
        initialize();
        connection.start();
        consumer.setMessageListener(this);
        connection.setExceptionListener(this);
        isconnection = true;
        System.out.println("Consumer:->Begin listening...");
        // 开始监听
        // Message message = consumer.receive();
    }

    // 关闭连接
    public void close() throws JMSException {
        System.out.println("Consumer:->Closing connection");
        if (consumer != null) {
            consumer.close();
        }
        if (session != null) {
            session.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    // 消息处理函数
    public void onMessage(Message message) {
        try {
            if (message instanceof TextMessage) {
                TextMessage txtMsg = (TextMessage) message;
                String msg = txtMsg.getText();
                System.out.println("Consumer:->Received: " + msg);
            } else {
                System.out.println("Consumer:->Received: " + message);
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public void onException(JMSException arg0) {
        isconnection = false;
    }
}

既然说了这么多,topic和queue的区别是什么呢?

Queue

点对点模式,不可用重复消费。当生产者生产了消息之后,消息被存储在queue中,消费者消费的时候,从queue中获取,已经消费的消息,被从queue中剔除,不能再次消费。当没有消费者的时候,queue一直保存消息,直到有消费者来消费。消息不是自动推送给消费者的,而是消费者从队列中获取的。

Topic

发布/订阅模式,可以重复消费。生产者生产了消息,发布到topic中,会有多个消费者(订阅)消费该消息。发布到topic中的消息会被消费者中的所有订阅者消费。这里的消费者分为两种,一种是长期订阅,一种类是普通订阅,一条消息发送到topic后,对于普通订阅的,如果此时处于非活跃状态,那么就错过了消息,对于长期订阅的,即使此时没有处于活跃状态,当它处于活跃状态时,它还会继续受到消息的。

十三、Java的反射实现API

使用反射可以动态加载类,调用私有的方法或者属性,动态的调用方法,可以说,反射将字符串作为输入参数,来获取类对象,或者执行对象的某些方法。

package com.wsy;

import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;

import org.junit.Before;
import org.junit.Test;

public class MyReflect {
    public String className = null;
    public Class personClass = null;

    // 反射Person类
    @Before
    public void init() throws Exception {
        className = "com.wsy.Person";
        personClass = Class.forName(className);
    }

    // 获取某个class文件对象
    @Test
    public void getClassName() throws Exception {
        System.out.println(personClass);
    }

    // 获取某个class文件对象的另一种方式
    @Test
    public void getClassName2() throws Exception {
        System.out.println(Person.class);
    }

    // 创建一个class文件表示的实例对象,底层会调用空参数的构造方法
    @Test
    public void getNewInstance() throws Exception {
        System.out.println(personClass.newInstance());
    }

    // 获取非私有的构造函数
    @Test
    public void getPublicConstructor() throws Exception {
        Constructor constructor = personClass.getConstructor(Long.class, String.class);
        Person person = (Person) constructor.newInstance(100L, "zhangsan");
        System.out.println(person.getId());
        System.out.println(person.getName());
    }

    // 获得私有的构造函数
    @Test
    public void getPrivateConstructor() throws Exception {
        Constructor constructor = personClass.getDeclaredConstructor(String.class);
        constructor.setAccessible(true);// 强制取消Java的权限检测
        Person person2 = (Person) constructor.newInstance("zhangsan");
        System.out.println("**" + person2.getName());
    }

    // 访问非私有的成员变量
    @Test
    public void getNotPrivateField() throws Exception {
        Constructor constructor = personClass.getConstructor(Long.class, String.class);
        Object obj = constructor.newInstance(100L, "zhangsan");
        Field field = personClass.getField("name");
        field.set(obj, "lisi");
        System.out.println(field.get(obj));
    }

    // 访问私有的成员变量
    @Test
    public void getPrivateField() throws Exception {
        Constructor constructor = personClass.getConstructor(Long.class);
        Object obj = constructor.newInstance(100L);
        Field field2 = personClass.getDeclaredField("id");
        field2.setAccessible(true);// 强制取消Java的权限检测
        field2.set(obj, 10000L);
        System.out.println(field2.get(obj));
    }

    // 获取非私有的成员函数
    @Test
    public void getNotPrivateMethod() throws Exception {
        System.out.println(personClass.getMethod("toString"));
        Object obj = personClass.newInstance();// 获取空参的构造函数
        Method toStringMethod = personClass.getMethod("toString");
        Object object = toStringMethod.invoke(obj);
        System.out.println(object);
    }

    // 获取私有的成员函数
    @Test
    public void getPrivateMethod() throws Exception {
        Object obj = personClass.newInstance();// 获取空参的构造函数
        Method method = personClass.getDeclaredMethod("getSomeThing");
        method.setAccessible(true);
        Object value = method.invoke(obj);
        System.out.println(value);
    }

    @Test
    public void otherMethod() throws Exception {
        // 当前加载这个class文件的那个类加载器对象
        System.out.println(personClass.getClassLoader());
        // 获取某个类实现的所有接口
        Class[] interfaces = personClass.getInterfaces();
        for (Class class1 : interfaces) {
            System.out.println(class1);
        }
        // 反射当前这个类的直接父类
        System.out.println(personClass.getGenericSuperclass());
        // getResourceAsStream这个方法可以获取到一个输入流,这个输入流会关联到name所表示的那个文件上。
        // 不以‘/’开头时默认是从此类所在的包下取资源,以‘/’开头则是从ClassPath根下获取。其只是通过path构造一个绝对路径,最终还是由ClassLoader获取资源。
        System.out.println(personClass.getResourceAsStream("/log4j.properties"));
        System.out.println(personClass.getResourceAsStream("log4j.properties"));
        // 判断当前的Class对象表示是否是数组
        System.out.println(personClass.isArray());
        System.out.println(new String[3].getClass().isArray());
        // 判断当前的Class对象表示是否是枚举类
        System.out.println(personClass.isEnum());
        System.out.println(Class.forName("com.wsy.City").isEnum());
        // 判断当前的Class对象表示是否是接口
        System.out.println(personClass.isInterface());
        System.out.println(Class.forName("com.wsy.TestInterface").isInterface());
    }
}

class Person implements Serializable, TestInterface {
    private Long id;
    public String name;

    public Person() {
        this.id = 100L;
        this.name = "afsdfasd";
    }

    public Person(Long id, String name) {
        this.id = id;
        this.name = name;
    }

    public Person(Long id) {
        super();
        this.id = id;
    }

    private Person(String name) {
        super();
        this.name = name + "=======";
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String toString() {
        return "Person [id=" + id + ", name=" + name + "]";
    }

    private String getSomeThing() {
        return "sdsadasdsasd";
    }

    private void testPrivate() {
        System.out.println("this is a private method");
    }
}

enum City {
}

interface TestInterface {
}

十四、动态代理的工作机制

说到动态代理,先举个例子,原来有一个方法用来返回商品的价格,现在需求变动了,增加了优惠券,所以返回的价格就需要在原来的基础上进行改动,不过这个方法在别的系统里面也用到了,不能直接改这个方法的代码,所以我们可以使用代理的方法解决,在调用这个方法的前面或者后面加上自己的逻辑,就可以实现优惠券的逻辑,而不改动原来的代码,这不就是开闭原则嘛,对扩展开放,对修改关闭,增强原来的方法。

动态代理的流程:

  1. 书写代理类和代理方法,在代理方法中实现代理Proxy.newProxyInstance。
  2. 代理中需要的参数分别是:被代理类的类加载器,被代理类的所有实现接口,句柄方法。
  3. 在句柄方法中有一个invoke()方法需要被复写,invoke()方法有3个参数:被代理的对象,被代理的方法,被代理方法需要的参数,这invoke()方法中,我们就可以对被代理方法进行增强。
  4. 获取代理类,强转成被代理的接口。
  5. 最后,可以像没被代理一样,调用接口的任何方法,方法被调用后,方法名和参数被传入代理类的invoke()方法中, 进行新业务的逻辑流程。

十五、动态代理的demo代码

原业务接口

package com.wsy.service;

// 这是一个业务的接口,这个接口中的业务就是返回衣服的价格
public interface IBoss {
    int yifu(String size);
}

原业务实现类

package com.wsy.service.impl;

import com.wsy.service.IBoss;

// 实现了卖衣服的接口 自定义了自己的业务,卖裤子
public class Boss implements IBoss {
    public int yifu(String size) {
        System.err.println("天猫小强旗舰店,老板给客户发快递----衣服型号:" + size);
        // 这件衣服的价钱,从数据库读取
        return 50;
    }

    public void kuzi() {
        System.err.println("天猫小强旗舰店,老板给客户发快递----裤子");
    }
}

原业务调用

package com.wsy.action;

import org.junit.Test;

import com.wsy.service.IBoss;
import com.wsy.service.impl.Boss;

public class SaleAction {
    // 不使用代理,直接调用方法 方法中规定什么业务,就只能调用什么业务,规定什么返回值,就只能输出什么返回值
    @Test
    public void saleByBossSelf() throws Exception {
        IBoss boss = new Boss();
        System.out.println("老板自营!");
        int money = boss.yifu("xxl");// 老板自己卖衣服,不需要客服,结果就是没有聊天记录
        System.out.println("衣服成交价:" + money);
    }
}

代理类

package com.wsy.proxyclass;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class ProxyBoss {
    // 对接口方法进行代理
    public static <T> T getProxy(final int discountCoupon, final Class<?> interfaceClass, final Class<?> implementsClass) throws Exception {
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[] { interfaceClass }, new InvocationHandler() {
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                Integer returnValue = (Integer) method.invoke(implementsClass.newInstance(), args);// 调用原始对象以后返回的值
                return returnValue - discountCoupon;
            }
        });
    }
}

新业务调用

package com.wsy.action;

import org.junit.Test;

import com.wsy.proxyclass.ProxyBoss;
import com.wsy.service.IBoss;
import com.wsy.service.impl.Boss;

// 什么是动态代理? 简单的写一个模板接口,剩下的个性化工作,好给动态代理来完成!
public class ProxySaleAction {
    // 使用代理,在这个代理中,只代理了Boss的yifu方法 定制化业务,可以改变原接口的参数、返回值等
    @Test
    public void saleByProxy() throws Exception {
        IBoss boss = ProxyBoss.getProxy(10, IBoss.class, Boss.class);// 将代理的方法实例化成接口
        // IBoss boss = new Boss();// 将代理的方法实例化成接口
        System.out.println("代理经营!");
        int money = boss.yifu("xxl");// 调用接口的方法,实际上调用方式没有变
        System.out.println("衣服成交价:" + money);
    }
}

其实不要被代理类的写法吓坏,它的写法是比较死的,需要传什么参数就传什么参数,只是表面上看着比较陌生而已。看顺眼了就不觉得困难了。

十六、利用socket来进行远程过程调用

在大数据开发中,通常是部署集群的方式,所以多台机器之间访问的时候,需要使用socket通信,代码在下面,先运行服务端,后运行客户端,可以看到服务端始终在运行,客户端发送socket请求后,立刻接收到了服务端方法调用返回的数据。

业务接口

package com.wsy;

public interface IBusiness {
    public int getPrice(String param);
}

业务实现类

package com.wsy;

public class TestBusiness implements IBusiness {
    @Override
    public int getPrice(String param) {
        return param.equals("yifu") ? 10 : 20;
    }
}

服务端

package com.wsy;

import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class TestServer {
    public static void main(String[] args) throws Exception {
        // 创建socket并绑定IP和端口号
        ServerSocket serverSocket = new ServerSocket();
        serverSocket.bind(new InetSocketAddress("localhost", 9898));
        // 因为是服务端,所以需要一直在接受请求,这里使用while(true)来处理
        while (true) {
            // accept()方法是一个阻塞方法,等待请求,如果一直没有请求发过来,就一直阻塞着
            Socket socket = serverSocket.accept();
            // 因为业务逻辑可能处理的时间比较长,为了不影响后面的请求处理,这里使用线程的方法来处理请求,可以改成线程池,这里简写一下
            new Thread(new TestServerTask(socket)).start();
        }
    }
}

socket处理线程

package com.wsy;

import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.lang.reflect.Method;
import java.net.Socket;

public class TestServerTask implements Runnable {
    private Socket socket;

    public TestServerTask(Socket socket) {
        this.socket = socket;
    }

    // 接收客户端发送过来的socket信息,处理并返回
    @Override
    public void run() {
        InputStream inputStream = null;
        OutputStream outputStream = null;
        BufferedReader bufferedReader = null;
        PrintWriter printWriter = null;
        try {
            inputStream = socket.getInputStream();
            outputStream = socket.getOutputStream();
            // 读取输入流中的内容,并为下一步的反射做处理
            bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
            String request = bufferedReader.readLine();
            String[] split = request.split(":");
            String classname = split[0];
            String methodName = split[1];
            String methodParam = split[2];
            // 通过反射的方式生成类对象,并调用方法
            Class<?> className = Class.forName(classname);
            System.out.println("calling class: " + className);
            Object newInstance = className.newInstance();
            Method method = className.getMethod(methodName, String.class);
            System.out.println("calling method: " + method);
            Object invoke = method.invoke(newInstance, methodParam);
            System.out.println("results: " + (int) invoke);
            // 将执行结果通过流返回给客户端
            printWriter = new PrintWriter(new BufferedOutputStream(outputStream));
            printWriter.println((int) invoke);
            printWriter.flush();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭资源
            try {
                bufferedReader.close();
                printWriter.close();
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

客户端

package com.wsy;

import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;

public class TestClient {
    public static void main(String[] args) throws Exception {
        // 确定socket的IP和端口用来发送请求
        Socket socket = new Socket("localhost", 9898);
        OutputStream outputStream = socket.getOutputStream();
        InputStream inputStream = socket.getInputStream();
        // 通过socket发送请求参数到服务端
        PrintWriter printWriter = new PrintWriter(new BufferedOutputStream(outputStream));
        printWriter.println("com.wsy.TestBusiness:getPrice:yifu");
        printWriter.flush();
        // 接收服务端处理后的socket结果
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
        String readLine = bufferedReader.readLine();
        System.out.println("client get result: " + readLine);
        socket.close();
    }
}