欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Java 并发编程 - Programming Concurrency on the JVM

程序员文章站 2022-07-13 09:17:30
...

这几个月一直在做性能调优的工作,以前总是进行功能的开发,从来不考虑性能的问题,经过这几个月的工作,发现从性能和扩展性的角度去看软件开发,还真是大不一样。在和朋友聊天的时候,提及Java程序是否能充分利用多核cpu的问题的时候,朋友给我推荐了这本书《Programming Concurrency on the JVM》。几天看下来,还真觉得很应景,建议做Java开发的朋友试着阅读一下。我简单记录下我的读后感。

从多线程角度重新检查你的程序

一直习惯于在JavaEE的开源框架下做开发,认为多线程是容器(server)和框架的事情。其实不是的。我们定义的每一个类,如果是在多线程环境下被使用,你就得考虑线程安全和高并发性。

 

利用多核CPU

 

如果你准备用多线程来提高你的性能,创建的线程池的大小至少要等于你的CPU内核数,因为多核CPU是可以同时运行多个thread的。当然如果,你的操作依赖于IO或者网络,你还可以增加你的thread pool的大小,这样在等待IO的时候让CPU也别闲着,可以做点计算什么的。

可以通过Runtime.getRuntime().availableProcessors();来得到机器的CPU处理器的个数。


线程安全(thread-safe)VS高并发

所谓线程安全,就是指当同一个对象的状态(属性)被多个线程同时读写的时候,会不会产生冲突和不一致的问题。传统的JDK给我们提供了同步 (synchronize)来避免这个问题,但这往往会造成性能的瓶颈。从JDK5开始,JDK提供了concurrent包,可以帮助我们在很多情况下 避免使用同步来解决线程安全的问题。


设计不变类(immutable), 分离可变类(mutable)

其实,优良的设计是可以避免很多的线程安全问题,并提供高并发和高可用性的支持。最重要的一个设计方法就是设计不变类(immutable)。如果你的类的实例在创建之后就不再能被改变,那么你就不用担心读写冲突,也就是线程安全了,这样你就可以*的cache和共享这个类的实例了。
Hibernate 的SessionFactory就是这样设计的,所以SessionFactory是线程安全的。但是Session就不是线程安全的。所以不要让session在多线程下共享。session是hibernate帮我们管理持久对象状态变迁的核心,它和transaction紧密相关,只有transaction开始了,一个 session才能被使用,transaction提交或者回滚(commit或rollback)后这个session就会被clean up,清空。即使在同一个线程里,我们也不能每次需要的时候就创建一个新的session,尽管创建session开销不大。毕竟一个线程里面不一定就只执行一个transaction。通常我们通过SessionFactory的getCurrentSession()方法获取session。这背后是通过SessionContext里面的ThreadLocal变量context把session和线程绑定在一起的。这就保证了线程之间不会共享session,同时又可以重用同一个线程已有的session,只要session被clean up了并还没有关闭。这样这个session就能被同一个线程的一个接一个的transaction再利用。既然可以重用,为什么有多个session,而不是只有一个呢?这是为了应对一个线程中牵涉到不只是一个session factory的时候。比如,你的一个线程里需要访问多个不同的数据库,这样就有可能有多个session factory。没有数据库,你需要创建一个session与之关联。可以看出content变量是Map,其实是Map<SessionFactory, Session>,不知道为什么没有写成:ThreadLocal<Map<SessionFactory, Session>>。

 

SessionFactoryImpl.java
public Session getCurrentSession() throws HibernateException {
		if ( currentSessionContext == null ) {
			throw new HibernateException( "No CurrentSessionContext configured!" );
		}
		return currentSessionContext.currentSession();
	}

ThreadLocalSessionContext.java
/**
	 * A ThreadLocal maintaining current sessions for the given execution thread.
	 * The actual ThreadLocal variable is a java.util.Map to account for
	 * the possibility for multiple SessionFactorys being used during execution
	 * of the given thread.
	 */
	private static final ThreadLocal<Map> context = new ThreadLocal<Map>();
 


Spring 的Bean工厂生产的bean默认情况下是单例的,也就是可以在这一个IoC容器里面只会有一个这个Bean的实例。这样就要求这个类是无状态的,既然无状态也就不存在被改变(write)的可能,所以可以保证线程安全。如果你想让spring创建有状态的类的实例,你就得在Bean的定义上明确scope=prototype。如果singleton的Bean引用了prototype的类的实例,就会存在线程安全的问题。因为singleton的bean只会在第一次引用的时候创建一个prototype的实例然后注入,之后多线程就会引用同一个singleton的bean,那么这个singleton的bean也就不安全了。换句话说,如果singleton的bean引用了prototype的实例,singleton的bean就会被污染成有状态的了。如果你能确保这个prototype的实例在创建之后就不可改变,也就是immutable的,那就不会有问题,否则,你就得注意线程安全的问题。
Spring的BeanFactory里面用了java.util.concurrent.ConcurrentHashMap来缓存解析好的bean的定义,保证了高并发性的要求。

        /**
	 * Map of bean definition objects, keyed by bean name
	 * @uml.property  name="beanDefinitionMap"
	 * @uml.associationEnd  qualifier="beanName:java.lang.String 
         * org.springframework.beans.factory.config.BeanDefinition"
	 */
	private final Map<String, BeanDefinition> beanDefinitionMap = 
        new ConcurrentHashMap<String, BeanDefinition>();
 

另外JDK的String还有Integer等wrapper类也都是不变类。


当然我们不能避免使用会发生状态变化的类,只是我们要尽量把可变的类和不可变的类分离出来(这其实也是OOD的一个原则)。


使用concurrent包

对于可变的类,也尽量不要使用synchronize。可以使用concurrent提供的lock,这个包提供了读写lock,比synchronize力度更细。其实有点类似于数据库的锁的设计了。

 

Tomcat 在Endpoint (处理连接请求的类) 的实现中就利用java.util.concurrent.ThreadPoolExecutor来管理多线程:

 

AbstractEndpoint.java

 

 public void createExecutor() {
        internalExecutor = true;
        TaskQueue taskqueue = new TaskQueue();
        TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
        executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
        taskqueue.setParent( (ThreadPoolExecutor) executor);
    }

 

这个线程池就是为就是用来处理request而创建的。当tomcat server通过endpoint收到socket请求之后,对socket按照tomcat容器的层次(Server->Service->Connector->Engine->Host->Context->Wrapper, 并不全部都是必须的)最终交到了servlet的wrapper手中加以处理。也就是调用我们自定义的servlet来处理。这样看来servlet是在一个线程中被调用和创建的。那么servlet是不是线程安全的呢?

老版本的tomcat实现中有SingleThreadMode,想法是为每一个请求创建一个Servlet实例,从而并避免线程安全的问题。其实是避免不了的,设想如果servlet里面定义了静态变量,是不是就会有线程安全的问题。所以在新版本中这种方法被废弃了。默认情况Servlet是单例的。如同spring里面的bean一样。所以servlet不是线程安全的。尽量让servlet无状态,或者immutable。不然你就得处理线程同步的问题。有人用ThreadLocal变量来存放状态,其实这个和并发没有关系,看你的需要了。


使用Akka - software transactional memery(STM)


Akka是一个实现了(STM)的框架,对Java和Scala都有很好的支持。什么是STM呢?它的核心思想是将对象的状态和引用分开来看。保证对象的状态不改变,但引用变量的值可以变化。怎么说呢。就是说当需要改变对象的状态的时候,不是在原来的对象上修改,而是重新创建一个新的对象,并且让对象引用指向新创建的对象。这在多线程下可以保证高并发读,避免在读的时候数据被修改了。它能保证在状态改变之前的读不会受到后面修改的影响,同时在修改以后的读都能读到修改以后的值。这是合理的。这里有个关键就是对象是immutable的。

 

其实是借助事务设计的理念来处理并发问题。其实这个也是借鉴了DB的事务设计理念。试想一下,数据库作为一个共享并且可变的资源,能在多线程下工作的那么好,无非借助于优良的锁和事务的机制。STM借鉴了乐观锁的事务机制。STM假设你的共享数据被频繁的读和写,但是同时写的可能性比较小。试想一下,很多时候我们的共享数据都是用户相关的,也就是不同的用户有着不同的可变状态类,只要你保住同一个用户在操作的时候调用的服务不要有并发的问题,也就不会冲突,这大概也是为什么我们一直不怎么注意线程安全同时又没有遇到什么问题的原因。对于一个系统管理的信息,比如几个管理员有可能同时改变某个系统设置,有可能产生并发访问。这种情况很少,一方面因为管理员用户本来就很少,他们同时操作同一个数据的可能性就很少了。而且现在的权限设计很细致,以至于不同的管理员也有不同的管理数据域。但是,我们也不能完全避免并发性,虽然概率比较小。这个时候STM就发挥作用了,它确保在写的时候,如果没有被别的线程捷足先登,它就写进去,万一有别的线程在它读之后,写之前修改了这个数据,它就回滚整个事务,并且retry。这就是乐观锁的思想。

从上面的分析可以看出,在频繁读写同时写冲突很少发生的情况下,可以使用STM取得较好的高并发性。

其实STM,也给我们另一个启事,那就是我们在设计代码的时候,是不是可以加入事务的考虑?那样我们的程序更为安全和合理,Akka提供了这方面的API,很有意思。


使用Akka - Actors

Actor本质上是基于时间驱动的异步处理机制。就比如很多人同时给你发短信,你的手机负责把短信依次排列好等你阅读。而给你发短信的人可以并发的进行,不象给你打电话的,必须依次进行。Actor的机制很类似这个。框架负责保证他的方法可以在一个线程进行,不会发生冲突。这个特别适合可以一步处理的任务,框架负责多线程的调度和管理,代码会比较简洁。

 

思考1. 在tomcat web app环境下如何利用executor service (Executor.newFixedThreadPool(int poolSize))和确定thread pool的大小?

看了上面的tomcat中利用executor service来构建request 的处理线程可以知道,如果tomcat的connector设置了最大连接数为1000,那就是说tomcat最多可以创建1000个线程来接受客户端的请求。如果我们在servlet里面又创建了连接池,比如在有4核的cpu服务器上,我们在servlet里面利用大小为4的线程池来处理请求,会有什么问题呢?

 

1. 如果在某刻,只有一个请求,那么servlet便可以充分利用4核的处理能力,响应速度自然很快。如果在某刻有1000个请求,那就有可能产生4000个线程。我们的cpu会不会有问题?毕竟不是线程越多越好,应该以发挥cpu最大能力为目标,又不能受累于线程间的无味切换。这个还真得靠测试和监控来决定了。

2. 还有,是创建一个全局的线程池,还是为每个请求创建一个线程池好呢?我们知道servlet是单例的,但不是线程安全的。如果用一个servlet field来持有这个线程池好不好?还是在servlet调用的方法里面创建线程池?创建线程池的开销大不大?这些都需要评估。

3.我们需要在提交完task之后,调用shutdown方法以使得任务被完成之后,终止线程,从而释放资源。shutdown方法只是阻止接受新的任务,还是会允许之前提交的任务继续做完,都做完之后才会终止线程。

public static ExecutorService
 newFixedThreadPool
(int nThreads)

Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. At any point, at most nThreads threads will be active processing tasks. If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available. If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks. The threads in the pool will exist until it is explicitly shutdown .