FactoryBean是如何运用在我们程序中的
程序员文章站
2022-05-23 18:07:08
...
我们在项目中可能都会用到redis,es,或者数据库,比如我们项目中需要连接多个es分片的时候,我们的配置文件最开始可能是下面的形式 ,ConnectionConfig是连接信息类,TransportUtils是工具类,下面是列举了2个业务分片,假如有10多个业务分片的时候需要十多个链接信息类和TransportUtils,显然是不合理的。
<bean id="connectionConfig11" class="com.kk.list.search.es.ConnectionConfig">
<property name="transportAddresses" value="127.0.0.1:9300"/>
<property name="clusterName" value="elastic"/>
<property name="clientTransportSniff" value="false"/>
</bean>
<bean id="connectionConfig12" class="com.kk.list.search.es.ConnectionConfig">
<property name="transportAddresses" value="127.0.0.1:9200"/>
<property name="clusterName" value="elastic"/>
<property name="clientTransportSniff" value="false"/>
</bean>
<bean id="transportUtils11" class="com.kk.list.search.es.TransportUtils">
<property name="connectionConfig" value="connectionConfig11"/>
</bean>
<bean id="transportUtils12" class="com.kk.list.search.es.TransportUtils">
<property name="connectionConfig" value="connectionConfig12"/>
</bean>
这个时候我们想到用FactoryBean来解决这个场景的问题,FactoryBean 通常是用来创建比较复杂的bean,一般的bean 直接用xml配置即可,但如果一个bean的创建过程中涉及到很多其他的bean 和复杂的逻辑,用xml配置比较困难,这时可以考虑用FactoryBean,我们直接看代码修改后的结果
1先看配置文件,在配置文件传递参数,通过ElasticsearchTransportClientFactoryBean类来生成TransportUtils实例对象
<bean id="transportUtils11" class="com.kk.list.search.es.ElasticsearchTransportClientFactoryBean">
<property name="transportAddresses" value="127.0.0.1:9300"/>
<property name="clusterName" value="elastic"/>
<property name="clientTransportSniff" value="false"/>
</bean>
<bean id="transportUtils12" class="com.kk.list.search.es.ElasticsearchTransportClientFactoryBean">
<property name="transportAddresses" value="127.0.0.1:9200"/>
<property name="clusterName" value="elastic"/>
<property name="clientTransportSniff" value="false"/>
</bean>
2再看这个ElasticsearchTransportClientFactoryBean类,实现了FacetoryBean的三个方法,还实现DisposableBean接口
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
public class ElasticsearchTransportClientFactoryBean implements FactoryBean,DisposableBean {
protected final Log logger = LogFactory.getLog(getClass());
private TransportUtils transportUtils;
private ConnectionConfig connectionConfig = new ConnectionConfig();
public void setTransportAddresses(String transportAddresses) {
connectionConfig.setTransportAddresses(transportAddresses);
}
public Object getObject() throws Exception {
TransportUtils transportUtils = new TransportUtils(connectionConfig);
return transportUtils;
}
public void setClusterName(String clusterName) {
connectionConfig.setClusterName(clusterName);
}
public void setClientTransportSniff(boolean clientTransportSniff) {
connectionConfig.setClientTransportSniff(clientTransportSniff);
}
public Class getObjectType() {
return TransportUtils.class;
}
public boolean isSingleton() {
return true;
}
public void destroy() throws Exception {
transportUtils.destroy();
}
public void afterPropertiesSet() throws Exception {
logger.info("begin to link to server");
getObject();
logger.info("link to the server");
}
}
3我们看看ConnectionConfig类,就是一个POJO类,来传递参数用的
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.ArrayList;
import java.util.List;
public class ConnectionConfig {
protected final Log logger = LogFactory.getLog(getClass());
/**
* es集群地址配置串
* address1:port,address2:port
*/
private String transportAddresses;
/**
* es地址
* address
*/
private String addresses;
/**
* port
*/
private int port;
/**
* 集群名称
*/
private String clusterName;
/**
* 是否探测服务器集群
*/
private Boolean clientTransportSniff =false;
private List<ConnectionConfig> connectionConfigList;
public String getTransportAddresses() {
return transportAddresses;
}
public void setTransportAddresses(String transportAddresses) {
this.transportAddresses = transportAddresses;
if (StringUtils.isNotBlank(transportAddresses)) {
String[] ads = transportAddresses.split(";");
if (null != ads && ads.length > 0) {
List<ConnectionConfig> connectionConfigList = new ArrayList<ConnectionConfig>();
for (int i = 0; i < ads.length; i++) {
String address = ads[i];
if (StringUtils.isNotBlank(address)) {
String[] urls = address.split(":");
if (urls.length == 2) {
ConnectionConfig connectionConfig = new ConnectionConfig();
connectionConfig.setAddresses(urls[0]);
connectionConfig.setPort(Integer.valueOf(urls[1]));
connectionConfigList.add(connectionConfig);
} else {
throw new ExceptionInInitializerError(ads[i] + " parameter is format error,please check spring config file!");
}
}
setConnectionConfigList(connectionConfigList);
}
} else {
throw new ExceptionInInitializerError("transportAddresses parameter is format error,please check spring config file!");
}
} else {
throw new ExceptionInInitializerError("transportAddresses init parameter is empty,please check spring config file!");
}
}
public String getAddresses() {
return addresses;
}
public void setAddresses(String addresses) {
this.addresses = addresses;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public List<ConnectionConfig> getConnectionConfigList() {
return connectionConfigList;
}
public void setConnectionConfigList(List<ConnectionConfig> connectionConfigList) {
this.connectionConfigList = connectionConfigList;
}
public String getClusterName() {
return clusterName;
}
public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}
public Boolean getClientTransportSniff() {
return clientTransportSniff;
}
public void setClientTransportSniff(Boolean clientTransportSniff) {
this.clientTransportSniff = clientTransportSniff;
}
}
4接着看TransportUtils工具类,这个才是ElasticsearchTransportClientFactoryBean最后实例化生成的类
import com.carrotsearch.hppc.ObjectContainer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class TransportUtils {
TransportClient client = null;
private ConnectionConfig connectionConfig;
protected final Log logger = LogFactory.getLog(getClass());
private BulkProcessor staticBulkProcessor = null;
public TransportClient getClient() {
return client;
}
public TransportUtils(ConnectionConfig config) throws UnknownHostException {
this.connectionConfig = config;
init();
}
public void destroy() throws Exception {
client.close();
}
private void init() throws UnknownHostException {
List<ConnectionConfig> configList = connectionConfig.getConnectionConfigList();
if (null != configList && !configList.isEmpty()) {
Settings settings = Settings.settingsBuilder().put("client.transport.ping_timeout", "10s").put("cluster.name", connectionConfig.getClusterName()).put("client.transport.sniff", connectionConfig.getClientTransportSniff()).build();
logger.info("clusterName:" + connectionConfig.getClusterName() + "clientTransportSniff:" + connectionConfig.getClientTransportSniff());
for (ConnectionConfig config : configList) {
if (null != config.getAddresses()) {
if (logger.isInfoEnabled()) {
logger.info("Adding transport address: " + config.getAddresses() + " port: " + config.getPort());
}
if (null ==client){
client = TransportClient.builder().settings(settings).build();
}
client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(config.getAddresses()), config.getPort()));
}
}
} else {
throw new ExceptionInInitializerError("transportAddresses init parameter is empty,please check spring config file!");
}
staticBulkProcessor= getBulkProcessor(client);
logger.info("transport init succeed : cluster.name:" + connectionConfig.getClusterName());
}
/**【设置自动提交文档】*/
public BulkProcessor getBulkProcessor(TransportClient client) {
//自动批量提交方式
if (staticBulkProcessor == null) {
try {
staticBulkProcessor = BulkProcessor.builder(client,
new BulkProcessor.Listener() {
public void beforeBulk(long executionId, BulkRequest request) {
//提交前调用
//System.out.println(new Date().toString() + " before");
}
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
//提交结束后调用(无论成功或失败)
if(response.hasFailures()){
logger.error("有文档提交失败!" + response.buildFailureMessage());
//return;
}
}
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
//提交结束且失败时调用
System.out.println( " 有文档提交失败!after failure=" + failure.getMessage());
//return;
}
})
.setBulkActions(1000)//文档数量达到1000时提交
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))//总文档体积达到5MB时提交 //
.setFlushInterval(TimeValue.timeValueSeconds(5))//每5S提交一次(无论文档数量、体积是否达到阈值)
.setConcurrentRequests(10)//加1后为可并行的提交请求数,即设为0代表只可1个请求并行,设为1为2个并行
.build();
//staticBulkProcessor.awaitClose(10, TimeUnit.MINUTES);//关闭,如有未提交完成的文档则等待完成,最多等待10分钟
} catch (Exception e) {//关闭时抛出异常
e.printStackTrace();
}
}
return staticBulkProcessor;
}
public BulkProcessor getStaticBulkProcessor() {
return staticBulkProcessor;
}
public void setStaticBulkProcessor(BulkProcessor staticBulkProcessor) {
this.staticBulkProcessor = staticBulkProcessor;
}
}
下面几篇博客讲的比较清楚
https://www.jianshu.com/p/6f0a59623090
上一篇: Win10全屏时怎么隐藏输入法图标?