欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

5个并发处理技巧代码示例

程序员文章站 2024-04-02 10:26:28
【译者注】在本文中,作者总结出了5个关于处理并发性程序的技巧,并给出代码示例,让读者更好地理解和使用这5种方法。 以下为译文: 1.捕获interruptedexce...

【译者注】在本文中,作者总结出了5个关于处理并发性程序的技巧,并给出代码示例,让读者更好地理解和使用这5种方法。 以下为译文:

1.捕获interruptedexception错误

请检查下面的代码片段:

public class task implements runnable {
	private final blockingqueue queue = ...;
	@override
	 public void run() {
		while (!thread.currentthread().isinterrupted()) {
			string result = getordefault(() -> queue.poll(1l, timeunit.minutes), "default");
			//do smth with the result
		}
	}
	t getordefault(callable supplier, t defaultvalue) {
		try {
			return supplier.call();
		}
		catch (exception e) {
			logger.error("got exception while retrieving value.", e);
			return defaultvalue;
		}
	}
}

代码的问题是,在等待队列中的新元素时,是不可能终止线程的,因为中断的标志永远不会被恢复:

1.运行代码的线程被中断。
2.blockingqueue # poll()方法抛出interruptedexception异常,并清除了中断的标志。
3.while中的循环条件 (!thread.currentthread().isinterrupted())的判断是true,因为标记已被清除。

为了防止这种行为,当一个方法被显式抛出(通过声明抛出interruptedexception)或隐式抛出(通过声明/抛出一个原始异常)时,总是捕获interruptedexception异常,并恢复中断的标志。

t getordefault(callable supplier, t defaultvalue) {
	try {
		return supplier.call();
	}
	catch (interruptedexception e) {
		logger.error("got interrupted while retrieving value.", e);
		thread.currentthread().interrupt();
		return defaultvalue;
	}
	catch (exception e) {
		logger.error("got exception while retrieving value.", e);
		return defaultvalue;
	}
}

2.使用特定的执行程序来阻止操作

因为一个缓慢的操作而使整个服务器变得无响应,这通常不是开发人员想要的。不幸的是,对于rpc,响应时间通常是不可预测的。

假设服务器有100个工作线程,有一个端点,称为100 rps。在内部,它发出一个rpc调用,通常需要10毫秒。在某个时间点,此rpc的响应时间变为2秒,在峰值期间服务器能够做的惟一的一件事就是等待这些调用,而其他端点则无法访问。

@get
@path("/genre/{name}")
@produces(mediatype.application_json)
public response getgenre(@pathparam("name") string genrename) {
	genre genre = potentiallyveryslowsynchronouscall(genrename);
	return response.ok(genre).build();
}

解决这个问题最简单的方法是提交代码,它将阻塞调用变成一个线程池:

@get
@path("/genre/{name}")
@produces(mediatype.application_json)
public void getgenre(@pathparam("name") string genrename, @suspended asyncresponse response) {
	response.settimeout(1l, timeunit.seconds);
	executorservice.submit(() -> {
		genre genre = potentiallyveryslowsynchronouscall(genrename);
		return response.resume(response.ok(genre).build());
	}
	);
}

3.传mdc的值

mdc(mapped diagnostic context)通常用于存储单个任务的特定值。例如,在web应用程序中,它可能为每个请求存储一个请求id和一个用户id,因此mdc查找与单个请求或整个用户活动相关的日志记录变得更加容易。

2017-08-27 14:38:30,893 info [server-thread-0] [requestid=060d8c7f, userid=2928ea66] c.g.s.web.controller - message.

可是如果代码的某些部分是在专用线程池中执行的,则线程(提交任务的线程)中mdc就不会被继续传值。在下面的示例中,第7行的日志中包含“requestid”,而第9行的日志则没有:

@get
@path("/genre/{name}")
@produces(mediatype.application_json)
public void getgenre(@pathparam("name") string genrename, @suspended asyncresponse response) {
	try (mdc.mdccloseable ignored = mdc.putcloseable("requestid", uuid.randomuuid().tostring())) {
		string genreid = getgenreidbyname(genrename);
		//sync call
		logger.trace("submitting task to find genre with id '{}'.", genreid);
		//'requestid' is logged
		executorservice.submit(() -> {
			logger.trace("starting task to find genre with id '{}'.", genreid);
			//'requestid' is not logged
			response result = getgenre(genreid) //async call
			.map(artist -> response.ok(artist).build())
			   .orelseget(() -> response.status(response.status.not_found).build());
			response.resume(result);
		}
		);
	}
}

这可以通过mdc#getcopyofcontextmap()方法来解决:

...
public void getgenre(@pathparam("name") string genrename, @suspended asyncresponse response) {
 try (mdc.mdccloseable ignored = mdc.putcloseable("requestid", uuid.randomuuid().tostring())) {
 ...
 logger.trace("submitting task to find genre with id '{}'.", genreid); //'requestid' is logged
 withcopyingmdc(executorservice, () -> {
  logger.trace("starting task to find genre with id '{}'.", genreid); //'requestid' is logged
  ...
 });
 }
}
private void withcopyingmdc(executorservice executorservice, runnable function) {
 map

4.更改线程名称

为了简化日志读取和线程转储,可以自定义线程的名称。这可以通过创建executorservice时用一个threadfactory来完成。在流行的实用程序库中有许多threadfactory接口的实现:

com.google.common.util.concurrent.threadfactorybuilde+r in guava. 
org.springframework.scheduling.concurrent.customizablethreadfactory in spring. 
org.apache.commons.lang3.concurrent.basicthreadfactory in apache commons lang 3.
threadfactory threadfactory = new basicthreadfactory.builder()
 .namingpattern("computation-thread-%d")
 .build();
executorservice executorservice = executors.newfixedthreadpool(numberofthreads, threadfactory);

尽管forkjoinpool不使用threadfactory接口,但也支持对线程的重命名:

forkjoinpool.forkjoinworkerthreadfactory forkjointhreadfactory = pool -> { 
 forkjoinworkerthread thread = forkjoinpool.defaultforkjoinworkerthreadfactory.newthread(pool); 
 thread.setname("computation-thread-" + thread.getpoolindex()); 
 return thread;
};
forkjoinpool forkjoinpool = new forkjoinpool(numberofthreads, forkjointhreadfactory, null, false);

将线程转储与默认命名进行比较:

"pool-1-thread-3" #14 prio=5 os_prio=31 tid=0x00007fc06b19f000 nid=0x5703 runnable [0x0000700001ff9000]
 java.lang.thread.state: runnable
at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.taskhandler.compute(taskhandler.java:16)
...
"pool-2-thread-3" #15 prio=5 os_prio=31 tid=0x00007fc06aa10800 nid=0x5903 runnable [0x00007000020fc000]
 java.lang.thread.state: runnable
at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.healthcheckcallback.recordfailure(healthchecker.java:21)
at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.healthchecker.check(healthchecker.java:9)
...
"pool-1-thread-2" #12 prio=5 os_prio=31 tid=0x00007fc06aa10000 nid=0x5303 runnable [0x0000700001df3000]
 java.lang.thread.state: runnable
at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.taskhandler.compute(taskhandler.java:16)
 ...

与自定义命名进行比较:

"task-handler-thread-1" #14 prio=5 os_prio=31 tid=0x00007fb49c9df000 nid=0x5703 runnable [0x000070000334a000]
 java.lang.thread.state: runnable
at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.taskhandler.compute(taskhandler.java:16)
...
"authentication-service-ping-thread-0" #15 prio=5 os_prio=31 tid=0x00007fb49c9de000 nid=0x5903 runnable [0x0000700003247000]
 java.lang.thread.state: runnable
at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.healthcheckcallback.recordfailure(healthchecker.java:21)
at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.healthchecker.check(healthchecker.java:9)
...
"task-handler-thread-0" #12 prio=5 os_prio=31 tid=0x00007fb49b9b5000 nid=0x5303 runnable [0x0000700003144000]
 java.lang.thread.state: runnable
at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.taskhandler.compute(taskhandler.java:16)
 ...

想象一下,可能会不止3个线程。

5.使用longadder计数器

在高竞争的情况下,会采用java.util.concurrent.atomic.longadder进行计数,而不会采用atomiclong/atomicinteger。

longadder可以跨越多个单元间仍保持值不变,但是如果需要的话,也可以增加它们的值,但与父类atomicxx比较,这会导致更高的吞吐量,也会增加内存消耗。

longadder counter = new longadder();
counter.increment();
...
long currentvalue = counter.sum();

总结

以上就是本文关于5个并发处理技巧代码示例的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站:java并发编程semaphore计数信号量详解优化tomcat配置(内存、并发、缓存等方面)方法详解等,有什么问题可以随时留言,小编会及时回复大家的。感谢朋友们对本站的支持!