Skip to content

Instantly share code, notes, and snippets.

@triptec
Created May 12, 2018 12:28
Show Gist options
  • Save triptec/7b727e0de5a532f68bc0c1645743f2b4 to your computer and use it in GitHub Desktop.
Save triptec/7b727e0de5a532f68bc0c1645743f2b4 to your computer and use it in GitHub Desktop.
#[macro_use]
extern crate actix;
extern crate actix_web;
extern crate bytes;
extern crate futures;
#[macro_use]
extern crate log;
extern crate env_logger;
extern crate serde;
extern crate serde_json;
#[macro_use]
extern crate serde_derive;
#[macro_use]
extern crate validator_derive;
extern crate chrono;
extern crate validator;
extern crate rusoto_core;
extern crate rusoto_firehose;
mod aws;
use std::time::Duration;
use actix::prelude::*;
use actix_web::{
http, http::HeaderMap, middleware, server, App, AsyncResponder, Error, HttpMessage,
HttpRequest, HttpResponse,
};
use bytes::Bytes;
use futures::future::Future;
use chrono::Utc;
use serde_json::Value;
use std::sync::Arc;
use validator::{Validate, ValidationError};
#[derive(Message)]
struct Ping {}
#[derive(Message)]
struct ValueMsg {
value: Value,
}
struct StreamActor {
buffer: Vec<Value>,
addr: Recipient<Syn, Ping>,
}
impl actix::Actor for StreamActor {
type Context = actix::Context<StreamActor>;
}
fn send_buffer(buffer: &Vec<Value>) {
use rusoto_core::Region;
use rusoto_firehose::{KinesisFirehose, KinesisFirehoseClient, PutRecordBatchInput, Record};
let client = KinesisFirehoseClient::simple(Region::EuWest1);
let mut records = vec![];
for item in buffer {
let item_with_endline = String::from(format!("{}{}",item.to_string(), "\n")).into_bytes();
let record = Record {
data: item_with_endline,
};
records.push(record);
}
let record_batch = PutRecordBatchInput {
delivery_stream_name: "MMSLASKEventsV3".to_string(),
records,
};
match client.put_record_batch(&record_batch).sync() {
Ok(_) => info!("PutRecordBatch: OK!"),
Err(error) => error!("Error: {:?}", error),
}
}
impl actix::Handler<ValueMsg> for StreamActor {
type Result = ();
fn handle(&mut self, msg: ValueMsg, _ctx: &mut actix::Context<Self>) {
//info!("Value received");
self.buffer.push(msg.value);
if self.buffer.len() >= 500 {
info!("Flushing");
send_buffer(&self.buffer);
self.buffer.clear();
}
}
}
impl actix::Handler<Ping> for StreamActor {
type Result = ();
fn handle(&mut self, _msg: Ping, ctx: &mut actix::Context<Self>) {
info!("Ping received");
if self.buffer.len() > 0 {
info!("Flushing");
send_buffer(&self.buffer);
self.buffer.clear();
} else {
info!("Buffer empty, not flushing");
}
ctx.notify_later(Ping{}, Duration::new(30, 0));
/*
ctx.run_later(Duration::new(300, 0), move |act, _| {
act.addr.do_send(Ping {}).expect("Could not send ping");
});
*/
}
}
fn validate_context(context: &EventContext) -> Result<(), ValidationError> {
match context.validate() {
Ok(_) => Ok(()),
Err(_e) => Err(ValidationError::new("Invalid context")),
}
}
#[allow(non_snake_case)]
#[derive(Debug, Serialize, Deserialize, Validate)]
struct Event {
#[validate(length(min = "1"))]
eventType: String,
eventTypeVersion: i32,
#[validate(custom = "validate_context")]
context: EventContext,
}
#[allow(non_snake_case)]
#[derive(Debug, Serialize, Deserialize, Validate)]
struct EventContext {
#[validate(length(min = "1"))]
productId: String,
#[validate(length(min = "1"))]
organizationId: String,
}
fn handle_event(
mut v: Value,
_headers: &HeaderMap,
a: &Arc<Recipient<Syn, ValueMsg>>,
) -> Result<(), ()> {
if v.is_object() {
let e: Event = match serde_json::from_value(v.clone()) {
Ok(e) => e,
Err(_) => return Err(()),
};
//debug!("{:?}", e);
match e.validate() {
Ok(_) => (),
Err(_) => return Err(()),
}
v.as_object_mut().unwrap().insert(
String::from("serverTimestamp"),
Value::String(format!("{:?}", Utc::now())),
);
a.clone()
.do_send(ValueMsg { value: v })
.expect("Could not send ValueMsg");
//println!("{}", v.to_string());
Ok(())
} else if v.is_array() {
let es: Vec<Event> = match serde_json::from_value(v.clone()) {
Ok(es) => es,
Err(_) => return Err(()),
};
for e in es {
//debug!("{:?}", e);
match e.validate() {
Ok(_) => continue,
Err(_) => return Err(()),
}
}
for mut value in v.as_array_mut().unwrap() {
value.as_object_mut().unwrap().insert(
String::from("serverTimestamp"),
Value::String(format!("{:?}", Utc::now())),
);
a.clone()
.do_send(ValueMsg {
value: value.clone(),
})
.expect("Could not send ValueMsg");
}
//println!("{}", v.to_string());
Ok(())
} else {
Err(())
}
}
fn index(req: HttpRequest<AppState>) -> Box<Future<Item = HttpResponse, Error = Error>> {
let headers = req.headers().clone();
let a = req.state().actors.clone();
req.body()
.limit(2048)
.from_err()
.and_then(move |bytes: Bytes| {
let v: Value = serde_json::from_slice(&bytes)?;
//info!("{:#}", v);
match handle_event(v, &headers, &a) {
Ok(_) => Ok(HttpResponse::Ok().into()),
Err(_) => Ok(HttpResponse::BadRequest().into()),
}
})
.responder()
}
struct AppState {
actors: Arc<Recipient<Syn, ValueMsg>>,
}
fn main() {
::std::env::set_var("RUST_LOG", "info");
env_logger::init();
let sys = actix::System::new("guide");
let addr: Addr<Syn, _> = StreamActor::create(|ctx| {
let addr: Addr<Syn, _> = ctx.address();
StreamActor {
buffer: vec![],
addr: addr.recipient(),
}
});
addr.clone().do_send(Ping {});
let a = Arc::new(addr.recipient().clone());
server::new(move || {
App::with_state(AppState { actors: a.clone() })
.middleware(middleware::Logger::default())
.route("/api/v1/events", http::Method::POST, index)
}).bind("0.0.0.0:8080")
.unwrap()
.start();
//let addr: actix::Addr<actix::Unsync, _> = StreamActor{buffer: vec![]}.start();
let _ = sys.run();
}
@triptec
Copy link
Author

triptec commented May 12, 2018

thread 'main' panicked at 'Use Self::Context::notify() instead of direct use of address', /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/actix-0.5.6/src/mailbox.rs:134:21
stack backtrace:
   0: std::sys::unix::backtrace::tracing::imp::unwind_backtrace
             at libstd/sys/unix/backtrace/tracing/gcc_s.rs:49
   1: std::sys_common::backtrace::print
             at libstd/sys_common/backtrace.rs:71
             at libstd/sys_common/backtrace.rs:59
   2: std::panicking::default_hook::{{closure}}
             at libstd/panicking.rs:211
   3: std::panicking::default_hook
             at libstd/panicking.rs:227
   4: std::panicking::rust_panic_with_hook
             at libstd/panicking.rs:463
   5: std::panicking::begin_panic
             at /checkout/src/libstd/panicking.rs:397
   6: <actix::mailbox::Mailbox<A>>::poll
             at /home/andreas/dev/ues_rs/<panic macros>:3
   7: <actix::contextimpl::ContextImpl<A>>::poll
             at /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/actix-0.5.6/src/contextimpl.rs:291
   8: <actix::context::Context<A> as futures::future::Future>::poll
             at /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/actix-0.5.6/src/context.rs:144
   9: <futures::future::map::Map<A, F> as futures::future::Future>::poll
             at /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.21/src/future/map.rs:30
  10: <futures::future::map_err::MapErr<A, F> as futures::future::Future>::poll
             at /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.21/src/future/map_err.rs:30
  11: <alloc::boxed::Box<F> as futures::future::Future>::poll
             at /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.21/src/future/mod.rs:113
  12: <futures::task_impl::Spawn<T>>::poll_future_notify::{{closure}}
             at /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.21/src/task_impl/mod.rs:289
  13: <futures::task_impl::Spawn<T>>::enter::{{closure}}
             at /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.21/src/task_impl/mod.rs:363
  14: futures::task_impl::std::set
             at /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.21/src/task_impl/std/mod.rs:78
  15: <futures::task_impl::Spawn<T>>::enter
             at /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.21/src/task_impl/mod.rs:363
  16: <futures::task_impl::Spawn<T>>::poll_future_notify
             at /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.21/src/task_impl/mod.rs:289
  17: <tokio::executor::current_thread::scheduler::Scheduled<'a, U>>::tick
             at /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.1.6/src/executor/current_thread/scheduler.rs:352
  18: <tokio::executor::current_thread::scheduler::Scheduler<U>>::tick::{{closure}}
             at /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.1.6/src/executor/current_thread/scheduler.rs:332
  19: <tokio::executor::current_thread::Borrow<'a, U>>::enter::{{closure}}::{{closure}}
             at /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.1.6/src/executor/current_thread/mod.rs:688
  20: tokio::executor::current_thread::CurrentRunner::set_spawn
             at /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.1.6/src/executor/current_thread/mod.rs:720
  21: <tokio::executor::current_thread::Borrow<'a, U>>::enter::{{closure}}
             at /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.1.6/src/executor/current_thread/mod.rs:687
  22: <std::thread::local::LocalKey<T>>::try_with
             at /checkout/src/libstd/thread/local.rs:294
  23: <std::thread::local::LocalKey<T>>::with
             at /checkout/src/libstd/thread/local.rs:248
  24: <tokio::executor::current_thread::Borrow<'a, U>>::enter
             at /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.1.6/src/executor/current_thread/mod.rs:686
  25: <tokio::executor::current_thread::scheduler::Scheduler<U>>::tick
             at /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.1.6/src/executor/current_thread/scheduler.rs:332
  26: <tokio::executor::current_thread::Entered<'a, P>>::tick
             at /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.1.6/src/executor/current_thread/mod.rs:572
  27: <tokio::executor::current_thread::Entered<'a, P>>::turn
             at /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.1.6/src/executor/current_thread/mod.rs:515
  28: tokio_core::reactor::Core::poll::{{closure}}::{{closure}}::{{closure}}::{{closure}}
             at /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-core-0.1.17/src/reactor/mod.rs:298
  29: <scoped_tls::ScopedKey<T>>::set
             at /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/scoped-tls-0.1.2/src/lib.rs:155
  30: tokio_core::reactor::Core::poll::{{closure}}::{{closure}}::{{closure}}
             at /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-core-0.1.17/src/reactor/mod.rs:297
  31: tokio_timer::timer::handle::with_default::{{closure}}
             at /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-timer-0.2.3/src/timer/handle.rs:64
  32: <std::thread::local::LocalKey<T>>::try_with
             at /checkout/src/libstd/thread/local.rs:294
  33: <std::thread::local::LocalKey<T>>::with
             at /checkout/src/libstd/thread/local.rs:248
  34: tokio_timer::timer::handle::with_default
             at /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-timer-0.2.3/src/timer/handle.rs:56
  35: tokio_core::reactor::Core::poll::{{closure}}::{{closure}}
             at /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-core-0.1.17/src/reactor/mod.rs:275
  36: tokio_executor::global::with_default::{{closure}}
             at /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-executor-0.1.2/src/global.rs:176
  37: <std::thread::local::LocalKey<T>>::try_with
             at /checkout/src/libstd/thread/local.rs:294
  38: <std::thread::local::LocalKey<T>>::with
             at /checkout/src/libstd/thread/local.rs:248
  39: tokio_executor::global::with_default
             at /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-executor-0.1.2/src/global.rs:150
  40: tokio_core::reactor::Core::poll::{{closure}}
             at /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-core-0.1.17/src/reactor/mod.rs:274
  41: tokio_reactor::with_default::{{closure}}
             at /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-reactor-0.1.1/src/lib.rs:207
  42: <std::thread::local::LocalKey<T>>::try_with
             at /checkout/src/libstd/thread/local.rs:294
  43: <std::thread::local::LocalKey<T>>::with
             at /checkout/src/libstd/thread/local.rs:248
  44: tokio_reactor::with_default
             at /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-reactor-0.1.1/src/lib.rs:199
  45: tokio_core::reactor::Core::poll
             at /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-core-0.1.17/src/reactor/mod.rs:273
  46: tokio_core::reactor::Core::run
             at /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-core-0.1.17/src/reactor/mod.rs:248
  47: actix::system::SystemRunner::run
             at /home/andreas/.cargo/registry/src/github.com-1ecc6299db9ec823/actix-0.5.6/src/system.rs:106
  48: ues_rs::main
             at src/main.rs:238
  49: std::rt::lang_start::{{closure}}
             at /checkout/src/libstd/rt.rs:74
  50: std::panicking::try::do_call
             at libstd/rt.rs:59
             at libstd/panicking.rs:310
  51: __rust_maybe_catch_panic
             at libpanic_unwind/lib.rs:105
  52: std::rt::lang_start_internal
             at libstd/panicking.rs:289
             at libstd/panic.rs:374
             at libstd/rt.rs:58
  53: std::rt::lang_start
             at /checkout/src/libstd/rt.rs:74
  54: main
  55: __libc_start_main
  56: _start

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment