Thrift Notes
2024-03-22 13:04:40
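I have not used Thrift much, nor in any great depth, so this is just a short note.
For how to set up a Thrift environment, see the official website.
Alternatively, download the Windows build directly: thrift-0.9.2.exe
Thrift data types map to Java data types as follows: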
/**
 * The first thing to know about are types. The available types in Thrift are:
 *
 *  bool        Boolean, one byte
 *  byte        Signed byte
 *  i16         Signed 16-bit integer - short
 *  i32         Signed 32-bit integer - int
 *  i64         Signed 64-bit integer - long
 *  double      64-bit floating point value - double
 *  string      String
 *  binary      Blob (byte array)
 *  map<t1,t2>  Map from one type to another
 *  list<t1>    Ordered list of one type
 *  set<t1>     Set of unique elements of one type
 */
I won't belabor this; anyone who has read this far will follow it without help.
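For quick reference, the table above can be written down as a small lookup in plain Java. This is only a sketch: the class name ThriftJavaTypes and the method javaTypeOf are illustrative names of my own, and the entries reflect what the Java generator normally emits (notably java.nio.ByteBuffer for binary), not something taken from this post's attachment.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Lookup from Thrift IDL type names to the Java types usually seen in generated code. */
final class ThriftJavaTypes {
    private static final Map<String, String> JAVA_TYPE = new LinkedHashMap<>();

    static {
        JAVA_TYPE.put("bool", "boolean");
        JAVA_TYPE.put("byte", "byte");
        JAVA_TYPE.put("i16", "short");
        JAVA_TYPE.put("i32", "int");
        JAVA_TYPE.put("i64", "long");
        JAVA_TYPE.put("double", "double");
        JAVA_TYPE.put("string", "java.lang.String");
        JAVA_TYPE.put("binary", "java.nio.ByteBuffer");
        // Container element types are parameterised, e.g. map<string,string> -> Map<String, String>.
        JAVA_TYPE.put("map", "java.util.Map");
        JAVA_TYPE.put("list", "java.util.List");
        JAVA_TYPE.put("set", "java.util.Set");
    }

    /** Returns the Java type for a Thrift type name, or null if the name is unknown. */
    static String javaTypeOf(String thriftType) {
        return JAVA_TYPE.get(thriftType);
    }

    public static void main(String[] args) {
        // Print the whole table in declaration order.
        JAVA_TYPE.forEach((thrift, java) -> System.out.println(thrift + " -> " + java));
    }
}
```

Inside containers the primitive types appear boxed, so a Thrift map<string,i32> becomes Map<String, Integer> on the Java side.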
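The core of RPC is calling a remote method with arguments. Those arguments always involve primitive types and collections, and even more often custom objects.
In Thrift, custom objects must be defined before they are used, much like C requires declarations before use. Alternatively, you can include other .thrift files.
Here Profile serves as the example object. A Java class corresponds to a Thrift struct, and a Java interface corresponds to a Thrift service.
The following defines an interface that operates on Profile: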
namespace java org.zlex.support.thrift

struct Profile {
    1: string name,
    2: i32 score,
    3: bool enable
}

service ProfileService {
    string updateName(1:Profile profile, 2:string name)
    i32 updateScore(1:Profile profile, 2:i32 score)
    map<string,string> toMap(1:Profile profile)
}
Save this as Profile.thrift.
Run the following command to generate the Java code:
thrift -r -gen java Profile.thrift
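The generated code is in the attachment. It contains quite a lot of redundancy. I felt a passing urge to edit it, but left it exactly as generated.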
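To see past the redundancy, it can help to picture the bare shape that the struct and the service boil down to. The following is a hand-written simplification, not generator output: ProfileSketch and ProfileServiceSketch are hypothetical names, and the real generated classes additionally carry serialization logic, field metadata, a Client and a Processor, and declare TException on every service method.

```java
import java.util.Map;

/** Hand-written stand-in for the struct: one field plus accessors per IDL field. */
final class ProfileSketch {
    private String name;     // 1: string name
    private int score;       // 2: i32 score
    private boolean enable;  // 3: bool enable

    String getName() { return name; }
    void setName(String name) { this.name = name; }

    int getScore() { return score; }
    void setScore(int score) { this.score = score; }

    boolean isEnable() { return enable; }
    void setEnable(boolean enable) { this.enable = enable; }
}

/** Hand-written stand-in for the service: one method per IDL function. */
interface ProfileServiceSketch {
    String updateName(ProfileSketch profile, String name);

    int updateScore(ProfileSketch profile, int score);

    Map<String, String> toMap(ProfileSketch profile);
}

final class GeneratedShapeDemo {
    public static void main(String[] args) {
        // Populate the struct stand-in the same way a caller would populate Profile.
        ProfileSketch profile = new ProfileSketch();
        profile.setName("Snowolf");
        profile.setScore(95);
        profile.setEnable(true);
        System.out.println(profile.getName() + " " + profile.getScore() + " " + profile.isEnable());
    }
}
```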
Next, write an implementation class for the interface:
/**
 * Mar 14, 2013
 */
package org.zlex.support.thrift.impl;

import java.util.HashMap;
import java.util.Map;

import org.apache.thrift.TException;
import org.zlex.support.thrift.Profile;
import org.zlex.support.thrift.ProfileService.Iface;

/**
 *
 * @author snowolf
 * @version 1.0
 * @since 1.0
 */
public class ProfileServiceImpl implements Iface {

    /*
     * (non-Javadoc)
     *
     * @see
     * org.zlex.support.thrift.ProfileService.Iface#updateName(org.zlex.support
     * .thrift.Profile, java.lang.String)
     */
    @Override
    public String updateName(Profile profile, String name) throws TException {
        profile.setName(name);
        return profile.getName();
    }

    /*
     * (non-Javadoc)
     *
     * @see
     * org.zlex.support.thrift.ProfileService.Iface#updateScore(org.zlex.support
     * .thrift.Profile, int)
     */
    @Override
    public int updateScore(Profile profile, int score) throws TException {
        profile.setScore(profile.getScore() + score);
        return profile.getScore();
    }

    /*
     * (non-Javadoc)
     *
     * @see
     * org.zlex.support.thrift.ProfileService.Iface#toMap(org.zlex.support.thrift
     * .Profile)
     */
    @Override
    public Map<String, String> toMap(Profile profile) throws TException {
        Map<String, String> map = new HashMap<String, String>();
        map.put("name", profile.getName());
        map.put("score", "" + profile.getScore());
        map.put("isEnable", "" + profile.isEnable());
        return map;
    }
}
Now implement a Server that is "non-blocking & uses an efficient binary encoding":
/**
 * Mar 14, 2013
 */
package org.zlex.support.thrift;

import org.apache.log4j.Logger;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TTransportException;
import org.zlex.support.thrift.impl.ProfileServiceImpl;

/**
 *
 * @author snowolf
 * @version 1.0
 * @since 1.0
 */
public class Server {
    /**
     * Logger for this class
     */
    private static final Logger logger = Logger.getLogger(Server.class);

    /**
     * Port the server listens on
     */
    private int port;

    /**
     * @param port
     */
    public Server(int port) {
        this.port = port;
    }

    /**
     * Starts the server; serve() blocks until the server is stopped.
     */
    public void start() {
        try {
            TNonblockingServerSocket socket = new TNonblockingServerSocket(port);
            final ProfileService.Processor<ProfileService.Iface> processor =
                    new ProfileService.Processor<ProfileService.Iface>(
                            new ProfileServiceImpl());
            THsHaServer.Args arg = new THsHaServer.Args(socket);
            // Transfer data in an efficient, dense binary encoding
            arg.protocolFactory(new TCompactProtocol.Factory());
            // Non-blocking mode: transfer in frames of a known size, similar to NIO in Java
            arg.transportFactory(new TFramedTransport.Factory());
            arg.processorFactory(new TProcessorFactory(processor));
            TServer server = new THsHaServer(arg);
            logger.info("Server starting: non-blocking & efficient binary encoding");
            server.serve();
        } catch (TTransportException e) {
            logger.error("Failed to open the server socket", e);
        } catch (Exception e) {
            logger.error("Server terminated unexpectedly", e);
        }
    }
}
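The server pairs THsHaServer with TFramedTransport. As far as I understand it, a framed transport prefixes every message with a 4-byte big-endian length, which is what lets a non-blocking server tell when a complete message has arrived. Below is a minimal plain-Java sketch of that idea only; FramingSketch, frame and unframe are illustrative names of my own and are not part of the Thrift API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Length-prefixed framing: a 4-byte big-endian size followed by the payload. */
final class FramingSketch {

    /** Prefixes the payload with its length as a 4-byte big-endian integer. */
    static byte[] frame(byte[] payload) {
        // ByteBuffer is big-endian by default, so putInt writes the high byte first.
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    /** Reads one frame from the buffer and returns its payload. */
    static byte[] unframe(ByteBuffer in) {
        int length = in.getInt();          // the frame header
        byte[] payload = new byte[length];
        in.get(payload);                   // exactly `length` bytes belong to this message
        return payload;
    }

    public static void main(String[] args) {
        byte[] framed = frame("Snowolf".getBytes(StandardCharsets.UTF_8));
        System.out.println(framed.length); // prints 11: 4-byte header + 7-byte payload

        byte[] back = unframe(ByteBuffer.wrap(framed));
        System.out.println(new String(back, StandardCharsets.UTF_8)); // prints Snowolf
    }
}
```

Because the receiver learns the message size up front, it can keep buffering bytes from many connections and hand a request to a worker only once the whole frame is in memory.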
Server test case:
/**
 * Mar 14, 2013
 */
package org.zlex.support.thrift;

import org.junit.Before;
import org.junit.Test;

/**
 *
 * @author snowolf
 * @version 1.0
 * @since 1.0
 */
public class ServerTest {
    public final static int PORT = 9999;

    private Server server;

    @Before
    public void init() {
        server = new Server(PORT);
    }

    @Test
    public void test() {
        // Blocks while the server is running; stop the test run to shut it down.
        server.start();
    }
}
Client test case:
/**
 * Mar 14, 2013
 */
package org.zlex.support.thrift;

import static org.junit.Assert.*;

import java.util.Map;

import org.apache.thrift.TApplicationException;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.junit.Test;

/**
 *
 * @author snowolf
 * @version 1.0
 * @since 1.0
 */
public class ClientTest {
    public final static int PORT = 9999;
    public static final String address = "localhost";
    public static final int clientTimeout = 30000;

    @Test
    public void test() {
        // Transport and protocol must match the server: framed transport + compact protocol
        TTransport transport = new TFramedTransport(new TSocket(address, PORT,
                clientTimeout));
        TProtocol protocol = new TCompactProtocol(transport);
        ProfileService.Client client = new ProfileService.Client(protocol);
        try {
            transport.open();
            Profile profile = new Profile();
            profile.setName("Snowolf");
            Map<String, String> map = client.toMap(profile);
            // JUnit expects (expected, actual)
            assertEquals("Snowolf", map.get("name"));
        } catch (TApplicationException e) {
            System.out.println(e.getMessage() + " " + e.getType());
        } catch (TTransportException e) {
            e.printStackTrace();
        } catch (TException e) {
            e.printStackTrace();
        } finally {
            // Close the connection even if the call or the assertion fails
            transport.close();
        }
    }
}
The Client calls the Server's interface and hands the Profile object to the Server, which converts it into a Map.
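Both sides use TCompactProtocol, so an i32 field such as Profile's score does not travel as four fixed bytes. To my understanding the compact protocol writes integers as a ZigZag-encoded varint, which the plain-Java sketch below illustrates. CompactIntSketch and its methods are illustrative names of my own; this is not the library's implementation.

```java
import java.io.ByteArrayOutputStream;

/** ZigZag + varint encoding of a signed 32-bit integer. */
final class CompactIntSketch {

    /** Maps signed to unsigned so small magnitudes stay small: 0, -1, 1, -2 become 0, 1, 2, 3. */
    static int zigZag(int n) {
        return (n << 1) ^ (n >> 31);
    }

    /** Writes 7 bits per byte, low bits first; a set high bit means more bytes follow. */
    static byte[] varint(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7; // unsigned shift, so the loop terminates for any input
        }
        out.write(value);
        return out.toByteArray();
    }

    /** Encodes a signed i32 the way the sketch assumes the compact protocol does. */
    static byte[] encodeI32(int n) {
        return varint(zigZag(n));
    }

    public static void main(String[] args) {
        System.out.println(encodeI32(-1).length);                // prints 1
        System.out.println(encodeI32(95).length);                // prints 2
        System.out.println(encodeI32(Integer.MAX_VALUE).length); // prints 5
    }
}
```

Small values, which are the common case for counters and scores, take one or two bytes, while the worst case costs five bytes instead of four.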
If Thrift has to carry a highly concurrent load, nginx can be used to implement load balancing. For details, see
Nginx Extensions (1): nginx_tcp_proxy_module
That's the end of this note. Go home!