Hadoop2.7.1+Hbase1.2.1集群环境搭建(9)spring-hadoop实战
程序员文章站
2022-06-16 15:29:39
...
(1)hadoop2.7.1源码编译 | http://aperise.iteye.com/blog/2246856 |
(2)hadoop2.7.1安装准备 | http://aperise.iteye.com/blog/2253544 |
(3)1.x和2.x都支持的集群安装 | http://aperise.iteye.com/blog/2245547 |
(4)hbase安装准备 | http://aperise.iteye.com/blog/2254451 |
(5)hbase安装 | http://aperise.iteye.com/blog/2254460 |
(6)snappy安装 | http://aperise.iteye.com/blog/2254487 |
(7)hbase性能优化 | http://aperise.iteye.com/blog/2282670 |
(8)雅虎YCSBC测试hbase性能测试 | http://aperise.iteye.com/blog/2248863 |
(9)spring-hadoop实战 | http://aperise.iteye.com/blog/2254491 |
(10)基于ZK的Hadoop HA集群安装 | http://aperise.iteye.com/blog/2305809 |
1.http://spring.io/blog/2015/02/09/spring-for-apache-hadoop-2-1-released
2.http://docs.spring.io/spring-hadoop/docs/current/reference/html/
上面是两处比较好的文档,因项目没整完,整完再放所有项目源代码。这里贴两张图:
1.maven工程中添加对spring-data-hadoop的依赖
<!--spring --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>4.1.6.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-beans</artifactId> <version>4.1.6.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>4.1.6.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jdbc</artifactId> <version>4.1.6.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> <version>4.1.6.RELEASE</version> </dependency> <!-- spring-hadoop --> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-hadoop</artifactId> <version>2.2.0.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-hadoop-store</artifactId> <version>2.2.0.RELEASE</version> <exclusions> <exclusion> <groupId>javax.servlet</groupId> <artifactId>servlet-api</artifactId> </exclusion> <exclusion> <artifactId>netty</artifactId> <groupId>io.netty</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.xerial.snappy</groupId> <artifactId>snappy-java</artifactId> <version>1.1.0</version> <scope>runtime</scope> </dependency> <!-- hadoop --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.6.0</version> <scope>compile</scope> <exclusions> <exclusion> <groupId>org.mortbay.jetty</groupId> <artifactId>jetty</artifactId> </exclusion> <exclusion> <groupId>org.mortbay.jetty</groupId> <artifactId>jetty-util</artifactId> </exclusion> <exclusion> <groupId>org.mortbay.jetty</groupId> <artifactId>jsp-2.1</artifactId> </exclusion> <exclusion> <groupId>org.mortbay.jetty</groupId> <artifactId>jsp-api-2.1</artifactId> </exclusion> <exclusion> <groupId>org.mortbay.jetty</groupId> <artifactId>servlet-api-2.1</artifactId> </exclusion> <exclusion> <groupId>javax.servlet</groupId> <artifactId>servlet-api</artifactId> </exclusion> <exclusion> <groupId>javax.servlet.jsp</groupId> <artifactId>jsp-api</artifactId> </exclusion> <exclusion> <groupId>tomcat</groupId> <artifactId>jasper-compiler</artifactId> </exclusion> <exclusion> <groupId>tomcat</groupId> <artifactId>jasper-runtime</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-auth</artifactId> <version>2.6.0</version> <scope>compile</scope> </dependency> <!-- hbase --> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>0.98.5-hadoop2</version> <exclusions> <exclusion> <groupId>org.mortbay.jetty</groupId> <artifactId>jetty</artifactId> </exclusion> <exclusion> <groupId>org.mortbay.jetty</groupId> <artifactId>jetty-util</artifactId> </exclusion> <exclusion> <groupId>org.mortbay.jetty</groupId> <artifactId>jsp-2.1</artifactId> </exclusion> <exclusion> <groupId>org.mortbay.jetty</groupId> <artifactId>jsp-api-2.1</artifactId> </exclusion> <exclusion> <groupId>org.mortbay.jetty</groupId> <artifactId>servlet-api-2.1</artifactId> </exclusion> <exclusion> <groupId>tomcat</groupId> <artifactId>jasper-compiler</artifactId> </exclusion> <exclusion> <groupId>tomcat</groupId> <artifactId>jasper-runtime</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>0.98.5-hadoop2</version> <scope>compile</scope> <exclusions> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-common</artifactId> <version>0.98.5-hadoop2</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-protocol</artifactId> <version>0.98.5-hadoop2</version> </dependency> <!--zookeeper --> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> <exclusions> <exclusion> <artifactId>netty</artifactId> <groupId>io.netty</groupId> </exclusion> </exclusions> </dependency> <!--log --> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency>
2.hadoop1.x namenode+secondarynamenode方式下spring-data-hadoop配置文件如下:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:hdp="http://www.springframework.org/schema/hadoop" xmlns:beans="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd"> <!-- 默认的hadoopConfiguration,默认ID为hadoopConfiguration,且对于file-system等不需指定ref,自动注入hadoopConfiguration --> <hdp:configuration> fs.defaultFS=hdfs://192.168.202.131:9000/ dfs.replication=3 dfs.client.socket-timeout=600000 </hdp:configuration> <!-- hadoop hdfs 操作类FileSystem,用来读写HDFS文件 --> <hdp:file-system id="hadoop-cluster" uri="hdfs://192.168.202.131:9000/" /> <!-- 配置zookeeper地址和端口 --> <hdp:hbase-configuration configuration-ref="hadoopConfiguration" zk-quorum="192.168.202.131,192.168.202.132,192.168.202.133" zk-port="2181"> hbase.rootdir=hdfs://192.168.202.131:9000/hbase dfs.replication=3 dfs.client.socket-timeout=600000 </hdp:hbase-configuration> <!-- 配置HbaseTemplate --> <bean id="hbaseTemplate" class="org.springframework.data.hadoop.hbase.HbaseTemplate"> <property name="configuration" ref="hbaseConfiguration" /> </bean> </beans>
3.Hadoop 2.x HA下spring-data-hadoop配置文件如下:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:hdp="http://www.springframework.org/schema/hadoop" xmlns:beans="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd"> <!-- 默认的hadoopConfiguration,默认ID为hadoopConfiguration,且对于file-system等不需指定ref,自动注入hadoopConfiguration --> <hdp:configuration> fs.defaultFS=hdfs://hadoop-ha-cluster dfs.client.socket-timeout=600000 ha.zookeeper.quorum=zk1:2181,zk2:2181,zk3:2181,zk4:2181,zk5:2181 ha.zookeeper.session-timeout.ms=300000 dfs.nameservices=hadoop-ha-cluster dfs.ha.namenodes.hadoop-ha-cluster=namenode1,namenode2 dfs.namenode.rpc-address.hadoop-ha-cluster.namenode1=hadoop31:9000 dfs.namenode.http-address.hadoop-ha-cluster.namenode1=hadoop31:50070 dfs.namenode.rpc-address.hadoop-ha-cluster.namenode2=hadoop32:9000 dfs.namenode.http-address.hadoop-ha-cluster.namenode2=hadoop32:50070 dfs.client.failover.proxy.provider.hadoop-ha-cluster=org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider </hdp:configuration> <!-- hadoop hdfs 操作类FileSystem,用来读写HDFS文件 --> <hdp:file-system id="hadoop-cluster" configuration-ref="hadoopConfiguration" /> <!-- 配置zookeeper地址和端口 --> <hdp:hbase-configuration configuration-ref="hadoopConfiguration" zk-quorum="zk1,zk2,zk3,zk4,zk5" zk-port="2181"> hbase.rootdir=hdfs://hadoop-ha-cluster/hbase hbase.cluster.distributed=true zookeeper.session.timeout=30000 hbase.hregion.majorcompaction=0 hbase.regionserver.regionSplitLimit=1 dfs.client.socket-timeout=600000 </hdp:hbase-configuration> <!-- 配置HbaseTemplate --> <bean id="hbaseTemplate" class="org.springframework.data.hadoop.hbase.HbaseTemplate"> <property name="configuration" ref="hbaseConfiguration" /> </bean> </beans>
4.一个在J2EE项目中一个获得spring上下文的工具类
1)在web.xml中保证配置了spring监听器,如下:
<!-- spring 配置文件的加载 --> <context-param> <param-name>contextConfigLocation</param-name> <param-value>classpath*:/applicationContext.xml</param-value> </context-param> <!-- 监听器 --> <listener> <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class> </listener>2)工具类SpringContextHolder
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; /** * 以静态变量保存Spring ApplicationContext, 可在任何代码任何地方任何时候中取出ApplicaitonContext. * * @author calvin */ public class SpringContextHolder implements ApplicationContextAware, DisposableBean { private static ApplicationContext applicationContext = null; private static Logger logger = LoggerFactory.getLogger(SpringContextHolder.class); /** * 实现ApplicationContextAware接口, 注入Context到静态变量中. */ public void setApplicationContext(ApplicationContext applicationContext) { logger.debug("注入ApplicationContext到SpringContextHolder:" + applicationContext); if (SpringContextHolder.applicationContext != null) { logger.warn("SpringContextHolder中的ApplicationContext被覆盖, 原有ApplicationContext为:" + SpringContextHolder.applicationContext); } SpringContextHolder.applicationContext = applicationContext; //NOSONAR } /** * 实现DisposableBean接口,在Context关闭时清理静态变量. */ public void destroy() throws Exception { SpringContextHolder.clear(); } /** * 取得存储在静态变量中的ApplicationContext. */ public static ApplicationContext getApplicationContext() { assertContextInjected(); return applicationContext; } /** * 从静态变量applicationContext中取得Bean, 自动转型为所赋值对象的类型. */ @SuppressWarnings("unchecked") public static <T> T getBean(String name) { assertContextInjected(); return (T) applicationContext.getBean(name); } /** * 从静态变量applicationContext中取得Bean, 自动转型为所赋值对象的类型. */ public static <T> T getBean(Class<T> requiredType) { assertContextInjected(); return applicationContext.getBean(requiredType); } /** * 清除SpringContextHolder中的ApplicationContext为Null. */ public static void clear() { logger.debug("清除SpringContextHolder中的ApplicationContext:" + applicationContext); applicationContext = null; } /** * 检查ApplicationContext不为空. */ private static void assertContextInjected() { if (applicationContext == null) { throw new IllegalStateException("applicaitonContext未注入,请在applicationContext.xml中定义SpringContextHolder"); } } }3)工具类需要在spring配置文件中配置
<!-- SpringContext Holder --> <bean id="springContextHolder" class="com.xxx.xxx.xxx.SpringContextHolder" lazy-init="false" />
5.在J2EE项目中使用HDFS
import java.io.BufferedInputStream; import java.io.File; import java.io.FileInputStream; import java.io.InputStream; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import com.besttone.spring.SpringContextHolder; public class FileSystemUtil { private static FileSystem fs = (FileSystem) SpringContextHolder.getBean("hadoop-cluster"); public void mkdirs() throws Exception { // create HDFS folder 创建一个文件夹 Path path = new Path("/test"); fs.mkdirs(path); } public void create() throws Exception { // create a file 创建一个文件 Path path = new Path("/test/a.txt"); FSDataOutputStream out = fs.create(path); out.write("hello hadoop".getBytes()); } public void rename() throws Exception { // rename a file 重命名 Path path = new Path("/test/a.txt"); Path newPath = new Path("/test/b.txt"); System.out.println(fs.rename(path, newPath)); } public void copyFromLocalFile() throws Exception { // upload a local file // 上传文件 Path src = new Path("/home/hadoop/hadoop-1.2.1/bin/rcc"); Path dst = new Path("/test"); fs.copyFromLocalFile(src, dst); } // upload a local file // 上传文件 public void uploadLocalFile2() throws Exception { Path src = new Path("/home/hadoop/hadoop-1.2.1/bin/rcc"); Path dst = new Path("/test"); InputStream in = new BufferedInputStream(new FileInputStream(new File( "/home/hadoop/hadoop-1.2.1/bin/rcc"))); FSDataOutputStream out = fs.create(new Path("/test/rcc1")); IOUtils.copyBytes(in, out, 4096); } public void listFiles() throws Exception { // list files under folder // 列出文件 Path dst = new Path("/test"); FileStatus[] files = fs.listStatus(dst); for (FileStatus file : files) { System.out.println(file.getPath().toString()); } } public void getBlockInfo() throws Exception { // list block info of file // 查找文件所在的数据块 Path dst = new Path("/test/rcc"); FileStatus fileStatus = fs.getFileStatus(dst); BlockLocation[] blkloc = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()); // 查找文件所在数据块 for (BlockLocation loc : blkloc) { for (int i = 0; i < loc.getHosts().length; i++) System.out.println(loc.getHosts()[i]); } } }
6.在J2EE项目中使用hbase
import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.hadoop.hbase.HbaseTemplate; import org.springframework.data.hadoop.hbase.RowMapper; import org.springframework.data.hadoop.hbase.TableCallback; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSON; @Component public class HbaseService { private static final Logger logger = Logger.getLogger(HbaseService.class); private static int FETCH_HBASE_SIZE=15000; @Autowired HbaseTemplate hbaseTemplate; /** * 通过表名和key获取一行数据 * * @param tableName * @param rowKey * @return */ public Map<String, Object> get(String tableName, String rowKey) { return hbaseTemplate.get(tableName, rowKey, new RowMapper<Map<String, Object>>() { public Map<String, Object> mapRow(Result result, int rowNum) throws Exception { List<Cell> ceList = result.listCells(); Map<String, Object> map = new HashMap<String, Object>(); if (ceList != null && ceList.size() > 0) { for (Cell cell : ceList) { map.put(Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) + "_" + Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()), Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); } } return map; } }); } /** * 通过表名和key获取数据,key采取最前端字符匹配方式 * * @param tableName * @param startRow * @param stopRow * @return */ public List<Map<String, Object>> find(String tableName, String startRow, String stopRow) { logger.info("----------------------------------------------------------------------------------------------------------"); logger.info("hbaseTemplate.getConfiguration().iterator start-----------------------------------------------------------"); Iterator<Map.Entry<String, String>> iterator = hbaseTemplate.getConfiguration().iterator(); while (null != iterator && iterator.hasNext()) { Map.Entry<String, String> entry = iterator.next(); logger.info("key=" + entry.getKey() + ",value=" + entry.getValue()); } logger.info("hbaseTemplate.getConfiguration().iterator end -----------------------------------------------------------"); logger.info("----------------------------------------------------------------------------------------------------------"); if (startRow == null) { startRow = ""; } if (stopRow == null) { stopRow = ""; } Scan scan = new Scan(Bytes.toBytes(startRow), Bytes.toBytes(stopRow)); PageFilter filter = new PageFilter(5000); scan.setFilter(filter); return hbaseTemplate.find(tableName, scan, new RowMapper<Map<String, Object>>() { public Map<String, Object> mapRow(Result result, int rowNum) throws Exception { List<Cell> ceList = result.listCells(); Map<String, Object> map = new HashMap<String, Object>(); String row = ""; if (ceList != null && ceList.size() > 0) { for (Cell cell : ceList) { row = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); // String family = Bytes.toString(cell.getFamilyArray(), // cell.getFamilyOffset(),cell.getFamilyLength()); String quali = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); // map.put(family + ":" + quali, value); map.put(quali, value); } map.put("rowKey", row); } return map; } }); } public boolean batchExcuteInsert(final TableData tableData) { return hbaseTemplate.execute(tableData.getTable(), new TableCallback<Boolean>() { public Boolean doInTable(HTableInterface table) throws Throwable { logger.info("into batchExcuteInsert"); // table.setAutoFlushTo(false); // 缓存在服务器上/opt/hbase-1.1.2/conf/hbase-site.xml统一配置为10M,对所有HTable都生效,这里无须再设置 // table.setWriteBufferSize(10*1024*1024);//设置缓存到达10M才提交一次 boolean flag = false; if (null != tableData && null != tableData.getRows() && 0 < tableData.getRows().size()) { List<Put> putList = new ArrayList<Put>(); for (RowData row : tableData.getRows()) { if (null == row.getColumns() || 0 == row.getColumns().size()) continue; Put put = new Put(row.getRowKey()); for (ColumnData column : row.getColumns()) { put.add(column.getFamily(), column.getQualifier(), column.getValue()); } put.setDurability(Durability.SKIP_WAL); putList.add(put); } logger.info("batchExcuteInsert size=" + putList.size()); table.put(putList); // table.flushCommits(); flag = true; } logger.info("out batchExcuteInsert"); return flag; } }); } private String fillZero(String src, int length) { StringBuilder sb = new StringBuilder(); if (src.length() < length) { for (int count = 0; count < (length - src.length()); count++) { sb.append("0"); } } sb.append(src); return sb.toString(); } /** * * @param table * @param called * @param startTime * @param endTime * @param fromWeb * 来自web查询为true,否则为false * @return */ public List<Map<String, Object>> querySignalList(String table, String called, String startTime, String endTime, boolean fromWeb) { String tableName = table; String startRow = ""; String stopRow = ""; String timeFormat = fromWeb ? webQueryTimeFormat : interfaceTimeFormat; if (null == called || called.equals("")) { startRow = ""; stopRow = ""; } else { if (null == startTime || startTime.equals("")) { startRow = new StringBuffer(fillZero(called, 16)).reverse().toString(); } else { String timeKey = fromTimeStr2TimeStr(timeFormat, startTime, hbaseTimeFormat_signal); startRow = new StringBuffer(fillZero(called, 16)).reverse().toString() + timeKey; } if (null == endTime || endTime.equals("")) { String timeKey = date2Str(hbaseTimeFormat_signal, new Date()); stopRow = new StringBuffer(fillZero(called, 16)).reverse().toString() + timeKey; } else { String timeKey = fromTimeStr2TimeStr(timeFormat, endTime, hbaseTimeFormat_signal); stopRow = new StringBuffer(fillZero(called, 16)).reverse().toString() + timeKey; } } return this.find(tableName, startRow, stopRow); } String hbaseTimeFormat_signal = "yyyyMMddHHmmssSSS"; String hbaseTimeFormat_sms = "yyyyMMddHHmmss"; String webQueryTimeFormat = "yyyy-MM-dd HH:mm:ss"; String interfaceTimeFormat = "yyyyMMddHHmmss"; private String date2Str(String timeFormatStr, Date date) { DateFormat sdf = new SimpleDateFormat(timeFormatStr); return sdf.format(date); } private Date str2Date(String timeFormatStr, String dateStr) { DateFormat sdf = new SimpleDateFormat(timeFormatStr); try { return sdf.parse(dateStr); } catch (ParseException e) { logger.error(e.getMessage(), e); return null; } } private String fromTimeStr2TimeStr(String srcTimeFormat, String srcDate, String desTimeFormat) { return date2Str(desTimeFormat, str2Date(srcTimeFormat, srcDate)); } /** * * @param table * 查询哪张表 * @param called * 查询的被叫号码 * @param startTime * 查询的起始时间 * @param endTime * 查询的结束时间 * @param page * 查询的分页信息 * @param fromWeb * 是否来自管理端页面查询,管理端页面时间格式和接口中时间格式不同 * @return */ public Page querySignalByPage(String table, String called, String startTime, String endTime, Page page, boolean fromWeb) { String tableName = table; String startRow = ""; String stopRow = ""; String timeFormat = fromWeb ? webQueryTimeFormat : interfaceTimeFormat; if (null == called || called.equals("")) { startRow = ""; stopRow = ""; } else { if (null == startTime || startTime.equals("")) { startRow = new StringBuffer(fillZero(called, 16)).reverse().toString(); } else { String timeKey = fromTimeStr2TimeStr(timeFormat, startTime, hbaseTimeFormat_signal); startRow = new StringBuffer(fillZero(called, 16)).reverse().toString() + timeKey; } if (null == endTime || endTime.equals("")) { String timeKey = date2Str(hbaseTimeFormat_signal, new Date()); stopRow = new StringBuffer(fillZero(called, 16)).reverse().toString() + timeKey; } else { String timeKey = fromTimeStr2TimeStr(timeFormat, endTime, hbaseTimeFormat_signal); stopRow = new StringBuffer(fillZero(called, 16)).reverse().toString() + timeKey; } } Scan scan = new Scan(Bytes.toBytes(startRow), Bytes.toBytes(stopRow)); PageFilter filter = new PageFilter(FETCH_HBASE_SIZE); scan.setFilter(filter); PageRowMapper pageRowMapper = new PageRowMapper(page); hbaseTemplate.find(tableName, scan, pageRowMapper); if(null!=pageRowMapper&&pageRowMapper.getPage().getTotal()>=FETCH_HBASE_SIZE){ PageFilter filter2 = new PageFilter(FETCH_HBASE_SIZE*2); scan.setFilter(filter2); PageRowMapper pageRowMapper2 = new PageRowMapper(page); hbaseTemplate.find(tableName, scan, pageRowMapper2); return pageRowMapper2.getPage(); } return pageRowMapper.getPage(); } public Page querySmsSendResultByPage(String table, String sender, String startTime, String endTime, Page page, boolean fromWeb) { String tableName = table; String startRow = ""; String stopRow = ""; String timeFormat = fromWeb ? webQueryTimeFormat : interfaceTimeFormat; if (null == sender || sender.equals("")) { startRow = ""; stopRow = ""; } else { if (null == startTime || startTime.equals("")) { startRow = new StringBuffer(fillZero(sender, 25)).reverse().toString(); } else { String timeKey = fromTimeStr2TimeStr(timeFormat, startTime, hbaseTimeFormat_sms); startRow = new StringBuffer(fillZero(sender, 25)).reverse().toString() + timeKey; } if (null == endTime || endTime.equals("")) { String timeKey = date2Str(hbaseTimeFormat_sms, new Date()); stopRow = new StringBuffer(fillZero(sender, 25)).reverse().toString() + timeKey; } else { String timeKey = fromTimeStr2TimeStr(timeFormat, endTime, hbaseTimeFormat_sms); stopRow = new StringBuffer(fillZero(sender, 25)).reverse().toString() + timeKey; } } Scan scan = new Scan(Bytes.toBytes(startRow), Bytes.toBytes(stopRow)); PageFilter filter = new PageFilter(10000); scan.setFilter(filter); PageRowMapper pageRowMapper = new PageRowMapper(page); hbaseTemplate.find(tableName, scan, pageRowMapper); System.out.println("------------------------------------------------------------"); System.out.println("tableName:"+tableName); System.out.println("startRow:"+startRow); System.out.println("stopRow:"+stopRow); System.out.println("sssss:"+JSON.toJSONString(pageRowMapper.getPage())); System.out.println("------------------------------------------------------------"); return pageRowMapper.getPage(); } }
上一篇: 苹果13如何删除电话铃声?iPhone13删除电话铃声方法
下一篇: PS 矩形工具的使用
推荐阅读
-
Hadoop2.7.1+Hbase1.2.1集群环境搭建(6)snappy安装
-
Hadoop2.7.1+Hbase1.2.1集群环境搭建(5)hbase安装
-
Hadoop2.7.1+Hbase1.2.1集群环境搭建(5)hbase安装
-
Hadoop2.7.1+Hbase1.2.1集群环境搭建(4)hbase安装准备
-
Hadoop2.7.1+Hbase1.2.1集群环境搭建(9)spring-hadoop实战
-
Hadoop2.7.1+Hbase1.2.1集群环境搭建(4)hbase安装准备
-
Hadoop2.7.1+Hbase1.2.1集群环境搭建(9)spring-hadoop实战
-
Hadoop2.7.1+Hbase1.2.1集群环境搭建(10)基于ZK的Hadoop HA集群安装
-
Hadoop2.7.1+Hbase1.2.1集群环境搭建(7)hbase 性能优化
-
Hadoop2.7.1+Hbase1.2.1集群环境搭建(6)snappy安装