使用CAS变量和CountDownLatch保证线程间的同步-匀速的发送短信 多线程
程序员文章站
2022-07-12 18:20:18
...
使用CAS变量和CountDownLatch保证线程间的同步
//使用CAS变量和CountDownLatch保证线程间的同步-匀速的发送短信
/**
 * Demo of pacing a worker thread with a CAS flag ({@code hasNotified}) and a resettable
 * latch ({@code waitPoint}).
 *
 * <p>The worker ({@link ThreadA}) loops: each iteration it parks in
 * {@link #waitForRunning(long)} for at most a fixed interval and then prints a counter and a
 * timestamp. Any other thread may call {@link #wakeup()} to cut the current wait short, or,
 * if the worker is not waiting yet, to make its next wait return immediately.
 */
public class ThreadSync {
    /** Number of iterations the worker has printed so far. */
    private final AtomicLong total = new AtomicLong(0);

    /** Loop-exit flag for the worker; nothing in this class ever sets it to {@code true}. */
    private volatile boolean isstop = false;

    /**
     * {@code true} while a wake-up has been requested but not yet consumed by the worker.
     * The reference is never reassigned, so it is {@code final} rather than {@code volatile}.
     */
    private final AtomicBoolean hasNotified = new AtomicBoolean(false);

    /** Latch the worker parks on; re-armed before every wait. */
    private final CountDownLatch2 waitPoint = new CountDownLatch2(1);

    /**
     * Starts the worker thread "t1" plus a helper thread that wakes it once.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final ThreadSync o = new ThreadSync();
        ThreadA t1 = o.new ThreadA("t1");
        t1.start();
        // Wake t1 a single time, about 500 ms after start-up (this is not a periodic wake-up).
        new Thread() {
            @Override
            public void run() {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    // Keep the interrupt status visible instead of discarding it.
                    Thread.currentThread().interrupt();
                }
                // wakeup() already performs the false -> true CAS, so no separate pre-check
                // of the flag is needed.
                o.wakeup();
            }
        }.start();
    }

    /** Worker thread: waits up to 500 ms per iteration, then prints name, counter and time. */
    class ThreadA extends Thread {
        public ThreadA(String name) {
            super(name);
        }

        @Override
        public void run() {
            // Also leave the loop when interrupted: waitForRunning() restores the interrupt
            // status, and looping on with it set would make every await fail immediately.
            while (!isstop && !Thread.currentThread().isInterrupted()) {
                waitForRunning(500);
                System.out.println(getName() + " " + total.getAndIncrement() + " "
                        + timeMillisToHumanString(System.currentTimeMillis()));
            }
        }
    }

    /**
     * Requests a wake-up of the waiting worker. Only the call that flips the flag from
     * {@code false} to {@code true} counts the latch down; repeated calls before the worker
     * consumes the flag are no-ops.
     */
    public void wakeup() {
        if (hasNotified.compareAndSet(false, true)) {
            waitPoint.countDown(); // notify
        }
    }

    /**
     * Blocks the calling thread until {@link #wakeup()} is called or {@code interval} elapses.
     *
     * <p>If a wake-up is already pending, it is consumed, {@code "***"} is printed and the
     * method returns without waiting. If the thread is interrupted while waiting, the method
     * returns early with the thread's interrupt status set.
     *
     * <p>NOTE(review): a {@code wakeup()} that lands between the failed CAS and
     * {@code waitPoint.reset()} is overwritten by the reset, so that wait runs for the full
     * interval and the notification is dropped by the {@code finally} block.
     *
     * @param interval maximum time to wait, in milliseconds
     */
    protected void waitForRunning(long interval) {
        if (hasNotified.compareAndSet(true, false)) {
            System.out.println("***");
            return;
        }
        // Re-arm the latch so the await below actually blocks.
        waitPoint.reset();
        try {
            waitPoint.await(interval, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt status so the caller's loop can observe it and stop.
            Thread.currentThread().interrupt();
        } finally {
            hasNotified.set(false);
        }
    }

    /**
     * Exposes the pending-wake-up flag.
     *
     * @return the live {@link AtomicBoolean} (not a copy)
     */
    public AtomicBoolean getHasNotified() {
        return hasNotified;
    }

    /**
     * Formats an epoch-millisecond timestamp as {@code yyyyMMddHHmmssSSS} (17 digits) using
     * the calendar returned by {@link Calendar#getInstance()}.
     *
     * @param t milliseconds since the epoch
     * @return the formatted timestamp
     */
    public static String timeMillisToHumanString(final long t) {
        Calendar cal = Calendar.getInstance();
        cal.setTimeInMillis(t);
        // Calendar.MONTH is zero-based, hence the + 1.
        return String.format("%04d%02d%02d%02d%02d%02d%03d", cal.get(Calendar.YEAR), cal.get(Calendar.MONTH) + 1,
                cal.get(Calendar.DAY_OF_MONTH), cal.get(Calendar.HOUR_OF_DAY), cal.get(Calendar.MINUTE), cal.get(Calendar.SECOND),
                cal.get(Calendar.MILLISECOND));
    }
}
// CountDownLatch2: a CountDownLatch-style latch with an added reset() method so it can be re-armed and reused.
/**
 * A count-down latch that can be re-armed through {@link #reset()}.
 *
 * <p>Waiters pass once the internal count reaches zero; {@link #countDown()} lowers the count
 * by one and {@link #reset()} puts it back to the value given at construction.
 */
public class CountDownLatch2 {

    /** Synchronizer whose AQS state holds the remaining count. */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        /** Count supplied at construction; {@link #reset()} restores the state to this value. */
        private final int initialCount;

        Sync(int count) {
            this.initialCount = count;
            setState(count);
        }

        int getCount() {
            return getState();
        }

        /** Succeeds (returns 1) only while the count is zero; otherwise reports failure (-1). */
        @Override
        protected int tryAcquireShared(int acquires) {
            if (getState() != 0) {
                return -1;
            }
            return 1;
        }

        /**
         * Lowers the count by one with a CAS retry loop; the {@code releases} argument is
         * ignored.
         *
         * @return {@code true} only for the call that moves the count to zero
         */
        @Override
        protected boolean tryReleaseShared(int releases) {
            int current;
            int remaining;
            do {
                current = getState();
                if (current == 0) {
                    // Already at zero: nothing to release.
                    return false;
                }
                remaining = current - 1;
            } while (!compareAndSetState(current, remaining));
            return remaining == 0;
        }

        /** Overwrites the state with the construction-time count. */
        void reset() {
            setState(initialCount);
        }
    }

    private final Sync sync;

    /**
     * @param count number of {@link #countDown()} calls needed before waiters may pass
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public CountDownLatch2(int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count < 0");
        }
        this.sync = new Sync(count);
    }

    /**
     * Blocks until the count is zero.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * Blocks until the count is zero or the timeout elapses.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return {@code true} if the count reached zero, {@code false} if the wait timed out
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public boolean await(long timeout, TimeUnit unit)
            throws InterruptedException {
        final long nanos = unit.toNanos(timeout);
        return sync.tryAcquireSharedNanos(1, nanos);
    }

    /** Lowers the count by one; has no effect when the count is already zero. */
    public void countDown() {
        sync.releaseShared(1);
    }

    /** @return the current count */
    public long getCount() {
        return sync.getCount();
    }

    /** Sets the count back to the value passed to the constructor. */
    public void reset() {
        sync.reset();
    }

    /** @return the default object string followed by {@code [Count = n]} */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder(super.toString());
        text.append("[Count = ").append(sync.getCount()).append("]");
        return text.toString();
    }
}
//使用CAS变量和CountDownLatch保证线程间的同步-匀速的发送短信
/**
 * Demo of pacing a worker thread with a CAS flag ({@code hasNotified}) and a resettable
 * latch ({@code waitPoint}).
 *
 * <p>The worker ({@link ThreadA}) loops: each iteration it parks in
 * {@link #waitForRunning(long)} for at most a fixed interval and then prints a counter and a
 * timestamp. Any other thread may call {@link #wakeup()} to cut the current wait short, or,
 * if the worker is not waiting yet, to make its next wait return immediately.
 */
public class ThreadSync {
    /** Number of iterations the worker has printed so far. */
    private final AtomicLong total = new AtomicLong(0);

    /** Loop-exit flag for the worker; nothing in this class ever sets it to {@code true}. */
    private volatile boolean isstop = false;

    /**
     * {@code true} while a wake-up has been requested but not yet consumed by the worker.
     * The reference is never reassigned, so it is {@code final} rather than {@code volatile}.
     */
    private final AtomicBoolean hasNotified = new AtomicBoolean(false);

    /** Latch the worker parks on; re-armed before every wait. */
    private final CountDownLatch2 waitPoint = new CountDownLatch2(1);

    /**
     * Starts the worker thread "t1" plus a helper thread that wakes it once.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final ThreadSync o = new ThreadSync();
        ThreadA t1 = o.new ThreadA("t1");
        t1.start();
        // Wake t1 a single time, about 500 ms after start-up (this is not a periodic wake-up).
        new Thread() {
            @Override
            public void run() {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    // Keep the interrupt status visible instead of discarding it.
                    Thread.currentThread().interrupt();
                }
                // wakeup() already performs the false -> true CAS, so no separate pre-check
                // of the flag is needed.
                o.wakeup();
            }
        }.start();
    }

    /** Worker thread: waits up to 500 ms per iteration, then prints name, counter and time. */
    class ThreadA extends Thread {
        public ThreadA(String name) {
            super(name);
        }

        @Override
        public void run() {
            // Also leave the loop when interrupted: waitForRunning() restores the interrupt
            // status, and looping on with it set would make every await fail immediately.
            while (!isstop && !Thread.currentThread().isInterrupted()) {
                waitForRunning(500);
                System.out.println(getName() + " " + total.getAndIncrement() + " "
                        + timeMillisToHumanString(System.currentTimeMillis()));
            }
        }
    }

    /**
     * Requests a wake-up of the waiting worker. Only the call that flips the flag from
     * {@code false} to {@code true} counts the latch down; repeated calls before the worker
     * consumes the flag are no-ops.
     */
    public void wakeup() {
        if (hasNotified.compareAndSet(false, true)) {
            waitPoint.countDown(); // notify
        }
    }

    /**
     * Blocks the calling thread until {@link #wakeup()} is called or {@code interval} elapses.
     *
     * <p>If a wake-up is already pending, it is consumed, {@code "***"} is printed and the
     * method returns without waiting. If the thread is interrupted while waiting, the method
     * returns early with the thread's interrupt status set.
     *
     * <p>NOTE(review): a {@code wakeup()} that lands between the failed CAS and
     * {@code waitPoint.reset()} is overwritten by the reset, so that wait runs for the full
     * interval and the notification is dropped by the {@code finally} block.
     *
     * @param interval maximum time to wait, in milliseconds
     */
    protected void waitForRunning(long interval) {
        if (hasNotified.compareAndSet(true, false)) {
            System.out.println("***");
            return;
        }
        // Re-arm the latch so the await below actually blocks.
        waitPoint.reset();
        try {
            waitPoint.await(interval, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt status so the caller's loop can observe it and stop.
            Thread.currentThread().interrupt();
        } finally {
            hasNotified.set(false);
        }
    }

    /**
     * Exposes the pending-wake-up flag.
     *
     * @return the live {@link AtomicBoolean} (not a copy)
     */
    public AtomicBoolean getHasNotified() {
        return hasNotified;
    }

    /**
     * Formats an epoch-millisecond timestamp as {@code yyyyMMddHHmmssSSS} (17 digits) using
     * the calendar returned by {@link Calendar#getInstance()}.
     *
     * @param t milliseconds since the epoch
     * @return the formatted timestamp
     */
    public static String timeMillisToHumanString(final long t) {
        Calendar cal = Calendar.getInstance();
        cal.setTimeInMillis(t);
        // Calendar.MONTH is zero-based, hence the + 1.
        return String.format("%04d%02d%02d%02d%02d%02d%03d", cal.get(Calendar.YEAR), cal.get(Calendar.MONTH) + 1,
                cal.get(Calendar.DAY_OF_MONTH), cal.get(Calendar.HOUR_OF_DAY), cal.get(Calendar.MINUTE), cal.get(Calendar.SECOND),
                cal.get(Calendar.MILLISECOND));
    }
}
// CountDownLatch2: a CountDownLatch-style latch with an added reset() method so it can be re-armed and reused.
/**
 * A count-down latch that can be re-armed through {@link #reset()}.
 *
 * <p>Waiters pass once the internal count reaches zero; {@link #countDown()} lowers the count
 * by one and {@link #reset()} puts it back to the value given at construction.
 */
public class CountDownLatch2 {

    /** Synchronizer whose AQS state holds the remaining count. */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        /** Count supplied at construction; {@link #reset()} restores the state to this value. */
        private final int initialCount;

        Sync(int count) {
            this.initialCount = count;
            setState(count);
        }

        int getCount() {
            return getState();
        }

        /** Succeeds (returns 1) only while the count is zero; otherwise reports failure (-1). */
        @Override
        protected int tryAcquireShared(int acquires) {
            if (getState() != 0) {
                return -1;
            }
            return 1;
        }

        /**
         * Lowers the count by one with a CAS retry loop; the {@code releases} argument is
         * ignored.
         *
         * @return {@code true} only for the call that moves the count to zero
         */
        @Override
        protected boolean tryReleaseShared(int releases) {
            int current;
            int remaining;
            do {
                current = getState();
                if (current == 0) {
                    // Already at zero: nothing to release.
                    return false;
                }
                remaining = current - 1;
            } while (!compareAndSetState(current, remaining));
            return remaining == 0;
        }

        /** Overwrites the state with the construction-time count. */
        void reset() {
            setState(initialCount);
        }
    }

    private final Sync sync;

    /**
     * @param count number of {@link #countDown()} calls needed before waiters may pass
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public CountDownLatch2(int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count < 0");
        }
        this.sync = new Sync(count);
    }

    /**
     * Blocks until the count is zero.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * Blocks until the count is zero or the timeout elapses.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return {@code true} if the count reached zero, {@code false} if the wait timed out
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public boolean await(long timeout, TimeUnit unit)
            throws InterruptedException {
        final long nanos = unit.toNanos(timeout);
        return sync.tryAcquireSharedNanos(1, nanos);
    }

    /** Lowers the count by one; has no effect when the count is already zero. */
    public void countDown() {
        sync.releaseShared(1);
    }

    /** @return the current count */
    public long getCount() {
        return sync.getCount();
    }

    /** Sets the count back to the value passed to the constructor. */
    public void reset() {
        sync.reset();
    }

    /** @return the default object string followed by {@code [Count = n]} */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder(super.toString());
        text.append("[Count = ").append(sync.getCount()).append("]");
        return text.toString();
    }
}