欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

FactoryBean是如何运用在我们程序中的

程序员文章站 2022-05-23 18:07:08
...

       我们在项目中可能都会用到redis,es,或者数据库,比如我们项目中需要连接多个es分片的时候,我们的配置文件最开始可能是下面的形式 ,ConnectionConfig是连接信息类,TransportUtils是工具类,下面是列举了2个业务分片,假如有10多个业务分片的时候需要十多个链接信息类和TransportUtils,显然是不合理的。

<bean id="connectionConfig11" class="com.kk.list.search.es.ConnectionConfig">
<property name="transportAddresses" value="127.0.0.1:9300"/>
<property name="clusterName" value="elastic"/>
<property name="clientTransportSniff" value="false"/>
</bean>

<bean id="connectionConfig12" class="com.kk.list.search.es.ConnectionConfig">
<property name="transportAddresses" value="127.0.0.1:9200"/>
<property name="clusterName" value="elastic"/>
<property name="clientTransportSniff" value="false"/>
</bean>

<bean id="transportUtils11" class="com.kk.list.search.es.TransportUtils">
<property name="connectionConfig"  value="connectionConfig11"/>
</bean>

<bean id="transportUtils12" class="com.kk.list.search.es.TransportUtils">
<property name="connectionConfig" value="connectionConfig12"/>
</bean>

​​​这个时候我们想到用FactoryBean来解决这个场景的问题,FactoryBean 通常是用来创建比较复杂的bean,一般的bean 直接用xml配置即可,但如果一个bean的创建过程中涉及到很多其他的bean 和复杂的逻辑,用xml配置比较困难,这时可以考虑用FactoryBean,我们直接看代码修改后的结果

1先看配置文件,在配置文件传递参数,通过ElasticsearchTransportClientFactoryBean类来生成TransportUtils实例对象

 <bean id="transportUtils11" class="com.kk.list.search.es.ElasticsearchTransportClientFactoryBean">
        <property name="transportAddresses" value="127.0.0.1:9300"/>
        <property name="clusterName" value="elastic"/>
        <property name="clientTransportSniff" value="false"/>
    </bean>

    <bean id="transportUtils12" class="com.kk.list.search.es.ElasticsearchTransportClientFactoryBean">
        <property name="transportAddresses" value="127.0.0.1:9200"/>
        <property name="clusterName" value="elastic"/>
        <property name="clientTransportSniff" value="false"/>
    </bean>

2再看这个ElasticsearchTransportClientFactoryBean类,实现了FacetoryBean的三个方法,还实现DisposableBean接口


import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;



public class ElasticsearchTransportClientFactoryBean implements FactoryBean,DisposableBean {
    protected final Log logger = LogFactory.getLog(getClass());
    private TransportUtils transportUtils;
    private ConnectionConfig connectionConfig = new ConnectionConfig();

    public void setTransportAddresses(String transportAddresses) {
        connectionConfig.setTransportAddresses(transportAddresses);
    }

    public Object getObject() throws Exception {

        TransportUtils transportUtils = new TransportUtils(connectionConfig);
        return transportUtils;
    }

    public void setClusterName(String clusterName) {
        connectionConfig.setClusterName(clusterName);
    }

    public void setClientTransportSniff(boolean clientTransportSniff) {
        connectionConfig.setClientTransportSniff(clientTransportSniff);
    }

    public Class getObjectType() {
        return TransportUtils.class;
    }

    public boolean isSingleton() {
        return true;
    }

    public void destroy() throws Exception {
        transportUtils.destroy();
    }

    public void afterPropertiesSet() throws Exception {
        logger.info("begin to link to server");
        getObject();
        logger.info("link to the server");
    }
}

3我们看看ConnectionConfig类,就是一个POJO类,来传递参数用的

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.ArrayList;
import java.util.List;


public class ConnectionConfig {

    protected final Log logger = LogFactory.getLog(getClass());

    /**
     * es集群地址配置串
     * address1:port,address2:port
     */
    private String transportAddresses;

    /**
     * es地址
     * address
     */
    private String addresses;

    /**
     * port
     */
    private int port;

    /**
     * 集群名称
     */
    private String clusterName;


    /**
     * 是否探测服务器集群
     */
    private Boolean clientTransportSniff =false;

    private List<ConnectionConfig> connectionConfigList;


    public String getTransportAddresses() {
        return transportAddresses;
    }

    public void setTransportAddresses(String transportAddresses) {
        this.transportAddresses = transportAddresses;
        if (StringUtils.isNotBlank(transportAddresses)) {
            String[] ads = transportAddresses.split(";");
            if (null != ads && ads.length > 0) {
                List<ConnectionConfig> connectionConfigList = new ArrayList<ConnectionConfig>();
                for (int i = 0; i < ads.length; i++) {
                    String address = ads[i];
                    if (StringUtils.isNotBlank(address)) {
                        String[] urls = address.split(":");
                        if (urls.length == 2) {
                            ConnectionConfig connectionConfig = new ConnectionConfig();
                            connectionConfig.setAddresses(urls[0]);
                            connectionConfig.setPort(Integer.valueOf(urls[1]));
                            connectionConfigList.add(connectionConfig);
                        } else {
                            throw new ExceptionInInitializerError(ads[i] + "  parameter is format error,please check spring config file!");
                        }
                    }
                    setConnectionConfigList(connectionConfigList);
                }
            } else {
                throw new ExceptionInInitializerError("transportAddresses  parameter is format error,please check spring config file!");
            }

        } else {
            throw new ExceptionInInitializerError("transportAddresses init parameter is empty,please check spring config file!");
        }
    }

    public String getAddresses() {
        return addresses;
    }

    public void setAddresses(String addresses) {
        this.addresses = addresses;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public List<ConnectionConfig> getConnectionConfigList() {
        return connectionConfigList;
    }

    public void setConnectionConfigList(List<ConnectionConfig> connectionConfigList) {
        this.connectionConfigList = connectionConfigList;
    }

    public String getClusterName() {
        return clusterName;
    }

    public void setClusterName(String clusterName) {
        this.clusterName = clusterName;
    }

    public Boolean getClientTransportSniff() {
        return clientTransportSniff;
    }

    public void setClientTransportSniff(Boolean clientTransportSniff) {
        this.clientTransportSniff = clientTransportSniff;
    }
}

4接着看TransportUtils工具类,这个才是ElasticsearchTransportClientFactoryBean最后实例化生成的类


import com.carrotsearch.hppc.ObjectContainer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;


public class TransportUtils {
    TransportClient client = null;
    private ConnectionConfig connectionConfig;
    protected final Log logger = LogFactory.getLog(getClass());
    private BulkProcessor staticBulkProcessor = null;


    public TransportClient getClient() {
        return client;
    }

    public TransportUtils(ConnectionConfig config) throws UnknownHostException {
        this.connectionConfig = config;
        init();
    }
    public void destroy() throws Exception {
        client.close();
    }

    private void init() throws UnknownHostException {
        List<ConnectionConfig> configList = connectionConfig.getConnectionConfigList();
        if (null != configList && !configList.isEmpty()) {
            Settings settings = Settings.settingsBuilder().put("client.transport.ping_timeout", "10s").put("cluster.name", connectionConfig.getClusterName()).put("client.transport.sniff", connectionConfig.getClientTransportSniff()).build();
            logger.info("clusterName:" + connectionConfig.getClusterName() + "clientTransportSniff:" + connectionConfig.getClientTransportSniff());
            for (ConnectionConfig config : configList) {
                if (null != config.getAddresses()) {
                    if (logger.isInfoEnabled()) {
                        logger.info("Adding transport address: " + config.getAddresses() + " port: " + config.getPort());
                    }
                    if (null ==client){
                        client = TransportClient.builder().settings(settings).build();
                    }

                    client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(config.getAddresses()), config.getPort()));
                }
            }
        } else {
            throw new ExceptionInInitializerError("transportAddresses init parameter is empty,please check spring config file!");
        }
        staticBulkProcessor= getBulkProcessor(client);
        logger.info("transport init  succeed : cluster.name:" + connectionConfig.getClusterName());
    }




    /**【设置自动提交文档】*/
    public BulkProcessor getBulkProcessor(TransportClient client) {
        //自动批量提交方式
        if (staticBulkProcessor == null) {
            try {
                staticBulkProcessor = BulkProcessor.builder(client,
                        new BulkProcessor.Listener() {
                            public void beforeBulk(long executionId, BulkRequest request) {
                                //提交前调用
                                //System.out.println(new Date().toString() + " before");
                            }
                            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                                //提交结束后调用(无论成功或失败)
                                if(response.hasFailures()){
                                    logger.error("有文档提交失败!" + response.buildFailureMessage());
                                    //return;
                                }
                            }
                            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                                //提交结束且失败时调用
                                System.out.println( " 有文档提交失败!after failure=" + failure.getMessage());
                                //return;
                            }
                        })
                        .setBulkActions(1000)//文档数量达到1000时提交
                        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))//总文档体积达到5MB时提交 //
                        .setFlushInterval(TimeValue.timeValueSeconds(5))//每5S提交一次(无论文档数量、体积是否达到阈值)
                        .setConcurrentRequests(10)//加1后为可并行的提交请求数,即设为0代表只可1个请求并行,设为1为2个并行
                        .build();
                //staticBulkProcessor.awaitClose(10, TimeUnit.MINUTES);//关闭,如有未提交完成的文档则等待完成,最多等待10分钟
            } catch (Exception e) {//关闭时抛出异常
                e.printStackTrace();
            }
        }
        return staticBulkProcessor;
    }



    public BulkProcessor getStaticBulkProcessor() {
        return staticBulkProcessor;
    }

    public void setStaticBulkProcessor(BulkProcessor staticBulkProcessor) {
        this.staticBulkProcessor = staticBulkProcessor;
    }


}

 下面几篇博客讲的比较清楚

  https://www.jianshu.com/p/6f0a59623090

  https://blog.csdn.net/joenqc/article/details/66479154

  https://blog.csdn.net/u013185616/article/details/52335864

相关标签: factoryBean