0%

Java中使用KCP协议

传统游戏项目一般使用TCP协议进行通信,得益于它的稳定和可靠,不过在网络不稳定的情况下,会出现丢包严重。
不过近期有不少基于UDP的应用层协议,声称对UDP的不可靠进行了改造,这意味着我们既可以享受网络层提供稳定可靠的服务,又可以享受它的速度。
KCP就是这样的一个协议

不过网上说的再天花乱坠,我们也得亲自调研,分析源码和它的机制,并测试它的性能,是否满足项目上线要求。本文从C版本的源码入手理解KCP的机制,再研究各种Java版本的实现

一、KCP协议

原版源码(C代码):https://github.com/skywind3000/kcp

基于底层协议(一般是UDP)之上,完全在应用层实现类TCP的可靠机制(快速重传,拥塞控制等)

二、KCP特性

KCP实现以下特性,也可参考github中README中对KCP的定义

特性 说明 源码位置
RTO优化 超时时间计算优于TCP ikcp_update_ack
选择性重传 KCP只重传真正丢失的数据包,TCP会全部重传丢失包之后的全部数据 ikcp_parse_fastack,ikcp_flush
快速重传 根据配置,可以在丢失包被跳过一定次数后直接重传,不等RTO超时 ikcp_parse_fastack,ikcp_flush
UNA + ACK ARQ模型响应有两种,UNA(此编号前所有包已收到,如TCP),ACK(该编号包已收到),光用UNA将导致全部重传,光用ACK则丢失成本太高,以往协议都是二选其一,而 KCP协议中,除去单独的 ACK包外,所有包都有UNA信息。 ikcp_flush(每次update,都发送ACK)
非延迟ACK KCP可配置是否延迟发送ACK ikcp_update_ack
流量控制 同TCP的公平退让原则,发送窗口大小由:发送缓存大小、接收端剩余接收缓存大小、丢包退让及慢启动这四要素决定 ikcp_input,
ikcp_flush

三、KCP报文

1. 报文解析源码

源码中对报文解析部分代码如下

1
2
3
4
5
6
7
8
9
10
data = ikcp_decode32u(data, &conv);
if (conv != kcp->conv) return -1;

data = ikcp_decode8u(data, &cmd);
data = ikcp_decode8u(data, &frg);
data = ikcp_decode16u(data, &wnd);
data = ikcp_decode32u(data, &ts);
data = ikcp_decode32u(data, &sn);
data = ikcp_decode32u(data, &una);
data = ikcp_decode32u(data, &len);

2. 报文定义

报文中标识的定义

名词 全称 备注 作用
conv conversation id 会话ID 每个连接的唯一标识
cmd command 命令 每个数据包指定逻辑
frg fragment count 数据分段序号 根据mtu(最大传输单元)和mss(最大报文长度)的数据分段
wnd window size 接收窗口大小 流量控制
ts timestamp 时间戳 数据包发送时间记录
sn serial number 数据报的序号 确保包的有序
una un-acknowledged serial number 对端下一个要接收的数据报序号 确保包的有序

3. 消息类型

KCP报文的四种消息类型

1
2
3
4
const IUINT32 IKCP_CMD_PUSH = 81;     // cmd: push data: 推送数据
const IUINT32 IKCP_CMD_ACK = 82; // cmd: ack: 对推送数据的确认
const IUINT32 IKCP_CMD_WASK = 83; // cmd: window probe (ask): 询问窗口大小
const IUINT32 IKCP_CMD_WINS = 84; // cmd: window size (tell): 回复窗口大小
  1. 报文结构

报文结构.png

四、源码解析

在网络四层模型中,KCP和TCP/UDP(传输层),IP(网络层)等协议有着本质上区别,理论上KCP是属于应用层协议。
KCP并不提供协议实际收发处理,它只是在传输层只上对消息和链接的一层中间管理。

在KCP的源码中,它仅仅包含ikcp.c和ikcp.h两个文件,仅提供KCP的数据管理和数据接口,而用户需要在应用层进行KCP的调度

1. 结构体定义

KCP分包结构KCP对象结构体定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
struct IKCPSEG
{
struct IQUEUEHEAD node;
IUINT32 conv; //用来标记这个seg属于哪个kcp
IUINT32 cmd;//这个包的指令是: // 数据 ack 询问/应答窗口大小
IUINT32 frg; //分包时,分包的序号,0为终结
IUINT32 wnd;//发送这个seg的这个端的 窗口大小--> 远端的接收窗口大小
IUINT32 ts; //我不知道为什么要用时间轴,这个都1秒,有什么用 ??
IUINT32 sn;//相当于tcp的ack
IUINT32 una;//una 远端等待接收的一个序号
IUINT32 len; //data的长度
IUINT32 resendts;//重发的时间轴
IUINT32 rto;//等于发送端kcp的 rx_rto->由 计算得来
IUINT32 fastack;//ack跳过的次数,用于快速重传
IUINT32 xmit;// fastack resend次数
char data[1];//当malloc时,只需要 malloc(sizeof(IKCPSEG)+datalen) 则,data长=数据长度+1 刚好用来放0
};

struct IKCPCB
{
//会话ID,最大传输单元,最大分片大小,状态 mss=mtu-sizeof(IKCPSEG)
IUINT32 conv, mtu, mss, state;
//第一个未接收到的包,待发送的包(可以认为是tcp的ack自增),接收消息的序号-> 用来赋seg的una值
IUINT32 snd_una, snd_nxt, rcv_nxt;
//前两个不知道干嘛 拥塞窗口的阈值 用来控制cwnd值变化的
IUINT32 ts_recent, ts_lastack, ssthresh;
//这几个变量是用来更新rto的
// rx_rttval 接收ack的浮动值
// rx_srtt 接收ack的平滑值
// rx_rto 计算出来的rto
// rx_minrto 最小rto
IINT32 rx_rttval, rx_srtt, rx_rto, rx_minrto;
//发送队列的窗口大小
//接收队列的窗口大小
//远端的接收队列的窗口大小
//窗口大小
//probe 用来二进制标记
IUINT32 snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe;
//时间轴 时间间隔 下一次flush的时间 xmit发射多少次? 看不到有什么地方用到
IUINT32 current, interval, ts_flush, xmit;
//接收到的数据seg个数
//需要发送的seg个数
IUINT32 nrcv_buf, nsnd_buf;
//接收队列的数据 seg个数
//发送队列的数据 seg个数
IUINT32 nrcv_que, nsnd_que;
//是否为nodelay模式:如果开启,rto计算范围更小
//updated 在调用flush时,有没有调用过update
IUINT32 nodelay, updated;
//请求访问窗口的时间相关 当远程端口大小为0时
IUINT32 ts_probe, probe_wait;
IUINT32 dead_link, incr;
//发送队列
struct IQUEUEHEAD snd_queue;
//接收队列
struct IQUEUEHEAD rcv_queue;
//待发送队列
struct IQUEUEHEAD snd_buf;
//待接收队列
struct IQUEUEHEAD rcv_buf;
//用来缓存自己接收到了多少个ack
IUINT32 *acklist;
IUINT32 ackcount;
IUINT32 ackblock;

//用户信息
void *user;
//好像就用来操作数据的中转站
char *buffer;
//快速重传的阈值
int fastresend;
//快速重传的上限
int fastlimit;
//是否无视重传等其它设置窗口
//steam模式的话,会将几个小包合并成大包
int nocwnd, stream;
int logmask;
int (*output)(const char *buf, int len, struct IKCPCB *kcp, void *user);
void (*writelog)(const char *log, struct IKCPCB *kcp, void *user);
};

2. 接口分析

分析C源码,KCP作为中间管理层,主要提供以下接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
//---------------------------------------------------------------------
// interface
//---------------------------------------------------------------------

// create a new kcp control object, 'conv' must equal in two endpoint
// from the same connection. 'user' will be passed to the output callback
// output callback can be setup like this: 'kcp->output = my_udp_output'
// 创建kcp对象,conv必须在两个端之间相同,user会被传递到output回调,
// output回调这样设置:kcp->output = my_udp_output
ikcpcb* ikcp_create(IUINT32 conv, void *user);

// release kcp control object
// 释放kcp对象
void ikcp_release(ikcpcb *kcp);

// set output callback, which will be invoked by kcp
// 设置kcp调用的output回调
void ikcp_setoutput(ikcpcb *kcp, int (*output)(const char *buf, int len,
ikcpcb *kcp, void *user));

// user/upper level recv: returns size, returns below zero for EAGAIN
// 用户层/上层 接收消息:返回接收长度,数据读取错误返回值小于0
int ikcp_recv(ikcpcb *kcp, char *buffer, int len);

// user/upper level send, returns below zero for error
// 用户层/上层 发送消息,错误返回值小于0
int ikcp_send(ikcpcb *kcp, const char *buffer, int len);

// update state (call it repeatedly, every 10ms-100ms), or you can ask
// ikcp_check when to call it again (without ikcp_input/_send calling).
// 'current' - current timestamp in millisec.
// 更新状态(每10ms-100ms调用一次),或者你可以通过调用ikcp_check,
// 来得知什么时候再次调用(不调用ikcp_input/_send)
// current - 当前时间戳(毫秒)
void ikcp_update(ikcpcb *kcp, IUINT32 current);

// Determine when should you invoke ikcp_update:
// returns when you should invoke ikcp_update in millisec, if there
// is no ikcp_input/_send calling. you can call ikcp_update in that
// time, instead of call update repeatly.
// Important to reduce unnacessary ikcp_update invoking. use it to
// schedule ikcp_update (eg. implementing an epoll-like mechanism,
// or optimize ikcp_update when handling massive kcp connections)
// 决定你什么时候调用ikcp_update
// 返回你多少毫秒后应该调用ikcp_update,如果没有ikcp_input/_send调用,你可以在那个时间
// 调用ikcp_updates来代替自己驱动update调用
// 用于减少不必要的ikcp_update调用。用这个来驱动ikcp_update(比如:实现类epoll的机制,
// 或者优化处理大量kcp连接时的ikcp_update调用)
IUINT32 ikcp_check(const ikcpcb *kcp, IUINT32 current);

// when you received a low level packet (eg. UDP packet), call it
// 接收下层数据包(比如:UDP数据包)时调用
int ikcp_input(ikcpcb *kcp, const char *data, long size);

// flush pending data
// 刷新数据
void ikcp_flush(ikcpcb *kcp);

// check the size of next message in the recv queue
// 检测接收队列里下条消息的长度
int ikcp_peeksize(const ikcpcb *kcp);

// change MTU size, default is 1400
// 修改MTU长度,默认1400
int ikcp_setmtu(ikcpcb *kcp, int mtu);

// set maximum window size: sndwnd=32, rcvwnd=32 by default
// 设置最大窗口大小,默认值:sndwnd=32, rcvwnd=32
int ikcp_wndsize(ikcpcb *kcp, int sndwnd, int rcvwnd);

// get how many packet is waiting to be sent
// 获取准备发送的数据包
int ikcp_waitsnd(const ikcpcb *kcp);

// fastest: ikcp_nodelay(kcp, 1, 20, 2, 1)
// nodelay: 0:disable(default), 1:enable
// interval: internal update timer interval in millisec, default is 100ms
// resend: 0:disable fast resend(default), 1:enable fast resend
// nc: 0:normal congestion control(default), 1:disable congestion control
// 快速设置:ikcp_nodelay(kcp, 1, 20, 2, 1)
// nodelay:0:使用(默认),1:使用
// interval:update时间(毫秒),默认100ms
// resend:0:不适用快速重发(默认), 其他:自己设置值,若设置为2(则2次ACK跨越将会直接重传)
// nc:0:正常拥塞控制(默认), 1:不适用拥塞控制
int ikcp_nodelay(ikcpcb *kcp, int nodelay, int interval, int resend, int nc);

void ikcp_log(ikcpcb *kcp, int mask, const char *fmt, ...);

// setup allocator
// 设置kcp allocator
void ikcp_allocator(void* (*new_malloc)(size_t), void (*new_free)(void*));

// read conv
// 获取conv
IUINT32 ikcp_getconv(const void *ptr);

3. 调度逻辑

KCP调度逻辑.png

KCP关键接口:

  • 更新(上层驱动KCP状态更新)
    ikcp_update:kcp状态更新接口,需要上层进行调度,判断flush时间,满足条件调用ikcp_flush刷新数据,同时也负责对收到数据的kcp端回复ACK消息
  • 发送
    ikcp_send -> ikcp_update -> ikcp_output
    ikcp_send:上层调用发送接口,把数据根据mss值进行分片,设置分包编号,放到snd_queue队尾
    ikcp_flush:发送数据接口,根据对端窗口大小,拷贝snd_queue的数据到snd_buf,遍历snd_buf,满足条件则调用output回调(调用网络层的发送)
  • 接收
    ikcp_input -> ikcp_update -> ikcp_recv
    ikcp_input:解析上层输入数据,拷贝rcv_buf到rcv_queue
    ikcp_recv:数据接收接口,上层从rcv_queue中复制数据到网络层buffer

五、Java版本

从源码来看,作者对于KCP的设计仅仅是应用层的通信管理,对内是数据和连接管理,对外是上层需要调用的接口,所以理论上要在Java中使用,核心逻辑部分照着翻译成Java版本,在网络层实现一套KCP的调度即可。
实际调研看来,github上确实已有不上已经实现的版本,其中有不少代码实现非常优秀的版本,有着良好的封装,优秀的线程模型管理,各种附加优化等等。本着不重复造轮子的思想(对,就是懒),我们完全可以直接采用大佬们的作品,避开大佬们踩过的坑。
目前github上有几个高star的java版本实现,选取最高的三个进行分析

1. https://github.com/szhnet/kcp-netty.git(star:212)

实现原理:

1.KCP逻辑是源码的Java翻译版(一模一样)
2.UkcpServerChannel继承ServerChannel,UkcpServerBootStrap
3.用Boss线程EventLoopGroup的read事件来驱动KCP逻辑

优点:使用Netty的Boss线程Read事件来驱动KCP,不用while(true)的驱动;使用简单,只需使用指定的ServerChannel和ServerBootStrap来启动Netty
缺点:无明显缺点

2. https://github.com/beykery/jkcp.git(star:172)

实现原理:

1.KCP逻辑是源码的Java翻译版(一模一样)
2.启动指定线程数的KcpThread自定义IO线程池,进行KCP逻辑调度
3.Netty读消息时抛到KcpThread自定义IO线程

1
2
3
4
5
// 通过hash选择IO线程处理
InetSocketAddress sender = dp.sender();
int hash = sender.hashCode();
hash = hash < 0 ? -hash : hash;
this.workers[hash % workers.length].input(dp);

优点:代码简单明了,容易理解,核心是翻译版源码,外壳套的是Netty+自定义IO线程池
缺点:IO线程池会while(true)的调用KCP的update

3. https://github.com/l42111996/java-Kcp.git(star:187)

实现原理:

1.KCP逻辑是源码的Java翻译版(一模一样)
2.Netty读消息时,扔到定时器,1ms后,抛出任务到自定义IO线程

优点:拥有1的全部优点,也在Netty的读消息,把消息抛到定时器去调用KCP的逻辑,避免了2的无意义的while(true),同时实现功能更全,有上线项目验证(据作者描述)
缺点:Netty相关逻辑完全封装起来,不能修改任何Netty参数(不过源码中对Netty的参数已配置的很好了)

目前看来,第三种实现(https://github.com/l42111996/java-Kcp.git)是最理想的方式

如果大家感兴趣,后边会对第三种实现进行详细的源码分析

六、性能测试

近期准备做性能测试进行对比,感兴趣的朋友可以关注下

1
// TODO
大爷赏点儿呗.

欢迎关注我的其它发布渠道