Created
May 12, 2018 12:28
-
-
Save triptec/7b727e0de5a532f68bc0c1645743f2b4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#[macro_use] | |
extern crate actix; | |
extern crate actix_web; | |
extern crate bytes; | |
extern crate futures; | |
#[macro_use] | |
extern crate log; | |
extern crate env_logger; | |
extern crate serde; | |
extern crate serde_json; | |
#[macro_use] | |
extern crate serde_derive; | |
#[macro_use] | |
extern crate validator_derive; | |
extern crate chrono; | |
extern crate validator; | |
extern crate rusoto_core; | |
extern crate rusoto_firehose; | |
mod aws; | |
use std::time::Duration; | |
use actix::prelude::*; | |
use actix_web::{ | |
http, http::HeaderMap, middleware, server, App, AsyncResponder, Error, HttpMessage, | |
HttpRequest, HttpResponse, | |
}; | |
use bytes::Bytes; | |
use futures::future::Future; | |
use chrono::Utc; | |
use serde_json::Value; | |
use std::sync::Arc; | |
use validator::{Validate, ValidationError}; | |
#[derive(Message)] | |
struct Ping {} | |
#[derive(Message)] | |
struct ValueMsg { | |
value: Value, | |
} | |
struct StreamActor { | |
buffer: Vec<Value>, | |
addr: Recipient<Syn, Ping>, | |
} | |
impl actix::Actor for StreamActor { | |
type Context = actix::Context<StreamActor>; | |
} | |
fn send_buffer(buffer: &Vec<Value>) { | |
use rusoto_core::Region; | |
use rusoto_firehose::{KinesisFirehose, KinesisFirehoseClient, PutRecordBatchInput, Record}; | |
let client = KinesisFirehoseClient::simple(Region::EuWest1); | |
let mut records = vec![]; | |
for item in buffer { | |
let item_with_endline = String::from(format!("{}{}",item.to_string(), "\n")).into_bytes(); | |
let record = Record { | |
data: item_with_endline, | |
}; | |
records.push(record); | |
} | |
let record_batch = PutRecordBatchInput { | |
delivery_stream_name: "MMSLASKEventsV3".to_string(), | |
records, | |
}; | |
match client.put_record_batch(&record_batch).sync() { | |
Ok(_) => info!("PutRecordBatch: OK!"), | |
Err(error) => error!("Error: {:?}", error), | |
} | |
} | |
impl actix::Handler<ValueMsg> for StreamActor { | |
type Result = (); | |
fn handle(&mut self, msg: ValueMsg, _ctx: &mut actix::Context<Self>) { | |
//info!("Value received"); | |
self.buffer.push(msg.value); | |
if self.buffer.len() >= 500 { | |
info!("Flushing"); | |
send_buffer(&self.buffer); | |
self.buffer.clear(); | |
} | |
} | |
} | |
impl actix::Handler<Ping> for StreamActor { | |
type Result = (); | |
fn handle(&mut self, _msg: Ping, ctx: &mut actix::Context<Self>) { | |
info!("Ping received"); | |
if self.buffer.len() > 0 { | |
info!("Flushing"); | |
send_buffer(&self.buffer); | |
self.buffer.clear(); | |
} else { | |
info!("Buffer empty, not flushing"); | |
} | |
ctx.notify_later(Ping{}, Duration::new(30, 0)); | |
/* | |
ctx.run_later(Duration::new(300, 0), move |act, _| { | |
act.addr.do_send(Ping {}).expect("Could not send ping"); | |
}); | |
*/ | |
} | |
} | |
fn validate_context(context: &EventContext) -> Result<(), ValidationError> { | |
match context.validate() { | |
Ok(_) => Ok(()), | |
Err(_e) => Err(ValidationError::new("Invalid context")), | |
} | |
} | |
#[allow(non_snake_case)] | |
#[derive(Debug, Serialize, Deserialize, Validate)] | |
struct Event { | |
#[validate(length(min = "1"))] | |
eventType: String, | |
eventTypeVersion: i32, | |
#[validate(custom = "validate_context")] | |
context: EventContext, | |
} | |
#[allow(non_snake_case)] | |
#[derive(Debug, Serialize, Deserialize, Validate)] | |
struct EventContext { | |
#[validate(length(min = "1"))] | |
productId: String, | |
#[validate(length(min = "1"))] | |
organizationId: String, | |
} | |
fn handle_event( | |
mut v: Value, | |
_headers: &HeaderMap, | |
a: &Arc<Recipient<Syn, ValueMsg>>, | |
) -> Result<(), ()> { | |
if v.is_object() { | |
let e: Event = match serde_json::from_value(v.clone()) { | |
Ok(e) => e, | |
Err(_) => return Err(()), | |
}; | |
//debug!("{:?}", e); | |
match e.validate() { | |
Ok(_) => (), | |
Err(_) => return Err(()), | |
} | |
v.as_object_mut().unwrap().insert( | |
String::from("serverTimestamp"), | |
Value::String(format!("{:?}", Utc::now())), | |
); | |
a.clone() | |
.do_send(ValueMsg { value: v }) | |
.expect("Could not send ValueMsg"); | |
//println!("{}", v.to_string()); | |
Ok(()) | |
} else if v.is_array() { | |
let es: Vec<Event> = match serde_json::from_value(v.clone()) { | |
Ok(es) => es, | |
Err(_) => return Err(()), | |
}; | |
for e in es { | |
//debug!("{:?}", e); | |
match e.validate() { | |
Ok(_) => continue, | |
Err(_) => return Err(()), | |
} | |
} | |
for mut value in v.as_array_mut().unwrap() { | |
value.as_object_mut().unwrap().insert( | |
String::from("serverTimestamp"), | |
Value::String(format!("{:?}", Utc::now())), | |
); | |
a.clone() | |
.do_send(ValueMsg { | |
value: value.clone(), | |
}) | |
.expect("Could not send ValueMsg"); | |
} | |
//println!("{}", v.to_string()); | |
Ok(()) | |
} else { | |
Err(()) | |
} | |
} | |
fn index(req: HttpRequest<AppState>) -> Box<Future<Item = HttpResponse, Error = Error>> { | |
let headers = req.headers().clone(); | |
let a = req.state().actors.clone(); | |
req.body() | |
.limit(2048) | |
.from_err() | |
.and_then(move |bytes: Bytes| { | |
let v: Value = serde_json::from_slice(&bytes)?; | |
//info!("{:#}", v); | |
match handle_event(v, &headers, &a) { | |
Ok(_) => Ok(HttpResponse::Ok().into()), | |
Err(_) => Ok(HttpResponse::BadRequest().into()), | |
} | |
}) | |
.responder() | |
} | |
struct AppState { | |
actors: Arc<Recipient<Syn, ValueMsg>>, | |
} | |
fn main() { | |
::std::env::set_var("RUST_LOG", "info"); | |
env_logger::init(); | |
let sys = actix::System::new("guide"); | |
let addr: Addr<Syn, _> = StreamActor::create(|ctx| { | |
let addr: Addr<Syn, _> = ctx.address(); | |
StreamActor { | |
buffer: vec![], | |
addr: addr.recipient(), | |
} | |
}); | |
addr.clone().do_send(Ping {}); | |
let a = Arc::new(addr.recipient().clone()); | |
server::new(move || { | |
App::with_state(AppState { actors: a.clone() }) | |
.middleware(middleware::Logger::default()) | |
.route("/api/v1/events", http::Method::POST, index) | |
}).bind("0.0.0.0:8080") | |
.unwrap() | |
.start(); | |
//let addr: actix::Addr<actix::Unsync, _> = StreamActor{buffer: vec![]}.start(); | |
let _ = sys.run(); | |
} |
Author
triptec
commented
May 12, 2018
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment