欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

指定字段上传Kafka写入txt文件并文件上传ftp服务器

程序员文章站 2022-07-04 15:03:36
指定字段上传Kafka写入txt文件并文件上传ftp总结JSON字符串转对象字段处理写入txt文件上传ftp删除两天之前的文件...



一、修改zookeeper配置文件

指定字段上传Kafka写入txt文件并文件上传ftp服务器

二、修改kafka配置文件

指定字段上传Kafka写入txt文件并文件上传ftp服务器
指定字段上传Kafka写入txt文件并文件上传ftp服务器

这样才能启动zookeeper和kafka并且能连接上。

三、解析JSON字符串转对象

kafka生产者传到队列中是一组json字符串

json实例

{"countAll":0,"countCorrect":0,"datatime":"6","logid":"202008251616401404295157","requestinfo":"{\"imei\":\"192016e9-3ab2-4849-8f41-d3a01c541ab0\",\"channelno\":\"221\",\"timestamp\":\"1598343399833\",\"subjectNum\":\"10000\"}","requestip":"113.57.246.122","requesttime":"2020-08-25 16:16:40","requesttype":"63","responsecode":"000000","responseinfo":"{\"data\":[{\"address\":\"\",\"call\":\"\",\"classCode1\":\"\",\"classCode2\":\"511001\",\"classname1\":\"\",\"classname2\":\"运营商\",\"custId\":\"\",\"logo\":\"/icon/omc/1450424263860.png\",\"name\":\"中国电信\",\"type\":\"0\"}],\"errorCode\":\"000000\",\"flag\":false}","userAgent":"TAS-AN00 Build/HUAWEITAS-AN00; 10"} 

实体类解析

到消费者消费时首先要把这json字符串解析成对象才能处理。
这个json字符串有好几层,所以要层层解析。使用fastjson解析。
首先创建对应的实体类,如图:

指定字段上传Kafka写入txt文件并文件上传ftp服务器
注意:对象中的对象要用String类型解析出来,再解析,否则直接用对象类型会报错。
注意:其中还有json数组

IogItemEntity iogItemEntity = JSONObject.parseObject(record.value(), IogItemEntity.class); Requestinfo requestinfo = JSONObject.parseObject(iogItemEntity.getRequestinfo(), Requestinfo.class); Responseinfo responseinfo = JSONObject.parseObject(iogItemEntity.getResponseinfo(), Responseinfo.class); List<DataEntity> dataEntities = JSONObject.parseArray(responseinfo.getData(), DataEntity.class); 

指定字段上传Kafka写入txt文件并文件上传ftp服务器

四、字段处理

规则

要提取指定字段进行处理按照一定顺序。
有两种json字符串格式:
号码:subjectNum或者是id,有的数据是subjectNum,有的是id
手机型号:userAgent
调用时间:requesttime
是否查到:根据responsecode字段,如果是”000000“就是查到即Y;如果是”010005“就是未查到即N。
名称:有的是name有的是custName。

顺序:号码加密|号码前7位|手机型号|调用时间|是否查到|名称加密|行业code|行业name

解决办法

1. 空指针异常

号码有的是subjectNum有的是id,那么subjectNum的数据id就为空,是id的数据subjectNum就为空,就会发生空指针异常。同样名称字段也是。
在这里我使用断言的方法来解决:
Assert.notNull()这个方法是如果对象是空就会抛出异常!我们在用try/catch代码来捕获!
如果不抛出异常,那么就执行try中的代码,如果抛出了异常,异常就会被捕获,从而执行catch中的代码。

 try{ Assert.notNull(dataEntity.getName(),"异常信息:name为null,名称使用custName"); NAME = dataEntity.getName(); }catch (IllegalArgumentException e){ log.info(e.getMessage()); NAME = dataEntity.getCustName(); } 

2.下标越界

取到号码之后的第二个顺序就是取号码的前7位。
这个时候发现有的号码不足7位,做一下if判断即可。

 String NUMBER = ""; try{ Assert.notNull(requestinfo.getSubjectNum(),"异常信息:SubjectNum为null,名称使用ID"); NUMBER = requestinfo.getSubjectNum(); }catch (IllegalArgumentException e){ log.info(e.getMessage()); NUMBER = requestinfo.getId(); } String encrpytSubjectNum = DecryptTool.datahouseDataEncrpyt(NUMBER); if (NUMBER.length()>=7){ NUMBER = NUMBER.substring(0,7); } 

3.Maven打包时本地jar包打不进去

由于需要加密,所以引入了外部jar包,我参考了这篇博客Maven项目依赖外部jar进行打包,虽然maven可以打包成功,但是部署到服务器中运行时,执行到加密方法时tomcat卡住了,没反应了,我怀疑还是这个本地的jar包没有打进去,我注释掉使用加密的行,发现tomcat可以正常运行。所以还是打包的问题。
我查到了另外一种方法才解决了这个问题:
使用Maven命令将本地的jar包打包成maven依赖:mvn install:install-file -Dfile=本地jar包的路径 -DgroupId=组名 -DartifactId=项目名 -Dversion=版本号 -Dpackaging=jar
就可把本地jar包打包成为maven依赖,在本地仓库对应的位置查看是否成功。
(jar包的路径可以使用鼠标直接拖进黑窗口)

五、写入txt文件

将排好序的字符串获得字节数组,使用字节流写成txt文件。

byte[] body = data.getBytes(); FileOutputStream fos = new FileOutputStream(file,true); fos.write(body); fos.close(); 

这样的话,每条数据会直接追加到文末,会连在一起不好分别,所以在加上换行符:\r\n
即在数据的最后加上+"\r\n"即可达到换行的目的。
指定字段上传Kafka写入txt文件并文件上传ftp服务器

六、上传ftp

依赖

操作ftp需要引进依赖

 <dependency> <groupId>commons-net</groupId> <artifactId>commons-net</artifactId> <version>3.7</version> </dependency> 

代码

 public static FTPClient ftp; System.out.println("---===开始连接ftp===---"); ftp = new FTPClient(); System.out.println("---===连接ftp===---"); ftp.connect(FTP_IP, FTP_PORT); boolean login = ftp.login(FTP_USERNAME, FTP_PASSWORD); log.info("-----FTP登陆是否成功:"+login); boolean makeDirectory = ftp.makeDirectory(FTP_PATH); log.info("makeDirectory:"+makeDirectory); ftp.changeWorkingDirectory(FTP_PATH); log.info("--开始上传ftp--"); FileInputStream fis = new FileInputStream(file); boolean b = ftp.storeFile(file.getName(), fis); fis.close(); log.info("--上传完成--"+b); 

七、删除ftp上两天之前的文件

1.使用正则判断两天之前的文件

使用String类的str.matches(regex)方法。
首先创建一个规则:String regex = twoDaysAgo+".*";
在正则表达式中.代表匹配任意字符;*代表匹配0次或多次
所以file1.getName().matches(regex)可以匹配出两天之前的文件!

2.递归删除文件夹/文件

 /**
     * 递归删除文件夹及文件
     * @param dir
     */ public static void remove(File dir) { File files[] = dir.listFiles(); for (int i = 0; i < files.length; i++) { if(files[i].isDirectory()) { remove(files[i]); }else { //删除文件 log.info("删除文件  ::  "+files[i].toString()); files[i].delete(); } } //删除目录 dir.delete(); log.info("删除目录  ::  "+dir.toString()); } 

2.删除本地文件

 public static void deltwoDaysAgoFile(){ String twoDaysAgo = LocalDateTime.now().minusDays(2).format(DateTimeFormatter.ofPattern("yyyyMMdd")); String regex = twoDaysAgo+".*"; File file = new File(LOCAL_PATH); if (!file.exists()){ log.info("目录不存在,正在创建..."); file.mkdirs(); } File[] files = file.listFiles(); for (File file1 : files) { if (file1.getName().matches(regex)){ boolean delete = file1.delete(); if (delete){ log.info(file1.getName()+" 本地文件删除成功~"); } else { log.info(file1.getName()+" 本地文件删除失败~"); } } } } 

3. 删除ftp上两天以前的文件

需要ftp获得登陆,递归文件夹,匹配两天前的文件名,注意:删除ftp文件的时候需要ftp客户端切换工作路径!才能删除成功!

 /**
     * 要切换ftp的工作目录才能删除!ftp.changeWorkingDirectory("/lshrabbitMQ/hw_data");
     * @throws IOException
     */ public static void delFTPFile() throws IOException { ftp = new FTPClient(); ftp.connect(FTP_IP, FTP_PORT); ftp.login(FTP_USERNAME, FTP_PASSWORD); if(FTPReply.isPositiveCompletion(ftp.getReplyCode())){ log.info("连接成功"); }else{ log.info("连接失败"); } String twoDaysAgo = LocalDateTime.now().minusDays(2).format(DateTimeFormatter.ofPattern("yyyyMMdd")); String regex = twoDaysAgo+".*"; FTPFile[] ftpFiles = ftp.listFiles(FTP_PATH); for (FTPFile ftpFile : ftpFiles) { if (ftpFile.getName().matches(regex)){ ftp.changeWorkingDirectory(FTP_PATH); boolean bool = ftp.deleteFile(ftpFile.getName()); if (bool){ log.info(ftpFile.getName()+" ftp文件删除成功!"); }else { log.info(ftpFile.getName()+" ftp文件删除失败!"); } } } } 

本文地址:https://blog.csdn.net/DreamsArchitects/article/details/108241419