bizsocket異步 socket 庫
bizsocket 是一個(gè)異步 socket 庫,對(duì)一些業(yè)務(wù)場(chǎng)景做了支持
-
斷線重連
-
一對(duì)一請(qǐng)求
-
通知、粘性通知
-
串行請(qǐng)求合并
-
包分片處理(AbstractFragmentRequestQueue)
-
緩存
-
攔截器
-
支持rxjava,提供類似于retrofit的支持
使用方式
Maven
<dependency> <groupId>com.github.typ0520</groupId> <artifactId>bizsocket-rx</artifactId> <version>1.0.0</version> </dependency>
or Gradle
buildscript {
repositories {
jcenter()
}
}
dependencies {
compile 'com.github.typ0520:bizsocket-rx:1.0.0'
}
適用協(xié)議
如果想使用此庫,客戶端和服務(wù)器的通訊協(xié)議中必須要有命令號(hào)、包序列號(hào)這兩個(gè)字段
-
命令號(hào)代表請(qǐng)求類型,可以想象成http中url的作用
-
包序列號(hào)是數(shù)據(jù)包的唯一索引,客戶端發(fā)起請(qǐng)求時(shí)為數(shù)據(jù)包生成一個(gè)唯一索引,服務(wù)器返回請(qǐng)求對(duì)應(yīng)的結(jié)果時(shí)把這個(gè)包序列號(hào)帶回去
協(xié)議可以類似于下面這種:
| cmd | packetId | contentLength | content |
|---|---|---|---|
| int | int | int | byte[] |
也可以類似于下面這樣的每個(gè)數(shù)據(jù)包都是一段json字符串,包與包之間用換行符分割
{"cmd": xxx , "packetId": xxx , ...... }
數(shù)據(jù)包的創(chuàng)建是通過這兩個(gè)抽象類PacketFactory、Packet,整個(gè)庫的數(shù)據(jù)流轉(zhuǎn)都是通過命令號(hào)、包序列號(hào)這兩個(gè)字段來完成的,字段名、出現(xiàn)的位置以及形式不限,只要有這兩個(gè)字段就適用此庫
配置BizSocket
sample中client與server之間的通訊協(xié)議是
| length(int) | cmd(int) | seq(int) | content(byte[]) |
|---|---|---|---|
| 數(shù)據(jù)包的總長度 | 命令號(hào) | 數(shù)據(jù)包的唯一索引 | 報(bào)文體,可以想象成http協(xié)議中的body |
下面的代碼片段來自sample,建議把代碼拉下來看
1、 首先需要?jiǎng)?chuàng)建一個(gè)數(shù)據(jù)包類繼承自Packet
public class SamplePacket extends Packet {
static volatile int currentSeq = 0;
public int length;
public int cmd;
public int seq;
public String content;
@Override
public int getCommand() {
//覆蓋父類的抽象方法
return cmd;
}
@Override
public String getPacketID() {
//覆蓋父類的抽象方法
return String.valueOf(seq);
}
//獲取請(qǐng)求數(shù)據(jù)包byte[],寫給服務(wù)器
public byte[] toBytes() {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
BufferedSink bufferedSink = Okio.buffer(Okio.sink(bos));
try {
//包長 = 內(nèi)容長度 + 包頭固定的12個(gè)字節(jié)
ByteString byteString = ByteString.encodeUtf8(content);
bufferedSink.writeInt(byteString.size() + 12);
bufferedSink.writeInt(cmd);
bufferedSink.writeInt(seq);
bufferedSink.write(byteString);
bufferedSink.flush();
} catch (IOException e) {
e.printStackTrace();
}
return bos.toByteArray();
}
}
2、創(chuàng)建PacketFactory,主要用來從流中解析出server發(fā)給client的數(shù)據(jù)包
public class SamplePacketFactory extends PacketFactory {
@Override
public Packet getRequestPacket(Packet reusable,Request request) {
return new SamplePacket(request.command(),request.body());
}
@Override
public Packet getHeartBeatPacket(Packet reusable) {
return new SamplePacket(SampleCmd.HEARTBEAT.getValue(), ByteString.encodeUtf8("{}"));
}
@Override
public Packet getRemotePacket(Packet reusable,BufferedSource source) throws IOException {
SamplePacket packet = new SamplePacket();
packet.length = reader.readInt();
packet.cmd = reader.readInt();
packet.seq = reader.readInt();
//減去協(xié)議頭的12個(gè)字節(jié)長度
packet.content = reader.readString(packet.length - 12, Charset.forName("utf-8"));
return packet;
}
}
3、配置client
public class SampleClient extends AbstractBizSocket {
public SampleClient(Configuration configuration) {
super(configuration);
}
@Override
protected PacketFactory createPacketFactory() {
return new SamplePacketFactory();
}
}
-
3、啟動(dòng)client,以j2se為例寫一個(gè)main方法
public static void main(String[] args) {
SampleClient client = new SampleClient(new Configuration.Builder()
.host("127.0.0.1")
.port(9103)
.readTimeout(TimeUnit.SECONDS,30)
.heartbeat(60)
.build());
client.getInterceptorChain().addInterceptor(new Interceptor() {
@Override
public boolean postRequestHandle(RequestContext context) throws Exception {
System.out.println("發(fā)現(xiàn)一個(gè)請(qǐng)求postRequestHandle: " + context);
return false;
}
@Override
public boolean postResponseHandle(int command, Packet responsePacket) throws Exception {
System.out.println("收到一個(gè)包postResponseHandle: " + responsePacket);
return false;
}
});
try {
//連接
client.connect();
//啟動(dòng)斷線重連
client.getSocketConnection().bindReconnectionManager();
//開啟心跳
client.getSocketConnection().startHeartBeat();
} catch (Exception e) {
e.printStackTrace();
}
//注冊(cè)通知,接收服務(wù)端的推送
client.subscribe(client, SampleCmd.NOTIFY_PRICE.getValue(), new ResponseHandler() {
@Override
public void sendSuccessMessage(int command, ByteString requestBody, Packet responsePacket) {
System.out.println("cmd: " + command + " ,requestBody: " + requestBody + " responsePacket: " + responsePacket);
}
@Override
public void sendFailureMessage(int command, Throwable error) {
System.out.println(command + " ,err: " + error);
}
});
//發(fā)起一對(duì)一請(qǐng)求
String json = "{\"productId\" : \"1\",\"isJuan\" : \"0\",\"type\" : \"2\",\"sl\" : \"1\"}";
client.request(new Request.Builder().command(SampleCmd.CREATE_ORDER.getValue()).utf8body(json).build(), new ResponseHandler() {
@Override
public void sendSuccessMessage(int command, ByteString requestBody, Packet responsePacket) {
System.out.println("cmd: " + command + " ,requestBody: " + requestBody + " attach: " + " responsePacket: " + responsePacket);
}
@Override
public void sendFailureMessage(int command, Throwable error) {
System.out.println(command + " ,err: " + error);
}
});
//如果想用rxjava的形式調(diào)用也是支持的,提供了類似于retrofit通過動(dòng)態(tài)代理創(chuàng)建的service類來調(diào)用
BizSocketRxSupport rxSupport = new BizSocketRxSupport.Builder()
.requestConverter(new JSONRequestConverter())
.requestConverter(new JSONRequestConverter())
.bizSocket(client)
.build();
SampleService service = rxSupport.create(SampleService.class);
JSONObject params = new JSONObject();
try {
params.put("pageSize","10000");
} catch (JSONException e) {
e.printStackTrace();
}
service.queryOrderList(params).subscribe(new Subscriber<JSONObject>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(JSONObject jsonObject) {
System.out.println("rx response: " + jsonObject);
}
});
//阻塞主線程,防止程序退出,可以想象成android中的Looper類
while (true) {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}