Skip to content

Instantly share code, notes, and snippets.

@psychon
Last active January 28, 2022 17:37
Show Gist options
  • Save psychon/0b8b59b30d3253254b37e6267dbee471 to your computer and use it in GitHub Desktop.
Save psychon/0b8b59b30d3253254b37e6267dbee471 to your computer and use it in GitHub Desktop.
Async Rust X11 connection PoC
use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::Arc;
use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
use tokio::sync::mpsc::{
channel, unbounded_channel, OwnedPermit, Receiver, Sender, UnboundedReceiver,
};
use tokio::sync::Mutex;
use x11rb::connection::SequenceNumber;
use x11rb::errors::{ConnectionError, ParseError};
use x11rb::extension_manager::ExtensionManager;
use x11rb::protocol::xproto::{
EventMask, GetInputFocusReply, Setup, SetupRequest, CHANGE_WINDOW_ATTRIBUTES_REQUEST, CW,
GET_INPUT_FOCUS_REQUEST, KEYMAP_NOTIFY_EVENT,
};
use x11rb::x11_utils::{Serialize as _, TryParse};
pub struct Cookie<Reply: TryParse>(Receiver<Vec<u8>>, PhantomData<Reply>);
impl<Reply: TryParse> Cookie<Reply> {
fn new(recv: Receiver<Vec<u8>>) -> Self {
Self(recv, PhantomData)
}
pub async fn raw_reply(mut self) -> Vec<u8> {
self.0.recv().await.expect("The reading task exited?!")
}
pub async fn reply(self) -> Result<Reply, ParseError> {
let raw = self.raw_reply().await;
Ok(Reply::try_parse(&raw[..])?.0)
}
}
#[derive(Default)]
struct ConnectionState {
replies: HashMap<SequenceNumber, OwnedPermit<Vec<u8>>>,
next_sequence_send: SequenceNumber,
last_sequence_read: SequenceNumber,
next_reply_expected: SequenceNumber,
extension_manager: ExtensionManager,
}
impl ConnectionState {
fn new() -> Self {
Self {
replies: Default::default(),
next_sequence_send: 1,
last_sequence_read: 0,
next_reply_expected: 0,
extension_manager: Default::default(),
}
}
fn need_sync(&self) -> bool {
self.next_reply_expected + SequenceNumber::from(u16::max_value()) <= self.next_sequence_send
}
fn expect_reply(&mut self, sender: OwnedPermit<Vec<u8>>) {
let sequence = self.next_sequence_send;
self.next_sequence_send += 1;
self.next_reply_expected = sequence;
self.replies.insert(sequence, sender);
}
fn extract_sequence_number(&mut self, packet: &[u8]) -> Option<SequenceNumber> {
// TODO: This code assumes that something inserts sync regularly. So far I did not
// implement that.
if packet[0] == KEYMAP_NOTIFY_EVENT {
return None;
}
let number = u16::from_ne_bytes([packet[2], packet[3]]);
let high_bytes = self.last_sequence_read & !SequenceNumber::from(u16::max_value());
let mut full_number = SequenceNumber::from(number) | high_bytes;
if full_number < self.last_sequence_read {
full_number += SequenceNumber::from(u16::max_value()) + 1;
}
self.last_sequence_read = full_number;
Some(full_number)
}
fn received_packet(&mut self, packet: Vec<u8>) -> Option<Vec<u8>> {
if let Some(seqno) = self.extract_sequence_number(&packet) {
if let Some(sender) = self.replies.remove(&seqno) {
sender.send(packet);
return None;
}
}
// Return the packet to the caller
Some(packet)
}
}
pub struct Connection {
write_channel: Sender<Vec<u8>>,
read_channel: Mutex<UnboundedReceiver<Vec<u8>>>,
state: Arc<Mutex<ConnectionState>>,
setup: Setup,
}
impl Connection {
pub async fn connect(display_number: u16) -> Result<Self, ConnectionError> {
// TODO: This is not using any buffering for the reading/writing
let stream = tokio::net::TcpStream::connect(("127.0.0.1", 6000 + display_number)).await?;
let (mut read, mut write) = stream.into_split();
let setup = SetupRequest {
byte_order: 0x6c,
protocol_major_version: 11,
protocol_minor_version: 0,
authorization_protocol_name: vec![],
authorization_protocol_data: vec![],
}
.serialize();
write.write(&setup[..]).await?;
let mut setup = vec![0; 8];
read.read_exact(&mut setup).await?;
let extra_length = usize::from(u16::from_ne_bytes([setup[6], setup[7]])) * 4;
setup.reserve_exact(extra_length);
setup.resize(8 + extra_length, 0);
read.read_exact(&mut setup[8..]).await?;
// TODO: Handle failures
assert_eq!(1, setup[0]);
let setup = Setup::try_parse(&setup[..])?.0;
let state = Arc::new(Mutex::new(ConnectionState::new()));
// Spawn a task for sending bytes
let (write_channel, mut write_channel_read) = channel::<Vec<u8>>(1);
tokio::spawn(async move {
while let Some(value) = write_channel_read.recv().await {
write.write(&value[..]).await.unwrap();
}
});
// Spawn a task for receiving messages
let (read_channel_write, read_channel) = unbounded_channel();
let read_channel = Mutex::new(read_channel);
let state_clone = Arc::clone(&state);
tokio::spawn(async move {
use x11rb::protocol::xproto::GE_GENERIC_EVENT;
loop {
// Every X11 packet is at least 32 bytes
let mut packet = vec![0; 32];
read.read_exact(&mut packet[..]).await.unwrap();
let response_type = packet[0];
const REPLY: u8 = 1;
let extra_length =
if response_type == REPLY || response_type & 0x7f == GE_GENERIC_EVENT {
let length_field = packet[4..8].try_into().unwrap();
let length_field = u32::from_ne_bytes(length_field) as usize;
4 * length_field
} else {
0
};
packet.resize(packet.len() + extra_length, 0);
read.read_exact(&mut packet[32..]).await.unwrap();
let mut state = state_clone.lock().await;
if let Some(packet) = state.received_packet(packet) {
read_channel_write.send(packet).unwrap();
}
}
});
Ok(Self {
write_channel,
read_channel,
state,
setup,
})
}
pub fn setup(&self) -> &Setup {
&self.setup
}
pub async fn send_void(&self, request: Vec<u8>) -> Result<(), ConnectionError> {
'retry: loop {
let permit = self
.write_channel
.reserve()
.await
.expect("I am not quite sure when this can fail - the write task stopped?");
let state = self.state.lock().await;
if state.need_sync() {
// Unlock everything and send a sync, then try again
drop(permit);
drop(state);
let length = 1u16.to_ne_bytes();
let cookie = self
.send_and_get_reply::<GetInputFocusReply>(vec![
GET_INPUT_FOCUS_REQUEST,
0,
length[0],
length[1],
])
.await;
drop(cookie);
continue 'retry;
}
state.next_sequence_send += 1;
permit.send(request);
return Ok(());
}
}
pub async fn send_and_get_reply<Reply: TryParse>(
&self,
request: Vec<u8>,
) -> Result<Cookie<Reply>, ConnectionError> {
let (sender, receiver) = channel(1);
let sender = sender.reserve_owned().await.unwrap();
{
let permit = self
.write_channel
.reserve()
.await
.expect("I am not quite sure when this can fail - the write task stopped?");
let mut state = self.state.lock().await;
state.expect_reply(sender);
// Keep the mutex until we have actually written our bytes.
// This uses a permit so that nothing can go wrong between expect_reply() and actually
// sending the bytes.
permit.send(request);
}
Ok(Cookie::new(receiver))
}
pub async fn wait_for_event(&self) -> Result<x11rb::protocol::Event, ConnectionError> {
let event = self
.read_channel
.lock()
.await
.recv()
.await
.expect("The write end of the channel was dropped?!");
let state = self.state.lock().await;
Ok(x11rb::protocol::Event::parse(
&event,
&state.extension_manager,
)?)
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let conn = Connection::connect(4).await?;
// If only GetInputFocusRequest::serialize was public...
//conn.send(GetInputFocusRequest {}.serialize()).await;
let length = 1u16.to_ne_bytes();
let reply = conn
.send_and_get_reply::<GetInputFocusReply>(vec![
GET_INPUT_FOCUS_REQUEST,
0,
length[0],
length[1],
])
.await?;
// If only ChangeWindowAttributesRequest::serialize and ChangeWindowAttributesAux::serialize were public...
let length = 4u16.to_ne_bytes();
let window = conn.setup().roots[0].root.to_ne_bytes();
let mask = u32::from(CW::EVENT_MASK).to_ne_bytes();
let events = u32::from(EventMask::FOCUS_CHANGE).to_ne_bytes();
conn.send_void(vec![
CHANGE_WINDOW_ATTRIBUTES_REQUEST,
0,
length[0],
length[1],
window[0],
window[1],
window[2],
window[3],
mask[0],
mask[1],
mask[2],
mask[3],
events[0],
events[1],
events[2],
events[3],
])
.await?;
println!("Input focus reply: {:?}", reply.reply().await);
loop {
println!("Some event: {:?}", conn.wait_for_event().await);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment