Thrift 简单使用
一.Thrift介绍
thrift是一个软件框架,用来进行可扩展且跨语言的服务的开发。它结合了功能强大的软件堆栈和代码生成引擎,以构建在 C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml 这些编程语言间无缝结合的、高效的服务。
thrift最初由facebook开发用做系统内个语言之间的RPC通信 。
2007年由facebook贡献到apache基金 ,08年5月进入apache孵化器 。
支持多种语言之间的RPC(Remote Procedure Call Protocol 远程过程调用协议)方式的通信:其它语言client可以构造一个对象,调用相应的服务方法来调用java语言的服务 ,跨越语言的C/S RPC调用 。——来自百度
Thrift在接口定义文档、数据类型支持、跨语言等方面存在优势,然而对服务地址和服务管理方面支持力度不够,并且存在类型侵入问题。——《搜狗商业平台Java技术实践》
协议、传输层等其它细节可以参考 http://www.ibm.com/developerworks/cn/java/j-lo-apachethrift/
关于protobuf和thrift的对比可以自行搜索,protobuf是谷歌出的二进制解析协议就好像JSON一样。
二.为什么选用Thrift
1.处理类似的问题,我们会先想到WebService,不管是基于XML的jax-ws,还是基于JSON的jax-rs(按照表述性状态转移(REST)架构风格创建Web服务)都是基于Http协议的,Thrift可以基于TCP,效率上有优势;
2.Thrift有两种常用的协议TCompactProtocol,TBinaryProtocol(默认),体积超小,使用起来比较麻烦,不如前两者轻便,但是对于高并发、数据传输量大、多语言环境比XML和JSON传输对象体积更小,处理效率更高。
三.IDL定义
Thrift 脚本可定义的数据类型包括以下几种类型:
1.基本类型:
bool:布尔值,true 或 false,对应 Java 的 boolean
byte:8 位有符号整数,对应 Java 的 byte
i16:16 位有符号整数,对应 Java 的 short
i32:32 位有符号整数,对应 Java 的 int
i64:64 位有符号整数,对应 Java 的 long
double:64 位浮点数,对应 Java 的 double
string:未知编码文本或二进制字符串,对应 Java 的 String
2.结构体类型:
struct:定义公共的对象,类似于 C 语言中的结构体定义,在 Java 中是一个 JavaBean
3.容器类型:
list:对应 Java 的 ArrayList
set:对应 Java 的 HashSet
map:对应 Java 的 HashMap
4.异常类型:
exception:对应 Java 的 Exception
5.服务类型:
service:对应服务的类
四.Thrift生成DEMO
1.下载thrift
下面的地址下载thrift的可执行文件,目前的版本是thrift-0.9.2.exe
http://thrift.apache.org/download
2.定义IDL
定义两个IDL(.thrift)文件,ThriftStruct.thrift是Domain定义,ThriftService.thrift是Service方法的定义
注意Thrift服务不支持方法重载
namespace java com.gqshao.thirft.domain struct ThriftStruct{ 1:i32 id; 2:string name; 3:double number; 4:bool isBoolean; }
namespace java com.gqshao.thirft.service include "ThriftStruct.thrift" service ThriftService { string helloString(1:string param) i32 helloInt(1:i32 param) bool helloBoolean(1:bool param) void helloVoid() string helloNull() i32 add(1:i32 n1, 2:i32 n2) i32 size(1:list<ThriftStruct.ThriftStruct> structs) bool sendSet(1:set<ThriftStruct.ThriftStruct> structs) bool sendMap(1:map<string, ThriftStruct.ThriftStruct> structs) }
3.生成文件
执行下面的命令生成Domain和Service,并将文件拷贝到源码相应目录下
thrift-0.9.2.exe -r -gen java ./ThriftService.thrift
4.实现接口
public class ThriftServiceImpl implements ThriftService.Iface { private static final Logger logger = LoggerFactory.getLogger(ThriftServiceImpl.class); @Override public String helloString(String param) throws TException { logger.info("helloString : " + param); return param; } @Override public int helloInt(int param) throws TException { logger.info("helloInt : " + param + ", begin sleep 20000"); try { Thread.sleep(20000); } catch (InterruptedException e) { e.printStackTrace(); } logger.info("helloInt : " + param + ", begin sleep over"); return param; } @Override public boolean helloBoolean(boolean param) throws TException { logger.info("helloBoolean : " + param); return param; } @Override public void helloVoid() throws TException { logger.info("helloVoid"); } @Override public String helloNull() throws TException { logger.info("helloNull"); return null; } @Override public int add(int n1, int n2) throws TException { logger.info("add n1 : " + n1 + ", n2" + n2); return n1 + n2; } @Override public int size(List<ThriftStruct> structs) throws TException { logger.info("size"); if (CollectionUtils.isEmpty(structs)) { return 0; } return structs.size(); } @Override public boolean sendSet(Set<ThriftStruct> structs) throws TException { logger.info("sendSet"); if (CollectionUtils.isEmpty(structs)) { return false; } return true; } @Override public boolean sendMap(Map<String, ThriftStruct> structs) throws TException { logger.info("sendMap"); if (MapUtils.isEmpty(structs)) { return false; } return true; } }
五.协议
Thrift可以让你选择客户端与服务端之间传输通信协议的类别,在传输协议上总体上划分为文本(text)和二进制(binary)传输协议, 为节约带宽,提供传输效率,一般情况下使用二进制类型的传输协议为多数,但有时会还是会使用基于文本类型的协议,这需要根据项目/产品中的实际需求:
* TBinaryProtocol – 二进制编码格式进行数据传输。
* TCompactProtocol – 这种协议非常有效的,使用Variable-Length Quantity (VLQ) 编码对数据进行压缩。
* TJSONProtocol – 使用JSON的数据编码协议进行数据传输。
* TSimpleJSONProtocol – 这种节约只提供JSON只写的协议,适用于通过脚本语言解析
* TDebugProtocol – 在开发的过程中帮助开发人员调试用的,以文本的形式展现方便阅读。
六.传输层
常用的传输层有以下几种:
1.TSocket
使用阻塞式 I/O 进行传输,是最常见的模式
2.TFramedTransport
使用非阻塞方式,按块的大小进行传输,类似于 Java 中的 NIO 若使用 TFramedTransport 传输层,其服务器必须修改为非阻塞的服务类型,TNonblockingServerTransport 类是构建非阻塞 socket 的抽象类,TNonblockingServerSocket 类继承 TNonblockingServerTransport
使用 TFramedTransport 传输层构建的 HelloServiceServer.java
TNonblockingServerTransport serverTransport; serverTransport = new TNonblockingServerSocket(10005); Hello.Processor processor = new Hello.Processor(new HelloServiceImpl()); TServer server = new TNonblockingServer(processor, serverTransport); System.out.println("Start server on port 10005 ..."); server.serve();
使用 TFramedTransport 传输层的 HelloServiceClient.java
TTransport transport = new TFramedTransport(new TSocket("localhost", 10005));
3.TNonblockingTransport
使用非阻塞方式,用于构建异步客户端
七.服务端类型
· TSimpleServer
TSimplerServer接受一个连接,处理连接请求,直到客户端关闭了连接,它才回去接受一个新的连接。正因为它只在一个单独的线程中以阻塞I/O的方式完成这些工作,所以它只能服务一个客户端连接,其他所有客户端在被服务器端接受之前都只能等待。TSimpleServer主要用于测试目的,不要在生产环境中使用它!
· TNonblockingServer
TNonblockingServer使用非阻塞的I/O解决了TSimpleServer一个客户端阻塞其他所有客户端的问题。它使用了java.nio.channels.Selector,通过调用select(),它使得你阻塞在多个连接上,而不是阻塞在单一的连接上。当一或多个连接准备好被接受/读/写时,select()调用便会返回。TNonblockingServer处理这些连接的时候,要么接受它,要么从它那读数据,要么把数据写到它那里,然后再次调用select()来等待下一个可用的连接。通用这种方式,server可同时服务多个客户端,而不会出现一个客户端把其他客户端全部“饿死”的情况。
· THsHaServer
所有消息是被调用select()方法的同一个线程处理的。假设有10个客户端,处理每条消息所需时间为100毫秒,那么,latency和吞吐量分别是多少?当一条消息被处理的时候,其他9个客户端就等着被select,所以客户端需要等待1秒钟才能从服务器端得到回应,吞吐量就是10个请求/秒。如果可以同时处理多条消息的话,会很不错吧?因此,THsHaServer(半同步/半异步的server)就应运而生了。它使用一个单独的线程来处理网络I/O,一个独立的worker线程池来处理消息。这样,只要有空闲的worker线程,消息就会被立即处理,因此多条消息能被并行处理。用上面的例子来说,现在的latency就是100毫秒,而吞吐量就是100个请求/秒。
· TThreadedSelectorServer
Thrift 0.8引入了另一种server实现,即TThreadedSelectorServer。它与THsHaServer的主要区别在于,TThreadedSelectorServer允许你用多个线程来处理网络I/O。它维护了两个线程池,一个用来处理网络I/O,另一个用来进行请求的处理。当网络I/O是瓶颈的时候,TThreadedSelectorServer比THsHaServer的表现要好。
· TThreadPoolServer
TThreadPoolServer与其他三种server不同的是有一个专用的线程用来接受连接。一旦接受了一个连接,它就会被放入ThreadPoolExecutor中的一个worker线程里处理。 worker线程被绑定到特定的客户端连接上,直到它关闭。一旦连接关闭,该worker线程就又回到了线程池中。 你可以配置线程池的最小、最大线程数,默认值分别是5(最小)和Integer.MAX_VALUE(最大)。这意味着,如果有1万个并发的客户端连接,你就需要运行1万个线程。所以它对系统资源的消耗不像其他类型的server一样那么“友好”。此外,如果客户端数量超过了线程池中的最大线程数,在有一个worker线程可用之前,请求将被一直阻塞在那里。TThreadPoolServer的表现非常优异。如果你提前知道了将要连接到你服务器上的客户端数量,并且你不介意运行大量线程的话,TThreadPoolServer对你可能是个很好的选择。
结论
TThreadedSelectorServer对大多数案例来说都是个安全之选。如果你的系统资源允许运行大量并发线程的话,你可能会想考虑使用TThreadPoolServer.
八.Server和Client
项目中需要引入thrift依赖,Maven依赖如下
<dependency> <groupId>org.apache.thrift</groupId> <artifactId>libthrift</artifactId> <version>0.9.2</version> </dependency>
1.服务器端,分别实现了TBinaryProtocol、TCompactProtocol和采用TCompactProtocol协议的ThreadPoolServer
import com.gqshao.thrift.service.ThriftService; import com.gqshao.thrift.service.impl.ThriftServiceImpl; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TSimpleServer; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TTransportFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ThriftServer { private static final Logger logger = LoggerFactory.getLogger(ThriftServer.class); /** * 二进制编码格式进行数据传输 */ public void initTBinaryProtocolServer() { try { // transport定义了消息是怎样在客户端和服务器端之间通信的 TServerSocket serverTransport = new TServerSocket(7911); TSimpleServer.Args args = new TSimpleServer.Args(serverTransport); // 关联处理器与服务的实现 TProcessor processor = new ThriftService.Processor<ThriftServiceImpl>(new ThriftServiceImpl()); args.processor(processor); // 使用二进制来编码应用层的数据 protocol(协议)定义了消息是怎样序列化的 args.protocolFactory(new TBinaryProtocol.Factory(true, true)); // 使用普通的socket来传输数据 args.transportFactory(new TTransportFactory()); TServer server = new TSimpleServer(args); logger.info("Start TBinary Protocol Server on port 7911..."); server.serve(); } catch (Exception e) { e.printStackTrace(); } } /** * 高效率的、密集的二进制编码格式进行数据传输 */ public void initTCompactProtocolServer() { try { // transport定义了消息是怎样在客户端和服务器端之间通信的 TServerSocket serverTransport = new TServerSocket(7912); TSimpleServer.Args args = new TSimpleServer.Args(serverTransport); // 关联处理器与服务的实现 TProcessor processor = new ThriftService.Processor<ThriftServiceImpl>(new ThriftServiceImpl()); args.processor(processor); // 使用二进制来编码应用层的数据 protocol(协议)定义了消息是怎样序列化的 args.protocolFactory(new TCompactProtocol.Factory()); // 使用普通的socket来传输数据 args.transportFactory(new TTransportFactory()); TServer server = new TSimpleServer(args); logger.info("Start TCompact Protocol Server on port 7911..."); server.serve(); } catch (Exception e) { e.printStackTrace(); } } /** * ThreadPoolServer 和 TCompactProtocol */ public void initTThreadPoolServerAndTCompactProtocolServer() { try { TServerSocket serverTransport = new TServerSocket(7913); TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverTransport); // 1.关联处理器与服务的实现 TProcessor processor = new ThriftService.Processor<ThriftServiceImpl>(new ThriftServiceImpl()); args.processor(processor); // 2.定义protocol 使用高效率的、密集的二进制编码格式进行数据传输 protocol(协议)定义了消息是怎样序列化的 args.protocolFactory(new TCompactProtocol.Factory()); // 使用普通的socket来传输数据 transport定义了消息是怎样在客户端和服务器端之间通信的 // 默认 TTransportFactory // args.transportFactory(new TTransportFactory()); // 实例化TThreadPoolServer TServer server = new TThreadPoolServer(args); logger.info("Start TCompact Protocol Server on port 7911..."); server.serve(); } catch (Exception e) { e.printStackTrace(); } } }
import com.google.common.collect.Lists; import com.gqshao.thrift.domain.ThriftStruct; import com.gqshao.thrift.server.ThriftServer; import com.gqshao.thrift.service.ThriftService; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; import java.util.concurrent.CountDownLatch; public class ThriftClientService { private static final Logger logger = LoggerFactory.getLogger(ThriftClientService.class); public static void initTBinaryProtocolServer() { new Thread(new Runnable() { @Override public void run() { new ThriftServer().initTBinaryProtocolServer(); } }).start(); } @Test public void testTBinaryProtocolClient() { TTransport transport = null; try { initTBinaryProtocolServer(); transport = new TSocket("localhost", 7911); transport.open(); // 设置传输协议为 TBinaryProtocol TProtocol protocol = new TBinaryProtocol(transport); ThriftService.Client client = new ThriftService.Client(protocol); // 调用服务的 helloVoid 方法 client.helloString("world"); } catch (TException e) { e.printStackTrace(); } finally { if (transport != null) { transport.close(); } } } public static void initTCompactProtocolServer() { new Thread(new Runnable() { @Override public void run() { new ThriftServer().initTCompactProtocolServer(); } }).start(); } @Test public void testTCompactProtocolClient() { TTransport transport = null; try { initTCompactProtocolServer(); transport = new TSocket("localhost", 7912); transport.open(); // 设置传输协议为 TCompactProtocol TProtocol protocol = new TCompactProtocol(transport); ThriftService.Client client = new ThriftService.Client(protocol); // 调用服务的 size 方法 ThriftStruct s1 = new ThriftStruct(); s1.setId(1); ThriftStruct s2 = new ThriftStruct(); s2.setId(2); List<ThriftStruct> structs = Lists.newArrayList(s1, s2); int size = client.size(structs); logger.info("size === " + size); } catch (TException e) { e.printStackTrace(); } finally { if (transport != null) { transport.close(); } } } public static void initTThreadPoolServerAndTCompactProtocolServer() { new Thread(new Runnable() { @Override public void run() { new ThriftServer().initTThreadPoolServerAndTCompactProtocolServer(); } }).start(); } @Test public void testTThreadPoolServerAndTCompactProtocolServerClient() { initTThreadPoolServerAndTCompactProtocolServer(); int threadNum = 50; final CountDownLatch threadSignal = new CountDownLatch(threadNum); try { } catch (Exception e) { e.printStackTrace(); } for (int i = 0; i < 50; i++) { final int finalI = i; new Thread(new Runnable() { @Override public void run() { TTransport transport = null; try { // 传输层选用TSocket transport = new TSocket("localhost", 7913); transport.open(); // 设置传输协议为 TCompactProtocol TProtocol protocol = new TCompactProtocol(transport); ThriftService.Client client = new ThriftService.Client(protocol); // 调用服务的 helloVoid 方法 // client.helloString("World !"); client.helloInt(finalI); } catch (Exception e) { e.printStackTrace(); } finally { transport.close(); } threadSignal.countDown(); } }).start(); } try { threadSignal.await(); } catch (InterruptedException e) { e.printStackTrace(); } logger.info("testTThreadPoolServerAndTCompactProtocolServerClient test over"); } }
上一篇: php基础问题,请指点