欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

编写一个属于自己的DPDK应用程序(DPDK入门向)

程序员文章站 2022-07-14 20:53:27
...

1.前言

最近这几周在研究dpdk,测试性能,因为是从零开始需要找各种各样的资料,然后发现要自己编写一个dpdk应用程序,真正对自己帮助最大的还是官方的文档,一个是api文档,一个是gudides文档(http://doc.dpdk.org/api https://doc.dpdk.org/guides)。毫无疑问的全是英文文档,基本需要借助网页翻译连猜带蒙。
作为程序员这应该是基本技能了,而我写这篇文章的目的,一是对自己这段时间的研究成果做一个积累,二是希望帮助大家更快的上手dpdk,编写一个dpdk应用程序。

2.DPDK 的示例 examples

dpdk的下载,安装和使用直接百度就有很多的教程,跟着教程来应该问题不大,这里主要提醒大家注意dpdk给出的示例

编写一个属于自己的DPDK应用程序(DPDK入门向)
这里面有dpdk大部分的应用场景,想要详细了解可以去guides网页(前文有网址),像二层转发 三层转发(l2fwd l3fwd)网上很多测试性能都是用的这两个例子,而我主要介绍 helloworld basicfwd这两个例子 和 testpmd的结合,编写一个发包的dpdk应用程序,主要还是分享开发流程。

3.helloworld

编写一个属于自己的DPDK应用程序(DPDK入门向)
这个代码并不复杂,我们主要关注四个地方:
rte_eal_init();
这个函数初始化dpdk EAL环境抽象层,这是所有的dpdk应用程序都会使用到的通用模块,想要具体了解的话可以阅读guides文档,在开发过程中一般是首先需要被调用的。

RTE_LCORE_FOREACH_SLAVE();
这个宏应该比较好理解,就是一个for循环,遍历所有的slave核(只有一个master核,其余是slave核,比如一个4核cpu,0核为master核,1、2、3为slave核)。

rte_eal_remote_launch();
这个函数功能也很简单,真要类比的话类似于多线程或者多进程的函数,它相当于另开了一个线程或者进程去执行lcore_hello(自定义的函数),只不过它使用另一个核去完成。第二个参数是传给自定义函数的参数。

除了rte_eal_remote_launch(),还有一个rte_eal_mp_remote_launch(),这个函数不需要遍历,直接会调用所有的核去执行我们自定义的函数。

rte_eal_mp_wait_lcore();
看到wait就应该心里有数了,这个是在等待所有核core结束。这里依然有一个rte_eal_wait_lcore(…,core_id)函数,等待对应核结束,实际上rte_eal_mp_wait_lcore内部应该是让每一个核执行rte_eal_wait_lcore()来实现的。

4.以skeleton(basicfwd)为基础,编写自己的应用程序。

通过对helloworld示例的介绍,最主要的是掌握了类似于多线程或者多进程的rte_eal_remote_launch(),下面是basicfwd的源码,它只使用了一个核,但是我们使用的时候,可以通过rte_eal_remote_launch()增加核来处理自己的业务。

/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2015 Intel Corporation
 */

#include <stdint.h>
#include <inttypes.h>
#include <rte_eal.h>
#include <rte_ethdev.h>
#include <rte_cycles.h>
#include <rte_lcore.h>
#include <rte_mbuf.h>

#define RX_RING_SIZE 1024
#define TX_RING_SIZE 1024

#define NUM_MBUFS 8191
#define MBUF_CACHE_SIZE 250
#define BURST_SIZE 32

static const struct rte_eth_conf port_conf_default = {
	.rxmode = {
		.max_rx_pkt_len = RTE_ETHER_MAX_LEN,
	},
};

/* basicfwd.c: Basic DPDK skeleton forwarding example. */

/*
 * Initializes a given port using global settings and with the RX buffers
 * coming from the mbuf_pool passed as a parameter.
 */
static inline int
port_init(uint16_t port, struct rte_mempool *mbuf_pool)
{
	struct rte_eth_conf port_conf = port_conf_default;
	const uint16_t rx_rings = 1, tx_rings = 1;
	uint16_t nb_rxd = RX_RING_SIZE;
	uint16_t nb_txd = TX_RING_SIZE;
	int retval;
	uint16_t q;
	struct rte_eth_dev_info dev_info;
	struct rte_eth_txconf txconf;

	if (!rte_eth_dev_is_valid_port(port))
		return -1;

	retval = rte_eth_dev_info_get(port, &dev_info);
	if (retval != 0) {
		printf("Error during getting device (port %u) info: %s\n",
				port, strerror(-retval));
		return retval;
	}

	if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
		port_conf.txmode.offloads |=
			DEV_TX_OFFLOAD_MBUF_FAST_FREE;

	/* Configure the Ethernet device. */
	retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf);
	if (retval != 0)
		return retval;

	retval = rte_eth_dev_adjust_nb_rx_tx_desc(port, &nb_rxd, &nb_txd);
	if (retval != 0)
		return retval;

	/* Allocate and set up 1 RX queue per Ethernet port. */
	for (q = 0; q < rx_rings; q++) {
		retval = rte_eth_rx_queue_setup(port, q, nb_rxd,
				rte_eth_dev_socket_id(port), NULL, mbuf_pool);
		if (retval < 0)
			return retval;
	}

	txconf = dev_info.default_txconf;
	txconf.offloads = port_conf.txmode.offloads;
	/* Allocate and set up 1 TX queue per Ethernet port. */
	for (q = 0; q < tx_rings; q++) {
		retval = rte_eth_tx_queue_setup(port, q, nb_txd,
				rte_eth_dev_socket_id(port), &txconf);
		if (retval < 0)
			return retval;
	}

	/* Start the Ethernet port. */
	retval = rte_eth_dev_start(port);
	if (retval < 0)
		return retval;

	/* Display the port MAC address. */
	struct rte_ether_addr addr;
	retval = rte_eth_macaddr_get(port, &addr);
	if (retval != 0)
		return retval;

	printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
			   " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
			port,
			addr.addr_bytes[0], addr.addr_bytes[1],
			addr.addr_bytes[2], addr.addr_bytes[3],
			addr.addr_bytes[4], addr.addr_bytes[5]);

	/* Enable RX in promiscuous mode for the Ethernet device. */
	retval = rte_eth_promiscuous_enable(port);
	if (retval != 0)
		return retval;

	return 0;
}

/*
 * The lcore main. This is the main thread that does the work, reading from
 * an input port and writing to an output port.
 */
static __attribute__((noreturn)) void
lcore_main(void)
{
	uint16_t port;

	/*
	 * Check that the port is on the same NUMA node as the polling thread
	 * for best performance.
	 */
	RTE_ETH_FOREACH_DEV(port)
		if (rte_eth_dev_socket_id(port) > 0 &&
				rte_eth_dev_socket_id(port) !=
						(int)rte_socket_id())
			printf("WARNING, port %u is on remote NUMA node to "
					"polling thread.\n\tPerformance will "
					"not be optimal.\n", port);

	printf("\nCore %u forwarding packets. [Ctrl+C to quit]\n",
			rte_lcore_id());

	/* Run until the application is quit or killed. */
	for (;;) {
		/*
		 * Receive packets on a port and forward them on the paired
		 * port. The mapping is 0 -> 1, 1 -> 0, 2 -> 3, 3 -> 2, etc.
		 */
		RTE_ETH_FOREACH_DEV(port) {

			/* Get burst of RX packets, from first port of pair. */
			struct rte_mbuf *bufs[BURST_SIZE];
			const uint16_t nb_rx = rte_eth_rx_burst(port, 0,
					bufs, BURST_SIZE);

			if (unlikely(nb_rx == 0))
				continue;

			/* Send burst of TX packets, to second port of pair. */
			const uint16_t nb_tx = rte_eth_tx_burst(port ^ 1, 0,
					bufs, nb_rx);

			/* Free any unsent packets. */
			if (unlikely(nb_tx < nb_rx)) {
				uint16_t buf;
				for (buf = nb_tx; buf < nb_rx; buf++)
					rte_pktmbuf_free(bufs[buf]);
			}
		}
	}
}

/*
 * The main function, which does initialization and calls the per-lcore
 * functions.
 */
int
main(int argc, char *argv[])
{
	struct rte_mempool *mbuf_pool;
	unsigned nb_ports;
	uint16_t portid;

	/* Initialize the Environment Abstraction Layer (EAL). */
	int ret = rte_eal_init(argc, argv);
	if (ret < 0)
		rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");

	argc -= ret;
	argv += ret;

	/* Check that there is an even number of ports to send/receive on. */
	nb_ports = rte_eth_dev_count_avail();
	if (nb_ports < 2 || (nb_ports & 1))
		rte_exit(EXIT_FAILURE, "Error: number of ports must be even\n");

	/* Creates a new mempool in memory to hold the mbufs. */
	mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", NUM_MBUFS * nb_ports,
		MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());

	if (mbuf_pool == NULL)
		rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n");

	/* Initialize all ports. */
	RTE_ETH_FOREACH_DEV(portid)
		if (port_init(portid, mbuf_pool) != 0)
			rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu16 "\n",
					portid);

	if (rte_lcore_count() > 1)
		printf("\nWARNING: Too many lcores enabled. Only 1 used.\n");

	/* Call lcore_main on the master core only. */
	lcore_main();

	return 0;
}

在这个示例中,可以看到基本的开发流程:
rte_eal_init()
argc -= ret;
argv += ret

开头三连是为了区分eal环境层参数和自己定义的参数,basicfwd没有自定义参数,但我们自己可以在argv += ret;之后封装一个函数解析自定义参数 ,比如parser_my_dpdk(argc, argv),然后使用的时候通过 – 隔开,格式为 (eal 参数)-- (自定义参数)。

rte_eth_dev_count_avail()
这个函数获取设备的port数量,熟悉dpdk的应该都了解,dpdk中的port指的是设备的网口,即eth1、eth2、eth3等,和tcp/ip协议中的8080等端口不是一个东西。

rte_pktmbuf_pool_create()
创建一个内存池,basicfwd中的内存池是用来做接收数据包的缓存,我们可以在自己的程序中沿用,但是它并没有给发送数据包设置缓存,我们其实可以自己加上。

RTE_ETH_FOREACH_DEV()
这个宏应该也好理解,循环遍历port网口(你将几个port拖给dpdk管理,这里就执行几次)

port_init()
这是basicfwd定义的一个函数,它将port也就是网口的初始化进行了一层封装(如果不对网口做一些特殊设置,这个函数可以直接复制到我们的程序中作为自己的port网口初始化函数)。看它都干了些啥:

  1. rte_eth_dev_info_get()
    这个函数直接获取了当前网口的信息,具体有哪些信息,可以去api网址(文章开头)搜索rte_eth_dev_info这个结构体。

  2. if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
    port_conf.txmode.offloads |=
    DEV_TX_OFFLOAD_MBUF_FAST_FREE;
    这个宏是看网口设备是否支持mbufs快速释放的功能,支持的话就给默认加上。offload这个词是卸、减负的意思,一搬如果网口能够支持某些功能,就能够通过设置将cpu的相关工作卸载到port网口上,提高一些效率。

  3. rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf)
    这个函数是配置port网口的一些参数,rx_rings/tx_rings是你要设置的接收/发送的队列数目,这个要看自己的网口支持多少队列(cat /proc/interrupts),port_conf的具体设置可以去api网页查看rte_eth_conf这个结构体,可以重点关注一下 rte_eth_rxmode , rte_eth_txmode这两个成员。

  4. rte_eth_dev_adjust_nb_rx_tx_desc(port, &nb_rxd, &nb_txd)
    这个函数是去判断port网口的队列是否支持 nb_rxd/nb_txd个接收/发送 描述符,如果不支持那么多会自动调整到边界个数(如果没有这一层,很可能出现段错误)。

  5. rte_eth_rx_queue_setup(port, q, nb_rxd, rte_eth_dev_socket_id(port), NULL, mbuf_pool)
    命名中就可以看出,这个函数为port网口设置接收队列,因为我们之前设置了rx_rings=1,因此for循环只执行一次,q(queue_id)=0,设置一个接收队列(这个可以根据自己的实际情况来);
    rte_eth_dev_socket_id返回的是一个NUMA结构套接字,所谓的NUMA结构是将多台服务起连接起来当做一台使用的技术,是多CPU模式的,如果自己的服务器只有一个CPU,这个参数可以就这么写不管它;
    NULL这里其实可以填一个rx_conf,对应之后的tx_conf,这里直接默认为NULL;
    mbuf_pool是basicfwd在main中用rte_pktmbuf_pool_create()创建出来的内存池,作为接收数据包的缓存。

  6. rte_eth_tx_queue_setup(port, q, nb_txd,
    rte_eth_dev_socket_id(port), &txconf)
    这个函数为port网口设置传输发送队列,比上面的rte_eth_rx_queue_setup()少一个内存池参数,这个参数会为nb_rxd个连续的描述符初始化一块内存,因此发送队列其实是没有缓存区的,需要我们自己实现(也可以不实现)。

  7. rte_eth_dev_start()
    这个函数没啥好说的,port网口配置完了,直接start启动起来

  8. rte_eth_promiscuous_enable(port)
    这个是设置port网口的混杂模式,只要是到这个网口的数据包,不管是不是发给它的都会接受。

lcore_main
通过port_init的设置,我们基本上已经准备好了port网口,接下来就是需要处理的业务逻辑,basicfwd核心业务逻辑就只有两个函数:
rte_eth_rx_burst()函数负责接收数据包;
rte_eth_tx_burst()函数负责发送数据包;
如果发送有丢包,没发全,就将没发送出去的包rte_pktmbuf_free()掉。

其实在真实的应用场景中发送失败大多是不会rte_pktmbuf_free()的,而是接着发送。

同时,basicfwd中只使用了一个核去处理数据包的接收和发送,都2020年了,我们的电脑,服务器怎么也有4核以上了吧,土(败)豪(家)一点的可以直接使用rte_eal_remote_launch()函数,一个核接收rte_eth_rx_burst(),一个核处理数据包(自定义函数),一个核发送rte_eth_tx_burst(),甚至还能剩个核做其它(笑),就是这样做的话,数据的同步共享需要花点功夫,推荐使用dpdk的rte_ring无锁队列结构。

5.依葫芦画瓢+抛砖引玉

我自己写了一个dpdk的程序,根据MAC地址来实现专门的发包模式。
起因是这样的:刚开始我使用了testpmd程序的txonly(转发模式),不管怎么调试(修改了队列数,分组包数目,核心数目,数据包大小等),性能一直上不去(距离网口理论性能差了很多),然后想着使用pktgen-dpdk试一下发包,网上看了下pktgen的编译,发现不是很复杂,然后下载了源码自己编译,但事实证明我还是太年轻了!!(是哪个博客上写的能编译dpdk就能编译pktgen!!!那么多坑胃口好全吃了?!)

首先是准备环境,cmake,meson+ninja,要安装meson的话python至少3.5,GCC版本也最好升级,然后还有些莫名其妙的报错就不说了,像准备libdpdk什么的,这些我去pktgen的GitHub上才找到解决,然而好多博客教程上提都不提一句。

总之这都是题外话了,也正是这番遭遇让我开始自己写一个发包程序,参照testpmd的txonly.c源码,结合上面提到的basicfwd的开发流程,不多说,直接上源码:

/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2014 Intel Corporation
 */

#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <inttypes.h>
#include <errno.h>
#include <sys/queue.h>

#include <rte_memory.h>
#include <rte_launch.h>
#include <rte_eal.h>
#include <rte_per_lcore.h>
#include <rte_lcore.h>
#include <rte_debug.h>
#include <rte_ethdev.h>
#include <rte_cycles.h>
#include <rte_mbuf.h>

#define RX_RING_SIZE 1024
#define TX_RING_SIZE 1024

#define NUM_MBUFS 8191
#define MBUF_CACHE_SIZE 250
#define BURST_SIZE 32

#define MAX_PKT_BURST 512
#define RTE_MAX_SEGS_PER_PKT 255
#define TXONLY_DEF_PACKET_LEN 1024
#define BURST_TX_WAIT_US 1
#define BURST_TX_RETRIES 64

uint32_t burst_tx_delay_time = BURST_TX_WAIT_US;
uint32_t burst_tx_retry_num = BURST_TX_RETRIES;

enum tx_pkt_split {
	TX_PKT_SPLIT_OFF,
	TX_PKT_SPLIT_ON,
	TX_PKT_SPLIT_RND,
};

enum tx_pkt_split tx_pkt_split = TX_PKT_SPLIT_OFF;
uint8_t  tx_pkt_nb_segs = 1;
uint16_t nb_pkt_per_burst = 128;

static const struct rte_eth_conf port_conf_default = {
	.rxmode = {
		.max_rx_pkt_len = RTE_ETHER_MAX_LEN,
	},
};

uint16_t tx_pkt_seg_lengths[RTE_MAX_SEGS_PER_PKT] = {
	TXONLY_DEF_PACKET_LEN,
};

static inline int
port_init(uint16_t port, struct rte_mempool *mbuf_pool)
{
	struct rte_eth_conf port_conf = port_conf_default;
	const uint16_t rx_rings = 1, tx_rings = 1;
	uint16_t nb_rxd = RX_RING_SIZE;
	uint16_t nb_txd = TX_RING_SIZE;
	int retval;
	uint16_t q;
	struct rte_eth_dev_info dev_info;
	struct rte_eth_txconf txconf;

	if (!rte_eth_dev_is_valid_port(port))
		return -1;

	retval = rte_eth_dev_info_get(port, &dev_info);
	if (retval != 0) {
		printf("Error during getting device (port %u) info: %s\n",
				port, strerror(-retval));
		return retval;
	}

	if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
		port_conf.txmode.offloads |=
			DEV_TX_OFFLOAD_MBUF_FAST_FREE;

	/* Configure the Ethernet device. */
	retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf);
	if (retval != 0)
		return retval;

	retval = rte_eth_dev_adjust_nb_rx_tx_desc(port, &nb_rxd, &nb_txd);
	if (retval != 0)
		return retval;

	/* Allocate and set up 1 RX queue per Ethernet port. */
	for (q = 0; q < rx_rings; q++) {
		retval = rte_eth_rx_queue_setup(port, q, nb_rxd,
				rte_eth_dev_socket_id(port), NULL, mbuf_pool);
		if (retval < 0)
			return retval;
	}

	txconf = dev_info.default_txconf;
	txconf.offloads = port_conf.txmode.offloads;
	/* Allocate and set up 1 TX queue per Ethernet port. */
	for (q = 0; q < tx_rings; q++) {
		retval = rte_eth_tx_queue_setup(port, q, nb_txd,
				rte_eth_dev_socket_id(port), &txconf);
		if (retval < 0)
			return retval;
	}

	/* Start the Ethernet port. */
	retval = rte_eth_dev_start(port);
	if (retval < 0)
		return retval;

	/* Display the port MAC address. */
	struct rte_ether_addr addr;
	retval = rte_eth_macaddr_get(port, &addr);
	if (retval != 0)
		return retval;

	printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
			   " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
			port,
			addr.addr_bytes[0], addr.addr_bytes[1],
			addr.addr_bytes[2], addr.addr_bytes[3],
			addr.addr_bytes[4], addr.addr_bytes[5]);

	/* Enable RX in promiscuous mode for the Ethernet device. */
	retval = rte_eth_promiscuous_enable(port);
	if (retval != 0)
		return retval;

	return 0;
}

static inline void
copy_buf_to_pkt(void* buf, unsigned len, struct rte_mbuf *pkt, unsigned offset)
{
	if (offset + len <= pkt->data_len) {
		rte_memcpy(rte_pktmbuf_mtod_offset(pkt, char *, offset),
			buf, (size_t) len);
		return;
	}
}

static inline int
pkt_burst_prepare(struct rte_mbuf *pkt, struct rte_mempool *mbp, struct rte_ether_hdr *eth_hdr){

	struct rte_mbuf *pkt_segs[RTE_MAX_SEGS_PER_PKT];
	struct rte_mbuf *pkt_seg;
	uint32_t nb_segs, pkt_len;
	uint8_t i;

	
	nb_segs = tx_pkt_nb_segs;

	rte_pktmbuf_reset_headroom(pkt);
	pkt->data_len = tx_pkt_seg_lengths[0];
	pkt->l2_len = sizeof(struct rte_ether_hdr);
	pkt->l3_len = sizeof(struct rte_ipv4_hdr);

	pkt_len = pkt->data_len;
	pkt_seg = pkt;

	pkt_seg->next = NULL;

	copy_buf_to_pkt(eth_hdr, sizeof(*eth_hdr), pkt, 0);
//	copy_buf_to_pkt(&pkt_ip_hdr, sizeof(pkt_ip_hdr), pkt, sizeof(struct rte_ether_hdr));

//	copy_buf_to_pkt(&pkt_udp_hdr, sizeof(pkt_udp_hdr), pkt, sizeof(struct rte_ether_hdr));

	pkt->nb_segs = nb_segs;
	pkt->pkt_len = pkt_len;

	return 1;
}

/*
 * The lcore main. This is the main thread that does the work, reading from
 * an input port and writing to an output port.
 */
static __attribute__((noreturn)) void
lcore_main(void)
{
	uint16_t port;
	uint16_t nb_tx;
	uint32_t retry;
	struct rte_ether_addr d_addr;
	struct rte_ether_addr s_addr;
	struct rte_ether_hdr eth_hdr;
	struct rte_mbuf *pkts_burst[MAX_PKT_BURST];

	uint16_t nb_pkt;
	struct rte_mbuf *pkt;

	struct rte_mempool *tx_mbuf_pool = rte_pktmbuf_pool_create("TX_MBUF_POOL", NUM_MBUFS * 2,
		MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());

	RTE_ETH_FOREACH_DEV(port)
		if (rte_eth_dev_socket_id(port) > 0 &&
				rte_eth_dev_socket_id(port) !=
						(int)rte_socket_id())
			printf("WARNING, port %u is on remote NUMA node to "
					"polling thread.\n\tPerformance will "
					"not be optimal.\n", port);

	printf("\nCore %u forwarding packets. [Ctrl+C to quit]\n",
			rte_lcore_id());

	d_addr.addr_bytes[0] = 88;
	d_addr.addr_bytes[1] = 83;
	d_addr.addr_bytes[2] = 192;
	d_addr.addr_bytes[3] = 57;
	d_addr.addr_bytes[4] = 0;
	d_addr.addr_bytes[5] = 58;

	s_addr.addr_bytes[0] = 88;
	s_addr.addr_bytes[1] = 83;
	s_addr.addr_bytes[2] = 192;
	s_addr.addr_bytes[3] = 57;
	s_addr.addr_bytes[4] = 0;
	s_addr.addr_bytes[5] = 220;

	rte_ether_addr_copy(&d_addr, &eth_hdr.d_addr);
	rte_ether_addr_copy(&s_addr, &eth_hdr.s_addr);
	eth_hdr.ether_type = rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV4);
	
	for (;;) {
		// RTE_ETH_FOREACH_DEV(port) {


		// 	struct rte_mbuf *bufs[BURST_SIZE];
		// 	const uint16_t nb_rx = rte_eth_rx_burst(port, 0,
		// 	 		bufs, BURST_SIZE);

		// 	 if (unlikely(nb_rx == 0))
		//  	continue;


		// 	const uint16_t nb_tx = rte_eth_tx_burst(port ^ 1, 0,
		// 			bufs, nb_rx);

		// 	if (unlikely(nb_tx < nb_rx)) {
		// 		uint16_t buf;
		// 		for (buf = nb_tx; buf < nb_rx; buf++)
		// 			rte_pktmbuf_free(bufs[buf]);
		// 	}
		// }

	/*
	**pkt_burst_prepare
	*/
		if (rte_mempool_get_bulk(tx_mbuf_pool, (void **)pkts_burst,
					nb_pkt_per_burst) == 0) {
			for (nb_pkt = 0; nb_pkt < nb_pkt_per_burst; nb_pkt++) {
				if (unlikely(!pkt_burst_prepare(pkts_burst[nb_pkt], tx_mbuf_pool, &eth_hdr))) {

					rte_mempool_put_bulk(tx_mbuf_pool, (void **)&pkts_burst[nb_pkt], nb_pkt_per_burst - nb_pkt);
					break;

				}
			}
		} else {
			for (nb_pkt = 0; nb_pkt < nb_pkt_per_burst; nb_pkt++) {
				pkt = rte_mbuf_raw_alloc(tx_mbuf_pool);
				if (pkt == NULL)
					break;
				if (unlikely(!pkt_burst_prepare(pkt, tx_mbuf_pool, &eth_hdr))) {
					rte_pktmbuf_free(pkt);
					break;
				}
				pkts_burst[nb_pkt] = pkt;
			}
		}

		if (nb_pkt == 0)
			return;

		nb_tx = rte_eth_tx_burst(0, 0, pkts_burst, nb_pkt);

		/*
		** retry
		*/
		if (unlikely(nb_tx < nb_pkt)) {
			retry = 0;
			while (nb_tx < nb_pkt && retry++ < burst_tx_retry_num) {
				rte_delay_us(burst_tx_delay_time);
				nb_tx += rte_eth_tx_burst(0, 0,
						&pkts_burst[nb_tx], nb_pkt - nb_tx);
			}
		}

		if (unlikely(nb_tx < nb_pkt)) {
			do {
				rte_pktmbuf_free(pkts_burst[nb_tx]);
			} while (++nb_tx < nb_pkt);
		}
	}
}

/*
 * The main function, which does initialization and calls the per-lcore
 * functions.
 */
int
main(int argc, char *argv[])
{
	struct rte_mempool *mbuf_pool;
	unsigned nb_ports;
	uint16_t portid;

	/* Initialize the Environment Abstraction Layer (EAL). */
	int ret = rte_eal_init(argc, argv);
	if (ret < 0)
		rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");

	argc -= ret;
	argv += ret;

	/* Check that there is an even number of ports to send/receive on. */
	nb_ports = rte_eth_dev_count_avail();
	if (nb_ports < 2 || (nb_ports & 1))
		rte_exit(EXIT_FAILURE, "Error: number of ports must be even\n");

	/* Creates a new mempool in memory to hold the mbufs. */
	mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", NUM_MBUFS * nb_ports,
		MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());

	if (mbuf_pool == NULL)
		rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n");

	/* Initialize all ports. */
	RTE_ETH_FOREACH_DEV(portid)
		if (port_init(portid, mbuf_pool) != 0)
			rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu16 "\n",
					portid);

	if (rte_lcore_count() > 1)
		printf("\nWARNING: Too many lcores enabled. Only 1 used.\n");

	/* Call lcore_main on the master core only. */
	lcore_main();

	return 0;
}

可以很明显的看出,我就是直接借鉴的(理直气壮),甚至连注释都没改,毕竟是真的好用,唯一有区别的地方就是业务处理部分lcore_main(这也是大家根据自己的需求来处理的部分,我直接参照txonly.c来写的):
我直接写死了两个MAC地址,一个是src_mac,一个是dst_mac,然后将它copy到pkt中,用一个内存池来做缓存,一组发送512个数据包,如果发送失败会retry重新发送。

6.编译自己的应用程序

程序代码写完之后就是编译,当然可以使用GCC,但是会有一定难度,这边推荐官方最简单粗暴的方法,直接复制黏贴examples里面的Makefile文件到你的项目中,make编译,当然需要修改一下:
编写一个属于自己的DPDK应用程序(DPDK入门向)
一般只要修改这两个地方就行,简单易懂。

7.总结

老实说dpdk的接口在api网址都有很详细的介绍,找到对应的功能模块之后就能根据自己的需要合理的添加到上述的流程框架中,之后无非就是写BUG,解BUG的循环煎熬

作为入门向的文章,可能只能帮助大家到这里了,毕竟我也还在学习中,长路漫漫,少侠一路好走。

相关标签: dpdk linux 其他