Last active
January 28, 2022 17:37
-
-
Save psychon/0b8b59b30d3253254b37e6267dbee471 to your computer and use it in GitHub Desktop.
Async Rust X11 connection PoC
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::collections::HashMap; | |
use std::marker::PhantomData; | |
use std::sync::Arc; | |
use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; | |
use tokio::sync::mpsc::{ | |
channel, unbounded_channel, OwnedPermit, Receiver, Sender, UnboundedReceiver, | |
}; | |
use tokio::sync::Mutex; | |
use x11rb::connection::SequenceNumber; | |
use x11rb::errors::{ConnectionError, ParseError}; | |
use x11rb::extension_manager::ExtensionManager; | |
use x11rb::protocol::xproto::{ | |
EventMask, GetInputFocusReply, Setup, SetupRequest, CHANGE_WINDOW_ATTRIBUTES_REQUEST, CW, | |
GET_INPUT_FOCUS_REQUEST, KEYMAP_NOTIFY_EVENT, | |
}; | |
use x11rb::x11_utils::{Serialize as _, TryParse}; | |
pub struct Cookie<Reply: TryParse>(Receiver<Vec<u8>>, PhantomData<Reply>); | |
impl<Reply: TryParse> Cookie<Reply> { | |
fn new(recv: Receiver<Vec<u8>>) -> Self { | |
Self(recv, PhantomData) | |
} | |
pub async fn raw_reply(mut self) -> Vec<u8> { | |
self.0.recv().await.expect("The reading task exited?!") | |
} | |
pub async fn reply(self) -> Result<Reply, ParseError> { | |
let raw = self.raw_reply().await; | |
Ok(Reply::try_parse(&raw[..])?.0) | |
} | |
} | |
#[derive(Default)] | |
struct ConnectionState { | |
replies: HashMap<SequenceNumber, OwnedPermit<Vec<u8>>>, | |
next_sequence_send: SequenceNumber, | |
last_sequence_read: SequenceNumber, | |
next_reply_expected: SequenceNumber, | |
extension_manager: ExtensionManager, | |
} | |
impl ConnectionState { | |
fn new() -> Self { | |
Self { | |
replies: Default::default(), | |
next_sequence_send: 1, | |
last_sequence_read: 0, | |
next_reply_expected: 0, | |
extension_manager: Default::default(), | |
} | |
} | |
fn need_sync(&self) -> bool { | |
self.next_reply_expected + SequenceNumber::from(u16::max_value()) <= self.next_sequence_send | |
} | |
fn expect_reply(&mut self, sender: OwnedPermit<Vec<u8>>) { | |
let sequence = self.next_sequence_send; | |
self.next_sequence_send += 1; | |
self.next_reply_expected = sequence; | |
self.replies.insert(sequence, sender); | |
} | |
fn extract_sequence_number(&mut self, packet: &[u8]) -> Option<SequenceNumber> { | |
// TODO: This code assumes that something inserts sync regularly. So far I did not | |
// implement that. | |
if packet[0] == KEYMAP_NOTIFY_EVENT { | |
return None; | |
} | |
let number = u16::from_ne_bytes([packet[2], packet[3]]); | |
let high_bytes = self.last_sequence_read & !SequenceNumber::from(u16::max_value()); | |
let mut full_number = SequenceNumber::from(number) | high_bytes; | |
if full_number < self.last_sequence_read { | |
full_number += SequenceNumber::from(u16::max_value()) + 1; | |
} | |
self.last_sequence_read = full_number; | |
Some(full_number) | |
} | |
fn received_packet(&mut self, packet: Vec<u8>) -> Option<Vec<u8>> { | |
if let Some(seqno) = self.extract_sequence_number(&packet) { | |
if let Some(sender) = self.replies.remove(&seqno) { | |
sender.send(packet); | |
return None; | |
} | |
} | |
// Return the packet to the caller | |
Some(packet) | |
} | |
} | |
pub struct Connection { | |
write_channel: Sender<Vec<u8>>, | |
read_channel: Mutex<UnboundedReceiver<Vec<u8>>>, | |
state: Arc<Mutex<ConnectionState>>, | |
setup: Setup, | |
} | |
impl Connection { | |
pub async fn connect(display_number: u16) -> Result<Self, ConnectionError> { | |
// TODO: This is not using any buffering for the reading/writing | |
let stream = tokio::net::TcpStream::connect(("127.0.0.1", 6000 + display_number)).await?; | |
let (mut read, mut write) = stream.into_split(); | |
let setup = SetupRequest { | |
byte_order: 0x6c, | |
protocol_major_version: 11, | |
protocol_minor_version: 0, | |
authorization_protocol_name: vec![], | |
authorization_protocol_data: vec![], | |
} | |
.serialize(); | |
write.write(&setup[..]).await?; | |
let mut setup = vec![0; 8]; | |
read.read_exact(&mut setup).await?; | |
let extra_length = usize::from(u16::from_ne_bytes([setup[6], setup[7]])) * 4; | |
setup.reserve_exact(extra_length); | |
setup.resize(8 + extra_length, 0); | |
read.read_exact(&mut setup[8..]).await?; | |
// TODO: Handle failures | |
assert_eq!(1, setup[0]); | |
let setup = Setup::try_parse(&setup[..])?.0; | |
let state = Arc::new(Mutex::new(ConnectionState::new())); | |
// Spawn a task for sending bytes | |
let (write_channel, mut write_channel_read) = channel::<Vec<u8>>(1); | |
tokio::spawn(async move { | |
while let Some(value) = write_channel_read.recv().await { | |
write.write(&value[..]).await.unwrap(); | |
} | |
}); | |
// Spawn a task for receiving messages | |
let (read_channel_write, read_channel) = unbounded_channel(); | |
let read_channel = Mutex::new(read_channel); | |
let state_clone = Arc::clone(&state); | |
tokio::spawn(async move { | |
use x11rb::protocol::xproto::GE_GENERIC_EVENT; | |
loop { | |
// Every X11 packet is at least 32 bytes | |
let mut packet = vec![0; 32]; | |
read.read_exact(&mut packet[..]).await.unwrap(); | |
let response_type = packet[0]; | |
const REPLY: u8 = 1; | |
let extra_length = | |
if response_type == REPLY || response_type & 0x7f == GE_GENERIC_EVENT { | |
let length_field = packet[4..8].try_into().unwrap(); | |
let length_field = u32::from_ne_bytes(length_field) as usize; | |
4 * length_field | |
} else { | |
0 | |
}; | |
packet.resize(packet.len() + extra_length, 0); | |
read.read_exact(&mut packet[32..]).await.unwrap(); | |
let mut state = state_clone.lock().await; | |
if let Some(packet) = state.received_packet(packet) { | |
read_channel_write.send(packet).unwrap(); | |
} | |
} | |
}); | |
Ok(Self { | |
write_channel, | |
read_channel, | |
state, | |
setup, | |
}) | |
} | |
pub fn setup(&self) -> &Setup { | |
&self.setup | |
} | |
pub async fn send_void(&self, request: Vec<u8>) -> Result<(), ConnectionError> { | |
'retry: loop { | |
let permit = self | |
.write_channel | |
.reserve() | |
.await | |
.expect("I am not quite sure when this can fail - the write task stopped?"); | |
let state = self.state.lock().await; | |
if state.need_sync() { | |
// Unlock everything and send a sync, then try again | |
drop(permit); | |
drop(state); | |
let length = 1u16.to_ne_bytes(); | |
let cookie = self | |
.send_and_get_reply::<GetInputFocusReply>(vec![ | |
GET_INPUT_FOCUS_REQUEST, | |
0, | |
length[0], | |
length[1], | |
]) | |
.await; | |
drop(cookie); | |
continue 'retry; | |
} | |
state.next_sequence_send += 1; | |
permit.send(request); | |
return Ok(()); | |
} | |
} | |
pub async fn send_and_get_reply<Reply: TryParse>( | |
&self, | |
request: Vec<u8>, | |
) -> Result<Cookie<Reply>, ConnectionError> { | |
let (sender, receiver) = channel(1); | |
let sender = sender.reserve_owned().await.unwrap(); | |
{ | |
let permit = self | |
.write_channel | |
.reserve() | |
.await | |
.expect("I am not quite sure when this can fail - the write task stopped?"); | |
let mut state = self.state.lock().await; | |
state.expect_reply(sender); | |
// Keep the mutex until we have actually written our bytes. | |
// This uses a permit so that nothing can go wrong between expect_reply() and actually | |
// sending the bytes. | |
permit.send(request); | |
} | |
Ok(Cookie::new(receiver)) | |
} | |
pub async fn wait_for_event(&self) -> Result<x11rb::protocol::Event, ConnectionError> { | |
let event = self | |
.read_channel | |
.lock() | |
.await | |
.recv() | |
.await | |
.expect("The write end of the channel was dropped?!"); | |
let state = self.state.lock().await; | |
Ok(x11rb::protocol::Event::parse( | |
&event, | |
&state.extension_manager, | |
)?) | |
} | |
} | |
#[tokio::main] | |
async fn main() -> Result<(), Box<dyn std::error::Error>> { | |
let conn = Connection::connect(4).await?; | |
// If only GetInputFocusRequest::serialize was public... | |
//conn.send(GetInputFocusRequest {}.serialize()).await; | |
let length = 1u16.to_ne_bytes(); | |
let reply = conn | |
.send_and_get_reply::<GetInputFocusReply>(vec![ | |
GET_INPUT_FOCUS_REQUEST, | |
0, | |
length[0], | |
length[1], | |
]) | |
.await?; | |
// If only ChangeWindowAttributesRequest::serialize and ChangeWindowAttributesAux::serialize were public... | |
let length = 4u16.to_ne_bytes(); | |
let window = conn.setup().roots[0].root.to_ne_bytes(); | |
let mask = u32::from(CW::EVENT_MASK).to_ne_bytes(); | |
let events = u32::from(EventMask::FOCUS_CHANGE).to_ne_bytes(); | |
conn.send_void(vec![ | |
CHANGE_WINDOW_ATTRIBUTES_REQUEST, | |
0, | |
length[0], | |
length[1], | |
window[0], | |
window[1], | |
window[2], | |
window[3], | |
mask[0], | |
mask[1], | |
mask[2], | |
mask[3], | |
events[0], | |
events[1], | |
events[2], | |
events[3], | |
]) | |
.await?; | |
println!("Input focus reply: {:?}", reply.reply().await); | |
loop { | |
println!("Some event: {:?}", conn.wait_for_event().await); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment