netdev_rx_queue表示对应的接收队列,不少网卡硬件上已经支持多个队列,此时就会有多个netdev_rx_queue队列,这个结构是挂在net_device,初始化接收队列的函数:netif_alloc_rx_queuesapi
netif_alloc_rx_queues
数组
static int netif_alloc_rx_queues(struct net_device *dev) { /*获取接收队列的个数*/ unsigned int i, count = netdev_extended(dev)->rps_data.num_rx_queues; struct netdev_rx_queue *rx; BUG_ON(count < 1); /*分配netdev_rx_queue 空间*/ rx = kcalloc(count, sizeof(struct netdev_rx_queue), GFP_KERNEL); if (!rx) { pr_err("netdev: Unable to allocate %u rx queues.\n", count); return -ENOMEM; } /* netdev_rx_queue 和net_device关联起来。*/ netdev_extended(dev)->rps_data._rx = rx; /*对netdev_rx_queue 中net_device进行赋值操做*/ for (i = 0; i < count; i++) rx[i].dev = dev; return 0; } struct netdev_rx_queue { /*保存当前队列的rps map*/ struct rps_map *rps_map; /* //每一个设备的队列保存了一个rps_dev_flow_table */ struct rps_dev_flow_table *rps_flow_table; //对应的kobject struct kobject kobj; /*所属的net_device*/ struct net_device *dev; } ____cacheline_aligned_in_smp; struct rps_map { /*CPU的个数,也就是CPU数组的个数*/ unsigned int len; struct rcu_head rcu; /*保存了CPU的ID*/ u16 cpus[0]; };
static int get_rps_cpu(struct net_device *dev, struct sk_buff *skb, struct rps_dev_flow **rflowp) { struct ipv6hdr *ip6; struct iphdr *ip; struct netdev_rx_queue *rxqueue; struct rps_map *map; struct rps_dev_flow_table *flow_table; struct rps_sock_flow_table *sock_flow_table; struct netdev_rps_info *rpinfo = &netdev_extended(dev)->rps_data; int cpu = -1; int tcpu; u8 ip_proto; u32 addr1, addr2, ports, ihl; rcu_read_lock(); if (skb_rx_queue_recorded(skb)) { /*获取设备对应的rx队列。*/ u16 index = skb_get_rx_queue(skb); if (unlikely(index >= rpinfo->num_rx_queues)) { WARN_ONCE(rpinfo->num_rx_queues > 1, "%s received packet " "on queue %u, but number of RX queues is %u\n", dev->name, index, rpinfo->num_rx_queues); goto done; } rxqueue = rpinfo->_rx + index; } else rxqueue = rpinfo->_rx; if (!rxqueue->rps_map && !rxqueue->rps_flow_table) goto done; if (skb->rxhash) //若是硬件已经计算过,则直接跳过,不须要计算HASH值 goto got_hash; /* Skip hash computation on packet header */ switch (skb->protocol) { /*根据不一样的IP协议获取源IP和目的IP*/ case __constant_htons(ETH_P_IP): if (!pskb_may_pull(skb, sizeof(*ip))) goto done; ip = (struct iphdr *) skb->data; ip_proto = ip->protocol; addr1 = ip->saddr; addr2 = ip->daddr; ihl = ip->ihl; break; case __constant_htons(ETH_P_IPV6): if (!pskb_may_pull(skb, sizeof(*ip6))) goto done; ip6 = (struct ipv6hdr *) skb->data; ip_proto = ip6->nexthdr; addr1 = ip6->saddr.s6_addr32[3]; addr2 = ip6->daddr.s6_addr32[3]; ihl = (40 >> 2); break; default: goto done; } ports = 0; switch (ip_proto) { case IPPROTO_TCP: case IPPROTO_UDP: case IPPROTO_DCCP: case IPPROTO_ESP: case IPPROTO_AH: case IPPROTO_SCTP: case IPPROTO_UDPLITE: if (pskb_may_pull(skb, (ihl * 4) + 4)) ports = *((u32 *) (skb->data + (ihl * 4))); /*获取四层协议的端口号,tcp头的前4个字节就是源和目的端口,所以这里跳过ip头获得tcp头的前4个字节*/ break; default: break; } /*根据获取到的SIP和DIP,PORT计算HSAH值,*/ skb->rxhash = jhash_3words(addr1, addr2, ports, hashrnd) >> 16; if (!skb->rxhash) skb->rxhash = 1; got_hash: /* rps_sock_flow_table和rps_dev_flow_table 是为了解决RFS而添加的两张表,rps_sock_flow_table是一个全局的hash表,这个表针对socket的,映射了socket对应的cpu,这里的cpu就是应用层期待软中断所在的cpu ,rps_dev_flow_table,这个是针对设备的,每一个设备队列都含有一个rps_dev_flow_table(这个表主要是保存了上次处理相同连接上的skb所在的cpu),这个hash表中每个元素包含了一个cpu id,一个tail queue的计数器*/ flow_table = rcu_dereference(rxqueue->rps_flow_table); sock_flow_table = rcu_dereference(rps_sock_flow_table); if (flow_table && sock_flow_table) { u16 next_cpu; struct rps_dev_flow *rflow; rflow = &flow_table->flows[skb->rxhash & flow_table->mask]; tcpu = rflow->cpu; next_cpu = sock_flow_table->ents[skb->rxhash & sock_flow_table->mask]; /*首先会获得两个flow table,一个是sock_flow_table,另外一个是设备的rps_flow_table(skb对应的设备队列中对应的flow table),这里的逻辑是这样子的取出来两个cpu,一个是根据rps计算数据包前一次被调度过的cpu(tcpu),一个是应用程序指望的cpu(next_cpu),而后比较这两个值,若是 1 tcpu未设置(等于RPS_NO_CPU) 2 tcpu是离线的 3 tcpu的input_queue_head大于rps_flow_table中的last_qtail 的话就调度这个skb到next_cpu.而这里第三点input_queue_head大于rps_flow_table则说明在当前的dev flow table中的数据包已经发送完毕,不然的话为了不乱序就仍是继续使用tcpu * If the desired CPU (where last recvmsg was done) is * different from current CPU (one in the rx-queue flow * table entry), switch if one of the following holds: * - Current CPU is unset (equal to RPS_NO_CPU). * - Current CPU is offline. * - The current CPU's queue tail has advanced beyond the * last packet that was enqueued using this table entry. * This guarantees that all previous packets for the flow * have been dequeued, thus preserving in order delivery. */ if (unlikely(tcpu != next_cpu) && (tcpu == RPS_NO_CPU || !cpu_online(tcpu) || ((int)(per_cpu(softnet_data, tcpu).input_queue_head - rflow->last_qtail)) >= 0)) { tcpu = rflow->cpu = next_cpu; if (tcpu != RPS_NO_CPU) rflow->last_qtail = per_cpu(softnet_data, tcpu).input_queue_head; } if (tcpu != RPS_NO_CPU && cpu_online(tcpu)) { *rflowp = rflow; /*设置返回的CPU*/ cpu = tcpu; goto done; } } /*当第一次进来时tcpu是RPS_NO_CPU,而且next_cpu也是RPS_NO_CPU,此时会致使跳过rfs处理,而是直接使用rps的处理, */ map = rcu_dereference(rxqueue->rps_map); if (map) { tcpu = map->cpus[((u32) (skb->rxhash * map->len)) >> 16]; /*若是cpu是online的,则返回计算出的这个cpu */ if (cpu_online(tcpu)) { cpu = tcpu; goto done; } } done: rcu_read_unlock(); return cpu; } /*将skb挂在到对应cpu的input queue上的, enqueue_to_backlog接受一个skb和cpu为参数,经过cpu来判断skb如何处理。要么加入所属的input_pkt_queue中,要么schecule 软中断*/
static int enqueue_to_backlog(struct sk_buff *skb, int cpu, unsigned int *qtail) { struct softnet_data *queue; unsigned long flags; /*根据传递过来的CPU,获取softnet_data结构体*/ queue = &per_cpu(softnet_data, cpu); local_irq_save(flags); __get_cpu_var(netdev_rx_stat).total++; spin_lock(&queue->input_pkt_queue.lock); if (queue->input_pkt_queue.qlen <= netdev_max_backlog) { if (queue->input_pkt_queue.qlen) { enqueue:/*将数据包添加到input_pkt_queue队列中*/ __skb_queue_tail(&queue->input_pkt_queue, skb); *qtail = queue->input_queue_head + queue->input_pkt_queue.qlen; spin_unlock_irqrestore(&queue->input_pkt_queue.lock, flags); return NET_RX_SUCCESS; } /* Schedule NAPI for backlog device 能够调度软中断*/ if (napi_schedule_prep(&queue->backlog)) { if (cpu != smp_processor_id()) {/*判断该SKB是否该CPU处理*/ struct rps_remote_softirq_cpus *rcpus = &__get_cpu_var(rps_remote_softirq_cpus); cpu_set(cpu, rcpus->mask[rcpus->select]); __raise_softirq_irqoff(NET_RX_SOFTIRQ); } else /*应该当前cpu处理,则直接schedule 软中断,这里能够看到传递进去的是backlog */ ____napi_schedule(queue, &queue->backlog); } goto enqueue; } spin_unlock(&queue->input_pkt_queue.lock); __get_cpu_var(netdev_rx_stat).dropped++; local_irq_restore(flags); kfree_skb(skb); return NET_RX_DROP; }
static int enqueue_to_backlog(struct sk_buff *skb, int cpu, unsigned int *qtail) { struct softnet_data *queue; unsigned long flags; /*根据传递过来的CPU,获取softnet_data结构体*/ queue = &per_cpu(softnet_data, cpu); local_irq_save(flags); __get_cpu_var(netdev_rx_stat).total++; spin_lock(&queue->input_pkt_queue.lock); if (queue->input_pkt_queue.qlen <= netdev_max_backlog) { if (queue->input_pkt_queue.qlen) { enqueue:/*将数据包添加到input_pkt_queue队列中*/ __skb_queue_tail(&queue->input_pkt_queue, skb); *qtail = queue->input_queue_head + queue->input_pkt_queue.qlen; spin_unlock_irqrestore(&queue->input_pkt_queue.lock, flags); return NET_RX_SUCCESS; } /* Schedule NAPI for backlog device 能够调度软中断*/ if (napi_schedule_prep(&queue->backlog)) { if (cpu != smp_processor_id()) {/*判断该SKB是否该CPU处理*/ struct rps_remote_softirq_cpus *rcpus = &__get_cpu_var(rps_remote_softirq_cpus); cpu_set(cpu, rcpus->mask[rcpus->select]); __raise_softirq_irqoff(NET_RX_SOFTIRQ); } else /*应该当前cpu处理,则直接schedule 软中断,这里能够看到传递进去的是backlog */ ____napi_schedule(queue, &queue->backlog); } goto enqueue; } spin_unlock(&queue->input_pkt_queue.lock); __get_cpu_var(netdev_rx_stat).dropped++; local_irq_restore(flags); kfree_skb(skb); return NET_RX_DROP; }
int inet_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, size_t size, int flags) { struct sock *sk = sock->sk; int addr_len = 0; int err; inet_rps_record_flow(sk);//设置HASH表 err = sk->sk_prot->recvmsg(iocb, sk, msg, size, flags & MSG_DONTWAIT, flags & ~MSG_DONTWAIT, &addr_len); if (err >= 0) msg->msg_namelen = addr_len; return err; }
这个函数主要是获得全局的rps_sock_flow_table,而后调用rps_record_sock_flow来对rps_sock_flow_table进行设置,这里会将socket的sk_rxhash传递进去看成hash的索引,而这个sk_rxhash其实就是skb里面的rxhash,skb的rxhash就是rps中设置的hash值,这个值是根据四元组进行hash的。这里用这个当索引一个是为了相同的socket都能落入一个index。并且下面的软中断上下文也比较容易存取这个hash表socket
点击(此处)折叠或打开tcp
static inline void inet_rps_record_flow(struct sock *sk) { struct rps_sock_flow_table *sock_flow_table; rcu_read_lock(); sock_flow_table = rcu_dereference(rps_sock_flow_table); rps_record_sock_flow(sock_flow_table, inet_sk_rxhash(sk)); rcu_read_unlock(); }
点击(此处)折叠或打开函数
static inline void rps_record_sock_flow(struct rps_sock_flow_table *table, u32 hash) { if (table && hash) { /*获取索引*/ unsigned int cpu, index = hash & table->mask; /* We only give a hint, preemption can change cpu under us 获取CPU */ cpu = raw_smp_processor_id(); /*保存对应的cpu,若是等于当前cpu,则说明已经设置过了*/ if (table->ents[index] != cpu) table->ents[index] = cpu; } }
图:内核代码流程this