Overviewlinux
从宏观的角度来看,一个packet从网卡到socket接收缓冲区的路径以下所示:express
整个流程会在下文的各个章节中进行详细的描述,而下文中的protocol layer会以IP和UDP做为例子,可是其中的不少内容,对于其余protocol layer都是通用的。api
Detailed Look缓存
本文将会以igb驱动程序做为例子,并用它来控制一个比较常见的服务器网卡Intel I350。所以,咱们首先来看看igb设备驱动程序是怎么工做的。服务器
Network Device Driver网络
Initialization数据结构
驱动程序会利用module_init宏注册一个初始化函数,当内核加载驱动程序时,该函数就会被调用。igb初始化函数(igb_init_module)和它利用module_init进行注册的代码以下:app
/** * igb_init_module - Driver Registration Routine * * igb_init_module is the first routine called when the driver is * loaded. All it does is register with the PCI subsystem. **/ static int __init igb_init_module(void) { int ret; pr_info("%s - version %s\n", igb_driver_string, igb_driver_version); pr_info("%s\n", igb_copyright); /* ... */ ret = pci_register_driver(&igb_driver); return ret; } module_init(igb_init_module);
其中初始化设备的大部分工做都是由pci_register_driver来完成的。咱们将在下面详细介绍。负载均衡
PCI initializationsocket
Intel I350是一个PCI express设备。PCI设备经过PCI Configuration Space中的一些寄存器标识本身。
当一个设备驱动程序被编译时,会用一个叫作MODULE_DEVICE_TABLE的宏来建立一个table,用该table来包含该设备驱动程序能够控制的PCI设备的设备ID。接着这个table也会被注册为一个结构的一部分,咱们在下面立刻就能看到。
而内核最终将会使用这个table来决定该加载哪一个驱动程序来控制该设备。
操做系统就是这样肯定哪一个设备和系统链接了,以及该使用哪一个驱动来和该设备进行交互。
static DEFINE_PCI_DEVICE_TABLE(igb_pci_tbl) = { { PCI_VDEVICE(INTEL, E1000_DEV_ID_I354_BACKPLANE_1GBPS) }, { PCI_VDEVICE(INTEL, E1000_DEV_ID_I354_SGMII) }, { PCI_VDEVICE(INTEL, E1000_DEV_ID_I354_BACKPLANE_2_5GBPS) }, { PCI_VDEVICE(INTEL, E1000_DEV_ID_I211_COPPER), board_82575 }, { PCI_VDEVICE(INTEL, E1000_DEV_ID_I210_COPPER), board_82575 }, { PCI_VDEVICE(INTEL, E1000_DEV_ID_I210_FIBER), board_82575 }, { PCI_VDEVICE(INTEL, E1000_DEV_ID_I210_SERDES), board_82575 }, { PCI_VDEVICE(INTEL, E1000_DEV_ID_I210_SGMII), board_82575 }, { PCI_VDEVICE(INTEL, E1000_DEV_ID_I210_COPPER_FLASHLESS), board_82575 }, { PCI_VDEVICE(INTEL, E1000_DEV_ID_I210_SERDES_FLASHLESS), board_82575 }, /* ... */ }; MODULE_DEVICE_TABLE(pci, igb_pci_tbl);
从上文已知,在驱动的初始化函数中会调用pci_register_driver函数。
该函数会注册一个尽是指针的结构,其中大多数的指针都是函数指针,不过包含PCI device ID的table一样会被注册。内核会利用这些驱动注册的函数来启动PCI设备。
static struct pci_driver igb_driver = { .name = igb_driver_name, .id_table = igb_pci_tbl, .probe = igb_probe, .remove = igb_remove, /* ... */ };
PCI probe
一旦一个设备经过它的PCI ID被识别,内核就会选择合适的驱动程序来控制该设备。每个PCI设备驱动程序都在内核的PCI子系统中注册了一个probe function。对于尚未驱动控制的设备,内核会调用该函数,直到和某个驱动程序相匹配。大多数驱动程序都有大量的代码用来控制设备。具体的操做各个驱动也有所不一样。可是一些典型的操做以下所示:
让咱们来快速浏览一下,igd里对应的igb_probe是如何完成上述操做的
A peek into PCI initialization
接下来的这些代码取自igb_probe函数,主要用于一些基本的PCI配置
err = pci_enable_device_mem(pdev); /* ... */ err = dma_set_mask_and_coherent(&pdev->dev, DMA_BIT_MASK(64)); /* ... */ err = pci_request_selected_regions(pdev, pci_select_bars(pdev, IORESOURCE_MEM), igb_driver_name); pci_enable_pcie_error_reporting(pdev); pci_set_master(pdev); pci_save_state(pdev);
首先,设备会由pci_enable_device_mem初始化,若是该设备处于暂停状态就会被唤醒,获取内存资源以及其余一些工做。接着会对DMA mask进行设置,由于该设备会读写64位的内存地址,所以dma_set_mask_and_coherent的参数为DMA_BIT_MASK(64)。而后调用pci_request_selected_regions获取内存,同时使能PCI Express Advanced Error Reporting功能,最终调用pci_set_master使能DMA而且调用pci_save_state保存PCI configuration space。
Network device initialization
igb_probe函数作了大量关于网络设备初始化的工做。除了一些针对PCI的工做之外,它还须要作以下这些工做:
下面咱们对上述的每一部分进行详细的分析。
struct net_device_ops
struct net_device_ops中包含许多函数指针指向一些网络子系统用来操做设备的重要功能。咱们将在接下来的内容中屡次说起此结构。在igb_probe中net_device_ops结构将会和struct net_device绑定,代码以下:
static int igb_probe(struct pci_dev *pdev, const struct pci_device_id *ent) { /* ... */ netdev->netdev_ops = &igb_netdev_ops;
而net_device_ops结构中的各个指针指向的函数也定义在同一个文件中:
static const struct net_device_ops igb_netdev_ops = { .ndo_open = igb_open, .ndo_stop = igb_close, .ndo_start_xmit = igb_xmit_frame, .ndo_get_stats64 = igb_get_stats64, .ndo_set_rx_mode = igb_set_rx_mode, .ndo_set_mac_address = igb_set_mac, .ndo_change_mtu = igb_change_mtu, .ndo_do_ioctl = igb_ioctl, /* ... */
咱们能够看到,这个结构中包含不少有趣的字段,例如ndo_open,ndo_stop,ndo_start_xmit和ndo_get_stats64,他们都包含了igb驱动实现的对应函数的地址。咱们下面会对其中的某些内容作进一步的分析。
ethtool registration
ethtool是一个命令行工具,用来获取和设置各类驱动和硬件相关的选项。一般会利用ethtool来从网络设备收集一些详细的数据。
ethtool经过ioctl系统调用和设备驱动程序交互。设备驱动程序注册了一系列的函数用于ethtool的操做。当ethtool发出一个ioctl调用时,内核会找到对应驱动的ethtool结构而且执行相应的注册函数。驱动的ethtool函数能够作许多事情,包括修改驱动中一个简单的falg,乃至经过写设备的寄存器来调整真实设备。
igb驱动经过在igb_probe中调用igb_set_ethtool_ops来注册ethtool的各个操做。
static int igb_probe(struct pci_dev *pdev, const struct pci_device_id *ent) { /* ... */ igb_set_ethtool_ops(netdev);
void igb_set_ethtool_ops(struct net_device *netdev) { SET_ETHTOOL_OPS(netdev, &igb_ethtool_ops); }
static const struct ethtool_ops igb_ethtool_ops = { .get_settings = igb_get_settings, .set_settings = igb_set_settings, .get_drvinfo = igb_get_drvinfo, .get_regs_len = igb_get_regs_len, .get_regs = igb_get_regs, /* ... */
每一个驱动都能本身决定哪些ethtool函数和本身有关而且决定实现其中的哪些。并非每一个驱动都须要实现全部的ethtool函数。其中一个比较有趣的ethtool函数是get_ethtool_stats,它会建立一些很是详细的计数器进行追踪,它们要么位于驱动中,要么位于设备内。
IRQs
当一个数据帧经过DMA被写入RAM时,网卡是如何通知系统的其他部分,已经有数据能够处理了呢?
通常网卡会产生一个interrupt request(IRQ)表示有数据到了。有如下三种IRQ类型:MSI-X,MSI和legacy IRQ。可是若是有大量的数据帧到达时,就会致使产生大量的IRQ。而产生的IRQ越多,那么用于high level task,例如用户进程的CPU时间就越少。
因而建立了New API(NAPI)这种机制,用于减小数据包的到来致使设备产生中断的数目。尽管NAPI能够减小IRQ的数目,可是并不能彻底避免。下面的章节会告诉咱们缘由。
NAPI
NAPI在许多方面和获取数据传统的方式不一样。NAPI容许设备驱动程序注册一个poll函数,NAPI子系统会调用它来获取数据帧。
NAPI通常的使用方式以下:
上述这种收集数据的方式和传统方式相比可以有效减小overhead,由于一次能处理不少数据,而不须要每一个数据帧产生一次IRQ。设备驱动程序实现了poll函数并经过调用netif_napi_add将它注册到NAPI中。当经过netif_napi_add向NAPI注册poll时,驱动同时会声明一个weight,大多数驱动都会将它固定为64。该值的意义将会在下文讨论。
一般,驱动程序会在初始化的时候注册他们的NAPI poll函数。
NAPI initialization in the igb driver
igb驱动经过以下一个长长的调用链来实现NAPI的初始化:
这个调用链会致使一些上层的事发生:
让咱们来看一看igb_alloc_q_vector是如何注册poll回调函数以及它的私有数据的
static int igb_alloc_q_vector(struct igb_adapter *adapter, int v_count, int v_idx, int txr_count, int txr_idx, int rxr_count, int rxr_idx) { /* ... */ /* allocate q_vector and rings */ q_vector = kzalloc(size, GFP_KERNEL); if (!q_vector) return -ENOMEM; /* initialize NAPI */ netif_napi_add(adapter->netdev, &q_vector->napi, igb_poll, 64); /* ... */
上述代码为receive queue分配了内存,而且向NAPI子系统注册了igb_poll函数。其中的参数包含了新建立的receive queue相关的struct napi_struct的引用(&q_vector->napi)。当须要从receive queue中接收数据包时,NAPI子系统会将它传输给igb_poll函数。这对于咱们之后研究数据流从驱动发送网络栈的过程是很是重要的。
Bring a network device up
回忆一下以前说过的net_device_ops结构,它注册了一系列函数用于启动设备,传输包,设置mac地址等等。当一个网络设备被启动时(好比,调用ifconfig eth0 up) ,和net_device_ops结构中的ndo_open域相关的函数就会被调用。
ndo_open函数会作以下操做:
在igb驱动中,和net_device_ops结构中的ndo_open相关的函数为igb_open。
Preparing to receive data from the network
如今大多数的网卡都利用DMA直接将数据写入RAM,从而让操做系统能直接获取数据进行处理。许多网卡为此使用的数据结构相似于建立在环形缓冲区上的队列。为了实现DMA,设备驱动程序必须和操做系统合做,保留一些内存可供网卡使用。一旦区域肯定,网卡会获得关于通知,而且会将收到的数据都写入其中。以后这些数据会被取出并交由网络子系统处理。
这些都很是简单,可是若是数据包到达的过快,单个CPU不能很好地处理全部的数据包怎么办?由于该数据结构是基于一个固定大小的内存区域,所以接收到的包将会被丢弃。这个时候,像Receive Side Scaling(RSS)或multiqueue就能派上用场了。有的设备有能力同时将包写入不一样的RAM,每一个区域都是一个单独的队列。这就容许操做系统在硬件层面使用多个CPU并行处理获取的数据。可是这个特性并非被全部网卡支持的。不过Intel I350是支持multiple queue的。咱们能够看到在igb驱动中,它在启动时就是调用一个叫igb_setup_all_rx_resources的函数。而它又会调用另外一个函数,igb_setup_rx_resources,用于让receive queue处理DMA内存。事实上,receive queue的数目和长度能够经过ethtool进行调整。对该这些值进行调整,咱们能够看到对于已处理包的数目和已丢弃包数目比例的影响。
网卡通常使用基于packet header field(源地址,目的地址,端口)的哈希函数来肯定某个包该发往哪一个receive queue。有的网卡还容许你调整某些receive queue的权重,从而让某些特定的队列处理更多的流量。还有的网卡甚至容许你调整哈希函数。这样的话,你能够将特定的数据流发往特定的receive queue进行处理,甚至在硬件层面就将包丢弃。接下来咱们很快会看到如何对这些设置进行调整。
Enable NAPI
当打开一个设备时,驱动一般会使能NAPI。以前咱们看到了驱动是如何注册NAPI的poll函数的,可是直到打开设备前,NAPI都不是使能的。使能NAPI其实很是简单直接,调用napi_enable翻转struct napi_struct 中的一个位就表示NAPI使能了。如上所述,尽管NAPI使能了,可是它仍然可能处于关闭状态。在igb driver中,每一个q_vector的NAPI都会在设备打开或者利用ethtool改变队列的数目或大小时使能。
for (i = 0; i < adapter->num_q_vectors; i++) napi_enable(&(adapter->q_vector[i]->napi));
Register an interrupt handler
使能了NAPI以后,下一步就是注册一个interrupt handler。如今有好几种方式用于发生一个中断:MSI-X,MSI以及legacy interrupts。所以,这一部分的代码对于不一样的驱动都是不一样的,这取决于特定的硬件支持哪一种中断方式。驱动必须肯定设备支持哪一种中断方式,而且注册合适的处理方法,从而能在中断发生时进行处理。有些驱动,例如igb会为每种方法注册一个interrupt handler,一种方法失败就换另外一种。对于支持multiple receive queue的网卡来讲,MSI-X是更好的方法。这样的话,每一个receive queue都有本身的hardware interrupt,从而能被特定的CPU处理(经过irqbalance或修改/proc/irq/IRQ_NUMBER/smp_affinity)。咱们很快就能看到,处理中断的CPU也将是对包进行处理的CPU。这样一来,收到的包就能从hardware interrupt开始直到整个网络栈都由不一样的CPU处理。
若是MSI-X不能用,MSI仍然要优于legacy interrupts。在igb驱动中,函数igb_msix_ring,igb_intr_msi和igb_intr分别是MSI-X,MSI和legacy interrupt对应的interrupt handler。
static int igb_request_irq(struct igb_adapter *adapter) { struct net_device *netdev = adapter->netdev; struct pci_dev *pdev = adapter->pdev; int err = 0; if (adapter->msix_entries) { err = igb_request_msix(adapter); if (!err) goto request_done; /* fall back to MSI */ /* ... */ } /* ... */ if (adapter->flags & IGB_FLAG_HAS_MSI) { err = request_irq(pdev->irq, igb_intr_msi, 0, netdev->name, adapter); if (!err) goto request_done; /* fall back to legacy interrupts */ /* ... */ } err = request_irq(pdev->irq, igb_intr, IRQF_SHARED, netdev->name, adapter); if (err) dev_err(&pdev->dev, "Error %d getting interrupt\n", err); request_done: return err; }
从上面的代码咱们能够看到,驱动会首先尝试利用igb_request_msix设置MSI-X的interrupt handler,若是失败的话,进入MSI。request_irq用于注册MSI的interrupt handler,igb_intr_mis。若是这也失败了,则会进入legacy interrupts。这个时候会再次使用request_irq注册legacy interrupt的interrupt handler,igb_intr。igb的驱动就是这样注册一个函数用于处理,当网卡发出中断说明有数据到达并已经准备好接受处理了。
Enable Interrupts
到如今为止,基本上全部事情都设置完毕了。惟一剩下的就是打开中断而且等待数据的到来。打开中断对于每一个设备都是不同的,对于igb驱动,它是在__igb_open中经过调用igb_irq_enable完成的。通常,打开中断都是经过写设备的寄存器完成的:
static void igb_irq_enable(struct igb_adapter *adapter) { /* ... */ wr32(E1000_IMS, IMS_ENABLE_MASK | E1000_IMS_DRSTA); wr32(E1000_IAM, IMS_ENABLE_MASK | E1000_IMS_DRSTA); /* ... */ }
The network device is now up
驱动可能还须要作另一些事,例如启动定时器,work queue,或者其余硬件相关的设置。一旦这些都完成了,那么设备就已经启动并准备好投入使用了。
SoftIRQs
在深刻网络栈以前,咱们先要了解一下Linux内核中一个叫作SoftIRQ的东西
What is a softirq
Linux内核中的softirq system是一种可以让代码到interrupt handler上下文以外执行的一种机制。它很是重要,由于在几乎全部的interrupt handler的执行过程当中,hardware interrupts都是关闭的。而中断关闭的时间越长,那么就越有可能丢失某些event。所以咱们能够把一些执行时间较长的代码放到interrupt handler以外执行,这样就能让它快点完成从而恢复中断。在内核中,还有其余的机制可以延迟代码的执行,可是对于网络栈来讲,咱们选择softirqs。
softirq system能够被当作是一系列的kernel thread(每一个CPU一个),它们会对不一样的softirq event运行不一样的处理函数。若是你观察过top命令的输出,而且在一系列的kernel threads中看到了一个ksoftirqd/0,那么它就是运行在CPU 0上的一个softirq kernel thread。
内核子系统能够经过运行open_softirq函数来注册一个softirq handler。咱们下面将看到的是网络子系统如何注册它的softirq handlers。如今,咱们先来学习一下softirq是如何工做的。
ksoftirqd
由于softirq对于推迟设备驱动工做的执行太太重要了,你能够想象,它必定在内核整个生命周期中很早的时候就开始执行了。下面咱们来看看ksoftirqd系统是如何初始化的:
static struct smp_hotplug_thread softirq_threads = { .store = &ksoftirqd, .thread_should_run = ksoftirqd_should_run, .thread_fn = run_ksoftirqd, .thread_comm = "ksoftirqd/%u", }; static __init int spawn_ksoftirqd(void) { register_cpu_notifier(&cpu_nfb); BUG_ON(smpboot_register_percpu_thread(&softirq_threads)); return 0; } early_initcall(spawn_ksoftirqd);
你能够看到上面struct smp_hotplug_thread的定义,其中注册了两个函数指针:ksoftirqd_should_run和run_softirqd。这两个函数都会在kernel/smpboot.c中被调用,用来构成一个event loop。kernel/smpboot.c中的代码首先会调用ksoftirqd_should_run来肯定是否还有pending softirq,若是有的话,就执行run_softirqd。run_ksoftirqd会在调用__do_softirq以前执行一些minor bookkeeping。
__do_softirq
__do_softirq函数主要作如下这些事:
所以,如今你看CPU的使用图,其中的softirq或si表明的就是用于这些deferred work所需的时间。
Linux network device subsysem
既然咱们已经大概了解了网卡驱动和softirq是如何工做的,接下来咱们来看看Linux network device subsystem是如何初始化的。接着咱们将追踪一个包从它到达网卡以后所走过的整条路径。
Initialization of network device system
network device (netdev) subsystem是在函数net_dev_init中初始化的。有许多有趣的事情在这个初始化函数中发生。
Initialization of struct softnet_data structures
net_dev_init会为每一个CPU都建立一个struct softnet_data。这个结构会包含不少指针用于处理网络数据:
其中的每一部分咱们都会在下文中详细叙述。
Initialization of softirq handlers
net_dev_init注册了一个receive softirq handler和transmit softirq handler分布用于处理输入和输出的数据。代码以下:
static int __init net_dev_init(void) { /* ... */ open_softirq(NET_TX_SOFTIRQ, net_tx_action); open_softirq(NET_RX_SOFTIRQ, net_rx_action); /* ... */ }
咱们很快就能看到驱动的interrupt handler是如何触发NET_RX_SOFTIRQ的net_rx_action函数的
Data arrives
终于,数据来了!
假设receive queue有足够的descriptors,packet会直接经过DMA写入RAM。以后设备就会产生一个相应的中断(或者在MSI-X中,是packet到达的receive queue对应的中断)
Interrupt handler
通常来讲,当一个中断对应的interrupt handler运行时,它应该将尽可能多的工做都放到中断上下文以外进行。这很是重要,由于在一个中断执行的过程当中,其余中断都阻塞了。让咱们来看看MSI-X interrupt handler的源码,它能很好地解释,为何interrupt handler应该尽量地少作工做。
static irqreturn_t igb_msix_ring(int irq, void *data) { struct igb_q_vector *q_vector = data; /* Write the ITR value calculated from the previous interrupt. */ igb_write_itr(q_vector); napi_schedule(&q_vector->napi); return IRQ_HANDLED; }
这个interrupt handler很是短,在返回以前仅仅作了两个很快的操做。首先,它调用了igb_write_itr,更新了一下硬件相关的寄存器。在这个例子中,被更新的寄存器是用于追踪hardware interrupt到达速率的。这个寄存器一般和一个叫"Interrupt Throttling"(或者叫"Interrupt Coalescing")的硬件特性相结合,它用来调整中断发往CPU的速率。咱们很快能够看到ethtool提供了一种机制,可以调节IRQ发生的速率。
接着调用napi_schedule用来唤醒NAPI processing loop(若是它不在运行的话)。注意的是NAPI processing loop是在softirq运行的,而不是在interrupt handler中。interrupt handler只是简单地让它开始执行,若是它没有准备好的话。
真正的代码会展示这些工做会是多么重要,它会帮助咱们理解网络数据是如何在多CPU系统中处理的。
NAPI and napi_schedule
让咱们来看看hardware interrupt handler中调用的napi_schedule是如何工做的。
要记住,NAPI存在的目的就是在不须要网卡发送中断,表示已经有数据能够准备处理的状况下也能接收数据。如上文所述,NAPI的poll loop会在收到一个hardware interrupt后生成。换句话说:NAPI是使能的,可是处于关闭状态,直到网卡产生一个IRQ表示第一个packet到达,NAPI才算打开。固然还有其余一些状况,咱们很快就能看到,NAPI会被关闭,直到一个hardware interrupt让它从新开启。
NAPI poll loop会在驱动的interrupt handler调用napi_schedule后启动。不过napi_schedule只是一个包装函数,它直接调用了__napi_schedule
/** * __napi_schedule - schedule for receive * @n: entry to schedule * * The entry's receive function will be scheduled to run */ void __napi_schedule(struct napi_struct *n) { unsigned long flags; local_irq_save(flags); ____napi_schedule(&__get_cpu_var(softnet_data), n); local_irq_restore(flags); } EXPORT_SYMBOL(__napi_schedule);
该代码调用__get_cpu_var获取当前运行的CPU的softnet_data结构。接着softnet_data结构和struct napi_struct结构会被传输给__napi_schedule。
/* Called with irq disabled */ static inline void ____napi_schedule(struct softnet_data *sd, struct napi_struct *napi) { list_add_tail(&napi->poll_list, &sd->poll_list); __raise_softirq_irqoff(NET_RX_SOFTIRQ); }
上面的代码主要作了两件事:
咱们很快就能看到,softirq的处理函数net_rx_action会调用NAPI的poll函数用于获取数据
A note about CPU and network data processing
须要注意的是到目前为止咱们见到的全部把任务从hardware interrupt handler推迟到softirq的代码使用的结构都是和当前CPU相关的。尽管驱动的IRQ handler只作不多的工做,可是softirq handler会和驱动的IRQ handler在同一个CPU上执行。
这就是为何IRQ会由哪一个CPU处理很重要了,由于该CPU不只会用于执行驱动的interrupt handler,还会经过对应的NAPI在softirq中获取数据。
咱们接下去将会看到,像Receive Packet Steering这样的机制会将其中的一些工做分发到其余CPU上去。
Network data processing begins
一旦softirq的代码知道了是哪一个softirq被挂起了,它就会开始执行,而且调用net_rx_action,这个时候网络数据的处理就开始了。让咱们来看看net_rx_action的processing loop的各个部分,了解一下它是如何工做的。
net_rx_action processing loop
net_rx_action从被DAM写入的packet所在的内存开始处理。该函数会遍历在当前CPU上排队的NAPI结构,依次取下每一个结构并进行处理。processing loop指定了NAPI的poll函数所能进行的工做量以及消耗的工做时间。它经过以下两种方式实现:
while (!list_empty(&sd->poll_list)) { struct napi_struct *n; int work, weight; /* If softirq window is exhausted then punt. * Allow this to run for 2 jiffies since which will allow * an average latency of 1.5/HZ. */ if (unlikely(budget <= 0 || time_after_eq(jiffies, time_limit))) goto softnet_break;
这就是内核如何防止packet processing一直独占整个CPU。其中的budget是该CPU上每一个NAPI结构的预算的总和。这就是为何multiqueue网卡要当心地调节IRQ affinity的缘由。咱们直到,处理从设备发出的IRQ的CPU也会被用来处理对应的softirq handler,所以也会成为处理上述循环和budget computation的CPU。
有着multiqueue网卡的系统可能会出现这种状况,多个NAPI结构被注册到了同一个CPU上。全部的NAPI结构的处理都会消耗同一个CPU的budget。
若是你没有足够的CPU去分发网卡的IRQ,你能够考虑增长net_rx_action的budget从而容许每一个CPU能处理更多的packet。增长budget会增长CPU的使用率,可是能够减少延时,由于数据处理地更及时(可是CPU的处理时间仍然是2 jiffies,无论budget是多少)。
NAPI poll function and weight
咱们已经知道网卡驱动调用netif_napi_add注册poll函数。在上文中咱们已经看到,igb驱动中有以下这段代码:
/* initialize NAPI */ netif_napi_add(adapter->netdev, &q_vector->napi, igb_poll, 64);
它给NAPI结构的weight赋值为64。咱们如今就会看到它是如何在net_rx_action processing loop中使用的。
weight = n->weight; work = 0; if (test_bit(NAPI_STATE_SCHED, &n->state)) { work = n->poll(n, weight); trace_napi_poll(n); } WARN_ON_ONCE(work > weight); budget -= work;
首先从NAPI结构中获取weight(此处是64),而后将它传递给一样注册到NAPI结构中的poll函数(此处为igb_poll)。poll函数会返回已经被处理的帧数,并保存在work中,以后它将从budget中减去。所以,假设:
你的系统将会在以下任意一种状况发生时,中止处理数据;
The NAPI / network device driver contract
NAPI子系统和设备驱动的交互中还未说起的一部分就是关闭NAPI的条件,包含的内容以下:
咱们先来看看net_rx_action如何处理第一种状况
Finishing the net_rx_action loop
/* Drivers must not modify the NAPI state if they * consume the entire weight. In such cases this code * still "owns" the NAPI instance and therefore can * move the instance around on the list at-will. */ if (unlikely(work == weight)) { if (unlikely(napi_disable_pending(n))) { local_irq_enable(); napi_complete(n); local_irq_disable(); } else { if (n->gro_list) { /* flush too old packets * If HZ < 1000, flush all packets. */ local_irq_enable(); napi_gro_flush(n, HZ >= 1000); local_irq_disable(); } list_move_tail(&n->poll_list, &sd->poll_list); } }
若是全部的work都被消耗完了,net_rx_action须要处理如下两种状况:
这就是packet processing loop如何调用驱动注册的poll函数来处理数据。咱们很快将会看到,poll函数将会获取数据并将它传递到协议栈进行处理。
Exiting the loop when limits are reached
当下列状况发生时,net_rx_action的循环将会退出:
/* If softirq window is exhausted then punt. * Allow this to run for 2 jiffies since which will allow * an average latency of 1.5/HZ. */ if (unlikely(budget <= 0 || time_after_eq(jiffies, time_limit))) goto softnet_break;
softnet_break: sd->time_squeeze++; __raise_softirq_irqoff(NET_RX_SOFTIRQ); goto out;
struct softnet_data结构中的某些统计数据增长了而且softirq的NET_RX_SOFTIRQ被关闭了。其中的time_squeeze域是用来测量这样一个数据:net_rx_action还有不少工做要作,可是要么由于budget用完了,要么超时了,此类状况发生的次数。这些统计数据对于了解网络的瓶颈是很是有用的。NET_RX_SOFTIRQ被关闭从而能给其余任务腾出时间。这一小段代码的意义是,尽管还有不少工做要作,可是咱们不想再独占CPU了,
执行流接着被传递给了out。当没有更多的NAPI结构须要处理,换句话说,budget比network activity更多,全部的驱动都已经关闭了NAPI,net_rx_action无事可作的时候,也会运行到out。
out段代码在从net_rx_action返回以前作了一件重要的事情:调用net_rps_action_and_irq_enable。它在Receive Packet Steering使能的状况下有着重要的做用;它会唤醒远程的CPU用于处理网络数据。
咱们将在以后更多地了解RPS是如何工做的。如今让咱们先走进NAPI poll函数的内部,这样咱们就能向上进入网络栈了。
NAPI poll
咱们已经知道设备驱动程序申请了一块内存用于让设备DMA到达packet。驱动有责任申请这些区域,一样也有责任unmap those regions,获取其中的数据而且将它发往网络栈。让咱们经过观察igb driver是如何完成这些工做的,从而了解在实际过程当中这些步骤是如何完成的。
igb_poll
/** * igb_poll - NAPI Rx polling callback * @napi: napi polling structure * @budget: count of how many packets we should handle **/ static int igb_poll(struct napi_struct *napi, int budget) { struct igb_q_vector *q_vector = container_of(napi, struct igb_q_vector, napi); bool clean_complete = true; #ifdef CONFIG_IGB_DCA if (q_vector->adapter->flags & IGB_FLAG_DCA_ENABLED) igb_update_dca(q_vector); #endif /* ... */ if (q_vector->rx.ring) clean_complete &= igb_clean_rx_irq(q_vector, budget); /* If all work not completed, return budget and keep polling */ if (!clean_complete) return budget; /* If not enough Rx work done, exit the polling mode */ napi_complete(napi); igb_ring_irq_enable(q_vector); return 0; }
上述代码干了以下这些有趣的事情:
让咱们来看看igb_clean_rx_irq是如何将数据送往协议栈的
igb_clean_rx_irq
igb_clean_rx_irq函数是一个循环,它一次处理一个packet直到到达budget或者没有多余的数据须要处理了。
函数中的循环干了以下这些很是重要的事情:
IGB_RX_BUFFER_WRITE(16)
一旦循环结束,函数会将收到的packet数和字节数加到统计数据中。
接着咱们首先来聊一聊Generic Receive Offloading(GRO),以后再进入函数napi_gro_receive
Generic Receive Offloading(GRO)
Generic Receive Offloading(GRO)是硬件层面的优化Large Receive Offloading(LRO)的软件实现。这两种方法的核心思想都是经过将"相似"的包组合起来以减小传输给网络栈的包的数量,从而减小CPU的使用。例如咱们要传输一个大文件,其中有许多包都包含的都是文件中的数据块。显然,咱们能够不用每次都将一个small packet发往网络栈,而是将这些包组合起来,增大负载,最后让这个组合起来的包发往协议栈。这就可让协议层只处理一个包的头部,就能传输更多的数据到用户空间。
可是这类优化的最大问题就是,信息丢失。若是一个packet中设置了一些重要的选项或者标志,若是将这个包和其余包合并,这些选项或者标志就会丢失。这也就是为何不少人都不建议使用LRO的缘由。事实上,LRO对于合并包的规则的定义是很是宽松的。
GRO做为LRO的硬件实现被引入,可是对于哪些包能够组合有着更为严格的规则
若是你有使用过tcpdump而且看到了一些大的难以想象的包,那么颇有可能你的系统已经打开了GRO。你很快就能看到,抓包工具进行抓包的位置是在GRO发生以后,在协议栈的更上层。
napi_gro_receive
函数napi_gro_receive用于处理网络数据的GRO操做(若是GRO打开的话)并将数据传送到协议栈。而一个叫作dev_gro_receive的函数处理了其中的大部分逻辑
dev_gro_receive
这个函数首先检查GRO是否打开,若是打开的话,则准备进行GRO操做。当GRO打开时,首先会遍历一系列的GRO offload filter从而让上层的协议栈对要进行GRO的数据进行处理。这样协议层就能让设备层知道,该packet是否属于正在处理的network flow以及处理一些对于GRO所须要作的特定于协议的事情。例如,TCP协议须要知道是否或者什么时候须要给一个已经组合到现有packet的packet发送ACK
list_for_each_entry_rcu(ptype, head, list) { if (ptype->type != type || !ptype->callbacks.gro_receive) continue; skb_set_network_header(skb, skb_gro_offset(skb)); skb_reset_mac_len(skb); NAPI_GRO_CB(skb)->same_flow = 0; NAPI_GRO_CB(skb)->flush = 0; NAPI_GRO_CB(skb)->free = 0; pp = ptype->callbacks.gro_receive(&napi->gro_list, skb); break; }
若是协议层认为是时候清除GRO packet了,以后就会调用napi_gro_complete进行处理,以后它就会调用协议层对应的gro_complete,最后再调用netif_receive_skb将包传送给网络栈
if (pp) { struct sk_buff *nskb = *pp; *pp = nskb->next; nskb->next = NULL; napi_gro_complete(nskb); napi->gro_count--; }
若是协议层将packet合并进existing flow,napi_gro_receive就会直接返回。若是packet没有被合并,而且如今的GRO flow小于MAX_GRO_SKBS,以后就会在该NAPI的gro_list新增一个条目
if (NAPI_GRO_CB(skb)->flush || napi->gro_count >= MAX_GRO_SKBS) goto normal; napi->gro_count++; NAPI_GRO_CB(skb)->count = 1; NAPI_GRO_CB(skb)->age = jiffies; skb_shinfo(skb)->gso_size = skb_gro_len(skb); skb->next = napi->gro_list; napi->gro_list = skb; ret = GRO_HELD;
Linux网络栈中的GRO系统就是这样工做的
napi_skb_finish
一旦dev_gro_receive运行完成,napi_skb_finish就会被调用,要不就是释放由于包已经被合并就没用了的数据结构,要么调用netif_receive_skb将数据传输给网络栈(由于如今已经有MAX_GRO_SKBS个flow了)。如今是时候看看netif_receive_skb是如何将数据传输给协议层了。可是在此以前,咱们先来看看什么是Receive Packet Steering(RPS)
Receive Packet Steering(RPS)
咱们已经知道每一个网络设备驱动都注册了一个NAPI poll函数。每一个NAPI poller实例都执行在每一个CPU的softirq上下文中。而处理驱动的IRQ handler的CPU会唤醒它的softirq processing loop去处理包。换句话说:处理硬件中断的CPU也会用于poll相应的输入数据。
有的硬件(例如Intel I350)在硬件层面支持multiple queue。这意味着输入的数据会被分流到不一样的receive queue,并被DMA到不一样的内存区域,从而会有不一样的NAPI结构处理对应的区域。从而能让多个CPU并行地处理来自设备的中断而且对数据进行处理。
这个特性咱们就称做Receive Side Scaling(RSS)
而Receive Packet Steering(RPS)是RSS的软件实现。由于它是由软件实现的,所以它能够用于任何网卡,即便是那些只有一个receive queue的网卡。然而,一样由于是软件层面的实现,RPS只能在包从DMA内存区域中取出以后,才能对它进行处理。这意味着,你并不会看到CPU使用在处理IRQ或者NAPI poll loop的时间降低,可是你能够从获取到包以后,对它进行负载均衡,而且今后处开始,到协议层向上减小CPU时间。
RPS经过对输入的数据计算出一个哈希值肯定该由哪一个CPU对其进行处理。以后,该数据会被排入每一个CPU的receive network backlog等待处理。一个Inter-processor Interrupt(IPI)会被发往拥有该backlog的CPU。这会帮助触发backlog的处理,若是它当前仍未进行处理的话。/proc/net/softnet_stat中包含了每个softnet_data中接收到的IPI的次数
所以,netif_receive_skb要么会接着将数据送往网络栈,要么就会经过RPS将它发往其余CPU进行处理
Receive Flow Steering(RFS)
Receive Flow Steering(RFS)一般会和RPS混合使用。RPS会将输入数据在多个CPU之间进行负载均衡,可是它并不会考虑局部性从而最大化CPU cache的命中率。你可使用RFS将属于同一个flow的数据送往同一个CPU处理,从而提升cache命中率。
Hardware accelerated Receive Flow Steering(aRFS)
RFS可使用hardware acceleration来加速。网卡和内核能够联合起来,共同决定哪一个flow须要发往哪一个CPU进行处理。为了使用这一特性,你的网卡和驱动必须对它支持。若是你的网卡驱动有一个叫作ndo_rx_flow_steer的函数,那么该驱动支持accelerated RFS。
Moving up the network stack with netif_receive_skb
netif_receive_skb会在如下两个地方被调用:
须要注意的是netif_receive_skb以及它后续调用的函数都是在softirq processing loop的上下文中进行的。netif_receive_skb首先检查一个sysctl的值用来确认用户是否要求在packet进入backlog queue以前或以后加入receive timestamp。若是有设置的话,如今就对该数据进行timestamp,在进行RPS以前。若是未被设置,则会在它加入队列以后在打timestamp。这能够将timestamp形成的负载在多个CPU间进行均衡,不过一样会引入延迟
netif_receive_skb
当timestamp被处理完以后,netif_receive_skb会根据RPS是否可用进行不一样的操做。让咱们先从最简单的开始:RPS不可用
Without RPS(default setting)
若是RPS不可用,首先会调用__netif_receive_skb作一些bookkeeping接着再调用__netif_receive_skb_core将数据移往协议栈。咱们很快就会看到__netif_receive_skb_core是如何工做的,不过在此以前,咱们先来看看RPS可用时的传输路径是怎样的,由于该代码一样会调用__netif_receive_skb_core。
With RPS enabled
若是RPS可用的话,在timestamp选项被处理完以后,netif_receive_skb会进行一些计算用于决定该使用哪一个CPU的backlog queue。这是经过函数get_rps_cpu完成的
cpu = get_rps_cpu(skb->dev, skb, &rflow); if (cpu >= 0) { ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail); rcu_read_unlock(); return ret; }
get_rps_cpu会将上文所述的RFS和aRFS都考虑在内,并调用enqueue_to_backlog将数据加入相应的CPU的backlog queue
enqueue_to_backlog
该函数首先获取远程CPU的softnet_data结构的指针,其中包含了一个指向input_pkt_queue的指针。接着,获取远程CPU的input_pkt_queue的队列长度
qlen = skb_queue_len(&sd->input_pkt_queue); if (qlen <= netdev_max_backlog && !skb_flow_limit(skb, qlen)) {
input_pkt_queue的长度首先和netdev_max_backlog相比较。若是队列的长度大于该值,则数据被丢弃。一样flow limit也会被检查,若是超过了,数据一样会被丢弃。这两种状况下,softnet_data节后中丢弃包的数目都将增长。注意这里的softnet_data是数据将要发往的CPU的。
enqueue_to_backlog并不会在不少地方被调用。它只会在RPS可用的包处理过程当中或者netif_rx中。许多驱动不该该使用netif_rx,而应该使用netif_receive_skb。若是你不使用RPS或者你的驱动不使用netif_rx,那么增长backlog不会对你的系统产生任何影响,由于它根本就没被用到。(若是你的驱动使用netif_receive_skb而且未使用RPS,那么增长netdev_max_backlog不会产生任何性能上的提升,由于没有数据会被加入到input_pkt_queue中)
若是input_pkt_queue足够小,而也没有超过flow limit,数据就会被加入队列。大概的逻辑以下:
if (skb_queue_len(&sd->input_pkt_queue)) { enqueue: __skb_queue_tail(&sd->input_pkt_queue, skb); input_queue_tail_incr_save(sd, qtail); rps_unlock(sd); local_irq_restore(flags); return NET_RX_SUCCESS; } /* Schedule NAPI for backlog device * We can use non atomic operation since we own the queue lock */ if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) { if (!rps_ipi_queued(sd)) ____napi_schedule(sd, &sd->backlog); } goto enqueue;
Flow limits
RPS会将包分发到多个CPU进行处理,不过一个large flow极可能会占据整个CPU,从而让其余small flow处于饥饿状态。flow limit可以让每一个flow添加到backlog中的包的数目有一个最大值。这个特性能够帮助small flow一样可以获得处理,即便有larger flow的包也在入队
backlog queue NAPI poller
每一个CPU的backlog queue以和设备驱动程序同样的方式插入NAPI。一个poll函数用于处理来自softirq上下文的包,一样还提供了一个weight。这个NAPI结构在网络系统初始化的时候被处理:
sd->backlog.poll = process_backlog; sd->backlog.weight = weight_p; sd->backlog.gro_list = NULL; sd->backlog.gro_count = 0;
backlog的NAPI结构和驱动的NAPI有所不一样,它的weight参数是能够调节的,而驱动程序则会将它们的NAPI weight硬编码为64。
process_backlog
process_backlog函数是一个循环,直到它的weight耗尽或者backlog中没有其余数据须要处理。每个在backlog中的数据都将从backlog queue传输到__netif_receive_skb。一旦数据到达__netif_receive_skb以后,它的传输路径就和RPS不可用时同样了。__netif_receive_skb只是在调用__netif_receive_skb_core将数据传输到协议栈以前作一些bookkeeping。
process_backlog和驱动程序使用NAPI的方式相同:若是weight没用完,那么关闭NAPI。而poller在enqueue_to_backlog调用__napi_schedule以后被从新启动。
该函数会返回已经完成的工做量,net_rx_action会将它从budget中减去
__netif_receive_skb_core delivers data to packet taps and protocol layer
__netif_receive_skb_core用于完成将数据传往网络栈的工做。在此以前,它先确认是否安装了packet taps用来抓取输入的包。其中一个例子就是libcap使用的AF_PACKET address family。若是有这样的tap存在,则数据先被发往tap,在被发往协议层。
Packet tap delivery
若是安装了packet tap,则包将安装如下代码被发送:
list_for_each_entry_rcu(ptype, &ptype_all, list) { if (!ptype->dev || ptype->dev == skb->dev) { if (pt_prev) ret = deliver_skb(skb, pt_prev, orig_dev); pt_prev = ptype; } }
Protocol layer delivery
一旦tap处理完成以后,__netif_receive_skb_core会将数据发往协议层。先从数据中获取protocol field,而后再遍历一系列该协议类型对应的deliver functions
type = skb->protocol; list_for_each_entry_rcu(ptype, &ptype_base[ntohs(type) & PTYPE_HASH_MASK], list) { if (ptype->type == type && (ptype->dev == null_or_dev || ptype->dev == skb->dev || ptype->dev == orig_dev)) { if (pt_prev) ret = deliver_skb(skb, pt_prev, orig_dev); pt_prev = ptype; } }
上文中的ptype_base是一个以下所示的哈希表:
struct list_head ptype_base[PTYPE_HASH_SIZE] __read_mostly;
每一个协议层都会在哈希表给定的slot中加入一个filter,经过以下的ptype_head函数计算:
static inline struct list_head *ptype_head(const struct packet_type *pt) { if (pt->type == htons(ETH_P_ALL)) return &ptype_all; else return &ptype_base[ntohs(pt->type) & PTYPE_HASH_MASK]; }
将filter加入list的操做是由dev_add_pack完成的。这就是协议层如何注册本身,从而获取发往它们的数据的方法。
如今咱们就知道了数据如何从网卡发往协议层
Protocol layer registration
如今咱们已经知道了数据如何从网络设备发往协议栈,下面咱们就来看看协议层是如何注册本身的。
IP protocol layer
IP协议层会先把本身注册到ptype_base这个哈希表中,从而让数据可以从网络设备发往它
dev_add_pack(&ip_packet_type);
static struct packet_type ip_packet_type __read_mostly = { .type = cpu_to_be16(ETH_P_IP), .func = ip_rcv, };
__netif_receive_skb_core会调用deliver_skb,而它最终会调用func(在这里,即为ip_rcv)
ip_rcv
ip_rcv的操做很是直接,首先对数据进行检查,而后更新一些统计数据。ip_rcv会最终经过netfilter将packet发往ip_rcv_finish,从而让那些iptables中匹配IP协议层的规则可以对数据进行处理
return NF_HOOK(NFPROTO_IPV4, NF_INET_PRE_ROUTING, skb, dev, NULL, ip_rcv_finish);
须要注意的是,若是你有很是多,很是复杂的netfilter或者iptables规则,这些规则都会在softirq上下文中执行,从而致使网络栈的延时,而这每每是不可避免的。
ip_rcv_finish
当netfilter并无把包丢弃时,就会调用ip_rcv_finish。ip_rcv_finish开始就有一个优化,为了将包传输到合适的地方,首先要从路由系统中获取dst_entry。所以,首先须要调用该数据发往的高层协议的early_demux。early_demux首先会判断是否有dst_entry缓存在socket结构中
if (sysctl_ip_early_demux && !skb_dst(skb) && skb->sk == NULL) { const struct net_protocol *ipprot; int protocol = iph->protocol; ipprot = rcu_dereference(inet_protos[protocol]); if (ipprot && ipprot->early_demux) { ipprot->early_demux(skb); /* must reload iph, skb->head might have changed */ iph = ip_hdr(skb); } }
咱们能够看到这部分代码是由sysctl_ip_early_demux控制的。early_demux默认是打开的。若是该优化是打开的,而且没有cached entry(由于这是第一个到达的packet),则这个packet会被发往路由系统,在那可以获取dst_entry。
一旦路由系统工做完毕以后,就会更新计数器,而后再调用dst_input(skb),它转而会调用刚刚获取的dst_entry结构中的input function pointer。
若是packet的最终目的地是本地,那么路由系统就会将ip_local_deliver赋值给dst_entry中的input function pointer。
ip_local_deliver
/* * Deliver IP Packets to the higher protocol layers. */ int ip_local_deliver(struct sk_buff *skb) { /* * Reassemble IP fragments. */ if (ip_is_fragment(ip_hdr(skb))) { if (ip_defrag(skb, IP_DEFRAG_LOCAL_DELIVER)) return 0; } return NF_HOOK(NFPROTO_IPV4, NF_INET_LOCAL_IN, skb, skb->dev, NULL, ip_local_deliver_finish); }
和ip_rcv_finish相似,netfilter会先对packet进行检查,若未被丢弃,则调用ip_local_deliver_finish
ip_local_deliver_finish
ip_local_deliver_finish从packet中获取protocol,而后查询该protocol注册的net_protocol结构,接着再调用该net_protocol结构中的handler函数指针。这就将packet发往更高的协议层了。
Higher level protocol registration
本篇文章主要分析UDP,可是TCP protocol handler和UDP protocol handler的注册方式是相同的。在net/ipv4/af_inet.c中的函数定义包含了用于UDP,TCP和ICMP协议和IP协议层进行链接的处理函数
static const struct net_protocol tcp_protocol = { .early_demux = tcp_v4_early_demux, .handler = tcp_v4_rcv, .err_handler = tcp_v4_err, .no_policy = 1, .netns_ok = 1, }; static const struct net_protocol udp_protocol = { .early_demux = udp_v4_early_demux, .handler = udp_rcv, .err_handler = udp_err, .no_policy = 1, .netns_ok = 1, }; static const struct net_protocol icmp_protocol = { .handler = icmp_rcv, .err_handler = icmp_err, .no_policy = 1, .netns_ok = 1, };
这些结构都在inet address family的初始化代码中被注册
/* * Add all the base protocols. */ if (inet_add_protocol(&icmp_protocol, IPPROTO_ICMP) < 0) pr_crit("%s: Cannot add ICMP protocol\n", __func__); if (inet_add_protocol(&udp_protocol, IPPROTO_UDP) < 0) pr_crit("%s: Cannot add UDP protocol\n", __func__); if (inet_add_protocol(&tcp_protocol, IPPROTO_TCP) < 0) pr_crit("%s: Cannot add TCP protocol\n", __func__);
咱们关注的是UDP协议层,所以对应的处理函数是udp_rcv。这就是数据从IP层通往UDP层的入口
UDP protocol layer
udp_rcv
udp_rcv函数只有一行代码用于直接调用__udp4_lib_rcv用于接收数据
__udp4_lib_rcv
__udp4_lib_rcv函数会检查packet是否合法,接着再获取UDP header,UDP数据报长度,源地址,目的地址,而后是一些完整性检查和checksum verification。
以前在IP层的时候,咱们已经看到在将包传送到上层协议以前会将dst_entry和packet相绑定。若是socket和对应的dst_entry已经找到了,那么__udp4_lib_rcv会将包存入socket:
sk = skb_steal_sock(skb); if (sk) { struct dst_entry *dst = skb_dst(skb); int ret; if (unlikely(sk->sk_rx_dst != dst)) udp_sk_rx_dst_set(sk, dst); ret = udp_queue_rcv_skb(sk, skb); sock_put(sk); /* a return value > 0 means to resubmit the input, but * it wants the return to be -protocol, or 0 */ if (ret > 0) return -ret; return 0; } else {
若是在以前的early_demux操做中没有找到socket,那么就会调用__udp4_lib_lookup_skb对receiving socket进行查找。不管上述哪一种状况,最终数据将被存入socket:
ret = udp_queue_rcv_skb(sk, skb); sock_put(sk);
若是没有找到socket,那么数据报将被丢弃:
/* No socket. Drop packet silently, if checksum is wrong */ if (udp_lib_checksum_complete(skb)) goto csum_error; UDP_INC_STATS_BH(net, UDP_MIB_NOPORTS, proto == IPPROTO_UDPLITE); icmp_send(skb, ICMP_DEST_UNREACH, ICMP_PORT_UNREACH, 0); /* * Hmm. We got an UDP packet to a port to which we * don't wanna listen. Ignore it. */ kfree_skb(skb); return 0;
udp_queue_rcv_skb
这个函数的初始部分以下所示:
最终咱们到达了处理receive queue的逻辑,首先检查socket对应的receive queue是否是已经满了:
if (sk_rcvqueues_full(sk, skb, sk->sk_rcvbuf)) goto drop;
sk_rcvqueue_full
sk_rcvqueue_full函数会检查socket的backlog长度以及socket的sk_rmem_alloc来确认它们的和是否大于socket的sk_rcvbuf
/* * Take into account size of receive queue and backlog queue * Do not take into account this skb truesize, * to allow even a single big packet to come. */ static inline bool sk_rcvqueues_full(const struct sock *sk, const struct sk_buff *skb, unsigned int limit) { unsigned int qsize = sk->sk_backlog.len + atomic_read(&sk->sk_rmem_alloc); return qsize > limit; }
udp_queue_rcv_skb
一旦证实队列未满,则会继续将数据加入队列
bh_lock_sock(sk); if (!sock_owned_by_user(sk)) rc = __udp_queue_rcv_skb(sk, skb); else if (sk_add_backlog(sk, skb, sk->sk_rcvbuf)) { bh_unlock_sock(sk); goto drop; } bh_unlock_sock(sk); return rc;
第一步先判断socket当前是否被用户进程占用。若是不是,则调用__udp_queue_rcv_skb将数据加入receive queue。若是是,则经过调用sk_add_backlog将数据加入backlog。backlog中的数据最终都会加入receive queue,socket相关的系统调用经过调用release_sock释放了该socket
__udp_queue_rcv_skb
__udp_queue_rcv_skb经过调用sock_queue_rcv_skb将数据加入receive queue,若是该数据不能被加入receive queue,则更新统计数据
rc = sock_queue_rcv_skb(sk, skb); if (rc < 0) { int is_udplite = IS_UDPLITE(sk); /* Note that an ENOMEM error is charged twice */ if (rc == -ENOMEM) UDP_INC_STATS_BH(sock_net(sk), UDP_MIB_RCVBUFERRORS,is_udplite); UDP_INC_STATS_BH(sock_net(sk), UDP_MIB_INERRORS, is_udplite); kfree_skb(skb); trace_udp_fail_queue_rcv_skb(rc, sk); return -1; }
Queuing data to a socket
如今数据已经经过调用sock_queue_rcv加入socket的队列了。这个函数在将数据加入队列前作了以下的操做:
以上就是数据如何到达系统,并经过整个协议栈到达socket并准备给用户进程使用的过程
原文连接:
https://blog.packagecloud.io/eng/2016/06/22/monitoring-tuning-linux-networking-stack-receiving-data/