Skip to content

Instantly share code, notes, and snippets.

@dxlbnl
Created September 11, 2017 18:42
Show Gist options
  • Save dxlbnl/11e621ab46b1281c831330f2ace71b1c to your computer and use it in GitHub Desktop.
Save dxlbnl/11e621ab46b1281c831330f2ace71b1c to your computer and use it in GitHub Desktop.
Compiling queue v0.1.0 (file:///home/dexter/nerdalize/prototypes/queue)
warning: unused import: `diesel::expression`
--> src/bin/run_server.rs:23:5
|
23 | use diesel::expression;
| ^^^^^^^^^^^^^^^^^^
|
= note: #[warn(unused_imports)] on by default
warning: unused import: `diesel::query_builder::Query`
--> src/bin/run_server.rs:24:5
|
24 | use diesel::query_builder::Query;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
warning: unused import: `sql`
--> src/bin/run_server.rs:155:35
|
155 | use diesel::expression::dsl::{sql,max,now};
| ^^^
error[E0271]: type mismatch resolving `<diesel::expression::now as diesel::Expression>::SqlType == diesel::types::Nullable<diesel::types::Timestamp>`
--> src/bin/run_server.rs:186:39
|
186 | .select(max(runs::expire).lt(now))
| ^^ expected struct `diesel::types::Timestamp`, found struct `diesel::types::Nullable`
|
= note: expected type `diesel::types::Timestamp`
found type `diesel::types::Nullable<diesel::types::Timestamp>`
error[E0271]: type mismatch resolving `<queue::schema::runs::table as diesel::query_source::AppearsInFromClause<queue::schema::tasks::table>>::Count == diesel::query_source::Succ<diesel::query_source::Never>`
--> src/bin/run_server.rs:188:14
|
188 | .filter(runs::task.eq(tasks::id)),
| ^^^^^^ expected struct `diesel::query_source::Never`, found struct `diesel::query_source::Succ`
|
= note: expected type `diesel::query_source::Never`
found type `diesel::query_source::Succ<diesel::query_source::Never>`
= note: required because of the requirements on the impl of `diesel::AppearsOnTable<queue::schema::runs::table>` for `queue::schema::tasks::id`
= note: required because of the requirements on the impl of `diesel::AppearsOnTable<queue::schema::runs::table>` for `diesel::expression::operators::Eq<queue::schema::runs::task, queue::schema::tasks::id>`
= note: required because of the requirements on the impl of `diesel::FilterDsl<diesel::expression::operators::Eq<queue::schema::runs::task, queue::schema::tasks::id>>` for `diesel::query_builder::SelectStatement<queue::schema::runs::table, diesel::query_builder::select_clause::SelectClause<diesel::expression::Max<queue::schema::runs::expire>>>`
error: aborting due to 2 previous errors
error: Could not compile `queue`.
To learn more, run the command again with --verbose.
#![feature(plugin)]
#![plugin(rocket_codegen)]
extern crate queue;
extern crate rocket;
extern crate dotenv;
extern crate rocket_contrib;
#[macro_use] extern crate diesel;
#[macro_use] extern crate diesel_codegen;
#[macro_use] extern crate serde_derive;
extern crate serde_json;
use queue::pg_pool;
use dotenv::dotenv;
use std::env;
use std::time::{SystemTime};
use rocket_contrib::Json;
use queue::models::*;
use diesel::prelude::*;
use diesel::expression;
use diesel::query_builder::Query;
use diesel::associations::HasTable;
use diesel::pg::Pg;
use diesel::types;
use queue::schema::{tasks,runs};
use queue::DbConn;
#[get("/workloads", format="application/json")]
fn workload_list(connection: DbConn) -> Json<Vec<Workload>> {
use queue::schema::workloads::dsl::workloads;
let results = workloads
.limit(5000)
.load::<Workload>(&*connection)
.expect("Error loading workloads");
Json(results)
// format!("List all workloads")
}
#[get("/workload/<workload_id>", format="application/json")]
fn workload_detail(connection: DbConn, workload_id : i32) -> Json<Workload> {
use queue::schema::workloads::dsl::workloads;
Json(
workloads
.find(workload_id)
.first(&*connection)
.expect("Error loading workloads")
)
}
#[get("/workload/<workload_id>/tasks", format="application/json")]
fn task_list(connection: DbConn, workload_id : i32) -> Json<Vec<Task>> {
use queue::schema::workloads::dsl::workloads;
let workload : Workload = workloads.find(workload_id).first(&*connection).unwrap();
Json(
Task::belonging_to(&workload)
.load(&*connection)
.unwrap()
)
}
#[derive(Serialize,Queryable,Associations,Identifiable)]
#[belongs_to(TaskDetail, foreign_key="task")]
#[table_name="runs"]
struct RunDetail {
pub id: i32,
pub created_at: SystemTime,
pub worker: String,
pub task: i32,
pub status: String,
}
#[derive(Serialize,Queryable,Associations,Identifiable)]
#[belongs_to(Workload, foreign_key="workload")]
#[table_name="tasks"]
struct TaskDetail {
pub id: i32,
pub created_at: SystemTime,
pub workload: i32,
pub cmd: Option<String>,
pub env: Option<serde_json::Value>, // Is a json type
}
#[get("/workload/<workload_id>/task/<task_id>", format="application/json")]
fn task_detail(connection : DbConn, workload_id : i32, task_id : i32) -> Json<TaskDetail> {
use queue::schema::workloads;
let workload : Workload = workloads::table.find(workload_id).first(&*connection).unwrap();
let task : TaskDetail = TaskDetail::belonging_to(&workload).find(task_id).first(&*connection).unwrap();
// let runs : Vec<Run> = RunDetail::belonging_to(&task).load(&*connection).unwrap();
// task.runs = runs;
Json(task)
}
#[get("/workload/<workload_id>/task/<task_id>/run/<run_id>", format="application/json")]
fn run_detail(workload_id : i64, task_id : i64, run_id : i64) -> String {
format!("Get run:{} for task:{} from workload:{}", run_id, task_id, workload_id)
}
#[post("/workload", data="<workload>", format="application/json")]
fn workload_create(connection: DbConn, workload: Json<NewWorkload>) -> Json<Workload> {
use queue::schema::workloads::dsl::workloads;
Json(diesel::insert(&workload.into_inner())
.into(workloads::table())
.get_result(&*connection)
.unwrap())
}
#[post("/workload/<workload_id>/tasks", data="<task_list>", format="application/json")]
fn tasks_create(connection: DbConn, workload_id: i32, task_list: Json<Vec<NewTask>>) -> Json<Vec<Task>> {
use queue::schema::tasks::dsl::tasks;
// Associate all tasks with the workload
let associated_tasks : Vec<NewTask> = task_list.into_inner().iter().map(|task| NewTask {
workload: Some(workload_id),
cmd: task.clone().cmd
}).collect();
Json(diesel::insert(&associated_tasks)
.into(tasks::table())
.get_results(&*connection)
.unwrap())
}
#[derive(Deserialize)]
struct TaskClaimRequest {
worker_id: String
}
#[derive(Serialize)]
struct TaskClaimResponse {
// run: Run,
// task: Task
}
sql_function!(coalesce, coalesce_t, (stmt: types::Array<types::Nullable<types::Timestamp>>, default: types::Bool) -> types::Bool);
#[post("/workload/<workload_id>/run", data="<worker_request>", format="application/json")]
fn run_claim(connection: DbConn, worker_request: Json<TaskClaimRequest>, workload_id: i32) -> Json<TaskClaimResponse>{
// use queue::schema::workloads;
use queue::schema::tasks;
// use queue::schema::runs;
use diesel::expression::dsl::{sql,max,now};
// Insert statement
// let new_run = diesel::insert(/* Select query with LOCKING*/)
// .into(runs::table)
// .get_result(&*connection)
// .unwrap();
// INSERT INTO runs (worker, task)
// SELECT 'workerA', t.id
// FROM tasks t
// WHERE t.workload = 1
// AND COALESCE((SELECT MAX(expire) < NOW() FROM runs r WHERE r.task = t.id), TRUE)
// LIMIT 1
// FOR UPDATE SKIP LOCKED
// let t = tasks::table
// .select(tasks::id)
// .filter(tasks::workload.eq(workload_id))
// .filter(sql::<types::Bool>("COALESCE((SELECT MAX(expire) < NOW() FROM runs r WHERE r.task = \"tasks\".\"id\"), TRUE)"))
// .limit(1);
// // FOR UPDATE SKIP LOCKED
let t = tasks::table
.select(tasks::id)
.filter(tasks::workload.eq(workload_id))
.filter(coalesce(
runs::table
.select(max(runs::expire).lt(now))
.select(max(runs::expire))
.filter(runs::task.eq(tasks::id)),
true
))
.limit(1);
// FOR UPDATE SKIP LOCKED
println!("Querying: \n{}", diesel::debug_query::<Pg, _>(&t));
Json(
TaskClaimResponse {
// task: task
}
)
}
/*
! Not querying in a single transaction, reading here
let q_open_task = tasks::table.left_join(runs::table)
.filter(
tasks::workload
.eq(workload.id)
.and(
runs::id
.is_null()
.or(
runs::expire.lt(SystemTime::now())
)
)
)
.select(tasks::all_columns);
let task_runs = runs::table.left_join(tasks::table).select(runs::task).filter(tasks::workload.eq(workload_id));
let q_open_task = tasks::table.filter(
tasks::workload.eq(workload_id)
.and(
tasks::id.eq(
any(task_runs.filter(runs::expire.lt(now)))
).or(
tasks::id.ne(
any(task_runs.filter(runs::expire.gt(now)))
)
)
)
);
println!("Querying: \n{}", diesel::debug_query::<Pg, _>(&q_open_task));
let task = q_open_task.first::<Task>(&*connection).unwrap();
// Inserting here
let run_claim = diesel::insert(&NewRun {
worker: worker_request.into_inner().worker_id,
task: task.id,
expire: SystemTime::now() + Duration::from_secs(100)
})
.into(runs::table)
.get_result(&*connection)
.unwrap();
*/
fn main() {
dotenv().ok();
let database_url = env::var("DATABASE_URL")
.expect("DATABASE_URL must be set");
rocket::ignite()
.manage(pg_pool::init(&database_url))
.mount("/", routes![
workload_list,
workload_detail,
task_list,
task_detail,
run_detail,
workload_create,
tasks_create,
run_claim,
])
.launch();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment