新华字典中对“流”字的解释有不少,其中一项是:像水那样流动不定。
像水流那样,怎么理解呢?
流没有边界
在一条水管中,只要上游不关闭,那么水管中就会一直有水流,没有边界,这就是流的特征。
而 TCP 协议就是一种流式协议,所以利用它传输数据时,也是没有边界的。
我们举一个例子:利用 TCP 协议传输字符串 Hello, World
。
在之前 TCP 程序示例中,发送端通过 write
或 send
函数发送完之后,接收端立马就收到了整个 Hello, World
字符串。
这就会造成一个误解:发送端在一个 TCP 分组[1]中把整个字符串发送过去,对方也是一次性接收的。
xxxxxxHello, Worldxxxxxx
这种理解是不对的。在良好的网络和传输数据小的条件下,这种情况可能很常见。但如果数据量大,并且网络不好,那么我们看到的现象可能完全不一样。比如,Hell
在第一个 TCP 分组中发送出去,而 o, World
在第二个分组中发送出去:
xxxxxxHell
o, Worldxxxxxx
可以看到,这样的组合会随着传输数据的增加而不断膨胀,你完全不知道最后是按什么顺序发送出去的。
从上图可知,我们调用 write
或 send
函数后,数据先存储到内核缓冲区,然后交由 TCP 协议栈处理。那数据什么时候会真正发送出去呢?这取决于缓冲区大小、拥塞窗口大小、滑动窗口大小等条件,关于这些内容,笔者将会在后续文章中进行总结。
上面是从发送端的角度来看的,那么从接收端呢?
xxxxxxHello, Worldxxxxxx
同样,从接收端来看,其缓冲区也是以字节流的形式存在的,如上所示。
数据会先到达接收端的内核缓冲区,随着应用程序不断从接收缓冲区拉取数据,接收缓冲区便会空出更多的空间用于容纳新的数据。所以,不论发生端的数据按照什么样的 TCP 分组,接收端的字节流形式不会变,因为 TCP 协议会保证字节流的顺序不乱。
参考《TCP/IP 网络编程》,总结一下 TCP 流式协议:
- 可靠的:不会丢失数据。如果丢了,协议栈会重发;
- 有序的:按序发,按序收。如果后发的包先到达,那么协议栈会等前面的数据到了之后,再发送 ACK 回复。也就是说,流中某个区间段的数据都按序到达后,才会形成可以被应用程序读取的数据流;
- 基于字节流的:没有边界的流传输,逻辑上的一次完整交互怎么算,需要应用层来定义;
- 基于连接的:客户端与服务端连接是一一对应的。
既然字节流没有边界,那就需要应用层来定义,要不然无法区分每一次完整交互的信息。按笔者的理解,这个数据格式的定义其实就是序列化。
序列化
维基百科[2]中对序列化的定义是:
序列化(serialization)在计算机科学的资料处理中,是指将数据结构或对象状态转换成可取用格式(例如存成文件,存于缓冲,或经由网络中发送),以留待后续在相同或另一台计算机环境中,能恢复原先状态的过程。依照序列化格式重新获取字节的结果时,可以利用它来产生与原始对象相同语义的副本。对于许多对象,像是使用大量引用的复杂对象,这种序列化重建的过程并不容易。
再看何小锋老师在《RPC 实战与核心原理》这门课中下的定义,笔者觉得更通俗易懂:
实际上任何一种序列化框架,核心思想就是设计一种序列化协议,将对象的类型、属性类型、属性值一一按照固定的格式写到二进制字节流中来完成序列化,再按照固定的格式一一读出对象的类型、属性类型、属性值,通过这些信息重新创建出一个新的对象,来完成反序列化。
简单来说:序列化就是将对象转换成二进制数据的过程,而反序列就是反过来将二进制转换为对象的过程。
前面提到,TCP 是流式协议,无法区分数据的边界,那么我们便可以使用某种序列化协议来定义数据格式。
一般来说有两种方式:1)显示编码报文长度;2)使用特殊字符来进行边界划分。
显示编码报文长度
简单来说,就是发送端将数据长度通过报文预先告知接收端,接收端收到数据长度后,按需读取报文数据。
比如下面的 message
结构体:
struct message {
u_int32_t len;
u_int32_t type;
char buf[256];
};
假设发送方使用如下代码发送数据:
void send_data(int socket_fd) {
struct message msg;
bzero(&msg, sizeof(msg));
strcpy(msg.buf, "abcdef好");
msg.len = htonl(strlen(msg.buf));
msg.type = htonl(1001);
unsigned int total_length = strlen(msg.buf) + sizeof(msg.len) + sizeof(msg.type);
ssize_t n_written = send(socket_fd, &msg, total_length, 0);
fprintf(stdout, "send into buffer: %ld\n", n_written);
if (n_written != total_length) {
error_handling(stderr, "client send message failed");
}
}
那么接收方便可以使用如下方式来解析数据:
// read special length data from socket
size_t readn(int fd, void *buffer, size_t length) {
char *ptr = buffer;
size_t count = length;
ssize_t read_num;
while (count > 0) {
read_num = read(fd, ptr, count);
if (read_num == 0) {
break;
} else if (read_num < 0) {
if (EINTR == errno) {
continue;
}
error_logging(stderr, "read from socket error");
return -1;
}
count -= read_num;
ptr += read_num;
}
return (length - count);
}
// read data from socket via message struct
void read_data(int sockfd) {
struct message msg;
bzero(&msg, sizeof(msg));
// msg.len
size_t read_len_expect = sizeof(msg.len);
size_t read_len_actual = readn(sockfd, &msg.len, read_len_expect);
if (read_len_actual != read_len_expect) {
error_logging(stderr, "read msg.len error");
return;
}
// convert network byte order to host byte order
msg.len = ntohl(msg.len);
printf("msg.len: %d\n", msg.len);
if (msg.len >= sizeof(msg.buf)) {
error_logging(stderr, "msg.len is too large...");
return;
}
// msg.type
read_len_expect = sizeof(msg.type);
read_len_actual = readn(sockfd, &msg.type, read_len_expect);
if (read_len_actual != read_len_expect) {
error_logging(stderr, "read msg.type error");
return;
}
msg.type = ntohl(msg.type);
printf("msg.type: %d\n", msg.type);
// msg.buf
read_len_actual = readn(sockfd, &msg.buf, msg.len);
if (read_len_actual != msg.len) {
error_logging(stderr, "read msg.buf error");
return;
}
}
特殊字符作为边界
再来看第二种方式,即使用特殊字符。我们天天用的 http 协议就是这么玩的。
接下来笔者就以 http 协议为样例,了解如何使用特殊字符来划分数据流的边界。
我们先来看看,http 报文格式是怎样的:
其中的实体数据可以是各种格式:文本,或图片、视频等二进制数据。
通过 Wireshark 抓包,来看看实际的包格式:
一个是请求报文:
GET / HTTP/1.1\r\n
Host: baidu.com\r\n
User-Agent: curl /7.83.1\r\n
Accept: */*\r\n
\r\n
一个是响应报文:
HTTP/1.1 200 OK\r\n
Date: Thu, 16 Jun 2022 12:07:28 GMT\r\n
Server: Apache\r\n
Last-Modified: Tue, 12 Jan 2010 13:48:00 GMT\r\n
ETag: "51-47cf7eee8400"\r\n
Accept-Ranges: bytes\r\n
Content-Length: 81\r\n
Cache-Control: max-age=86400\r\n
Expires: Fri, 17 Jun 2022 12:07:28 GMT\r\n
Connection: Keep-Alive\r\n
Content-Type: text/html\r\n
\r\n
<html>\n
<meta http-equiv="refresh" content="0;url=http://www.baidu.com/">\n
</html>\n
我们以请求对象为例,对数据格式抽象一下:
可以看到,\r\n
就是用来区分头部和正文的特殊字符。试着问问自己,如果是你,你的解析程序会怎么写:即利用这些字符进行边界的处理。
上面说的是头部和正文的边界划分,那正文结束的标志又是什么?
http 协议早已为我们想到了:
1)一般情况下,头部会有一个 Content-Length
字段,它会显示说明 body 的长度,比如上面示例中的 81,即请求正文有 81 个字节。
2)如果传输时不知道总长度,比如分块传输(chunked transfer)[3],此时头部会有 Transfer-Encoding: chunked
字段来标识[4],并且每一块数据都有长度和回车换行符,在最后一块后面有结束块标识,具体可以查看如下图示:
下面是来自罗剑锋老师《透视 HTTP 协议》课程中的一个实战案例,抓包可以看到对应的分块传输内容:
[2 Reassembled TCP Segments (227 bytes): #7(182), #9(45)]
Hypertext Transfer Protocol
HTTP/1.1 200 OK\r\n
Server: openresty/1.19.3.2\r\n
Date: Sun, 30 Oct 2022 12:27:17 GMT\r\n
Content-Type: text/plain\r\n
Transfer-Encoding: chunked\r\n
Connection: keep-alive\r\n
\r\n
[HTTP response 1/1]
[Time since request: 0.000879000 seconds]
[Request in frame: 5]
[Request URI: http://www.chrono.com/16-1]
HTTP chunked response
Data chunk (15 octets)
Chunk size: 15 octets
Chunk data: 6368756e6b6564206461746120310a
Chunk boundary: 0d0a
Data chunk (15 octets)
Chunk size: 15 octets
Chunk data: 6368756e6b6564206461746120320a
Chunk boundary: 0d0a
Data chunk (15 octets)
Chunk size: 15 octets
Chunk data: 6368756e6b6564206461746120330a
Chunk boundary: 0d0a
End of chunked encoding
Chunk size: 0 octets
\r\n
File Data: 45 bytes
Line-based text data: text/plain (3 lines)
chunked data 1\n
chunked data 2\n
chunked data 3\n
那程序会如何处理这些特殊字符呢?笔者截取了 Go 1.19.2
中的部分代码[5],借此我们可以有一个初步的理解:
// .../src/net/http/response.go
// ReadResponse reads and returns an HTTP response from r.
// The req parameter optionally specifies the Request that corresponds
// to this Response. If nil, a GET request is assumed.
// Clients must call resp.Body.Close when finished reading resp.Body.
// After that call, clients can inspect resp.Trailer to find key/value
// pairs included in the response trailer.
func ReadResponse(r *bufio.Reader, req *Request) (*Response, error) {
tp := textproto.NewReader(r)
resp := &Response{
Request: req,
}
// Parse the first line of the response.
line, err := tp.ReadLine()
if err != nil {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
return nil, err
}
proto, status, ok := strings.Cut(line, " ")
if !ok {
return nil, badStringError("malformed HTTP response", line)
}
resp.Proto = proto
resp.Status = strings.TrimLeft(status, " ")
statusCode, _, _ := strings.Cut(resp.Status, " ")
if len(statusCode) != 3 {
return nil, badStringError("malformed HTTP status code", statusCode)
}
resp.StatusCode, err = strconv.Atoi(statusCode)
if err != nil || resp.StatusCode < 0 {
return nil, badStringError("malformed HTTP status code", statusCode)
}
if resp.ProtoMajor, resp.ProtoMinor, ok = ParseHTTPVersion(resp.Proto); !ok {
return nil, badStringError("malformed HTTP version", resp.Proto)
}
// Parse the response headers.
mimeHeader, err := tp.ReadMIMEHeader()
if err != nil {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
return nil, err
}
resp.Header = Header(mimeHeader)
fixPragmaCacheControl(resp.Header)
err = readTransfer(resp, r)
if err != nil {
return nil, err
}
return resp, nil
}
// .../src/net/textproto/reader.go
// ReadMIMEHeader reads a MIME-style header from r.
// The header is a sequence of possibly continued Key: Value lines
// ending in a blank line.
// The returned map m maps CanonicalMIMEHeaderKey(key) to a
// sequence of values in the same order encountered in the input.
//
// For example, consider this input:
//
// My-Key: Value 1
// Long-Key: Even
// Longer Value
// My-Key: Value 2
//
// Given that input, ReadMIMEHeader returns the map:
//
// map[string][]string{
// "My-Key": {"Value 1", "Value 2"},
// "Long-Key": {"Even Longer Value"},
// }
func (r *Reader) ReadMIMEHeader() (MIMEHeader, error) {
// Avoid lots of small slice allocations later by allocating one
// large one ahead of time which we'll cut up into smaller
// slices. If this isn't big enough later, we allocate small ones.
var strs []string
hint := r.upcomingHeaderNewlines()
if hint > 0 {
strs = make([]string, hint)
}
m := make(MIMEHeader, hint)
// The first line cannot start with a leading space.
if buf, err := r.R.Peek(1); err == nil && (buf[0] == ' ' || buf[0] == '\t') {
line, err := r.readLineSlice()
if err != nil {
return m, err
}
return m, ProtocolError("malformed MIME header initial line: " + string(line))
}
for {
kv, err := r.readContinuedLineSlice(mustHaveFieldNameColon)
if len(kv) == 0 {
return m, err
}
// Key ends at first colon.
k, v, ok := bytes.Cut(kv, colon)
if !ok {
return m, ProtocolError("malformed MIME header line: " + string(kv))
}
key := canonicalMIMEHeaderKey(k)
// As per RFC 7230 field-name is a token, tokens consist of one or more chars.
// We could return a ProtocolError here, but better to be liberal in what we
// accept, so if we get an empty key, skip it.
if key == "" {
continue
}
// Skip initial spaces in value.
value := strings.TrimLeft(string(v), " \t")
vv := m[key]
if vv == nil && len(strs) > 0 {
// More than likely this will be a single-element key.
// Most headers aren't multi-valued.
// Set the capacity on strs[0] to 1, so any future append
// won't extend the slice into the other strings.
vv, strs = strs[:1:1], strs[1:]
vv[0] = value
m[key] = vv
} else {
m[key] = append(vv, value)
}
if err != nil {
return m, err
}
}
}
// readContinuedLineSlice reads continued lines from the reader buffer,
// returning a byte slice with all lines. The validateFirstLine function
// is run on the first read line, and if it returns an error then this
// error is returned from readContinuedLineSlice.
func (r *Reader) readContinuedLineSlice(validateFirstLine func([]byte) error) ([]byte, error) {
if validateFirstLine == nil {
return nil, fmt.Errorf("missing validateFirstLine func")
}
// Read the first line.
line, err := r.readLineSlice()
if err != nil {
return nil, err
}
if len(line) == 0 { // blank line - no continuation
return line, nil
}
if err := validateFirstLine(line); err != nil {
return nil, err
}
// Optimistically assume that we have started to buffer the next line
// and it starts with an ASCII letter (the next header key), or a blank
// line, so we can avoid copying that buffered data around in memory
// and skipping over non-existent whitespace.
if r.R.Buffered() > 1 {
peek, _ := r.R.Peek(2)
if len(peek) > 0 && (isASCIILetter(peek[0]) || peek[0] == '\n') ||
len(peek) == 2 && peek[0] == '\r' && peek[1] == '\n' {
return trim(line), nil
}
}
// ReadByte or the next readLineSlice will flush the read buffer;
// copy the slice into buf.
r.buf = append(r.buf[:0], trim(line)...)
// Read continuation lines.
for r.skipSpace() > 0 {
line, err := r.readLineSlice()
if err != nil {
break
}
r.buf = append(r.buf, ' ')
r.buf = append(r.buf, trim(line)...)
}
return r.buf, nil
}
结合上面两种报文格式,回过头再去理解 TCP 的流,相信会有不一样的感受。
总结
基于前面的讨论,我们现在知道了 TCP 是一种流式协议,而流是没有边界的。流没有边界,但应用层的数据交换是有边界的,故应用层需要进行数据交换格式的定义,封装和解析。
另外,通过 Socket 收发数据时,数据会先到输入/输出缓冲区,这些 I/O 缓冲区是在创建 Socket 时自动生成的,故每个 Socket 连接都有,不会存在冲突或并发的问题。笔者在之前聊 TCP 关闭连接的文章中曾经提到过,关闭 Socket 不会影响输出缓冲区,但会影响输入缓冲区,所以要优先使用 shutdown
函数关闭其中一方的流写入。
那 I/O 缓冲区满了以后,会导致数据丢失吗?比如接收方输入缓冲区只有 100 字节大小,发送方输出缓冲区也只有 100 字节大小,而此时发生方发了 500 字节的数据,数据会丢失吗?答案是不会,TCP 协议栈会处理这种情况。笔者将在后续的文章中介绍 TCP 的流量控制,敬请期待。
TCP 是以段为单位发送数据的,即 MSS。MSS 是三次握手的时候,在两端主机之间被计算得出。 ↩︎
维基百科上的定义:“分块传输编码(Chunked transfer encoding)是超文本传输协议(HTTP)中的一种数据传输机制,允许 HTTP 由网页服务器发送给客户端应用( 通常是网页浏览器)的数据可以分成多个部分。分块传输编码只在HTTP协议1.1版本(HTTP/1.1)中提供。”简单来说,就是正文数据比较大(注意是正文),将其拆成多个子部分分别传输,然后再由对端进行拼接。这个过程中,总长度是未知的,所以头部没有
Content-Length
字段。 ↩︎http 头部中
Content-Length
和Transfer-Encoding: chunked
互斥,不会同时出现。 ↩︎https://github.com/golang/go/blob/go1.19.2/src/net/http/response.go ↩︎