指定字段上传Kafka写入txt文件并文件上传ftp服务器
dataWarehouseOss项目小结
一、修改zookeeper配置文件
二、修改kafka配置文件
这样才能启动zookeeper和kafka并且能连接上。
三、解析JSON字符串转对象
kafka生产者传到队列中是一组json字符串
json实例
{"countAll":0,"countCorrect":0,"datatime":"6","logid":"202008251616401404295157","requestinfo":"{\"imei\":\"192016e9-3ab2-4849-8f41-d3a01c541ab0\",\"channelno\":\"221\",\"timestamp\":\"1598343399833\",\"subjectNum\":\"10000\"}","requestip":"113.57.246.122","requesttime":"2020-08-25 16:16:40","requesttype":"63","responsecode":"000000","responseinfo":"{\"data\":[{\"address\":\"\",\"call\":\"\",\"classCode1\":\"\",\"classCode2\":\"511001\",\"classname1\":\"\",\"classname2\":\"运营商\",\"custId\":\"\",\"logo\":\"/icon/omc/1450424263860.png\",\"name\":\"中国电信\",\"type\":\"0\"}],\"errorCode\":\"000000\",\"flag\":false}","userAgent":"TAS-AN00 Build/HUAWEITAS-AN00; 10"}
实体类解析
到消费者消费时首先要把这json字符串解析成对象才能处理。
这个json字符串有好几层,所以要层层解析。使用fastjson解析。
首先创建对应的实体类,如图:
注意:对象中的对象要用String类型解析出来,再解析,否则直接用对象类型会报错。
注意:其中还有json数组
IogItemEntity iogItemEntity = JSONObject.parseObject(record.value(), IogItemEntity.class); Requestinfo requestinfo = JSONObject.parseObject(iogItemEntity.getRequestinfo(), Requestinfo.class); Responseinfo responseinfo = JSONObject.parseObject(iogItemEntity.getResponseinfo(), Responseinfo.class); List<DataEntity> dataEntities = JSONObject.parseArray(responseinfo.getData(), DataEntity.class);
四、字段处理
规则
要提取指定字段进行处理按照一定顺序。
有两种json字符串格式:
号码:subjectNum或者是id,有的数据是subjectNum,有的是id
手机型号:userAgent
调用时间:requesttime
是否查到:根据responsecode字段,如果是”000000“就是查到即Y;如果是”010005“就是未查到即N。
名称:有的是name有的是custName。
顺序:号码加密|号码前7位|手机型号|调用时间|是否查到|名称加密|行业code|行业name
解决办法
1. 空指针异常
号码有的是subjectNum有的是id,那么subjectNum的数据id就为空,是id的数据subjectNum就为空,就会发生空指针异常。同样名称字段也是。
在这里我使用断言的方法来解决:Assert.notNull()
这个方法是如果对象是空就会抛出异常!我们在用try/catch代码来捕获!
如果不抛出异常,那么就执行try中的代码,如果抛出了异常,异常就会被捕获,从而执行catch中的代码。
try{ Assert.notNull(dataEntity.getName(),"异常信息:name为null,名称使用custName"); NAME = dataEntity.getName(); }catch (IllegalArgumentException e){ log.info(e.getMessage()); NAME = dataEntity.getCustName(); }
2.下标越界
取到号码之后的第二个顺序就是取号码的前7位。
这个时候发现有的号码不足7位,做一下if判断即可。
String NUMBER = ""; try{ Assert.notNull(requestinfo.getSubjectNum(),"异常信息:SubjectNum为null,名称使用ID"); NUMBER = requestinfo.getSubjectNum(); }catch (IllegalArgumentException e){ log.info(e.getMessage()); NUMBER = requestinfo.getId(); } String encrpytSubjectNum = DecryptTool.datahouseDataEncrpyt(NUMBER); if (NUMBER.length()>=7){ NUMBER = NUMBER.substring(0,7); }
3.Maven打包时本地jar包打不进去
由于需要加密,所以引入了外部jar包,我参考了这篇博客Maven项目依赖外部jar进行打包,虽然maven可以打包成功,但是部署到服务器中运行时,执行到加密方法时tomcat卡住了,没反应了,我怀疑还是这个本地的jar包没有打进去,我注释掉使用加密的行,发现tomcat可以正常运行。所以还是打包的问题。
我查到了另外一种方法才解决了这个问题:
使用Maven命令将本地的jar包打包成maven依赖:mvn install:install-file -Dfile=本地jar包的路径 -DgroupId=组名 -DartifactId=项目名 -Dversion=版本号 -Dpackaging=jar
就可把本地jar包打包成为maven依赖,在本地仓库对应的位置查看是否成功。
(jar包的路径可以使用鼠标直接拖进黑窗口)
五、写入txt文件
将排好序的字符串获得字节数组,使用字节流写成txt文件。
byte[] body = data.getBytes(); FileOutputStream fos = new FileOutputStream(file,true); fos.write(body); fos.close();
这样的话,每条数据会直接追加到文末,会连在一起不好分别,所以在加上换行符:\r\n
即在数据的最后加上+"\r\n"
即可达到换行的目的。
六、上传ftp
依赖
操作ftp需要引进依赖
<dependency> <groupId>commons-net</groupId> <artifactId>commons-net</artifactId> <version>3.7</version> </dependency>
代码
public static FTPClient ftp; System.out.println("---===开始连接ftp===---"); ftp = new FTPClient(); System.out.println("---===连接ftp===---"); ftp.connect(FTP_IP, FTP_PORT); boolean login = ftp.login(FTP_USERNAME, FTP_PASSWORD); log.info("-----FTP登陆是否成功:"+login); boolean makeDirectory = ftp.makeDirectory(FTP_PATH); log.info("makeDirectory:"+makeDirectory); ftp.changeWorkingDirectory(FTP_PATH); log.info("--开始上传ftp--"); FileInputStream fis = new FileInputStream(file); boolean b = ftp.storeFile(file.getName(), fis); fis.close(); log.info("--上传完成--"+b);
七、删除ftp上两天之前的文件
1.使用正则判断两天之前的文件
使用String类的str.matches(regex)
方法。
首先创建一个规则:String regex = twoDaysAgo+".*";
在正则表达式中.
代表匹配任意字符;*
代表匹配0次或多次
所以file1.getName().matches(regex)
可以匹配出两天之前的文件!
2.递归删除文件夹/文件
/**
* 递归删除文件夹及文件
* @param dir
*/ public static void remove(File dir) { File files[] = dir.listFiles(); for (int i = 0; i < files.length; i++) { if(files[i].isDirectory()) { remove(files[i]); }else { //删除文件 log.info("删除文件 :: "+files[i].toString()); files[i].delete(); } } //删除目录 dir.delete(); log.info("删除目录 :: "+dir.toString()); }
2.删除本地文件
public static void deltwoDaysAgoFile(){ String twoDaysAgo = LocalDateTime.now().minusDays(2).format(DateTimeFormatter.ofPattern("yyyyMMdd")); String regex = twoDaysAgo+".*"; File file = new File(LOCAL_PATH); if (!file.exists()){ log.info("目录不存在,正在创建..."); file.mkdirs(); } File[] files = file.listFiles(); for (File file1 : files) { if (file1.getName().matches(regex)){ boolean delete = file1.delete(); if (delete){ log.info(file1.getName()+" 本地文件删除成功~"); } else { log.info(file1.getName()+" 本地文件删除失败~"); } } } }
3. 删除ftp上两天以前的文件
需要ftp获得登陆,递归文件夹,匹配两天前的文件名,注意:删除ftp文件的时候需要ftp客户端切换工作路径!才能删除成功!
/**
* 要切换ftp的工作目录才能删除!ftp.changeWorkingDirectory("/lshrabbitMQ/hw_data");
* @throws IOException
*/ public static void delFTPFile() throws IOException { ftp = new FTPClient(); ftp.connect(FTP_IP, FTP_PORT); ftp.login(FTP_USERNAME, FTP_PASSWORD); if(FTPReply.isPositiveCompletion(ftp.getReplyCode())){ log.info("连接成功"); }else{ log.info("连接失败"); } String twoDaysAgo = LocalDateTime.now().minusDays(2).format(DateTimeFormatter.ofPattern("yyyyMMdd")); String regex = twoDaysAgo+".*"; FTPFile[] ftpFiles = ftp.listFiles(FTP_PATH); for (FTPFile ftpFile : ftpFiles) { if (ftpFile.getName().matches(regex)){ ftp.changeWorkingDirectory(FTP_PATH); boolean bool = ftp.deleteFile(ftpFile.getName()); if (bool){ log.info(ftpFile.getName()+" ftp文件删除成功!"); }else { log.info(ftpFile.getName()+" ftp文件删除失败!"); } } } }
本文地址:https://blog.csdn.net/DreamsArchitects/article/details/108241419
上一篇: 男人口臭毁所有 这么做让你口气清新
下一篇: Java线程开发和多线程并发