欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Tigase 发送消息的流程源码分析

程序员文章站 2022-06-28 12:57:46
XMPP 的节是使用基本的”push”方法来从一个地方到另一个地方得到消息。因为消息通常是不告知的,它们是一种”fire-and-forget”(发射后自寻目的)的机制来从一个地方到另一个地方快速获取信息消息节有五种不同的类型,通过 type 属性来进行区分:例如 chat 类型 ......
xmpp 的<message/>节是使用基本的”push”方法来从一个地方到另一个地方得到消息。因为消息通常是不告知的,它们是一种”fire-and-forget”(发射后自寻目的)的机制来从一个地方到另一个地方快速获取信息
消息节有五种不同的类型,通过 type 属性来进行区分:例如 chat 类型为 chat 的消息在两个实体间的实时对话中交换,例如两个朋友之间的即时通讯聊天。除了 type 属性外,消息节还包括一个 to 和 from 地址,并且也可以包含一个用于跟踪目的的 id  属性(我们在使用更为广泛的 iq  节中详细的讨论 ids)。to  地址是预期接收人的
jabberid,from 地址是发送者的jabberid。from 地址不由发送客户端提供,而是由发送者的服务器添加邮戳,以避免地址欺骗。
在tigase中,有两个重要的组成,一个组件,二是插件,可以去官方网去看下他的架构介绍 https://docs.tigase.net/tigase-server/7.1.4/development_guide/html/#writeplugincode
例如最著名的组件的一个例子是muc或pubsub。在tigase中,几乎所有东西实际上都是一个组件:会话管理器、s2s连接管理器、消息路由器等等,组件是根据服务器配置加载的,新的组件可以在运行时加载和激活。您可以轻松地替换组件实现,唯一要做的更改是配置条目中的类名。

tigase 中定义一个最简单的消息组件,需要实现messagereceiver或继承 extends abstractmessagereceiver 类, messagereceiver 的抽象类: abstractmessagereceiver 子类 :
一、clientconnectionmanager
二、sessionmanager
三、 messagerouter
public void setproperties(map<string, object> props){
    for (string name : msgrcv_names) {
        mr = conf.getmsgrcvinstance(name);
        if (mr instanceof messagereceiver) {
            ((messagereceiver) mr).setparent(this);
            ((messagereceiver) mr).start();
        }
    }
}

1、当客户端发送的message消息到tigase服务端,每个一socket连接都会被包装成ioservice对象,ioservice包含一系列操作socket的方法(接收发送数据等),processsocketdata()接收网络数据,由tigase.net处理解析成xml对象,并将packet放到接收队列receivedpackets中再调用servicelistener.packetsready(this)。由于connectionmanager实现ioservicelistener接口,实现上调用的的是connectionmanager中的packetsready()来开始处理数据

此时的packet :packetfrom=null,packetto=null。
 
clientconnectionmanager.processsocketdata方法中设置packet的一些属性:
此时: packetfrom=c2s@llooper/192.168.0.33_5222_192.168.0.33_38624, packetto=sess-man@llooper
clientconnectionmanager.processsocketdata(xmppioservice<object>serv)
    jid id = serv.getconnectionid(); //c2s@llooper/192.168.0.33_5222_192.168.0.33_38624
    p.setpacketfrom(id); //packetfrom 设置为onnectionid
    p.setpacketto(serv.getdatareceiver()); //packetto 设置为sess-man --> sessionmanager 
    addoutpacket(p);//将会委托给父 messagerouter 路由
    
}
//packet 被设置上一些源信息,和目的地信息,接下来,这个数据包将会委托给父 messagerouter 帮忙路由到 sessionmanager组件中进行处理
packet = (tigase.server.message) from=c2s@llooper/192.168.0.33_5222_192.168.0.33_38624, to=sess-man@llooper, data=<message xmlns="jabber:client" id="44grm-176" type="chat" to="llooper@llooper"><thread>swjzv5</thread><composing xmlns="http://jabber.org/protocol/chatstates"/></message>, size=170, xmlns=jabber:client, priority=normal, permission=none, type=chat
 
packet = from=c2s@llooper/192.168.0.33_5222_192.168.0.33_38624, to=sess-man@llooper, data=<message to="admin@llooper" type="chat" id="2jepe-253" xmlns="jabber:client"><thread>7vkmrq</thread><composing xmlns="http://jabber.org/protocol/chatstates"/></message>, size=168, xmlns=jabber:client, priority=normal, permission=none, type=chat
 
2、messagerouter.processpacket(packet packet)部分代码如下:
 
//我们不会处理没有目标地址的数据包,只是丢弃它们并写一个日志消息
if (packet.getto() == null) {
    log.log(level.warning, "packet with to attribute set to null: {0}", packet);
    return;
}   


//它不是一个服务发现包,我们必须找到一个处理组件
//下面的代码块是“快速”找到一个组件if

//这个包to 组件id,格式在以下一项:
// 1。组件名+“@”+默认域名
// 2。组件名+“@”+任何虚拟主机名
// 3。组件名+ "."+默认域名
// 4。组件名+ "."+任何虚拟主机名

servercomponent comp = getlocalcomponent(packet.getto()); //sessionmanager
comp.processpacket(packet, results);

 3、sessionmanager.processpacket(final packet packet)处理,有要代码如下。 例如a->b,这样做的目的是为了首先确定用户a有权限发送packet,然后是确定用户b有权限接收数据。如果用户b不在线,那么离线消息处理器会把packet保存到数据库当中。

//xmppresourceconnection session——用户会话保存所有用户会话数据,并提供对用户数据存储库的访问。它只允许在会话的生命周期内将信息存储在永久存储或内存中。如果在分组处理时没有联机用户会话,则此参数可以为空。
xmppresourceconnection conn = getxmppresourceconnection(packet);
//现在要走sessionmanager的处理函数,主要是走插件流程,插件在tigase中也是一个重要的组成,入口就是在这里,sm plugin
processpacket(packet, conn);

   插入下sm plugin 流程说明 :

这个设计有一个惊人的结果。如果你看下面的图片,显示了两个用户之间的通信,你可以看到数据包被复制了两次才送到最终目的地: 
Tigase 发送消息的流程源码分析
会话管理器(sessionmanager)必须对数据包进行两次处理。第一次以用户a的名义将其作为传出包进行处理,第二次以用户b的名义将其作为传入包进行处理。
这是为了确保用户a有权限发送一个包,所有的processor都应用到packet上,也为了确保用户b有权限接收packet,所有的processor都应用到packet了。例如,如果用户b是脱机的,那么有一个脱机消息processor应该将包发送到数据库,而不是用户b。
 
protected xmppresourceconnection getxmppresourceconnection(packet p) {
        xmppresourceconnection conn = null;
        
        //首先根据这个包的发起者,来查找他的连接资源类,找不到则找接收者的资源类
        jid    from = p.getpacketfrom();
        if (from != null) {
            conn = connectionsbyfrom.get(from);
            if (conn != null) {
                return conn;
            }
        }

        //这个接收者它可能是这个服务器上某个用户的消息,让我们为这个用户查找已建立的会话
        jid to = p.getstanzato();

        if (to != null) {
            if (log.isloggable(level.finest)) {
                log.finest("searching for resource connection for: " + to);
            }
            conn = getresourceconnection(to);
        } else {

            // hm, not sure what should i do now....
            // maybe i should treat it as message to admin....
            log.log(level.info,
                    "message without to attribute set, don''t know what to do wih this: {0}", p);
        }    // end of else

        return conn;
    }
    
    
protected void processpacket(packet packet, xmppresourceconnection conn) {

    ...
    packet.setpacketto(getcomponentid()); //sess-man@llooper
    ...

    if (!stop) {
        //授权匹配的processor处理packet
        walk(packet, conn);
        try {
            if ((conn != null) && conn.getconnectionid().equals(packet.getpacketfrom())) {
                handlelocalpacket(packet, conn);
            }
        } catch (noconnectionidexception ex) {
            ...
        }
    }
    
    ...
}

 

packetto被设置为组件id(sess-man@llooper),其值原先也是这个。
其中walk(packet, conn)方法,匹配处理器(授权)。对于message,此处匹配到的processor是amp和message-carbons,message-carbons没有怎么处理,主要是amp在处理,packet被塞amp的队列中等待处理。

private void walk(final packet packet, final xmppresourceconnection connection) {

        for (xmppprocessorifc proc_t : processors.values()) {
            xmppprocessorifc processor = proc_t;
            //根据element和xmlns,授权匹配成功的processor
            authorization    result    = processor.canhandle(packet, connection);

            if (result == authorization.authorized) {
                ....
            
                processingthreads pt = workerthreads.get(processor.id());

                if (pt == null) {
                    pt = workerthreads.get(defpluginsthreadspool);
                }
                //packet 放到(additem)授权了的processor的队列
                if (pt.additem(processor, packet, connection)) {
                    packet.processedby(processor.id());
                } else {

                    ...
                }
            } else {
                ...
            }
        }   
    }
workerthread.run() 从队列中取出packet,由sessionmanager.process(queueitem item)给amp处理。
sessionmanager.pocess(queueitem item) 如下:
@override
public void process(queueitem item) {
    
    xmppprocessorifc processor = item.getprocessor();

    try {
        //由授权的 processor 处理 packet
        processor.process(item.getpacket(), item.getconn(), nauserrepository,local_results, plugin_config.get(processor.id()));
        if (item.getconn() != null) {
            setpermissions(item.getconn(), local_results);
        }
        addoutpackets(item.getpacket(), item.getconn(), local_results);
    } catch (packeterrortypeexception e) {
        ...
    } catch (xmppexception e) {
        ...
    }
}


//其中processor.process()------> messageamp.process(),如下:

@override
public void process(packet packet, xmppresourceconnection session,
        nonauthuserrepository repo, queue results, map settings) throws xmppexception {
    if (packet.getelemname() == "presence") {
        ...
        
    } else {
        element amp = packet.getelement().getchild("amp", xmlns);

        if ((amp == null) || (amp.getattributestaticstr("status") != null)) {
            messageprocessor.process(packet, session, repo, results, settings);
        } else {
            ...
    }
}

// 其中messageprocessor.process() --------> message.process(),如下


@override
public void process(packet packet, xmppresourceconnection session,
        nonauthuserrepository repo, queue results, map settings) throws xmppexception {

    ...
    try {
        ...
        // 在比较jids之前,记住要去除资源部分
        id = (packet.getstanzafrom() != null)
                ? packet.getstanzafrom().getbarejid()
                : null;

        // 检查这是否是来自客户端的数据包
        if (session.isuserid(id)) {
            // 这是来自这个客户端的数据包,最简单的操作是转发到它的目的地:
            // simple clone the xml element and....
            // ... putting it to results queue is enough
            results.offer(packet.copyelementonly());

            return;
        }

        
    } catch (notauthorizedexception e) {
        ...
    }    // end of try-catch
}

 

检查stanzaffrom与session匹配通过后,将packet.copyelementonly()放到results中,作后续投递,原来的packet 就丢弃了。
此时投递的packet :packetfrom=null,packetto=null。
packet在sessionmanager.addoutpacket(packet packet)中判断packetfrom是否为空,为空则将其设置为componentid(此处为sess-man@llooper),然后调用父类(abstractmessagereceiver.java) 的addoutpacket(packet)方法塞到out_queue 队列中。
此时packet::packetfrom=sess-man@llooper,packetto=null。
 

4、上层组件messagerouter处理,把packet塞到in_queues. 又回到了messagerouter.processpacket(packet packet)处理:

 
不同的是 packetto为空,packet.getto()的返回值是stanzato。
getlocalcomponent(packet.getto());方法根据stanzato与compid、comp name、component都匹配不到。
此时packet会给组件sessionmanager处理,packet will be processed by: sess-man@llooper,由abstractmessagereceiver的非阻塞性方法addpacketnb(packet packet)加入到in_queues。
 
 5、第二次来到sessionmanager.processpacket(final packet packet)处理。不同的是在getxmppresourceconnection(packet)方法中,
conn = connectionsbyfrom.get(from)返回值是null,所以是根据stanzato取获取接收方的session,返回接收方连接的connection。
protected xmppresourceconnection getxmppresourceconnection(packet p) {
    xmppresourceconnection conn = null;
    jid                    from = p.getpacketfrom();

    if (from != null) {
        conn = connectionsbyfrom.get(from);
        if (conn != null) {
            return conn;
        }
    }

    // it might be a message _to_ some user on this server
    // so let's look for established session for this user...
    jid to = p.getstanzato();

    if (to != null) {
        ...
        conn = getresourceconnection(to);
    } else {

        ...
    }    // end of else

    return conn;
}

 

 6、如同步骤3,此时packet作为一个以用户b的名义将其作为传入包进行处理。

然后packetto被设置为组件id(sess-man@llooper)

此时packet: packetfrom = sess-man@llooper,packetto =sess-man@llooper。

之后packet又经walk(packet, conn)方法,匹配处理器(授权),扔给amp处理。

 如同前面: 直到message.process(),如下:
@override
public void process(packet packet, xmppresourceconnection session,
        nonauthuserrepository repo, queue<packet> results, map<string, object> settings) throws xmppexception {

    // for performance reasons it is better to do the check
    // before calling logging method.
    if (log.isloggable(level.finest)) {
        log.log(level.finest, "processing packet: {0}, for session: {1}", new object[] {
                packet,
                session });
    }

    // you may want to skip processing completely if the user is offline.
    if (session == null) {
        processofflineuser( packet, results );
        return;
    }    // end of if (session == null)
    try {

        // remember to cut the resource part off before comparing jids
        barejid id = (packet.getstanzato() != null)
                ? packet.getstanzato().getbarejid()
                : null;

        // checking if this is a packet to the owner of the session
        if (session.isuserid(id)) {
            if (log.isloggable(level.finest)) {
                log.log(level.finest, "message 'to' this user, packet: {0}, for session: {1}",
                        new object[] { packet,
                        session });
            }

            if (packet.getstanzafrom() != null && session.isuserid(packet.getstanzafrom().getbarejid())) {
                jid connectionid = session.getconnectionid();
                if (connectionid.equals(packet.getpacketfrom())) {
                    results.offer(packet.copyelementonly());
                    // this would cause message packet to be stored in offline storage and will not
                    // send recipient-unavailable error but it will behave the same as a message to
                    // unavailable resources from other sessions or servers
                    return;
                }
            }

            // yes this is message to 'this' client
            list<xmppresourceconnection> conns = new arraylist<xmppresourceconnection>(5);

            // this is where and how we set the address of the component
            // which should rceive the result packet for the final delivery
            // to the end-user. in most cases this is a c2s or bosh component
            // which keep the user connection.
            string resource = packet.getstanzato().getresource();

            if (resource == null) {

                // if the message is sent to barejid then the message is delivered to
                // all resources
                conns.addall(getconnectionsformessagedelivery(session));
            } else {

                // otherwise only to the given resource or sent back as error.
                xmppresourceconnection con = session.getparentsession().getresourceforresource(
                        resource);

                if (con != null) {
                    conns.add(con);
                }
            }

            // messagecarbons: message cloned to all resources? why? it should be copied only
            // to resources with non negative priority!!

            if (conns.size() > 0) {
                for (xmppresourceconnection con : conns) {
                    packet result = packet.copyelementonly();

                    result.setpacketto(con.getconnectionid());

                    // in most cases this might be skept, however if there is a
                    // problem during packet delivery an error might be sent back
                    result.setpacketfrom(packet.getto());

                    // don't forget to add the packet to the results queue or it
                    // will be lost.
                    results.offer(result);
                    if (log.isloggable(level.finest)) {
                        log.log(level.finest, "delivering message, packet: {0}, to session: {1}",
                                new object[] { packet,
                                con });
                    }
                }
            } else {
                // if there are no user connections we should process packet
                // the same as with missing session (i.e. should be stored if
                // has type 'chat'
                processofflineuser( packet, results );
            }

            return;
        }    // end of else

        // remember to cut the resource part off before comparing jids
        id = (packet.getstanzafrom() != null)
                ? packet.getstanzafrom().getbarejid()
                : null;

        // checking if this is maybe packet from the client
        if (session.isuserid(id)) {

            // this is a packet from this client, the simplest action is
            // to forward it to is't destination:
            // simple clone the xml element and....
            // ... putting it to results queue is enough
            results.offer(packet.copyelementonly());

            return;
        }

        // can we really reach this place here?
        // yes, some packets don't even have from or to address.
        // the best example is iq packet which is usually a request to
        // the server for some data. such packets may not have any addresses
        // and they usually require more complex processing
        // this is how you check whether this is a packet from the user
        // who is owner of the session:
        jid jid = packet.getfrom();

        // this test is in most cases equal to checking getelemfrom()
        if (session.getconnectionid().equals(jid)) {

            // do some packet specific processing here, but we are dealing
            // with messages here which normally need just forwarding
            element el_result = packet.getelement().clone();

            // if we are here it means from address was missing from the
            // packet, it is a place to set it here:
            el_result.setattribute("from", session.getjid().tostring());

            packet result = packet.packetinstance(el_result, session.getjid(), packet
                    .getstanzato());

            // ... putting it to results queue is enough
            results.offer(result);
        }
    } catch (notauthorizedexception e) {
        log.log(level.fine, "notauthorizedexception for packet: " + packet + " for session: " + session, e);
        results.offer(authorization.not_authorized.getresponsemessage(packet,
                "you must authorize session first.", true));
    }    // end of try-catch
}

检查stanzato与session匹配通过后,根据session拿到接收方所有的连接(可能多端登陆),然后packet result = packet.copyelementonly()生成新的packet(原packet丢弃了),并将packetto设置为接收方连接的connectionid(例如:c2s@llooper/192.168.0.33_5222_192.168.0.33_38624),通过addoutpacket()方法塞到out_queue队列。
此时packet:packetfrom = sess-man@llooper,packetto =c2s@llooper/192.168.0.33_5222_192.168.0.33_38624。

7、 如同前面步骤2,不同的是根据packetto匹配到组件 c2s@llooper

8、 组件 c2s@llooper 从queue中取出packet,分发到目的地

public void processpacket(final packet packet) {
    ...
    if (packet.iscommand() && (packet.getcommand() != command.other)) {
        ...
    } else {
        // 把packet 发送给客户端
        if (!writepackettosocket(packet)) {

            ...
            
        }
    }    // end of else
}

 

后续有时间会不断更新,欢迎加入qq群 310790965 更多的交流