   Compiling queue v0.1.0 (file:///home/dexter/nerdalize/prototypes/queue)
warning: unused import: `diesel::expression`
  --> src/bin/run_server.rs:23:5
   |
23 | use diesel::expression;
   |     ^^^^^^^^^^^^^^^^^^
   |
   = note: #[warn(unused_imports)] on by default

warning: unused import: `diesel::query_builder::Query`
  --> src/bin/run_server.rs:24:5
   |
24 | use diesel::query_builder::Query;
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^

warning: unused import: `sql`
   --> src/bin/run_server.rs:155:35
    |
155 |     use diesel::expression::dsl::{sql,max,now};
    |                                   ^^^

error[E0271]: type mismatch resolving `<diesel::expression::now as diesel::Expression>::SqlType == diesel::types::Nullable<diesel::types::Timestamp>`
   --> src/bin/run_server.rs:186:39
    |
186 |         .select(max(runs::expire).lt(now))
    |                                      ^^ expected struct `diesel::types::Timestamp`, found struct `diesel::types::Nullable`
    |
    = note: expected type `diesel::types::Timestamp`
               found type `diesel::types::Nullable<diesel::types::Timestamp>`

error[E0271]: type mismatch resolving `<queue::schema::runs::table as diesel::query_source::AppearsInFromClause<queue::schema::tasks::table>>::Count == diesel::query_source::Succ<diesel::query_source::Never>`
   --> src/bin/run_server.rs:188:14
    |
188 |         .filter(runs::task.eq(tasks::id)),
    |          ^^^^^^ expected struct `diesel::query_source::Never`, found struct `diesel::query_source::Succ`
    |
    = note: expected type `diesel::query_source::Never`
               found type `diesel::query_source::Succ<diesel::query_source::Never>`
    = note: required because of the requirements on the impl of `diesel::AppearsOnTable<queue::schema::runs::table>` for `queue::schema::tasks::id`
    = note: required because of the requirements on the impl of `diesel::AppearsOnTable<queue::schema::runs::table>` for `diesel::expression::operators::Eq<queue::schema::runs::task, queue::schema::tasks::id>`
    = note: required because of the requirements on the impl of `diesel::FilterDsl<diesel::expression::operators::Eq<queue::schema::runs::task, queue::schema::tasks::id>>` for `diesel::query_builder::SelectStatement<queue::schema::runs::table, diesel::query_builder::select_clause::SelectClause<diesel::expression::Max<queue::schema::runs::expire>>>`

error: aborting due to 2 previous errors

error: Could not compile `queue`.

To learn more, run the command again with --verbose.
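
A reading of the two errors, for context. The first is a SQL-type mismatch inside the subquery: `max(runs::expire)` has type `Nullable<Timestamp>`, while `now` is a plain `Timestamp`. Assuming diesel's `NullableExpressionMethods` (from the prelude) applies here, lifting the right-hand side should align the types:

    // Hedged sketch: lift `now` into Nullable<Timestamp> so the `.lt` comparison
    // type-checks; `.nullable()` is assumed available via diesel::prelude.
    .select(max(runs::expire).lt(now.nullable()))

The second error is structural: the subquery is built from `runs::table` alone, so `tasks::id` does not appear in its FROM clause, and diesel's query builder at this point does not support correlated subqueries. The raw-SQL `COALESCE(...)` filter commented out inside `run_claim` below is the usual workaround; a fuller sketch follows the source.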
#![feature(plugin)]
#![plugin(rocket_codegen)]

extern crate queue;
extern crate rocket;
extern crate dotenv;
extern crate rocket_contrib;
#[macro_use] extern crate diesel;
#[macro_use] extern crate diesel_codegen;
#[macro_use] extern crate serde_derive;
extern crate serde_json;

use queue::pg_pool;
use dotenv::dotenv;
use std::env;
use std::time::SystemTime;
use rocket_contrib::Json;
use queue::models::*;
use diesel::prelude::*;
use diesel::expression;
use diesel::query_builder::Query;
use diesel::associations::HasTable;
use diesel::pg::Pg;
use diesel::types;
use queue::schema::{tasks, runs};
use queue::DbConn;
#[get("/workloads", format="application/json")] | |
fn workload_list(connection: DbConn) -> Json<Vec<Workload>> { | |
use queue::schema::workloads::dsl::workloads; | |
let results = workloads | |
.limit(5000) | |
.load::<Workload>(&*connection) | |
.expect("Error loading workloads"); | |
Json(results) | |
// format!("List all workloads") | |
} | |
#[get("/workload/<workload_id>", format="application/json")] | |
fn workload_detail(connection: DbConn, workload_id : i32) -> Json<Workload> { | |
use queue::schema::workloads::dsl::workloads; | |
Json( | |
workloads | |
.find(workload_id) | |
.first(&*connection) | |
.expect("Error loading workloads") | |
) | |
} | |
#[get("/workload/<workload_id>/tasks", format="application/json")] | |
fn task_list(connection: DbConn, workload_id : i32) -> Json<Vec<Task>> { | |
use queue::schema::workloads::dsl::workloads; | |
let workload : Workload = workloads.find(workload_id).first(&*connection).unwrap(); | |
Json( | |
Task::belonging_to(&workload) | |
.load(&*connection) | |
.unwrap() | |
) | |
} | |
#[derive(Serialize, Queryable, Associations, Identifiable)]
#[belongs_to(TaskDetail, foreign_key="task")]
#[table_name="runs"]
struct RunDetail {
    pub id: i32,
    pub created_at: SystemTime,
    pub worker: String,
    pub task: i32,
    pub status: String,
}

#[derive(Serialize, Queryable, Associations, Identifiable)]
#[belongs_to(Workload, foreign_key="workload")]
#[table_name="tasks"]
struct TaskDetail {
    pub id: i32,
    pub created_at: SystemTime,
    pub workload: i32,
    pub cmd: Option<String>,
    pub env: Option<serde_json::Value>, // JSON column
}
#[get("/workload/<workload_id>/task/<task_id>", format="application/json")] | |
fn task_detail(connection : DbConn, workload_id : i32, task_id : i32) -> Json<TaskDetail> { | |
use queue::schema::workloads; | |
let workload : Workload = workloads::table.find(workload_id).first(&*connection).unwrap(); | |
let task : TaskDetail = TaskDetail::belonging_to(&workload).find(task_id).first(&*connection).unwrap(); | |
// let runs : Vec<Run> = RunDetail::belonging_to(&task).load(&*connection).unwrap(); | |
// task.runs = runs; | |
Json(task) | |
} | |
#[get("/workload/<workload_id>/task/<task_id>/run/<run_id>", format="application/json")] | |
fn run_detail(workload_id : i64, task_id : i64, run_id : i64) -> String { | |
format!("Get run:{} for task:{} from workload:{}", run_id, task_id, workload_id) | |
} | |
#[post("/workload", data="<workload>", format="application/json")] | |
fn workload_create(connection: DbConn, workload: Json<NewWorkload>) -> Json<Workload> { | |
use queue::schema::workloads::dsl::workloads; | |
Json(diesel::insert(&workload.into_inner()) | |
.into(workloads::table()) | |
.get_result(&*connection) | |
.unwrap()) | |
} | |
#[post("/workload/<workload_id>/tasks", data="<task_list>", format="application/json")] | |
fn tasks_create(connection: DbConn, workload_id: i32, task_list: Json<Vec<NewTask>>) -> Json<Vec<Task>> { | |
use queue::schema::tasks::dsl::tasks; | |
// Associate all tasks with the workload | |
let associated_tasks : Vec<NewTask> = task_list.into_inner().iter().map(|task| NewTask { | |
workload: Some(workload_id), | |
cmd: task.clone().cmd | |
}).collect(); | |
Json(diesel::insert(&associated_tasks) | |
.into(tasks::table()) | |
.get_results(&*connection) | |
.unwrap()) | |
} | |
#[derive(Deserialize)] | |
struct TaskClaimRequest { | |
worker_id: String | |
} | |
#[derive(Serialize)] | |
struct TaskClaimResponse { | |
// run: Run, | |
// task: Task | |
} | |
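
// Hand-rolled COALESCE wrapper for the claim query below. The declared first
// argument (an Array of Nullable<Timestamp>) does not obviously line up with
// the boolean COALESCE((SELECT MAX(expire) < NOW() ...), TRUE) the target SQL
// wants, though compilation fails earlier, on the subquery itself.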
sql_function!(coalesce, coalesce_t, (stmt: types::Array<types::Nullable<types::Timestamp>>, default: types::Bool) -> types::Bool);
#[post("/workload/<workload_id>/run", data="<worker_request>", format="application/json")] | |
fn run_claim(connection: DbConn, worker_request: Json<TaskClaimRequest>, workload_id: i32) -> Json<TaskClaimResponse>{ | |
// use queue::schema::workloads; | |
use queue::schema::tasks; | |
// use queue::schema::runs; | |
use diesel::expression::dsl::{sql,max,now}; | |
// Insert statement | |
// let new_run = diesel::insert(/* Select query with LOCKING*/) | |
// .into(runs::table) | |
// .get_result(&*connection) | |
// .unwrap(); | |
// INSERT INTO runs (worker, task) | |
// SELECT 'workerA', t.id | |
// FROM tasks t | |
// WHERE t.workload = 1 | |
// AND COALESCE((SELECT MAX(expire) < NOW() FROM runs r WHERE r.task = t.id), TRUE) | |
// LIMIT 1 | |
// FOR UPDATE SKIP LOCKED | |
// let t = tasks::table | |
// .select(tasks::id) | |
// .filter(tasks::workload.eq(workload_id)) | |
// .filter(sql::<types::Bool>("COALESCE((SELECT MAX(expire) < NOW() FROM runs r WHERE r.task = \"tasks\".\"id\"), TRUE)")) | |
// .limit(1); | |
// // FOR UPDATE SKIP LOCKED | |
let t = tasks::table | |
.select(tasks::id) | |
.filter(tasks::workload.eq(workload_id)) | |
.filter(coalesce( | |
runs::table | |
.select(max(runs::expire).lt(now)) | |
.select(max(runs::expire)) | |
.filter(runs::task.eq(tasks::id)), | |
true | |
)) | |
.limit(1); | |
// FOR UPDATE SKIP LOCKED | |
println!("Querying: \n{}", diesel::debug_query::<Pg, _>(&t)); | |
Json( | |
TaskClaimResponse { | |
// task: task | |
} | |
) | |
} | |
/*
! Not querying in a single transaction, reading here

let q_open_task = tasks::table.left_join(runs::table)
    .filter(
        tasks::workload
            .eq(workload.id)
            .and(
                runs::id
                    .is_null()
                    .or(
                        runs::expire.lt(SystemTime::now())
                    )
            )
    )
    .select(tasks::all_columns);

let task_runs = runs::table.left_join(tasks::table).select(runs::task).filter(tasks::workload.eq(workload_id));

let q_open_task = tasks::table.filter(
    tasks::workload.eq(workload_id)
        .and(
            tasks::id.eq(
                any(task_runs.filter(runs::expire.lt(now)))
            ).or(
                tasks::id.ne(
                    any(task_runs.filter(runs::expire.gt(now)))
                )
            )
        )
);

println!("Querying: \n{}", diesel::debug_query::<Pg, _>(&q_open_task));

let task = q_open_task.first::<Task>(&*connection).unwrap();

// Inserting here
let run_claim = diesel::insert(&NewRun {
        worker: worker_request.into_inner().worker_id,
        task: task.id,
        expire: SystemTime::now() + Duration::from_secs(100)
    })
    .into(runs::table)
    .get_result(&*connection)
    .unwrap();
*/
fn main() {
    dotenv().ok();

    let database_url = env::var("DATABASE_URL")
        .expect("DATABASE_URL must be set");

    rocket::ignite()
        .manage(pg_pool::init(&database_url))
        .mount("/", routes![
            workload_list,
            workload_detail,
            task_list,
            task_detail,
            run_detail,
            workload_create,
            tasks_create,
            run_claim,
        ])
        .launch();
}
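
For reference, a minimal sketch of a variant of the claim query that should sidestep both type errors (untested; diesel 0.16-era API assumed). It is essentially the raw-SQL draft commented out inside `run_claim`: keeping the `MAX(expire) < NOW()` comparison and the correlated reference to `tasks.id` inside a `sql::<types::Bool>` fragment avoids both the `Nullable<Timestamp>` mismatch and the unsupported correlated subquery:

    // Sketch only: replaces the coalesce(...) builder version inside run_claim.
    // COALESCE(..., TRUE) keeps tasks with no runs at all claimable.
    let t = tasks::table
        .select(tasks::id)
        .filter(tasks::workload.eq(workload_id))
        .filter(sql::<types::Bool>(
            "COALESCE((SELECT MAX(expire) < NOW() FROM runs r WHERE r.task = \"tasks\".\"id\"), TRUE)"))
        .limit(1);

FOR UPDATE SKIP LOCKED has no builder method at this diesel version, so the full claim (the insert-from-select with locking in the target SQL) would still need to drop to a raw query.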