Skip to content

Instantly share code, notes, and snippets.

@rponte
Last active June 4, 2020 13:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save rponte/0c5b0e3c1b84c0c2e49c863215c2c0f4 to your computer and use it in GitHub Desktop.
Save rponte/0c5b0e3c1b84c0c2e49c863215c2c0f4 to your computer and use it in GitHub Desktop.
Oracle: Playing a little bit with SELECT...FOR UPDATE SKIP LOCKED
--
-- JOB_QUEUE table: one row per job waiting to be (or already) processed.
--
-- NOTE: the sequence is created here but is not referenced by any statement
-- visible in this script (ids are supplied explicitly by the inserts).
create sequence seq_job_queue;

create table job_queue (
    id           number               not null enable,  -- job identifier (primary key)
    content      varchar2(4000 char)  not null enable,  -- job payload
    status       varchar2(20 char)    not null enable,  -- 'NEW' or 'FINISHED' in this script
    thread_name  varchar2(60 char),                     -- worker that finished the job; null until then
    constraint job_queue_pk primary key (id)
)
;
--
-- Seed JOB_QUEUE with five pending jobs
--
-- Intentionally unfiltered: the whole queue is emptied before reseeding.
delete from job_queue;

-- Column lists are spelled out so the inserts do not depend on the
-- physical column order of JOB_QUEUE.
insert all
    into job_queue (id, content, status, thread_name) values (1, 'c1', 'NEW', null)
    into job_queue (id, content, status, thread_name) values (2, 'c2', 'NEW', null)
    into job_queue (id, content, status, thread_name) values (3, 'c3', 'NEW', null)
    into job_queue (id, content, status, thread_name) values (4, 'c4', 'NEW', null)
    into job_queue (id, content, status, thread_name) values (5, 'c5', 'NEW', null)
select * from dual
;
--
-- package
--
create or replace package job_queue_processor as
-- Public interface of the JOB_QUEUE worker.
--
-- Job_t: one job row held in memory. Every field is anchored to the
-- matching JOB_QUEUE column with %type, and the fields are declared in the
-- same order as the table columns (id, content, status, thread_name).
type Job_t is record (
id job_queue.id%type
,content job_queue.content%type
,status job_queue.status%type
,thread_name job_queue.thread_name%type
);
-- Job_List_t: nested table of Job_t; the bulk-collect package bodies in this
-- script use it as the fetch target for a batch of jobs.
type Job_List_t is table of Job_t;
/**
* Processes pending jobs (rows of JOB_QUEUE whose status is 'NEW').
*
* p_thread_name  name of the calling worker; the package bodies in this
*                script store it in JOB_QUEUE.THREAD_NAME for each job they
*                finish.
* p_row_limit    batch size; defaults to 1. The bulk-collect bodies use it
*                as the FETCH ... LIMIT value; the row-by-row and FOR-loop
*                bodies in this script do not read it.
*/
procedure process_new_jobs(p_thread_name varchar2
,p_row_limit integer := 1);
end job_queue_processor;
/
create or replace package body job_queue_processor as

  /**
   * Handles a single job: prints its content, flags the row as FINISHED
   * under the given worker name, then pauses to simulate work.
   *
   * p_job          job to handle (only id and content are read)
   * p_thread_name  worker name stored in JOB_QUEUE.THREAD_NAME
   */
  procedure process_job(p_job Job_t
                       ,p_thread_name varchar2) as
  begin
    dbms_output.put_line('Row: ' || p_job.content);

    update job_queue q
       set q.thread_name = p_thread_name
          ,q.status = 'FINISHED'
     where q.id = p_job.id;

    -- dbms_random.value(1, 3) lies in [1, 3), so the pause is 1 or 2 seconds.
    dbms_lock.sleep(seconds => floor(dbms_random.value(1, 3)));
  end process_job;

  /**
   * Drains the queue through ONE open cursor: fetches 'NEW' rows in batches
   * of p_row_limit, processes each batch, and commits a single time after
   * the cursor is exhausted. Rows locked by other sessions are skipped.
   *
   * p_thread_name  worker name recorded on every finished job
   * p_row_limit    number of rows fetched per batch (default 1)
   */
  procedure process_new_jobs(p_thread_name varchar2
                            ,p_row_limit integer := 1) as
    cursor new_jobs_cursor is
      select jq.*
        from job_queue jq
       where jq.status = 'NEW'
         for update skip locked;

    l_batch Job_List_t;
    l_pos   pls_integer;
  begin
    open new_jobs_cursor;

    loop
      fetch new_jobs_cursor bulk collect into l_batch limit p_row_limit;
      dbms_output.put_line('Fetched ' || l_batch.count || ' rows.');

      -- Walk the batch; FIRST is null for an empty batch, so the loop is skipped.
      l_pos := l_batch.first;
      while l_pos is not null loop
        process_job(p_job => l_batch(l_pos)
                   ,p_thread_name => p_thread_name);
        l_pos := l_batch.next(l_pos);
      end loop;

      -- Tested only after the batch is processed: the last batch may be
      -- partially filled and still set %NOTFOUND.
      if new_jobs_cursor%notfound then
        exit;
      end if;
    end loop;

    commit;
    close new_jobs_cursor;
  end process_new_jobs;

end job_queue_processor;
/
-- configuring number of processes
--
-- select value from v$parameter where name='job_queue_processes';
-- alter system set job_queue_processes = 20;
--
set timing on
set serveroutput on
--
-- Repopulates JOB_QUEUE and starts the background workers.
-- Each worker is a DBMS_SCHEDULER job running
-- job_queue_processor.process_new_jobs with its own thread name.
--
declare
  number_of_jobs_in_the_queue constant integer := 100000;  -- rows inserted into JOB_QUEUE
  number_of_threads           constant integer := 10;      -- scheduler jobs created
  batch_size                  constant integer := 100;     -- p_row_limit passed to each worker
  job_name                    varchar2(200);
  plsql_block                 varchar2(1000);
begin
  -- clean and populate database
  -- (intentionally unfiltered delete: the whole queue is rebuilt)
  delete from job_queue;

  insert into job_queue (id, content, status, thread_name)
  select level          as id
        ,('c' || level) as content
        ,'NEW'          as status
        ,null           as thread_name
    from dual
  connect by level <= number_of_jobs_in_the_queue
  ;

  -- Committed before the workers are created so that their sessions can see the rows.
  commit;

  -- execute workers (threads) in background
  for i in 1..number_of_threads loop
    job_name := 'thread_' || i;

    -- Template for the job action; the two %s placeholders receive the
    -- thread name and the batch size.
    plsql_block := 'BEGIN
job_queue_processor.process_new_jobs(p_thread_name => ''%s''
,p_row_limit => %s);
END;';
    plsql_block := utl_lms.format_message(plsql_block, job_name, to_char(batch_size));

    dbms_scheduler.create_job (
      job_name   => job_name,
      job_type   => 'PLSQL_BLOCK',
      job_action => plsql_block,
      enabled    => TRUE
    );
  end loop;
end;
/
--
-- REUSING CURSOR + BULK COLLECT + LIMIT
--
create or replace package body job_queue_processor as

  /**
   * Handles a single job: prints its content, marks the row FINISHED under
   * the given worker name, then sleeps to simulate work.
   *
   * p_job          job to handle (only id and content are read)
   * p_thread_name  worker name stored in JOB_QUEUE.THREAD_NAME
   */
  procedure process_job(p_job Job_t
                       ,p_thread_name varchar2) as
    l_seconds number;
  begin
    dbms_output.put_line('Row: ' || p_job.content);

    update job_queue j
       set j.status = 'FINISHED'
          ,j.thread_name = p_thread_name
     where j.id = p_job.id
    ;

    -- Random pause below one second, rounded to hundredths.
    l_seconds := round(dbms_random.value(), 2);
    dbms_lock.sleep(seconds => l_seconds);
  end process_job;

  /**
   * Processes 'NEW' jobs in batches of p_row_limit, committing after every
   * batch. The cursor is closed and reopened on each iteration because a
   * FOR UPDATE cursor cannot be fetched from again after a commit.
   * Rows locked by other sessions are skipped.
   *
   * p_thread_name  worker name recorded on every finished job
   * p_row_limit    number of rows fetched (and committed) per batch; default 1
   */
  procedure process_new_jobs(p_thread_name varchar2
                            ,p_row_limit integer := 1) as
    -- Columns are listed explicitly, in Job_t field order, so the fetch does
    -- not depend on the physical column order of JOB_QUEUE.
    cursor new_jobs_cursor is
      select jq.id
            ,jq.content
            ,jq.status
            ,jq.thread_name
        from job_queue jq
       where jq.status = 'NEW'
         for update skip locked
    ;
    l_job_list Job_List_t;
  begin
    loop
      -- reopen the cursor for every batch
      open new_jobs_cursor;
      fetch new_jobs_cursor bulk collect into l_job_list limit p_row_limit;
      dbms_output.put_line('Fetched ' || l_job_list.count || ' rows.');

      for i in 1..l_job_list.count loop
        process_job(p_job => l_job_list(i)
                   ,p_thread_name => p_thread_name);
      end loop;

      -- Commit ends the transaction for this batch; the final, empty
      -- iteration also passes through here before the loop exits.
      commit;
      close new_jobs_cursor;

      exit when l_job_list.count = 0;
    end loop;
  end process_new_jobs;

end job_queue_processor;
/
--
-- REUSING CURSOR + SIMPLE FETCH ROW-BY-ROW
--
create or replace package body job_queue_processor as

  /**
   * Handles a single job: prints its content, marks the row FINISHED under
   * the given worker name, then sleeps to simulate work.
   *
   * p_job          job to handle (only id and content are read)
   * p_thread_name  worker name stored in JOB_QUEUE.THREAD_NAME
   */
  procedure process_job(p_job Job_t
                       ,p_thread_name varchar2) as
    l_seconds number;
  begin
    dbms_output.put_line('Row: ' || p_job.content);

    update job_queue j
       set j.status = 'FINISHED'
          ,j.thread_name = p_thread_name
     where j.id = p_job.id
    ;

    -- Random pause below one second, rounded to hundredths.
    l_seconds := round(dbms_random.value(), 2);
    dbms_lock.sleep(seconds => l_seconds);
  end process_job;

  /**
   * Processes 'NEW' jobs one at a time: each iteration opens the cursor,
   * fetches a single row, processes it, commits, and closes the cursor.
   * Rows locked by other sessions are skipped. The loop ends when a fetch
   * returns no row.
   *
   * p_thread_name  worker name recorded on every finished job
   * p_row_limit    not used by this row-by-row implementation; kept so the
   *                heading matches the package specification
   */
  procedure process_new_jobs(p_thread_name varchar2
                            ,p_row_limit integer := 1) as
    -- Columns are listed explicitly, in Job_t field order, so the fetch does
    -- not depend on the physical column order of JOB_QUEUE.
    cursor new_jobs_cursor is
      select jq.id
            ,jq.content
            ,jq.status
            ,jq.thread_name
        from job_queue jq
       where jq.status = 'NEW'
         for update skip locked
    ;
    l_job   Job_t;
    l_found boolean;
  begin
    loop
      open new_jobs_cursor;
      fetch new_jobs_cursor into l_job;
      -- Remember the outcome now: the attribute cannot be read once the
      -- cursor has been closed.
      l_found := new_jobs_cursor%found;

      if l_found then
        process_job(p_job => l_job
                   ,p_thread_name => p_thread_name);
        commit;
      end if;

      -- Always close, including on the final (empty) fetch, so the cursor
      -- is never left open when the loop exits.
      close new_jobs_cursor;

      exit when not l_found;
    end loop;
  end process_new_jobs;

end job_queue_processor;
/
--
-- Using FOR-Loop
--
create or replace package body job_queue_processor as

  /**
   * Handles a single job: prints its content, marks the row FINISHED under
   * the given worker name, then sleeps to simulate work.
   *
   * p_job          job to handle (only id and content are read)
   * p_thread_name  worker name stored in JOB_QUEUE.THREAD_NAME
   */
  procedure process_job(p_job Job_t
                       ,p_thread_name varchar2) as
    l_seconds number;
  begin
    dbms_output.put_line('Row: ' || p_job.content);

    update job_queue j
       set j.status = 'FINISHED'
          ,j.thread_name = p_thread_name
     where j.id = p_job.id
    ;

    -- Random pause below one second, rounded to hundredths.
    l_seconds := round(dbms_random.value(), 2);
    dbms_lock.sleep(seconds => l_seconds);
  end process_job;

  /**
   * Processes every 'NEW' job with a cursor FOR loop and commits once, after
   * the loop. Rows locked by other sessions are skipped.
   *
   * p_thread_name  worker name recorded on every finished job
   * p_row_limit    not used by this FOR-loop implementation; kept so the
   *                heading matches the package specification
   */
  procedure process_new_jobs(p_thread_name varchar2
                            ,p_row_limit integer := 1) as
    cursor new_jobs_cursor is
      select jq.id
            ,jq.content
            ,jq.status
            ,jq.thread_name
        from job_queue jq
       where jq.status = 'NEW'
         for update skip locked
    ;
    l_job Job_t;
  begin
    for l_row in new_jobs_cursor
    loop
      -- Copy the cursor row into a Job_t explicitly, so the call does not
      -- rely on implicit conversion between the cursor's row type and Job_t.
      l_job.id          := l_row.id;
      l_job.content     := l_row.content;
      l_job.status      := l_row.status;
      l_job.thread_name := l_row.thread_name;

      process_job(p_job => l_job
                 ,p_thread_name => p_thread_name);
    end loop;

    -- Committing only at the end, when the number of rows is NOT known in
    -- advance, is DANGEROUS: all the work stays in one open transaction
    -- until the loop finishes. The commit cannot move inside the loop,
    -- because a FOR UPDATE cursor cannot be fetched from after a commit.
    commit;
  end process_new_jobs;

end job_queue_processor;
/
set timing on
set serveroutput on
-- Runs one worker in the current session, one row per batch.
-- serveroutput is on so the worker's dbms_output lines are displayed.
begin
  job_queue_processor.process_new_jobs(p_thread_name => 'thread_1'
                                      ,p_row_limit => 1);
end;
/
-- Finished jobs per worker, busiest worker first.
-- sum_up repeats the grand total of finished jobs on every row.
select
    jq.thread_name         as thread_name
   ,count(*)               as total
   ,sum(count(*)) over ()  as sum_up
from
    job_queue jq
where
    jq.status = 'FINISHED'
group by
    jq.thread_name
order by
    total desc
;
@rponte
Copy link
Author

rponte commented Jul 1, 2019

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment