JAVA流控及超流控后的延迟处理实例
程序员文章站
2024-03-02 16:31:10
本文实例讲述了java流控及超流控后的延迟处理方法。分享给大家供大家参考。具体实现方法如下:
流控检查(每半秒累计,因此最小留空阀值只能做到每秒2条):
复制代码 代码...
本文实例讲述了java流控及超流控后的延迟处理方法。分享给大家供大家参考。具体实现方法如下:
流控检查(每半秒累计,因此最小留空阀值只能做到每秒2条):
复制代码 代码如下:
import java.text.simpledateformat;
import java.util.date;
import java.lang.thread;
/**
* 流量控制
*
* @author chenx
*/
public class overflowcontroller {
private int maxsendcountpersecend; // 该条链路上流控阀值
private date sendtime = new date();
private int sendcount = 0; // 该条链路上发送的数量
public overflowcontroller(int maxsendcountpersecend) {
if (maxsendcountpersecend < 2) {
maxsendcountpersecend = 2;
}
this.maxsendcountpersecend = maxsendcountpersecend;
}
public int getmaxsendcountpersecend() {
if (getmilliseconds(new date()) >= 500) {
return maxsendcountpersecend / 2;
}
return maxsendcountpersecend - (maxsendcountpersecend / 2);
}
/**
* 是否超流控
*/
public boolean isoverflow(int sendnum) {
synchronized (this) {
date now = new date();
if (now.gettime() - sendtime.gettime() >= 500) {
sendtime = now;
sendcount = sendnum;
} else {
if (sendcount + sendnum > getmaxsendcountpersecend()) {
return true;
} else {
sendcount += sendnum;
}
}
return false;
}
}
/**
* 获取指定时间的毫秒数
*/
private int getmilliseconds(date date) {
simpledateformat df = new simpledateformat("sss");
return integer.valueof(df.format(date));
}
public static void main(string[] args) throws interruptedexception {
overflowcontroller oc = new overflowcontroller(50);
simpledateformat df = new simpledateformat("yyyy-mm-dd hh:mm:ss:sss");
for (int i = 0; i <= 100; i++) {
if (oc.isoverflow(1)) {
system.out.println(i + "-isoverflow-" + df.format(new date()));
} else {
system.out.println(i + "-sendok-" + df.format(new date()));
}
thread.sleep(10);
}
}
}
import java.util.date;
import java.lang.thread;
/**
* 流量控制
*
* @author chenx
*/
public class overflowcontroller {
private int maxsendcountpersecend; // 该条链路上流控阀值
private date sendtime = new date();
private int sendcount = 0; // 该条链路上发送的数量
public overflowcontroller(int maxsendcountpersecend) {
if (maxsendcountpersecend < 2) {
maxsendcountpersecend = 2;
}
this.maxsendcountpersecend = maxsendcountpersecend;
}
public int getmaxsendcountpersecend() {
if (getmilliseconds(new date()) >= 500) {
return maxsendcountpersecend / 2;
}
return maxsendcountpersecend - (maxsendcountpersecend / 2);
}
/**
* 是否超流控
*/
public boolean isoverflow(int sendnum) {
synchronized (this) {
date now = new date();
if (now.gettime() - sendtime.gettime() >= 500) {
sendtime = now;
sendcount = sendnum;
} else {
if (sendcount + sendnum > getmaxsendcountpersecend()) {
return true;
} else {
sendcount += sendnum;
}
}
return false;
}
}
/**
* 获取指定时间的毫秒数
*/
private int getmilliseconds(date date) {
simpledateformat df = new simpledateformat("sss");
return integer.valueof(df.format(date));
}
public static void main(string[] args) throws interruptedexception {
overflowcontroller oc = new overflowcontroller(50);
simpledateformat df = new simpledateformat("yyyy-mm-dd hh:mm:ss:sss");
for (int i = 0; i <= 100; i++) {
if (oc.isoverflow(1)) {
system.out.println(i + "-isoverflow-" + df.format(new date()));
} else {
system.out.println(i + "-sendok-" + df.format(new date()));
}
thread.sleep(10);
}
}
}
超流控后的延迟处理,由于java中没有.net的“延迟委托”一说:
复制代码 代码如下:
threadpool.registerwaitforsingleobject(
waithandle waitobject,
waitortimercallback callback,
object state,
int millisecondstimeoutinterval,
bool executeonlyonce
)
waithandle waitobject,
waitortimercallback callback,
object state,
int millisecondstimeoutinterval,
bool executeonlyonce
)
java下需实现一个简单的延迟队列:
复制代码 代码如下:
import java.util.concurrent.delayed;
import java.util.concurrent.timeunit;
public class delayentry implements delayed {
private int count;
private long dequeuedtimemillis; // 出队列时间
public int getcount() {
return count;
}
public void setcount(int count) {
this.count = count;
}
public long getdequeuedtimemillis() {
return dequeuedtimemillis;
}
public delayentry(long delaymillis) {
dequeuedtimemillis = system.currenttimemillis() + delaymillis;
}
@override
public int compareto(delayed o) {
delayentry de = (delayentry) o;
long timeout = dequeuedtimemillis - de.dequeuedtimemillis;
return timeout > 0 ? 1 : timeout < 0 ? -1 : 0;
}
@override
public long getdelay(timeunit unit) {
return dequeuedtimemillis - system.currenttimemillis();
}
}
import java.util.concurrent.timeunit;
public class delayentry implements delayed {
private int count;
private long dequeuedtimemillis; // 出队列时间
public int getcount() {
return count;
}
public void setcount(int count) {
this.count = count;
}
public long getdequeuedtimemillis() {
return dequeuedtimemillis;
}
public delayentry(long delaymillis) {
dequeuedtimemillis = system.currenttimemillis() + delaymillis;
}
@override
public int compareto(delayed o) {
delayentry de = (delayentry) o;
long timeout = dequeuedtimemillis - de.dequeuedtimemillis;
return timeout > 0 ? 1 : timeout < 0 ? -1 : 0;
}
@override
public long getdelay(timeunit unit) {
return dequeuedtimemillis - system.currenttimemillis();
}
}
复制代码 代码如下:
import java.util.concurrent.delayqueue;
public class delayservice {
public void run() {
delayqueue<delayentry> queue = new delayqueue<delayentry>();
delayconsumer delayconsumer = new delayconsumer(queue);
delayconsumer.start();
for (int i = 0; i < 100; i++) {
delayentry de = new delayentry(5000);
de.setcount(i);
system.out.println(system.currenttimemillis() + "--------" + de.getcount());
queue.add(de);
}
}
class delayconsumer extends thread {
delayqueue<delayentry> queue;
public delayconsumer(delayqueue<delayentry> queue) {
this.queue = queue;
}
public void run() {
while (true) {
try {
delayentry de = queue.take();
system.out.println("queue size=" + queue.size());
system.out.println(de.getcount());
system.out.println(system.currenttimemillis());
} catch (interruptedexception e) {
e.printstacktrace();
}
}
}
}
public static void main(string[] args) {
delayservice ds = new delayservice();
ds.run();
}
}
public class delayservice {
public void run() {
delayqueue<delayentry> queue = new delayqueue<delayentry>();
delayconsumer delayconsumer = new delayconsumer(queue);
delayconsumer.start();
for (int i = 0; i < 100; i++) {
delayentry de = new delayentry(5000);
de.setcount(i);
system.out.println(system.currenttimemillis() + "--------" + de.getcount());
queue.add(de);
}
}
class delayconsumer extends thread {
delayqueue<delayentry> queue;
public delayconsumer(delayqueue<delayentry> queue) {
this.queue = queue;
}
public void run() {
while (true) {
try {
delayentry de = queue.take();
system.out.println("queue size=" + queue.size());
system.out.println(de.getcount());
system.out.println(system.currenttimemillis());
} catch (interruptedexception e) {
e.printstacktrace();
}
}
}
}
public static void main(string[] args) {
delayservice ds = new delayservice();
ds.run();
}
}
希望本文所述对大家的java程序设计有所帮助。