Apache thrift 之請求處理流程
本文我們以 HelloService 為例,來分析thrfit的請求處理流程。
服務(wù)端啟動
HelloService 的服務(wù)端啟動在 HelloServer,這是我們自定義的類,其中就只有一個main方法:
public static void main(String[] args) {
try {
// 創(chuàng)建處理器,這個就是最終處理請求的類
HelloService.Processor processor = new HelloService.Processor<>(new HelloServiceImpl());
// 配置傳輸類型
TServerTransport transport = new TServerSocket(SERVER_PORT);
// 配置服務(wù)器
TServer server = new TSimpleServer(new TServer.Args(transport).processor(processor));
System.out.println("Starting the simple server...");
// 對外提供服務(wù)
server.serve();
} catch (Exception e) {
e.printStackTrace();
}
}
該方法的關(guān)鍵部分在注釋中已經(jīng)詳細(xì)說明了,接下來我們來一步步分析這些步驟。
創(chuàng)建處理器
創(chuàng)建處理器的代碼為
HelloService.Processor processor = new HelloService.Processor<>(new HelloServiceImpl())
我們進(jìn)入HelloService.Processor#Processor(I)方法:
public Processor(I iface) {
super(iface, getProcessMap(new java.util.HashMap<java.lang.String,
org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
}
Processor(I)方法的參數(shù)是iface,即HelloServiceImpl的對象,也是我們自己實(shí)現(xiàn)的內(nèi)容:
public class HelloServiceImpl implements HelloService.Iface {
@Override
public String hello(String text) throws TException {
return "hello, " + text + " !";
}
}
getProcessMap(...)方法
在Processor(I)方法中,會調(diào)用super(...)方法,注意到super(...)方法的參數(shù)中調(diào)用了getProcessMap(...)方法,我們進(jìn)入其中,來到HelloService.Processor#getProcessMap方法:
private static <I extends Iface> java.util.Map<...>
getProcessMap(java.util.Map<...> processMap) {
processMap.put("hello", new hello());
return processMap;
}
processMap.put("hello", new hello());中的"hello",是HelloService中的方法名,如果有HelloService中有多個方法,processMap就會put多個對象。
需要注意的是,這里的key是方法名,如果多個方法同名,那么先放入的對象會被后放入的對象覆蓋,也就是說,「thrift不支持方法重載」!
那么new hello()是啥呢?我們進(jìn)入其中:
public static class hello<I extends Iface>
extends org.apache.thrift.ProcessFunction<I, hello_args> {
public hello() {
super("hello");
}
...
}
hello繼承了ProcessFunction,繼續(xù)到父類ProcessFunction#ProcessFunction:
public abstract class ProcessFunction<I, T extends TBase> {
private final String methodName;
public ProcessFunction(String methodName) {
this.methodName = methodName;
}
...
}
這里我們大概就知道它是把hello的方法名包裝成了ProcessFunction對象。
TBaseProcessor#TBaseProcessor
我們再回到HelloService.Processor#Processor(I)方法:
public Processor(I iface) {
super(iface, getProcessMap(new java.util.HashMap<java.lang.String,
org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
}
進(jìn)入super(...),也就是TBaseProcessor#TBaseProcessor:
protected TBaseProcessor(I iface, Map<String,
ProcessFunction<I, ? extends TBase>> processFunctionMap) {
this.iface = iface;
this.processMap = processFunctionMap;
}
TBaseProcessor 中保存了兩個內(nèi)容:
服務(wù)的實(shí)現(xiàn)類(由開發(fā)者提供),這里就是 HelloServiceImpl服務(wù)的方法及方法對象(由 thrift生成)
從代碼來看,這一步就是把自主實(shí)現(xiàn)的HelloServiceImpl包裝成thrift的Processor。
new TServerSocket(SERVER_PORT)
我們繼續(xù),接下來分析配置傳輸類型,進(jìn)入 TServerSocket#TServerSocket(int):
public TServerSocket(ServerSocketTransportArgs args) throws TTransportException {
clientTimeout_ = args.clientTimeout;
if (args.serverSocket != null) {
this.serverSocket_ = args.serverSocket;
return;
}
try {
// 創(chuàng)建 ServerSocket
serverSocket_ = new ServerSocket();
// 地址重用,也就是ip與端口重用
serverSocket_.setReuseAddress(true);
// 綁定ip與端口
serverSocket_.bind(args.bindAddr, args.backlog);
} catch (IOException ioe) {
close();
throw new TTransportException("Could not create ServerSocket on address "
+ args.bindAddr.toString() + ".", ioe);
}
}
這個方法主要是用來開啟socket服務(wù)的,使用的是ServerSocket,也就是阻塞IO。
new TServer.Args(transport)
接下我們來看看配置服務(wù)器的操作,進(jìn)入new TServer.Args(transport):
public abstract class TServer {
public static class Args extends AbstractServerArgs<Args> {
public Args(TServerTransport transport) {
// 調(diào)用的是 AbstractServerArgs 的構(gòu)造方法
super(transport);
}
}
/**
* 存放參數(shù)的類
*/
public static abstract class AbstractServerArgs<T extends AbstractServerArgs<T>> {
final TServerTransport serverTransport;
TProcessorFactory processorFactory;
TTransportFactory inputTransportFactory = new TTransportFactory();
TTransportFactory outputTransportFactory = new TTransportFactory();
TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory();
TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory();
// 最終調(diào)用的方法
public AbstractServerArgs(TServerTransport transport) {
serverTransport = transport;
}
...
}
}
這一塊主要是做一些配置,也就是把前面創(chuàng)建的ServerSocket對象保存到AbstractServerArgs對象中。
new TServer.Args(transport).processor(processor)
TServer.AbstractServerArgs#processor方法內(nèi)容如下:
public T processor(TProcessor processor) {
this.processorFactory = new TProcessorFactory(processor);
return (T) this;
}
返回的對象類型還是TServer.Args。
這一步僅僅是把processor放入到TProcessorFactory中,TProcessorFactory內(nèi)容如下:
public class TProcessorFactory {
private final TProcessor processor_;
public TProcessorFactory(TProcessor processor) {
processor_ = processor;
}
public TProcessor getProcessor(TTransport trans) {
return processor_;
}
public boolean isAsyncProcessor() {
return processor_ instanceof TAsyncProcessor;
}
}
其中僅有一個processor_,并且會在getProcessor()方法中原樣返回。
new TSimpleServer()
繼續(xù),進(jìn)入TSimpleServer的構(gòu)造方法:
public TSimpleServer(AbstractServerArgs args) {
super(args);
}
TSimpleServer實(shí)現(xiàn)了TServer,TServer的構(gòu)造方法如下:
protected TServer(AbstractServerArgs args) {
processorFactory_ = args.processorFactory;
serverTransport_ = args.serverTransport;
inputTransportFactory_ = args.inputTransportFactory;
outputTransportFactory_ = args.outputTransportFactory;
inputProtocolFactory_ = args.inputProtocolFactory;
outputProtocolFactory_ = args.outputProtocolFactory;
}
這一步是為TSimpleServer設(shè)置各種屬性,即將AbstractServerArgs中的屬性賦值到TServer的屬性中 。args中的屬性值,就是TServer.Args(transport)中設(shè)置的以及thrift提供的默認(rèn)內(nèi)容。
server.serve()
接下來就是服務(wù)端的重頭戲了:提供對外服務(wù),方法為TSimpleServer#serve:
public void serve() {
try {
// 啟動監(jiān)聽,表示可以監(jiān)聽端口的連接了
serverTransport_.listen();
} catch (TTransportException ttx) {
LOGGER.error("Error occurred during listening.", ttx);
return;
}
if (eventHandler_ != null) {
// 運(yùn)行 eventHandler_.preServe() 方法
eventHandler_.preServe();
}
setServing(true);
// 死循環(huán)不斷獲取連接
while (!stopped_) {
TTransport client = null;
TProcessor processor = null;
TTransport inputTransport = null;
TTransport outputTransport = null;
TProtocol inputProtocol = null;
TProtocol outputProtocol = null;
ServerContext connectionContext = null;
try {
// 獲取連接,這里會阻塞
client = serverTransport_.accept();
if (client != null) {
processor = processorFactory_.getProcessor(client);
inputTransport = inputTransportFactory_.getTransport(client);
outputTransport = outputTransportFactory_.getTransport(client);
inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
if (eventHandler_ != null) {
// 運(yùn)行 eventHandler_.createContext(...) 方法
connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol);
}
while (true) {
if (eventHandler_ != null) {
// 運(yùn)行 eventHandler_.processContext(...) 方法
eventHandler_.processContext(connectionContext, inputTransport, outputTransport);
}
// 處理方法操作,這里會執(zhí)行 HelloServiceImpl 的方法
processor.process(inputProtocol, outputProtocol);
}
}
} catch (...) {
...
}
if (eventHandler_ != null) {
// 運(yùn)行 eventHandler_.deleteContext(...) 方法
eventHandler_.deleteContext(connectionContext, inputProtocol, outputProtocol);
}
if (inputTransport != null) {
inputTransport.close();
}
if (outputTransport != null) {
outputTransport.close();
}
}
setServing(false);
}
這個就是服務(wù)端處理請求的整個流程了,下面我們一步步來分析。
啟動服務(wù)監(jiān)聽:serverTransport_.listen()
TServerSocket#listen方法內(nèi)容如下:
public void listen() throws TTransportException {
// Make sure to block on accept
if (serverSocket_ != null) {
try {
serverSocket_.setSoTimeout(0);
} catch (SocketException sx) {
LOGGER.error("Could not set socket timeout.", sx);
}
}
}
可以看到,僅是配置了一個屬性:soTimeout,這個soTimeout是啥意思呢?我們直接看它的注釋:
以指定的超時時間啟用/禁用SO_TIMEOUT ,以毫秒為單位。 通過將此選項(xiàng)設(shè)置為非零超時,對此ServerSocket的accept()調(diào)用將僅在此時間量內(nèi)阻塞。 如果超時到期,則將拋出java.net.SocketTimeoutException ,盡管ServerSocket仍然有效。 必須先啟用該選項(xiàng),然后才能執(zhí)行阻止操作。 超時時間必須> 0 。 零超時被解釋為無限超時。
Enable/disable {@link SocketOptions#SO_TIMEOUT SO_TIMEOUT} with the specified timeout, in milliseconds. With this option set to a non-zero timeout, a call to accept() for this ServerSocket will block for only this amount of time. If the timeout expires, a java.net.SocketTimeoutException is raised, though the ServerSocket is still valid. The option must be enabled prior to entering the blocking operation to have effect. The timeout must be {@code > 0}. A timeout of zero is interpreted as an infinite timeout.
也不是說,這個參數(shù)是用來設(shè)置超時時間的,這里設(shè)置成了0,表示不限超時時間。
運(yùn)行 eventHandler_.xxx(...) 方法
eventHandler_類型為TServerEventHandler,它的定義如下:
public interface TServerEventHandler {
/**
* Called before the server begins.
* 服務(wù)開啟前調(diào)用
*/
void preServe();
/**
* Called when a new client has connected and is about to being processing.
* 服務(wù)創(chuàng)建 context 時調(diào)用
*/
ServerContext createContext(TProtocol input,
TProtocol output);
/**
* Called when a client has finished request-handling to delete server
* context.
* 服務(wù)關(guān)閉時調(diào)用
*/
void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output);
/**
* Called when a client is about to call the processor.
* 處理連接請求
*/
void processContext(ServerContext serverContext,
TTransport inputTransport, TTransport outputTransport);
可以看到這是個接口,里面定義了幾個方法,會在服務(wù)處理的過程中調(diào)用,當(dāng)我們要監(jiān)聽連接的某些操作時,就可以實(shí)現(xiàn)這個接口,然后將其添加到TServerSocket,像這樣:
TServerTransport transport = new TServerSocket(port);
TServer server = new TSimpleServer(new TServer.Args(transport).processor(processor));
// 設(shè)置 ServerEventHandler
server.setServerEventHandler(new MyTServerEventHandler());
server.serve();
獲取連接與處理
server.serve()的核心功能如下:
while (!stopped_) {
...
try {
// 獲取連接,這里會阻塞
client = serverTransport_.accept();
if (client != null) {
processor = processorFactory_.getProcessor(client);
...
while (true) {
...
// 處理方法操作,這里會執(zhí)行 HelloServiceImpl 的方法
processor.process(inputProtocol, outputProtocol);
}
}
} catch (...) {
...
}
使用 TServerSocket#accept獲取連接請求,這是jdk提供的方法使用 processorFactory_.getProcessor(client);方法獲取processor使用 processor.process(...)執(zhí)行具體的方法
這塊先有個印象吧,后面分析執(zhí)行時,再使用調(diào)試的方式來具體分析。
客戶端啟動
客戶端啟動類為HelloClient,這個類是我們自主實(shí)現(xiàn)的,代碼如下:
public static void main(String[] args) {
TTransport transport = null;
try {
// 打開連接
transport = new TSocket("localhost", SERVER_PORT);
transport.open();
// 指定傳輸協(xié)議
TProtocol protocol = new TBinaryProtocol(transport);
// 創(chuàng)建客戶端
HelloService.Client client = new HelloService.Client(protocol);
// 調(diào)用 HelloService#hello 方法
String result = client.hello("thrift world");
System.out.println("result=" + result);
} catch (Exception e) {
e.printStackTrace();
} finally {
if(null != transport) {
transport.close();
}
}
}
打開一個連接
打開連接的操作如下:
transport = new TSocket("localhost", SERVER_PORT);
transport.open();
進(jìn)入TSocket的構(gòu)造方法:
public TSocket(TConfiguration config, String host, int port, int socketTimeout,
int connectTimeout) throws TTransportException {
// 參數(shù)賦值
super(config);
host_ = host;
port_ = port;
socketTimeout_ = socketTimeout;
connectTimeout_ = connectTimeout;
initSocket();
}
/**
* 初始化 socket 對象
*/
private void initSocket() {
socket_ = new Socket();
try {
socket_.setSoLinger(false, 0);
socket_.setTcpNoDelay(true);
socket_.setKeepAlive(true);
socket_.setSoTimeout(socketTimeout_);
} catch (SocketException sx) {
LOGGER.error("Could not configure socket.", sx);
}
}
這一步只是創(chuàng)建了TSocket對象,TSocket的構(gòu)造方法里只是做了一些賦值操作。
再來看看TSocket#open方法:
public void open() throws TTransportException {
// 省略判斷操作
...
try {
// 連接
socket_.connect(new InetSocketAddress(host_, port_), connectTimeout_);
inputStream_ = new BufferedInputStream(socket_.getInputStream());
outputStream_ = new BufferedOutputStream(socket_.getOutputStream());
} catch (IOException iox) {
close();
throw new TTransportException(TTransportException.NOT_OPEN, iox);
}
}
打開連接的方法為java.net.Socket#connect(java.net.SocketAddress, int),使用的是BIO.
獲取一個客戶端
TProtocol protocol = new TBinaryProtocol(transport);
HelloService.Client client = new HelloService.Client(protocol);
這段代碼先使用new TBinaryProtocol(transport)創(chuàng)建了一個二進(jìn)制協(xié)議對象,進(jìn)入TBinaryProtocol#TBinaryProtocol(...) 方法:
public TBinaryProtocol(TTransport trans, long stringLengthLimit,
long containerLengthLimit, boolean strictRead, boolean strictWrite) {
super(trans);
stringLengthLimit_ = stringLengthLimit;
containerLengthLimit_ = containerLengthLimit;
strictRead_ = strictRead;
strictWrite_ = strictWrite;
}
這個方法先是調(diào)用了父類的構(gòu)造方法,然后是一堆的賦值操作,我們進(jìn)入父類的構(gòu)造方法TProtocol#TProtocol(...)中:
protected TProtocol(TTransport trans) {
trans_ = trans;
}
可以看到,整個創(chuàng)建過程就只是一些賦值操作。
我們來看看看客戶端的獲取,進(jìn)入HelloService.Client#Client(...)方法:
public Client(org.apache.thrift.protocol.TProtocol prot)
{
super(prot, prot);
}
繼續(xù),進(jìn)入TServiceClient:
public abstract class TServiceClient {
public TServiceClient(TProtocol prot) {
this(prot, prot);
}
public TServiceClient(TProtocol iprot, TProtocol oprot) {
iprot_ = iprot;
oprot_ = oprot;
}
...
}
可以看到,客戶端的創(chuàng)建依然是一些賦值操作。
執(zhí)行操作
準(zhǔn)備就緒后,接下來就可以執(zhí)行方法了,即:
String result = client.hello("thrift world");
這一行代碼最終調(diào)用的是服務(wù)端的HelloService#hello方法,也就是HelloServiceHandler#hello:
public class HelloServiceHandler implements HelloService.Iface {
@Override
public String hello(String text) throws TException {
return "hello " + text;
}
}
在客戶端調(diào)用本地方法,如何能調(diào)用到遠(yuǎn)程服務(wù)的方法呢?接下來我們就來分析這其中的操作。
執(zhí)行流程
客戶端調(diào)用服務(wù)端由HelloService.Client#hello發(fā)起,我們進(jìn)入該方法:
public java.lang.String hello(java.lang.String text) throws org.apache.thrift.TException
{
send_hello(text);
return recv_hello();
}
這個方法就兩行代碼,從代碼的命名來看,大致能猜出這兩行代碼的含義:
send_hello(...):發(fā)送hello()方法的調(diào)用請求recv_hello(...):接收hello()方法的調(diào)用結(jié)果
客戶端發(fā)送請求:send_hello(text)
進(jìn)入 send_hello(text) 方法:
public void send_hello(java.lang.String text) throws org.apache.thrift.TException
{
hello_args args = new hello_args();
args.setText(text);
sendBase("hello", args);
}
hello_args 封裝就是方法的參數(shù),設(shè)置完參數(shù)后,最終調(diào)用sendBase(...)方法:
TServiceClient#sendBase(String, TBase<?,?>, byte)
private void sendBase(String methodName, TBase<?,?> args, byte type) throws TException {
oprot_.writeMessageBegin(new TMessage(methodName, type, ++seqid_));
args.write(oprot_);
oprot_.writeMessageEnd();
oprot_.getTransport().flush();
}

當(dāng)前對象是TSocket的實(shí)例,outputStream_.flush()中的outputStream_就是TSocket持有的outputStream_。執(zhí)行完成flush操作后,數(shù)據(jù)就發(fā)送到服務(wù)端了,發(fā)送的數(shù)據(jù)主要為方法名與參數(shù)值。
客戶端接收響應(yīng):recv_hello()
接下來我們來看看數(shù)據(jù)的接收流程,也就是recv_hello()方法:
HelloService.Client#recv_hello
public java.lang.String recv_hello() throws org.apache.thrift.TException
{
hello_result result = new hello_result();
// 繼續(xù)處理
receiveBase(result, "hello");
if (result.isSetSuccess()) {
return result.success;
}
throw new org.apache.thrift.TApplicationException(
org.apache.thrift.TApplicationException.MISSING_RESULT,
"hello failed: unknown result");
}
在以上方法中,先是創(chuàng)建了一個hello_result對象,該對象用來保存方法的執(zhí)行結(jié)果,然后調(diào)用receiveBase(...)方法:TServiceClient#receiveBase
protected void receiveBase(TBase<?,?> result, String methodName) throws TException {
TMessage msg = iprot_.readMessageBegin();
...
// read 操作
result.read(iprot_);
iprot_.readMessageEnd();
}
這個方法主要調(diào)用了result.read(iprot_)方法,繼續(xù):
HelloService.hello_result.hello_resultStandardScheme#read
public void read(org.apache.thrift.protocol.TProtocol iprot, hello_result struct)
throws org.apache.thrift.TException {
org.apache.thrift.protocol.TField schemeField;
iprot.readStructBegin();
while (true)
{
schemeField = iprot.readFieldBegin();
// 閱讀完成的標(biāo)識
if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
break;
}
switch (schemeField.id) {
case 0: // SUCCESS
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
// 在這里讀取結(jié)果,返回結(jié)果是String類型,直接讀取
struct.success = iprot.readString();
struct.setSuccessIsSet(true);
} else {
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
iprot.readFieldEnd();
}
iprot.readStructEnd();
struct.validate();
}
這一步就是讀取返回結(jié)果的內(nèi)容了。
到了這里,客戶端的讀寫操作也就完成了。
服務(wù)端處理:TSimpleServer#serve
接下來我們來看看服務(wù)端是如何處理請求的,進(jìn)入TSimpleServer#serve方法:
public void serve() {
...
while (!stopped_) {
...
try {
client = serverTransport_.accept();
if (client != null) {
...
while (true) {
if (eventHandler_ != null) {
eventHandler_.processContext(connectionContext,
inputTransport, outputTransport);
}
// 這里處理請求
processor.process(inputProtocol, outputProtocol);
}
}
} catch (...) {
...
}
...
}
setServing(false);
}
繼續(xù)進(jìn)入 org.apache.thrift.TBaseProcessor#process:
@Override
public void process(TProtocol in, TProtocol out) throws TException {
TMessage msg = in.readMessageBegin();
// 獲取 ProcessFunction
ProcessFunction fn = processMap.get(msg.name);
if (fn == null) {
...
} else {
// 繼續(xù)處理
fn.process(msg.seqid, in, out, iface);
}
}
通過調(diào)試的方式,可以看到得到的fn如下:

繼續(xù)進(jìn)入 org.apache.thrift.ProcessFunction#process 方法:
public final void process(int seqid, TProtocol iprot, TProtocol oprot, I iface) throws TException {
T args = getEmptyArgsInstance();
try {
// 讀取參數(shù)
args.read(iprot);
} catch (TProtocolException e) {
...
return;
}
iprot.readMessageEnd();
TSerializable result = null;
byte msgType = TMessageType.REPLY;
try {
// 處理結(jié)果
result = getResult(iface, args);
} catch (...) {
...
}
if(!isOneway()) {
...
}
}
這個方法中主要是讀取方法的執(zhí)行參數(shù),讀取到的內(nèi)容如下:

到這里,服務(wù)端的類、方法以及方法的參數(shù)都已經(jīng)獲取了,接下來就是方法的執(zhí)行了,繼續(xù)進(jìn)入HelloService.Processor.hello#getResult:
public hello_result getResult(I iface, hello_args args) throws org.apache.thrift.TException {
hello_result result = new hello_result();
result.success = iface.hello(args.text);
return result;
}
這個iface就是HelloServiceImpl,最終執(zhí)行的就是HelloServiceImpl#hello方法了。
總結(jié)
本文主要分析了thrift請求處理流程,過程如下:
客戶端調(diào)用本地方法時,本地方法會把調(diào)用的類名、方法名以及方法參數(shù)通過 socket連接發(fā)往服務(wù)端;服務(wù)端收到客戶端的數(shù)據(jù)后,根據(jù)類名與方法名找到對應(yīng)的處理方法,調(diào)用方法時使用的參數(shù)值就是客戶端傳遞的參數(shù)值; 服務(wù)端調(diào)用完具體的方法后,再將方法的執(zhí)行結(jié)果通過 socket返回給客戶端;客戶端通過 socket接收到結(jié)果后,再把結(jié)果返回給本地方法。
限于作者個人水平,文中難免有錯誤之處,歡迎指正!原創(chuàng)不易,商業(yè)轉(zhuǎn)載請聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請注明出處。
本文首發(fā)于微信公眾號 「Java技術(shù)探秘」,如果您喜歡本文,歡迎關(guān)注該公眾號,讓我們一起在技術(shù)的世界里探秘吧!
