Last active
July 7, 2023 09:48
-
-
Save EitanBlumin/2476483e28fa788356c5648441d8cfaa to your computer and use it in GitHub Desktop.
Multi-Threading Implementation Using Service Broker (more info: https://eitanblumin.com/2018/10/31/advanced-service-broker-sample-multi-threading/ )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
=================================================== | |
Service Broker Sample 1: Parallel Querying | |
=================================================== | |
Copyright: Eitan Blumin (C) 2012 | |
Email: eitan@madeira.co.il | |
Source: www.madeira.co.il | |
Disclaimer: | |
The author is not responsible for any damage this | |
script or any of its variations may cause. | |
Do not execute it or any variations of it on production | |
environments without first verifying its validity | |
on controlled testing and/or QA environments. | |
You may use this script at your own risk and may change it | |
to your liking, as long as you leave this disclaimer header | |
fully intact and unchanged. | |
*/ | |
USE | |
[SB_PQ_Test] | |
GO | |
SET NOCOUNT ON; | |
-- init variables | |
DECLARE | |
@SQL NVARCHAR(MAX), -- holding the SQL command | |
@OutputParam VARCHAR(128), -- the name of the output parameter | |
@Segment INT, -- index number of the current segment (thread) | |
@ConvGroup UNIQUEIDENTIFIER, -- conversation group ID | |
@Delay VARCHAR(8), -- delay representation HH:MM:SS | |
@MinDelay INT, -- minimum delay in seconds | |
@MaxDelay INT, -- maximum delay in seconds | |
@StartTime DATETIME, -- start time of this script | |
@NumOfSegments INT -- number of segments (threads) to use | |
SET @StartTime = GETDATE(); | |
SET @OutputParam = 'SB_PQ_Result'; | |
-- random workload simulation settings | |
SET @MinDelay = 10 | |
SET @MaxDelay = 30 | |
SET @ConvGroup = NEWID(); -- the messages will be grouped in a specific conversation group | |
SET @NumOfSegments = 3; -- number of "threads" to use | |
-- create several segments | |
SET @Segment = 1; | |
WHILE @Segment <= @NumOfSegments | |
BEGIN | |
-- random delay between @MinDelay and @MaxDelay seconds to simulate long execution time | |
SET @Delay = '00:00:' + CONVERT(varchar(8), ROUND(RAND() * (@MaxDelay - @MinDelay),0) + @MinDelay) | |
-- build our dynamic SQL command. note the use of XML as the result. | |
SET @SQL = N' | |
WAITFOR DELAY ''' + @Delay + '''; | |
SET @SB_PQ_Result = | |
( | |
SELECT | |
Segment = ' + CONVERT(nvarchar(max), @Segment) + N', | |
Delay = ''' + @Delay + N''', | |
StartDate = GETDATE(), | |
Name = QUOTENAME(name), | |
object_id, type, modify_date | |
FROM | |
SB_PQ_Test.sys.tables AS Tab | |
FOR XML AUTO, ELEMENTS | |
); | |
'; | |
-- Send request to queue | |
EXEC SB_PQ_Start_Query @SQL, @OutputParam, @ConvGroup; | |
RAISERROR(N'Sent segment %d (intended delay %s)',0,0,@Segment,@Delay) WITH NOWAIT; | |
-- increment segment index | |
SET @Segment = @Segment + 1; | |
END | |
-- init final result | |
DECLARE @TotalResult XML; | |
SET @TotalResult = '<Tables> </Tables>' | |
-- Get results | |
RAISERROR(N'Getting results...',0,0) WITH NOWAIT; | |
DECLARE @CurrentResult XML, @CurrSegment VARCHAR(100), @CurrDelay VARCHAR(8), @CurrStart DATETIME; | |
-- count based on number of segments that we created earlier | |
SET @Segment = 1; | |
WHILE @Segment <= @NumOfSegments | |
BEGIN | |
-- Get segment from response queue | |
EXEC SB_PQ_Get_Response_One @ConvGroup, @CurrentResult OUTPUT | |
-- extract result values | |
SET @CurrSegment = @CurrentResult.value('(Tab/Segment)[1]','varchar(100)'); | |
SET @CurrDelay = @CurrentResult.value('(Tab/Delay)[1]','varchar(8)'); | |
SET @CurrStart = @CurrentResult.value('(Tab/StartDate)[1]','datetime'); | |
'Received segment ' | |
+ @CurrSegment + ' ' | |
+ CONVERT(nvarchar(max),DATEDIFF(ms,@StartTime,@CurrStart)) + ' ms delay end since start' | |
+ ' (intended delay ' + ISNULL(@CurrDelay,'<none>') + ')' | |
-- insert into TotalResults using XML DML (syntax for SQL2008 and newer) | |
SET @TotalResult.modify(' | |
insert sql:variable("@CurrentResult") | |
into (/Tables)[1] '); | |
-- increment segment index | |
SET @Segment = @Segment + 1; | |
END | |
-- return final result (as XML) | |
SELECT @TotalResult.query('.') AS FinalResult | |
-- return final result (as relational table) | |
SELECT | |
Segment = T.XRecord.query('.').value('(/Tab/Segment)[1]','varchar(100)'), | |
Delay = T.XRecord.query('.').value('(/Tab/Delay)[1]','varchar(8)'), | |
StartDate = T.XRecord.query('.').value('(/Tab/StartDate)[1]','datetime') | |
FROM | |
@TotalResult.nodes('/Tables/Tab') AS T(XRecord) | |
-- check the SB logs to see how many unique sessions executed our requests | |
DECLARE @NumOfSessions INT; | |
SELECT @NumOfSessions = COUNT(DISTINCT SPID) | |
FROM SB_PQ_ServiceBrokerLogs | |
WHERE LogDate >= @StartTime | |
PRINT CONVERT(nvarchar(100),@NumOfSessions) + ' unique sessions participated in execution' | |
SELECT * | |
FROM SB_PQ_ServiceBrokerLogs | |
WHERE LogDate >= @StartTime | |
GO | |
SELECT * | |
FROM sys.conversation_endpoints | |
/* -- cleanup closed conversations (SQL Server eventually does this automatically) | |
declare @q uniqueidentifier; | |
select top 1 @q = conversation_handle from sys.conversation_endpoints where state='CD'; | |
while @@rowcount > 0 | |
begin | |
end conversation @q with cleanup | |
select top 1 @q = conversation_handle from sys.conversation_endpoints where state='CD'; | |
end | |
*/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
=================================================== | |
Service Broker Sample 1: Parallel Querying | |
=================================================== | |
Copyright: Eitan Blumin (C) 2012 | |
Email: eitan@madeira.co.il | |
Source: www.madeira.co.il | |
Disclaimer: | |
The author is not responsible for any damage this | |
script or any of its variations may cause. | |
Do not execute it or any variations of it on production | |
environments without first verifying its validity | |
on controlled testing and/or QA environments. | |
You may use this script at your own risk and may change it | |
to your liking, as long as you leave this disclaimer header | |
fully intact and unchanged. | |
*/ | |
-- Creation of the test database | |
IF DB_ID('SB_PQ_Test') IS NULL | |
CREATE DATABASE [SB_PQ_Test] | |
GO | |
USE | |
[SB_PQ_Test] | |
GO | |
-- Creation of the table to hold SB logs | |
IF OBJECT_ID('SB_PQ_ServiceBrokerLogs') IS NULL | |
BEGIN | |
CREATE TABLE SB_PQ_ServiceBrokerLogs | |
( | |
LogID BIGINT IDENTITY(1,1) NOT NULL, | |
LogDate DATETIME NOT NULL DEFAULT (GETDATE()), | |
SPID INT NOT NULL DEFAULT (@@SPID), | |
ProgramName NVARCHAR(255) NOT NULL DEFAULT (APP_NAME()), | |
HostName NVARCHAR(255) NOT NULL DEFAULT (HOST_NAME()), | |
ErrorSeverity INT NOT NULL DEFAULT (0), | |
ErrorMessage NVARCHAR(MAX) NULL, | |
ErrorLine INT NULL, | |
ErrorProc SYSNAME NOT NULL DEFAULT (COALESCE(ERROR_PROCEDURE(),OBJECT_NAME(@@PROCID),'<unknown>')), | |
QueueMessage XML NULL, | |
PRIMARY KEY NONCLUSTERED (LogID) | |
); | |
CREATE CLUSTERED INDEX IX_SB_PQ_ServiceBrokerLogs ON SB_PQ_ServiceBrokerLogs (LogDate ASC) WITH FILLFACTOR=100; | |
PRINT 'Table SB_PQ_ServiceBrokerLogs Created'; | |
END | |
ELSE | |
TRUNCATE TABLE SB_PQ_ServiceBrokerLogs | |
GO | |
IF OBJECT_ID('SB_PQ_ExecuteDynamicQuery') IS NOT NULL DROP PROCEDURE SB_PQ_ExecuteDynamicQuery; | |
RAISERROR(N'Creating SB_PQ_ExecuteDynamicQuery...',0,0) WITH NOWAIT; | |
GO | |
-- This procedure executes a single dynamic SQL command | |
CREATE PROCEDURE SB_PQ_ExecuteDynamicQuery | |
@SQLCommand NVARCHAR(MAX), | |
@OutputXMLVarName VARCHAR(128) = 'SB_PQ_Result', | |
@Result XML OUTPUT | |
AS | |
BEGIN | |
SET NOCOUNT ON; | |
DECLARE @SQLParams NVARCHAR(MAX); | |
SET @SQLParams = '@' + @OutputXMLVarName + ' XML OUTPUT'; | |
EXEC sp_executesql @SQLCommand, @SQLParams, @Result OUTPUT | |
RETURN; | |
END | |
GO | |
IF OBJECT_ID('SB_PQ_HandleQueue') IS NOT NULL DROP PROCEDURE SB_PQ_HandleQueue | |
RAISERROR(N'Creating SB_PQ_HandleQueue...',0,0) WITH NOWAIT; | |
GO | |
-- This procedure is activated to handle each item in the Request queue | |
CREATE PROCEDURE SB_PQ_HandleQueue | |
AS | |
SET NOCOUNT ON; | |
SET ARITHABORT ON | |
DECLARE @msg XML | |
DECLARE @MsgType SYSNAME | |
DECLARE @DlgId UNIQUEIDENTIFIER | |
DECLARE @Info nvarchar(max) | |
DECLARE @ErrorsCount int | |
SET @ErrorsCount = 0 | |
-- Set whether to log verbose status messages before and after each operation | |
DECLARE @Verbose BIT = 1 | |
-- Allow 10 retries in case of errors | |
WHILE @ErrorsCount < 10 | |
BEGIN | |
BEGIN TRANSACTION | |
BEGIN TRY | |
-- Make sure queue is active | |
IF EXISTS (SELECT NULL FROM sys.service_queues | |
WHERE NAME = 'SB_PQ_Request_Queue' | |
AND is_receive_enabled = 0) | |
ALTER QUEUE SB_PQ_Request_Queue WITH STATUS = ON; | |
-- handle one message at a time | |
WAITFOR | |
( | |
RECEIVE TOP(1) | |
@msg = convert(xml,message_body), | |
@MsgType = message_type_name, | |
@DlgId = conversation_handle | |
FROM dbo.SB_PQ_Request_Queue | |
); | |
-- exit when waiting has been timed out | |
IF @@ROWCOUNT = 0 | |
BEGIN | |
IF @@TRANCOUNT > 0 | |
ROLLBACK TRANSACTION; | |
BREAK; | |
END | |
-- If message type is end dialog or error, end the conversation | |
IF (@MsgType = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' OR | |
@MsgType = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error') | |
BEGIN | |
END CONVERSATION @DlgId; | |
IF @@TRANCOUNT > 0 | |
COMMIT TRANSACTION; | |
IF @Verbose = 1 | |
INSERT INTO SB_PQ_ServiceBrokerLogs(ErrorSeverity,ErrorMessage,ErrorProc,QueueMessage) | |
VALUES(0,'Ended Conversation ' + CONVERT(nvarchar(max),@DlgId),OBJECT_NAME(@@PROCID),@msg); | |
END | |
ELSE | |
BEGIN | |
-- Retreive data from xml message | |
DECLARE @SQL NVARCHAR(MAX), @OutputVar VARCHAR(128) | |
DECLARE @Result XML; | |
SELECT | |
@SQL = x.value('(/Request/SQL)[1]','VARCHAR(MAX)'), | |
@OutputVar = x.value('(/Request/OutputVar)[1]','VARCHAR(128)') | |
FROM @msg.nodes('/Request') AS T(x); | |
-- Log operation start | |
IF @Verbose = 1 | |
INSERT INTO SB_PQ_ServiceBrokerLogs(ErrorSeverity,ErrorMessage,ErrorProc,QueueMessage) | |
VALUES(0,'Starting Process',OBJECT_NAME(@@PROCID),@msg); | |
-- Encapsulate execution in TRY..CATCH | |
-- to handle problems in the specific request | |
BEGIN TRY | |
-- Execute Request | |
EXEC SB_PQ_ExecuteDynamicQuery @SQL, @OutputVar, @Result OUTPUT; | |
END TRY | |
BEGIN CATCH | |
-- log operation fail | |
INSERT INTO SB_PQ_ServiceBrokerLogs(ErrorSeverity,ErrorMessage,ErrorLine,ErrorProc,QueueMessage) | |
VALUES(ERROR_SEVERITY(),ERROR_MESSAGE(),ERROR_LINE(),ERROR_PROCEDURE(),@msg); | |
-- return empty response | |
SET @Result = NULL; | |
END CATCH | |
; | |
-- Send response to initiator | |
SEND ON CONVERSATION @DlgId | |
MESSAGE TYPE [//SB_PQ/Message] | |
( @Result ); | |
-- commit | |
IF @@TRANCOUNT > 0 | |
COMMIT TRANSACTION; | |
-- Log operation end | |
IF @Verbose = 1 | |
INSERT INTO SB_PQ_ServiceBrokerLogs(ErrorSeverity,ErrorMessage,ErrorProc,QueueMessage) | |
VALUES(0,'Finished Process',OBJECT_NAME(@@PROCID),@msg); | |
END | |
-- reset xml message | |
SET @msg = NULL; | |
END TRY | |
BEGIN CATCH | |
-- rollback transaction | |
-- this will also rollback the extraction of the message from the queue | |
IF @@TRANCOUNT > 0 | |
ROLLBACK TRANSACTION; | |
-- log operation fail | |
INSERT INTO SB_PQ_ServiceBrokerLogs(ErrorSeverity,ErrorMessage,ErrorLine,ErrorProc,QueueMessage) | |
VALUES(ERROR_SEVERITY(),ERROR_MESSAGE(),ERROR_LINE(),ERROR_PROCEDURE(),@msg); | |
-- increase error counter | |
SET @ErrorsCount = @ErrorsCount + 1; | |
-- wait 5 seconds before retrying | |
WAITFOR DELAY '00:00:05' | |
END CATCH | |
END | |
GO | |
DECLARE @SQL nvarchar(max) | |
-- Enable service broker | |
IF EXISTS (SELECT * FROM sys.databases WHERE database_id = DB_ID() AND is_broker_enabled = 0) | |
BEGIN | |
SET @SQL = 'ALTER DATABASE [' + DB_NAME() + '] SET NEW_BROKER WITH ROLLBACK IMMEDIATE'; | |
EXEC(@SQL); | |
PRINT 'Enabled Service Broker for DB ' + DB_NAME(); | |
END | |
GO | |
-- Drop existing objects | |
IF EXISTS (SELECT NULL FROM sys.services WHERE NAME = '//SB_PQ/ProcessReceivingService') | |
DROP SERVICE [//SB_PQ/ProcessReceivingService]; | |
IF EXISTS (SELECT NULL FROM sys.services WHERE NAME = '//SB_PQ/ProcessStartingService') | |
DROP SERVICE [//SB_PQ/ProcessStartingService]; | |
IF EXISTS (SELECT NULL FROM sys.service_queues WHERE NAME = 'SB_PQ_Request_Queue') | |
DROP QUEUE dbo.SB_PQ_Request_Queue; | |
IF EXISTS (SELECT NULL FROM sys.service_queues WHERE NAME = 'SB_PQ_Response_Queue') | |
DROP QUEUE dbo.SB_PQ_Response_Queue; | |
IF EXISTS (SELECT NULL FROM sys.service_contracts WHERE NAME = '//SB_PQ/Contract') | |
DROP CONTRACT [//SB_PQ/Contract]; | |
IF EXISTS (SELECT NULL FROM sys.service_message_types WHERE name='//SB_PQ/Message') | |
DROP MESSAGE TYPE [//SB_PQ/Message]; | |
GO | |
-- Create service broker objects | |
RAISERROR(N'Creating Message Type...',0,0) WITH NOWAIT; | |
CREATE MESSAGE TYPE [//SB_PQ/Message] | |
VALIDATION = WELL_FORMED_XML; | |
RAISERROR(N'Creating Contract...',0,0) WITH NOWAIT; | |
CREATE CONTRACT [//SB_PQ/Contract] | |
([//SB_PQ/Message] SENT BY ANY); | |
RAISERROR(N'Creating Request Queue...',0,0) WITH NOWAIT; | |
CREATE QUEUE dbo.SB_PQ_Request_Queue | |
WITH STATUS=ON, | |
ACTIVATION ( | |
PROCEDURE_NAME = SB_PQ_HandleQueue, -- sproc to run when queue receives message | |
MAX_QUEUE_READERS = 10, -- max concurrent instances | |
EXECUTE AS SELF | |
); | |
RAISERROR(N'Creating Response Queue...',0,0) WITH NOWAIT; | |
CREATE QUEUE dbo.SB_PQ_Response_Queue | |
WITH STATUS=ON; -- This queue is without activation because we need to handle it manually | |
RAISERROR(N'Creating Recieving Service...',0,0) WITH NOWAIT; | |
CREATE SERVICE [//SB_PQ/ProcessReceivingService] | |
AUTHORIZATION dbo | |
ON QUEUE dbo.SB_PQ_Request_Queue ([//SB_PQ/Contract]); | |
RAISERROR(N'Creating Sending Service...',0,0) WITH NOWAIT; | |
CREATE SERVICE [//SB_PQ/ProcessStartingService] | |
AUTHORIZATION dbo | |
ON QUEUE dbo.SB_PQ_Response_Queue ([//SB_PQ/Contract]); | |
GO | |
IF OBJECT_ID('SB_PQ_Start_Query') IS NOT NULL DROP PROCEDURE SB_PQ_Start_Query; | |
RAISERROR(N'Creating SB_PQ_Start_Query...',0,0) WITH NOWAIT; | |
GO | |
-- This procedure sends items to the queue | |
CREATE PROCEDURE SB_PQ_Start_Query | |
@SQLCommand NVARCHAR(MAX), | |
@OutputXMLVarName VARCHAR(128) = 'SB_PQ_Result', | |
@Conversation_Group_ID UNIQUEIDENTIFIER = NULL | |
AS | |
SET NOCOUNT ON; | |
DECLARE @msg XML | |
-- build the XML message | |
SET @msg = N' | |
<Request> | |
<SQL>' + @SQLCommand + N'</SQL> | |
<OutputVar>' + @OutputXMLVarName + N'</OutputVar> | |
</Request>' | |
DECLARE @DlgId UNIQUEIDENTIFIER | |
BEGIN TRY | |
-- if conversation group was not specified | |
IF @Conversation_Group_ID IS NULL | |
BEGIN | |
BEGIN DIALOG @DlgId | |
FROM SERVICE [//SB_PQ/ProcessStartingService] | |
TO SERVICE '//SB_PQ/ProcessReceivingService', | |
'CURRENT DATABASE' | |
ON CONTRACT [//SB_PQ/Contract] | |
WITH ENCRYPTION = OFF; | |
END | |
-- else, send on specified conversation group | |
ELSE | |
BEGIN | |
BEGIN DIALOG @DlgId | |
FROM SERVICE [//SB_PQ/ProcessStartingService] | |
TO SERVICE '//SB_PQ/ProcessReceivingService', | |
'CURRENT DATABASE' | |
ON CONTRACT [//SB_PQ/Contract] | |
WITH | |
RELATED_CONVERSATION_GROUP = @Conversation_Group_ID, | |
ENCRYPTION = OFF; | |
END | |
; | |
-- send the message | |
SEND ON CONVERSATION @DlgId | |
MESSAGE TYPE [//SB_PQ/Message] (@msg); | |
PRINT N'Started SB_PQ process on dialogId ' + ISNULL(convert(varchar(100),@DlgId),'(null)'); | |
END TRY | |
BEGIN CATCH | |
DECLARE @Err nvarchar(max) | |
SET @Err = ERROR_MESSAGE() | |
RAISERROR('Error starting SB_PQ process: %s', 16, 1, @Err); | |
END CATCH | |
GO | |
IF OBJECT_ID('SB_PQ_Get_Response_One') IS NOT NULL DROP PROCEDURE SB_PQ_Get_Response_One; | |
RAISERROR(N'Creating SB_PQ_Get_Response_One...',0,0) WITH NOWAIT; | |
GO | |
-- This procedure receives one result item from the queue | |
CREATE PROCEDURE SB_PQ_Get_Response_One | |
@Conversation_Group_ID UNIQUEIDENTIFIER = NULL, | |
@Result XML OUTPUT | |
AS | |
BEGIN | |
SET NOCOUNT ON; | |
DECLARE @DlgId UNIQUEIDENTIFIER; | |
DECLARE @MsgType SYSNAME; | |
DECLARE @Verbose BIT = 1; | |
-- if conversation group was specified | |
IF @Conversation_Group_ID IS NOT NULL | |
BEGIN | |
WAITFOR | |
( | |
RECEIVE TOP (1) | |
@Result = convert(xml,message_body), | |
@MsgType = message_type_name, | |
@DlgId = conversation_handle | |
FROM SB_PQ_Response_Queue | |
WHERE | |
conversation_group_id = @Conversation_Group_ID | |
) | |
END | |
-- else, retrieve from any converation group | |
ELSE | |
BEGIN | |
WAITFOR | |
( | |
RECEIVE TOP (1) | |
@Result = convert(xml,message_body), | |
@MsgType = message_type_name, | |
@DlgId = conversation_handle | |
FROM SB_PQ_Response_Queue | |
) | |
END | |
-- If message type is end dialog or error, end the conversation | |
IF (@MsgType = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' OR | |
@MsgType = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error') | |
BEGIN | |
END CONVERSATION @DlgId; | |
IF @Verbose = 1 | |
INSERT INTO SB_PQ_ServiceBrokerLogs(ErrorSeverity,ErrorMessage,ErrorProc,QueueMessage) | |
VALUES(0,'Ended Conversation ' + CONVERT(nvarchar(max),@DlgId),OBJECT_NAME(@@PROCID),@Result); | |
END | |
-- Close the dialog if it's unused | |
ELSE IF NOT EXISTS (SELECT * FROM SB_PQ_Response_Queue WHERE conversation_handle = @DlgId) | |
BEGIN | |
END CONVERSATION @DlgId; | |
IF @Verbose = 1 | |
INSERT INTO SB_PQ_ServiceBrokerLogs(ErrorSeverity,ErrorMessage,ErrorProc,QueueMessage) | |
VALUES(0,'Ending Conversation ' + CONVERT(nvarchar(max),@DlgId),OBJECT_NAME(@@PROCID),@Result); | |
END | |
RETURN; | |
END | |
GO | |
PRINT 'Done'; | |
GO |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment