关于Openfire集群源码的分析
本文介绍了openfire的相关内容,这个东西现在用的人好像不多了。算了,我们看看具体内容。
openfire是什么?
openfire 采用java开发,开源的实时协作(rtc)服务器基于xmpp(jabber)协议。openfire安装和使用都非常简单,并利用web进行管理。单台服务器可支持上万并发用户。由于是采用开放的xmpp协议,您可以使用各种支持xmpp协议的im客户端软件登陆服务。如果你想轻易地构建高效率的即时通信服务器,那就选择它吧!
openfire能做什么?
我们要了解openfire,首先要了解xmpp协议,因为openfire是用java语言编写的,基于xmpp协议、开源的实时协作的服务器。openfire具有跨平台的能力,openfire与客户端采用的是c/s架构,一个服务器要负责为连接在其上的客户端提供服务。openfire客户端有spark,pidgin, miranda im,ichat等,用户如果自己开发客户端,可以采用遵循gpl的开源client端api--smack。openfire服务器端支持插件开发,如果开发者需要添加新的服务,可以开发出自己的插件后,安装至服务器,就可以提供服务,如查找联系人服务就是以插件的形式提供的。
openfire如果用户量增加后为了解决吞吐量问题,需要引入集群,在openfire中提供了集群的支持,另外也实现了两个集群插件:hazelcast和clustering。为了了解情况集群的工作原理,我就沿着openfire的源代码进行了分析,也是一次学习的过程。
首先理解集群的一些简单概念
集群的目的是让多个实例像一个实例一样运行,这样就可以通过增长实例来增长计算能力。也就是所谓的分布式计算问题,这其中最为关注的一个特性就是——cap理论,也就是所谓的一致性、可用性、分区容错性。集群中最核心解决的问题就是cap。
cap综合理解就是我上面写的,多个实例像一个实例一样运行。
所以所谓集群就是把一些数据共享或者同步到不同的实例上,这样系统使用同样的算法,取的结果当然应该是相同啦。所以一些数据库的主从复制,缓存数据集群都是类似这种解决方法。只是代码实现质量和处理规模的问题。
有了这个基础我们再来看看openfire是怎么解决这个问题的。
openfire的集群设计
1、哪些需要进行集群间的同步
对于openfire而言,有这几方面的数据需要进行保证集群间的同步:数据库存的数据、缓存数据、session。貌似就这些吧?
数据库
因为对于openfire来说基本上是透明的,所以这块就交给数据库本身来实现。
缓存数据
缓存是存在内存里的,所以这部分是要同步的
session
session在openfire并不需要所有实例同步,但是需要做用户路由缓存,否则发消息时找不到对应的会话。由此用户路由还是要同步的。
2、缓存的设计
缓存接口
openfire里对缓存的数据容器提供了一个包装接口,这个接口提供了缓存数据的基本方法,用于统一数据操作。
publicinterface cache<k,v> extends java.util.map<k,v>
如果不开启集群时缓存的默认缓存容器类是:public class defaultcache<k, v> ,实际上defaultcache就是用一个hashmap来存数据的。
缓存工厂类
为了保证缓存是可以扩展的,提供了一个工厂类:
publicclass cachefactory
cachefactory类中会管理所有的缓存容器,如下代码:
/** * returns the named cache, creating it as necessary. * * @param name the name of the cache to create. * @return the named cache, creating it as necessary. */ @suppresswarnings("unchecked") publicstaticsynchronized <t extends cache> t createcache(string name) { t cache = (t) caches.get(name); if (cache != null) { return cache; } cache = (t) cachefactorystrategy.createcache(name); log.info("created cache [" + cachefactorystrategy.getclass().getname() + "] for " + name); return wrapcache(cache, name); }
上面代码中会通过缓存工厂策略对象来创建一个缓存容器,最后warpcache方法会将此容器放入到caches中。
缓存工厂类的策略
在cachefactory中默认是使用一个defaultlocalcachestrategy来完成缓存创建的。另外还提供了在集群条件下的缓存策略接入。也就是通过实例化不同的策略来切换缓存管理方案。比如后面要提到的hazelcast就是通过这个来替换了本地缓存策略的。从接口的设计上来看,openfire的缓存策略也就是为了集群与非集群的实现。
3、集群的设计
在openfire中的集群主要包括:集群管理、数据同步管理、集群计算任务。
集群管理者
在openfire中主要是一个类来实现:clustermanager,在clustermanager中实现了集群实例的加入、退出管理,因为没有使用主从结构,所以clustermanager实现了一个无中心管理,不知道我理解的对不对。因为只要当前实实例启用了集群,clustermanager就会主动的加载集群管理并与其他的集群进行同步。
startup
startup是启动集群的方法,代码:
publicstaticsynchronizedvoid startup() { if (isclusteringenabled() && !isclusteringstarted()) { initeventdispatcher(); cachefactory.startclustering(); } }
首先要判断是否开启了集群并且当前集群实例未运行时才去启动。
先是初始化了事件分发器,用于处理集群的同步事情。
然后就是调用cachefactory的startclustering来运行集群。在startclustering方法中主要是这几个事情:
会使用集群的缓存工厂策略来启动,同时使自己加入到集群中。
开启一个线程用于同步缓存的状态
在前面startup中的initeventdispatcher方法,在这里会注册一个分发线程监听到集群事件,收到事件后会执行joinedcluster或者leftcluster的操作,joinedcluster就是加入到集群中的意思。
在joinedcluster时会将本地的缓存容器都转换为集群缓存。由此便完成了集群的初始化并加入到集群中了。
shutdown
shutdown相对简单点就是退出集群,并且将缓存工厂恢复为本地缓存。
同步管理
上面主要是讲了如何管理集群,接着比较重要的就是如何在集群间同步数据呢?这部分主要是看具体的分布式计算系统的实现了,从openfire来说就是将数据放到集群缓存中,然后通过集群组件来完成的,比如使用hazelcast。
因为使用缓存来解决,所以在cachefactory中才会有这些么多关于集群的处理代码,特别是对于缓存策略的切换,以及集群任务处理都在cachefactory作为接口方法向外公开。这样也把集群的实现透明了。
集群计算任务
在这之前一直没有提到集群中的计算问题,因为既然有了集群是不是可以利用集群的优势进行一些并行计算呢?这部分我倒没有太过确定,只是看到相关的代码所以简单列一下。
在cachefactory类中有几个方法:doclustertask、dosynchronousclustertask,这两个都是overload方法,参数有所不同而已。这几个方法就是用于执行一些计算任务的。就看一下doclustertask:
public static void doclustertask(final clustertask<?> task) { cachefactorystrategy.doclustertask(task); }
这里有个限定就是必须是clustertask派生的类才行,看看它的定义:
public interface clustertask<v> extends runnable, externalizable { v getresult(); }
主要是为了异步执行和序列化,异步是因为不能阻塞,而序列化当然就是为了能在集群中传送。
再看cachefactory的doclustertask方法可以发现,它只不过是代理了缓存策略工厂的doclustertask,具体的实现还是要看集群实现的。
看一看hazelcast的实现简单理解openfire集群
在openfire中有集群的插件实现,这里就以hazelcast为例子简单的做一下分析与学习。
缓存策略工厂类(clusteredcachefactory)
clusteredcachefactory实现了cachefactorystrategy,代码如下:
publicclass clusteredcachefactory implements cachefactorystrategy {
首先是startcluster方法用于启动集群,主要完成几件事情:
设置缓存序列化工具类,clusterexternalizableutil。这个是用于集群间数据复制时的序列化工具
设置远程session定位器,remotesessionlocator,因为session不同步,所以它主要是用于多实例间的session读取
设置远程包路由器clusterpacketrouter,这样就可以在集群中发送消息了
加载hazelcast的实例设置nodeid,以及设置clusterlistener
在前面说起集群启动时提到了缓存切换,那具体实现时是如何做的呢?
因为集群启动后就要是cachefactory.joinedcluster方法来加入集群的。看一下加入的代码:
/** * notification message indicating that this jvm has joined a cluster. */ @suppresswarnings("unchecked") publicstaticsynchronizedvoid joinedcluster() { cachefactorystrategy = clusteredcachefactorystrategy; // loop through local caches and switch them to clustered cache (copy content)for (cache cache : getallcaches()) { // skip local-only cachesif (localonly.contains(cache.getname())) continue; cachewrapper cachewrapper = ((cachewrapper) cache); cache clusteredcache = cachefactorystrategy.createcache(cachewrapper.getname()); clusteredcache.putall(cache); cachewrapper.setwrappedcache(clusteredcache); } clusteringstarting = false; clusteringstarted = true; log.info("clustering started; cache migration complete"); }
这里可以看到会读取所有的缓存容器并一个个的使用wrapper包装一下,然后用同样的缓存名称去createcache一个新的cache,这步使用的是切换后的集群缓存策略工厂,也就是说会使用clusteredcachefactory去创建新的缓存容器。最后再将cache写入到新的clusteredcache 里,这样就完成了缓存的切换。
当然这里还是要看一下clusteredcachefactory的createcache实现:
public cache createcache(string name) { // check if cluster is being started upwhile (state == state.starting) { // wait until cluster is fully started (or failed)try { thread.sleep(250); } catch (interruptedexception e) { // ignore } } if (state == state.stopped) { thrownew illegalstateexception("cannot create clustered cache when not in a cluster"); } returnnew clusteredcache(name, hazelcast.getmap(name)); }
这里使用的是clusteredcache,而且最重要的是传入的第二个map参数换成了hazelcast的了,这样之后再访问这个缓存容器时已经不再是原先的本地cache了,已经是hazelcast的map对象。hazelcast会自动对map的数据进行同步管理,这也就完成了缓存同步的功能。
集群计算
那就看hazelcast的实现吧,在clusteredcachefactory中doclustertask举个例子吧
publicvoid doclustertask(final clustertask task) { if (cluster == null) { return; } set<member> members = new hashset<member>(); member current = cluster.getlocalmember(); for(member member : cluster.getmembers()) { if (!member.getuuid().equals(current.getuuid())) { members.add(member); } } if (members.size() > 0) { // asynchronously execute the task on the other cluster members logger.debug("executing asynchronous multitask: " + task.getclass().getname()); hazelcast.getexecutorservice(hazelcast_executor_service_name).submittomembers( new callabletask<object>(task), members); } else { logger.warn("no cluster members selected for cluster task " + task.getclass().getname()); } }
过程就是,先获取到集群中的实例成员,当然要排除自己。然后hazelcast提供了executorservice来执行这个task,方法就是submitetomembers。这样就提交了一个运算任务。只不过具体是如何分配计算并汇集结果倒真不太清楚。
总结
花了一天时间看了一下openfire的集群,顺手就写了一篇文章,确实也到了一些东西。和一些网友沟通中好像目前大家更愿意使用redies来完成缓存共享,以及通过代理来实现集群,而不愿意使用openfire的集群方案。这部分我没有遇到如何大的并发量需求确实不知道区别在哪里。以后有机会还是动手试试写一个redies的插件。