重新理解 TCP 中的流

Demon.Lee 2022年11月03日 1,434次浏览

新华字典中对“流”字的解释有不少,其中一项是:像水那样流动不定。


像水流那样,怎么理解呢?

流没有边界

在一条水管中,只要上游不关闭,那么水管中就会一直有水流,没有边界,这就是流的特征。

而 TCP 协议就是一种流式协议,所以利用它传输数据时,也是没有边界的。

TCP 数据流

我们举一个例子:利用 TCP 协议传输字符串 Hello, World

在之前 TCP 程序示例中,发送端通过 writesend 函数发送完之后,接收端立马就收到了整个 Hello, World 字符串。

这就会造成一个误解:发送端在一个 TCP 分组[1]中把整个字符串发送过去,对方也是一次性接收的。

xxxxxxHello, Worldxxxxxx

这种理解是不对的。在良好的网络和传输数据小的条件下,这种情况可能很常见。但如果数据量大,并且网络不好,那么我们看到的现象可能完全不一样。比如,Hell 在第一个 TCP 分组中发送出去,而 o, World 在第二个分组中发送出去:

xxxxxxHell
o, Worldxxxxxx

可以看到,这样的组合会随着传输数据的增加而不断膨胀,你完全不知道最后是按什么顺序发送出去的。

从上图可知,我们调用 writesend 函数后,数据先存储到内核缓冲区,然后交由 TCP 协议栈处理。那数据什么时候会真正发送出去呢?这取决于缓冲区大小、拥塞窗口大小、滑动窗口大小等条件,关于这些内容,笔者将会在后续文章中进行总结。

上面是从发送端的角度来看的,那么从接收端呢?

xxxxxxHello, Worldxxxxxx

同样,从接收端来看,其缓冲区也是以字节流的形式存在的,如上所示。

数据会先到达接收端的内核缓冲区,随着应用程序不断从接收缓冲区拉取数据,接收缓冲区便会空出更多的空间用于容纳新的数据。所以,不论发生端的数据按照什么样的 TCP 分组,接收端的字节流形式不会变,因为 TCP 协议会保证字节流的顺序不乱。

参考《TCP/IP 网络编程》,总结一下 TCP 流式协议:

  • 可靠的:不会丢失数据。如果丢了,协议栈会重发;
  • 有序的:按序发,按序收。如果后发的包先到达,那么协议栈会等前面的数据到了之后,再发送 ACK 回复。也就是说,流中某个区间段的数据都按序到达后,才会形成可以被应用程序读取的数据流;
  • 基于字节流的:没有边界的流传输,逻辑上的一次完整交互怎么算,需要应用层来定义;
  • 基于连接的:客户端与服务端连接是一一对应的。

既然字节流没有边界,那就需要应用层来定义,要不然无法区分每一次完整交互的信息。按笔者的理解,这个数据格式的定义其实就是序列化。

序列化

维基百科[2]中对序列化的定义是:

序列化(serialization)在计算机科学的资料处理中,是指将数据结构对象状态转换成可取用格式(例如存成文件,存于缓冲,或经由网络中发送),以留待后续在相同或另一台计算机环境中,能恢复原先状态的过程。依照序列化格式重新获取字节的结果时,可以利用它来产生与原始对象相同语义的副本。对于许多对象,像是使用大量引用的复杂对象,这种序列化重建的过程并不容易。

再看何小锋老师在《RPC 实战与核心原理》这门课中下的定义,笔者觉得更通俗易懂:

实际上任何一种序列化框架,核心思想就是设计一种序列化协议,将对象的类型、属性类型、属性值一一按照固定的格式写到二进制字节流中来完成序列化,再按照固定的格式一一读出对象的类型、属性类型、属性值,通过这些信息重新创建出一个新的对象,来完成反序列化。

简单来说:序列化就是将对象转换成二进制数据的过程,而反序列就是反过来将二进制转换为对象的过程。

前面提到,TCP 是流式协议,无法区分数据的边界,那么我们便可以使用某种序列化协议来定义数据格式。

一般来说有两种方式:1)显示编码报文长度;2)使用特殊字符来进行边界划分。

显示编码报文长度

简单来说,就是发送端将数据长度通过报文预先告知接收端,接收端收到数据长度后,按需读取报文数据。

比如下面的 message 结构体:

struct message {
    u_int32_t len;
    u_int32_t type;
    char buf[256];
};


假设发送方使用如下代码发送数据:

void send_data(int socket_fd) {
    struct message msg;
    bzero(&msg, sizeof(msg));
    strcpy(msg.buf, "abcdef好");
    msg.len = htonl(strlen(msg.buf));
    msg.type = htonl(1001);

    unsigned int total_length = strlen(msg.buf) + sizeof(msg.len) + sizeof(msg.type);
    ssize_t n_written = send(socket_fd, &msg, total_length, 0);
    fprintf(stdout, "send into buffer: %ld\n", n_written);
    if (n_written != total_length) {
        error_handling(stderr, "client send message failed");
    }
}

那么接收方便可以使用如下方式来解析数据:

// read special length data from socket
size_t readn(int fd, void *buffer, size_t length) {
    char *ptr = buffer;
    size_t count = length;
    ssize_t read_num;

    while (count > 0) {
        read_num = read(fd, ptr, count);
        if (read_num == 0) {
            break;
        } else if (read_num < 0) {
            if (EINTR == errno) {
                continue;
            }
            error_logging(stderr, "read from socket error");
            return -1;
        }
        count -= read_num;
        ptr += read_num;
    }
    return (length - count);
}

// read data from socket via message struct
void read_data(int sockfd) {
    struct message msg;
    bzero(&msg, sizeof(msg));

    // msg.len
    size_t read_len_expect = sizeof(msg.len);
    size_t read_len_actual = readn(sockfd, &msg.len, read_len_expect);
    if (read_len_actual != read_len_expect) {
        error_logging(stderr, "read msg.len error");
        return;
    }
    // convert network byte order to host byte order
    msg.len = ntohl(msg.len);
    printf("msg.len: %d\n", msg.len);
    if (msg.len >= sizeof(msg.buf)) {
        error_logging(stderr, "msg.len is too large...");
        return;
    }

    // msg.type
    read_len_expect = sizeof(msg.type);
    read_len_actual = readn(sockfd, &msg.type, read_len_expect);
    if (read_len_actual != read_len_expect) {
        error_logging(stderr, "read msg.type error");
        return;
    }
    msg.type = ntohl(msg.type);
    printf("msg.type: %d\n", msg.type);

    // msg.buf
    read_len_actual = readn(sockfd, &msg.buf, msg.len);
    if (read_len_actual != msg.len) {
        error_logging(stderr, "read msg.buf error");
        return;
    }
}

特殊字符作为边界

再来看第二种方式,即使用特殊字符。我们天天用的 http 协议就是这么玩的。

接下来笔者就以 http 协议为样例,了解如何使用特殊字符来划分数据流的边界。

我们先来看看,http 报文格式是怎样的:


其中的实体数据可以是各种格式:文本,或图片、视频等二进制数据。

通过 Wireshark 抓包,来看看实际的包格式:



一个是请求报文:

GET / HTTP/1.1\r\n
Host: baidu.com\r\n
User-Agent: curl /7.83.1\r\n
Accept: */*\r\n
\r\n

一个是响应报文:

HTTP/1.1 200 OK\r\n
Date: Thu, 16 Jun 2022 12:07:28 GMT\r\n
Server: Apache\r\n
Last-Modified: Tue, 12 Jan 2010 13:48:00 GMT\r\n
ETag: "51-47cf7eee8400"\r\n
Accept-Ranges: bytes\r\n
Content-Length: 81\r\n
Cache-Control: max-age=86400\r\n
Expires: Fri, 17 Jun 2022 12:07:28 GMT\r\n
Connection: Keep-Alive\r\n
Content-Type: text/html\r\n
\r\n
<html>\n
<meta http-equiv="refresh" content="0;url=http://www.baidu.com/">\n
</html>\n

我们以请求对象为例,对数据格式抽象一下:


可以看到,\r\n 就是用来区分头部和正文的特殊字符。试着问问自己,如果是你,你的解析程序会怎么写:即利用这些字符进行边界的处理。

上面说的是头部和正文的边界划分,那正文结束的标志又是什么?

http 协议早已为我们想到了:

1)一般情况下,头部会有一个 Content-Length 字段,它会显示说明 body 的长度,比如上面示例中的 81,即请求正文有 81 个字节。

2)如果传输时不知道总长度,比如分块传输(chunked transfer)[3],此时头部会有 Transfer-Encoding: chunked 字段来标识[4],并且每一块数据都有长度和回车换行符,在最后一块后面有结束块标识,具体可以查看如下图示:


下面是来自罗剑锋老师《透视 HTTP 协议》课程中的一个实战案例,抓包可以看到对应的分块传输内容:

[2 Reassembled TCP Segments (227 bytes): #7(182), #9(45)]
Hypertext Transfer Protocol
    HTTP/1.1 200 OK\r\n
    Server: openresty/1.19.3.2\r\n
    Date: Sun, 30 Oct 2022 12:27:17 GMT\r\n
    Content-Type: text/plain\r\n
    Transfer-Encoding: chunked\r\n
    Connection: keep-alive\r\n
    \r\n
    [HTTP response 1/1]
    [Time since request: 0.000879000 seconds]
    [Request in frame: 5]
    [Request URI: http://www.chrono.com/16-1]
    HTTP chunked response
        Data chunk (15 octets)
            Chunk size: 15 octets
            Chunk data: 6368756e6b6564206461746120310a
            Chunk boundary: 0d0a
        Data chunk (15 octets)
            Chunk size: 15 octets
            Chunk data: 6368756e6b6564206461746120320a
            Chunk boundary: 0d0a
        Data chunk (15 octets)
            Chunk size: 15 octets
            Chunk data: 6368756e6b6564206461746120330a
            Chunk boundary: 0d0a
        End of chunked encoding
            Chunk size: 0 octets
        \r\n
    File Data: 45 bytes
Line-based text data: text/plain (3 lines)
    chunked data 1\n
    chunked data 2\n
    chunked data 3\n

那程序会如何处理这些特殊字符呢?笔者截取了 Go 1.19.2 中的部分代码[5],借此我们可以有一个初步的理解:

// .../src/net/http/response.go

// ReadResponse reads and returns an HTTP response from r.
// The req parameter optionally specifies the Request that corresponds
// to this Response. If nil, a GET request is assumed.
// Clients must call resp.Body.Close when finished reading resp.Body.
// After that call, clients can inspect resp.Trailer to find key/value
// pairs included in the response trailer.
func ReadResponse(r *bufio.Reader, req *Request) (*Response, error) {
	tp := textproto.NewReader(r)
	resp := &Response{
		Request: req,
	}

	// Parse the first line of the response.
	line, err := tp.ReadLine()
	if err != nil {
		if err == io.EOF {
			err = io.ErrUnexpectedEOF
		}
		return nil, err
	}
	proto, status, ok := strings.Cut(line, " ")
	if !ok {
		return nil, badStringError("malformed HTTP response", line)
	}
	resp.Proto = proto
	resp.Status = strings.TrimLeft(status, " ")

	statusCode, _, _ := strings.Cut(resp.Status, " ")
	if len(statusCode) != 3 {
		return nil, badStringError("malformed HTTP status code", statusCode)
	}
	resp.StatusCode, err = strconv.Atoi(statusCode)
	if err != nil || resp.StatusCode < 0 {
		return nil, badStringError("malformed HTTP status code", statusCode)
	}
	if resp.ProtoMajor, resp.ProtoMinor, ok = ParseHTTPVersion(resp.Proto); !ok {
		return nil, badStringError("malformed HTTP version", resp.Proto)
	}

	// Parse the response headers.
	mimeHeader, err := tp.ReadMIMEHeader()
	if err != nil {
		if err == io.EOF {
			err = io.ErrUnexpectedEOF
		}
		return nil, err
	}
	resp.Header = Header(mimeHeader)

	fixPragmaCacheControl(resp.Header)

	err = readTransfer(resp, r)
	if err != nil {
		return nil, err
	}

	return resp, nil
}
// .../src/net/textproto/reader.go

// ReadMIMEHeader reads a MIME-style header from r.
// The header is a sequence of possibly continued Key: Value lines
// ending in a blank line.
// The returned map m maps CanonicalMIMEHeaderKey(key) to a
// sequence of values in the same order encountered in the input.
//
// For example, consider this input:
//
//	My-Key: Value 1
//	Long-Key: Even
//	       Longer Value
//	My-Key: Value 2
//
// Given that input, ReadMIMEHeader returns the map:
//
//	map[string][]string{
//		"My-Key": {"Value 1", "Value 2"},
//		"Long-Key": {"Even Longer Value"},
//	}
func (r *Reader) ReadMIMEHeader() (MIMEHeader, error) {
	// Avoid lots of small slice allocations later by allocating one
	// large one ahead of time which we'll cut up into smaller
	// slices. If this isn't big enough later, we allocate small ones.
	var strs []string
	hint := r.upcomingHeaderNewlines()
	if hint > 0 {
		strs = make([]string, hint)
	}

	m := make(MIMEHeader, hint)

	// The first line cannot start with a leading space.
	if buf, err := r.R.Peek(1); err == nil && (buf[0] == ' ' || buf[0] == '\t') {
		line, err := r.readLineSlice()
		if err != nil {
			return m, err
		}
		return m, ProtocolError("malformed MIME header initial line: " + string(line))
	}

	for {
		kv, err := r.readContinuedLineSlice(mustHaveFieldNameColon)
		if len(kv) == 0 {
			return m, err
		}

		// Key ends at first colon.
		k, v, ok := bytes.Cut(kv, colon)
		if !ok {
			return m, ProtocolError("malformed MIME header line: " + string(kv))
		}
		key := canonicalMIMEHeaderKey(k)

		// As per RFC 7230 field-name is a token, tokens consist of one or more chars.
		// We could return a ProtocolError here, but better to be liberal in what we
		// accept, so if we get an empty key, skip it.
		if key == "" {
			continue
		}

		// Skip initial spaces in value.
		value := strings.TrimLeft(string(v), " \t")

		vv := m[key]
		if vv == nil && len(strs) > 0 {
			// More than likely this will be a single-element key.
			// Most headers aren't multi-valued.
			// Set the capacity on strs[0] to 1, so any future append
			// won't extend the slice into the other strings.
			vv, strs = strs[:1:1], strs[1:]
			vv[0] = value
			m[key] = vv
		} else {
			m[key] = append(vv, value)
		}

		if err != nil {
			return m, err
		}
	}
}

// readContinuedLineSlice reads continued lines from the reader buffer,
// returning a byte slice with all lines. The validateFirstLine function
// is run on the first read line, and if it returns an error then this
// error is returned from readContinuedLineSlice.
func (r *Reader) readContinuedLineSlice(validateFirstLine func([]byte) error) ([]byte, error) {
	if validateFirstLine == nil {
		return nil, fmt.Errorf("missing validateFirstLine func")
	}

	// Read the first line.
	line, err := r.readLineSlice()
	if err != nil {
		return nil, err
	}
	if len(line) == 0 { // blank line - no continuation
		return line, nil
	}

	if err := validateFirstLine(line); err != nil {
		return nil, err
	}

	// Optimistically assume that we have started to buffer the next line
	// and it starts with an ASCII letter (the next header key), or a blank
	// line, so we can avoid copying that buffered data around in memory
	// and skipping over non-existent whitespace.
	if r.R.Buffered() > 1 {
		peek, _ := r.R.Peek(2)
		if len(peek) > 0 && (isASCIILetter(peek[0]) || peek[0] == '\n') ||
			len(peek) == 2 && peek[0] == '\r' && peek[1] == '\n' {
			return trim(line), nil
		}
	}

	// ReadByte or the next readLineSlice will flush the read buffer;
	// copy the slice into buf.
	r.buf = append(r.buf[:0], trim(line)...)

	// Read continuation lines.
	for r.skipSpace() > 0 {
		line, err := r.readLineSlice()
		if err != nil {
			break
		}
		r.buf = append(r.buf, ' ')
		r.buf = append(r.buf, trim(line)...)
	}
	return r.buf, nil
}

结合上面两种报文格式,回过头再去理解 TCP 的流,相信会有不一样的感受。

总结

基于前面的讨论,我们现在知道了 TCP 是一种流式协议,而流是没有边界的。流没有边界,但应用层的数据交换是有边界的,故应用层需要进行数据交换格式的定义,封装和解析。

另外,通过 Socket 收发数据时,数据会先到输入/输出缓冲区,这些 I/O 缓冲区是在创建 Socket 时自动生成的,故每个 Socket 连接都有,不会存在冲突或并发的问题。笔者在之前聊 TCP 关闭连接的文章中曾经提到过,关闭 Socket 不会影响输出缓冲区,但会影响输入缓冲区,所以要优先使用 shutdown 函数关闭其中一方的流写入。

那 I/O 缓冲区满了以后,会导致数据丢失吗?比如接收方输入缓冲区只有 100 字节大小,发送方输出缓冲区也只有 100 字节大小,而此时发生方发了 500 字节的数据,数据会丢失吗?答案是不会,TCP 协议栈会处理这种情况。笔者将在后续的文章中介绍 TCP 的流量控制,敬请期待。


  1. TCP 是以段为单位发送数据的,即 MSS。MSS 是三次握手的时候,在两端主机之间被计算得出。 ↩︎

  2. https://zh.wikipedia.org/wiki/序列化 ↩︎

  3. 维基百科上的定义:“分块传输编码Chunked transfer encoding)是超文本传输协议(HTTP)中的一种数据传输机制,允许 HTTP 由网页服务器发送给客户端应用( 通常是网页浏览器)的数据可以分成多个部分。分块传输编码只在HTTP协议1.1版本(HTTP/1.1)中提供。”简单来说,就是正文数据比较大(注意是正文),将其拆成多个子部分分别传输,然后再由对端进行拼接。这个过程中,总长度是未知的,所以头部没有 Content-Length 字段。 ↩︎

  4. http 头部中 Content-LengthTransfer-Encoding: chunked 互斥,不会同时出现。 ↩︎

  5. https://github.com/golang/go/blob/go1.19.2/src/net/http/response.go ↩︎