深入解析Java的线程同步以及线程间通信
java线程同步
当两个或两个以上的线程需要共享资源,它们需要某种方法来确定资源在某一刻仅被一个线程占用。达到此目的的过程叫做同步(synchronization)。像你所看到的,java为此提供了独特的,语言水平上的支持。
同步的关键是管程(也叫信号量semaphore)的概念。管程是一个互斥独占锁定的对象,或称互斥体(mutex)。在给定的时间,仅有一个线程可以获得管程。当一个线程需要锁定,它必须进入管程。所有其他的试图进入已经锁定的管程的线程必须挂起直到第一个线程退出管程。这些其他的线程被称为等待管程。一个拥有管程的线程如果愿意的话可以再次进入相同的管程。
如果你用其他语言例如c或c++时用到过同步,你会知道它用起来有一点诡异。这是因为很多语言它们自己不支持同步。相反,对同步线程,程序必须利用操作系统源语。幸运的是java通过语言元素实现同步,大多数的与同步相关的复杂性都被消除。
你可以用两种方法同步化代码。两者都包括synchronized关键字的运用,下面分别说明这两种方法。
使用同步方法
java中同步是简单的,因为所有对象都有它们与之对应的隐式管程。进入某一对象的管程,就是调用被synchronized关键字修饰的方法。当一个线程在一个同步方法内部,所有试图调用该方法(或其他同步方法)的同实例的其他线程必须等待。为了退出管程,并放弃对对象的控制权给其他等待的线程,拥有管程的线程仅需从同步方法中返回。
为理解同步的必要性,让我们从一个应该使用同步却没有用的简单例子开始。下面的程序有三个简单类。首先是callme,它有一个简单的方法call( )。call( )方法有一个名为msg的string参数。该方法试图在方括号内打印msg 字符串。有趣的事是在调用call( ) 打印左括号和msg字符串后,调用thread.sleep(1000),该方法使当前线程暂停1秒。
下一个类的构造函数caller,引用了callme的一个实例以及一个string,它们被分别存在target 和 msg 中。构造函数也创建了一个调用该对象的run( )方法的新线程。该线程立即启动。caller类的run( )方法通过参数msg字符串调用callme实例target的call( ) 方法。最后,synch类由创建callme的一个简单实例和caller的三个具有不同消息字符串的实例开始。
callme的同一实例传给每个caller实例。
// this program is not synchronized. class callme { void call(string msg) { system.out.print("[" + msg); try { thread.sleep(1000); } catch(interruptedexception e) { system.out.println("interrupted"); } system.out.println("]"); } } class caller implements runnable { string msg; callme target; thread t; public caller(callme targ, string s) { target = targ; msg = s; t = new thread(this); t.start(); } public void run() { target.call(msg); } } class synch { public static void main(string args[]) { callme target = new callme(); caller ob1 = new caller(target, "hello"); caller ob2 = new caller(target, "synchronized"); caller ob3 = new caller(target, "world"); // wait for threads to end try { ob1.t.join(); ob2.t.join(); ob3.t.join(); } catch(interruptedexception e) { system.out.println("interrupted"); } } }
该程序的输出如下:
hello[synchronized[world] ] ]
在本例中,通过调用sleep( ),call( )方法允许执行转换到另一个线程。该结果是三个消息字符串的混合输出。该程序中,没有阻止三个线程同时调用同一对象的同一方法的方法存在。这是一种竞争,因为三个线程争着完成方法。例题用sleep( )使该影响重复和明显。在大多数情况,竞争是更为复杂和不可预知的,因为你不能确定何时上下文转换会发生。这使程序时而运行正常时而出错。
为达到上例所想达到的目的,必须有权连续的使用call( )。也就是说,在某一时刻,必须限制只有一个线程可以支配它。为此,你只需在call( ) 定义前加上关键字synchronized,如下:
class callme { synchronized void call(string msg) { ...
这防止了在一个线程使用call( )时其他线程进入call( )。在synchronized加到call( )前面以后,程序输出如下:
[hello] [synchronized] [world]
任何时候在多线程情况下,你有一个方法或多个方法操纵对象的内部状态,都必须用synchronized 关键字来防止状态出现竞争。记住,一旦线程进入实例的同步方法,没有其他线程可以进入相同实例的同步方法。然而,该实例的其他不同步方法却仍然可以被调用。
同步语句
尽管在创建的类的内部创建同步方法是获得同步的简单和有效的方法,但它并非在任何时候都有效。这其中的原因,请跟着思考。假设你想获得不为多线程访问设计的类对象的同步访问,也就是,该类没有用到synchronized方法。而且,该类不是你自己,而是第三方创建的,你不能获得它的源代码。这样,你不能在相关方法前加synchronized修饰符。怎样才能使该类的一个对象同步化呢?很幸运,解决方法很简单:你只需将对这个类定义的方法的调用放入一个synchronized块内就可以了。
下面是synchronized语句的普通形式:
synchronized(object) { // statements to be synchronized }
其中,object是被同步对象的引用。如果你想要同步的只是一个语句,那么不需要花括号。一个同步块确保对object成员方法的调用仅在当前线程成功进入object管程后发生。
下面是前面程序的修改版本,在run( )方法内用了同步块:
// this program uses a synchronized block. class callme { void call(string msg) { system.out.print("[" + msg); try { thread.sleep(1000); } catch (interruptedexception e) { system.out.println("interrupted"); } system.out.println("]"); } } class caller implements runnable { string msg; callme target; thread t; public caller(callme targ, string s) { target = targ; msg = s; t = new thread(this); t.start(); } // synchronize calls to call() public void run() { synchronized(target) { // synchronized block target.call(msg); } } } class synch1 { public static void main(string args[]) { callme target = new callme(); caller ob1 = new caller(target, "hello"); caller ob2 = new caller(target, "synchronized"); caller ob3 = new caller(target, "world"); // wait for threads to end try { ob1.t.join(); ob2.t.join(); ob3.t.join(); } catch(interruptedexception e) { system.out.println("interrupted"); } } }
这里,call( )方法没有被synchronized修饰。而synchronized是在caller类的run( )方法中声明的。这可以得到上例中同样正确的结果,因为每个线程运行前都等待先前的一个线程结束。
java线程间通信
多线程通过把任务分成离散的和合乎逻辑的单元代替了事件循环程序。线程还有第二优点:它远离了轮询。轮询通常由重复监测条件的循环实现。一旦条件成立,就要采取适当的行动。这浪费了cpu时间。举例来说,考虑经典的序列问题,当一个线程正在产生数据而另一个程序正在消费它。为使问题变得更有趣,假设数据产生器必须等待消费者完成工作才能产生新的数据。在轮询系统,消费者在等待生产者产生数据时会浪费很多cpu周期。一旦生产者完成工作,它将启动轮询,浪费更多的cpu时间等待消费者的工作结束,如此下去。很明显,这种情形不受欢迎。
为避免轮询,java包含了通过wait( ),notify( )和notifyall( )方法实现的一个进程间通信机制。这些方法在对象中是用final方法实现的,所以所有的类都含有它们。这三个方法仅在synchronized方法中才能被调用。尽管这些方法从计算机科学远景方向上来说具有概念的高度先进性,实际中用起来是很简单的:
wait( ) 告知被调用的线程放弃管程进入睡眠直到其他线程进入相同管程并且调用notify( )。
notify( ) 恢复相同对象中第一个调用 wait( ) 的线程。
notifyall( ) 恢复相同对象中所有调用 wait( ) 的线程。具有最高优先级的线程最先运行。
这些方法在object中被声明,如下所示:
final void wait( ) throws interruptedexception final void notify( ) final void notifyall( )
wait( )存在的另外的形式允许你定义等待时间。
下面的例子程序错误的实行了一个简单生产者/消费者的问题。它由四个类组成:q,设法获得同步的序列;producer,产生排队的线程对象;consumer,消费序列的线程对象;以及pc,创建单个q,producer,和consumer的小类。
// an incorrect implementation of a producer and consumer. class q { int n; synchronized int get() { system.out.println("got: " + n); return n; } synchronized void put(int n) { this.n = n; system.out.println("put: " + n); } } class producer implements runnable { q q; producer(q q) { this.q = q; new thread(this, "producer").start(); } public void run() { int i = 0; while(true) { q.put(i++); } } } class consumer implements runnable { q q; consumer(q q) { this.q = q; new thread(this, "consumer").start(); } public void run() { while(true) { q.get(); } } } class pc { public static void main(string args[]) { q q = new q(); new producer(q); new consumer(q); system.out.println("press control-c to stop."); } }
尽管q类中的put( )和get( )方法是同步的,没有东西阻止生产者超越消费者,也没有东西阻止消费者消费同样的序列两次。这样,你就得到下面的错误输出(输出将随处理器速度和装载的任务而改变):
put: 1 got: 1 got: 1 got: 1 got: 1 got: 1 put: 2 put: 3 put: 4 put: 5 put: 6 put: 7 got: 7
生产者生成1后,消费者依次获得同样的1五次。生产者在继续生成2到7,消费者没有机会获得它们。
用java正确的编写该程序是用wait( )和notify( )来对两个方向进行标志,如下所示:
// a correct implementation of a producer and consumer. class q { int n; boolean valueset = false; synchronized int get() { if(!valueset) try { wait(); } catch(interruptedexception e) { system.out.println("interruptedexception caught"); } system.out.println("got: " + n); valueset = false; notify(); return n; } synchronized void put(int n) { if(valueset) try { wait(); } catch(interruptedexception e) { system.out.println("interruptedexception caught"); } this.n = n; valueset = true; system.out.println("put: " + n); notify(); } } class producer implements runnable { q q; producer(q q) { this.q = q; new thread(this, "producer").start(); } public void run() { int i = 0; while(true) { q.put(i++); } } } class consumer implements runnable { q q; consumer(q q) { this.q = q; new thread(this, "consumer").start(); } public void run() { while(true) { q.get(); } } } class pcfixed { public static void main(string args[]) { q q = new q(); new producer(q); new consumer(q); system.out.println("press control-c to stop."); } }
内部get( ), wait( )被调用。这使执行挂起直到producer 告知数据已经预备好。这时,内部get( ) 被恢复执行。获取数据后,get( )调用notify( )。这告诉producer可以向序列中输入更多数据。在put( )内,wait( )挂起执行直到consumer取走了序列中的项目。当执行再继续,下一个数据项目被放入序列,notify( )被调用,这通知consumer它应该移走该数据。
下面是该程序的输出,它清楚的显示了同步行为:
put: 1 got: 1 put: 2 got: 2 put: 3 got: 3 put: 4 got: 4 put: 5 got: 5