Mysql 流增量写入 Hdfs(二) --Storm + hdfs 的流式处理
一. 概述
上一篇我们介绍了如何将数据从 mysql 抛到 kafka,这次我们就专注于利用 storm 将数据写入到 hdfs 的过程,由于 storm 写入 hdfs 的可定制东西有些多,我们先不从 kafka 读取,而先自己定义一个 spout 数据充当数据源,下章再进行整合。这里默认你是拥有一定的 storm 知识的基础,起码知道 spout 和 bolt 是什么。
写入 hdfs 可以有以下的定制策略:
- 自定义写入文件的名字
- 定义写入内容格式
- 满足给定条件后更改写入的文件
- 更改写入文件时触发的 action
本篇会先说明如何用 storm 写入 hdfs,写入过程一些 api 的描述,以及最后给定一个例子:
storm 每接收到 10 个 tuple 后就会改变 hdfs 写入文件,新文件的名字就是第几次改变。
ps:storm 版本:1.1.1 。hadoop 版本:2.7.4 。
接下来我们首先看看 storm 如何写入 hdfs 。
二. storm 写入 hdfs
storm 官方有提供了相应的 api 让我们可以使用。可以通过创建 hdfsbolt 以及定义相应的规则,即可写入 hdfs 。
首先通过 maven 配置依赖以及插件。
<properties> <storm.version>1.1.1</storm.version> </properties> <dependencies> <dependency> <groupid>org.apache.storm</groupid> <artifactid>storm-core</artifactid> <version>${storm.version}</version> <!--<scope>provided</scope>--> <exclusions> <exclusion> <groupid>org.slf4j</groupid> <artifactid>log4j-over-slf4j</artifactid> </exclusion> </exclusions> </dependency> <dependency> <groupid>commons-collections</groupid> <artifactid>commons-collections</artifactid> <version>3.2.1</version> </dependency> <dependency> <groupid>com.google.guava</groupid> <artifactid>guava</artifactid> <version>15.0</version> </dependency> <!--hadoop模块--> <dependency> <groupid>org.apache.hadoop</groupid> <artifactid>hadoop-client</artifactid> <version>2.7.4</version> <exclusions> <exclusion> <groupid>org.slf4j</groupid> <artifactid>slf4j-log4j12</artifactid> </exclusion> </exclusions> </dependency> <dependency> <groupid>org.apache.hadoop</groupid> <artifactid>hadoop-hdfs</artifactid> <version>2.7.4</version> <exclusions> <exclusion> <groupid>org.slf4j</groupid> <artifactid>slf4j-log4j12</artifactid> </exclusion> </exclusions> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-hdfs --> <dependency> <groupid>org.apache.storm</groupid> <artifactid>storm-hdfs</artifactid> <version>1.1.1</version> <!--<scope>test</scope>--> </dependency> </dependencies> <build> <plugins> <plugin> <groupid>org.apache.maven.plugins</groupid> <artifactid>maven-compiler-plugin</artifactid> <version>3.5.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <groupid>org.codehaus.mojo</groupid> <artifactid>exec-maven-plugin</artifactid> <version>1.2.1</version> <executions> <execution> <goals> <goal>exec</goal> </goals> </execution> </executions> <configuration> <executable>java</executable> <includeprojectdependencies>true</includeprojectdependencies> <includeplugindependencies>false</includeplugindependencies> <classpathscope>compile</classpathscope> <mainclass>com.learningstorm.kafka.kafkatopology</mainclass> </configuration> </plugin> <plugin> <groupid>org.apache.maven.plugins</groupid> <artifactid>maven-shade-plugin</artifactid> <version>1.7</version> <configuration> <createdependencyreducedpom>true</createdependencyreducedpom> </configuration> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.servicesresourcetransformer"/> <transformer implementation="org.apache.maven.plugins.shade.resource.manifestresourcetransformer"> <mainclass></mainclass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build>
这里要提一下,如果要打包部署到集群上的话,打包的插件需要使用 maven-shade-plugin 这个插件,然后使用 maven lifecycle 中的 package 打包。而不是用 maven-assembly-plugin 插件进行打包。
因为使用 maven-assembly-plugin 的时候,会将所有依赖的包unpack,然后在pack,这样就会出现,同样的文件被覆盖的情况。发布到集群上的时候就会报 no filesystem for scheme: hdfs 的错 。
然后是使用 hdfsbolt 写入 hdfs。这里来看看官方文档中的例子吧。
// 使用 "|" 来替代 ",",来进行字符分割 recordformat format = new delimitedrecordformat() .withfielddelimiter("|"); // 每输入 1k 后将内容同步到 hdfs 中 syncpolicy syncpolicy = new countsyncpolicy(1000); // 当文件大小达到 5mb ,转换写入文件,即写入到一个新的文件中 filerotationpolicy rotationpolicy = new filesizerotationpolicy(5.0f, units.mb); //当转换写入文件时,生成新文件的名字并使用 filenameformat filenameformat = new defaultfilenameformat() .withpath("/foo/"); hdfsbolt bolt = new hdfsbolt() .withfsurl("hdfs://localhost:9000") .withfilenameformat(filenameformat) .withrecordformat(format) .withrotationpolicy(rotationpolicy) .withsyncpolicy(syncpolicy); //生成该 bolt topologybuilder.setbolt("hdfsbolt", bolt, 5).globalgrouping("randomstrspout");
到这里就结束了。可以将 hdfsbolt 当作一个 storm 中特殊一些的 bolt 即可。这个 bolt 的作用即使根据接收信息写入 hdfs。
而在新建 hdfsbolt 中,storm 为我们提供了相当强的灵活性,我们可以定义一些策略,比如当达成某个条件的时候转换写入文件,新写入文件的名字,写入时候的分隔符等等。
如果选择使用的话,storm 有提供部分接口供我们使用,但如果我们觉得不够丰富也可以自定义相应的类。下面我们看看如何控制这些策略吧。
recordformat
这是一个接口,允许你*定义接收到内容的格式。
public interface recordformat extends serializable { byte[] format(tuple tuple); }
storm 提供了 delimitedrecordformat ,使用方法在上面已经有了。这个类默认的分割符是逗号",",而你可以通过 withfielddelimiter 方法改变分隔符。
如果你的初始分隔符不是逗号的话,那么也可以重写写一个类实现 recordformat 接口即可。
filenameformat
同样是一个接口。
public interface filenameformat extends serializable { void prepare(map conf, topologycontext topologycontext); string getname(long rotation, long timestamp); string getpath(); }
storm 所提供的默认的是 org.apache.storm.hdfs.format.defaultfilenameformat 。默认人使用的转换文件名有点长,格式是这样的:
{prefix}{componentid}-{taskid}-{rotationnum}-{timestamp}{extension}
例如:
mybolt-5-7-1390579837830.txt
默认情况下,前缀是空的,扩展标识是".txt"。
syncpolicy
同步策略允许你将 buffered data 缓冲到 hdfs 文件中(从而client可以读取数据),通过实现org.apache.storm.hdfs.sync.syncpolicy 接口:
public interface syncpolicy extends serializable { boolean mark(tuple tuple, long offset); void reset(); }
filerotationpolicy
这个接口允许你控制什么情况下转换写入文件。
public interface filerotationpolicy extends serializable { boolean mark(tuple tuple, long offset); void reset(); }
storm 有提供三个实现该接口的类:
最简单的就是不进行转换的org.apache.storm.hdfs.bolt.rotation.norotationpolicy ,就是什么也不干。
通过文件大小触发转换的 org.apache.storm.hdfs.bolt.rotation.filesizerotationpolicy。
通过时间条件来触发转换的 org.apache.storm.hdfs.bolt.rotation.timedrotationpolicy。
如果有更加复杂的需求也可以自己定义。
rotationaction
这个主要是提供一个或多个 hook ,可加可不加。主要是在触发写入文件转换的时候会启动。
public interface rotationaction extends serializable { void execute(filesystem filesystem, path filepath) throws ioexception; }
三.实现一个例子
了解了上面的情况后,我们会实现一个例子,根据写入记录的多少来控制写入转换(改变写入的文件),并且转换后文件的名字表示当前是第几次转换。
首先来看看 hdfsbolt 的内容:
recordformat format = new delimitedrecordformat().withfielddelimiter(" "); // sync the filesystem after every 1k tuples syncpolicy syncpolicy = new countsyncpolicy(1000); // filerotationpolicy rotationpolicy = new filesizerotationpolicy(1.0f, filesizerotationpolicy.units.kb); /** rotate file with date,every month create a new file * format:yyyymm.txt */ filerotationpolicy rotationpolicy = new countstrrotationpolicy(); filenameformat filenameformat = new timesfilenameformat().withpath("/test/"); rotationaction action = new newfileaction(); hdfsbolt bolt = new hdfsbolt() .withfsurl("hdfs://127.0.0.1:9000") .withfilenameformat(filenameformat) .withrecordformat(format) .withrotationpolicy(rotationpolicy) .withsyncpolicy(syncpolicy) .addrotationaction(action);
然后分别来看各个策略的类。
filerotationpolicy
import org.apache.storm.hdfs.bolt.rotation.filerotationpolicy; import org.apache.storm.tuple.tuple; import java.text.simpledateformat; import java.util.date; /** * 计数以改变hdfs写入文件的位置,当写入10次的时候,则更改写入文件,更改名字取决于 “timesfilenameformat” * 这个类是线程安全 */ public class countstrrotationpolicy implements filerotationpolicy { private simpledateformat df = new simpledateformat("yyyymm"); private string date = null; private int count = 0; public countstrrotationpolicy(){ this.date = df.format(new date()); // this.date = df.format(new date()); } /** * called for every tuple the hdfsbolt executes. * * @param tuple the tuple executed. * @param offset current offset of file being written * @return true if a file rotation should be performed */ @override public boolean mark(tuple tuple, long offset) { count ++; if(count == 10) { system.out.print("num :" +count + " "); count = 0; return true; } else { return false; } } /** * called after the hdfsbolt rotates a file. */ @override public void reset() { } @override public filerotationpolicy copy() { return new countstrrotationpolicy(); } }
filenameformat
import org.apache.storm.hdfs.bolt.format.filenameformat; import org.apache.storm.task.topologycontext; import java.util.map; /** * 决定重新写入文件时候的名字 * 这里会返回是第几次转换写入文件,将这个第几次做为文件名 */ public class timesfilenameformat implements filenameformat { //默认路径 private string path = "/storm"; //默认后缀 private string extension = ".txt"; private long times = new long(0); public timesfilenameformat withpath(string path){ this.path = path; return this; } @override public void prepare(map conf, topologycontext topologycontext) { } @override public string getname(long rotation, long timestamp) { times ++ ; //返回文件名,文件名为更换写入文件次数 return times.tostring() + this.extension; } public string getpath(){ return this.path; } }
rotationaction
import org.apache.hadoop.fs.filecontext; import org.apache.hadoop.fs.filesystem; import org.apache.hadoop.fs.path; import org.apache.storm.hdfs.common.rotation.rotationaction; import org.slf4j.logger; import org.slf4j.loggerfactory; import java.io.ioexception; import java.net.uri; /** 当转换写入文件时候调用的 hook ,这里仅写入日志。 */ public class newfileaction implements rotationaction { private static final logger log = loggerfactory.getlogger(newfileaction.class); @override public void execute(filesystem filesystem, path filepath) throws ioexception { log.info("hdfs change the written file!!"); return; } }
ok,这样就大功告成了。通过上面的代码,每接收到 10 个 tuple 后就会转换写入文件,新文件的名字就是第几次转换。
完整代码包括一个随机生成字符串的 spout ,可以到我的 github 上查看。
stormhdfsdemo:https://github.com/shezhiming/stormhdfsdemo
更多干货,欢迎关注公众号,哈尔的数据城堡。
上一篇: C# 调用打印机打印文件
下一篇: hadoop在CentOS下的安装配置