Created
July 25, 2016 08:08
-
-
Save paomian/6c3e2cb99834ef5eb112ed1d1602c1e1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#[macro_use] extern crate hyper; | |
extern crate rand; | |
use std::io::{BufReader}; | |
use std::io::prelude::*; | |
use std::fs::OpenOptions; | |
use std::collections::BTreeMap; | |
use std::iter::Iterator; | |
use hyper::client::Client; | |
use hyper::header::Basic; | |
use hyper::header::{Headers,ContentType,Authorization}; | |
use hyper::mime::{Mime, TopLevel, SubLevel,Attr, Value}; | |
use std::io::SeekFrom; | |
use rand::Rng; | |
use std::process::Command; | |
use std::time::SystemTime; | |
use std::fs::File; | |
use std::env; | |
use std::sync::{Once, ONCE_INIT}; | |
use std::mem; | |
use std::rc::Rc; | |
use std::cell::RefCell; | |
/// Minimum number of whole seconds that must pass before the same id is let through again.
const ALERT_INTERVAL_SECS: u64 = 300;

/// Map from id to the moment `check` last returned `true` for it.
type Cache = RefCell<BTreeMap<String, SystemTime>>;

thread_local! {
    // One throttle table per thread. This replaces a raw `static mut` pointer to an
    // `Rc<RefCell<..>>`, which needed `unsafe` and was not sound to touch from more
    // than one thread.
    static CACHE: Cache = RefCell::new(BTreeMap::new());
}

/// Throttle: decides whether an alert for `app_id` may go out now.
///
/// Returns `true` the first time an id is seen, and again once more than
/// `ALERT_INTERVAL_SECS` whole seconds have passed since the last time this
/// function returned `true` for that id. Every `true` result refreshes the stored
/// timestamp, so the interval restarts after each permitted alert.
///
/// Returns `false` inside the interval, and also when the elapsed time cannot be
/// computed because the system clock is now earlier than the stored timestamp.
fn check(app_id: &str) -> bool {
    CACHE.with(|cache| {
        let mut seen = cache.borrow_mut();
        // Copy the timestamp out so the map is free to be updated below.
        let allowed = match seen.get(app_id).cloned() {
            None => true,
            Some(last) => match last.elapsed() {
                Ok(age) => age.as_secs() > ALERT_INTERVAL_SECS,
                Err(_) => false,
            },
        };
        if allowed {
            // Restart the interval; without this every call after the first
            // interval would be let through.
            seen.insert(String::from(app_id), SystemTime::now());
        }
        allowed
    })
}
fn alert(app_id:&str,data:Option<&mut InnerValue>) { | |
if data.is_some() { | |
if check(app_id) { | |
let c = Client::new(); | |
let boundary = rand::thread_rng() | |
.gen_ascii_chars() | |
.take(20) | |
.collect::<String>(); | |
let mut header = Headers::new(); | |
header.set(ContentType(Mime( | |
TopLevel::Multipart,SubLevel::FormData, | |
vec![(Attr::Ext(String::from("boundary")), | |
Value::Ext(boundary.clone()))]))); | |
header.set(Authorization( | |
Basic{ | |
username:String::from(""), | |
password:Some(String::from("")), | |
} | |
)); | |
let text:String = format!("{} 应用短信失败率过高。{:?}",app_id,data); | |
let mut body = BTreeMap::new(); | |
body.insert("from",""); | |
body.insert("to",""); | |
body.insert("subject",""); | |
body.insert("text",&text[..]); | |
let bs = body.iter().fold( | |
String::new(), | |
|acc,x| { | |
format!("{}--{}\r\nContent-Disposition: form-data; name=\"{}\"\r\n\r\n{}\r\n",&acc[..],&boundary[..],x.0,x.1) | |
}); | |
let body_str = format!("{}--{}--",&bs[..],&boundary[..]); | |
let result = c.post("") | |
.headers(header) | |
.body(&body_str[..]) | |
.send(); | |
let _ = result.map(|mut x| { | |
let mut result_body:Vec<u8> = Vec::new(); | |
let _ = x.read_to_end(&mut result_body); | |
println!("{}",String::from_utf8(result_body).unwrap_or(String::new())); | |
}); | |
} else { | |
println!("{}:{:?}",app_id,data); | |
} | |
} | |
} | |
/// One parsed log record. Every string field is a slice borrowed for `'a`.
///
/// `Data::new_by_take` fills the fields in declaration order from six consecutive
/// items of an iterator.
#[derive(Debug)]
struct Data<'a> {
    /// First field; `Res` keys its counters by this value.
    _app_id: &'a str,
    /// Second field (presumably the destination phone number — confirm against the log format).
    _phone: &'a str,
    /// Third field (presumably the message type — confirm against the log format).
    _type: &'a str,
    /// Fourth field (presumably the delivery channel — confirm against the log format).
    _channel: &'a str,
    /// Fifth field (presumably a sub-channel of `_channel` — confirm against the log format).
    _subchannel: &'a str,
    /// Whether the sixth field reported success.
    _success: bool,
}
/// Counters kept for a single app id.
#[derive(Debug)]
struct InnerValue {
    /// Receipts whose record was flagged successful (`Res::update_recv`).
    _success: u64,
    /// Receipts whose record was not flagged successful (`Res::update_recv`).
    _fail: u64,
    /// Every send record seen (`Res::update_send`).
    _all: u64,
    /// Send records that were not flagged successful (`Res::update_send`).
    _error: u64,
}

impl InnerValue {
    /// Returns a value with all four counters at zero.
    fn new() -> Self {
        InnerValue { _success: 0, _fail: 0, _all: 0, _error: 0 }
    }
}
#[derive(Debug)] | |
struct Res { | |
_data:BTreeMap<String,InnerValue>, | |
} | |
impl Res { | |
fn new() -> Self { | |
Res { | |
_data:BTreeMap::new(), | |
} | |
} | |
fn update_recv(&mut self,d:&Data) { | |
let tmp = self._data.entry(String::from(d._app_id)).or_insert(InnerValue::new()); | |
if d._success { | |
tmp._success += 1; | |
} else { | |
tmp._fail += 1 | |
} | |
if ((tmp._success as f64 / tmp._all as f64) < 0.8) && (tmp._all > 200) { | |
alert(d._app_id,Some(tmp)); | |
} | |
} | |
fn update_send(&mut self,d:&Data) { | |
let tmp = self._data.entry(String::from(d._app_id)).or_insert(InnerValue::new()); | |
tmp._all += 1; | |
if !d._success { | |
tmp._error += 1; | |
} | |
} | |
fn clean(&mut self) { | |
{ | |
let mut tmp = self._data.iter().collect::<Vec<(&String,&InnerValue)>>(); | |
tmp.sort_by(|a,b| (a.1)._all.cmp(&(b.1)._all)); | |
for (k,v) in tmp { | |
println!("{}:{:?}",k ,v); | |
} | |
} | |
self._data.clear(); | |
} | |
} | |
impl<'a> Data<'a> { | |
#[allow(dead_code)] | |
fn new() -> Data<'a> { | |
Data { | |
_app_id:"", | |
_phone:"", | |
_type:"", | |
_channel:"", | |
_subchannel:"", | |
_success:false, | |
} | |
} | |
fn new_by_take(mut data:Box<Iterator<Item=&'a str> + 'a>,) -> Option<Data<'a>>{ | |
data.size_hint().1.and_then(|x| { | |
if x == 6 { | |
let data = Data { | |
_app_id:data.next().unwrap(), | |
_phone:data.next().unwrap(), | |
_type:data.next().unwrap(), | |
_channel:data.next().unwrap(), | |
_subchannel:data.next().unwrap(), | |
_success:{ | |
let tmp = data.next().unwrap(); | |
(tmp == "SUCCESS") || (tmp == "true") | |
}, | |
}; | |
Some(data) | |
} else { | |
None | |
} | |
}) | |
} | |
} | |
fn reduce_data(buf:&String,all_data:&mut Res) | |
{ | |
if buf.find("Sms send").map(|x| { | |
match Data::new_by_take(Box::new(buf | |
.split_at(x) | |
.1.split(':') | |
.skip(1) | |
.take(6))) { | |
Some(d) => all_data.update_send(&d), | |
None => println!("log error {}",x), | |
} | |
}).is_none() { | |
buf.find("Sms receipt by").map(|x| { | |
match Data::new_by_take(Box::new(buf | |
.split_at(x) | |
.1.split(':') | |
.skip(1) | |
.take(6))) { | |
Some(d) => all_data.update_recv(&d), | |
None => println!("log error {}",x), | |
} | |
}); | |
} | |
} | |
fn go(fd:&mut File, seek:&mut u64,empty_read:&mut u8, all_data:&mut Res) -> () { | |
let _ = fd.seek(SeekFrom::Start(*seek)); | |
let mut fbuf = BufReader::new(fd); | |
let mut buf = String::new(); | |
let mut empty = true; | |
while fbuf.read_line(&mut buf).unwrap() > 0 { | |
*seek += buf.as_bytes().len() as u64; | |
empty = false; | |
reduce_data(&buf,all_data); | |
buf.clear(); | |
} | |
if empty { | |
*empty_read += 1; | |
} | |
} | |
/// Returns `"<pp>/uluru.log.<YYYY-MM-DD>"` for the current date as reported by the
/// external `date` command.
///
/// The result ends with the newline printed by `date`; callers trim it before using
/// the value as a path.
///
/// # Panics
///
/// Panics when the `date` command cannot be started.
fn log_file(pp:&String) -> String {
    // `date` is invoked directly; the format string needs no shell to interpret it.
    let output = Command::new("date")
        .arg("+uluru.log.%Y-%m-%d")
        .output()
        .expect("failed to execute process");
    format!("{}/{}",pp,String::from_utf8_lossy(&output.stdout))
}
fn main() { | |
let prefix_path = env::args().nth(1).unwrap_or(String::from(".")); | |
let mut seek:u64 = 0; | |
let mut all_data = Res::new(); | |
let mut old_log = log_file(&prefix_path); | |
print!("{}",old_log); | |
let mut empty_read:u8 = 0; | |
let mut fd = OpenOptions::new().read(true).open(old_log.trim()).unwrap(); | |
loop { | |
let log = log_file(&prefix_path); | |
if empty_read > 10 { | |
alert(&format!("已经五分钟没有日志写入 {}",old_log)[..],None); | |
if old_log != log { | |
seek = 0; | |
empty_read = 0; | |
all_data.clean(); | |
old_log = log; | |
} | |
fd = OpenOptions::new().read(true).open(old_log.trim()).unwrap(); | |
} | |
go(&mut fd,&mut seek,&mut empty_read, &mut all_data); | |
println!("Sleep 30S old_log:{},seek:{},empty_read:{}", | |
old_log.trim(),seek,empty_read); | |
::std::thread::sleep(::std::time::Duration::new(30,0)); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment