流程图#

From 《Understanding Linux Network Internals》

image-20220128172744059

image-20220129171450456

收包流程#

TL; DR

image-20220129171450456

  • 收包NET_RX_SOFTRQ的软中断处理函数是net_rx_action
  • net_rx_action中会调用网卡驱动注册的poll回调函数处理
  • poll回调函数将数据帧从网卡ring buffer中取出,构造skb:
    • 运行xdpdrv上的bpf program,得到action result
    • 如果是XDP_PASS,构造skb,并初始化skb中一些metadata字段
  • 调用内核的GRO和RPS处理流程
  • 进入__netif_receive_skb_core,处理skb:
    • 运行xdpgeneric上的bpf program,得到action result
    • 如果是XDP_PASS,遍历ptype_alldev->ptype_all,进行抓包处理
    • tc ingress 处理sch_handle_ingress
    • 查找ptype_basedev->ptype_specific,交由对应的三层协议栈回调函数处理

NAPI#

**NAPI的思想是从完全的中断收包模型,改用中断和polling混合。**如果内核在处理旧的数据帧时,收到了新的数据帧,网卡设备没有必要再触发一个中断。内核继续处理设备input queue里的数据(该设备的interrupt禁止了),在队列为空时重新使能中断。

从内核的角度,NAPI有如下的优势:

  • 降低CPU的负载(更少的中断)
  • 更多的设备处理公平性

以ixgbe网卡为例,描述下NAPI处理流程。

注册#

ixgbe驱动在初始化中断向量时会调用netif_napi_add初始化NAPI,==ixgbe_poll函数注册到napi结构体,并将napi加入到设备的napi_list内==:

/**
 * ixgbe_alloc_q_vector - Allocate memory for a single interrupt vector
 * @adapter: board private structure to initialize
 * @v_count: q_vectors allocated on adapter, used for ring interleaving
 * @v_idx: index of vector in adapter struct
 * @txr_count: total number of Tx rings to allocate
 * @txr_idx: index of first Tx ring to allocate
 * @xdp_count: total number of XDP rings to allocate
 * @xdp_idx: index of first XDP ring to allocate
 * @rxr_count: total number of Rx rings to allocate
 * @rxr_idx: index of first Rx ring to allocate
 *
 * We allocate one q_vector.  If allocation fails we return -ENOMEM.
 **/
static int ixgbe_alloc_q_vector(struct ixgbe_adapter *adapter,
                int v_count, int v_idx,
                int txr_count, int txr_idx,
                int xdp_count, int xdp_idx,
                int rxr_count, int rxr_idx)
{
	/* ... */

    /* initialize NAPI */
    netif_napi_add(adapter->netdev, &q_vector->napi,
               ixgbe_poll, 64);
}

中断处理函数#

ixgbe驱动收到中断后,会调用ixgbe_msix_clean_rings

static irqreturn_t ixgbe_msix_clean_rings(int irq, void *data)
{
    struct ixgbe_q_vector *q_vector = data;

    /* EIAM disabled interrupts (on this vector) for us */

    if (q_vector->rx.ring || q_vector->tx.ring)
        napi_schedule_irqoff(&q_vector->napi);

    return IRQ_HANDLED;
}

napi_schedule_irqoff -> __napi_schedule_irqoff -> ____napi_schedule

static inline void ____napi_schedule(struct softnet_data *sd,
                     struct napi_struct *napi)
{
	/* ... */

    list_add_tail(&napi->poll_list, &sd->poll_list);
    __raise_softirq_irqoff(NET_RX_SOFTIRQ);
}

将当前napi->poll_list加入到sd->poll_list,并触发软中断。

软中断处理函数#

==NET_RX_SOFTRQ的处理函数是net_rx_action==。

static __latent_entropy void net_rx_action(struct softirq_action *h)
{
    struct softnet_data *sd = this_cpu_ptr(&softnet_data);
    unsigned long time_limit = jiffies +
        usecs_to_jiffies(netdev_budget_usecs);
    int budget = netdev_budget;
    LIST_HEAD(list);
    LIST_HEAD(repoll);

    local_irq_disable();
    list_splice_init(&sd->poll_list, &list);
    local_irq_enable();

    for (;;) {
        struct napi_struct *n;

        if (list_empty(&list)) {
            if (!sd_has_rps_ipi_waiting(sd) && list_empty(&repoll))
                return;
            break;
        }

        n = list_first_entry(&list, struct napi_struct, poll_list);
        budget -= napi_poll(n, &repoll);

        /* If softirq window is exhausted then punt.
         * Allow this to run for 2 jiffies since which will allow
         * an average latency of 1.5/HZ.
         */
        if (unlikely(budget <= 0 ||
                 time_after_eq(jiffies, time_limit))) {
            sd->time_squeeze++;
            break;
        }
    }

    local_irq_disable();

    list_splice_tail_init(&sd->poll_list, &list);
    list_splice_tail(&repoll, &list);
    list_splice(&list, &sd->poll_list);
    if (!list_empty(&sd->poll_list))
        __raise_softirq_irqoff(NET_RX_SOFTIRQ);

    net_rps_action_and_irq_enable(sd);
}

netdev_budgetnetdev_budget_usecs可以通过sysctl进行调整:

net.core.netdev_budget = 300
net.core.netdev_budget_usecs = 2000

每运行一次napi_poll,budget值减1。当budget值为0或者超过了时间限制,跳出循环。

napi_poll会调用网卡注册的poll函数,如前所述,在ixgbe网卡中就是ixgbe_poll。==ixgbe_poll主要功能是将数据帧从ring buffer中取出,并重构成skb,交由协议栈处理。==

而这个功能的主要函数是ixgbe_clean_rx_irq_zcixgbe_clean_rx_irq。前者是XDP socket调用,后者是正常的包调用。

/**
 * ixgbe_poll - NAPI Rx polling callback
 * @napi: structure for representing this polling device
 * @budget: how many packets driver is allowed to clean
 *
 * This function is used for legacy and MSI, NAPI mode
 **/
int ixgbe_poll(struct napi_struct *napi, int budget)
{

  /* ... */

  // 为什么会在RX软中断中清理tx?
  ixgbe_for_each_ring(ring, q_vector->tx) {
    bool wd = ring->xsk_pool ?
        ixgbe_clean_xdp_tx_irq(q_vector, ring, budget) :
        ixgbe_clean_tx_irq(q_vector, ring, budget);

    if (!wd)
      clean_complete = false;
  }

  /* ... */

  // rx ring buffer处理
  ixgbe_for_each_ring(ring, q_vector->rx) {
    int cleaned = ring->xsk_pool ?
            ixgbe_clean_rx_irq_zc(q_vector, ring,
                per_ring_budget) :
            ixgbe_clean_rx_irq(q_vector, ring,
             per_ring_budget);

    work_done += cleaned;
    if (cleaned >= per_ring_budget)
      clean_complete = false;
  }

  /* ... */

  return min(work_done, budget - 1);
}

==xdpdriver的调用点也在这两个函数里面==

// drivers/net/ethernet/intel/ixgbe/ixgbe_xsk.c:ixgbe_clean_rx_irq_zc
xdp_res = ixgbe_run_xdp_zc(adapter, rx_ring, bi->xdp);

// drivers/net/ethernet/intel/ixgbe/ixgbe_main.c:ixgbe_clean_rx_irq
skb = ixgbe_run_xdp(adapter, rx_ring, &xdp);

另外,为了支持AF_XDP的高性能特性,ixgbe_clean_rx_irq_zc如名所述是零拷贝的(==但是看代码zc的函数里面还是有memcpy,那么AF_XDP的zc体现在哪里呢?==)。

我们用ixgbe_clean_rx_irq来分析下里面的流程。

ixgbe_clean_rx_irq#

函数调用ixgbe_get_rx_buffer从ring buffer中取数据,将传入的skb指针指向rx_buffer->skb

这里有个有意思的点是如果skb的值为NULL,驱动才会运行xdp程序:

// drivers/net/ethernet/intel/ixgbe/ixgbe_main.c:ixgbe_clean_rx_irq

	rx_buffer = ixgbe_get_rx_buffer(rx_ring, rx_desc, &skb, size, &rx_buffer_pgcnt);

    /* retrieve a buffer from the ring */
    if (!skb) {
      unsigned char *hard_start;

      hard_start = page_address(rx_buffer->page) +
             rx_buffer->page_offset - offset;
      xdp_prepare_buff(&xdp, hard_start, offset, size, true);
#if (PAGE_SIZE > 4096)
      /* At larger PAGE_SIZE, frame_sz depend on len size */
      xdp.frame_sz = ixgbe_rx_frame_truesize(rx_ring, size);
#endif
      skb = ixgbe_run_xdp(adapter, rx_ring, &xdp);
    }

查看rx_buffer的定义可以发现,结构体里面定义了一个union,rx_buffer->skb是结构体的首个成员;而如果使用union中另一个,首个成员是一个bool变量discard。也就是说,如果使用了xdp,rx_buffer->skb的结果就是一个0值。

// drivers/net/ethernet/intel/ixgbe/ixgbe.h

struct ixgbe_rx_buffer {
  union {
    struct {
      struct sk_buff *skb;
      dma_addr_t dma;
      struct page *page;
      __u32 page_offset;
      __u16 pagecnt_bias;
    };
    struct {
      bool discard;
      struct xdp_buff *xdp;
    };
  };
};

如果运行了xdp,ixgbe_get_rx_buffer会根据ixgbe_run_xdp的返回值来进行相应的操作。redirect和xmit会直接在xdp中将包转走,所以无需在这里构建skb。只有pass需要在函数中通过调用ixgbe_build_skb或者ixgbe_construct_skb构建skb。

在构建好skb后,通过调用ixgbe_put_rx_bufferixgbe_is_non_eop回收ring buffer。eop是end-of-packet的缩写,这是因为帧可能会占据多个ring buffer,所以用循环将一个包数据都取下来,用同一个skb表示。

在skb构建完成后,在ixgbe_rx_skb中调用napi_gro_receive,进入tc层处理数据包。

napi_gro_receive#

napi_gro_receive中的调用链如下所示:

napi_gro_receive
│
└─napi_skb_finish
  │
  └─gro_normal_one
    │
    └─gro_normal_list
      │
      └─netif_receive_skb_list_internal
        │
        └─__netif_receive_skb_list
          │
          └─__netif_receive_skb_list_core
            │
            └─__netif_receive_skb_core

前面几个函数用于GRO和RPS处理。而在__netif_receive_skb_core中,先后会做:

  • xdp generic处理 do_xdp_generic
  • 抓包hook
  • tc ingress处理 sch_handle_ingress
  • 将包交给具体的协议回调函数处理

我们来看看抓包和协议栈回调的处理:

// net/core/dev.c:__netif_receive_skb_core 抓包hook逻辑
  list_for_each_entry_rcu(ptype, &ptype_all, list) {
    if (pt_prev)
      ret = deliver_skb(skb, pt_prev, orig_dev);
    pt_prev = ptype;
  }

  list_for_each_entry_rcu(ptype, &skb->dev->ptype_all, list) {
    if (pt_prev)
      ret = deliver_skb(skb, pt_prev, orig_dev);
    pt_prev = ptype;
  }

// net/core/dev.c:__netif_receive_skb_core 协议栈回调
  type = skb->protocol;

  /* deliver only exact match when indicated */
  if (likely(!deliver_exact)) {
    deliver_ptype_list_skb(skb, &pt_prev, orig_dev, type,
               &ptype_base[ntohs(type) &
               PTYPE_HASH_MASK]);
  }

  deliver_ptype_list_skb(skb, &pt_prev, orig_dev, type,
             &orig_dev->ptype_specific);

  if (unlikely(skb->dev != orig_dev)) {
    deliver_ptype_list_skb(skb, &pt_prev, orig_dev, type,
               &skb->dev->ptype_specific);
  }

static inline int deliver_skb(struct sk_buff *skb,
            struct packet_type *pt_prev,
            struct net_device *orig_dev)
{
  if (unlikely(skb_orphan_frags_rx(skb, GFP_ATOMIC)))
    return -ENOMEM;
  refcount_inc(&skb->users);
  return pt_prev->func(skb, skb->dev, pt_prev, orig_dev);
}

static inline void deliver_ptype_list_skb(struct sk_buff *skb,
            struct packet_type **pt,
            struct net_device *orig_dev,
            __be16 type,
            struct list_head *ptype_list)
{
  struct packet_type *ptype, *pt_prev = *pt;

  list_for_each_entry_rcu(ptype, ptype_list, list) {
    if (ptype->type != type)
      continue;
    if (pt_prev)
      deliver_skb(skb, pt_prev, orig_dev);
    pt_prev = ptype;
  }
  *pt = pt_prev;
}

可以看到,两者在最后都是在deliver_skb中调用了pt_prev中注册的回调函数。pt_prev的类型是struct packet_type

// include/linux/netdevice.h
struct packet_type {
  __be16      type; /* This is really htons(ether_type). */
  bool      ignore_outgoing;
  struct net_device *dev; /* NULL is wildcarded here       */
  int     (*func) (struct sk_buff *,
           struct net_device *,
           struct packet_type *,
           struct net_device *);
  void      (*list_func) (struct list_head *,
                struct packet_type *,
                struct net_device *);
  bool      (*id_match)(struct packet_type *ptype,
              struct sock *sk);
  void      *af_packet_priv;
  struct list_head  list;
};

type是ether_type,对于抓包的回调来说,这个值是ETH_P_ALL。而协议栈通常是具体的ether_type,如ETH_P_IP。根据这一区别,在packet_type注册的时候,就能将两者分别放到不同的数据结构内:

// net/core/dev.c

static inline struct list_head *ptype_head(const struct packet_type *pt)
{
  if (pt->type == htons(ETH_P_ALL))
    return pt->dev ? &pt->dev->ptype_all : &ptype_all;
  else
    return pt->dev ? &pt->dev->ptype_specific :
         &ptype_base[ntohs(pt->type) & PTYPE_HASH_MASK];
}

/**
 *  dev_add_pack - add packet handler
 *  @pt: packet type declaration
 *
 *  Add a protocol handler to the networking stack. The passed &packet_type
 *  is linked into kernel lists and may not be freed until it has been
 *  removed from the kernel lists.
 *
 *  This call does not sleep therefore it can not
 *  guarantee all CPU's that are in middle of receiving packets
 *  will see the new packet type (until the next received packet).
 */

void dev_add_pack(struct packet_type *pt)
{
  struct list_head *head = ptype_head(pt);

  spin_lock(&ptype_lock);
  list_add_rcu(&pt->list, head);
  spin_unlock(&ptype_lock);
}
EXPORT_SYMBOL(dev_add_pack);

==抓包的packet_type被放在了ptype_all全局变量或者指定设备的dev->ptype_all中;协议栈的packet_type被放在了ptype_base或者指定设备的dev->ptype_specific中。==

通过过滤static struct packet_type可以看到各个协议注册的packet_type类型,如IP协议:

// net/ipv4/af_inet.c
static struct packet_type ip_packet_type __read_mostly = {
  .type = cpu_to_be16(ETH_P_IP),
  .func = ip_rcv,
  .list_func = ip_list_rcv,
};

从中就可以找到对应协议的处理函数。

发包流程#

dev_queue_xmit#

内核中网卡驱动发包流程的起点是dev_queue_xmit函数。发包分两种方式:

  • 通过Traffic Control接口发送;
  • 直接调用dev_hard_start_xmit发送出去。

Traffic Control 接口#

    if (q->enqueue) {
        rc = __dev_xmit_skb(skb, q, dev, txq);
        goto out;
    }

通过Qdisc内的enqueue指针,判断net_device是否存在队列==(**TODO:**enqueue在哪初始化的)==。如果存在dev_queue_xmit就调用了__dev_xmit_skb

__dev_xmit_skb主要是TC里的==queuing dispipline==的逻辑(TODO),发包的函数是__qdisc_runqdisc_run__qdisc_run的封装函数。

// net/core/dev.c
int dev_tx_weight __read_mostly = 64;

// net/sched/sch_generic.c
static inline bool qdisc_restart(struct Qdisc *q, int *packets)
{
    spinlock_t *root_lock = NULL;
    struct netdev_queue *txq;
    struct net_device *dev;
    struct sk_buff *skb;
    bool validate;

    /* Dequeue packet */
    skb = dequeue_skb(q, &validate, packets);
    if (unlikely(!skb))
        return false;

    if (!(q->flags & TCQ_F_NOLOCK))
        root_lock = qdisc_lock(q);

    dev = qdisc_dev(q);
    txq = skb_get_tx_queue(dev, skb);

    return sch_direct_xmit(skb, q, dev, txq, root_lock, validate);
}

void __qdisc_run(struct Qdisc *q)
{
    int quota = dev_tx_weight;
    int packets;

    while (qdisc_restart(q, &packets)) {
        quota -= packets;
        if (quota <= 0) {
            if (q->flags & TCQ_F_NOLOCK)
                set_bit(__QDISC_STATE_MISSED, &q->state);
            else
                __netif_schedule(q);

            break;
        }
    }
}

可以看到,每次运行__qdisc_run有一个阈值quota = 64。每次循环调用qdisc_restart,从队列里面dequeue_skb,最后调用sch_direct_xmit发送出去。sch_direct_xmit里面发包和无队列的net_device一样,调用dev_hard_start_xmit

直接发包#

调用流程:dev_hard_start_xmit -> xmit_one -> netdev_start_xmit -> __netdev_start_xmit -> ops->ndo_start_xmit

// net/core/dev.c
static int xmit_one(struct sk_buff *skb, struct net_device *dev,
            struct netdev_queue *txq, bool more)
{
    unsigned int len;
    int rc;

    if (dev_nit_active(dev))
        dev_queue_xmit_nit(skb, dev);

    len = skb->len;
    PRANDOM_ADD_NOISE(skb, dev, txq, len + jiffies);
    trace_net_dev_start_xmit(skb, dev);
    rc = netdev_start_xmit(skb, dev, txq, more);
    trace_net_dev_xmit(skb, rc, dev, len);

    return rc;
}

struct sk_buff *dev_hard_start_xmit(struct sk_buff *first, struct net_device *dev,
                    struct netdev_queue *txq, int *ret)
{
    struct sk_buff *skb = first;
    int rc = NETDEV_TX_OK;

    while (skb) {
        struct sk_buff *next = skb->next;

        skb_mark_not_on_list(skb);
        rc = xmit_one(skb, dev, txq, next != NULL);
        if (unlikely(!dev_xmit_complete(rc))) {
            skb->next = next;
            goto out;
        }

        skb = next;
        if (netif_tx_queue_stopped(txq) && skb) {
            rc = NETDEV_TX_BUSY;
            break;
        }
    }

out:
    *ret = rc;
    return skb;
}

==ndo_start_xmit是每个网卡设备注册的发包函数。==以ixgbe驱动为例,注册的函数是ixgbe_xmit_frame

ixgbe_xmit_frame -> __ixgbe_xmit_frame -> ixgbe_xmit_frame_ring -> ixgbe_tx_map

驱动发包完成后,调用dev_kfree_skb_any,如果当前在中断上下文内,调用__dev_kfree_skb_irq,否则直接释放skb。

// net/core/dev.c
void __dev_kfree_skb_irq(struct sk_buff *skb, enum skb_free_reason reason)
{
    unsigned long flags;

    if (unlikely(!skb))
        return;

    if (likely(refcount_read(&skb->users) == 1)) {
        smp_rmb();
        refcount_set(&skb->users, 0);
    } else if (likely(!refcount_dec_and_test(&skb->users))) {
        return;
    }
    get_kfree_skb_cb(skb)->reason = reason;
    local_irq_save(flags);
    skb->next = __this_cpu_read(softnet_data.completion_queue);
    __this_cpu_write(softnet_data.completion_queue, skb);
    raise_softirq_irqoff(NET_TX_SOFTIRQ);
    local_irq_restore(flags);
}

void __dev_kfree_skb_any(struct sk_buff *skb, enum skb_free_reason reason)
{
    if (in_irq() || irqs_disabled())
        __dev_kfree_skb_irq(skb, reason);
    else
        dev_kfree_skb(skb);
}
EXPORT_SYMBOL(__dev_kfree_skb_any);

__dev_kfree_skb_irq把发送完的skb指针放到当前CPU的softnet_data.completion_queue中,然后唤醒软中断处理函数。

软中断处理函数net_tx_action#

发包软中断处理函数比较简单,由两部分组成:

  1. 将网卡已经发送的skb释放掉;
  2. 当设备允许发包后重新调度,进入发包流程。

之所以把skb释放放在软中断里而不是硬件发完后直接释放,是因为释放buffer可能需要耗费一些时间,而驱动发包代码运行在中断上下文内,耗时越少越好。

// net/core/dev.c:net_tx_action
	if (sd->completion_queue) {
        struct sk_buff *clist;

        local_irq_disable();
        clist = sd->completion_queue;
        sd->completion_queue = NULL;
        local_irq_enable();

        while (clist) {
            struct sk_buff *skb = clist;

            clist = clist->next;

            WARN_ON(refcount_read(&skb->users));
            if (likely(get_kfree_skb_cb(skb)->reason == SKB_REASON_CONSUMED))
                trace_consume_skb(skb);
            else
                trace_kfree_skb(skb, net_tx_action);

            if (skb->fclone != SKB_FCLONE_UNAVAILABLE)
                __kfree_skb(skb);
            else
                __kfree_skb_defer(skb);
        }
    }

第二部分的调度入口函数是__netif_schedule。因为各种原因(如网卡buffer不够,暂时禁用发包或者TC调度限制等)发包受到限制,等到恢复后,调用__netif_schedule进入软中断,使用qdisc_run重新开始发包。

抓包点#

dev_hard_start_xmit里面有egress方向的抓包点dev_queue_xmit_nit

// net/core/dev.c:xmit_one
    if (dev_nit_active(dev))
        dev_queue_xmit_nit(skb, dev);

BPF HOOK点#

tc egress BPF HOOK点在__dev_queue_xmit内:

// net/core/dev.c:__dev_queue_xmit
# ifdef CONFIG_NET_EGRESS
    if (static_branch_unlikely(&egress_needed_key)) {
        skb = sch_handle_egress(skb, &rc, dev);
        if (!skb)
            goto out;
    }
# endif

sch_handle_egress -> tcf_classify -> __tcf_classify -> tp->classify

bpf的classify函数在net/sched/cls_bpf.c内注册:

static struct tcf_proto_ops cls_bpf_ops __read_mostly = {
    .kind       =   "bpf",
    .owner      =   THIS_MODULE,
    .classify   =   cls_bpf_classify,
    .init       =   cls_bpf_init,
    .destroy    =   cls_bpf_destroy,
    .get        =   cls_bpf_get,
    .change     =   cls_bpf_change,
    .delete     =   cls_bpf_delete,
    .walk       =   cls_bpf_walk,
    .reoffload  =   cls_bpf_reoffload,
    .dump       =   cls_bpf_dump,
    .bind_class =   cls_bpf_bind_class,
};