// chat application example for Clever Cloud's WebAssembly-based Function as a Service
// this application will be triggered on any new HTTP request to /chat/* | |
// it will then upgrade the connection to a WebSocket with the tungstenite library, | |
// create a temporary topic on which to receive new messages from other participants, | |
// and then send a message to the Pulsar application to register itself | |
// | |
// it will then wait for WebSocket messages, and pass them on to the Pulsar Application, | |
// and wait for messages on the temporary topic to send them to the browser | |
extern crate serverless_api as api; | |
use anyhow::anyhow; | |
use futures::future::{self, Either}; | |
use futures::io::AsyncReadExt; | |
use futures::{pin_mut, SinkExt, StreamExt}; | |
use log::*; | |
use rand::distributions::Alphanumeric; | |
use rand::Rng; | |
use serde::{Deserialize, Serialize}; | |
use tungstenite::Message; | |
#[derive(Debug, Clone, Serialize, Deserialize)] | |
struct ChatMessage { | |
pub username: String, | |
pub channel: String, | |
pub payload: Option<String>, | |
pub message_type: MessageType, | |
} | |
#[derive(Debug, Clone, Serialize, Deserialize)] | |
enum MessageType { | |
Register, | |
Quit, | |
Message, | |
} | |
async fn http() -> anyhow::Result<()> { | |
// create a random session id string | |
let mut rng = api::HostRng; | |
let session_id: String = std::iter::repeat(()) | |
.map(|_| rng.sample(Alphanumeric)) | |
.take(8) | |
.collect(); | |
// this a TCP like stream that represents the client connection | |
let stream = api::TcpStream::front(); | |
// IO can be done in blocking or non blocking mode. We need non | |
// blocking to wait on the WebSocket and the topic at the same time | |
stream.blocking(false); | |
// WebSocket handshake, in async/await. The Rust FaaS API comes with | |
// a custom futures executor | |
let mut websocket = async_tungstenite::accept_async(stream).await?; | |
// the frontend connects to the /chat/:channel/:username | |
// HTTP request parameters are also available as environment variables | |
let path = api::env::get("http_path") | |
.ok_or(anyhow!("could not get HTTP request path from environment"))?; | |
let mut it = std::str::from_utf8(&path)?.split('/'); | |
let username = it | |
.next_back() | |
.ok_or(anyhow!("could not get username from HTTP path"))?; | |
let username = percent_encoding::percent_decode_str(username) | |
.decode_utf8()? | |
.to_string(); | |
let channel = it | |
.next_back() | |
.ok_or(anyhow!("could not get channel from HTTP path"))? | |
.to_string(); | |
// send back a message to the client, to indicate tht the VM has started | |
let start_message = ChatMessage { | |
username: "-server-".to_string(), | |
channel: channel.clone(), | |
payload: Some("Your FaaS VM is ready!".to_string()), | |
message_type: MessageType::Message, | |
}; | |
websocket | |
.send(Message::Text(serde_json::to_string(&start_message)?)) | |
.await?; | |
// MAIN_TOPIC is the Pulsar topic on which the Pulsar application is subscribed | |
let main_topic = | |
api::env::get("MAIN_TOPIC").ok_or(anyhow!("could not get main topic from environment"))?; | |
let main_topic = String::from_utf8(main_topic)?; | |
// generate a temporary Pulsar topic on which we'll wait for messages | |
let topic_prefix = api::env::get("TOPIC_PREFIX") | |
.ok_or(anyhow!("could not get topic prefix from environment"))?; | |
let personal_topic = format!( | |
"{}-{}_{}", | |
std::str::from_utf8(&topic_prefix).unwrap(), | |
username, | |
session_id | |
); | |
// register the temporary topic | |
let mut subscribed = api::topic_subscribe(&personal_topic)?; | |
subscribed.blocking(false); | |
// join the chatroom by sending a message to the Pulsar application | |
let message = ChatMessage { | |
username: username.to_string(), | |
channel: channel.to_string(), | |
payload: Some(personal_topic), | |
message_type: MessageType::Register, | |
}; | |
let payload = serde_json::to_vec(&message)?; | |
api::topic_send(&main_topic, &payload)?; | |
loop { | |
let mut buf = [0u8; 16384]; | |
let left: futures::io::Read<_> = AsyncReadExt::read(&mut subscribed, &mut buf[..]); | |
let right = websocket.next(); | |
pin_mut!(left); | |
pin_mut!(right); | |
// wait on the WebSocket and the temporary topic for any new event | |
match future::select(left, right).await { | |
// the topic returned something first | |
Either::Left((topic_res, _)) => match topic_res { | |
Err(e) => { | |
error!("IO error communicating with Pulsar: {}", e); | |
websocket.send(Message::Close(None)).await?; | |
break; | |
} | |
Ok(sz) => { | |
// the Pulsar application sent us a ChatMessage serialized as JSON | |
//let topic_message: ChatMessage = serde_json::from_slice(&buf[..sz])?; | |
let message = std::str::from_utf8(&buf[..sz])?.to_string(); | |
let res = websocket.send(Message::Text(message)).await?; | |
} | |
}, | |
// the websocket returned something first | |
Either::Right((ws_res, _)) => { | |
match ws_res { | |
Some(Ok(message)) => match message { | |
Message::Text(msg) => { | |
let message = ChatMessage { | |
username: username.to_string(), | |
channel: channel.to_string(), | |
payload: Some(msg), | |
message_type: MessageType::Message, | |
}; | |
let payload = serde_json::to_vec(&message)?; | |
api::topic_send(&main_topic, &payload)?; | |
} | |
Message::Close(_) => { | |
let message = ChatMessage { | |
username: username.to_string(), | |
channel: channel.to_string(), | |
payload: None, | |
message_type: MessageType::Quit, | |
}; | |
let payload = serde_json::to_vec(&message).unwrap(); | |
api::topic_send(&main_topic, &payload)?; | |
break; | |
} | |
Message::Binary(_) => { | |
error!("unexpected binary message"); | |
break; | |
} | |
Message::Ping(v) => { | |
info!("got PING({:?})", v); | |
} | |
Message::Pong(v) => { | |
info!("got PONG({:?})", v); | |
} | |
}, | |
Some(Err(tungstenite::error::Error::Io(e))) => { | |
error!("got websocket IO error: {:?}", e); | |
let message = ChatMessage { | |
username: username.to_string(), | |
channel: channel.to_string(), | |
payload: None, | |
message_type: MessageType::Quit, | |
}; | |
let payload = serde_json::to_vec(&message)?; | |
api::topic_send(&main_topic, &payload)?; | |
break; | |
} | |
Some(Err(e)) => { | |
error!("got websocket error: {:?}", e); | |
let message = ChatMessage { | |
username: username.to_string(), | |
channel: channel.to_string(), | |
payload: None, | |
message_type: MessageType::Quit, | |
}; | |
let payload = serde_json::to_vec(&message)?; | |
api::topic_send(&main_topic, &payload)?; | |
break; | |
} | |
None => { | |
error!("no more message in websocket stream"); | |
break; | |
} | |
} | |
} | |
}; | |
} | |
info!("stopping read loop"); | |
Ok(()) | |
} | |
// this is where the function starts executing | |
#[no_mangle] | |
pub extern "sysv64" fn chat_http() { | |
api::setup_logger(); | |
match std::panic::catch_unwind(|| { | |
if let Err(e) = api::futures::run(http()) { | |
api::log(&format!("error handling the request:\n {:?}", e)); | |
} | |
}) { | |
Ok(_) => {} | |
Err(e) => { | |
log::error!("panicked:{:?}", e); | |
} | |
} | |
} |
// adapted from Socket.io's chat example, replacing socket.io calls with raw WebSocket
$(function() { | |
// Timings, in milliseconds
const FADE_TIME = 150;
const TYPING_TIMER_LENGTH = 400;
// Palette from which each username gets its colour
const COLORS = [
  '#e21400', '#91580f', '#f8a700', '#f78b00',
  '#58dc00', '#287b00', '#a8f07a', '#4ae8c4',
  '#3b88eb', '#3824aa', '#a700ff', '#d300e7'
];

// Cached page elements
const $window = $(window);
const $usernameInput = $('.usernameInput'); // username field
const $messages = $('.messages');           // message list
const $inputMessage = $('.inputMessage');   // message field
const $loginPage = $('.login.page');        // login page
const $chatPage = $('.chat.page');          // chatroom page

// Session state
let username;
let connected = false;
let typing = false;
let lastTypingTime;
// the input that receives keyboard focus; starts on the username field
let $currentInput = $usernameInput.focus();
let socket;

// NOTE(review): the flag is raised here, before any socket exists - confirm intended
connected = true;
// Logs how many participants are in the room, based on data.numUsers
const addParticipantsMessage = (data) => {
  const summary = data.numUsers === 1
    ? "there's 1 participant"
    : "there are " + data.numUsers + " participants";
  log(summary);
};
// Reads the username from the login form and, when it is not empty,
// switches to the chat page and opens the connection
const setUsername = () => {
  username = $usernameInput.val().trim();
  if (!username) {
    return;
  }
  $loginPage.fadeOut();
  $chatPage.show();
  $loginPage.off('click');
  $currentInput = $inputMessage.focus();
  // the username is passed to the server when the socket is opened
  start();
};
// Opens the WebSocket for the current username and renders every message
// received on it.
const start = () => {
  // The username becomes the last path segment of the URL, so it has to be
  // percent-encoded: a raw '/', '?', '#' or space would change the path.
  const url = "ws://fcb827f6-0119-403c-be89-904cef646bbd.par0-faas-n0.clvrcld.net/chat/room1/"
    + encodeURIComponent(username);
  socket = new WebSocket(url);
  socket.onmessage = function (event) {
    console.log(event);
    let parsed;
    try {
      parsed = JSON.parse(event.data);
    } catch (err) {
      // a frame that is not JSON cannot be displayed; skip it
      console.error('ignoring malformed chat message', err);
      return;
    }
    addChatMessage(parsed);
  };
};
// Sends the content of the message field over the WebSocket.
// Nothing happens, and the field keeps its text, when the message is empty
// or the socket is not open yet: send() throws on a connecting socket, and
// the field used to be cleared before that call.
const sendMessage = () => {
  const message = $inputMessage.val();
  const socketOpen = Boolean(socket) && socket.readyState === WebSocket.OPEN;
  if (message && connected && socketOpen) {
    $inputMessage.val('');
    // the server wraps the raw text into a chat message itself
    socket.send(message);
  }
};
// Appends a plain log line to the message list
const log = (message, options) => {
  const $entry = $('<li>');
  $entry.addClass('log');
  $entry.text(message);
  addMessageElement($entry, options);
};
// Renders one chat event (message, join or leave) in the message list
const addChatMessage = (data, options) => {
  // A pending 'X is typing' entry from the same user is replaced
  // without a fade-in
  const $pendingTyping = getTypingMessages(data);
  options = options || {};
  if ($pendingTyping.length !== 0) {
    options.fade = false;
    $pendingTyping.remove();
  }

  const $author = $('<span class="username"/>');
  $author.text(data.username);
  $author.css('color', getUsernameColor(data.username));

  // the body stays undefined for any other message type
  let $body;
  if (data.message_type == "Message") {
    // .text() keeps the payload from being interpreted as markup
    $body = $('<span class="messageBody">');
    $body.text(data.payload);
  } else if (data.message_type == "Register") {
    $body = $('<span class="messageBody"><em>joined</em></span>');
  } else if (data.message_type == "Quit") {
    $body = $('<span class="messageBody"><em>left</em></span>');
  }

  let typingClass = '';
  if (data.typing) {
    typingClass = 'typing';
  }

  const $row = $('<li class="message"/>');
  $row.data('username', data.username);
  $row.addClass(typingClass);
  $row.append($author, $body);
  addMessageElement($row, options);
};
// Marks the event as a typing notice and renders it
const addChatTyping = (data) => {
  Object.assign(data, { typing: true, message: 'is typing' });
  addChatMessage(data);
};
// Fades out, then removes, the typing notices of the event's user
const removeChatTyping = (data) => {
  const $notices = getTypingMessages(data);
  $notices.fadeOut(function () {
    // `this` is the element whose fade-out just finished
    $(this).remove();
  });
};
// Inserts an element in the message list and scrolls to the bottom
// el - the element to insert
// options.fade - fade the element in (default = true)
// options.prepend - insert before all other messages (default = false)
const addMessageElement = (el, options) => {
  const $item = $(el);

  // fill in the defaults; a caller-supplied options object is updated in place
  options = options || {};
  if (options.fade === undefined) {
    options.fade = true;
  }
  if (options.prepend === undefined) {
    options.prepend = false;
  }

  if (options.fade) {
    $item.hide().fadeIn(FADE_TIME);
  }
  if (!options.prepend) {
    $messages.append($item);
  } else {
    $messages.prepend($item);
  }

  const list = $messages[0];
  list.scrollTop = list.scrollHeight;
};
// Escapes markup in the input by writing it as text into a scratch
// element and reading it back as HTML
const cleanInput = (input) => {
  const $scratch = $('<div/>');
  $scratch.text(input);
  return $scratch.html();
};
// Tracks whether the user is typing: the flag is raised on input and
// lowered once no input happened for TYPING_TIMER_LENGTH milliseconds
const updateTyping = () => {
  if (!connected) {
    return;
  }
  if (!typing) {
    typing = true;
  }
  lastTypingTime = Date.now();

  setTimeout(function () {
    const idleFor = Date.now() - lastTypingTime;
    if (idleFor >= TYPING_TIMER_LENGTH && typing) {
      typing = false;
    }
  }, TYPING_TIMER_LENGTH);
};
// Returns the typing notices that belong to the event's user
const getTypingMessages = (data) => {
  const fromSameUser = function () {
    return $(this).data('username') === data.username;
  };
  return $('.typing.message').filter(fromSameUser);
};
// Picks a colour from COLORS for a username by hashing its characters,
// so that a given name always gets the same colour
const getUsernameColor = (username) => {
  let hash = 7;
  let position = 0;
  while (position < username.length) {
    hash = username.charCodeAt(position) + (hash << 5) - hash;
    position += 1;
  }
  const slot = Math.abs(hash % COLORS.length);
  return COLORS[slot];
};
// Keyboard events
$window.on('keydown', (event) => {
  // typing a key without a modifier puts the focus on the current input
  const modifierHeld = event.ctrlKey || event.metaKey || event.altKey;
  if (!modifierHeld) {
    $currentInput.focus();
  }

  // 13 is the ENTER key: it sends the message, or logs in when no
  // username has been chosen yet
  if (event.which !== 13) {
    return;
  }
  if (!username) {
    setUsername();
    return;
  }
  sendMessage();
  typing = false;
});

$inputMessage.on('input', function () {
  updateTyping();
});

// Click events

// clicking anywhere on the login page focuses the current input
$loginPage.on('click', function () {
  $currentInput.focus();
});

// clicking on the border of the message field focuses it
$inputMessage.on('click', function () {
  $inputMessage.focus();
});
}); |
// this application will be started for every message coming on | |
// an Apache Pulsar topic. | |
// It receives messages from the HTTP/WebSocket applications, | |
// stores their information in the key value store, then redispatches | |
// messages to all the other participants | |
extern crate serverless_api as api; | |
use anyhow::anyhow; | |
use log::*; | |
use serde::{Deserialize, Serialize}; | |
use std::collections::HashMap; | |
#[derive(Debug, Clone, Serialize, Deserialize)] | |
struct ChatMessage { | |
pub username: String, | |
pub channel: String, | |
pub payload: Option<String>, | |
pub message_type: MessageType, | |
} | |
#[derive(Debug, Clone, Serialize, Deserialize)] | |
enum MessageType { | |
Register, | |
Quit, | |
Message, | |
} | |
#[derive(Debug, Clone, Serialize, Deserialize)] | |
struct Users { | |
// username, topic | |
users: HashMap<String, String>, | |
} | |
fn pulsar() -> anyhow::Result<()> { | |
// read the incoming message | |
let payload = api::read_pulsar().ok_or(anyhow!("could not read trigger message"))?; | |
let mut message: ChatMessage = serde_json::from_slice(&payload)?; | |
info!("got message: {:?}", message); | |
// get the channel's information from the key value store | |
// or create a new Users struct if we are starting a new channel | |
let key = format!("chat#{}", message.channel); | |
let mut users = match api::kv::get(&key) { | |
None => Users { | |
users: HashMap::new(), | |
}, | |
Some(value) => { | |
info!("kv::get({}) -> {:?}", key, std::str::from_utf8(&value)); | |
if value.is_empty() { | |
Users { | |
users: HashMap::new(), | |
} | |
} else { | |
serde_json::from_slice(&value)? | |
} | |
}, | |
}; | |
match message.message_type { | |
// a new user is joining | |
MessageType::Register => { | |
// this is the temporary topic to communicate with the | |
//new user's HTTP/WebSocket application instance | |
let topic = message.payload.take().unwrap(); | |
// send back the list of currently connected users | |
for name in users.users.keys() { | |
let sent_message = ChatMessage { | |
username: name.clone(), | |
channel: message.channel.clone(), | |
payload: None, | |
message_type: MessageType::Register, | |
}; | |
api::topic_send(&topic, &serde_json::to_vec(&sent_message)?)?; | |
} | |
// this is the same register message, but without the temporary topic | |
let duplicated_message = serde_json::to_vec(&message)?; | |
// notify the other users | |
for user_topic in users.users.values() { | |
api::topic_send(&user_topic, &duplicated_message)?; | |
} | |
// add the new user to the list and save it | |
users.users.insert(message.username.clone(), topic.clone()); | |
api::kv::set(&key, &serde_json::to_vec(&users)?)?; | |
// notify the user that they are properly connected | |
let sent_message = ChatMessage { | |
username: "-server-".to_string(), | |
channel: message.channel.clone(), | |
payload: Some("Connected!".to_string()), | |
message_type: MessageType::Message, | |
}; | |
api::topic_send(&topic, &serde_json::to_vec(&sent_message)?)?; | |
} | |
// a user is leaving the channel | |
MessageType::Quit => { | |
// remove them from the list | |
users.users.remove(&message.username); | |
api::kv::set(&key, &serde_json::to_vec(&users)?)?; | |
// notify the other users | |
for user_topic in users.users.values() { | |
api::topic_send(&user_topic, &payload)?; | |
} | |
} | |
// a user sent a message | |
MessageType::Message => { | |
// notify the other users | |
for user_topic in users.users.values() { | |
api::topic_send(&user_topic, &payload)?; | |
} | |
} | |
} | |
Ok(()) | |
} | |
// this is where the function starts eecting | |
#[no_mangle] | |
pub extern "sysv64" fn chat_pulsar() { | |
let res = api::setup_logger(); | |
match std::panic::catch_unwind(|| { | |
if let Err(e) = pulsar() { | |
api::log(&format!("error handling the message:\n {:?}", e)); | |
} | |
}) { | |
Ok(_) => {} | |
Err(e) => { | |
api::log("got an error"); | |
log::error!("panicked:{:?}", e); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment