搭建生产级的Netty项目
程序员文章站
2022-03-26 20:03:13
Netty是Trustin Lee在2004年开发的一款高性能的网络应用程序框架。相比于JDK自带的NIO,Netty做了相当多的增强,且隔离了jdk nio的实现细节,API也比较友好,还支持流量整形等高级特性。在我们常见的一些开源项目中已经普遍的应用到了Netty,比如Dubbo、Elastic ......
netty是trustin lee在2004年开发的一款高性能的网络应用程序框架。相比于jdk自带的nio,netty做了相当多的增强,且隔离了jdk nio的实现细节,api也比较友好,还支持流量整形等高级特性。在我们常见的一些开源项目中已经普遍的应用到了netty,比如dubbo、elasticsearch、zookeeper等。
netty的具体开发
提示:因代码相对较多,这里只展示其主要部分,至于项目中用到的编解码器、工具类,请直接拉到最后下载源码!也欢迎顺手给个star~
需要的依赖
<dependency> <groupid>com.google.code.gson</groupid> <artifactid>gson</artifactid> </dependency> <dependency> <groupid>org.projectlombok</groupid> <artifactid>lombok</artifactid> </dependency> <dependency> <groupid>io.dropwizard.metrics</groupid> <artifactid>metrics-core</artifactid> <version>4.1.1</version> </dependency> <dependency> <groupid>io.dropwizard.metrics</groupid> <artifactid>metrics-jmx</artifactid> <version>4.1.1</version> </dependency> <dependency> <groupid>org.apache.commons</groupid> <artifactid>commons-lang3</artifactid> </dependency> <dependency> <groupid>io.netty</groupid> <artifactid>netty-all</artifactid> <version>4.1.29.final</version> </dependency>
client端代码
package com.example.nettydemo.client; import com.example.nettydemo.client.codec.*; import com.example.nettydemo.client.codec.dispatcher.operationresultfuture; import com.example.nettydemo.client.codec.dispatcher.requestpendingcenter; import com.example.nettydemo.client.codec.dispatcher.responsedispatcherhandler; import com.example.nettydemo.common.requestmessage; import com.example.nettydemo.common.string.stringoperation; import com.example.nettydemo.util.idutil; import io.netty.bootstrap.bootstrap; import io.netty.channel.channelfuture; import io.netty.channel.channelinitializer; import io.netty.channel.channelpipeline; import io.netty.channel.nio.nioeventloopgroup; import io.netty.channel.socket.nio.niochanneloption; import io.netty.channel.socket.nio.niosocketchannel; import io.netty.handler.logging.loglevel; import io.netty.handler.logging.logginghandler; import javax.net.ssl.sslexception; import java.util.concurrent.executionexception; public class client { public static void main(string[] args) throws interruptedexception, executionexception, sslexception { bootstrap bootstrap = new bootstrap(); bootstrap.channel(niosocketchannel.class); //客户端连接服务器最大允许时间,默认为30s bootstrap.option(niochanneloption.connect_timeout_millis, 30 * 1000); //10s nioeventloopgroup group = new nioeventloopgroup(); try { bootstrap.group(group); requestpendingcenter requestpendingcenter = new requestpendingcenter(); logginghandler logginghandler = new logginghandler(loglevel.info); bootstrap.handler(new channelinitializer<niosocketchannel>() { @override protected void initchannel(niosocketchannel ch) throws exception { channelpipeline pipeline = ch.pipeline(); pipeline.addlast(new framedecoder()); pipeline.addlast(new frameencoder()); pipeline.addlast(new protocolencoder()); pipeline.addlast(new protocoldecoder()); pipeline.addlast(new responsedispatcherhandler(requestpendingcenter)); pipeline.addlast(new operationtorequestmessageencoder()); // pipeline.addlast(logginghandler); } }); //连接服务 channelfuture channelfuture = bootstrap.connect("127.0.0.1", 8888); //因为future是异步执行,所以需要先连接上后,再进行下一步操作 channelfuture.sync(); long streamid = idutil.nextid(); /** * 发送数据测试,按照定义的规则组装数据 */ // orderoperation orderoperation = new orderoperation(1001, "你好啊,hi"); requestmessage requestmessage = new requestmessage(streamid, new stringoperation(1234, "你好啊,hi")); //将future放入center operationresultfuture operationresultfuture = new operationresultfuture(); requestpendingcenter.add(streamid, operationresultfuture); //发送消息 for (int i = 0; i < 10; i++) { channelfuture.channel().writeandflush(requestmessage); } //阻塞等待结果,结果来了之后会调用responsedispatcherhandler去set结果 // operationresult operationresult = operationresultfuture.get(); // //将结果打印 // system.out.println("返回:"+operationresult); channelfuture.channel().closefuture().get(); } finally { group.shutdowngracefully(); } } }
server端代码
package com.example.nettydemo.server; import com.example.nettydemo.server.codec.framedecoder; import com.example.nettydemo.server.codec.frameencoder; import com.example.nettydemo.server.codec.protocoldecoder; import com.example.nettydemo.server.codec.protocolencoder; import com.example.nettydemo.server.handler.metricshandler; import com.example.nettydemo.server.handler.serveridlecheckhandler; import com.example.nettydemo.server.handler.serverprocesshandler; import io.netty.bootstrap.serverbootstrap; import io.netty.channel.channelfuture; import io.netty.channel.channelinitializer; import io.netty.channel.channelpipeline; import io.netty.channel.nio.nioeventloopgroup; import io.netty.channel.socket.nio.niochanneloption; import io.netty.channel.socket.nio.nioserversocketchannel; import io.netty.channel.socket.nio.niosocketchannel; import io.netty.handler.flush.flushconsolidationhandler; import io.netty.handler.logging.loglevel; import io.netty.handler.logging.logginghandler; import io.netty.handler.traffic.globaltrafficshapinghandler; import io.netty.util.concurrent.defaultthreadfactory; import io.netty.util.concurrent.unorderedthreadpooleventexecutor; import lombok.extern.slf4j.slf4j; import javax.net.ssl.sslexception; import java.security.cert.certificateexception; import java.util.concurrent.executionexception; /** * netty server 入口 */ @slf4j public class server { public static void main(string... args) throws interruptedexception, executionexception, certificateexception, sslexception { serverbootstrap serverbootstrap = new serverbootstrap(); //设置channel模式,因为是server所以使用nioserversocketchannel serverbootstrap.channel(nioserversocketchannel.class); //最大的等待连接数量 serverbootstrap.option(niochanneloption.so_backlog, 1024); //设置是否启用 nagle 算法:用将小的碎片数据连接成更大的报文 来提高发送效率。 //如果需要发送一些较小的报文,则需要禁用该算法 serverbootstrap.childoption(niochanneloption.tcp_nodelay, true); //设置netty自带的log,并设置级别 serverbootstrap.handler(new logginghandler(loglevel.info)); //thread //用户指定线程名 nioeventloopgroup bossgroup = new nioeventloopgroup(0, new defaultthreadfactory("boss")); nioeventloopgroup workgroup = new nioeventloopgroup(0, new defaultthreadfactory("worker")); unorderedthreadpooleventexecutor businessgroup = new unorderedthreadpooleventexecutor(10, new defaultthreadfactory("business")); //只能使用一个线程,因globaltrafficshapinghandler比较轻量级 nioeventloopgroup eventloopgroupfortrafficshaping = new nioeventloopgroup(0, new defaultthreadfactory("ts")); try { //设置react方式 serverbootstrap.group(bossgroup, workgroup); //metrics metricshandler metricshandler = new metricshandler(); //trafficshaping流量整形 //long writelimit 写入时控制, long readlimit 读取时控制 具体设置看业务修改 globaltrafficshapinghandler globaltrafficshapinghandler = new globaltrafficshapinghandler(eventloopgroupfortrafficshaping, 10 * 1024 * 1024, 10 * 1024 * 1024); //log logginghandler debugloghandler = new logginghandler(loglevel.debug); logginghandler infologhandler = new logginghandler(loglevel.info); //设置childhandler,按执行顺序放 serverbootstrap.childhandler(new channelinitializer<niosocketchannel>() { @override protected void initchannel(niosocketchannel ch) throws exception { channelpipeline pipeline = ch.pipeline(); pipeline.addlast("debuglog", debugloghandler); pipeline.addlast("tshandler", globaltrafficshapinghandler); pipeline.addlast("metrichandler", metricshandler); pipeline.addlast("idlehandler", new serveridlecheckhandler()); pipeline.addlast("framedecoder", new framedecoder()); pipeline.addlast("frameencoder", new frameencoder()); pipeline.addlast("protocoldecoder", new protocoldecoder()); pipeline.addlast("protocolencoder", new protocolencoder()); pipeline.addlast("infolog", infologhandler); //对flush增强,减少flush次数牺牲延迟增强吞吐量 pipeline.addlast("flushenhance", new flushconsolidationhandler(10, true)); //为业务处理指定单独的线程池 pipeline.addlast(businessgroup, new serverprocesshandler());//businessgroup, } }); //绑定端口并阻塞启动 channelfuture channelfuture = serverbootstrap.bind(8888).sync(); channelfuture.channel().closefuture().sync(); } finally { bossgroup.shutdowngracefully(); workgroup.shutdowngracefully(); businessgroup.shutdowngracefully(); eventloopgroupfortrafficshaping.shutdowngracefully(); } } }
最后
以上介绍了netty的基本用法,在代码中也做了一部分的关键注释,但可能还会有许多不足,也不可能满足所有人的要求,大家可根据自己的实际需求去改造此项目。附上源码地址
持续学习,记录点滴。更多文章请访问
上一篇: vue的axios请求,对它进行封装使用
下一篇: C++11 基于范围的for循环讲解
推荐阅读
-
在IntelliJ IDEA 搭建springmvc项目配置debug的教程详解
-
Diycode开源项目实例搭建上拉加载和下拉刷新的Fragment
-
在vue-cli搭建的项目中增加后台mock接口的方法
-
vue-cli搭建的项目中使用Echarts教程
-
web项目的初始搭建和intellij的tomcat的配置
-
lamp - windows server 2008R2服务器下如何搭建php的的生产环境( Apache 服务器)?
-
.NET企业级项目中遇到的国际化问题和解决方法
-
详解如何搭建mpvue框架搭配vant组件库的小程序项目
-
vue.js 2.*项目环境搭建、运行、打包发布的详细步骤
-
如何搭建新的WPF项目框架