要写一个 TCP 服务端,实现处理在纯 TCP 流中传输的 Protocol buffers 数据。网络框架很早就选好了,用性能杰出的 gnet,问题是 gnet 的示例库里面没有直接解析纯 Protocol buffers 的编解码器,于是乎只能自己动手了…

协议分析

从 TCP 流里面传过来的是经过简单处理的 Protocol buffers 数据,他在数据的头携带了这个数据包的长度信息,像是这样

[ 头 ][ 数据 ][ 头 ][ 数据 ][ 头 ][ 数据 ][ 头 ][ 数据 ][ 头 ][ 数据 ]

调用 golang 的 proto 官方库中的 func DecodeVarint(b []byte) (uint64, int) 方法可以从数据中拿到两个值,分别是 数据的完整长度、标明数据长度的头信息的长度。

由于没有特定的协议在包与包之间进行明显的划分,所以得用他的头数据来进行分包。

解码器

// 储存连接内的相关信息
type DataStruct struct {
	fullLength   int
	lenNumLength int
	fullData     []byte
}

func (d *Codec) Decode(c gnet.Conn) ([]byte, error) {
	ctx, ok := c.Context().(context.Context)
	if !ok {
		err := c.Close()
		if err != nil {
			return nil, nil
		}
	}

    // 从上下文里面拿出这个连接的编解码器储存 struct
	r, ok := ctx.Value("codec").(DataStruct)
	if !ok {
		err := c.Close()
		if err != nil {
			return nil, nil
		}
	}

    // 读取缓冲区内的所有信息
	bytes := c.Read()

    // 判断是否已经开始读取包
	if len(r.fullData) == 0 {

        // 调用函数获取头中带的信息
		var fullLength uint64
		fullLength, r.lenNumLength = proto.DecodeVarint(bytes)
		r.fullLength = int(fullLength)
		fmt.Println(r.fullLength, r.lenNumLength)
		if r.fullLength == 0 {
			return nil, nil
		}
	}

    // 拿到当前时间已经被储存进 struct 的数据的长度
	fullDataLong := len(r.fullData)

    // 把读到的数据一把梭全部拼进 fullData
	r.fullData = append(r.fullData, bytes...)

    // 判断长度是否符合要求
	if len(r.fullData) >= r.fullLength+r.lenNumLength {
		c.ShiftN(r.fullLength + r.lenNumLength - fullDataLong)

        // 截取有效的数据
		res := r.fullData[r.lenNumLength : r.fullLength+r.lenNumLength]

        // 连接的缓存清空
		r.fullData = []byte{}
		ctx = context.WithValue(ctx, "codec", r)
		c.SetContext(ctx)
		return res, nil
	}

    // 移动读取指针
	c.ShiftN(len(bytes))
	ctx = context.WithValue(ctx, "codec", r)
	c.SetContext(ctx)
	return nil, nil
}

上面那种解码方式是目前看运行状况来说暂时没有出现问题的方法,下面那一种则比较节省内存,两种解码方式区别主要是在于调用的 Read 函数不同,前者是把 gnet 的 ring buffer 里面的内容全部读取出来,而后者是先把头读取出来,拿到了完整的数据长度信息之后调用 ReadN 函数直接准确的将包体取出。

func (d *Codec) Decode(c gnet.Conn) ([]byte, error) {
	ctx, ok := c.Context().(context.Context)
	if !ok {
		err := c.Close()
		if err != nil {
			return nil, nil
		}
	}

    // 从上下文里面拿出这个连接的编解码器储存 struct
	r, ok := ctx.Value("codec").(DataStruct)
	if !ok {
		err := c.Close()
		if err != nil {
			return nil, nil
		}
	}
    
	if len(r.fullData) == 0 {
		_, bytes := c.ReadN(10)
		var fullLength uint64
		fullLength, r.lenNumLength = proto.DecodeVarint(bytes)
		r.fullLength = int(fullLength)
		fmt.Println(r.fullLength, r.lenNumLength)
		if r.fullLength == 0 {
			return nil, nil
		}
	}
    
	fullDataLong := len(r.fullData)
	n, bytes := c.ReadN(r.fullLength + r.lenNumLength - fullDataLong)
	r.fullData = append(r.fullData, bytes...)
	c.ShiftN(n)
	if len(r.fullData) >= r.fullLength+r.lenNumLength {
		res := r.fullData[r.lenNumLength :]
		r.fullData = []byte{}
		ctx = context.WithValue(ctx, "codec", r)
		c.SetContext(ctx)
		return res, nil
	}
	ctx = context.WithValue(ctx, "codec", r)
	c.SetContext(ctx)
	return nil, nil
}

在代码中也可以看见,头数据中的包体长度信息我是存在连接的上下文中的,所以在 gnet 触发连接打开的事件时需要将储存信息的 struct 塞进上下文中。

func (es *EventServer) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) {
	ctx := context.WithValue(context.Background(), "codec", DataStruct{})
	c.SetContext(ctx)
	return
}

编码器

编码器这个部分就非常简单了,直接调用 proto 库里面的 EncodeVarint 函数就可以生成这个包体的头,将头信息放在包体的前面就可以将这个数据发送到客户端了。

func (d *Codec) Encode(c gnet.Conn, buf []byte) ([]byte, error) {
	buf = append(proto.EncodeVarint(uint64(len(buf))), buf...)
	return buf, nil
}

2021-11-09 更新

大意了,之前用上下文存储中间信息的方法有 严重的性能问题,在调用 golang 原生的 context.WithValue 方法时候,会在传入的上下文下面创建一个子上下文,这就导致了在一次又一次解码中,上下文树越来越庞大,而且每一层上下文内部都存储了本次解码的 DataStruct,造成内存泄漏的问题。

在苦苦查了好几天,并且修了几个有可能的内存泄漏隐患之后我才意识到这一点(秃头.jpg)

然后再看了下 gnet.Conn 的一个实现的 Context() 方法,发现他只是将我们传进去的东西存在了一个 map 里面,并不需要使用 context 相关的,所以简单的解决方法就是直接将 DataStruct 传进去,目前来看是解决了内存泄漏的问题,代码如下

func (d *Codec) Decode(c gnet.Conn) ([]byte, error) {
	// 从上下文里面拿出这个连接的编解码器储存 struct
	r, ok := c.Context().(DataStruct)
	if !ok {
		err := c.Close()
		if err != nil {
			return nil, nil
		}
	}
    
	if len(r.fullData) == 0 {
		_, bytes := c.ReadN(10)
		var fullLength uint64
		fullLength, r.lenNumLength = proto.DecodeVarint(bytes)
		r.fullLength = int(fullLength)
		fmt.Println(r.fullLength, r.lenNumLength)
		if r.fullLength == 0 {
			return nil, nil
		}
	}
    
	fullDataLong := len(r.fullData)
	n, bytes := c.ReadN(r.fullLength + r.lenNumLength - fullDataLong)
	r.fullData = append(r.fullData, bytes...)
	c.ShiftN(n)
	if len(r.fullData) >= r.fullLength+r.lenNumLength {
		res := r.fullData[r.lenNumLength :]
		r.fullData = []byte{}
		c.SetContext(r)
		return res, nil
	}
	ctx = context.WithValue(ctx, "codec", r)
	c.SetContext(r)
	return nil, nil
}
func (es *EventServer) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) {
	var r = DataStruct{}
	c.SetContext(r)
	return
}

2021-12-24 更新

最近 gnet 发布了 v1.6.x 新版本,新版本的 编解码器行为有所改变,所以需要改造一下代码

主要的改动是在 gnet 库的 eventloop_unix.go 文件的 d1ca7f3 commit 中将进入 React 的时间点从返回的 packet 不为 nil 改为了返回的 err 不为 nil,所以在升级后需要做对应的修改

var (
	ContinueRead = errors.New("continue read")
)


func (d *Codec) Decode(c gnet.Conn) ([]byte, error) {
	// 从上下文里面拿出这个连接的编解码器储存 struct
	r, ok := c.Context().(DataStruct)
	if !ok {
		err := c.Close()
		if err != nil {
			return nil, nil
		}
	}
    
	if len(r.fullData) == 0 {
		_, bytes := c.ReadN(10)
		var fullLength uint64
		fullLength, r.lenNumLength = proto.DecodeVarint(bytes)
		r.fullLength = int(fullLength)
		fmt.Println(r.fullLength, r.lenNumLength)
		if r.fullLength == 0 {
			return nil, ContinueRead
		}
	}
    
	fullDataLong := len(r.fullData)
	n, bytes := c.ReadN(r.fullLength + r.lenNumLength - fullDataLong)
	r.fullData = append(r.fullData, bytes...)
	c.ShiftN(n)
	if len(r.fullData) >= r.fullLength+r.lenNumLength {
		res := r.fullData[r.lenNumLength :]
		r.fullData = []byte{}
		c.SetContext(r)
		return res, nil
	}
	ctx = context.WithValue(ctx, "codec", r)
	c.SetContext(r)
	return nil, ContinueRead
}

参考资料