欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

看源码学编程系列之多线程在kafka应用(一)

程序员文章站 2022-05-29 09:45:49
kafka 由于它自身的高性能发送与消费能力,而受到广大企业的喜欢,所以我们就先看看kafka 一些源码实现如下: 这段代码摘抄的是,kafka源码 生产者发送消息demo(kafka.examples.Producer) 里面的一个片段,主要是涉及到两个知识点,一个是异步发送消息, 回调函数的实现 ......

  kafka 由于它自身的高性能发送与消费能力,而受到广大企业的喜欢,所以我们就先看看kafka 一些源码实现如下:  

 1  public void run() {
 2         int messageno = 1;
 3         while (true) {
 4             string messagestr = "message_" + messageno;
 5             long starttime = system.currenttimemillis();
 6             if (isasync) { 
 7                 producer.send(new producerrecord<>(topic,
 8                     messageno,
 9                     messagestr), new democallback(starttime, messageno, messagestr));// 异步发送
10             } else { 
11                 try {
12                     producer.send(new producerrecord<>(topic,
13                         messageno,
14                         messagestr)).get();// 同步发送
15                     system.out.println("sent message: (" + messageno + ", " + messagestr + ")");
16                 } catch (interruptedexception | executionexception e) {
17                     e.printstacktrace();
18                 }
19             }
20             ++messageno;
21         }
22     }

  这段代码摘抄的是,kafka源码 生产者发送消息demo(kafka.examples.producer) 里面的一个片段,主要是涉及到两个知识点,一个是异步发送消息,

回调函数的实现,另一个就是同步发送,多线程future.get 模式的实现。现在分别阐述这两种实现方式。

  异步回调方式

  其实这种方式主要应用在调用多线程执行某个任务时,不用傻傻等到该线程完成后得到相应的反馈信息。举个例子client端需要调用server端来执行某个任务,并且希望server端执行完成后

主动将相应的结果告诉client端。这个过程就叫做回调了。如下代码:

  

 1 public class client implements cscallback {
 2     
 3     private volatile boolean stopthread = false;
 4     private server server;
 5 
 6     public client(server server) {
 7         this.server = server;
 8     }
 9 
10     public void sendmsg(final string msg){
11         system.out.println("threadname="+thread.currentthread().getname()+" 客户端:发送的消息为:" + msg);
12         new thread(new runnable() {
13             @override
14             public void run() {
15                 server.getclientmsg(client.this,msg);// 核心代码1:将被调用方自己当作参数(client)传递到调用方(server)
16                 
17                 while(!stopthread) {// 模拟等待另服务器端代码完成
18                     system.out.println("threadname="+thread.currentthread().getname()+"客户端:模拟等待回调完成");
19                     
20                     try {
21                         thread.sleep(50);
22                     } catch (interruptedexception e) {
23                         e.printstacktrace();
24                     }
25                 }
26             }
27         }).start();
28         system.out.println("threadname="+thread.currentthread().getname()+" 客户端:异步发送成功");
29     }
30 
31     @override
32     public void process(string status) {
33         stopthread = true;
34         system.out.println("threadname="+thread.currentthread().getname()+" 客户端:收到服务端回调状态为:" + status);
35     }
36 }

 

 1 public class server {
 2 
 3     public void getclientmsg(cscallback cscallback , string msg) {
 4         
 5 
 6         // 模拟服务端需要对数据处理
 7         try {
 8              new thread(new runnable() {
 9                  @override
10                  public void run() {
11                      system.out.println("threadname="+thread.currentthread().getname()+" 服务端:服务端接收到客户端发送的消息为:" + msg);
12                      while(true) {
13                         int max=10,min=1;
14                          int rannum = (int) (math.random()*(max-min)+min); 
15                          
16                          if(rannum >6) {//  当随机数大于5时认为任务完成
17                               system.out.println("threadname="+thread.currentthread().getname()+" 服务端:数据处理成功,返回成功状态 200");
18                              string status = "200";
19                              cscallback.process(status);// 核心代码2:调用方(server)任务处理完成相应的任务后,调用被调用方(client)的方法告知任务完成
20                              break;
21                          }
22                          
23                          try {
24                             thread.sleep(80);
25                         } catch (interruptedexception e) {
26                             e.printstacktrace();
27                         }
28                      }
29                  }
30              }).start();
31             
32         } catch (exception e) {
33             e.printstacktrace();
34         }
35        
36     }
37 }

         

其实核心代码就两个:

      client端:被调用方自己当作参数(client)传递到调用方(server)。

      server端:调用方(server)任务处理完成相应的任务后,调用被调用方(client)的方法告知任务完成。

   同步发送多线程 future.get 模式实现 

        这种方式方式主要是用来等待某一项任务完成后,接着顺序执行某项任务。和上面的例子一样都是client 端 向server 端请求完成某项任务,并且期望server 端在完成任务后,返回结果

  实例代码如下:

  

 1 public class futuredemo {
 2 
 3     protected realdata realdata = null;
 4     protected boolean isready = false;
 5     public synchronized void requestdata(realdata realdata) {// client请求server完成某项任务
 6         if (isready) {                        
 7             return;     
 8         }
 9         this.realdata = realdata;
10         isready = true;
11         notifyall();//核心代码2:当请求的任务处理完成时,唤醒等待中的线程
12     }
13     
14     public synchronized string getresult() {// client等待server完成任务后返回,此处就相当于 future.get 
15         while (!isready) {
16             try {
17                 wait();//核心代码1:发出请求后等待线程被激活
18             } catch (interruptedexception e) {
19             }
20         }
21         return realdata.result;
22     }
23 }

  核心实现代码其实就是多线程里面的,wait 和 notify 实现方式。异步回调 和 同步 future get 模式最大的区别,举个例子吧,

              老婆(client 端)很爱老公,老公(服务器端)每天完成加班很晚,老婆都会等到老公回家然后给他做夜宵(同步 future get 模式)

             老婆(client 端)很爱老公,老公(服务器端)每天完成加班很晚,老婆觉得一直等太累了,就先睡觉,等老公回来后通知老婆(回调),然后老婆再给老公做夜宵(异步回调方式)。

 

      所以大家都期望自己的老婆是, future get 模式 还是 异步回调模式?