[dpdk-dev,v1,1/3] port: add mp/mc ring ports
Commit Message
ring_multi_reader input port (on top of multi consumer rte_ring)
ring_multi_writer output port (on top of multi producer rte_ring)
Signed-off-by: Piotr Azarewicz <piotrx.t.azarewicz@intel.com>
---
lib/librte_port/rte_port_ring.c | 399 ++++++++++++++++++++++++++++++++++++++-
lib/librte_port/rte_port_ring.h | 34 +++-
2 files changed, 424 insertions(+), 9 deletions(-)
Comments
On Tue, 15 Sep 2015 15:06:33 +0200
Piotr Azarewicz <piotrx.t.azarewicz@intel.com> wrote:
> +static int
> +rte_port_ring_multi_reader_rx(void *port, struct rte_mbuf **pkts, uint32_t n_pkts)
> +{
Please break arguments on line so that line length is not over 80 characters.
On Tue, 15 Sep 2015 15:06:33 +0200
Piotr Azarewicz <piotrx.t.azarewicz@intel.com> wrote:
> +static inline void
> +send_burst_mp(struct rte_port_ring_writer *p)
> +{
compiler will inline static functions anyway. No need to add inline qualifier
On Tue, 15 Sep 2015 15:06:33 +0200
Piotr Azarewicz <piotrx.t.azarewicz@intel.com> wrote:
> + /*
> + * If we didnt manage to send all packets in single burst, move
Checkpatch complains:
WARNING: 'didnt' may be misspelled - perhaps 'didn't'?
#413: FILE: lib/librte_port/rte_port_ring.c:827:
+ * If we didnt manage to send all packets in single burst, move
> -----Original Message-----
> From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Stephen
> Hemminger
> Sent: Tuesday, September 22, 2015 1:36 AM
> To: Azarewicz, PiotrX T
> Cc: dev@dpdk.org
> Subject: Re: [dpdk-dev] [PATCH v1 1/3] port: add mp/mc ring ports
>
> On Tue, 15 Sep 2015 15:06:33 +0200
> Piotr Azarewicz <piotrx.t.azarewicz@intel.com> wrote:
>
> > +static inline void
> > +send_burst_mp(struct rte_port_ring_writer *p)
> > +{
>
> compiler will inline static functions anyway. No need to add inline qualifier
Hi Stephen,
Using 'static inline' seems to be the standard practice in DPDK and a good practice as well.
DPDK> grep 'static inline' `find -name '*.[hc]'` | wc -l
1700
Regards,
Cristian
> -----Original Message-----
> From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Stephen
> Hemminger
> Sent: Tuesday, September 22, 2015 1:37 AM
> To: Azarewicz, PiotrX T
> Cc: dev@dpdk.org
> Subject: Re: [dpdk-dev] [PATCH v1 1/3] port: add mp/mc ring ports
>
> On Tue, 15 Sep 2015 15:06:33 +0200
> Piotr Azarewicz <piotrx.t.azarewicz@intel.com> wrote:
>
> > + /*
> > + * If we didnt manage to send all packets in single burst,
> move
>
> Checkpatch complains:
> WARNING: 'didnt' may be misspelled - perhaps 'didn't'?
> #413: FILE: lib/librte_port/rte_port_ring.c:827:
> + * If we didnt manage to send all packets in single burst,
> move
Thanks for the catch, Stephen.
> -----Original Message-----
> From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Stephen
> Hemminger
> Sent: Tuesday, September 22, 2015 1:35 AM
> To: Azarewicz, PiotrX T
> Cc: dev@dpdk.org
> Subject: Re: [dpdk-dev] [PATCH v1 1/3] port: add mp/mc ring ports
>
> On Tue, 15 Sep 2015 15:06:33 +0200
> Piotr Azarewicz <piotrx.t.azarewicz@intel.com> wrote:
>
> > +static int
> > +rte_port_ring_multi_reader_rx(void *port, struct rte_mbuf **pkts,
> uint32_t n_pkts)
> > +{
>
> Please break arguments on line so that line length is not over 80 characters.
Thanks, Steve.
2015-09-22 11:34, Dumitrescu, Cristian:
>
> > -----Original Message-----
> > From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Stephen
> > Hemminger
> > Sent: Tuesday, September 22, 2015 1:36 AM
> > To: Azarewicz, PiotrX T
> > Cc: dev@dpdk.org
> > Subject: Re: [dpdk-dev] [PATCH v1 1/3] port: add mp/mc ring ports
> >
> > On Tue, 15 Sep 2015 15:06:33 +0200
> > Piotr Azarewicz <piotrx.t.azarewicz@intel.com> wrote:
> >
> > > +static inline void
> > > +send_burst_mp(struct rte_port_ring_writer *p)
> > > +{
> >
> > compiler will inline static functions anyway. No need to add inline qualifier
>
> Hi Stephen,
>
> Using 'static inline' seems to be the standard practice in DPDK and a good practice as well.
Why do you think it is a good practice?
Forced inlining can be a random optimization having negative effects.
> DPDK> grep 'static inline' `find -name '*.[hc]'` | wc -l
> 1700
On Tue, 22 Sep 2015 16:23:51 +0200
Thomas Monjalon <thomas.monjalon@6wind.com> wrote:
> 2015-09-22 11:34, Dumitrescu, Cristian:
> >
> > > -----Original Message-----
> > > From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Stephen
> > > Hemminger
> > > Sent: Tuesday, September 22, 2015 1:36 AM
> > > To: Azarewicz, PiotrX T
> > > Cc: dev@dpdk.org
> > > Subject: Re: [dpdk-dev] [PATCH v1 1/3] port: add mp/mc ring ports
> > >
> > > On Tue, 15 Sep 2015 15:06:33 +0200
> > > Piotr Azarewicz <piotrx.t.azarewicz@intel.com> wrote:
> > >
> > > > +static inline void
> > > > +send_burst_mp(struct rte_port_ring_writer *p)
> > > > +{
> > >
> > > compiler will inline static functions anyway. No need to add inline qualifier
> >
> > Hi Stephen,
> >
> > Using 'static inline' seems to be the standard practice in DPDK and a good practice as well.
>
> Why do you think it is a good practice?
> Forced inlining can be a random optimization having negative effects.
Agreed. Modern compilers make good decisions.
> -----Original Message-----
> From: Thomas Monjalon [mailto:thomas.monjalon@6wind.com]
> Sent: Tuesday, September 22, 2015 5:24 PM
> To: Dumitrescu, Cristian
> Cc: dev@dpdk.org; Stephen Hemminger; Azarewicz, PiotrX T
> Subject: Re: [dpdk-dev] [PATCH v1 1/3] port: add mp/mc ring ports
>
> 2015-09-22 11:34, Dumitrescu, Cristian:
> >
> > > -----Original Message-----
> > > From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Stephen
> > > Hemminger
> > > Sent: Tuesday, September 22, 2015 1:36 AM
> > > To: Azarewicz, PiotrX T
> > > Cc: dev@dpdk.org
> > > Subject: Re: [dpdk-dev] [PATCH v1 1/3] port: add mp/mc ring ports
> > >
> > > On Tue, 15 Sep 2015 15:06:33 +0200
> > > Piotr Azarewicz <piotrx.t.azarewicz@intel.com> wrote:
> > >
> > > > +static inline void
> > > > +send_burst_mp(struct rte_port_ring_writer *p)
> > > > +{
> > >
> > > compiler will inline static functions anyway. No need to add inline qualifier
> >
> > Hi Stephen,
> >
> > Using 'static inline' seems to be the standard practice in DPDK and a good
> practice as well.
>
> Why do you think it is a good practice?
> Forced inlining can be a random optimization having negative effects.
>
What I meant is this: when users want to make sure their code gets inlined by the compiler, it is better to explicitly state this by using the mechanisms provided by the C compiler (the C keyword "inline" and compiler attributes like "always_inline") rather than hope that the compiler is going to do this anyway. I have been burned in the past by compilers not inlining code even when explicitly stated, so I am quite sceptical about compilers doing it proactively.
Your point is slightly different: why use code inlining at all? IMHO this discussion is outside the scope of this patch and should be conducted as a separate debate. Please feel free to start it as a separate thread if you deem necessary. As said, there are already 1700 instances of "static inline" in DPDK, as well as lots of "always inline".
In the context of this debate (outside the scope of this patch), my quick input is: compilers are typically good at optimizing code within a function rather than across functions, so having more code in the same function allows the compiler to do a better job of optimization. I am not a compiler expert, so my views could simply be biased by my past experience.
> > DPDK> grep 'static inline' `find -name '*.[hc]'` | wc -l
> > 1700
@@ -70,8 +70,10 @@ rte_port_ring_reader_create(void *params, int socket_id)
struct rte_port_ring_reader *port;
/* Check input parameters */
- if (conf == NULL) {
- RTE_LOG(ERR, PORT, "%s: params is NULL\n", __func__);
+ if ((conf == NULL) ||
+ (conf->ring == NULL) ||
+ (conf->ring->cons.sc_dequeue != 1)) {
+ RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
return NULL;
}
@@ -166,7 +168,8 @@ rte_port_ring_writer_create(void *params, int socket_id)
/* Check input parameters */
if ((conf == NULL) ||
- (conf->ring == NULL) ||
+ (conf->ring == NULL) ||
+ (conf->ring->prod.sp_enqueue != 1) ||
(conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
return NULL;
@@ -343,7 +346,8 @@ rte_port_ring_writer_nodrop_create(void *params, int socket_id)
/* Check input parameters */
if ((conf == NULL) ||
- (conf->ring == NULL) ||
+ (conf->ring == NULL) ||
+ (conf->ring->prod.sp_enqueue != 1) ||
(conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
return NULL;
@@ -448,6 +452,7 @@ rte_port_ring_writer_nodrop_tx_bulk(void *port,
*/
for (; n_pkts_ok < n_pkts; n_pkts_ok++) {
struct rte_mbuf *pkt = pkts[n_pkts_ok];
+
p->tx_buf[p->tx_buf_count++] = pkt;
}
send_burst_nodrop(p);
@@ -513,6 +518,367 @@ rte_port_ring_writer_nodrop_stats_read(void *port,
}
/*
+ * Port RING Multi Reader
+ */
+static void *
+rte_port_ring_multi_reader_create(void *params, int socket_id)
+{
+ struct rte_port_ring_multi_reader_params *conf =
+ (struct rte_port_ring_multi_reader_params *) params;
+ struct rte_port_ring_reader *port;
+
+ /* Check input parameters */
+ if ((conf == NULL) ||
+ (conf->ring == NULL) ||
+ (conf->ring->cons.sc_dequeue != 0)) {
+ RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
+ return NULL;
+ }
+
+ /* Memory allocation */
+ port = rte_zmalloc_socket("PORT", sizeof(*port),
+ RTE_CACHE_LINE_SIZE, socket_id);
+ if (port == NULL) {
+ RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
+ return NULL;
+ }
+
+ /* Initialization */
+ port->ring = conf->ring;
+
+ return port;
+}
+
+static int
+rte_port_ring_multi_reader_rx(void *port, struct rte_mbuf **pkts, uint32_t n_pkts)
+{
+ struct rte_port_ring_reader *p = (struct rte_port_ring_reader *) port;
+ uint32_t nb_rx;
+
+ nb_rx = rte_ring_mc_dequeue_burst(p->ring, (void **) pkts, n_pkts);
+ RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(p, nb_rx);
+
+ return nb_rx;
+}
+
+static int
+rte_port_ring_multi_reader_free(void *port)
+{
+ if (port == NULL) {
+ RTE_LOG(ERR, PORT, "%s: port is NULL\n", __func__);
+ return -EINVAL;
+ }
+
+ rte_free(port);
+
+ return 0;
+}
+
+/*
+ * Port RING Multi Writer
+ */
+static void *
+rte_port_ring_multi_writer_create(void *params, int socket_id)
+{
+ struct rte_port_ring_multi_writer_params *conf =
+ (struct rte_port_ring_multi_writer_params *) params;
+ struct rte_port_ring_writer *port;
+
+ /* Check input parameters */
+ if ((conf == NULL) ||
+ (conf->ring == NULL) ||
+ (conf->ring->prod.sp_enqueue != 0) ||
+ (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
+ RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
+ return NULL;
+ }
+
+ /* Memory allocation */
+ port = rte_zmalloc_socket("PORT", sizeof(*port),
+ RTE_CACHE_LINE_SIZE, socket_id);
+ if (port == NULL) {
+ RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
+ return NULL;
+ }
+
+ /* Initialization */
+ port->ring = conf->ring;
+ port->tx_burst_sz = conf->tx_burst_sz;
+ port->tx_buf_count = 0;
+ port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
+
+ return port;
+}
+
+static inline void
+send_burst_mp(struct rte_port_ring_writer *p)
+{
+ uint32_t nb_tx;
+
+ nb_tx = rte_ring_mp_enqueue_burst(p->ring, (void **)p->tx_buf,
+ p->tx_buf_count);
+
+ RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
+ for ( ; nb_tx < p->tx_buf_count; nb_tx++)
+ rte_pktmbuf_free(p->tx_buf[nb_tx]);
+
+ p->tx_buf_count = 0;
+}
+
+static int
+rte_port_ring_multi_writer_tx(void *port, struct rte_mbuf *pkt)
+{
+ struct rte_port_ring_writer *p = (struct rte_port_ring_writer *) port;
+
+ p->tx_buf[p->tx_buf_count++] = pkt;
+ RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, 1);
+ if (p->tx_buf_count >= p->tx_burst_sz)
+ send_burst_mp(p);
+
+ return 0;
+}
+
+static int
+rte_port_ring_multi_writer_tx_bulk(void *port,
+ struct rte_mbuf **pkts,
+ uint64_t pkts_mask)
+{
+ struct rte_port_ring_writer *p =
+ (struct rte_port_ring_writer *) port;
+
+ uint32_t bsz_mask = p->bsz_mask;
+ uint32_t tx_buf_count = p->tx_buf_count;
+ uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
+ ((pkts_mask & bsz_mask) ^ bsz_mask);
+
+ if (expr == 0) {
+ uint64_t n_pkts = __builtin_popcountll(pkts_mask);
+ uint32_t n_pkts_ok;
+
+ if (tx_buf_count)
+ send_burst_mp(p);
+
+ RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, n_pkts);
+ n_pkts_ok = rte_ring_mp_enqueue_burst(p->ring, (void **)pkts, n_pkts);
+
+ RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, n_pkts - n_pkts_ok);
+ for ( ; n_pkts_ok < n_pkts; n_pkts_ok++) {
+ struct rte_mbuf *pkt = pkts[n_pkts_ok];
+
+ rte_pktmbuf_free(pkt);
+ }
+ } else {
+ for ( ; pkts_mask; ) {
+ uint32_t pkt_index = __builtin_ctzll(pkts_mask);
+ uint64_t pkt_mask = 1LLU << pkt_index;
+ struct rte_mbuf *pkt = pkts[pkt_index];
+
+ p->tx_buf[tx_buf_count++] = pkt;
+ RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, 1);
+ pkts_mask &= ~pkt_mask;
+ }
+
+ p->tx_buf_count = tx_buf_count;
+ if (tx_buf_count >= p->tx_burst_sz)
+ send_burst_mp(p);
+ }
+
+ return 0;
+}
+
+static int
+rte_port_ring_multi_writer_flush(void *port)
+{
+ struct rte_port_ring_writer *p = (struct rte_port_ring_writer *) port;
+
+ if (p->tx_buf_count > 0)
+ send_burst_mp(p);
+
+ return 0;
+}
+
+static int
+rte_port_ring_multi_writer_free(void *port)
+{
+ if (port == NULL) {
+ RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
+ return -EINVAL;
+ }
+
+ rte_port_ring_multi_writer_flush(port);
+ rte_free(port);
+
+ return 0;
+}
+
+/*
+ * Port RING Multi Writer Nodrop
+ */
+static void *
+rte_port_ring_multi_writer_nodrop_create(void *params, int socket_id)
+{
+ struct rte_port_ring_multi_writer_nodrop_params *conf =
+ (struct rte_port_ring_multi_writer_nodrop_params *) params;
+ struct rte_port_ring_writer_nodrop *port;
+
+ /* Check input parameters */
+ if ((conf == NULL) ||
+ (conf->ring == NULL) ||
+ (conf->ring->prod.sp_enqueue != 0) ||
+ (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
+ RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
+ return NULL;
+ }
+
+ /* Memory allocation */
+ port = rte_zmalloc_socket("PORT", sizeof(*port),
+ RTE_CACHE_LINE_SIZE, socket_id);
+ if (port == NULL) {
+ RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
+ return NULL;
+ }
+
+ /* Initialization */
+ port->ring = conf->ring;
+ port->tx_burst_sz = conf->tx_burst_sz;
+ port->tx_buf_count = 0;
+ port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
+
+ /*
+ * When n_retries is 0, we should keep retrying until every packet is
+ * sent, no matter how many attempts it takes. To limit the number of
+ * branches in the fast path, we use UINT64_MAX instead of branching.
+ */
+ port->n_retries = (conf->n_retries == 0) ? UINT64_MAX : conf->n_retries;
+
+ return port;
+}
+
+static inline void
+send_burst_mp_nodrop(struct rte_port_ring_writer_nodrop *p)
+{
+ uint32_t nb_tx = 0, i;
+
+ nb_tx = rte_ring_mp_enqueue_burst(p->ring, (void **)p->tx_buf,
+ p->tx_buf_count);
+
+ /* We sent all the packets in the first try */
+ if (nb_tx >= p->tx_buf_count)
+ return;
+
+ for (i = 0; i < p->n_retries; i++) {
+ nb_tx += rte_ring_mp_enqueue_burst(p->ring,
+ (void **) (p->tx_buf + nb_tx), p->tx_buf_count - nb_tx);
+
+ /* We sent all the packets in more than one try */
+ if (nb_tx >= p->tx_buf_count)
+ return;
+ }
+
+ /* We didn't send the packets in maximum allowed attempts */
+ RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
+ for ( ; nb_tx < p->tx_buf_count; nb_tx++)
+ rte_pktmbuf_free(p->tx_buf[nb_tx]);
+
+ p->tx_buf_count = 0;
+}
+
+static int
+rte_port_ring_multi_writer_nodrop_tx(void *port, struct rte_mbuf *pkt)
+{
+ struct rte_port_ring_writer_nodrop *p =
+ (struct rte_port_ring_writer_nodrop *) port;
+
+ p->tx_buf[p->tx_buf_count++] = pkt;
+ RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
+ if (p->tx_buf_count >= p->tx_burst_sz)
+ send_burst_mp_nodrop(p);
+
+ return 0;
+}
+
+static int
+rte_port_ring_multi_writer_nodrop_tx_bulk(void *port,
+ struct rte_mbuf **pkts,
+ uint64_t pkts_mask)
+{
+ struct rte_port_ring_writer_nodrop *p =
+ (struct rte_port_ring_writer_nodrop *) port;
+
+ uint32_t bsz_mask = p->bsz_mask;
+ uint32_t tx_buf_count = p->tx_buf_count;
+ uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
+ ((pkts_mask & bsz_mask) ^ bsz_mask);
+
+ if (expr == 0) {
+ uint64_t n_pkts = __builtin_popcountll(pkts_mask);
+ uint32_t n_pkts_ok;
+
+ if (tx_buf_count)
+ send_burst_mp_nodrop(p);
+
+ RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, n_pkts);
+ n_pkts_ok = rte_ring_mp_enqueue_burst(p->ring, (void **)pkts, n_pkts);
+
+ if (n_pkts_ok >= n_pkts)
+ return 0;
+
+ /*
+ * If we didnt manage to send all packets in single burst, move
+ * remaining packets to the buffer and call send burst.
+ */
+ for (; n_pkts_ok < n_pkts; n_pkts_ok++) {
+ struct rte_mbuf *pkt = pkts[n_pkts_ok];
+
+ p->tx_buf[p->tx_buf_count++] = pkt;
+ }
+ send_burst_mp_nodrop(p);
+ } else {
+ for ( ; pkts_mask; ) {
+ uint32_t pkt_index = __builtin_ctzll(pkts_mask);
+ uint64_t pkt_mask = 1LLU << pkt_index;
+ struct rte_mbuf *pkt = pkts[pkt_index];
+
+ p->tx_buf[tx_buf_count++] = pkt;
+ RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
+ pkts_mask &= ~pkt_mask;
+ }
+
+ p->tx_buf_count = tx_buf_count;
+ if (tx_buf_count >= p->tx_burst_sz)
+ send_burst_mp_nodrop(p);
+ }
+
+ return 0;
+}
+
+static int
+rte_port_ring_multi_writer_nodrop_flush(void *port)
+{
+ struct rte_port_ring_writer_nodrop *p =
+ (struct rte_port_ring_writer_nodrop *) port;
+
+ if (p->tx_buf_count > 0)
+ send_burst_mp_nodrop(p);
+
+ return 0;
+}
+
+static int
+rte_port_ring_multi_writer_nodrop_free(void *port)
+{
+ if (port == NULL) {
+ RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
+ return -EINVAL;
+ }
+
+ rte_port_ring_multi_writer_nodrop_flush(port);
+ rte_free(port);
+
+ return 0;
+}
+
+/*
* Summary of port operations
*/
struct rte_port_in_ops rte_port_ring_reader_ops = {
@@ -539,3 +905,28 @@ struct rte_port_out_ops rte_port_ring_writer_nodrop_ops = {
.f_flush = rte_port_ring_writer_nodrop_flush,
.f_stats = rte_port_ring_writer_nodrop_stats_read,
};
+
+struct rte_port_in_ops rte_port_ring_multi_reader_ops = {
+ .f_create = rte_port_ring_multi_reader_create,
+ .f_free = rte_port_ring_multi_reader_free,
+ .f_rx = rte_port_ring_multi_reader_rx,
+ .f_stats = rte_port_ring_reader_stats_read,
+};
+
+struct rte_port_out_ops rte_port_ring_multi_writer_ops = {
+ .f_create = rte_port_ring_multi_writer_create,
+ .f_free = rte_port_ring_multi_writer_free,
+ .f_tx = rte_port_ring_multi_writer_tx,
+ .f_tx_bulk = rte_port_ring_multi_writer_tx_bulk,
+ .f_flush = rte_port_ring_multi_writer_flush,
+ .f_stats = rte_port_ring_writer_stats_read,
+};
+
+struct rte_port_out_ops rte_port_ring_multi_writer_nodrop_ops = {
+ .f_create = rte_port_ring_multi_writer_nodrop_create,
+ .f_free = rte_port_ring_multi_writer_nodrop_free,
+ .f_tx = rte_port_ring_multi_writer_nodrop_tx,
+ .f_tx_bulk = rte_port_ring_multi_writer_nodrop_tx_bulk,
+ .f_flush = rte_port_ring_multi_writer_nodrop_flush,
+ .f_stats = rte_port_ring_writer_nodrop_stats_read,
+};
@@ -42,8 +42,14 @@ extern "C" {
* @file
* RTE Port Ring
*
- * ring_reader: input port built on top of pre-initialized single consumer ring
- * ring_writer: output port built on top of pre-initialized single producer ring
+ * ring_reader:
+ * input port built on top of pre-initialized single consumer ring
+ * ring_writer:
+ * output port built on top of pre-initialized single producer ring
+ * ring_multi_reader:
+ * input port built on top of pre-initialized multi consumer ring
+ * ring_multi_writer:
+ * output port built on top of pre-initialized multi producer ring
*
***/
@@ -55,7 +61,7 @@ extern "C" {
/** ring_reader port parameters */
struct rte_port_ring_reader_params {
- /** Underlying single consumer ring that has to be pre-initialized */
+ /** Underlying consumer ring that has to be pre-initialized */
struct rte_ring *ring;
};
@@ -64,7 +70,7 @@ extern struct rte_port_in_ops rte_port_ring_reader_ops;
/** ring_writer port parameters */
struct rte_port_ring_writer_params {
- /** Underlying single producer ring that has to be pre-initialized */
+ /** Underlying producer ring that has to be pre-initialized */
struct rte_ring *ring;
/** Recommended burst size to ring. The actual burst size can be
@@ -77,7 +83,7 @@ extern struct rte_port_out_ops rte_port_ring_writer_ops;
/** ring_writer_nodrop port parameters */
struct rte_port_ring_writer_nodrop_params {
- /** Underlying single producer ring that has to be pre-initialized */
+ /** Underlying producer ring that has to be pre-initialized */
struct rte_ring *ring;
/** Recommended burst size to ring. The actual burst size can be
@@ -91,6 +97,24 @@ struct rte_port_ring_writer_nodrop_params {
/** ring_writer_nodrop port operations */
extern struct rte_port_out_ops rte_port_ring_writer_nodrop_ops;
+/** ring_multi_reader port parameters */
+#define rte_port_ring_multi_reader_params rte_port_ring_reader_params
+
+/** ring_multi_reader port operations */
+extern struct rte_port_in_ops rte_port_ring_multi_reader_ops;
+
+/** ring_multi_writer port parameters */
+#define rte_port_ring_multi_writer_params rte_port_ring_writer_params
+
+/** ring_multi_writer port operations */
+extern struct rte_port_out_ops rte_port_ring_multi_writer_ops;
+
+/** ring_multi_writer_nodrop port parameters */
+#define rte_port_ring_multi_writer_nodrop_params rte_port_ring_writer_nodrop_params
+
+/** ring_multi_writer_nodrop port operations */
+extern struct rte_port_out_ops rte_port_ring_multi_writer_nodrop_ops;
+
#ifdef __cplusplus
}
#endif