Last active
February 21, 2023 14:51
-
-
Save jpschroeder/f85b593cbe4e18f2ae32bc247456aea0 to your computer and use it in GitHub Desktop.
Streaming with SQL Server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- | |
-- setup | |
-- | |
create database servicebrokertest | |
go | |
alter database servicebrokertest set enable_broker | |
go | |
use servicebrokertest | |
go | |
-- | |
-- install | |
-- | |
create queue dbo.mcs_queue | |
create service mcs_service on queue dbo.mcs_queue ([DEFAULT]) | |
go | |
-- push messages onto the fifo queue | |
create procedure dbo.mcs_queue_push ( | |
@message nvarchar(max) | |
) as | |
begin | |
declare @handle uniqueidentifier | |
begin dialog @handle from service [mcs_service] to service 'mcs_service' on contract [DEFAULT] with encryption=off, lifetime=60; | |
send on conversation @handle message type [DEFAULT] (@message) | |
end conversation @handle | |
end | |
go | |
-- pop messages off the queue | |
-- wait for a message to appear (or a timeout) before returning (allows for long-polling) | |
create procedure dbo.mcs_queue_pop as | |
begin | |
declare @handle uniqueidentifier | |
declare @message varbinary(max) | |
waitfor( | |
receive top(1) | |
@handle = conversation_handle, | |
@message = message_body | |
from dbo.mcs_queue | |
), timeout 600000; -- 10 minutes | |
begin try | |
end conversation @handle; | |
end try | |
begin catch | |
end catch | |
select cast(@message as nvarchar(max)) as message | |
end | |
go | |
-- delete all messages off of the queue | |
create procedure dbo.mcs_queue_clear as | |
begin | |
declare @handles table (handle uniqueidentifier) | |
insert into @handles | |
select ce.conversation_handle | |
from sys.services as s | |
inner join sys.conversation_endpoints as ce on s.service_id = ce.service_id | |
where s.name = 'mcs_service' | |
declare @handle uniqueidentifier | |
while exists(select * from @handles) | |
begin | |
select top 1 @handle = handle from @handles | |
end conversation @handle with cleanup | |
delete from @handles where handle = @handle | |
end | |
end | |
go | |
-- view all messages in the queue | |
create view dbo.mcs_queue_peek as | |
select conversation_handle, message_enqueue_time, cast(message_body as nvarchar(max)) as message | |
from dbo.mcs_queue | |
where message_type_name = 'DEFAULT' | |
--order by message_enqueue_time desc | |
go | |
-- uninstall all resources for queues | |
create procedure dbo.mcs_queue_uninstall as | |
begin | |
exec dbo.mcs_queue_clear | |
drop view dbo.mcs_queue_peek | |
drop procedure dbo.mcs_queue_clear | |
drop procedure dbo.mcs_queue_pop | |
drop procedure dbo.mcs_queue_push | |
drop service mcs_service | |
drop queue mcs_queue | |
drop procedure dbo.mcs_queue_uninstall | |
end | |
go | |
-- | |
-- test | |
-- | |
-- pop a single message off of the queue (run this in a separate window) | |
exec mcs_queue_pop | |
-- push some test messages into the queue | |
exec dbo.mcs_queue_push 'test message 1' | |
exec dbo.mcs_queue_push 'test message 2' | |
exec dbo.mcs_queue_push 'test message 3' | |
exec dbo.mcs_queue_push 'test message 4' | |
-- view all messages in the queue | |
select * from dbo.mcs_queue_peek order by message_enqueue_time desc | |
-- clear all messages from the queue | |
exec mcs_queue_clear | |
-- uninstall the queue | |
exec mcs_queue_uninstall | |
-- | |
-- teardown | |
-- | |
use master | |
go | |
alter database servicebrokertest set single_user with rollback immediate | |
drop database servicebrokertest | |
go |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment