Last active
April 13, 2020 17:08
-
-
Save alexforster/a28e2d979e81e8a15fd9948f6cce6c0a to your computer and use it in GitHub Desktop.
Rust xsk ring implementation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright (c) Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::cmp; | |
use std::ops; | |
use std::sync::atomic; | |
// see: https://github.com/torvalds/linux/blob/v5.4/net/xdp/xsk_queue.h#L47-L87
// see: https://www.snellman.net/blog/archive/2016-12-13-ring-buffers/ ("array + two unmasked indices")
/// One side's position in an xsk ring: a locally `cached` copy of a 32-bit
/// free-running index plus the pointer to the `shared` word it mirrors.
///
/// The cache and the shared word are synchronized only when the owner calls
/// `load`/`store` explicitly, so hot-path arithmetic stays off shared memory.
#[derive(Clone, Copy)]
pub struct XskIndex {
    cached: u32,
    shared: *mut u32,
}
impl XskIndex { | |
pub fn new(shared: *mut u32) -> Self { | |
unsafe { Self { cached: *shared, shared } } | |
} | |
#[inline(always)] | |
pub fn load(&mut self) { | |
unsafe { | |
self.cached = *self.shared; | |
} | |
} | |
#[inline(always)] | |
pub fn store(&mut self) { | |
unsafe { | |
*self.shared = self.cached; | |
} | |
} | |
} | |
impl ops::AddAssign<u32> for XskIndex { | |
#[inline(always)] | |
fn add_assign(&mut self, other: u32) { | |
self.cached = self.cached.wrapping_add(other); | |
} | |
} | |
impl ops::Sub<XskIndex> for XskIndex { | |
type Output = u32; | |
#[inline(always)] | |
fn sub(self, other: Self) -> Self::Output { | |
self.cached.wrapping_sub(other.cached) | |
} | |
} | |
impl ops::BitAnd<u32> for XskIndex { | |
type Output = u32; | |
#[inline(always)] | |
fn bitand(self, other: u32) -> Self::Output { | |
self.cached & other | |
} | |
} | |
pub struct XskRing<T> | |
where | |
T: Copy, | |
{ | |
consumer: XskIndex, | |
producer: XskIndex, | |
ring: *mut T, | |
capacity: u32, | |
mask: u32, | |
} | |
impl<T: Copy> XskRing<T> { | |
pub fn new(slab: *mut u8, capacity: usize, prod_offset: u64, cons_offset: u64, desc_offset: u64) -> Self { | |
unsafe { | |
Self { | |
consumer: XskIndex::new(slab.offset(cons_offset as isize).cast()), | |
producer: XskIndex::new(slab.offset(prod_offset as isize).cast()), | |
ring: slab.offset(desc_offset as isize).cast(), | |
capacity: capacity as u32, | |
mask: (capacity - 1) as u32, | |
} | |
} | |
} | |
pub fn nr_available(&mut self, wanted: usize) -> usize { | |
let mut nr_available = (self.producer - self.consumer) as usize; | |
if nr_available < wanted { | |
self.producer.load(); | |
nr_available = (self.producer - self.consumer) as usize; | |
} | |
cmp::min(wanted, nr_available) | |
} | |
pub fn nr_free(&mut self, wanted: usize) -> usize { | |
let mut nr_free = (self.consumer - self.producer) as usize; | |
if nr_free < wanted { | |
self.consumer.load(); | |
self.consumer += self.capacity as u32; | |
nr_free = (self.consumer - self.producer) as usize; | |
} | |
cmp::min(wanted, nr_free) | |
} | |
pub fn enqueue<'a>(&mut self, items: &'a [T]) -> &'a [T] { | |
let nr_free = self.nr_free(items.len()); | |
if nr_free == 0 { | |
return items; | |
} | |
let (items, rest) = items.split_at(nr_free); | |
for item in items.iter() { | |
unsafe { | |
let entry = self.ring.offset((self.producer & self.mask) as isize); | |
*entry = *item; | |
self.producer += 1; | |
} | |
} | |
atomic::compiler_fence(atomic::Ordering::Release); | |
self.producer.store(); | |
rest | |
} | |
pub fn dequeue<'a>(&mut self, entries: &'a mut [T]) -> &'a [T] { | |
let nr_available = self.nr_available(entries.len()); | |
if nr_available == 0 { | |
return &entries[0..0]; | |
} | |
atomic::compiler_fence(atomic::Ordering::Acquire); | |
let (entries, _) = entries.split_at_mut(nr_available); | |
for entry in entries.iter_mut() { | |
unsafe { | |
let value = self.ring.offset((self.consumer & self.mask) as isize); | |
*entry = *value; | |
self.consumer += 1; | |
} | |
} | |
atomic::compiler_fence(atomic::Ordering::AcqRel); | |
self.consumer.store(); | |
entries | |
} | |
} | |
unsafe impl<T> Send for XskRing<T> where T: Copy {} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment