accept

write

我们已经建立好的TCP连接,对应着操作系统分配的1个套接字。操作TCP协议发送数据时,面对的是数据流。通常调用诸如send或者write方法来发送数据到另一台主机,那么,调用这样的方法时,在操作系统内核中发生了什么事情呢?我们带着以下3个问题来细细分析:发送方法成功返回时,能保证TCP另一端的主机接收到吗?能保证数据已经发送到网络上了吗?套接字为阻塞或者非阻塞时,发送方法做的事情有何不同?

要回答上面3个问题涉及了不少知识点,我们先在TCP层面上看看,发送方法调用时内核做了哪些事。我不想去罗列内核中的数据结构、方法等,毕竟大部分应用程序开发者不需要了解这些,仅以一幅示意图粗略表示,如下:

reference

再详述上图10个步骤前,先要澄清几个概念:MTU、MSS、tcp_write_queue发送队列、阻塞与非阻塞套接字、拥塞窗口、滑动窗口、Nagle算法。 当我们调用发送方法时,会把我们代码中构造好的消息流作为参数传递。这个消息流可大可小,例如几个字节,或者几兆字节。当消息流较大时,将有可能出现分片。我们先来讨论分片问题。

supplement

1. MSS与TCP的分片

由上一篇文中可知,TCP层是第4层传输层,第3层IP网络层、第2层数据链路层具备的约束条件同样对TCP层生效。下面来看看数据链路层中的一个概念:最大传输单元MTU。 无论何种类型的数据链路层,都会对网络分组的长度有一个限制。例如以太网限制为1500字节,802.3限制为1492字节。当内核的IP网络层试图发送报文时,若一个报文的长度大于MTU限制,就会被分成若干个小于MTU的报文,每个报文都会有独立的IP头部。

看看IP头部的格式: reference

图2 IP头部格式

可以看到,其指定IP包总长度的是一个16位(2字节)的字段,这意味一个IP包最大可以是65535字节。 若TCP层在以太网中试图发送一个大于1500字节的消息,调用IP网络层方法发送消息时,IP层会自动的获取所在局域网的MTU值,并按照所在网络的MTU大小来分片。IP层同时希望这个分片对于传输层来说是透明的,接收方的IP层会根据收到的多个IP包头部,将发送方IP层分片出的IP包重组为一个消息。 这种IP层的分片效率是很差的,因为必须所有分片都到达才能重组成一个包,其中任何一个分片丢失了,都必须重发所有分片。所以,TCP层会试图避免IP层执行数据报分片。

为了避免IP层的分片,TCP协议定义了一个新的概念:最大报文段长度MSS。它定义了一个TCP连接上,一个主机期望对端主机发送单个报文的最大长度。TCP3次握手建立连接时,连接双方都要互相告知自己期望接收到的MSS大小。例如(使用tcpdump抓包):

1
2
3
15:05:08.230782 IP 10.7.80.57.64569 > houyi-vm02.dev.sd.aliyun.com.tproxy: S 3027092051:3027092051(0) win 8192 <mss 1460,nop,wscale 8,nop,nop,sackOK>
15:05:08.234267 IP houyi-vm02.dev.sd.aliyun.com.tproxy > 10.7.80.57.64569: S 26006838:26006838(0) ack 3027092052 win 5840 <mss 1460,nop,nop,sackOK,nop,wscale 9>
15:05:08.233320 IP 10.7.80.57.64543 > houyi-vm02.dev.sd.aliyun.com.tproxy: P 78972532:78972923(391) ack 12915963 win 255

由于例子中两台主机都在以太网内,以太网的MTU为1500,减去IP和TCP头部的长度,MSS就是1460,三次握手中,SYN包都会携带期望的MSS大小。

当应用层调用TCP层提供的发送方法时,内核的TCP模块在tcp_sendmsg方法里,会按照对方告知的MSS来分片,把消息流分为多个网络分组(如图1中的3个网络分组),再调用IP层的方法发送数据。

这个MSS就不会改变了吗?

会的。上文说过,MSS就是为了避免IP层分片,在建立握手时告知对方期望接收的MSS值并不一定靠得住。因为这个值是预估的,TCP连接上的两台主机若处于不同的网络中,那么,连接上可能有许多中间网络,这些网络分别具有不同的数据链路层,这样,TCP连接上有许多个MTU。特别是,若中间途径的MTU小于两台主机所在的网络MTU时,选定的MSS仍然太大了,会导致中间路由器出现IP层的分片。

怎样避免中间网络可能出现的分片呢?

通过IP头部的DF标志位,这个标志位是告诉IP报文所途经的所有IP层代码:不要对这个报文分片。如果一个IP报文太大必须要分片,则直接返回一个ICMP错误,说明必须要分片了,且待分片路由器网络接受的MTU值。这样,连接上的发送方主机就可以重新确定MSS。

2. 发送方法返回成功后,数据一定发送到了TCP的另一端吗?

答案当然是否定的。解释这个问题前,先来看看TCP是如何保证可靠传输的。

TCP把自己要发送的数据流里的每一个字节都看成一个序号,可靠性是要求连接对端在接收到数据后,要发送ACK确认,告诉它已经接收到了多少字节的数据。也就是说,怎样确保数据一定发送成功了呢?必须等待发送数据对应序号的ACK到达,才能确保数据一定发送成功。TCP层提供的send或者write这样的方法是不会做这件事的,看看图1,它究竟做了哪些事。

图1中分为10步。

  • (1)应用程序试图调用send方法来发送一段较长的数据。
  • (2)内核主要通过tcp_sendmsg方法来完成。
  • (3)(4)内核真正执行报文的发送,与send方法的调用并不是同步的。即,send方法返回成功了,也不一定把IP报文都发送到网络中了。因此,需要把用户需要发送的用户态内存中的数据,拷贝到内核态内存中,不依赖于用户态内存,也使得进程可以快速释放发送数据占用的用户态内存。但这个拷贝操作并不是简单的复制,而是把待发送数据,按照MSS来划分成多个尽量达到MSS大小的分片报文段,复制到内核中的sk_buff结构来存放,同时把这些分片组成队列,放到这个TCP连接对应的tcp_write_queue发送队列中。
  • (5)内核中为这个TCP连接分配的内核缓存是有限的(/proc/sys/net/core/wmem_default)。当没有多余的内核态缓存来复制用户态的待发送数据时,就需要调用一个方法sk_stream_wait_memory来等待滑动窗口移动,释放出一些缓存出来(收到ACK后,不需要再缓存原来已经发送出的报文,因为既然已经确认对方收到,就不需要定时重发,自然就释放缓存了)。例如:
1
2
3
4
5
6
wait_for_memory:
    if (copied)
    	tcp_push(sk, tp, flags & ~MSG_MORE, mss_now, TCP_NAGLE_PUSH);
 
    if ((err = sk_stream_wait_memory(sk, &timeo)) != 0)
    	goto do_error;

这里的sk_stream_wait_memory方法接受一个参数timeo,就是等待超时的时间。这个时间是tcp_sendmsg方法刚开始就拿到的,如下:timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT);

看看其实现:

1
2
3
4
    static inline long sock_sndtimeo(const struct sock *sk, int noblock)
    {
    	return noblock ? 0 : sk->sk_sndtimeo;
    }

也就是说,当这个套接字是阻塞套接字时,timeo就是SO_SNDTIMEO选项指定的发送超时时间。如果这个套接字是非阻塞套接字, timeo变量就会是0。 实际上,sk_stream_wait_memory对于非阻塞套接字会直接返回,并将 errno错误码置为EAGAIN。

  • (6)在图1的例子中,我们假定使用了阻塞套接字,且等待了足够久的时间,收到了对方的ACK,滑动窗口释放出了缓存。
  • (7)将剩下的用户态数据都组成MSS报文拷贝到内核态的sk_buff中。
  • (8)最后,调用tcp_push等方法,它最终会调用IP层的方法来发送tcp_write_queue队列中的报文。 注意,IP层返回时,并不一定是把报文发送了出去。
  • (9)(10)发送方法返回。

从图1的10个步骤中可知,无论是使用阻塞还是非阻塞套接字,发送方法成功返回时(无论全部成功或者部分成功),既不代表TCP连接的另一端主机接收到了消息,也不代表本机把消息发送到了网络上,只是说明,内核将会试图保证把消息送达对方。

3、Nagle算法、滑动窗口、拥塞窗口对发送方法的影响

图1第8步tcp_push方法做了些什么呢?先来看看主要的流程:

reference

图3 发送TCP消息的简易流程

下面简单看看这几个概念:

####(1)滑动窗口 滑动窗口大家都比较熟悉,就不详细介绍了。TCP连接上的双方都会通知对方自己的接收窗口大小。而对方的接收窗口大小就是自己的发送窗口大小。tcp_push在发送数据时当然需要与发送窗口打交道。发送窗口是一个时刻变化的值,随着ACK的到达会变大,随着发出新的数据包会变小。当然,最大也只能到三次握手时对方通告的窗口大小。tcp_push在发送数据时,最终会使用tcp_snd_wnd_test方法来判断当前待发送的数据,其序号是否超出了发送滑动窗口的大小,例如: //检查这一次要发送的报文最大序号是否超出了发送滑动窗口大小 static inline int tcp_snd_wnd_test(struct tcp_sock *tp, struct sk_buff *skb, unsigned int cur_mss) { //end_seq待发送的最大序号 u32 end_seq = TCP_SKB_CB(skb)->end_seq;

if (skb->len > cur_mss)
	end_seq = TCP_SKB_CB(skb)->seq + cur_mss;

    //snd_una是已经发送过的数据中,最小的没被确认的序号;而snd_wnd就是发送窗口的大小
return !after(end_seq, tp->snd_una + tp->snd_wnd);

}

####(2)慢启动和拥塞窗口 由于两台主机间的网络可能很复杂,通过广域网时,中间的路由器转发能力可能是瓶颈。也就是说,如果一方简单的按照另一方主机三次握手时通告的滑动窗口大小来发送数据的话,可能会使得网络上的转发路由器性能雪上加霜,最终丢失更多的分组。这时,各个操作系统内核都会对TCP的发送阶段加入慢启动和拥塞避免算法。慢启动算法说白了,就是对方通告的窗口大小只表示对方接收TCP分组的能力,不表示中间网络能够处理分组的能力。所以,发送方请悠着点发,确保网络非常通畅了后,再按照对方通告窗口来敞开了发。 拥塞窗口就是下面的cwnd,它用来帮助慢启动的实现。连接刚建立时,拥塞窗口的大小远小于发送窗口,它实际上是一个MSS。每收到一个ACK,拥塞窗口扩大一个MSS大小,当然,拥塞窗口最大只能到对方通告的接收窗口大小。当然,为了避免指数式增长,拥塞窗口大小的增长会更慢一些,是线性的平滑的增长过程。 所以,在tcp_push发送消息时,还会检查拥塞窗口,飞行中的报文数要小于拥塞窗口个数,而发送数据的长度也要小于拥塞窗口的长度。 如下所示,首先用unsigned int tcp_cwnd_test方法检查飞行的报文数是否小于拥塞窗口个数(多少个MSS的个数): static inline unsigned int tcp_cwnd_test(struct tcp_sock *tp, struct sk_buff *skb) { u32 in_flight, cwnd;

/* Don't be strict about the congestion window for the final FIN.  */
if (TCP_SKB_CB(skb)->flags & TCPCB_FLAG_FIN)
	return 1;

    //飞行中的数据,也就是没有ACK的字节总数
in_flight = tcp_packets_in_flight(tp);
cwnd = tp->snd_cwnd;
    //如果拥塞窗口允许,需要返回依据拥塞窗口的大小,还能发送多少字节的数据
if (in_flight < cwnd)
	return (cwnd - in_flight);

return 0;

}

再通过tcp_window_allows方法获取拥塞窗口与滑动窗口的最小长度,检查待发送的数据是否超出: static unsigned int tcp_window_allows(struct tcp_sock *tp, struct sk_buff *skb, unsigned int mss_now, unsigned int cwnd) { u32 window, cwnd_len;

window = (tp->snd_una + tp->snd_wnd - TCP_SKB_CB(skb)->seq);
cwnd_len = mss_now * cwnd;
return min(window, cwnd_len);

}

####(3)是否符合NAGLE算法? Nagle算法的初衷是这样的:应用进程调用发送方法时,可能每次只发送小块数据,造成这台机器发送了许多小的TCP报文。对于整个网络的执行效率来说,小的TCP报文会增加网络拥塞的可能,因此,如果有可能,应该将相临的TCP报文合并成一个较大的TCP报文(当然还是小于MSS的)发送。 Nagle算法要求一个TCP连接上最多只能有一个发送出去还没被确认的小分组,在该分组的确认到达之前不能发送其他的小分组。 内核中是通过 tcp_nagle_test方法实现该算法的。我们简单的看下: static inline int tcp_nagle_test(struct tcp_sock *tp, struct sk_buff *skb, unsigned int cur_mss, int nonagle) { //nonagle标志位设置了,返回1表示允许这个分组发送出去 if (nonagle & TCP_NAGLE_PUSH) return 1;

//如果这个分组包含了四次握手关闭连接的FIN包,也可以发送出去
if (tp->urg_mode ||
    (TCP_SKB_CB(skb)->flags & TCPCB_FLAG_FIN))
	return 1;

    //检查Nagle算法
if (!tcp_nagle_check(tp, skb, cur_mss, nonagle))
	return 1;

return 0;

}

再来看看tcp_nagle_check方法,它与上一个方法不同,返回0表示可以发送,返回非0则不可以,正好相反。 static inline int tcp_nagle_check(const struct tcp_sock *tp, const struct sk_buff *skb, unsigned mss_now, int nonagle) { //先检查是否为小分组,即报文长度是否小于MSS return (skb->len < mss_now && ((nonagle&TCP_NAGLE_CORK) || //如果开启了Nagle算法 (!nonagle && //若已经有小分组发出(packets_out表示“飞行”中的分组)还没有确认 tp->packets_out && tcp_minshall_check(tp)))); }

最后看看tcp_minshall_check做了些什么: static inline int tcp_minshall_check(const struct tcp_sock *tp) { //最后一次发送的小分组还没有被确认 return after(tp->snd_sml,tp->snd_una) && //将要发送的序号是要大于等于上次发送分组对应的序号 !after(tp->snd_sml, tp->snd_nxt); }

想象一种场景,当对请求的时延非常在意且网络环境非常好的时候(例如同一个机房内),Nagle算法可以关闭,这实在也没必要。使用TCP_NODELAY套接字选项就可以关闭Nagle算法。看看setsockopt是怎么与上述方法配合工作的:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
static int do_tcp_setsockopt(struct sock *sk, int level,
		int optname, char __user *optval, int optlen)
        ...
	switch (optname) {
        ...
	case TCP_NODELAY:
		if (val) {
                        //如果设置了TCP_NODELAY,则更新nonagle标志
			tp->nonagle |= TCP_NAGLE_OFF|TCP_NAGLE_PUSH;
			tcp_push_pending_frames(sk, tp);
		} else {
			tp->nonagle &= ~TCP_NAGLE_OFF;
		}
		break;
        }
}

可以看到,nonagle标志位就是这么更改的。

当然,调用了IP层的方法返回后,也未必就保证此时数据一定发送到网络中去了。 下一篇我们探讨如何接收TCP消息,以及接收到ack后内核做了些什么。

read

这篇文章将试图说明应用程序如何接收网络上发送过来的TCP消息流,由于篇幅所限,暂时忽略ACK报文的回复和接收窗口的滑动。

为了快速掌握本文所要表达的思想,我们可以带着以下问题阅读:

  1. 应用程序调用read、recv等方法时,socket套接字可以设置为阻塞或者非阻塞,这两种方式是如何工作的?
  • 若socket为默认的阻塞套接字,此时recv方法传入的len参数,是表示必须超时(SO_RCVTIMEO)或者接收到len长度的消息,recv方法才会返回吗?而且,socket上可以设置一个属性叫做SO_RCVLOWAT,它会与len产生什么样的交集,又是决定recv等接收方法什么时候返回?
  1. 应用程序开始收取TCP消息,与程序所在的机器网卡上接收到网络里发来的TCP消息,这是两个独立的流程。它们之间是如何互相影响的?例如,应用程序正在收取消息时,内核通过网卡又在这条TCP连接上收到消息时,究竟是如何处理的?若应用程序没有调用read或者recv时,内核收到TCP连接上的消息后又是怎样处理的?
  2. recv这样的接收方法还可以传入各种flags,例如MSG_WAITALL、MSG_PEEK、MSG_TRUNK等等。它们是如何工作的?
  3. 1个socket套接字可能被多个进程在使用,出现并发访问时,内核是怎么处理这种状况的?
  4. linux的sysctl系统参数中,有类似tcp_low_latency这样的开关,默认为0或者配置为1时是如何影响TCP消息处理流程的?

书接上文。本文将通过三幅图讲述三种典型的接收TCP消息场景,理清内核为实现TCP消息的接收所实现的4个队列容器。当然,了解内核的实现并不是目的,而是如何使用socket接口、如何配置操作系统内核参数,才能使TCP传输消息更高效,这才是最终目的。

很多同学不希望被内核代码扰乱了思维,如何阅读本文呢? 我会在图1的步骤都介绍完了才来从代码上说明tcp_v4_rcv等主要方法。像flags参数、非阻塞套接字会产生怎样的效果我是在代码介绍中说的。然后我会介绍图2、图3,介绍它们的步骤时我会穿插一些上文没有涉及的少量代码。不喜欢了解内核代码的同学请直接看完图1的步骤后,请跳到图2、图3中,我认为这3幅图覆盖了主要的TCP接收场景,能够帮助你理清其流程。

接收消息时调用的系统方法要比上一篇发送TCP消息复杂许多。

接收TCP消息的过程可以一分为二:

  • 首先是PC上的网卡接收到网线传来的报文,通过软中断内核拿到并且解析其为TCP报文,然后TCP模块决定如何处理这个TCP报文。
  • 其次,用户进程调用read、recv等方法获取TCP消息,则是将内核已经从网卡上收到的消息流拷贝到用户进程里的内存中。

first situation

第一幅图描述的场景是:

  • TCP连接上将要收到的消息序号是S1(TCP上的每个报文都有序号,详见《TCP/IP协议详解》),此时操作系统内核依次收到了序号S1-S2的报文、S3-S4、S2-S3的报文,注意后两个包乱序了。
  • 之后,用户进程分配了一段len大小的内存用于接收TCP消息,此时,len是大于S4-S1的。
  • 另外,用户进程始终没有对这个socket设置过SO_RCVLOWAT参数,因此,接收阀值SO_RCVLOWAT使用默认值1。
  • 另外,系统参数tcp_low_latency设置为0,即从操作系统的总体效率出发,使用prequeue队列提升吞吐量。当然,由于用户进程收消息时,并没有新包来临,所以此图中prequeue队列始终为空。先不细表。

图1如下: reference

上图中有13个步骤,应用进程使用了阻塞套接字,调用recv等方法时flag标志位为0,用户进程读取套接字时没有发生进程睡眠。内核在处理接收到的TCP报文时使用了4个队列容器(当链表理解也可),分别为receive、out_of_order、prequeue、backlog队列,本文会说明它们存在的意义。下面详细说明这13个步骤。

  1. 当网卡接收到报文并判断为TCP协议后,将会调用到内核的tcp_v4_rcv方法。此时,这个TCP连接上需要接收的下一个报文序号恰好就是S1,而这一步里,网卡上收到了S1-S2的报文,所以,tcp_v4_rcv方法会把这个报文直接插入到receive队列中。 注意:receive队列是允许用户进程直接读取的,它是将已经接收到的TCP报文,去除了TCP头部、排好序放入的、用户进程可以直接按序读取的队列。由于socket不在进程上下文中(也就是没有进程在读socket),由于我们需要S1序号的报文,而恰好收到了S1-S2报文,因此,它进入了receive队列。

  2. 接着,我们收到了S3-S4报文。在第1步结束后,这时我们需要收到的是S2序号,但到来的报文却是S3打头的,怎么办呢?进入out_of_order队列!从这个队列名称就可以看出来,所有乱序的报文都会暂时放在这。

  3. 仍然没有进入来读取socket,但又过来了我们期望的S2-S3报文,它会像第1步一样,直接进入receive队列。不同的时,由于此时out_of_order队列不像第1步是空的,所以,引发了接来的第4步。

  4. 每次向receive队列插入报文时都会检查out_of_order队列。由于收到S2-S3报文后,期待的序号成为了S3,这样,out_of_order队列里的唯一报文S3-S4报文将会移出本队列而插入到receive队列中(这件事由tcp_ofo_queue方法完成)。

用户进程开始读取socket: 1-4是第一阶段, 后面的是第二阶段

  1. 终于有用户进程开始读取socket了。做过应用端编程的同学都知道,先要在进程里分配一块内存,接着调用read或者recv等方法,把内存的首地址和内存长度传入,再把建立好连接的socket也传入。当然,对这个socket还可以配置其属性。这里,假定没有设置任何属性,都使用默认值,因此,此时socket是阻塞式,它的SO_RCVLOWAT是默认的1。当然,recv这样的方法还会接收一个flag参数,它可以设置为MSG_WAITALL、MSG_PEEK、MSG_TRUNK等等,这里我们假定为最常用的0。进程调用了recv方法。

  2. 无论是何种接口,C库和内核经过层层封装,接收TCP消息最终一定会走到tcp_recvmsg方法。下面介绍代码细节时,它会是重点。

  3. 在tcp_recvmsg方法里,会首先锁住socket。为什么呢?因此socket是可以被多进程同时使用的,同时,内核中断也会操作它,而下面的代码都是核心的、操作数据的、有状态的代码,不可以被重入的,锁住后,再有用户进程进来时拿不到锁就要休眠在这了。内核中断看到被锁住后也会做不同的处理,参见图2、图3。

  4. 此时,第1-4步已经为receive队列里准备好了3个报文。最上面的报文是S1-S2,将它拷贝到用户态内存中。由于第5步flag参数并没有携带MSG_PEEK这样的标志位,因此,再将S1-S2报文从receive队列的头部移除,从内核态释放掉。反之,MSG_PEEK标志位会导致receive队列不会删除报文。所以,MSG_PEEK主要用于多进程读取同一套接字的情形。

下面这一步非常重要, 是否recv函数传入进来的len仅仅只是为了判断是否当前copy超出了用户态给定的地址?

  • 限制了传给用户态的data长度不会超出给定的len, 但是只限制了最大, 最小的限制是SO_RCVLOWAT, 当满足了它的大小, 阻塞的socket可以返回,
  • 非阻塞的socket就不会管上面的
  1. 如第8步,拷贝S2-S3报文到用户态内存中。当然,执行拷贝前都会检查用户态内存的剩余空间是否足以放下当前这个报文,不足以时会直接返回已经拷贝的字节数。

  2. 同上。

  3. receive队列为空了,此时会先来检查SO_RCVLOWAT这个阀值。如果已经拷贝的字节数到现在还小于它,那么可能导致进程会休眠,等待拷贝更多的数据。第5步已经说明过了,socket套接字使用的默认的SO_RCVLOWAT,也就是1,这表明,只要读取到报文了,就认为可以返回了。 做完这个检查了,再检查backlog队列。backlog队列是进程正在拷贝数据时,网卡收到的报文会进这个队列。此时若backlog队列有数据,就顺带处理下。图3会覆盖这种场景。

  4. 在本图对应的场景中,backlog队列是没有数据的,已经拷贝的字节数为S4-S1,它是大于1的,因此,释放第7步里加的锁,准备返回用户态了。

  5. 用户进程代码开始执行,此时recv等方法返回的就是S4-S1,即从内核拷贝的字节数。

second situation

图2给出了第2种场景:

  • 这里涉及到prequeue队列。用户进程调用recv方法时,连接上没有任何接收并缓存到内核的报文,而socket是阻塞的,所以进程睡眠了。
  • 然后网卡中收到了TCP连接上的报文,此时prequeue队列开始产生作用。图2中tcp_low_latency为默认的0,套接字socket的SO_RCVLOWAT是默认的1,仍然是阻塞socket

图2如下: reference

简单描述上述11个步骤:

  1. 用户进程分配了一块len大小的内存,将其传入recv这样的函数,同时socket参数皆为默认,即阻塞的、SO_RCVLOWAT为1。调用接收方法,其中flags参数为0。

  2. C库和内核最终调用到tcp_recvmsg方法来处理。

  3. 锁住socket。

  4. 由于此时receive、prequeue、backlog队列都是空的,即没有拷贝1个字节的消息到用户内存中,而我们的最低要求是拷贝至少SO_RCVLOWAT为1长度的消息。此时,开始进入阻塞式套接字的等待流程。最长等待时间为SO_RCVTIMEO指定的时间。 这个等待函数叫做sk_wait_data,有必要看下其实现:

1
2
3
4
5
    int sk_wait_data(struct sock *sk, long *timeo)
    {
            //注意,它的自动唤醒条件有两个,要么timeo时间到达,要么receive队列不为空
    	rc = sk_wait_event(sk, timeo, !skb_queue_empty(&sk->sk_receive_queue));
    }

sk_wait_event也值得我们简单看下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
    #define sk_wait_event(__sk, __timeo, __condition)		\
    ({	int rc;							\
    	release_sock(__sk);					\
    	rc = __condition;					\
    	if (!rc) {						\
    		*(__timeo) = schedule_timeout(*(__timeo));	\
    	}							\
    	lock_sock(__sk);					\
    	rc = __condition;					\
    	rc;							\
    })

注意,它在睡眠前会调用release_sock,这个方法会释放socket锁,使得下面的第5步中,新到的报文不再只能进入backlog队列。

  1. 这个套接字上期望接收的序号也是S1,此时网卡恰好收到了S1-S2的报文,在tcp_v4_rcv方法中,通过调用tcp_prequeue方法把报文插入到prequeue队列中。

  2. 插入prequeue队列后,此时会接着调用wake_up_interruptible方法,唤醒在socket上睡眠的进程。参见tcp_prequque方法。

  3. 用户进程被唤醒后,重新调用lock_sock接管了这个socket,此后再进来的报文都只能进入backlog队列了。

  4. 进程醒来后,先去检查receive队列,当然仍然是空的;再去检查prequeue队列,发现有一个报文S1-S2,正好是socket连接待拷贝的起始序号S1,于是,从prequeue队列中取出这个报文并把内容复制到用户内存中,再释放内核中的这个报文。

  5. 目前已经拷贝了S2-S1个字节到用户态,检查这个长度是否超过了最低阀值(即len和SO_RCVLOWAT的最小值)。

  6. 由于SO_RCVLOWAT使用了默认的1,所以准备返回用户。此时会顺带再看看backlog队列中有没有数据,若有,则检查这个无序的队列中是否有可以直接拷贝给用户的报文。当然,此时是没有的。所以准备返回,释放socket锁。

  7. 返回用户已经拷贝的字节数。

third situation

图3给出了第3种场景: 这个场景中,我们把系统参数tcp_low_latency设为1,socket上设置了SO_RCVLOWAT属性的值。

  • 服务器先是收到了S1-S2这个报文,但S2-S1的长度是小于SO_RCVLOWAT的,用户进程调用recv方法读套接字时,虽然读到了一些,但没有达到最小阀值,所以进程睡眠了,
  • 与此同时,在睡眠前收到的乱序的S3-S4包直接进入backlog队列。
  • 此时先到达了S2-S3包,由于没有使用prequeue队列,而它起始序号正是下一个待拷贝的值,所以直接拷贝到用户内存中,总共拷贝字节数已满足SO_RCVLOWAT的要求!
  • 最后在返回用户前把backlog队列中S3-S4报文也拷贝给用户了.

图3如下: reference

简明描述上述15个步骤:

  1. 内核收到报文S1-S2,S1正是这个socket连接上待接收的序号,因此,直接将它插入有序的receive队列中。

  2. 用户进程所处的linux操作系统上,将sysctl中的tcp_low_latency设置为1。这意味着,这台服务器希望TCP进程能够更及时的接收到TCP消息。用户调用了recv方法接收socket上的消息,这个socket上设置了SO_RCVLOWAT属性为某个值n,这个n是大于S2-S1,也就是第1步收到的报文大小。这里,仍然是阻塞socket,用户依然是分配了足够大的len长度内存以接收TCP消息。

  3. 通过tcp_recvmsg方法来完成接收工作。先锁住socket,避免并发进程读取同一socket的同时,也在告诉内核网络软中断处理到这一socket时要有不同行为,如第6步。

  4. 准备处理内核各个接收队列中的报文。

  5. receive队列中的有序报文可直接拷贝,在检查到S2-S1是小于len之后,将报文内容拷贝到用户态内存中。

  6. 在第5步进行的同时,socket是被锁住的,这时内核又收到了一个S3-S4报文,因此报文直接进入backlog队列。注意,这个报文不是有序的,因为此时连接上期待接收序号为S2。

  7. 在第5步,拷贝了S2-S1个字节到用户内存,它是小于SO_RCVLOWAT的,因此,由于socket是阻塞型套接字(超时时间在本文中忽略),进程将不得不转入睡眠。转入睡眠之前,还会干一件事,就是处理backlog队列里的报文,图2的第4步介绍过休眠方法sk_wait_data,它在睡眠前会执行release_sock方法,看看是如何实现的:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
void fastcall release_sock(struct sock *sk)
{
	mutex_release(&sk->sk_lock.dep_map, 1, _RET_IP_);
 
	spin_lock_bh(&sk->sk_lock.slock);
        //这里会遍历backlog队列中的每一个报文
	if (sk->sk_backlog.tail)
		__release_sock(sk);
        //这里是网络中断执行时,告诉内核,现在socket并不在进程上下文中
	sk->sk_lock.owner = NULL;
	if (waitqueue_active(&sk->sk_lock.wq))
		wake_up(&sk->sk_lock.wq);
	spin_unlock_bh(&sk->sk_lock.slock);
}

再看看__release_sock方法是如何遍历backlog队列的:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static void __release_sock(struct sock *sk)
{
	struct sk_buff *skb = sk->sk_backlog.head;
 
        //遍历backlog队列
	do {
		sk->sk_backlog.head = sk->sk_backlog.tail = NULL;
		bh_unlock_sock(sk);
 
		do {
			struct sk_buff *next = skb->next;
 
			skb->next = NULL;
                        //处理报文,其实就是tcp_v4_do_rcv方法,上文介绍过,不再赘述
			sk->sk_backlog_rcv(sk, skb);
 
			cond_resched_softirq();
 
			skb = next;
		} while (skb != NULL);
 
		bh_lock_sock(sk);
	} while((skb = sk->sk_backlog.head) != NULL);
}

此时遍历到S3-S4报文,但因为它是失序的,所以从backlog队列中移入out_of_order队列中(参见上文说过的tcp_ofo_queue方法)。

  1. 进程休眠,直到超时或者receive队列不为空。

  2. 内核接收到了S2-S3报文。注意,这里由于打开了tcp_low_latency标志位,这个报文是不会进入prequeue队列以待进程上下文处理的。

  3. 此时,由于S2是连接上正要接收的序号,同时,有一个用户进程正在休眠等待接收数据中,且它要等待的数据起始序号正是S2,于是,这种种条件下,使得这一步同时也是网络软中断执行上下文中,把S2-S3报文直接拷贝进用户内存。

  4. 上文介绍tcp_data_queue方法时大家可以看到,每处理完1个有序报文(无论是拷贝到receive队列还是直接复制到用户内存)后都会检查out_of_order队列,看看是否有报文可以处理。那么,S3-S4报文恰好是待处理的,于是拷贝进用户内存。然后唤醒用户进程。

  5. 用户进程被唤醒了,当然唤醒后会先来拿到socket锁。以下执行又在进程上下文中了。

  6. 此时会检查已拷贝的字节数是否大于SO_RCVLOWAT,以及backlog队列是否为空。两者皆满足,准备返回。

  7. 释放socket锁,退出tcp_recvmsg方法。

  8. 返回用户已经复制的字节数S4-S1。

好了,这3个场景读完,想必大家对于TCP的接收流程是怎样的已经非常清楚了,本文起始的6个问题也在这一大篇中都涉及到了。下一篇我们来讨论TCP连接的关闭。

close and shutdown

close

  • 3)使用了so_linger的连接

首先要澄清,为何要有so_linger这个功能?

因为我们可能有强可靠性的需求,也就是说,必须确保发出的消息、FIN都被对方收到。

  • Situation: 例如,有些响应发出后调用close关闭连接,接下来就会关闭进程。

  • Conflict: 如果close时发出的消息其实丢失在网络中了; 那么,进程突然退出时连接上发出的RST就可能被对方收到; 而且,之前丢失的消息不会有重发来保障可靠性了。

  • Answer: > so_linger用来保证对方收到了close时发出的消息,即,至少需要对方通过发送ACK且到达本机。 » 怎么保证呢?等待!close会阻塞住进程,直到确认对方收到了消息再返回。然而,网络环境又得复杂的,如果对方总是不响应怎么办?所以还需要l_linger这个超时时间,控制close阻塞进程的最长时间。注意,务必慎用so_linger,它会在不经意间降低你程序中代码的执行速度(close的阻塞)。

所以,当这个进程设置了so_linger后,前半段依然没变化。 检查是否有未读消息,若有则发RST关连接,不会触发等待。 接下来检查是否有未发送的消息时与第2种情形一致,设好FIN后关闭angle算法发出。 接下来,则会设置最大等待时间l_linger,然后开始将进程睡眠,直到确认对方收到后才会醒来,将控制权交还给用户进程。

这里需要注意,so_linger不是确保连接被四次握手关闭再使close返回,而只是保证我方发出的消息都已被对方收到。 例如,若对方程序写的有问题,当它收到FIN进入CLOSE_WAIT状态,却一直不调用close发出FIN,此时,对方仍然会通过ACK确认,我方收到了ACK进入FIN_WAIT2状态,但没收到对方的FIN,我方的close调用却不会再阻塞,close直接返回,控制权交还用户进程。

从上图可知,so_linger还有个偏门的用法,若l_linger超时时间竟被设为0,则不会触发FIN包的发送,而是直接RST复位关闭连接。我个人认为,这种玩法确没多大用处。

最后做个总结。

  • 调用close时,可能导致发送RST复位关闭连接,例如有未读消息打开so_linger但l_linger却为0关闭监听句柄时半打开的连接.
  • 更多时会导致发FIN来四次握手关闭连接,但打开so_linger可能导致close阻塞住等待着对方的ACK表明收到了消息.

shutdown

  • 1)shutdown可携带一个参数,取值有3个,分别意味着:只关闭读、只关闭写、同时关闭读写。

    对于监听句柄,如果参数为关闭写,显然没有任何意义。但关闭读从某方面来说是有意义的,例如不再接受新的连接。 看看最右边蓝色分支,针对监听句柄,若参数为关闭写,则不做任何事; 若为关闭读,则把端口上的半打开连接使用RST关闭,与close如出一辙。

  • 2)若shutdown的是半打开的连接,则发出RST来关闭连接。

  • 3)若shutdown的是正常连接,那么关闭读其实与对端是没有关系的。只要本机把接收掉的消息丢掉,其实就等价于关闭读了,并不一定非要对端关闭写的。

    实际上,shutdown正是这么干的。若参数中的标志位含有关闭读,只是标识下,当我们调用read等方法时这个标识就起作用了,会使进程读不到任何数据。

  • 4)若参数中有标志位为关闭写,那么下面做的事与close是一致的:发出FIN包,告诉对方,本机不会再发消息了。

对高并发编程,目前只有一种模型,也是本质上唯一有效的玩法。

从这个系列的前4篇文章可知,

连接上的消息处理,可以分为两个阶段:

等待消息准备好、消息处理。

当使用默认的阻塞套接字时(例如上面提到的1个线程捆绑处理1个连接),往往是把这两个阶段合而为一, 这样操作套接字的代码所在的线程就得睡眠来等待消息准备好,这导致了高并发下线程会频繁的睡眠、唤醒,从而影响了CPU的使用效率。

  • 高并发编程方法当然就是把两个阶段分开处理。
    • 等待消息准备好的代码段,与处理消息的代码段是分离的。

当然,这也要求套接字必须是非阻塞的,否则,处理消息的代码段很容易导致条件不满足时,所在线程又进入了睡眠等待阶段。 那么问题来了,等待消息准备好这个阶段怎么实现?

它毕竟还是等待,这意味着线程还是要睡眠的!!! —> 解决办法就是,线程主动查询,或者让1个线程为所有连接而等待!

IO多路复用

  • 这就是IO多路复用了。
    • 多路复用就是处理等待消息准备好这件事的,但它可以同时处理多个连接!它也可能“等待”,所以它也会导致线程睡眠.
    • 然而这不要紧,因为它一对多、它可以监控所有连接。这样,当我们的线程被唤醒执行时,就一定是有一些连接准备好被我们的代码执行了,这是有效率的!没有那么多个线程都在争抢处理“等待消息准备好”阶段,整个世界终于清净了!

epoll and select

多路复用有很多种实现,在linux上,2.4内核前主要是select和poll. 现在主流是epoll,它们的使用方法似乎很不同,但本质是一样的. 效率却也不同,这也是epoll完全替代了select的原因.

简单的谈下epoll为何会替代select。 前面提到过,高并发的核心解决方案是1个线程处理所有连接的“等待消息准备好”,这一点上epoll和select是无争议的。 但select预估错误了一件事,就像我们开篇所说,当数十万并发连接存在时,可能每一毫秒只有数百个活跃的连接,同时其余数十万连接在这一毫秒是非活跃的.

select

select的使用方法是这样的:

  • 返回的活跃连接 == select(全部待监控的连接)
  • 什么时候会调用select方法呢?

在你认为需要找出有报文到达的活跃连接时,就应该调用。 所以,调用select在高并发时是会被频繁调用的。这样,这个频繁调用的方法就很有必要看看它是否有效率,因为,它的轻微效率损失都会被“频繁”二字所放大。它有效率损失吗?显而易见,全部待监控连接是数以十万计的,返回的只是数百个活跃连接,这本身就是无效率的表现。被放大后就会发现,处理并发上万个连接时,select就完全力不从心了。

epoll

再来说说epoll是如何解决的。它很聪明的用了3个方法来实现select方法要做的事:

  • 新建的epoll描述符 == epoll_create() |—>epoll_create()
  • epoll_ctrl(epoll描述符,添加或者删除所有待监控的连接) |—>epoll_ctrl()
  • 返回的活跃连接 == epoll_wait(epoll描述符) |—>epoll_wait() 这么做的好处主要是:分清了频繁调用和不频繁调用的操作。例如,epoll_ctrl是不太频繁调用的,而epoll_wait是非常频繁调用的。这时,epoll_wait却几乎没有入参,这比select的效率高出一大截,而且,它也不会随着并发连接的增加使得入参越发多起来,导致内核执行效率下降。

epoll是怎么实现的呢?其实很简单,从这3个方法就可以看出,它比select聪明的避免了每次频繁 调用“哪些连接已经处在消息准备好阶段”的epoll_wait时,是不需要把所有待监控连接传入的。 这意味着,它在内核态维护了一个数据结构保存着所有待监控的连接 —>这个数据结构就是一棵红黑树,它的结点的增加、减少是通过epoll_ctrl来完成的。

用我在《深入理解Nginx》第8章中所画的图来看,它是非常简单的:

图中左下方的红黑树由所有待监控的连接构成。左上方的链表,同是目前所有活跃的连接。于是,epoll_wait执行时只是检查左上方的链表,并返回左上方链表中的连接给用户。这样,epoll_wait的执行效率能不高吗?

ET and LT

最后,再看看epoll提供的2种玩法ET和LT,即翻译过来的边缘触发和水平触发。 其实这两个中文名字倒也有些贴切。 这2种使用方式针对的仍然是效率问题,只不过变成了epoll_wait返回的连接如何能够更准确些。

  • example: 例如,我们需要监控一个连接的写缓冲区是否空闲,满足“可写”时我们就可以从用户态将响应调用write发送给客户端. 但是,或者连接可写时,我们的“响应”内容还在磁盘上呢,此时若是磁盘读取还未完成呢?肯定不能使线程阻塞的,那么就不发送响应了。 但是,下一次epoll_wait时可能又把这个连接返回给你了,你还得检查下是否要处理。 可能,我们的程序有另一个模块专门处理磁盘IO,它会在磁盘IO完成时再发送响应. 那么,每次epoll_wait都返回这个“可写”的、却无法立刻处理的连接,是否符合用户预期呢?

于是,ET和LT模式就应运而生了。LT是每次满足期待状态的连接,都得在epoll_wait中返回,所以它一视同仁,都在一条水平线上。 ET则不然,它倾向更精确的返回连接。在上面的例子中,连接第一次变为可写后,若是程序未向连接上写入任何数据,那么下一次epoll_wait是不会返回这个连接的。 ET叫做 边缘触发,就是指,只有连接从一个状态转到另一个状态时,才会触发epoll_wait返回它。 可见,ET的编程要复杂不少,至少应用程序要小心的防止epoll_wait的返回的连接出现: 可写时未写数据后却期待下一次“可写”、可读时未读尽数据却期待下一次“可读”。

当然,从一般应用场景上它们性能是不会有什么大的差距的,ET可能的优点是,epoll_wait的调用次数会减少一些,某些场景下连接在不必要唤醒时不会被唤醒(此唤醒指epoll_wait返回). 但如果像我上面举例所说的,有时它不单纯是一个网络问题,跟应用场景相关。当然,大部分开源框架都是基于ET写的,框架嘛,它追求的是纯技术问题,当然力求尽善尽美。

  • moduo: 最好是read ET write LT

reactor

定时触发功能通常是服务器必备组件,反应堆模型往往还不得不将定时器的管理囊括在内。本篇将介绍反应堆模型的特点和用法。

首先我们要谈谈,

网络编程界为什么需要反应堆?有了IO复用,有了epoll,我们已经可以使服务器并发几十万连接的同时,维持高TPS了,难道这还不够吗? 我的答案是,技术层面足够了,但在软件工程层面却是不够的。

程序使用IO复用的难点在哪里呢?

1个请求虽然由多次IO处理完成,但相比传统的单线程完整处理请求生命期的方法,IO复用在人的大脑思维中并不自然.

因为,程序员编程中,处理请求A的时候,假定A请求必须经过多个IO操作A1-An(两次IO间可能间隔很长时间), 每经过一次IO操作,再调用IO复用时,IO复用的调用返回里,非常可能不再有A,而是返回了请求B。 即请求A会经常被请求B打断,处理请求B时,又被C打断。这种思维下,编程容易出错。

形象的说,传统编程方法就好像是到了银行营业厅里,每个窗口前排了长队,业务员们在窗口后一个个的解决客户们的请求. 一个业务员可以尽情思考着客户A依次提出的问题,例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
“我要买2万XX理财产品。“
“看清楚了,5万起售。”
“等等,查下我活期余额。”
“余额5万。”
“那就买 5万吧。”
业务员开始录入信息。
”对了,XX理财产品年利率8%?”
“是预期8%,最低无利息保本。“
”早不说,拜拜,我去买余额宝。“
业务员无表情的删着已经录入的信息进行事务回滚。
”下一个!“

用了IO复用则是大师业务员开始挑战极限,在超大营业厅里给客户们人手一个牌子,黑压压的客户们都在大厅中,有问题时举牌申请提问, 大师目光敏锐点名指定某人提问,该客户迅速得到大师的答复后,要经过一段时间思考,查查自己的银袋子,咨询下LD,才能再次进行下一个提问, 直到得到完整的满意答复退出大厅, 例如:

1
2
3
4
大师刚指导A填写转帐单的某一项,
B又来申请兑换泰铢,给了B兑换单后,
C又来办理定转活,
然后D与F在争抢有限的圆珠笔时出现了不和谐现象,被大师叫停业务,暂时等待。

这就是基于事件驱动的IO复用编程比起传统1线程1请求的方式来,有难度的设计点了,客户们都是上帝,既不能出错,还不能厚此薄彼。

当没有反应堆时,我们可能的设计方法是这样的:

1
2
3
4
5
大师把每个客户的提问都记录下来,
当客户A提问时,首先查阅A之前问过什么做过什么,这叫联系上下文,
然后再根据上下文和当前提问查阅有关的银行规章制度,有针对性的回答A,并把回答也记录下来。
当圆满回答了A的所有问题后,
删除A的所有记录。

回到码农生涯,即,

1
2
3
4
5
6
某一瞬间,服务器共有10万个并发连接,
此时,一次IO复用接口的调用返回了100个活跃的连接等待处理。
先根据这100个连接找出其对应的对象,这并不难,epoll的返回连接数据结构里就有这样的指针可以用。
接着,循环的处理每一个连接,找出这个对象此刻的上下文状态,
再使用read、write这样的网络IO获取此次的操作内容,结合上下文状态查询此时应当选择哪个业务方法处理,  --->业务method
调用相应方法完成操作后,若请求结束,则删除对象及其上下文。
  • 这样,我们就陷入了面向过程编程方法之中了,在面向应用、快速响应为王的移动互联网时代,这样做早晚得把自己玩死。
  • 我们的主程序需要关注各种不同类型的请求,在不同状态下,对于不同的请求命令选择不同的业务处理方法。
  • 这会导致随着请求类型的增加,请求状态的增加,请求命令的增加,主程序复杂度快速膨胀,导致维护越来越困难,程序员再也不敢轻易接新需求、重构。

concluding:

  • 反应堆是解决上述软件工程问题的一种途径,它也许并不优雅,开发效率上也不是最高的,但其执行效率面向过程的使用IO复用却几乎是等价的,所以,无论是nginx、memcached、redis等等这些高性能组件的代名词,都义无反顾的一头扎进了反应堆的怀抱中。
  • 反应堆模式可以在软件工程层面,将事件驱动框架分离出具体业务,将不同类型请求之间用OO的思想分离。通常,反应堆不仅使用IO复用处理网络事件驱动,还会实现定时器来处理时间事件的驱动(请求的超时处理或者定时任务的处理),就像下面的示意图:

这幅图有5点意思:

  • (1)处理应用时基于OO思想,不同的类型的请求处理间是分离的。

    • 例如,A类型请求是用户注册请求,B类型请求是查询用户头像,那么当我们把用户头像新增多种分辨率图片时,更改B类型请求的代码处理逻辑时,完全不涉及A类型请求代码的修改。
  • (2)应用处理请求的逻辑,与事件分发框架完全分离。

    • 什么意思呢?即写应用处理时,不用去管何时调用IO复用,不用去管什么调用epoll_wait,去处理它返回的多个socket连接。 应用代码中,只关心如何读取、发送socket上的数据,如何处理业务逻辑。事件分发框架有一个抽象的事件接口,所有的应用必须实现抽象的事件接口,通过这种抽象才把应用与框架进行分离。
  • (3)反应堆上提供注册、移除事件方法,供应用代码使用,而分发事件方法,通常是循环的调用而已,是否提供给应用代码调用,还是由框架简单粗暴的直接循环使用,这是框架的自由。

  • (4)IO多路复用也是一个抽象,它可以是具体的select,也可以是epoll,它们只必须提供采集到某一瞬间所有待监控连接中活跃的连接。

  • (5)定时器也是由反应堆对象使用,它必须至少提供4个方法,包括添加、删除定时器事件,这该由应用代码调用。最近超时时间是需要的,这会被反应堆对象使用,用于确认select或者epoll_wait执行时的阻塞超时时间,防止IO的等待影响了定时事件的处理。遍历也是由反应堆框架使用,用于处理定时事件。

这里可以看到,为什么定时器集合需要提供最近超时事件距离现在的时间?

  • 因为,调用epoll_wait或者select时,并不能够始终传入-1作为timeout参数。
  • 因为,我们的服务器主营业务往往是网络请求处理,如果网络请求很少时,那么CPU的所有时间都会被频繁却又不必要的epoll_wait调用所占用。
  • 在服务器闲时使进程的CPU利用率降低是很有意义的,它可以使服务器上其他进程得到更多的执行机会,也可以延长服务器的寿命,还可以省电。 这样,就需要传入准确的timeout最大阻塞时间给epoll_wait了。

什么样的timeout时间才是准确的呢? 这等价于,我们需要准确的分析,什么样的时段进程可以真正休息,进入sleep状态? 一个没有意义的答案是:不需要进程执行任务的时间段内是可以休息的。 这就要求我们仔细想想,进程做了哪几类任务,例如:

  1. 所有网络包的处理,例如TCP连接的建立、读写、关闭,基本上所有的正常请求都由网络包来驱动的。 1.1 对这类任务而言,没有新的网络分组到达本机时,就是可以使进程休息的时段。
  2. 定时器的管理,它与网络、IO复用无关,虽然它们在业务上可能有相关性。定时器里的事件需要及时的触发执行,不能因为其他原因,例如阻塞在epoll_wait上时耽误了定时事件的处理。当一段时间内,可以预判没有定时事件达到触发条件时(这也是提供接口查询最近一个定时事件距当下的时间的意义所在),对定时任务的管理而言,进程就可以休息了。
  3. 其他类型的任务,例如磁盘IO执行完成,或者收到其他进程的signal信号,等等,这些任务明显不需要执行的时间段内,进程可以休息。

于是,使用反应堆模型的进程代码中,通常除了epoll_wait这样的IO复用外,其他调用都会基于无阻塞的方式使用。 所以,epoll_wait的timeout超时时间,就是除网络外,其他任务所能允许的进程睡眠时间。 而只考虑常见的定时器任务时,就像上图中那样,只需要定时器集合能够提供最近超时事件到现在的时间即可。

定时器

从这里也可以推导出,定时器集合通常会采用有序容器这样的数据结构,好处是:

  1. 容易取到最近超时事件的时间。
  2. 可以从最近超时事件开始,向后依次遍历已经超时的事件,直到第一个没有超时的事件为止即可停止遍历,不用全部遍历到。

因此,粗暴的采用无序的数据结构,例如普通的链表,通常是不足取的。

但事无绝对,redis就是用了个毫无顺序的链表,原因何在?因为redis的客户端连接没有超时概念,所以对于并发的成千上万个连上,都不会因为超时被断开。 redis的定时器唯一的用途在于定时的将内存数据刷到磁盘上,这样的定时事件通常只有个位数,其性能无关紧要。

如果定时事件非常多,综合插入、遍历、删除的使用频率,使用树的机会最多,例如小根堆(libevent)、二叉平衡树(nginx红黑树)。 当然,场景特殊时,尽可以用有序数组、跳跃表等等实现。

example

Notice: 重点关注

void add_timer( heap_timer* timer ) throw ( std::exception ) void pop_timer() plus void percolate_down( int hole ) 好像就是使用的跳跃表? 从arrary的zero index开始进行分为了几条线, insert的时候只关心这几所在的一条线的顺序, pop的时候, 也只是使用把这条线的顺序排好.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
#ifndef intIME_HEAP
#define intIME_HEAP

#include <iostream>
#include <netinet/in.h>
#include <time.h>
using std::exception;

#define BUFFER_SIZE 64

class heap_timer
{
public:
    heap_timer( int delay )
    {
        expire = time( NULL ) + delay;
    }

public:
   time_t expire;
   void (*cb_func)( client_data* );
   client_data* user_data;
};

struct client_data
{
    sockaddr_in address;
    int sockfd;
    char buf[ BUFFER_SIZE ];
    heap_timer* timer;
};

// ------------------------------------------- //


class time_heap
{
public:
    time_heap( int cap ) throw ( std::exception ):capacity( cap ), cur_size( 0 )
    {
		array = new heap_timer* [capacity];
		if ( ! array )
		{
				throw std::exception();
		}
        for( int i = 0; i < capacity; ++i )
        {
            array[i] = NULL;
        }
    }
    time_heap( heap_timer** init_array, int size, int capacity ) throw ( std::exception )
        : cur_size( size ), capacity( capacity )
    {
        if ( capacity < size )
        {
            throw std::exception();
        }
        array = new heap_timer* [capacity];
        if ( ! array )
        {
            throw std::exception();
        }
        for( int i = 0; i < capacity; ++i )
        {
            array[i] = NULL;
        }
        if ( size != 0 )
        {
            for ( int i =  0; i < size; ++i )
            {
                array[ i ] = init_array[ i ];
            }
            for ( int i = (cur_size-1)/2; i >=0; --i )
            {
                percolate_down( i );
            }
        }
    }
    ~time_heap()
    {
        for ( int i =  0; i < cur_size; ++i )
        {
            delete array[i];
        }
        delete [] array; 
    }

public:
    void add_timer( heap_timer* timer ) throw ( std::exception )
    {
        if( !timer )
        {
            return;
        }
        if( cur_size >= capacity )
        {
            resize();
        }
        int hole = cur_size++;
        int parent = 0;
        for( ; hole > 0; hole=parent )
        {
            parent = (hole-1)/2;
            if ( array[parent]->expire <= timer->expire )
            {
                break;
            }
            array[hole] = array[parent];
        }
        array[hole] = timer;
    }
    void del_timer( heap_timer* timer )
    {
        if( !timer )
        {
            return;
        }
        // lazy delelte
        timer->cb_func = NULL;
    }
    heap_timer* top() const
    {
        if ( empty() )
        {
            return NULL;
        }
        return array[0];
    }
    void pop_timer()
    {
        if( empty() )
        {
            return;
        }
        if( array[0] )
        {
            delete array[0];
            array[0] = array[--cur_size];
            percolate_down( 0 );
        }
    }
    void tick()
    {
        heap_timer* tmp = array[0];
        time_t cur = time( NULL );
        while( !empty() )
        {
            if( !tmp )
            {
                break;
            }
            if( tmp->expire > cur )
            {
                break;
            }
            if( array[0]->cb_func )
            {
                array[0]->cb_func( array[0]->user_data );
            }
            pop_timer();
            tmp = array[0];
        }
    }
    bool empty() const { return cur_size == 0; }

private:
    void percolate_down( int hole )
    {
        heap_timer* temp = array[hole];
        int child = 0;
        for ( ; ((hole*2+1) <= (cur_size-1)); hole=child )
        {
            child = hole*2+1;
            if ( (child < (cur_size-1)) && (array[child+1]->expire < array[child]->expire ) )
            {
                ++child;
            }
            if ( array[child]->expire < temp->expire )
            {
                array[hole] = array[child];
            }
            else
            {
                break;
            } }
        array[hole] = temp;
    }
    void resize() throw ( std::exception )
    {
        heap_timer** temp = new heap_timer* [2*capacity];
        for( int i = 0; i < 2*capacity; ++i )
        {
            temp[i] = NULL;
        }
        if ( ! temp )
        {
            throw std::exception();
        }
        capacity = 2*capacity;
        for ( int i = 0; i < cur_size; ++i )
        {
            temp[i] = array[i];
        }
        delete [] array;
        array = temp;
    }

private:
    heap_timer** array;
    int capacity;
    int cur_size;
};
#endif

tcp连接的内存使用

当服务器的并发TCP连接数以十万计时,我们就会对一个TCP连接在操作系统内核上消耗的内存多少感兴趣。 socket编程方法提供了SO_SNDBUF、SO_RCVBUF这样的接口来设置连接的读写缓存, linux上还提供了以下系统级的配置来整体设置服务器上的TCP内存使用,但这些配置看名字却有些互相冲突、概念模糊的感觉, 如下(sysctl -a命令可以查看这些配置):

1
2
3
4
5
6
7
net.ipv4.tcp_rmem = 8192 87380 16777216
net.ipv4.tcp_wmem = 8192 65536 16777216
net.ipv4.tcp_mem = 8388608 12582912 16777216
net.core.rmem_default = 262144
net.core.wmem_default = 262144
net.core.rmem_max = 16777216
net.core.wmem_max = 16777216

还有一些较少被提及的、也跟TCP内存相关的配置:

1
2
net.ipv4.tcp_moderate_rcvbuf = 1
net.ipv4.tcp_adv_win_scale = 2

(注:为方便下文讲述,介绍以上系统配置时前缀省略掉,配置值以空格分隔的多个数字以数组来称呼,例如tcp_rmem[2]表示上面第一行最后一列16777216。) 网上可以找到很多这些系统配置项的说明,然而往往还是让人费解,例如,tcp_rmem[2]和rmem_max似乎都跟接收缓存最大值有关,但它们却可以不一致,究竟有什么区别? 或者tcp_wmem[1]和wmem_default似乎都表示发送缓存的默认值,冲突了怎么办? 在用抓包软件抓到的syn握手包里,为什么TCP接收窗口大小似乎与这些配置完全没关系?

TCP连接在进程中使用的内存大小千变万化,通常程序较复杂时可能不是直接基于socket编程,这时平台级的组件可能就封装了TCP连接使用到的用户态内存。 不同的平台、组件、中间件、网络库都大不相同。 而内核态为TCP连接分配内存的算法则是基本不变的,这篇文章将试图说明TCP连接在内核态中会使用多少内存,操作系统使用怎样的策略来平衡宏观的吞吐量与微观的某个连接传输速度。

这篇文章也将一如既往的面向应用程序开发者,而不是系统级的内核开发者,所以,不会详细的介绍为了一个TCP连接、一个TCP报文操作系统分配了多少字节的内存,内核级的数据结构也不是本文的关注点,这些也不是应用级程序员的关注点。 这篇文章主要描述linux内核为了TCP连接上传输的数据是怎样管理读写缓存的。

一、缓存上限是什么?

  • (1)先从应用程序编程时可以设置的SO_SNDBUF、SO_RCVBUF说起。 无论何种语言,都对TCP连接提供基于setsockopt方法实现的SO_SNDBUF、SO_RCVBUF,怎么理解这两个属性的意义呢? SO_SNDBUF、SO_RCVBUF都是个体化的设置,即,只会影响到设置过的连接,而不会对其他连接生效。SO_SNDBUF表示这个连接上的内核写缓存上限。实际上,进程设置的SO_SNDBUF也并不是真的上限,在内核中会把这个值翻一倍再作为写缓存上限使用,我们不需要纠结这种细节,只需要知道,当设置了SO_SNDBUF时,就相当于划定了所操作的TCP连接上的写缓存能够使用的最大内存。然而,这个值也不是可以由着进程随意设置的,它会受制于系统级的上下限,当它大于上面的系统配置wmem_max(net.core.wmem_max)时,将会被wmem_max替代(同样翻一倍);而当它特别小时,例如在2.6.18内核中设计的写缓存最小值为2K字节,此时也会被直接替代为2K。

SO_RCVBUF表示连接上的读缓存上限,与SO_SNDBUF类似,它也受制于rmem_max配置项,实际在内核中也是2倍大小作为读缓存的使用上限。SO_RCVBUF设置时也有下限,同样在2.6.18内核中若这个值小于256字节就会被256所替代。

  • (2)那么,可以设置的SO_SNDBUF、SO_RCVBUF缓存使用上限与实际内存到底有怎样的关系呢? TCP连接所用内存主要由读写缓存决定,而读写缓存的大小只与实际使用场景有关,在实际使用未达到上限时,SO_SNDBUF、SO_RCVBUF是不起任何作用的。对读缓存来说,接收到一个来自连接对端的TCP报文时,会导致读缓存增加,当然,如果加上报文大小后读缓存已经超过了读缓存上限,那么这个报文会被丢弃从而读缓存大小维持不变。什么时候读缓存使用的内存会减少呢?当进程调用read、recv这样的方法读取TCP流时,读缓存就会减少。因此,读缓存是一个动态变化的、实际用到多少才分配多少的缓冲内存,当这个连接非常空闲时,且用户进程已经把连接上接收到的数据都消费了,那么读缓存使用内存就是0。

写缓存也是同样道理。当用户进程调用send或者write这样的方法发送TCP流时,就会造成写缓存增大。当然,如果写缓存已经到达上限,那么写缓存维持不变,向用户进程返回失败。而每当接收到TCP连接对端发来的ACK确认了报文的成功发送时,写缓存就会减少,这是因为TCP的可靠性决定的,发出去报文后由于担心报文丢失而不会销毁它,可能会由重发定时器来重发报文。因此,写缓存也是动态变化的,空闲的正常连接上,写缓存所用内存通常也为0。

因此,只有当接收网络报文的速度大于应用程序读取报文的速度时,可能使读缓存达到了上限,这时这个缓存使用上限才会起作用。所起作用为:丢弃掉新收到的报文,防止这个TCP连接消耗太多的服务器资源。同样,当应用程序发送报文的速度大于接收对方确认ACK报文的速度时,写缓存可能达到上限,从而使send这样的方法失败,内核不为其分配内存。

二、缓存的大小与TCP的滑动窗口到底有什么关系?

  • (1)滑动窗口的大小与缓存大小肯定是有关的,但却不是一一对应的关系,更不会与缓存上限具有一一对应的关系。因此,网上很多资料介绍rmem_max等配置设置了滑动窗口的最大值,与我们tcpdump抓包时看到的win窗口值完全不一致,是讲得通的。下面我们来细探其分别在哪里。

读缓存的作用有2个:1、将无序的、落在接收滑动窗口内的TCP报文缓存起来;2、当有序的、可以供应用程序读取的报文出现时,由于应用程序的读取是延时的,所以会把待应用程序读取的报文也保存在读缓存中。所以,读缓存一分为二,一部分缓存无序报文,一部分缓存待延时读取的有序报文。这两部分缓存大小之和由于受制于同一个上限值,所以它们是会互相影响的,当应用程序读取速率过慢时,这块过大的应用缓存将会影响到套接字缓存,使接收滑动窗口缩小,从而通知连接的对端降低发送速度,避免无谓的网络传输。当应用程序长时间不读取数据,造成应用缓存将套接字缓存挤压到没空间,那么连接对端会收到接收窗口为0的通知,告诉对方:我现在消化不了更多的报文了。

反之,接收滑动窗口也是一直在变化的,我们用tcpdump抓三次握手的报文:

1
14:49:52.421674 IP houyi-vm02.dev.sd.aliyun.com.6400 > r14a02001.dg.tbsite.net.54073: S 2736789705:2736789705(0) ack 1609024383 win 5792 <mss 1460,sackOK,timestamp 2925954240 2940689794,nop,wscale 9>

可以看到初始的接收窗口是5792,当然也远小于最大接收缓存(稍后介绍的tcp_rmem[1])。 这当然是有原因的,TCP协议需要考虑复杂的网络环境,所以使用了慢启动、拥塞窗口(参见高性能网络编程2—-TCP消息的发送),建立连接时的初始窗口并不会按照接收缓存的最大值来初始化。这是因为,过大的初始窗口从宏观角度,对整个网络可能造成过载引发恶性循环,也就是考虑到链路上各环节的诸多路由器、交换机可能扛不住压力不断的丢包(特别是广域网),而微观的TCP连接的双方却只按照自己的读缓存上限作为接收窗口,这样双方的发送窗口(对方的接收窗口)越大就对网络产生越坏的影响。慢启动就是使初始窗口尽量的小,随着接收到对方的有效报文,确认了网络的有效传输能力后,才开始增大接收窗口。

不同的linux内核有着不同的初始窗口,我们以广为使用的linux2.6.18内核为例,在以太网里,MSS大小为1460,此时初始窗口大小为4倍的MSS,简单列下代码(*rcv_wnd即初始接收窗口):

1
2
3
4
5
6
7
  int init_cwnd = 4;
  if (mss > 1460*3)
   init_cwnd = 2;
  else if (mss > 1460)
   init_cwnd = 3;
  if (*rcv_wnd > init_cwnd*mss)
   *rcv_wnd = init_cwnd*mss;

大家可能要问,为何上面的抓包上显示窗口其实是5792,并不是1460*4为5840呢?这是因为1460想表达的意义是:将1500字节的MTU去除了20字节的IP头、20字节的TCP头以后,一个最大报文能够承载的有效数据长度。但有些网络中,会在TCP的可选头部里,使用12字节作为时间戳使用,这样,有效数据就是MSS再减去12,初始窗口就是(1460-12)*4=5792,这与窗口想表达的含义是一致的,即:我能够处理的有效数据长度。

在linux3以后的版本中,初始窗口调整到了10个MSS大小,这主要来自于GOOGLE的建议。原因是这样的,接收窗口虽然常以指数方式来快速增加窗口大小(拥塞阀值以下是指数增长的,阀值以上进入拥塞避免阶段则为线性增长,而且,拥塞阀值自身在收到128以上数据报文时也有机会快速增加),若是传输视频这样的大数据,那么随着窗口增加到(接近)最大读缓存后,就会“开足马力”传输数据,但若是通常都是几十KB的网页,那么过小的初始窗口还没有增加到合适的窗口时,连接就结束了。这样相比较大的初始窗口,就使得用户需要更多的时间(RTT)才能传输完数据,体验不好。

那么这时大家可能有疑问,当窗口从初始窗口一路扩张到最大接收窗口时,最大接收窗口就是最大读缓存吗? 不是,因为必须分一部分缓存用于应用程序的延时报文读取。到底会分多少出来呢?这是可配的系统选项,如下: net.ipv4.tcp_adv_win_scale = 2

这里的tcp_adv_win_scale意味着,将要拿出1/(2^tcp_adv_win_scale)缓存出来做应用缓存。即,默认tcp_adv_win_scale配置为2时,就是拿出至少1/4的内存用于应用读缓存,那么,最大的接收滑动窗口的大小只能到达读缓存的3/4。

  • (2)最大读缓存到底应该设置到多少为合适呢?

当应用缓存所占的份额通过tcp_adv_win_scale配置确定后,读缓存的上限应当由最大的TCP接收窗口决定。初始窗口可能只有4个或者10个MSS,但在无丢包情形下随着报文的交互窗口就会增大,当窗口过大时,“过大”是什么意思呢?即,对于通讯的两台机器的内存而言不算大,但是对于整个网络负载来说过大了,就会对网络设备引发恶性循环,不断的因为繁忙的网络设备造成丢包。而窗口过小时,就无法充分的利用网络资源。所以,一般会以BDP来设置最大接收窗口(可计算出最大读缓存)。BDP叫做带宽时延积,也就是带宽与网络时延的乘积,例如若我们的带宽为2Gbps,时延为10ms,那么带宽时延积BDP则为2G/8*0.01=2.5MB,所以这样的网络中可以设最大接收窗口为2.5MB,这样最大读缓存可以设为4/3*2.5MB=3.3MB。

为什么呢?因为BDP就表示了网络承载能力,最大接收窗口就表示了网络承载能力内可以不经确认发出的报文。如下图所示:

经常提及的所谓长肥网络,“长”就是是时延长,“肥”就是带宽大,这两者任何一个大时,BDP就大,都应导致最大窗口增大,进而导致读缓存上限增大。所以在长肥网络中的服务器,缓存上限都是比较大的。(当然,TCP原始的16位长度的数字表示窗口虽然有上限,但在RFC1323中定义的弹性滑动窗口使得滑动窗口可以扩展到足够大。)

发送窗口实际上就是TCP连接对方的接收窗口,所以大家可以按接收窗口来推断,这里不再啰嗦。

三、linux的TCP缓存上限自动调整策略

那么,设置好最大缓存限制后就高枕无忧了吗?对于一个TCP连接来说,可能已经充分利用网络资源,使用大窗口、大缓存来保持高速传输了。比如在长肥网络中,缓存上限可能会被设置为几十兆字节,但系统的总内存却是有限的,当每一个连接都全速飞奔使用到最大窗口时,1万个连接就会占用内存到几百G了,这就限制了高并发场景的使用,公平性也得不到保证。我们希望的场景是,在并发连接比较少时,把缓存限制放大一些,让每一个TCP连接开足马力工作;当并发连接很多时,此时系统内存资源不足,那么就把缓存限制缩小一些,使每一个TCP连接的缓存尽量的小一些,以容纳更多的连接。

linux为了实现这种场景,引入了自动调整内存分配的功能,由tcp_moderate_rcvbuf配置决定,如下:net.ipv4.tcp_moderate_rcvbuf = 1 默认tcp_moderate_rcvbuf配置为1,表示打开了TCP内存自动调整功能。若配置为0,这个功能将不会生效(慎用)。

另外请注意:当我们在编程中对连接设置了SO_SNDBUF、SO_RCVBUF,将会使linux内核不再对这样的连接执行自动调整功能!

那么,这个功能到底是怎样起作用的呢?看以下配置:

1
2
3
net.ipv4.tcp_rmem = 8192 87380 16777216
net.ipv4.tcp_wmem = 8192 65536 16777216
net.ipv4.tcp_mem = 8388608 12582912 16777216

tcp_rmem[3]数组表示任何一个TCP连接上的读缓存上限,其中: tcp_rmem[0]表示最小上限, tcp_rmem[1]表示初始上限(注意,它会覆盖适用于所有协议的rmem_default配置),tcp_rmem[2]表示最大上限。 tcp_wmem[3]数组表示写缓存,与tcp_rmem[3]类似,不再赘述。

tcp_mem[3]数组就用来设定TCP内存的整体使用状况,所以它的值很大(它的单位也不是字节,而是页–4K或者8K等这样的单位!)。这3个值定义了TCP整体内存的无压力值、压力模式开启阀值、最大使用值。以这3个值为标记点则内存共有4种情况:

  1. 当TCP整体内存小于tcp_mem[0]时,表示系统内存总体无压力。若之前内存曾经超过了tcp_mem[1]使系统进入内存压力模式,那么此时也会把压力模式关闭。 这种情况下,只要TCP连接使用的缓存没有达到上限(注意,虽然初始上限是tcp_rmem[1],但这个值是可变的,下文会详述),那么新内存的分配一定是成功的。

  2. 当TCP内存在tcp_mem[0]与tcp_mem[1]之间时,系统可能处于内存压力模式,例如总内存刚从tcp_mem[1]之上下来;也可能是在非压力模式下,例如总内存刚从tcp_mem[0]以下上来。 此时,无论是否在压力模式下,只要TCP连接所用缓存未超过tcp_rmem[0]或者tcp_wmem[0],那么都一定都能成功分配新内存。否则,基本上就会面临分配失败的状况。(注意:还有一些例外场景允许分配内存成功,由于对于我们理解这几个配置项意义不大,故略过。)

  3. 当TCP内存在tcp_mem[1]与tcp_mem[2]之间时,系统一定处于系统压力模式下。其他行为与上同。

  4. 当TCP内存在tcp_mem[2]之上时,毫无疑问,系统一定在压力模式下,而且此时所有的新TCP缓存分配都会失败。

下图为需要新缓存时内核的简化逻辑: 当系统在非压力模式下,上面我所说的每个连接的读写缓存上限,才有可能增加,当然最大也不会超过tcp_rmem[2]或者tcp_wmem[2]。相反,在压力模式下,读写缓存上限则有可能减少,虽然上限可能会小于tcp_rmem[0]或者tcp_wmem[0]。

所以,粗略的总结下,对这3个数组可以这么看:

  1. 只要系统TCP的总体内存超了 tcp_mem[2] ,新内存分配都会失败。
  2. tcp_rmem[0]或者tcp_wmem[0]优先级也很高,只要条件1不超限,那么只要连接内存小于这两个值,就保证新内存分配一定成功。
  3. 只要总体内存不超过tcp_mem[0],那么新内存在不超过连接缓存的上限时也能保证分配成功。
  4. tcp_mem[1]与tcp_mem[0]构成了开启、关闭内存压力模式的开关。在压力模式下,连接缓存上限可能会减少。在非压力模式下,连接缓存上限可能会增加,最多增加到tcp_rmem[2]或者tcp_wmem[2]。

reference

  • Edge trigger works best for writeing and accepting
  • Level trigger works best for reading data
  • Current Linux kernel doesn’t support mix LT/ET for on socket
    • muduo uses level trigger