Java Concurrency: Thread&Locks
Synchronized & ReentrantLock
1) Basic Thread Operations
1) void notifyAll()
1> Unblocks all the threads that called wait on this object.
2> This method can only be called from within a synchronized method or block.
3> This method throws an IllegalMonitorStateException if the current thread is not the owner of the object's lock.
2) void notify()
1> Unblocks one arbitrarily chosen thread among the threads that called wait on this object.
2> This method can only be called from within a synchronized method or block.
3> The method throws an IllegalMonitorStateException if the current thread is not the owner of the object's lock.
3) void wait(long millis)
void wait(long millis, int nanos)
1> Cause the current thread to wait until it is notified or until the specified amount of time has passed.
2> These methods can only be called from within a synchronized method or block.
3> They throw an IllegalMonitorStateException if the current thread is not the owner of the object's lock.
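The rules above can be checked with a small sketch. It assumes nothing beyond java.lang.Object; the class and method names (WaitBasicsDemo, notifyWithoutLockFails, timedWait) are made up for illustration. The first method calls notifyAll() without owning the monitor, the second waits with a timeout on a monitor that nobody notifies.

```java
// Hypothetical demo class; the names are illustrative only.
class WaitBasicsDemo {

    // Calls notifyAll() without holding the monitor. Returns true if the
    // expected IllegalMonitorStateException was thrown.
    static boolean notifyWithoutLockFails() {
        Object monitor = new Object();
        try {
            monitor.notifyAll(); // not inside synchronized (monitor)
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // Waits up to timeoutMillis on a monitor that nobody notifies and
    // returns the elapsed time in milliseconds.
    static long timedWait(long timeoutMillis) {
        Object monitor = new Object();
        long start = System.nanoTime();
        synchronized (monitor) {
            try {
                monitor.wait(timeoutMillis); // releases the monitor while waiting
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
            }
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("notifyAll without lock failed: " + notifyWithoutLockFails());
        System.out.println("timed wait returned after about " + timedWait(200) + " ms");
    }
}
```

Because wait can also return on a spurious wakeup, real code should call it inside a loop that re-checks the condition, as the producer/consumer demo does.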
Demo for Producer & Consumer using wait() & notify()
package edu.xmu.thread;

import java.util.ArrayList;
import java.util.List;

public class NotifyTest {
    public static void main(String[] args) {
        List<String> data = new ArrayList<String>();
        Thread consumer = new Thread(new DataConsumer(data));
        Thread producer = new Thread(new DataProducer(data));
        Thread producer2 = new Thread(new DataProducer(data));
        Thread producer3 = new Thread(new DataProducer(data));
        producer.start();
        producer2.start();
        producer3.start();
        consumer.start();
    }
}

class DataConsumer implements Runnable {
    List<String> data;

    public DataConsumer(List<String> data) {
        super();
        this.data = data;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep((long) (1000 * Math.random()));
                synchronized (data) {
                    System.out.println(String.format(
                            "Thread: [%s] attempts to consume data", Thread.currentThread()));
                    // Re-check the condition in a loop after every wake-up.
                    while (data.size() == 0) {
                        System.out.println(String.format(
                                "Thread: [%s] is waiting for consuming data", Thread.currentThread()));
                        data.wait();
                    }
                    String value = data.remove(0);
                    System.out.println(String.format(
                            "Thread: [%s] consumed data: [%s]", Thread.currentThread(), value));
                    // Wake up all waiting threads, especially the producers, so they
                    // re-check their while condition and try to acquire the lock on data again.
                    data.notifyAll();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

class DataProducer implements Runnable {
    List<String> data;

    public DataProducer(List<String> data) {
        super();
        this.data = data;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep((long) (1000 * Math.random()));
                synchronized (data) {
                    System.out.println(String.format(
                            "Thread: [%s] attempts to produce data", Thread.currentThread()));
                    // The list is treated as full at 10 elements.
                    while (data.size() == 10) {
                        System.out.println(String.format(
                                "Thread: [%s] is waiting for producing data", Thread.currentThread()));
                        data.wait();
                    }
                    String value = String.valueOf(100 * Math.random());
                    data.add(value);
                    System.out.println(String.format(
                            "Thread: [%s] produced data: [%s]", Thread.currentThread(), value));
                    // Wake up all waiting threads, especially the consumer, so they
                    // re-check their while condition and try to acquire the lock on data again.
                    data.notifyAll();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
2) A quick review of synchronized
Declaring a block of code to be synchronized has two important consequences:
1> Atomicity: only one thread at a time can execute code protected by a given monitor object (lock).
This lets you prevent multiple threads from colliding with each other when updating shared state.
2> Visibility: Visibility is more subtle. It deals with the vagaries of memory caching and compiler optimizations.
Ordinarily, threads are free to cache values for variables in such a way that they are not necessarily immediately visible to other threads
(whether it be in registers, in processor-specific caches, or through instruction reordering or other compiler optimizations).
But if the developer has used synchronization, the runtime will ensure that updates to variables made by one thread prior to exiting a synchronized block become visible to another thread as soon as it enters a synchronized block protected by that same monitor (lock).
So synchronization takes care of everything needed to reliably update multiple shared variables without race conditions or corrupted data (provided the synchronization boundaries are in the right place), and it ensures that other threads that synchronize properly will see the most up-to-date values of those variables.
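Both guarantees can be illustrated with a minimal sketch. The classes SharedFlag and VisibilityDemo are hypothetical, not from the referenced article. The writer updates two fields under the object's monitor, and the reader polls under the same monitor, so it can only ever observe both updates or neither.

```java
// Hypothetical holder for two fields that must be updated together.
class SharedFlag {
    private boolean ready = false;
    private int payload = 0;

    // Both writes happen under this object's monitor.
    synchronized void publish(int value) {
        payload = value;
        ready = true;
    }

    // Reads under the same monitor: returns the payload once it has
    // been published, or -1 while nothing is available yet.
    synchronized int poll() {
        return ready ? payload : -1;
    }
}

class VisibilityDemo {

    // Starts a writer thread and spins until its update becomes visible.
    static int runOnce() {
        final SharedFlag flag = new SharedFlag();
        Thread writer = new Thread(() -> flag.publish(42));
        writer.start();
        int seen;
        while ((seen = flag.poll()) == -1) {
            Thread.yield(); // give the writer a chance to run
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("Reader saw payload: " + runOnce()); // prints 42
    }
}
```

If publish and poll were not synchronized (and the fields not volatile), the reader loop would have no guarantee of ever seeing ready become true.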
"Whenever you will be writing a variable that may next be read by another thread, or reading a variable that may have last been written by another thread, you must synchronize."
3) Limitations of synchronized
Synchronization is good, but not perfect. It has some functional limitations:
1) It is not possible to interrupt a thread that is waiting to acquire a lock.
2) It is not possible to poll for a lock or attempt to acquire a lock without being willing to wait forever for it.
3) It requires that locks be released in the same stack frame in which they were acquired.
Apart from the limitations above, synchronized behaves just like ReentrantLock: both are reentrant.
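For comparison, the first two limitations do not apply to java.util.concurrent.locks.ReentrantLock, which offers tryLock with a timeout and lockInterruptibly. The following is a sketch only; the class LockLimitsDemo and its method names are invented for illustration, and the latches exist just to make the thread ordering deterministic.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; the names are illustrative only.
class LockLimitsDemo {

    // Tries for at most timeoutMillis to take a lock that another thread
    // holds. Returns whether the lock was acquired.
    static boolean tryLockWhileHeld(long timeoutMillis) {
        final ReentrantLock lock = new ReentrantLock();
        final CountDownLatch held = new CountDownLatch(1);
        final CountDownLatch release = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            lock.lock();
            try {
                held.countDown(); // tell the caller the lock is taken
                release.await();  // keep the lock until told to let go
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        holder.start();
        boolean acquired = false;
        try {
            held.await();
            acquired = lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
            if (acquired) {
                lock.unlock();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            release.countDown(); // let the holder thread finish
        }
        return acquired;
    }

    // Parks a thread in lockInterruptibly() and interrupts it. Returns
    // true if the waiter gave up with an InterruptedException.
    static boolean interruptWaiter() {
        final ReentrantLock lock = new ReentrantLock();
        final CountDownLatch started = new CountDownLatch(1);
        final boolean[] interrupted = {false};
        lock.lock(); // the calling thread holds the lock throughout
        try {
            Thread waiter = new Thread(() -> {
                started.countDown();
                try {
                    lock.lockInterruptibly(); // waits, but responds to interrupt()
                    lock.unlock();
                } catch (InterruptedException e) {
                    interrupted[0] = true;
                }
            });
            waiter.start();
            started.await();
            waiter.interrupt();
            waiter.join(); // join makes the waiter's write visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
        return interrupted[0];
    }

    public static void main(String[] args) {
        System.out.println("tryLock acquired: " + tryLockWhileHeld(100)); // false
        System.out.println("waiter interrupted: " + interruptWaiter());   // true
    }
}
```

A thread blocked on entering a synchronized block cannot be cancelled or timed out in either of these ways.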
4) ReentrantLock
1) What is reentrant?
Simply that there is an acquisition count associated with the lock, and if a thread that holds the lock acquires it again, the acquisition count is incremented and the lock then needs to be released twice to truly release the lock.
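The acquisition count can be observed directly through ReentrantLock.getHoldCount(). A minimal sketch follows; HoldCountDemo and its methods are hypothetical names. Each nested call acquires the lock once more and releases it once on the way back out.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; the names are illustrative only.
class HoldCountDemo {
    private final ReentrantLock lock = new ReentrantLock();

    // Acquires the lock once per recursion level and returns the hold
    // count observed at the deepest level.
    int nest(int depth) {
        lock.lock();
        try {
            if (depth <= 1) {
                return lock.getHoldCount();
            }
            return nest(depth - 1);
        } finally {
            lock.unlock(); // one unlock for each lock() above
        }
    }

    // True when no thread holds the lock any more.
    boolean isFree() {
        return !lock.isLocked();
    }

    public static void main(String[] args) {
        HoldCountDemo demo = new HoldCountDemo();
        System.out.println("Deepest hold count: " + demo.nest(3));     // 3
        System.out.println("Free after unwinding: " + demo.isFree()); // true
    }
}
```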
Below is an example of a lock that is not reentrant (UnReentrantLock):
package edu.xmu.thread;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LockTest {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newCachedThreadPool();
        Counter counter = new Counter();
        List<Runnable> taskList = initTaskList(counter);
        for (Runnable task : taskList) {
            pool.submit(task);
        }
        pool.shutdown();
    }

    private static List<Runnable> initTaskList(final Counter counter) {
        List<Runnable> taskList = new ArrayList<Runnable>();
        for (int i = 0; i < 5; i++) {
            taskList.add(new Runnable() {
                @Override
                public void run() {
                    try {
                        counter.increase();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        return taskList;
    }

    private static class Counter {
        private UnReentrantLock lock = new UnReentrantLock();
        private int count = 0;

        public void increase() throws InterruptedException {
            lock.lock();
            count++;
            try {
                Thread.sleep((long) (Math.random() * 100 + 1));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("CurrentThread: [" + Thread.currentThread() + "]. Count = " + count);
            increase(); // Here we try to re-enter the lock we already hold
            lock.unlock(); // Never reached: the recursive call above blocks forever
        }
    }

    // A simple lock with no owner and no acquisition count, so it is not reentrant.
    private static class UnReentrantLock {
        private boolean isLocked = false;

        public synchronized void lock() throws InterruptedException {
            while (isLocked) {
                System.out.println(Thread.currentThread() + " is waiting");
                wait();
            }
            System.out.println(Thread.currentThread() + " got the lock");
            isLocked = true;
        }

        public synchronized void unlock() {
            isLocked = false;
            notify();
        }
    }
}
Output:
Thread[pool-1-thread-1,5,main] got the lock
Thread[pool-1-thread-2,5,main] is waiting
Thread[pool-1-thread-4,5,main] is waiting
Thread[pool-1-thread-5,5,main] is waiting
Thread[pool-1-thread-3,5,main] is waiting
CurrentThread: [Thread[pool-1-thread-1,5,main]]. Count = 1
Thread[pool-1-thread-1,5,main] is waiting
// Here we get a deadlock. Thread-1 never released the lock and is trying to get the same lock again.
Below is the same example using synchronized, which is reentrant:
package edu.xmu.thread;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LockTest {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newCachedThreadPool();
        Counter counter = new Counter();
        List<Runnable> taskList = initTaskList(counter);
        for (Runnable task : taskList) {
            pool.submit(task);
        }
        pool.shutdown();
    }

    private static List<Runnable> initTaskList(final Counter counter) {
        List<Runnable> taskList = new ArrayList<Runnable>();
        for (int i = 0; i < 5; i++) {
            taskList.add(new Runnable() {
                @Override
                public void run() {
                    try {
                        counter.increase();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        return taskList;
    }

    private static class Counter {
        private int count = 0;

        public void increase() throws InterruptedException {
            synchronized (this) {
                count++;
                try {
                    Thread.sleep((long) (Math.random() * 100 + 1));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("CurrentThread: [" + Thread.currentThread() + "]. Count = " + count);
                increase(); // Here we re-enter the monitor we already hold
            }
        }
    }
}
Output:
CurrentThread: [Thread[pool-1-thread-1,5,main]]. Count = 1
CurrentThread: [Thread[pool-1-thread-1,5,main]]. Count = 2
CurrentThread: [Thread[pool-1-thread-1,5,main]]. Count = 3
CurrentThread: [Thread[pool-1-thread-1,5,main]]. Count = 4
CurrentThread: [Thread[pool-1-thread-1,5,main]]. Count = 5
CurrentThread: [Thread[pool-1-thread-1,5,main]]. Count = 6
CurrentThread: [Thread[pool-1-thread-1,5,main]]. Count = 7
CurrentThread: [Thread[pool-1-thread-1,5,main]]. Count = 8
...
// Before thread-1 releases the lock, it can still acquire the same lock again.
// That is the meaning of re-entrant.
Or we can replace "synchronized{}" with code below:
// Requires: import java.util.concurrent.locks.Lock;
//           import java.util.concurrent.locks.ReentrantLock;
private static class Counter {
    private Lock lock = new ReentrantLock();
    private int count = 0;

    public void increase() throws InterruptedException {
        lock.lock();
        try {
            count++;
            try {
                Thread.sleep((long) (Math.random() * 100 + 1));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("CurrentThread: [" + Thread.currentThread() + "]. Count = " + count);
            increase(); // Here we re-enter the lock we already hold
        } finally {
            lock.unlock(); // Release once per lock() call, even if an exception is thrown
        }
    }
}
2) The Lock framework in java.util.concurrent.locks is an abstraction for locking, allowing for lock implementations that are implemented as Java classes rather than as a language feature.
It makes room for multiple implementations of Lock, which may have different scheduling algorithms, performance characteristics, or locking semantics.
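One way to see the benefit of the abstraction is code written against the Lock interface only, which then works with any implementation. The following sketch makes that assumption; GuardedCounter, LockAbstractionDemo and hammer are hypothetical names. It runs the same counter with a default ReentrantLock, a fair ReentrantLock, and the write lock of a ReentrantReadWriteLock.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical counter that depends only on the Lock interface.
class GuardedCounter {
    private final Lock lock;
    private int count = 0;

    GuardedCounter(Lock lock) {
        this.lock = lock;
    }

    void increment() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }

    int get() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
}

class LockAbstractionDemo {

    // Increments one counter from several threads and returns the total.
    static int hammer(Lock lock, int threads, int perThread) {
        final GuardedCounter counter = new GuardedCounter(lock);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(hammer(new ReentrantLock(), 4, 1000));                      // 4000
        System.out.println(hammer(new ReentrantLock(true), 4, 1000));                  // 4000, fair ordering
        System.out.println(hammer(new ReentrantReadWriteLock().writeLock(), 4, 1000)); // 4000
    }
}
```

The three locks differ in scheduling and semantics (for example, the fair lock favours the longest-waiting thread), yet the counter code does not change.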
Reference Links:
1) http://www.ibm.com/developerworks/java/library/j-jtp10264/