基于java中BlockingQueue的使用介绍
程序员文章站
2023-12-03 09:11:04
最近在维护一个java工程,在群里面也就聊起来java的优劣!无奈一些java的终极粉丝,总是号称性能已经不必...
最近在维护一个java工程,在群里面也就聊起来java的优劣!无奈一些java的终极粉丝,总是号称性能已经不必c++差,并且很多标准类库都是大师级的人写的,如何如何稳定等等。索性就认真研究一番,他们给我的一项说明就是,在线程之间投递消息,用java已经封装好的blockingqueue,就足够用了。
既然足够用那就写代码测试喽,简简单单写一个小程序做了一番测试:
//默认包
import java.util.concurrent.*;
import base.myrunnable;
public class test
{
public static void main(string[] args)
{
blockingqueue<integer> queue = new linkedblockingqueue<integer>();
java.lang.runnable r = new myrunnable(queue);
thread t = new thread(r);
t.start();
while(true)
{
try
{
while(true)
{
for(int i =0;i < 10000;i++)
{
queue.offer(i);
}
}
}
catch ( exception e)
{
e.printstacktrace();
}
}
}
}
//需要添加的包
package base;
import java.lang.runnable;
import java.util.concurrent.*;
import java.util.*;
public class myrunnable implements runnable
{
public myrunnable(blockingqueue<integer> queue)
{
this.queue = queue;
}
public void run()
{
date d = new date();
long starttime = d.gettime();
system.err.println(starttime);
int count = 0;
while(true)
{
try
{
integer i = this.queue.poll();
if(i != null)
{
count ++;
}
if(count == 100000)
{
date e = new date();
long endtime = e.gettime();
system.err.println(count);
system.err.println(endtime);
system.err.print(endtime - starttime);
break;
}
}
catch (exception e)
{
}
}
}
private blockingqueue<integer> queue;
}
传递十万条数据,在我的测试机上面,大概需要50ms左右,倒是还可以!索性就看了一下blockingqueue的底层实现
我在上面的测试代码中使用的offer 和 poll,就看看这两个实现函数吧,首先是offer
public e poll() {
final atomicinteger count = this.count;
if (count.get() == 0)
return null;
e x = null;
int c = -1;
final reentrantlock takelock = this.takelock;
takelock.lock();
try {
if (count.get() > 0) {
x = extract();
c = count.getanddecrement();
if (c > 1)
notempty.signal();
}
} finally {
takelock.unlock();
}
if (c == capacity)
signalnotfull();
return x;
}
和一般的同步线程类似,只是多加了一个signal,在学习unix环境高级编程时候,看到条件变量用于线程之间的同步,可以实现线程以竞争的方式实现同步!
poll函数的实现也是类似!
public boolean offer(e e) {
if (e == null) throw new nullpointerexception();
final atomicinteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
final reentrantlock putlock = this.putlock;
putlock.lock();
try {
if (count.get() < capacity) {
insert(e);
c = count.getandincrement();
if (c + 1 < capacity)
notfull.signal();
}
} finally {
putlock.unlock();
}
if (c == 0)
signalnotempty();
return c >= 0;
}
既然足够用那就写代码测试喽,简简单单写一个小程序做了一番测试:
复制代码 代码如下:
//默认包
import java.util.concurrent.*;
import base.myrunnable;
public class test
{
public static void main(string[] args)
{
blockingqueue<integer> queue = new linkedblockingqueue<integer>();
java.lang.runnable r = new myrunnable(queue);
thread t = new thread(r);
t.start();
while(true)
{
try
{
while(true)
{
for(int i =0;i < 10000;i++)
{
queue.offer(i);
}
}
}
catch ( exception e)
{
e.printstacktrace();
}
}
}
}
//需要添加的包
package base;
import java.lang.runnable;
import java.util.concurrent.*;
import java.util.*;
public class myrunnable implements runnable
{
public myrunnable(blockingqueue<integer> queue)
{
this.queue = queue;
}
public void run()
{
date d = new date();
long starttime = d.gettime();
system.err.println(starttime);
int count = 0;
while(true)
{
try
{
integer i = this.queue.poll();
if(i != null)
{
count ++;
}
if(count == 100000)
{
date e = new date();
long endtime = e.gettime();
system.err.println(count);
system.err.println(endtime);
system.err.print(endtime - starttime);
break;
}
}
catch (exception e)
{
}
}
}
private blockingqueue<integer> queue;
}
传递十万条数据,在我的测试机上面,大概需要50ms左右,倒是还可以!索性就看了一下blockingqueue的底层实现
我在上面的测试代码中使用的offer 和 poll,就看看这两个实现函数吧,首先是offer
复制代码 代码如下:
public e poll() {
final atomicinteger count = this.count;
if (count.get() == 0)
return null;
e x = null;
int c = -1;
final reentrantlock takelock = this.takelock;
takelock.lock();
try {
if (count.get() > 0) {
x = extract();
c = count.getanddecrement();
if (c > 1)
notempty.signal();
}
} finally {
takelock.unlock();
}
if (c == capacity)
signalnotfull();
return x;
}
和一般的同步线程类似,只是多加了一个signal,在学习unix环境高级编程时候,看到条件变量用于线程之间的同步,可以实现线程以竞争的方式实现同步!
poll函数的实现也是类似!
复制代码 代码如下:
public boolean offer(e e) {
if (e == null) throw new nullpointerexception();
final atomicinteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
final reentrantlock putlock = this.putlock;
putlock.lock();
try {
if (count.get() < capacity) {
insert(e);
c = count.getandincrement();
if (c + 1 < capacity)
notfull.signal();
}
} finally {
putlock.unlock();
}
if (c == 0)
signalnotempty();
return c >= 0;
}