Skip to content

Instantly share code, notes, and snippets.

@alexforster
Last active April 13, 2020 17:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alexforster/a28e2d979e81e8a15fd9948f6cce6c0a to your computer and use it in GitHub Desktop.
Save alexforster/a28e2d979e81e8a15fd9948f6cce6c0a to your computer and use it in GitHub Desktop.
Rust xsk ring implementation
// Copyright (c) Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::cmp;
use std::ops;
use std::sync::atomic;
// see: https://github.com/torvalds/linux/blob/v5.4/net/xdp/xsk_queue.h#L47-L87
// see: https://www.snellman.net/blog/archive/2016-12-13-ring-buffers/ ("array + two unmasked indices")
/// A `u32` ring index held as a local cached copy plus a pointer to the shared word it mirrors.
///
/// The operator impls (`+=`, `-`, `&`) act on the cached copy only; `load` and `store`
/// move the value between the cache and the shared word.
///
/// The type is `Copy`, so copies alias the same shared word while carrying independent caches.
#[derive(Copy, Clone)]
pub struct XskIndex {
/// Local copy of the index; only synchronised with `shared` by `load`/`store`.
cached: u32,
/// Raw pointer to the shared index word.
/// NOTE(review): presumably the kernel-visible producer/consumer word of an mmap'd ring — confirm.
shared: *mut u32,
}
impl XskIndex {
pub fn new(shared: *mut u32) -> Self {
unsafe { Self { cached: *shared, shared } }
}
#[inline(always)]
pub fn load(&mut self) {
unsafe {
self.cached = *self.shared;
}
}
#[inline(always)]
pub fn store(&mut self) {
unsafe {
*self.shared = self.cached;
}
}
}
impl ops::AddAssign<u32> for XskIndex {
    /// Advances the cached index by `step`, wrapping around at `u32::MAX`.
    ///
    /// Only the cached copy changes; the shared word is untouched.
    #[inline(always)]
    fn add_assign(&mut self, step: u32) {
        let advanced = u32::wrapping_add(self.cached, step);
        self.cached = advanced;
    }
}
impl ops::Sub<XskIndex> for XskIndex {
    type Output = u32;

    /// Returns the wrapping distance from `rhs` to `self`, computed on the cached values.
    ///
    /// Because the subtraction wraps, the result stays meaningful after the indices
    /// themselves have overflowed `u32`.
    #[inline(always)]
    fn sub(self, rhs: Self) -> u32 {
        let (ahead, behind) = (self.cached, rhs.cached);
        ahead.wrapping_sub(behind)
    }
}
impl ops::BitAnd<u32> for XskIndex {
type Output = u32;
#[inline(always)]
fn bitand(self, other: u32) -> Self::Output {
self.cached & other
}
}
/// A ring of `T` entries addressed through a producer index and a consumer index.
///
/// Both indices are free-running `u32` counters; an index is turned into a slot number
/// by masking it with `mask`. Entries are copied in by `enqueue` and copied out by `dequeue`.
pub struct XskRing<T>
where
T: Copy,
{
/// Consumer index; advanced by `dequeue`. When `nr_free` refreshes it, the cached value
/// is set to the shared value plus `capacity`.
consumer: XskIndex,
/// Producer index; advanced by `enqueue` and refreshed from the shared word by `nr_available`.
producer: XskIndex,
/// Pointer to the first entry slot of the ring.
ring: *mut T,
/// Number of entry slots, as passed to `new`.
capacity: u32,
/// `capacity - 1`; ANDed with an index to obtain the slot number.
mask: u32,
}
impl<T: Copy> XskRing<T> {
    /// Builds a ring view over the memory region starting at `slab`.
    ///
    /// * `slab` - base address of the region holding both index words and the entry array.
    /// * `capacity` - number of entry slots; must be a power of two that fits in `u32`.
    /// * `prod_offset` / `cons_offset` - byte offsets from `slab` to the producer and
    ///   consumer index words.
    /// * `desc_offset` - byte offset from `slab` to the first entry slot.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is zero, not a power of two, or larger than `u32::MAX`.
    /// Slot numbers are computed as `index & (capacity - 1)`, which only wraps correctly
    /// for powers of two.
    ///
    /// # Caller obligations
    ///
    /// This function is not marked `unsafe`, but it reads through `slab` immediately and the
    /// ring keeps reading and writing through it afterwards. The caller must ensure that
    /// `slab` plus each offset is in bounds and suitably aligned (`u32` for the indices, `T`
    /// for the entries), and that the region outlives the ring.
    pub fn new(slab: *mut u8, capacity: usize, prod_offset: u64, cons_offset: u64, desc_offset: u64) -> Self {
        assert!(
            capacity.is_power_of_two() && capacity <= u32::MAX as usize,
            "XskRing capacity must be a power of two that fits in u32, got {}",
            capacity
        );
        // Checked above, so this cast cannot truncate.
        let capacity = capacity as u32;
        // SAFETY: the caller guarantees that every offset stays inside the region behind
        // `slab` and that the resulting pointers are aligned and valid (see above).
        unsafe {
            Self {
                consumer: XskIndex::new(slab.add(cons_offset as usize).cast()),
                producer: XskIndex::new(slab.add(prod_offset as usize).cast()),
                ring: slab.add(desc_offset as usize).cast(),
                capacity,
                mask: capacity - 1,
            }
        }
    }

    /// Returns how many entries can be dequeued right now, capped at `wanted`.
    ///
    /// The shared producer index is only re-read when the cached view cannot satisfy
    /// `wanted`.
    pub fn nr_available(&mut self, wanted: usize) -> usize {
        let mut nr_available = (self.producer - self.consumer) as usize;
        if nr_available < wanted {
            self.producer.load();
            nr_available = (self.producer - self.consumer) as usize;
        }
        cmp::min(wanted, nr_available)
    }

    /// Returns how many slots can be enqueued into right now, capped at `wanted`.
    ///
    /// The cached consumer index is kept `capacity` ahead of the shared one, so the free
    /// count is a single wrapping subtraction. The shared consumer index is re-read when
    /// the cached view cannot satisfy `wanted`. It is also re-read when the cached view
    /// claims more than `capacity` free slots. That happens before the first refresh if the
    /// ring was created with the producer ahead of the consumer: the subtraction wraps to a
    /// huge value, and trusting it would let `enqueue` overwrite unconsumed entries.
    pub fn nr_free(&mut self, wanted: usize) -> usize {
        let mut nr_free = (self.consumer - self.producer) as usize;
        if nr_free < wanted || nr_free > self.capacity as usize {
            self.consumer.load();
            self.consumer += self.capacity;
            nr_free = (self.consumer - self.producer) as usize;
        }
        cmp::min(wanted, nr_free)
    }

    /// Copies as many of `items` as fit into the ring and publishes them.
    ///
    /// Returns the tail of `items` that did not fit; this is all of `items` when the ring
    /// is full and empty when everything was enqueued.
    pub fn enqueue<'a>(&mut self, items: &'a [T]) -> &'a [T] {
        let nr_free = self.nr_free(items.len());
        if nr_free == 0 {
            return items;
        }
        let (accepted, rest) = items.split_at(nr_free);
        for item in accepted {
            let slot = (self.producer & self.mask) as usize;
            // SAFETY: `slot <= mask < capacity`, and `ring` points at `capacity` slots of `T`
            // per the constructor's contract.
            unsafe { self.ring.add(slot).write(*item) };
            self.producer += 1;
        }
        // A CPU-level fence (not only a compiler fence) is needed so that the entry writes
        // above become visible before the new producer index on weakly-ordered machines.
        atomic::fence(atomic::Ordering::Release);
        self.producer.store();
        rest
    }

    /// Copies up to `entries.len()` pending entries out of the ring into `entries`.
    ///
    /// Returns the prefix of `entries` that was filled, which is empty when nothing is
    /// pending. The consumed slots are released by publishing the new consumer index.
    pub fn dequeue<'a>(&mut self, entries: &'a mut [T]) -> &'a [T] {
        let nr_available = self.nr_available(entries.len());
        if nr_available == 0 {
            return &entries[0..0];
        }
        // Order the producer-index read before the entry reads below, on the CPU as well
        // as in the compiler.
        atomic::fence(atomic::Ordering::Acquire);
        let (filled, _) = entries.split_at_mut(nr_available);
        for entry in filled.iter_mut() {
            let slot = (self.consumer & self.mask) as usize;
            // SAFETY: `slot <= mask < capacity`, and `ring` points at `capacity` slots of `T`
            // per the constructor's contract.
            *entry = unsafe { self.ring.add(slot).read() };
            self.consumer += 1;
        }
        // The entry reads must complete before the slots are handed back through the
        // consumer index.
        atomic::fence(atomic::Ordering::AcqRel);
        self.consumer.store();
        filled
    }
}
// The raw pointers inside `XskRing` make it `!Send` by default; this impl opts back in so a
// ring can be moved to another thread.
// NOTE(review): soundness relies on the ring memory staying valid from whichever thread owns
// the ring, and `T` is only required to be `Copy`, not `Send` — confirm that is intended.
unsafe impl<T: Copy> Send for XskRing<T> {}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment