Skip to content

Instantly share code, notes, and snippets.

@kinnison
Created March 27, 2018 20:47
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kinnison/2ae3252010a41f62c43574609231ea6c to your computer and use it in GitHub Desktop.
Save kinnison/2ae3252010a41f62c43574609231ea6c to your computer and use it in GitHub Desktop.
Join the socketcan and tokio crates together!
extern crate socketcan;
extern crate tokio;
extern crate futures;
extern crate mio;
use mio::{Ready, Poll, PollOpt, Token};
use mio::event::Evented;
use mio::unix::EventedFd;
use tokio::reactor::PollEvented2;
use tokio::prelude::*;
use std::os::unix::io::AsRawFd;
use std::io;
// Can Stream which is tokioable et al..
pub struct EventableCANSocket {
socket: socketcan::CANSocket,
}
impl EventableCANSocket {
fn new(socket: socketcan::CANSocket) -> EventableCANSocket {
EventableCANSocket { socket: socket }
}
pub fn from_interfacename(ifname: &str) -> Result<EventableCANSocket, socketcan::CANSocketOpenError> {
Ok(EventableCANSocket::new(socketcan::CANSocket::open(ifname)?))
}
pub fn get_ref(&self) -> &socketcan::CANSocket {
&self.socket
}
}
impl Evented for EventableCANSocket {
fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
EventedFd(&self.socket.as_raw_fd()).register(poll, token, interest, opts)
}
fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
EventedFd(&self.socket.as_raw_fd()).reregister(poll, token, interest, opts)
}
fn deregister(&self, poll: &Poll) -> io::Result<()> {
EventedFd(&self.socket.as_raw_fd()).deregister(poll)
}
}
pub struct CANStream {
socket: PollEvented2<EventableCANSocket>,
}
impl CANStream {
pub fn new(socket: EventableCANSocket) -> CANStream {
CANStream { socket: PollEvented2::new(socket) }
}
}
impl Stream for CANStream {
type Item = socketcan::CANFrame;
type Error = io::Error;
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
// We shouldn't be woken unless we're ready, but just in case, let's
// ensure...
let readiness = match self.socket.poll_read_ready(Ready::readable())? {
Async::NotReady => { return Ok(Async::NotReady) },
Async::Ready(r) => r
};
if !readiness.is_readable() {
return Ok(Async::NotReady);
}
// Okay, we're readable, so let's have a go at reading...
let frame = self.socket.get_ref().get_ref().read_frame()?;
Ok(Async::Ready(Some(frame)))
}
}
fn main() {
println!("Starting up candump-rs");
let evented = match EventableCANSocket::from_interfacename("vcan0") {
Ok(v) => v,
Err(e) => panic!("Unable to open socket: {}", e)
};
println!("Opened CAN socket");
let streamed = CANStream::new(evented);
println!("Made it streamed, starting reactor...");
tokio::run(streamed
.map_err(|e| println!("error = {:?}", e))
.for_each(|frame| {
println!("Received a frame: {:?}", frame);
Ok(())
}));
}
@kinnison
Copy link
Author

kinnison commented Mar 27, 2018

I'm sure lines 67 to 74 must be macroable but I couldn't see an obvious one in tokio

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment