Skip to content

Instantly share code, notes, and snippets.

@jpschroeder
Last active February 21, 2023 14:51
Show Gist options
  • Save jpschroeder/f85b593cbe4e18f2ae32bc247456aea0 to your computer and use it in GitHub Desktop.
Save jpschroeder/f85b593cbe4e18f2ae32bc247456aea0 to your computer and use it in GitHub Desktop.
Streaming with SQL Server
--
-- setup
--
-- Create a scratch database to host the Service Broker demo.
create database [servicebrokertest];
go
-- Enable Service Broker in the new database.
alter database [servicebrokertest] set enable_broker;
go
-- Run the rest of the script inside the scratch database.
use [servicebrokertest];
go
--
-- install
--
-- Queue that stores the pending messages.
create queue dbo.mcs_queue;
-- Service bound to that queue; it accepts the built-in [DEFAULT] contract.
create service [mcs_service]
    on queue dbo.mcs_queue ([DEFAULT]);
go
-- Enqueue one message.
--
-- Parameters:
--   @message  text to deliver; it is sent as the body of a [DEFAULT]-type message.
--
-- Each call opens its own dialog from mcs_service to itself, sends the message
-- on it, and then ends the initiating side of the dialog straight away.
create procedure dbo.mcs_queue_push (
    @message nvarchar(max)
) as
begin
    declare @dialog_handle uniqueidentifier;

    -- lifetime is given in seconds: the dialog may stay open for at most one minute
    begin dialog conversation @dialog_handle
        from service [mcs_service]
        to service 'mcs_service'
        on contract [DEFAULT]
        with encryption = off, lifetime = 60;

    send on conversation @dialog_handle
        message type [DEFAULT] (@message);

    end conversation @dialog_handle;
end
go
-- Pop one message off the queue.
-- Blocks until a message appears or the timeout elapses, which lets callers long-poll.
--
-- Result set: exactly one row with a single column, [message], holding the
-- received message body cast to nvarchar(max). The value is NULL when no
-- message was received before the timeout.
create procedure dbo.mcs_queue_pop as
begin
    declare @handle uniqueidentifier;
    declare @message varbinary(max);

    waitfor (
        receive top (1)
            @handle = conversation_handle,
            @message = message_body
        from dbo.mcs_queue
    ), timeout 600000; -- milliseconds: 10 minutes

    -- On timeout nothing was received and @handle is still NULL, so there is no
    -- conversation to end. Testing for that explicitly avoids raising an error
    -- on every timeout only to have it swallowed by the catch block.
    if @handle is not null
    begin
        begin try
            end conversation @handle;
        end try
        begin catch
            -- Deliberately best effort: the message has already been received,
            -- so a failure to end the conversation must not prevent returning it.
        end catch
    end

    select cast(@message as nvarchar(max)) as message;
end
go
-- Delete all messages off of the queue by ending, with cleanup, every
-- conversation endpoint that belongs to mcs_service.
create procedure dbo.mcs_queue_clear as
begin
    declare @endpoint_handle uniqueidentifier;

    -- Static cursor: the set of handles is captured when the cursor is opened,
    -- so ending conversations inside the loop does not disturb the iteration.
    declare endpoint_cursor cursor local forward_only static read_only for
        select ce.conversation_handle
        from sys.services as s
        inner join sys.conversation_endpoints as ce
            on s.service_id = ce.service_id
        where s.name = 'mcs_service';

    open endpoint_cursor;
    fetch next from endpoint_cursor into @endpoint_handle;

    while @@fetch_status = 0
    begin
        end conversation @endpoint_handle with cleanup;
        fetch next from endpoint_cursor into @endpoint_handle;
    end

    close endpoint_cursor;
    deallocate endpoint_cursor;
end
go
-- Lists the messages currently sitting in the queue.
-- Only [DEFAULT]-type messages are shown, with the body converted to text.
-- The view itself is unordered; callers that want newest-first should add
--   order by message_enqueue_time desc
create view dbo.mcs_queue_peek as
select
    conversation_handle,
    message_enqueue_time,
    convert(nvarchar(max), message_body) as message
from dbo.mcs_queue
where message_type_name = 'DEFAULT';
go
-- Uninstall all resources for the queue, including this procedure itself.
-- Outstanding conversations are cleared first; then the view, the helper
-- procedures, the service and the queue are dropped.
create procedure dbo.mcs_queue_uninstall as
begin
    exec dbo.mcs_queue_clear;

    drop view dbo.mcs_queue_peek;
    drop procedure dbo.mcs_queue_clear;
    drop procedure dbo.mcs_queue_pop;
    drop procedure dbo.mcs_queue_push;

    -- the service is dropped before the queue it is bound to
    drop service mcs_service;
    -- schema-qualified like the other drops, so the name does not depend on
    -- the default schema of whoever runs the procedure
    drop queue dbo.mcs_queue;

    -- the procedure removes itself last
    drop procedure dbo.mcs_queue_uninstall;
end
go
--
-- test
--
-- pop a single message off of the queue (run this in a separate window:
-- the call blocks until a message arrives or its 10-minute timeout elapses)
exec dbo.mcs_queue_pop;
-- push some test messages into the queue
exec dbo.mcs_queue_push 'test message 1';
exec dbo.mcs_queue_push 'test message 2';
exec dbo.mcs_queue_push 'test message 3';
exec dbo.mcs_queue_push 'test message 4';
-- view all messages in the queue, newest first
select
    conversation_handle,
    message_enqueue_time,
    message
from dbo.mcs_queue_peek
order by message_enqueue_time desc;
-- clear all messages from the queue
exec dbo.mcs_queue_clear;
-- uninstall the queue
exec dbo.mcs_queue_uninstall;
--
-- teardown
--
-- Leave the scratch database so that it can be dropped.
use [master];
go
-- Force out any other sessions (rolling back their open work), then drop the database.
alter database [servicebrokertest] set single_user with rollback immediate;
drop database [servicebrokertest];
go
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment