欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

基于Java回顾之多线程同步的使用详解

程序员文章站 2024-03-31 10:07:58
首先阐述什么是同步,不同步有什么问题,然后讨论可以采取哪些措施控制同步,接下来我们会仿照回顾网络通信时那样,构建一个服务器端的“线程池”,jdk为我们提供了一个很大的con...

首先阐述什么是同步,不同步有什么问题,然后讨论可以采取哪些措施控制同步,接下来我们会仿照回顾网络通信时那样,构建一个服务器端的“线程池”,jdk为我们提供了一个很大的concurrent工具包,最后我们会对里面的内容进行探索。

为什么要线程同步?

说到线程同步,大部分情况下, 我们是在针对“单对象多线程”的情况进行讨论,一般会将其分成两部分,一部分是关于“共享变量”,一部分关于“执行步骤”。

共享变量

当我们在线程对象(runnable)中定义了全局变量,run方法会修改该变量时,如果有多个线程同时使用该线程对象,那么就会造成全局变量的值被同时修改,造成错误。我们来看下面的代码:

复制代码 代码如下:

共享变量造成同步问题
 class myrunner implements runnable
 {
     public int sum = 0;

     public void run()
     {
         system.out.println(thread.currentthread().getname() + " start.");
         for (int i = 1; i <= 100; i++)
         {
             sum += i;
         }
         try {
             thread.sleep(500);
         } catch (interruptedexception e) {
             e.printstacktrace();
         }
         system.out.println(thread.currentthread().getname() + " --- the value of sum is " + sum);
         system.out.println(thread.currentthread().getname() + " end.");
     }
 }

 
 private static void sharedvaribletest() throws interruptedexception
 {
     myrunner runner = new myrunner();
     thread thread1 = new thread(runner);
     thread thread2 = new thread(runner);
     thread1.setdaemon(true);
     thread2.setdaemon(true);
     thread1.start();
     thread2.start();
     thread1.join();
     thread2.join();
 }

这个示例中,线程用来计算1到100的和是多少,我们知道正确结果是5050(好像是高斯小时候玩过这个?),但是上述程序返回的结果是10100,原因是两个线程同时对sum进行操作。

执行步骤

我们在多个线程运行时,可能需要某些操作合在一起作为“原子操作”,即在这些操作可以看做是“单线程”的,例如我们可能希望输出结果的样子是这样的:

复制代码 代码如下:

线程1:步骤1
 线程1:步骤2
 线程1:步骤3
 线程2:步骤1
 线程2:步骤2
 线程2:步骤3

如果同步控制不好,出来的样子可能是这样的:
复制代码 代码如下:

线程1:步骤1
线程2:步骤1
线程1:步骤2
线程2:步骤2
线程1:步骤3
线程2:步骤3

这里我们也给出一个示例代码:
复制代码 代码如下:

执行步骤混乱带来的同步问题
 class mynonsyncrunner implements runnable
 {
     public void run() {
         system.out.println(thread.currentthread().getname() + " start.");
         for(int i = 1; i <= 5; i++)
         {
             system.out.println(thread.currentthread().getname() + " running step " + i);
             try
             {
                 thread.sleep(50);
             }
             catch(interruptedexception ex)
             {
                 ex.printstacktrace();
             }
         }
         system.out.println(thread.currentthread().getname() + " end.");
     }
 }

 
 private static void synctest() throws interruptedexception
 {
     mynonsyncrunner runner = new mynonsyncrunner();
     thread thread1 = new thread(runner);
     thread thread2 = new thread(runner);
     thread1.setdaemon(true);
     thread2.setdaemon(true);
     thread1.start();
     thread2.start();
     thread1.join();
     thread2.join();
 }

如何控制线程同步

既然线程同步有上述问题,那么我们应该如何去解决呢?针对不同原因造成的同步问题,我们可以采取不同的策略。

控制共享变量

我们可以采取3种方式来控制共享变量。

将“单对象多线程”修改成“多对象多线程”

上文提及,同步问题一般发生在“单对象多线程”的场景中,那么最简单的处理方式就是将运行模型修改成“多对象多线程”的样子,针对上面示例中的同步问题,修改后的代码如下:

复制代码 代码如下:

解决共享变量问题方案一
 private static void sharedvaribletest2() throws interruptedexception
 {
     thread thread1 = new thread(new myrunner());
     thread thread2 = new thread(new myrunner());
     thread1.setdaemon(true);
     thread2.setdaemon(true);
     thread1.start();
     thread2.start();
     thread1.join();
     thread2.join();
 }

我们可以看到,上述代码中两个线程使用了两个不同的runnable实例,它们在运行过程中,就不会去访问同一个全局变量。
将“全局变量”降级为“局部变量”

既然是共享变量造成的问题,那么我们可以将共享变量改为“不共享”,即将其修改为局部变量。这样也可以解决问题,同样针对上面的示例,这种解决方式的代码如下:

复制代码 代码如下:

解决共享变量问题方案二
 class myrunner2 implements runnable
 {
     public void run()
     {
         system.out.println(thread.currentthread().getname() + " start.");
         int sum = 0;
         for (int i = 1; i <= 100; i++)
         {
             sum += i;
         }
         try {
             thread.sleep(500);
         } catch (interruptedexception e) {
             e.printstacktrace();
         }
         system.out.println(thread.currentthread().getname() + " --- the value of sum is " + sum);
         system.out.println(thread.currentthread().getname() + " end.");
     }
 }

 
 private static void sharedvaribletest3() throws interruptedexception
 {
     myrunner2 runner = new myrunner2();
     thread thread1 = new thread(runner);
     thread thread2 = new thread(runner);
     thread1.setdaemon(true);
     thread2.setdaemon(true);
     thread1.start();
     thread2.start();
     thread1.join();
     thread2.join();
 }

我们可以看出,sum变量已经由全局变量变为run方法内部的局部变量了。
使用threadlocal机制

threadlocal是jdk引入的一种机制,它用于解决线程间共享变量,使用threadlocal声明的变量,即使在线程中属于全局变量,针对每个线程来讲,这个变量也是独立的。

我们可以用这种方式来改造上面的代码,如下所示:

复制代码 代码如下:

解决共享变量问题方案三
 class myrunner3 implements runnable
 {
     public threadlocal<integer> tl = new threadlocal<integer>();

     public void run()
     {
         system.out.println(thread.currentthread().getname() + " start.");
         for (int i = 0; i <= 100; i++)
         {
             if (tl.get() == null)
             {
                 tl.set(new integer(0));
             }
             int sum = ((integer)tl.get()).intvalue();
             sum+= i;
             tl.set(new integer(sum));
             try {
                 thread.sleep(10);
             } catch (interruptedexception e) {
                 e.printstacktrace();
             }
         }

         system.out.println(thread.currentthread().getname() + " --- the value of sum is " + ((integer)tl.get()).intvalue());
         system.out.println(thread.currentthread().getname() + " end.");
     }
 }

 
 private static void sharedvaribletest4() throws interruptedexception
 {
     myrunner3 runner = new myrunner3();
     thread thread1 = new thread(runner);
     thread thread2 = new thread(runner);
     thread1.setdaemon(true);
     thread2.setdaemon(true);
     thread1.start();
     thread2.start();
     thread1.join();
     thread2.join();
 }

综上三种方案,第一种方案会降低多线程执行的效率,因此,我们推荐使用第二种或者第三种方案。

控制执行步骤

说到执行步骤,我们可以使用synchronized关键字来解决它。

复制代码 代码如下:

执行步骤问题解决方案
 class mysyncrunner implements runnable
 {
     public void run() {
         synchronized(this)
         {
             system.out.println(thread.currentthread().getname() + " start.");
             for(int i = 1; i <= 5; i++)
             {
                 system.out.println(thread.currentthread().getname() + " running step " + i);
                 try
                 {
                     thread.sleep(50);
                 }
                 catch(interruptedexception ex)
                 {
                     ex.printstacktrace();
                 }
             }
             system.out.println(thread.currentthread().getname() + " end.");
         }
     }
 }

 
 private static void synctest2() throws interruptedexception
 {
     mysyncrunner runner = new mysyncrunner();
     thread thread1 = new thread(runner);
     thread thread2 = new thread(runner);
     thread1.setdaemon(true);
     thread2.setdaemon(true);
     thread1.start();
     thread2.start();
     thread1.join();
     thread2.join();
 }

在线程同步的话题上,synchronized是一个非常重要的关键字。它的原理和数据库中事务锁的原理类似。我们在使用过程中,应该尽量缩减synchronized覆盖的范围,原因有二:1)被它覆盖的范围是串行的,效率低;2)容易产生死锁。我们来看下面的示例:
复制代码 代码如下:

synchronized示例
 private static void synctest3() throws interruptedexception
 {
     final list<integer> list = new arraylist<integer>();

     thread thread1 = new thread()
     {
         public void run()
         {
             system.out.println(thread.currentthread().getname() + " start.");
             random r = new random(100);
             synchronized(list)
             {
                 for (int i = 0; i < 5; i++)
                 {
                     list.add(new integer(r.nextint()));
                 }
                 system.out.println("the size of list is " + list.size());
             }
             try
             {
                 thread.sleep(500);
             }
             catch(interruptedexception ex)
             {
                 ex.printstacktrace();
             }
             system.out.println(thread.currentthread().getname() + " end.");
         }
     };

     thread thread2 = new thread()
     {
         public void run()
         {
             system.out.println(thread.currentthread().getname() + " start.");
             random r = new random(100);
             synchronized(list)
             {
                 for (int i = 0; i < 5; i++)
                 {
                     list.add(new integer(r.nextint()));
                 }
                 system.out.println("the size of list is " + list.size());
             }
             try
             {
                 thread.sleep(500);
             }
             catch(interruptedexception ex)
             {
                 ex.printstacktrace();
             }
             system.out.println(thread.currentthread().getname() + " end.");
         }
     };

     thread1.start();
     thread2.start();
     thread1.join();
     thread2.join();
 }

我们应该把需要同步的内容集中在一起,尽量不包含其他不相关的、消耗大量资源的操作,示例中线程休眠的操作显然不应该包括在里面。

构造线程池

我们在<基于java回顾之网络通信的应用分析>中,已经构建了一个socket连接池,这里我们在此基础上,构建一个线程池,完成基本的启动、休眠、唤醒、停止操作。

基本思路还是以数组的形式保持一系列线程,通过socket通信,客户端向服务器端发送命令,当服务器端接收到命令后,根据收到的命令对线程数组中的线程进行操作。

socket客户端的代码保持不变,依然采用构建socket连接池时的代码,我们主要针对服务器端进行改造。

首先,我们需要定义一个线程对象,它用来执行我们的业务操作,这里简化起见,只让线程进行休眠。

复制代码 代码如下:

定义线程对象
 enum threadstatus
 {
     initial,
     running,
     sleeping,
     stopped
 }

 enum threadtask
 {
     start,
     stop,
     sleep,
     wakeup
 }

 
 class mythread extends thread
 {
     public threadstatus status = threadstatus.initial;
     public threadtask task;
     public void run()
     {
         status = threadstatus.running;
         while(true)
         {
             try {
                 thread.sleep(3000);
                 if (status == threadstatus.sleeping)
                 {
                     system.out.println(thread.currentthread().getname() + " 进入休眠状态。");
                     this.wait();
                 }
             } catch (interruptedexception e) {
                 system.out.println(thread.currentthread().getname() + " 运行过程中出现错误。");
                 status = threadstatus.stopped;
             }
         }
     }
 }

然后,我们需要定义一个线程管理器,它用来对线程池中的线程进行管理,代码如下:
复制代码 代码如下:

定义线程池管理对象
 class mythreadmanager
 {
     public static void managethread(mythread[] threads, threadtask task)
     {
         for (int i = 0; i < threads.length; i++)
         {
             synchronized(threads[i])
             {
                 managethread(threads[i], task);
             }
         }
         system.out.println(getthreadstatus(threads));
     }

     public static void managethread(mythread thread, threadtask task)
     {
         if (task == threadtask.start)
         {
             if (thread.status == threadstatus.running)
             {
                 return;
             }
             if (thread.status == threadstatus.stopped)
             {
                 thread = new mythread();
             }
             thread.status = threadstatus.running;
             thread.start();

         }
         else if (task == threadtask.stop)
         {
             if (thread.status != threadstatus.stopped)
             {
                 thread.interrupt();
                 thread.status = threadstatus.stopped;
             }
         }
         else if (task == threadtask.sleep)
         {
             thread.status = threadstatus.sleeping;
         }
         else if (task == threadtask.wakeup)
         {
             thread.notify();
             thread.status = threadstatus.running;
         }
     }

     public static string getthreadstatus(mythread[] threads)
     {
         stringbuffer sb = new stringbuffer();
         for (int i = 0; i < threads.length; i++)
         {
             sb.append(threads[i].getname() + "的状态:" + threads[i].status).append("\r\n");
         }
         return sb.tostring();
     }
 }

最后,是我们的服务器端,它不断接受客户端的请求,每收到一个连接请求,服务器端会新开一个线程,来处理后续客户端发来的各种操作指令。
复制代码 代码如下:

定义服务器端线程池对象
 public class mythreadpool {

     public static void main(string[] args) throws ioexception
     {
         mythreadpool pool = new mythreadpool(5);
     }

     private int threadcount;
     private mythread[] threads = null;

    
     public mythreadpool(int count) throws ioexception
     {
         this.threadcount = count;
         threads = new mythread[count];
         for (int i = 0; i < threads.length; i++)
         {
             threads[i] = new mythread();
             threads[i].start();
         }
         init();
     }

     private void init() throws ioexception
     {
         serversocket serversocket = new serversocket(5678);
         while(true)
         {
             final socket socket = serversocket.accept();
             thread thread = new thread()
             {
                 public void run()
                 {
                     try
                     {
                         system.out.println("检测到一个新的socket连接。");
                         bufferedreader br = new bufferedreader(new inputstreamreader(socket.getinputstream()));
                         printstream ps = new printstream(socket.getoutputstream());
                         string line = null;
                         while((line = br.readline()) != null)
                         {
                             system.out.println(line);
                             if (line.equals("count"))
                             {
                                 system.out.println("线程池中有5个线程");
                             }
                             else if (line.equals("status"))
                             {
                                 string status = mythreadmanager.getthreadstatus(threads);
                                 system.out.println(status);
                             }
                             else if (line.equals("startall"))
                             {
                                 mythreadmanager.managethread(threads, threadtask.start);
                             }
                             else if (line.equals("stopall"))
                             {
                                 mythreadmanager.managethread(threads, threadtask.stop);
                             }
                             else if (line.equals("sleepall"))
                             {
                                 mythreadmanager.managethread(threads, threadtask.sleep);
                             }
                             else if (line.equals("wakeupall"))
                             {
                                 mythreadmanager.managethread(threads, threadtask.wakeup);
                             }
                             else if (line.equals("end"))
                             {
                                 break;
                             }
                             else
                             {
                                 system.out.println("command:" + line);
                             }
                             ps.println("ok");
                             ps.flush();
                         }
                     }
                     catch(exception ex)
                     {
                         ex.printstacktrace();
                     }
                 }
             };
             thread.start();
         }
     }
 }

探索jdk中的concurrent工具包

为了简化开发人员在进行多线程开发时的工作量,并减少程序中的bug,jdk提供了一套concurrent工具包,我们可以用它来方便的开发多线程程序。
线程池

我们在上面实现了一个非常“简陋”的线程池,concurrent工具包中也提供了线程池,而且使用非常方便。

concurrent工具包中的线程池分为3类:scheduledthreadpool、fixedthreadpool和cachedthreadpool。

首先我们来定义一个runnable的对象

复制代码 代码如下:

定义runnable对象
 class myrunner implements runnable
 {
     public void run() {
         system.out.println(thread.currentthread().getname() + "运行开始");
         for(int i = 0; i < 1; i++)
         {
             try
             {
                 system.out.println(thread.currentthread().getname() + "正在运行");
                 thread.sleep(200);
             }
             catch(exception ex)
             {
                 ex.printstacktrace();
             }
         }
         system.out.println(thread.currentthread().getname() + "运行结束");
     }
 }

可以看出,它的功能非常简单,只是输出了线程的执行过程。

scheduledthreadpool

这和我们平时使用的scheduledtask比较类似,或者说很像timer,它可以使得一个线程在指定的一段时间内开始运行,并且在间隔另外一段时间后再次运行,直到线程池关闭。

示例代码如下:

复制代码 代码如下:

scheduledthreadpool示例
 private static void scheduledthreadpooltest()
 {
     final scheduledexecutorservice scheduler = executors.newscheduledthreadpool(3);

     myrunner runner = new myrunner();

     final scheduledfuture<?> handler1 = scheduler.scheduleatfixedrate(runner, 1, 10, timeunit.seconds);
     final scheduledfuture<?> handler2 = scheduler.schedulewithfixeddelay(runner, 2, 10, timeunit.seconds);

     scheduler.schedule(new runnable()
     {
         public void run()
         {
             handler1.cancel(true);
             handler2.cancel(true);
             scheduler.shutdown();
         }
     }, 30, timeunit.seconds
     );
 }

fixedthreadpool

这是一个指定容量的线程池,即我们可以指定在同一时间,线程池中最多有多个线程在运行,超出的线程,需要等线程池中有空闲线程时,才能有机会运行。

来看下面的代码:

复制代码 代码如下:

fixedthreadpool示例
 private static void fixedthreadpooltest()
 {
     executorservice exec = executors.newfixedthreadpool(3);
     for(int i = 0; i < 5; i++)
     {
         myrunner runner = new myrunner();
         exec.execute(runner);
     }
     exec.shutdown();
 }

注意它的输出结果:
复制代码 代码如下:

pool-1-thread-1运行开始
pool-1-thread-1正在运行
pool-1-thread-2运行开始
pool-1-thread-2正在运行
pool-1-thread-3运行开始
pool-1-thread-3正在运行
pool-1-thread-1运行结束
pool-1-thread-1运行开始
pool-1-thread-1正在运行
pool-1-thread-2运行结束
pool-1-thread-2运行开始
pool-1-thread-2正在运行
pool-1-thread-3运行结束
pool-1-thread-1运行结束
pool-1-thread-2运行结束

可以看到从始至终,最多有3个线程在同时运行。
cachedthreadpool

这是另外一种线程池,它不需要指定容量,只要有需要,它就会创建新的线程。

它的使用方式和fixedthreadpool非常像,来看下面的代码:

复制代码 代码如下:

cachedthreadpool示例
 private static void cachedthreadpooltest()
 {
     executorservice exec = executors.newcachedthreadpool();
     for(int i = 0; i < 5; i++)
     {
         myrunner runner = new myrunner();
         exec.execute(runner);
     }
     exec.shutdown();
 }

它的执行结果如下:
复制代码 代码如下:

pool-1-thread-1运行开始
pool-1-thread-1正在运行
pool-1-thread-2运行开始
pool-1-thread-2正在运行
pool-1-thread-3运行开始
pool-1-thread-3正在运行
pool-1-thread-4运行开始
pool-1-thread-4正在运行
pool-1-thread-5运行开始
pool-1-thread-5正在运行
pool-1-thread-1运行结束
pool-1-thread-2运行结束
pool-1-thread-3运行结束
pool-1-thread-4运行结束
pool-1-thread-5运行结束

可以看到,它创建了5个线程。
处理线程返回值

在有些情况下,我们需要使用线程的返回值,在上述的所有代码中,线程这是执行了某些操作,没有任何返回值。

如何做到这一点呢?我们可以使用jdk中的callable<t>和completionservice<t>,前者返回单个线程的结果,后者返回一组线程的结果。
返回单个线程的结果

还是直接看代码吧:

复制代码 代码如下:

callable示例
 private static void callabletest() throws interruptedexception, executionexception
 {
     executorservice exec = executors.newfixedthreadpool(1);
     callable<string> call = new callable<string>()
     {
         public string call()
         {
             return "hello world.";
         }
     };
     future<string> result = exec.submit(call);
     system.out.println("线程的返回值是" + result.get());
     exec.shutdown();
 }

执行结果如下:
复制代码 代码如下:

线程的返回值是hello world.

返回线程池中每个线程的结果

这里需要使用completionservice<t>,代码如下:

复制代码 代码如下:

completionservice示例
 private static void completionservicetest() throws interruptedexception, executionexception
 {
     executorservice exec = executors.newfixedthreadpool(10);
     completionservice<string> service = new executorcompletionservice<string>(exec);
     for (int i = 0; i < 10; i++)
     {
         callable<string> call = new callable<string>()
         {
             public string call() throws interruptedexception
             {
                 return thread.currentthread().getname();
             }
         };
         service.submit(call);
     }

     thread.sleep(1000);
     for(int i = 0; i < 10; i++)
     {
         future<string> result = service.take();
         system.out.println("线程的返回值是" + result.get());
     }
     exec.shutdown();
 }

执行结果如下:
复制代码 代码如下:

线程的返回值是pool-2-thread-1
线程的返回值是pool-2-thread-2
线程的返回值是pool-2-thread-3
线程的返回值是pool-2-thread-5
线程的返回值是pool-2-thread-4
线程的返回值是pool-2-thread-6
线程的返回值是pool-2-thread-8
线程的返回值是pool-2-thread-7
线程的返回值是pool-2-thread-9
线程的返回值是pool-2-thread-10

实现生产者-消费者模型

对于生产者-消费者模型来说,我们应该都不会陌生,通常我们都会使用某种数据结构来实现它。在concurrent工具包中,我们可以使用blockingqueue来实现生产者-消费者模型,如下:

复制代码 代码如下:

blockingqueue示例
 public class blockingqueuesample {

     public static void main(string[] args)
     {
         blockingqueuetest();
     }

     private static void blockingqueuetest()
     {
         final blockingqueue<integer> queue = new linkedblockingqueue<integer>();
         final int maxsleeptimeforsetter = 10;
         final int maxsleeptimerforgetter = 10;

         runnable setter = new runnable()
         {
             public void run()
             {
                 random r = new random();
                 while(true)
                 {
                     int value = r.nextint(100);
                     try
                     {
                         queue.put(new integer(value));
                         system.out.println(thread.currentthread().getname() + "---向队列中插入值" + value);
                         thread.sleep(r.nextint(maxsleeptimeforsetter) * 1000);
                     }
                     catch(exception ex)
                     {
                         ex.printstacktrace();
                     }
                 }
             }
         };

         runnable getter = new runnable()
         {
             public void run()
             {
                 random r = new random();
                 while(true)
                 {
                     try
                     {
                         if (queue.size() == 0)
                         {
                             system.out.println(thread.currentthread().getname() + "---队列为空");
                         }
                         else
                         {
                             int value = queue.take().intvalue();
                             system.out.println(thread.currentthread().getname() + "---从队列中获取值" + value);
                         }
                         thread.sleep(r.nextint(maxsleeptimerforgetter) * 1000);
                     }
                     catch(exception ex)
                     {
                         ex.printstacktrace();
                     }
                 }
             }
         };

         executorservice exec = executors.newfixedthreadpool(2);
         exec.execute(setter);
         exec.execute(getter);
     }
 }

我们定义了两个线程,一个线程向queue中添加数据,一个线程从queue中取数据。我们可以通过控制maxsleeptimeforsetter和maxsleeptimerforgetter的值,来使得程序得出不同的结果。

可能的执行结果如下:

复制代码 代码如下:

pool-1-thread-1---向队列中插入值88
pool-1-thread-2---从队列中获取值88
pool-1-thread-1---向队列中插入值75
pool-1-thread-2---从队列中获取值75
pool-1-thread-2---队列为空
pool-1-thread-2---队列为空
pool-1-thread-2---队列为空
pool-1-thread-1---向队列中插入值50
pool-1-thread-2---从队列中获取值50
pool-1-thread-2---队列为空
pool-1-thread-2---队列为空
pool-1-thread-2---队列为空
pool-1-thread-2---队列为空
pool-1-thread-2---队列为空
pool-1-thread-1---向队列中插入值51
pool-1-thread-1---向队列中插入值92
pool-1-thread-2---从队列中获取值51
pool-1-thread-2---从队列中获取值92

因为queue中的值和thread的休眠时间都是随机的,所以执行结果也不是固定的。

使用信号量来控制线程

jdk提供了semaphore来实现“信号量”的功能,它提供了两个方法分别用于获取和释放信号量:acquire和release,示例代码如下:

复制代码 代码如下:

semaphore示例
 private static void semaphoretest()
 {
     executorservice exec = executors.newfixedthreadpool(10);
     final semaphore semp = new semaphore(2);

     for (int i = 0; i < 10; i++)
     {
         runnable runner = new runnable()
         {
             public void run()
             {
                 try
                 {
                     semp.acquire();
                     system.out.println(new date() + " " + thread.currentthread().getname() + "正在执行。");
                     thread.sleep(5000);
                     semp.release();
                 }
                 catch(exception ex)
                 {
                     ex.printstacktrace();
                 }
             }
         };
         exec.execute(runner);
     }

     exec.shutdown();
 }

执行结果如下:
复制代码 代码如下:

tue may 07 11:22:11 cst 2013 pool-1-thread-1正在执行。
tue may 07 11:22:11 cst 2013 pool-1-thread-2正在执行。
tue may 07 11:22:17 cst 2013 pool-1-thread-3正在执行。
tue may 07 11:22:17 cst 2013 pool-1-thread-4正在执行。
tue may 07 11:22:22 cst 2013 pool-1-thread-5正在执行。
tue may 07 11:22:22 cst 2013 pool-1-thread-6正在执行。
tue may 07 11:22:27 cst 2013 pool-1-thread-7正在执行。
tue may 07 11:22:27 cst 2013 pool-1-thread-8正在执行。
tue may 07 11:22:32 cst 2013 pool-1-thread-10正在执行。
tue may 07 11:22:32 cst 2013 pool-1-thread-9正在执行。

可以看出,尽管线程池中创建了10个线程,但是同时运行的,只有2个线程。
控制线程池中所有线程的执行步骤

在前面,我们已经提到,可以用synchronized关键字来控制单个线程中的执行步骤,那么如果我们想要对线程池中的所有线程的执行步骤进行控制的话,应该如何实现呢?

我们有两种方式,一种是使用cyclicbarrier,一种是使用countdownlatch。

cyclicbarrier使用了类似于object.wait的机制,它的构造函数中需要接收一个整型数字,用来说明它需要控制的线程数目,当在线程的run方法中调用它的await方法时,它会保证所有的线程都执行到这一步,才会继续执行后面的步骤。

示例代码如下:

复制代码 代码如下:

cyclicbarrier示例
 class myrunner2 implements runnable
 {
     private cyclicbarrier barrier = null;
     public myrunner2(cyclicbarrier barrier)
     {
         this.barrier = barrier;
     }

     public void run() {
         random r = new random();
         try
         {
             for (int i = 0; i < 3; i++)
             {
                 thread.sleep(r.nextint(10) * 1000);
                 system.out.println(new date() + "--" + thread.currentthread().getname() + "--第" + (i + 1) + "次等待。");
                 barrier.await();
             }
         }
         catch(exception ex)
         {
             ex.printstacktrace();
         }
     }

 }

 private static void cyclicbarriertest()
 {
     cyclicbarrier barrier = new cyclicbarrier(3);

     executorservice exec = executors.newfixedthreadpool(3);
     for (int i = 0; i < 3; i++)
     {
         exec.execute(new myrunner2(barrier));
     }
     exec.shutdown();
 }

执行结果如下:
复制代码 代码如下:

tue may 07 11:31:20 cst 2013--pool-1-thread-2--第1次等待。
tue may 07 11:31:21 cst 2013--pool-1-thread-3--第1次等待。
tue may 07 11:31:24 cst 2013--pool-1-thread-1--第1次等待。
tue may 07 11:31:24 cst 2013--pool-1-thread-1--第2次等待。
tue may 07 11:31:26 cst 2013--pool-1-thread-3--第2次等待。
tue may 07 11:31:30 cst 2013--pool-1-thread-2--第2次等待。
tue may 07 11:31:32 cst 2013--pool-1-thread-1--第3次等待。
tue may 07 11:31:33 cst 2013--pool-1-thread-3--第3次等待。
tue may 07 11:31:33 cst 2013--pool-1-thread-2--第3次等待。

可以看出,thread-2到第1次等待点时,一直等到thread-1到达后才继续执行。

countdownlatch则是采取类似”倒计时计数器”的机制来控制线程池中的线程,它有countdown和await两个方法。示例代码如下:

复制代码 代码如下:

countdownlatch示例
 private static void countdownlatchtest() throws interruptedexception
 {
     final countdownlatch begin = new countdownlatch(1);
     final countdownlatch end = new countdownlatch(5);
     executorservice exec = executors.newfixedthreadpool(5);
     for (int i = 0; i < 5; i++)
     {
         runnable runner = new runnable()
         {
             public void run()
             {
                 random r = new random();
                 try
                 {
                     begin.await();
                     system.out.println(thread.currentthread().getname() + "运行开始");
                     thread.sleep(r.nextint(10)*1000);
                     system.out.println(thread.currentthread().getname() + "运行结束");
                 }
                 catch(exception ex)
                 {
                     ex.printstacktrace();
                 }
                 finally
                 {
                     end.countdown();
                 }
             }
         };
         exec.execute(runner);
     }
     begin.countdown();
     end.await();
     system.out.println(thread.currentthread().getname() + "运行结束");
     exec.shutdown();
 }

执行结果如下:
复制代码 代码如下:

pool-1-thread-1运行开始
pool-1-thread-5运行开始
pool-1-thread-2运行开始
pool-1-thread-3运行开始
pool-1-thread-4运行开始
pool-1-thread-2运行结束
pool-1-thread-1运行结束
pool-1-thread-3运行结束
pool-1-thread-5运行结束
pool-1-thread-4运行结束
main运行结束