欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

关于netty kafka hdfs hbase性能调研记录

程序员文章站 2022-04-14 07:57:35
...

 

1.netty调研记录

    项目中准备用netty框架来实现socket接口,对于netty的性能做了个初步调研,大致过程如下:

    1.1 调用socket接口的客户端

        为了让客户端快速发送数据,我们已经提前将需要传输的数据通过java的ObjectInputStream写数据到了一个文件,主要是节省客户端接口的编码,然后让客户端不断循环发送数据一段时间(比如20分钟,时间是可以指定),客户端代码如下:

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;

public class SocketTask implements Runnable {
	private long sentTime=0L;
	private long endTime=0L;
	private int packageIndex=-1;
	private String ip = "127.0.0.1";
	private int port = 2005;
	private long sleep = 0L;
	private boolean sleepFlag = true;

	public SocketTask(String ip, int port, int packageType, long sentTime, long sleep) {
		super();
		this.ip = ip;
		this.port = port;
		this.packageIndex = packageType;
		this.sentTime = sentTime;
		this.sleep = sleep;
	}

	public void run() {
		if (this.sentTime > 0L && this.port > 0) {
			if(this.sleep>0) {
				this.sleepFlag = true;
			}else{
				this.sleepFlag = false;
			}
			this.endTime = System.currentTimeMillis() + this.sentTime;
			testSocket();
		}
	}

	private void testSocket() {
		Socket socket = null;
		OutputStream os = null;
		try {
			// 1.建立客户端socket连接,指定服务器位置及端口
			socket = new Socket(this.ip, this.port);

			// 2.得到socket读写流
			os = socket.getOutputStream();
			// InputStream is = socket.getInputStream();
			// BufferedReader br = new BufferedReader(new
			// InputStreamReader(is));

			// 3.socket账号验证
			//MessageBind.txt已经存储了socket接口账号验证的包数据,直接读取发送
			byte[] MessageBind = stream2Bytes("MessageBind.txt");
			os.write(MessageBind);
			os.flush();

			// 4.socket发送数据
			byte[][] byteArray = new byte[6][];
			//MessageSendRecord_*_*k.txt已经将要发送的数据包以二进制流写到文件,这里直接读取,避免了编码
			byteArray[0] = stream2Bytes("MessageSendRecord_190_10k.txt");// 10K
			byteArray[1] = stream2Bytes("MessageSendRecord_380_20k.txt");// 20K
			byteArray[2] = stream2Bytes("MessageSendRecord_570_30k.txt");// 30K
			byteArray[3] = stream2Bytes("MessageSendRecord_760_40k.txt");// 40K
			byteArray[4] = stream2Bytes("MessageSendRecord_950_50k.txt");// 50K
			byteArray[5] = stream2Bytes("MessageSendRecord_1140_60k.txt");// 60K
			java.util.Random r = new java.util.Random();
			int index = 0;
			boolean flag = (this.packageIndex >= 0 && this.packageIndex <= 5);
			while (true) {
				if (this.endTime < System.currentTimeMillis()) break;
				if (flag) {
					index = this.packageIndex;
				} else {
					//随机选取一个包发送数据
					index = Math.abs(r.nextInt()) % 6;
				}
				os.write(byteArray[index]);
				os.flush();
				//这里休眠只是为了模拟客户端在收集数据和编码所耗时间
				if(this.sleepFlag){
					Thread.currentThread().sleep(this.sleep);
				}
			}
		} catch (UnknownHostException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}finally{
			if(null!=os){
				try {
					os.close();
				} catch (IOException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
				os = null;
			}
			if(null!=socket){
				try {
					socket.close();
				} catch (IOException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
				socket = null;
			}
		}
	}

	/**
	 * 读取文件中的二进制数据
	 * @param filename
	 * @return
	 */
	private byte[] stream2Bytes(String filename) {
		byte[] encodeBytes = null;
		try {
			ObjectInputStream ois = new ObjectInputStream(this.getClass().getClassLoader().getResourceAsStream(filename));
			encodeBytes = (byte[]) ois.readObject();
			System.out.println(encodeBytes);
		} catch (FileNotFoundException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (ClassNotFoundException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		return encodeBytes;
	}
}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class JavaSocketClient{
	public static void main(String[] args) {
		//检查参数
	    checkParameters(args);
	    
	    //解析参数
        String ip = args[0];//被测试socket接口的IP
        int port = Integer.parseInt(args[1]);//被测试socket接口的端口
        int concurrentNumber =Integer.parseInt(args[2]);//多少并发去发送数据
        int packageType = Integer.parseInt(args[3]);//发送哪个大小的数据包,10k 20k 30k 40k 50k 60k
        long sentTime = Long.parseLong(args[4]);//发送多长时间
        long sleep = Long.parseLong(args[5]);//每发送一次休眠多长时间,休眠的时间可以理解为客户端在收集数据或者编码数据
        
		ExecutorService executorService = Executors.newFixedThreadPool(concurrentNumber);
		for(int count=0;count<concurrentNumber;count++){
			executorService.execute(new SocketTask(ip, port,packageType, sentTime,sleep));
		}
	}

	private static void checkParameters(String[] args) {
		if (args.length != 6) {
	        System.err.println("Usage: Test <ip> <port> <concurrentNumber> <packageType(-1,0,1,2,3,4,5)> <sentTime> <sleep>");
	        System.exit(1);
	      }
	    if ("".equals(args[0])) {
	        System.err.println("Usage: Test <ip> <port> <concurrentNumber> <packageType(-1,0,1,2,3,4,5)> <sentTime> <sleep>,ip is null");
	        System.exit(1);
	      }
	    if ("".equals(args[1])) {
	        System.err.println("Usage: Test <ip> <port> <concurrentNumber> <packageType(-1,0,1,2,3,4,5)> <sentTime> <sleep>,port shoud be a number");
	        System.exit(1);
	      }
	    if ("".equals(args[2])) {
	        System.err.println("Usage: Test <ip> <port> <concurrentNumber> <packageType(-1,0,1,2,3,4,5)> <sentTime> <sleep>,concurrentNumber shoud be a number");
	        System.exit(1);
	      }
	    if ("".equals(args[3])) {
	        System.err.println("Usage: Test <ip> <port> <concurrentNumber> <packageType(-1,0,1,2,3,4,5)> <sentTime> <sleep>,packageType shoud be a number");
	        System.exit(1);
	      }
	    if ("".equals(args[4])) {
	        System.err.println("Usage: Test <ip> <port> <concurrentNumber> <packageType(-1,0,1,2,3,4,5)> <sentTime> <sleep>,sentTime shoud be a number");
	        System.exit(1);
	      }
	    if ("".equals(args[5])) {
	        System.err.println("Usage: Test <ip> <port> <concurrentNumber> <packageType(-1,0,1,2,3,4,5)> <sentTime> <sleep>,sleep shoud be a number");
	        System.exit(1);
	      }
	}
}

 

    1.2 netty服务端大致代码如下

bossGroup = new NioEventLoopGroup(1);  //主线程池
        
        //netty里面默认通过Runtime.getRuntime().availableProcessors()获取机器逻辑CPU个数,建议NioEventLoopGroup值在(逻辑CPU个数~逻辑CPU个数*2)之间
       // 逻辑CPU的个数=物理CPU个数*物理CPU中core的个数(即核数)*超线程数
        //# 查看物理CPU个数:cat /proc/cpuinfo| grep "physical id"| sort| uniq| wc -l
        //# 查看每个物理CPU中core的个数(即核数):cat /proc/cpuinfo| grep "cpu cores"| uniq
        //# 查看逻辑CPU的个数:cat /proc/cpuinfo| grep "processor"| wc -l
        workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors()*2); //工作线程池,默认为CPU个数的2倍,也即linux逻辑CPU的个数

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             /**
              * ChannelOption.SO_BACKLOG:
				SO_BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,
				用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50
				**/
             .option(ChannelOption.SO_BACKLOG, 1024)  //最大等待队列数
             /**
              * ChannelOption.TCP_NODELAY:
              * 在TCP/IP协议中,无论发送多少数据,总是要在数据前面加上协议头,同时,对方接收到数据,
              * 也需要发送ACK表示确认。为了尽可能的利用网络带宽,TCP总是希望尽可能的发送足够大的数据。
              * 这里就涉及到一个名为Nagle的算法,该算法的目的就是为了尽可能发送大块数据,避免网络中充斥着许多小数据块。
              * 
              * TCP_NODELAY就是用于启用或关闭Nagle算法。如果要求高实时性,有数据发送时就马上发送,
              * 就将该选项设置为true关闭Nagle算法;如果要减少发送次数减少网络交互,就设置为false等累积一定大小后再发送。
              * 默认为false。
				**/
             .option(ChannelOption.TCP_NODELAY, true)  //Nagle算法
             /**
				ChannelOption.SO_KEEPALIVE:
				是否启用心跳保活机制。在双方TCP套接字建立连接后(即都进入ESTABLISHED状态)
				并且在两个小时左右上层没有任何数据传输的情况下,这套机制才会被激活。
				**/
//             .option(ChannelOption.SO_KEEPALIVE, true) //服务端主动检测客户端是否存活 
//             .option(ChannelOption.ALLOCATOR, new PooledByteBufAllocator(false))  // true 直接内存/ false heap内存
             /**
              * ChannelOption.SO_SNDBUF参数对应于套接字选项中的SO_SNDBUF,
              * ChannelOption.SO_RCVBUF参数对应于套接字选项中的SO_RCVBUF,
              * 这两个参数用于操作接收缓冲区和发送缓冲区的大小,接收缓冲区用于保存网络协议站内收到的数据,
              * 直到应用程序读取成功,发送缓冲区用于保存发送数据,直到发送成功。
              */
             .option(ChannelOption.SO_SNDBUF, 256 * 1024 )    //256K          双方协定
             .option(ChannelOption.SO_RCVBUF, 256 * 1024  )    //256K          双方协定
             //.childOption(ChannelOption.ALLOCATOR, new PooledByteBufAllocator(true))
             .childHandler(new ChannelInitializer<SocketChannel>() {  
                    @Override  
                    public void initChannel(SocketChannel ch) throws Exception {  
                          ChannelPipeline pipeline = ch.pipeline();
                          pipeline.addLast("decoder", new Decoder());  
                          pipeline.addLast("encoder", new Encoder()); 
                          pipeline.addLast("readTimeoutHandler",new ReadTimeoutHandler(Integer.parseInt(data.getString("server.readTimeout"))));
                          pipeline.addLast("exception", new ExceptionHandler());                          
                          pipeline.addLast("auth",new BindCheckHandler());
//                          pipeline.addLast("sendRecord", new SendRecordHandler());
                          //这里单独设置SendRecordHandler使用的线程池,不设置的话默认使用的是workerGroup线程池
                          pipeline.addLast(businessGroup, "sendRecord", new SendRecordHandler());
                          pipeline.addLast("checkLink",new CheckLinkHandler());
                          pipeline.addLast("unbind",new UnbindHandler());
                    }  
              });  

            ChannelFuture f = b.bind(PORT).sync();
            logger.info("netty服务器已经启动");
            isOpen = true;  
            f.channel().closeFuture().sync();
        }catch (Exception e) {
            e.printStackTrace();
        }

 

    1.3 单线程socket客户端测试的结果如下:

       单线程socket客户端跑20分钟,分别测试休眠1毫秒和不休眠传输速度和占用带宽资源

发送间隔(休眠时间) 发送包 速度(行/秒) 均值(Bytes/秒) 占用带宽
1毫秒 20K,380行记录 344054 18772555 143Mb
0毫秒 20K,380行记录 1985689 107018186 816Mb

        基本上千兆带宽在20分钟测试过程中被消耗殆尽,但是netty服务端性能杠杠的,带宽是唯一的制约因素。

 

2.kafka调研结果记录

    场景:两台kafka,新建topic=topictest,partitions=6,replication-factor=2,ACK=ALL(每个parttion的2个replication同时写完才算写完) 
关于netty kafka hdfs hbase性能调研记录
            
    
    博客分类: 学习总结nettykafkaHadoop netty kafka hdfs hbase调研速度 
 

 

3.hdfs测试记录

    场景:4台datanode 
关于netty kafka hdfs hbase性能调研记录
            
    
    博客分类: 学习总结nettykafkaHadoop netty kafka hdfs hbase调研速度 
 

    磁盘RAID对于写入速度的影响

关于netty kafka hdfs hbase性能调研记录
            
    
    博客分类: 学习总结nettykafkaHadoop netty kafka hdfs hbase调研速度 
 

 

4.hbase调研记录

    4.1 发起测试的机器性能对于hbase写入速度的影响关于netty kafka hdfs hbase性能调研记录
            
    
    博客分类: 学习总结nettykafkaHadoop netty kafka hdfs hbase调研速度  

    4.2 发起测试的并发线程数对于hbase写入速度的影响关于netty kafka hdfs hbase性能调研记录
            
    
    博客分类: 学习总结nettykafkaHadoop netty kafka hdfs hbase调研速度  

    4.3 发起测试的批量大小对于hbase写入速度的影响(hbase.client.write.buffer=2MB)关于netty kafka hdfs hbase性能调研记录
            
    
    博客分类: 学习总结nettykafkaHadoop netty kafka hdfs hbase调研速度  

    4.4 拆分数据表影响测试关于netty kafka hdfs hbase性能调研记录
            
    
    博客分类: 学习总结nettykafkaHadoop netty kafka hdfs hbase调研速度 

 

    4.5 RADI对于HBASE读写的影响关于netty kafka hdfs hbase性能调研记录
            
    
    博客分类: 学习总结nettykafkaHadoop netty kafka hdfs hbase调研速度  

  • 关于netty kafka hdfs hbase性能调研记录
            
    
    博客分类: 学习总结nettykafkaHadoop netty kafka hdfs hbase调研速度 
  • 大小: 28 KB
  • 关于netty kafka hdfs hbase性能调研记录
            
    
    博客分类: 学习总结nettykafkaHadoop netty kafka hdfs hbase调研速度 
  • 大小: 23.2 KB
  • 关于netty kafka hdfs hbase性能调研记录
            
    
    博客分类: 学习总结nettykafkaHadoop netty kafka hdfs hbase调研速度 
  • 大小: 33.4 KB
  • 关于netty kafka hdfs hbase性能调研记录
            
    
    博客分类: 学习总结nettykafkaHadoop netty kafka hdfs hbase调研速度 
  • 大小: 29.4 KB
  • 关于netty kafka hdfs hbase性能调研记录
            
    
    博客分类: 学习总结nettykafkaHadoop netty kafka hdfs hbase调研速度 
  • 大小: 29.7 KB
  • 关于netty kafka hdfs hbase性能调研记录
            
    
    博客分类: 学习总结nettykafkaHadoop netty kafka hdfs hbase调研速度 
  • 大小: 23.6 KB
  • 关于netty kafka hdfs hbase性能调研记录
            
    
    博客分类: 学习总结nettykafkaHadoop netty kafka hdfs hbase调研速度 
  • 大小: 33.5 KB
  • 关于netty kafka hdfs hbase性能调研记录
            
    
    博客分类: 学习总结nettykafkaHadoop netty kafka hdfs hbase调研速度 
  • 大小: 22.7 KB