Commit f6fb8f10 authored by chetan loke's avatar chetan loke Committed by David S. Miller
Browse files

af-packet: TPACKET_V3 flexible buffer implementation.



1) Blocks can be configured with non-static frame-size.
2) Read/poll is at a block-level(as opposed to packet-level).
3) Added poll timeout to avoid indefinite user-space wait on idle links.
4) Added user-configurable knobs:
   4.1) block::timeout.
   4.2) tpkt_hdr::sk_rxhash.

Changes:
C1) tpacket_rcv()
    C1.1) packet_current_frame() is replaced by packet_current_rx_frame()
          The bulk of the processing is then moved in the following chain:
          packet_current_rx_frame()
            __packet_lookup_frame_in_block
              fill_curr_block()
              or
                retire_current_block
                dispatch_next_block
              or
              return NULL(queue is plugged/paused)
Signed-off-by: default avatarChetan Loke <loke.chetan@gmail.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent 0d4691ce
......@@ -40,6 +40,10 @@
* byte arrays at the end of sockaddr_ll
* and packet_mreq.
* Johann Baudy : Added TX RING.
* Chetan Loke : Implemented TPACKET_V3 block abstraction
* layer.
* Copyright (C) 2011, <lokec@ccs.neu.edu>
*
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
......@@ -161,9 +165,56 @@ struct packet_mreq_max {
unsigned char mr_address[MAX_ADDR_LEN];
};
static int packet_set_ring(struct sock *sk, struct tpacket_req *req,
static int packet_set_ring(struct sock *sk, union tpacket_req_u *req_u,
int closing, int tx_ring);
#define V3_ALIGNMENT (8)
#define BLK_HDR_LEN (ALIGN(sizeof(struct block_desc), V3_ALIGNMENT))
#define BLK_PLUS_PRIV(sz_of_priv) \
(BLK_HDR_LEN + ALIGN((sz_of_priv), V3_ALIGNMENT))
/* kbdq - kernel block descriptor queue */
struct kbdq_core {
struct pgv *pkbdq;
unsigned int feature_req_word;
unsigned int hdrlen;
unsigned char reset_pending_on_curr_blk;
unsigned char delete_blk_timer;
unsigned short kactive_blk_num;
unsigned short blk_sizeof_priv;
/* last_kactive_blk_num:
* trick to see if user-space has caught up
* in order to avoid refreshing timer when every single pkt arrives.
*/
unsigned short last_kactive_blk_num;
char *pkblk_start;
char *pkblk_end;
int kblk_size;
unsigned int knum_blocks;
uint64_t knxt_seq_num;
char *prev;
char *nxt_offset;
struct sk_buff *skb;
atomic_t blk_fill_in_prog;
/* Default is set to 8ms */
#define DEFAULT_PRB_RETIRE_TOV (8)
unsigned short retire_blk_tov;
unsigned short version;
unsigned long tov_in_jiffies;
/* timer to retire an outstanding block */
struct timer_list retire_blk_timer;
};
#define PGV_FROM_VMALLOC 1
struct pgv {
char *buffer;
};
......@@ -179,12 +230,40 @@ struct packet_ring_buffer {
unsigned int pg_vec_pages;
unsigned int pg_vec_len;
struct kbdq_core prb_bdqc;
atomic_t pending;
};
#define BLOCK_STATUS(x) ((x)->hdr.bh1.block_status)
#define BLOCK_NUM_PKTS(x) ((x)->hdr.bh1.num_pkts)
#define BLOCK_O2FP(x) ((x)->hdr.bh1.offset_to_first_pkt)
#define BLOCK_LEN(x) ((x)->hdr.bh1.blk_len)
#define BLOCK_SNUM(x) ((x)->hdr.bh1.seq_num)
#define BLOCK_O2PRIV(x) ((x)->offset_to_priv)
#define BLOCK_PRIV(x) ((void *)((char *)(x) + BLOCK_O2PRIV(x)))
struct packet_sock;
static int tpacket_snd(struct packet_sock *po, struct msghdr *msg);
static void *packet_previous_frame(struct packet_sock *po,
struct packet_ring_buffer *rb,
int status);
static void packet_increment_head(struct packet_ring_buffer *buff);
static int prb_curr_blk_in_use(struct kbdq_core *,
struct block_desc *);
static void *prb_dispatch_next_block(struct kbdq_core *,
struct packet_sock *);
static void prb_retire_current_block(struct kbdq_core *,
struct packet_sock *, unsigned int status);
static int prb_queue_frozen(struct kbdq_core *);
static void prb_open_block(struct kbdq_core *, struct block_desc *);
static void prb_retire_rx_blk_timer_expired(unsigned long);
static void _prb_refresh_rx_retire_blk_timer(struct kbdq_core *);
static void prb_init_blk_timer(struct packet_sock *, struct kbdq_core *,
void (*func) (unsigned long));
static void prb_fill_rxhash(struct kbdq_core *, struct tpacket3_hdr *);
static void prb_clear_rxhash(struct kbdq_core *, struct tpacket3_hdr *);
static void prb_fill_vlan_info(struct kbdq_core *, struct tpacket3_hdr *);
static void packet_flush_mclist(struct sock *sk);
struct packet_fanout;
......@@ -193,6 +272,7 @@ struct packet_sock {
struct sock sk;
struct packet_fanout *fanout;
struct tpacket_stats stats;
union tpacket_stats_u stats_u;
struct packet_ring_buffer rx_ring;
struct packet_ring_buffer tx_ring;
int copy_thresh;
......@@ -242,6 +322,15 @@ struct packet_skb_cb {
#define PACKET_SKB_CB(__skb) ((struct packet_skb_cb *)((__skb)->cb))
#define GET_PBDQC_FROM_RB(x) ((struct kbdq_core *)(&(x)->prb_bdqc))
#define GET_PBLOCK_DESC(x, bid) \
((struct block_desc *)((x)->pkbdq[(bid)].buffer))
#define GET_CURR_PBLOCK_DESC_FROM_CORE(x) \
((struct block_desc *)((x)->pkbdq[(x)->kactive_blk_num].buffer))
#define GET_NEXT_PRB_BLK_NUM(x) \
(((x)->kactive_blk_num < ((x)->knum_blocks-1)) ? \
((x)->kactive_blk_num+1) : 0)
static inline struct packet_sock *pkt_sk(struct sock *sk)
{
return (struct packet_sock *)sk;
......@@ -325,8 +414,9 @@ static void __packet_set_status(struct packet_sock *po, void *frame, int status)
h.h2->tp_status = status;
flush_dcache_page(pgv_to_page(&h.h2->tp_status));
break;
case TPACKET_V3:
default:
pr_err("TPACKET version not supported\n");
WARN(1, "TPACKET version not supported.\n");
BUG();
}
......@@ -351,8 +441,9 @@ static int __packet_get_status(struct packet_sock *po, void *frame)
case TPACKET_V2:
flush_dcache_page(pgv_to_page(&h.h2->tp_status));
return h.h2->tp_status;
case TPACKET_V3:
default:
pr_err("TPACKET version not supported\n");
WARN(1, "TPACKET version not supported.\n");
BUG();
return 0;
}
......@@ -389,6 +480,665 @@ static inline void *packet_current_frame(struct packet_sock *po,
return packet_lookup_frame(po, rb, rb->head, status);
}
static void prb_del_retire_blk_timer(struct kbdq_core *pkc)
{
del_timer_sync(&pkc->retire_blk_timer);
}
static void prb_shutdown_retire_blk_timer(struct packet_sock *po,
int tx_ring,
struct sk_buff_head *rb_queue)
{
struct kbdq_core *pkc;
pkc = tx_ring ? &po->tx_ring.prb_bdqc : &po->rx_ring.prb_bdqc;
spin_lock(&rb_queue->lock);
pkc->delete_blk_timer = 1;
spin_unlock(&rb_queue->lock);
prb_del_retire_blk_timer(pkc);
}
static void prb_init_blk_timer(struct packet_sock *po,
struct kbdq_core *pkc,
void (*func) (unsigned long))
{
init_timer(&pkc->retire_blk_timer);
pkc->retire_blk_timer.data = (long)po;
pkc->retire_blk_timer.function = func;
pkc->retire_blk_timer.expires = jiffies;
}
static void prb_setup_retire_blk_timer(struct packet_sock *po, int tx_ring)
{
struct kbdq_core *pkc;
if (tx_ring)
BUG();
pkc = tx_ring ? &po->tx_ring.prb_bdqc : &po->rx_ring.prb_bdqc;
prb_init_blk_timer(po, pkc, prb_retire_rx_blk_timer_expired);
}
static int prb_calc_retire_blk_tmo(struct packet_sock *po,
int blk_size_in_bytes)
{
struct net_device *dev;
unsigned int mbits = 0, msec = 0, div = 0, tmo = 0;
dev = dev_get_by_index(sock_net(&po->sk), po->ifindex);
if (unlikely(dev == NULL))
return DEFAULT_PRB_RETIRE_TOV;
if (dev->ethtool_ops && dev->ethtool_ops->get_settings) {
struct ethtool_cmd ecmd = { .cmd = ETHTOOL_GSET, };
if (!dev->ethtool_ops->get_settings(dev, &ecmd)) {
switch (ecmd.speed) {
case SPEED_10000:
msec = 1;
div = 10000/1000;
break;
case SPEED_1000:
msec = 1;
div = 1000/1000;
break;
/*
* If the link speed is so slow you don't really
* need to worry about perf anyways
*/
case SPEED_100:
case SPEED_10:
default:
return DEFAULT_PRB_RETIRE_TOV;
}
}
}
mbits = (blk_size_in_bytes * 8) / (1024 * 1024);
if (div)
mbits /= div;
tmo = mbits * msec;
if (div)
return tmo+1;
return tmo;
}
static void prb_init_ft_ops(struct kbdq_core *p1,
union tpacket_req_u *req_u)
{
p1->feature_req_word = req_u->req3.tp_feature_req_word;
}
static void init_prb_bdqc(struct packet_sock *po,
struct packet_ring_buffer *rb,
struct pgv *pg_vec,
union tpacket_req_u *req_u, int tx_ring)
{
struct kbdq_core *p1 = &rb->prb_bdqc;
struct block_desc *pbd;
memset(p1, 0x0, sizeof(*p1));
p1->knxt_seq_num = 1;
p1->pkbdq = pg_vec;
pbd = (struct block_desc *)pg_vec[0].buffer;
p1->pkblk_start = (char *)pg_vec[0].buffer;
p1->kblk_size = req_u->req3.tp_block_size;
p1->knum_blocks = req_u->req3.tp_block_nr;
p1->hdrlen = po->tp_hdrlen;
p1->version = po->tp_version;
p1->last_kactive_blk_num = 0;
po->stats_u.stats3.tp_freeze_q_cnt = 0;
if (req_u->req3.tp_retire_blk_tov)
p1->retire_blk_tov = req_u->req3.tp_retire_blk_tov;
else
p1->retire_blk_tov = prb_calc_retire_blk_tmo(po,
req_u->req3.tp_block_size);
p1->tov_in_jiffies = msecs_to_jiffies(p1->retire_blk_tov);
p1->blk_sizeof_priv = req_u->req3.tp_sizeof_priv;
prb_init_ft_ops(p1, req_u);
prb_setup_retire_blk_timer(po, tx_ring);
prb_open_block(p1, pbd);
}
/* Do NOT update the last_blk_num first.
* Assumes sk_buff_head lock is held.
*/
static void _prb_refresh_rx_retire_blk_timer(struct kbdq_core *pkc)
{
mod_timer(&pkc->retire_blk_timer,
jiffies + pkc->tov_in_jiffies);
pkc->last_kactive_blk_num = pkc->kactive_blk_num;
}
/*
* Timer logic:
* 1) We refresh the timer only when we open a block.
* By doing this we don't waste cycles refreshing the timer
* on packet-by-packet basis.
*
* With a 1MB block-size, on a 1Gbps line, it will take
* i) ~8 ms to fill a block + ii) memcpy etc.
* In this cut we are not accounting for the memcpy time.
*
* So, if the user sets the 'tmo' to 10ms then the timer
* will never fire while the block is still getting filled
* (which is what we want). However, the user could choose
* to close a block early and that's fine.
*
* But when the timer does fire, we check whether or not to refresh it.
* Since the tmo granularity is in msecs, it is not too expensive
* to refresh the timer, lets say every '8' msecs.
* Either the user can set the 'tmo' or we can derive it based on
* a) line-speed and b) block-size.
* prb_calc_retire_blk_tmo() calculates the tmo.
*
*/
static void prb_retire_rx_blk_timer_expired(unsigned long data)
{
struct packet_sock *po = (struct packet_sock *)data;
struct kbdq_core *pkc = &po->rx_ring.prb_bdqc;
unsigned int frozen;
struct block_desc *pbd;
spin_lock(&po->sk.sk_receive_queue.lock);
frozen = prb_queue_frozen(pkc);
pbd = GET_CURR_PBLOCK_DESC_FROM_CORE(pkc);
if (unlikely(pkc->delete_blk_timer))
goto out;
/* We only need to plug the race when the block is partially filled.
* tpacket_rcv:
* lock(); increment BLOCK_NUM_PKTS; unlock()
* copy_bits() is in progress ...
* timer fires on other cpu:
* we can't retire the current block because copy_bits
* is in progress.
*
*/
if (BLOCK_NUM_PKTS(pbd)) {
while (atomic_read(&pkc->blk_fill_in_prog)) {
/* Waiting for skb_copy_bits to finish... */
cpu_relax();
}
}
if (pkc->last_kactive_blk_num == pkc->kactive_blk_num) {
if (!frozen) {
prb_retire_current_block(pkc, po, TP_STATUS_BLK_TMO);
if (!prb_dispatch_next_block(pkc, po))
goto refresh_timer;
else
goto out;
} else {
/* Case 1. Queue was frozen because user-space was
* lagging behind.
*/
if (prb_curr_blk_in_use(pkc, pbd)) {
/*
* Ok, user-space is still behind.
* So just refresh the timer.
*/
goto refresh_timer;
} else {
/* Case 2. queue was frozen,user-space caught up,
* now the link went idle && the timer fired.
* We don't have a block to close.So we open this
* block and restart the timer.
* opening a block thaws the queue,restarts timer
* Thawing/timer-refresh is a side effect.
*/
prb_open_block(pkc, pbd);
goto out;
}
}
}
refresh_timer:
_prb_refresh_rx_retire_blk_timer(pkc);
out:
spin_unlock(&po->sk.sk_receive_queue.lock);
}
static inline void prb_flush_block(struct kbdq_core *pkc1,
struct block_desc *pbd1, __u32 status)
{
/* Flush everything minus the block header */
#if ARCH_IMPLEMENTS_FLUSH_DCACHE_PAGE == 1
u8 *start, *end;
start = (u8 *)pbd1;
/* Skip the block header(we know header WILL fit in 4K) */
start += PAGE_SIZE;
end = (u8 *)PAGE_ALIGN((unsigned long)pkc1->pkblk_end);
for (; start < end; start += PAGE_SIZE)
flush_dcache_page(pgv_to_page(start));
smp_wmb();
#endif
/* Now update the block status. */
BLOCK_STATUS(pbd1) = status;
/* Flush the block header */
#if ARCH_IMPLEMENTS_FLUSH_DCACHE_PAGE == 1
start = (u8 *)pbd1;
flush_dcache_page(pgv_to_page(start));
smp_wmb();
#endif
}
/*
* Side effect:
*
* 1) flush the block
* 2) Increment active_blk_num
*
* Note:We DONT refresh the timer on purpose.
* Because almost always the next block will be opened.
*/
static void prb_close_block(struct kbdq_core *pkc1, struct block_desc *pbd1,
struct packet_sock *po, unsigned int stat)
{
__u32 status = TP_STATUS_USER | stat;
struct tpacket3_hdr *last_pkt;
struct hdr_v1 *h1 = &pbd1->hdr.bh1;
if (po->stats.tp_drops)
status |= TP_STATUS_LOSING;
last_pkt = (struct tpacket3_hdr *)pkc1->prev;
last_pkt->tp_next_offset = 0;
/* Get the ts of the last pkt */
if (BLOCK_NUM_PKTS(pbd1)) {
h1->ts_last_pkt.ts_sec = last_pkt->tp_sec;
h1->ts_last_pkt.ts_nsec = last_pkt->tp_nsec;
} else {
/* Ok, we tmo'd - so get the current time */
struct timespec ts;
getnstimeofday(&ts);
h1->ts_last_pkt.ts_sec = ts.tv_sec;
h1->ts_last_pkt.ts_nsec = ts.tv_nsec;
}
smp_wmb();
/* Flush the block */
prb_flush_block(pkc1, pbd1, status);
pkc1->kactive_blk_num = GET_NEXT_PRB_BLK_NUM(pkc1);
}
static inline void prb_thaw_queue(struct kbdq_core *pkc)
{
pkc->reset_pending_on_curr_blk = 0;
}
/*
* Side effect of opening a block:
*
* 1) prb_queue is thawed.
* 2) retire_blk_timer is refreshed.
*
*/
static void prb_open_block(struct kbdq_core *pkc1, struct block_desc *pbd1)
{
struct timespec ts;
struct hdr_v1 *h1 = &pbd1->hdr.bh1;
smp_rmb();
if (likely(TP_STATUS_KERNEL == BLOCK_STATUS(pbd1))) {
/* We could have just memset this but we will lose the
* flexibility of making the priv area sticky
*/
BLOCK_SNUM(pbd1) = pkc1->knxt_seq_num++;
BLOCK_NUM_PKTS(pbd1) = 0;
BLOCK_LEN(pbd1) = BLK_PLUS_PRIV(pkc1->blk_sizeof_priv);
getnstimeofday(&ts);
h1->ts_first_pkt.ts_sec = ts.tv_sec;
h1->ts_first_pkt.ts_nsec = ts.tv_nsec;
pkc1->pkblk_start = (char *)pbd1;
pkc1->nxt_offset = (char *)(pkc1->pkblk_start +
BLK_PLUS_PRIV(pkc1->blk_sizeof_priv));
BLOCK_O2FP(pbd1) = (__u32)BLK_PLUS_PRIV(pkc1->blk_sizeof_priv);
BLOCK_O2PRIV(pbd1) = BLK_HDR_LEN;
pbd1->version = pkc1->version;
pkc1->prev = pkc1->nxt_offset;
pkc1->pkblk_end = pkc1->pkblk_start + pkc1->kblk_size;
prb_thaw_queue(pkc1);
_prb_refresh_rx_retire_blk_timer(pkc1);
smp_wmb();
return;
}
WARN(1, "ERROR block:%p is NOT FREE status:%d kactive_blk_num:%d\n",
pbd1, BLOCK_STATUS(pbd1), pkc1->kactive_blk_num);
dump_stack();
BUG();
}
/*
* Queue freeze logic:
* 1) Assume tp_block_nr = 8 blocks.
* 2) At time 't0', user opens Rx ring.
* 3) Some time past 't0', kernel starts filling blocks starting from 0 .. 7
* 4) user-space is either sleeping or processing block '0'.
* 5) tpacket_rcv is currently filling block '7', since there is no space left,
* it will close block-7,loop around and try to fill block '0'.
* call-flow:
* __packet_lookup_frame_in_block
* prb_retire_current_block()
* prb_dispatch_next_block()
* |->(BLOCK_STATUS == USER) evaluates to true
* 5.1) Since block-0 is currently in-use, we just freeze the queue.
* 6) Now there are two cases:
* 6.1) Link goes idle right after the queue is frozen.
* But remember, the last open_block() refreshed the timer.
* When this timer expires,it will refresh itself so that we can
* re-open block-0 in near future.
* 6.2) Link is busy and keeps on receiving packets. This is a simple
* case and __packet_lookup_frame_in_block will check if block-0
* is free and can now be re-used.
*/
static inline void prb_freeze_queue(struct kbdq_core *pkc,
struct packet_sock *po)
{
pkc->reset_pending_on_curr_blk = 1;
po->stats_u.stats3.tp_freeze_q_cnt++;
}
#define TOTAL_PKT_LEN_INCL_ALIGN(length) (ALIGN((length), V3_ALIGNMENT))
/*
* If the next block is free then we will dispatch it
* and return a good offset.
* Else, we will freeze the queue.
* So, caller must check the return value.
*/
static void *prb_dispatch_next_block(struct kbdq_core *pkc,
struct packet_sock *po)
{
struct block_desc *pbd;
smp_rmb();
/* 1. Get current block num */
pbd = GET_CURR_PBLOCK_DESC_FROM_CORE(pkc);
/* 2. If this block is currently in_use then freeze the queue */
if (TP_STATUS_USER & BLOCK_STATUS(pbd)) {
prb_freeze_queue(pkc, po);
return NULL;
}
/*
* 3.
* open this block and return the offset where the first packet
* needs to get stored.
*/
prb_open_block(pkc, pbd);
return (void *)pkc->nxt_offset;
}
static void prb_retire_current_block(struct kbdq_core *pkc,
struct packet_sock *po, unsigned int status)
{
struct block_desc *pbd = GET_CURR_PBLOCK_DESC_FROM_CORE(pkc);
/* retire/close the current block */
if (likely(TP_STATUS_KERNEL == BLOCK_STATUS(pbd))) {
/*
* Plug the case where copy_bits() is in progress on
* cpu-0 and tpacket_rcv() got invoked on cpu-1, didn't
* have space to copy the pkt in the current block and
* called prb_retire_current_block()
*
* We don't need to worry about the TMO case because
* the timer-handler already handled this case.
*/
if (!(status & TP_STATUS_BLK_TMO)) {
while (atomic_read(&pkc->blk_fill_in_prog)) {
/* Waiting for skb_copy_bits to finish... */
cpu_relax();
}
}
prb_close_block(pkc, pbd, po, status);
return;
}
WARN(1, "ERROR-pbd[%d]:%p\n", pkc->kactive_blk_num, pbd);
dump_stack();
BUG();
}
static inline int prb_curr_blk_in_use(struct kbdq_core *pkc,
struct block_desc *pbd)
{
return TP_STATUS_USER & BLOCK_STATUS(pbd);
}
static inline int prb_queue_frozen(struct kbdq_core *pkc)
{
return pkc->reset_pending_on_curr_blk;
}
static inline void prb_clear_blk_fill_status(struct packet_ring_buffer *rb)
{
struct kbdq_core *pkc = GET_PBDQC_FROM_RB(rb);
atomic_dec(&pkc->blk_fill_in_prog);
}
static inline void prb_fill_rxhash(struct kbdq_core *pkc,
struct tpacket3_hdr *ppd)
{
ppd->hv1.tp_rxhash = skb_get_rxhash(pkc->skb);
}
static inline void prb_clear_rxhash(struct kbdq_core *pkc,