Skip to content

Instantly share code, notes, and snippets.

@Geal

Geal/http.rs Secret

Created Nov 3, 2020
Embed
What would you like to do?
chat application example for Clever Cloud's WebAssembly based Function As A Service
// this application will be triggered on any new HTTP request to /chat/*
// it will then upgrade the connection to a WebSocket with the tungstenite library,
// create a temporary topic on which to receive new messages from other participants,
// and then send a message to the Pulsar application to register itself
//
// it will then wait for WebSocket messages, and pass them on to the Pulsar Application,
// and wait for messages on the temporary topic to send them to the browser
extern crate serverless_api as api;
use anyhow::anyhow;
use futures::future::{self, Either};
use futures::io::AsyncReadExt;
use futures::{pin_mut, SinkExt, StreamExt};
use log::*;
use rand::distributions::Alphanumeric;
use rand::Rng;
use serde::{Deserialize, Serialize};
use tungstenite::Message;
/// A chat event, serialized as JSON, that this function sends to the browser
/// over the WebSocket and to the Pulsar application over the main topic.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ChatMessage {
// user the event is about; "-server-" is used for notices generated here
pub username: String,
// chat room name, taken from the HTTP request path
pub channel: String,
// the temporary topic name for Register, the chat text for Message,
// and None for Quit
pub payload: Option<String>,
// what kind of event this is
pub message_type: MessageType,
}
/// Discriminates the kinds of `ChatMessage` events.
#[derive(Debug, Clone, Serialize, Deserialize)]
enum MessageType {
// a participant joins; sent once the temporary topic is subscribed
Register,
// a participant leaves; sent when the WebSocket closes or errors
Quit,
// a regular chat message typed by a participant
Message,
}
/// Serves one chat participant for the lifetime of its WebSocket connection.
///
/// The function upgrades the front connection to a WebSocket, reads the
/// channel and username from the last two segments of the `http_path`
/// environment value, subscribes to a freshly named temporary topic and
/// registers itself with the Pulsar application through `MAIN_TOPIC`.
/// It then relays traffic in both directions until one side stops.
///
/// # Errors
///
/// Returns an error when a required environment value is missing or is not
/// valid UTF-8, when the WebSocket handshake or a send fails, when a message
/// cannot be serialized, or when a topic operation fails.
async fn http() -> anyhow::Result<()> {
    // create a random session id string
    let mut rng = api::HostRng;
    let session_id: String = std::iter::repeat(())
        .map(|_| rng.sample(Alphanumeric))
        .take(8)
        .collect();

    // this is a TCP-like stream that represents the client connection
    let stream = api::TcpStream::front();
    // IO can be done in blocking or non blocking mode. We need non
    // blocking to wait on the WebSocket and the topic at the same time
    stream.blocking(false);

    // WebSocket handshake, in async/await. The Rust FaaS API comes with
    // a custom futures executor
    let mut websocket = async_tungstenite::accept_async(stream).await?;

    // the frontend connects to the /chat/:channel/:username
    // HTTP request parameters are also available as environment variables
    let path = api::env::get("http_path")
        .ok_or_else(|| anyhow!("could not get HTTP request path from environment"))?;
    let mut it = std::str::from_utf8(&path)?.split('/');
    let username = it
        .next_back()
        .ok_or_else(|| anyhow!("could not get username from HTTP path"))?;
    let username = percent_encoding::percent_decode_str(username)
        .decode_utf8()?
        .to_string();
    let channel = it
        .next_back()
        .ok_or_else(|| anyhow!("could not get channel from HTTP path"))?
        .to_string();

    // send back a message to the client, to indicate that the VM has started
    let start_message = ChatMessage {
        username: "-server-".to_string(),
        channel: channel.clone(),
        payload: Some("Your FaaS VM is ready!".to_string()),
        message_type: MessageType::Message,
    };
    websocket
        .send(Message::Text(serde_json::to_string(&start_message)?))
        .await?;

    // MAIN_TOPIC is the Pulsar topic on which the Pulsar application is subscribed
    let main_topic = api::env::get("MAIN_TOPIC")
        .ok_or_else(|| anyhow!("could not get main topic from environment"))?;
    let main_topic = String::from_utf8(main_topic)?;

    // generate a temporary Pulsar topic on which we'll wait for messages
    let topic_prefix = api::env::get("TOPIC_PREFIX")
        .ok_or_else(|| anyhow!("could not get topic prefix from environment"))?;
    // a non UTF-8 prefix is a configuration error: report it, do not panic
    let topic_prefix = std::str::from_utf8(&topic_prefix)?;
    let personal_topic = format!("{}-{}_{}", topic_prefix, username, session_id);

    // register the temporary topic
    let mut subscribed = api::topic_subscribe(&personal_topic)?;
    subscribed.blocking(false);

    // join the chatroom by sending a message to the Pulsar application
    let message = ChatMessage {
        username: username.clone(),
        channel: channel.clone(),
        payload: Some(personal_topic),
        message_type: MessageType::Register,
    };
    let payload = serde_json::to_vec(&message)?;
    api::topic_send(&main_topic, &payload)?;

    // tells the Pulsar application that this participant left the channel
    let send_quit = || -> anyhow::Result<()> {
        let message = ChatMessage {
            username: username.clone(),
            channel: channel.clone(),
            payload: None,
            message_type: MessageType::Quit,
        };
        let payload = serde_json::to_vec(&message)?;
        api::topic_send(&main_topic, &payload)?;
        Ok(())
    };

    loop {
        let mut buf = [0u8; 16384];
        let left: futures::io::Read<_> = AsyncReadExt::read(&mut subscribed, &mut buf[..]);
        let right = websocket.next();
        pin_mut!(left);
        pin_mut!(right);

        // wait on the WebSocket and the temporary topic for any new event
        match future::select(left, right).await {
            // the topic returned something first
            Either::Left((topic_res, _)) => match topic_res {
                Err(e) => {
                    // NOTE(review): this exit path does not send a Quit
                    // message, so the participant stays registered
                    error!("IO error communicating with Pulsar: {}", e);
                    websocket.send(Message::Close(None)).await?;
                    break;
                }
                Ok(sz) => {
                    // the Pulsar application sent us a ChatMessage serialized
                    // as JSON; it is forwarded to the browser untouched
                    let message = std::str::from_utf8(&buf[..sz])?.to_string();
                    websocket.send(Message::Text(message)).await?;
                }
            },
            // the websocket returned something first
            Either::Right((ws_res, _)) => match ws_res {
                Some(Ok(message)) => match message {
                    Message::Text(msg) => {
                        let message = ChatMessage {
                            username: username.clone(),
                            channel: channel.clone(),
                            payload: Some(msg),
                            message_type: MessageType::Message,
                        };
                        let payload = serde_json::to_vec(&message)?;
                        api::topic_send(&main_topic, &payload)?;
                    }
                    Message::Close(_) => {
                        send_quit()?;
                        break;
                    }
                    Message::Binary(_) => {
                        // NOTE(review): leaves without sending Quit
                        error!("unexpected binary message");
                        break;
                    }
                    Message::Ping(v) => {
                        info!("got PING({:?})", v);
                    }
                    Message::Pong(v) => {
                        info!("got PONG({:?})", v);
                    }
                },
                Some(Err(tungstenite::error::Error::Io(e))) => {
                    error!("got websocket IO error: {:?}", e);
                    send_quit()?;
                    break;
                }
                Some(Err(e)) => {
                    error!("got websocket error: {:?}", e);
                    send_quit()?;
                    break;
                }
                None => {
                    // NOTE(review): leaves without sending Quit
                    error!("no more message in websocket stream");
                    break;
                }
            },
        };
    }

    info!("stopping read loop");
    Ok(())
}
// this is where the function starts executing
#[no_mangle]
pub extern "sysv64" fn chat_http() {
api::setup_logger();
match std::panic::catch_unwind(|| {
if let Err(e) = api::futures::run(http()) {
api::log(&format!("error handling the request:\n {:?}", e));
}
}) {
Ok(_) => {}
Err(e) => {
log::error!("panicked:{:?}", e);
}
}
}
// adapted from Socket.IO's chat example, replacing Socket.IO calls with raw WebSocket calls
$(function() {
// Duration of the fade-in applied to newly added messages
var FADE_TIME = 150; // ms
// Idle time after which the local user is no longer considered typing
var TYPING_TIMER_LENGTH = 400; // ms
// Palette from which getUsernameColor picks a color per username
var COLORS = [
'#e21400', '#91580f', '#f8a700', '#f78b00',
'#58dc00', '#287b00', '#a8f07a', '#4ae8c4',
'#3b88eb', '#3824aa', '#a700ff', '#d300e7'
];
// Initialize variables
var $window = $(window);
var $usernameInput = $('.usernameInput'); // Input for username
var $messages = $('.messages'); // Messages area
var $inputMessage = $('.inputMessage'); // Input message input box
var $loginPage = $('.login.page'); // The login page
var $chatPage = $('.chat.page'); // The chatroom page
// Prompt for setting a username
var username;
var connected = false;
var typing = false;
// Timestamp (ms) of the most recent input event, set by updateTyping
var lastTypingTime;
// The input that receives focus on key presses; starts as the username box
var $currentInput = $usernameInput.focus();
// The WebSocket connection; created by start() once a username is chosen
var socket;
//var socket = new WebSocket("ws://fcb827f6-0119-403c-be89-904cef646bbd.par0-faas-n0.clvrcld.net/chat/room1")
//var socket = io("ws://fcb827f6-0119-403c-be89-904cef646bbd.par0-faas-n0.clvrcld.net/chat/room1")
// NOTE(review): the flag is raised here, before any socket exists, so it
// does not reflect the actual connection state
connected = true;
// Logs a line stating how many participants are present (reads data.numUsers)
const addParticipantsMessage = (data) => {
  const summary = data.numUsers === 1
    ? "there's 1 participant"
    : "there are " + data.numUsers + " participants";
  log(summary);
}
// Reads the username from the login box and, when it is non-empty,
// switches from the login page to the chat page and opens the connection
const setUsername = () => {
  username = $usernameInput.val().trim();
  // Nothing typed yet: stay on the login page
  if (!username) {
    return;
  }
  $loginPage.fadeOut();
  $chatPage.show();
  $loginPage.off('click');
  $currentInput = $inputMessage.focus();
  // The username is sent to the server as part of the WebSocket URL
  start();
}
// Opens the WebSocket for the chosen username and renders every JSON
// message received from the server
const start = () => {
  // The server splits the path on '/' and percent-decodes the last segment,
  // so the username must be encoded to survive characters such as '/', '?' or '#'
  socket = new WebSocket(
    "ws://fcb827f6-0119-403c-be89-904cef646bbd.par0-faas-n0.clvrcld.net/chat/room1/" +
      encodeURIComponent(username)
  );
  socket.onmessage = function(event) {
    console.log(event);
    let parsed;
    try {
      parsed = JSON.parse(event.data);
    } catch (err) {
      // A malformed frame must not break the handler for later messages
      console.error('ignoring malformed chat message', err);
      return;
    }
    addChatMessage(parsed);
  };
  // Stop accepting outgoing messages once the connection is gone
  socket.onclose = function() {
    connected = false;
  };
}
// Sends the content of the message box over the WebSocket.
// The box is only cleared when the message can actually be sent:
// WebSocket.send throws while the socket is still connecting, which
// would otherwise lose the text that was just typed.
const sendMessage = () => {
  var message = $inputMessage.val();
  var socketOpen = !!socket && socket.readyState === WebSocket.OPEN;
  // if there is a non-empty message and an open socket connection
  if (message && connected && socketOpen) {
    $inputMessage.val('');
    // the server wraps the raw text into a chat message for the channel
    socket.send(message);
  }
}
// Appends an informational line (CSS class "log") to the message list
const log = (message, options) => {
  const $entry = $('<li>');
  $entry.addClass('log');
  $entry.text(message);
  addMessageElement($entry, options);
}
// Renders one chat event (message, join or leave) in the message list
const addChatMessage = (data, options) => {
  options = options || {};

  // A pending 'X was typing' entry is replaced without a fade-in
  const $pendingTyping = getTypingMessages(data);
  if ($pendingTyping.length !== 0) {
    options.fade = false;
    $pendingTyping.remove();
  }

  const $author = $('<span class="username"/>');
  $author.text(data.username);
  $author.css('color', getUsernameColor(data.username));

  // The body depends on the event type; it stays undefined for unknown types
  let $body;
  if (data.message_type == "Message") {
    $body = $('<span class="messageBody">');
    $body.text(data.payload);
  } else if (data.message_type == "Register") {
    $body = $('<span class="messageBody"><em>joined</em></span>');
  } else if (data.message_type == "Quit") {
    $body = $('<span class="messageBody"><em>left</em></span>');
  }

  const $row = $('<li class="message"/>');
  $row.data('username', data.username);
  $row.addClass(data.typing ? 'typing' : '');
  $row.append($author, $body);

  addMessageElement($row, options);
}
// Adds the visual chat typing message.
// addChatMessage only renders a body for a known message_type and reads
// the text from data.payload, so both are set here; data.message is kept
// for any code still reading the old field.
const addChatTyping = (data) => {
  data.typing = true;
  data.message = 'is typing';
  data.message_type = 'Message';
  data.payload = 'is typing';
  addChatMessage(data);
}
// Fades out, then removes, the typing entries belonging to data.username
const removeChatTyping = (data) => {
  const $typingEntries = getTypingMessages(data);
  $typingEntries.fadeOut(function () {
    const $faded = $(this);
    $faded.remove();
  });
}
// Inserts an element into the message list and scrolls to the bottom
// el - The element to add as a message
// options.fade - Whether the element fades in (default = true)
// options.prepend - Whether the element goes before all other
//   messages instead of after them (default = false)
const addMessageElement = (el, options) => {
  const $item = $(el);

  // Fill in defaults; the options object passed by the caller is updated in place
  options = options || {};
  if (options.fade === undefined) {
    options.fade = true;
  }
  if (options.prepend === undefined) {
    options.prepend = false;
  }

  if (options.fade) {
    $item.hide().fadeIn(FADE_TIME);
  }

  const insertWith = options.prepend ? 'prepend' : 'append';
  $messages[insertWith]($item);

  const list = $messages[0];
  list.scrollTop = list.scrollHeight;
}
// Returns the HTML-escaped form of the input by round-tripping it
// through the text content of a detached <div>
const cleanInput = (input) => {
  const $scratch = $('<div/>');
  $scratch.text(input);
  return $scratch.html();
}
// Tracks whether the local user is typing: the flag is raised on input and
// lowered once no input happened for TYPING_TIMER_LENGTH milliseconds
const updateTyping = () => {
  if (!connected) {
    return;
  }
  if (!typing) {
    typing = true;
  }
  lastTypingTime = Date.now();

  setTimeout(() => {
    const idleFor = Date.now() - lastTypingTime;
    if (idleFor >= TYPING_TIMER_LENGTH && typing) {
      typing = false;
    }
  }, TYPING_TIMER_LENGTH);
}
// Returns the typing entries in the DOM whose stored username matches data.username
const getTypingMessages = (data) => {
  const $allTyping = $('.typing.message');
  return $allTyping.filter((index, element) => {
    return $(element).data('username') === data.username;
  });
}
// Maps a username to one of COLORS using a simple string hash,
// so the same name always gets the same color
const getUsernameColor = (username) => {
  let hash = 7;
  let position = 0;
  while (position < username.length) {
    hash = username.charCodeAt(position) + (hash << 5) - hash;
    position += 1;
  }
  // The hash may be negative, hence the absolute value of the remainder
  const slot = Math.abs(hash % COLORS.length);
  return COLORS[slot];
}
// Keyboard events
$window.keydown(event => {
  // Typing without a modifier key moves focus to the current input
  const modifierHeld = event.ctrlKey || event.metaKey || event.altKey;
  if (!modifierHeld) {
    $currentInput.focus();
  }

  // Only ENTER (key code 13) triggers an action
  if (event.which !== 13) {
    return;
  }
  if (username) {
    // Already logged in: ENTER sends the message and ends the typing state
    sendMessage();
    typing = false;
  } else {
    // Still on the login page: ENTER submits the username
    setUsername();
  }
});
// Every edit of the message box refreshes the typing state
$inputMessage.on('input', function () {
  updateTyping();
});

// Click events

// Clicking anywhere on the login page focuses the current input
$loginPage.on('click', function () {
  $currentInput.focus();
});

// Clicking on the message input's border focuses the message input
$inputMessage.on('click', function () {
  $inputMessage.focus();
});
});
// this application will be started for every message coming on
// an Apache Pulsar topic.
// It receives messages from the HTTP/WebSocket applications,
// stores their information in the key value store, then redispatches
// messages to all the other participants
extern crate serverless_api as api;
use anyhow::anyhow;
use log::*;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// A chat event, deserialized from the JSON trigger message and re-serialized
/// when forwarded to the participants' topics.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ChatMessage {
// user the event is about; "-server-" is used for notices generated here
pub username: String,
// chat room name; used to build the key value store key
pub channel: String,
// for Register, the temporary topic of the joining user (taken out before
// the message is forwarded); otherwise forwarded as received
pub payload: Option<String>,
// what kind of event this is
pub message_type: MessageType,
}
/// Discriminates the kinds of `ChatMessage` events handled by `pulsar()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
enum MessageType {
// a new user is joining the channel
Register,
// a user is leaving the channel
Quit,
// a user sent a chat message
Message,
}
/// The participants of one channel, stored as JSON in the key value store
/// under the key `chat#<channel>`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Users {
// username, topic
// (the topic is the temporary topic the user registered with)
users: HashMap<String, String>,
}
/// Handles one message received on the main Pulsar topic.
///
/// The channel's participant list is loaded from the key value store (or
/// created empty), updated according to the message type, saved back when
/// it changed, and the event is dispatched to the participants' topics.
///
/// # Errors
///
/// Returns an error when the trigger message cannot be read or parsed, when
/// a Register message carries no temporary topic, when the stored
/// participant list cannot be parsed, or when a key value store or topic
/// operation fails.
fn pulsar() -> anyhow::Result<()> {
    // read the incoming message
    let payload =
        api::read_pulsar().ok_or_else(|| anyhow!("could not read trigger message"))?;
    let mut message: ChatMessage = serde_json::from_slice(&payload)?;
    info!("got message: {:?}", message);

    // get the channel's information from the key value store
    // or create a new Users struct if we are starting a new channel
    let key = format!("chat#{}", message.channel);
    let mut users = match api::kv::get(&key) {
        None => Users {
            users: HashMap::new(),
        },
        Some(value) => {
            info!("kv::get({}) -> {:?}", key, std::str::from_utf8(&value));
            if value.is_empty() {
                Users {
                    users: HashMap::new(),
                }
            } else {
                serde_json::from_slice(&value)?
            }
        }
    };

    match message.message_type {
        // a new user is joining
        MessageType::Register => {
            // this is the temporary topic to communicate with the
            // new user's HTTP/WebSocket application instance; the message
            // comes from the network, so a missing topic is an error, not a panic
            let topic = message
                .payload
                .take()
                .ok_or_else(|| anyhow!("register message is missing the temporary topic"))?;

            // send back the list of currently connected users
            for name in users.users.keys() {
                let sent_message = ChatMessage {
                    username: name.clone(),
                    channel: message.channel.clone(),
                    payload: None,
                    message_type: MessageType::Register,
                };
                api::topic_send(&topic, &serde_json::to_vec(&sent_message)?)?;
            }

            // this is the same register message, but without the temporary topic
            let duplicated_message = serde_json::to_vec(&message)?;
            // notify the other users
            for user_topic in users.users.values() {
                api::topic_send(&user_topic, &duplicated_message)?;
            }

            // add the new user to the list and save it
            users.users.insert(message.username.clone(), topic.clone());
            api::kv::set(&key, &serde_json::to_vec(&users)?)?;

            // notify the user that they are properly connected
            let sent_message = ChatMessage {
                username: "-server-".to_string(),
                channel: message.channel.clone(),
                payload: Some("Connected!".to_string()),
                message_type: MessageType::Message,
            };
            api::topic_send(&topic, &serde_json::to_vec(&sent_message)?)?;
        }
        // a user is leaving the channel
        MessageType::Quit => {
            // remove them from the list
            users.users.remove(&message.username);
            api::kv::set(&key, &serde_json::to_vec(&users)?)?;
            // notify the other users, forwarding the message as received
            for user_topic in users.users.values() {
                api::topic_send(&user_topic, &payload)?;
            }
        }
        // a user sent a message
        MessageType::Message => {
            // notify every registered user, forwarding the message as received
            for user_topic in users.users.values() {
                api::topic_send(&user_topic, &payload)?;
            }
        }
    }
    Ok(())
}
// this is where the function starts eecting
#[no_mangle]
pub extern "sysv64" fn chat_pulsar() {
let res = api::setup_logger();
match std::panic::catch_unwind(|| {
if let Err(e) = pulsar() {
api::log(&format!("error handling the message:\n {:?}", e));
}
}) {
Ok(_) => {}
Err(e) => {
api::log("got an error");
log::error!("panicked:{:?}", e);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment