Skip to content

Instantly share code, notes, and snippets.

@tuespetre
Last active January 29, 2023 15:20
Show Gist options
  • Star 7 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tuespetre/0bf3a4c64cc92724416b8753282b79b2 to your computer and use it in GitHub Desktop.
Save tuespetre/0bf3a4c64cc92724416b8753282b79b2 to your computer and use it in GitHub Desktop.
An attempt at mimicking Postgresql's LISTEN/NOTIFY functionality in SQL Server, using Service Broker
// Worker process:
EXEC [notifications].[Listen] N'something-happened', 5000
// Some other process:
EXEC [notifications].[Notify] N'something-happened'
CREATE SCHEMA [notifications];
GO
CREATE MESSAGE TYPE [notifications/Listen] VALIDATION = NONE;
CREATE MESSAGE TYPE [notifications/Notify] VALIDATION = NONE;
CREATE CONTRACT [notifications/Contract]
(
[notifications/Listen] SENT BY INITIATOR,
[notifications/Notify] SENT BY TARGET
);
CREATE QUEUE [notifications].[ListenerQueue] WITH
STATUS = ON,
RETENTION = OFF,
POISON_MESSAGE_HANDLING ( STATUS = OFF );
CREATE QUEUE [notifications].[NotifierQueue] WITH
STATUS = ON,
RETENTION = OFF,
POISON_MESSAGE_HANDLING ( STATUS = ON );
CREATE SERVICE [notifications/ListenerService] ON QUEUE [notifications].[ListenerQueue];
CREATE SERVICE [notifications/NotifierService] ON QUEUE [notifications].[NotifierQueue] ([notifications/Contract]);
CREATE TABLE [notifications].[Listeners]
(
[ListenerId] int NOT NULL IDENTITY(1,1),
[ConnectionId] uniqueidentifier NOT NULL,
[Channel] sysname NOT NULL,
CONSTRAINT PK_Listeners PRIMARY KEY ([ListenerId]),
CONSTRAINT AK_Listeners UNIQUE ([ConnectionId], [Channel])
);
CREATE TABLE [notifications].[ListenerHandles]
(
[ListenerId] int NOT NULL,
[ListenerHandle] uniqueidentifier NOT NULL,
CONSTRAINT PK_ListenerHandles PRIMARY KEY ([ListenerId]),
CONSTRAINT AK_ListenerHandles UNIQUE ([ListenerHandle])
);
CREATE TABLE [notifications].[NotifierHandles]
(
[ListenerId] int NOT NULL,
[NotifierHandle] uniqueidentifier NOT NULL,
CONSTRAINT PK_NotifierHandles PRIMARY KEY ([ListenerId]),
CONSTRAINT AK_NotifierHandles UNIQUE ([NotifierHandle])
);
GO
CREATE PROCEDURE [notifications].[ActivationProcedure]
WITH EXECUTE AS OWNER
AS
BEGIN
DECLARE @messageType sysname;
DECLARE @messageBody varbinary(max);
DECLARE @notifierHandle uniqueidentifier;
DECLARE @listenerId int;
BEGIN TRY
BEGIN TRANSACTION;
RECEIVE TOP(1)
@messageType = [message_type_name],
@messageBody = [message_body],
@notifierHandle = [conversation_handle]
FROM [notifications].[NotifierQueue];
IF @messageType = N'notifications/Listen'
BEGIN
INSERT INTO [notifications].[NotifierHandles] ([ListenerId], [NotifierHandle])
VALUES (CONVERT(int, @messageBody), @notifierHandle);
BEGIN CONVERSATION TIMER (@notifierHandle) TIMEOUT = 3600;
END
ELSE IF @messageType = N'http://schemas.microsoft.com/SQL/ServiceBroker/DialogTimer'
BEGIN
SET @listenerId =
(
SELECT [ListenerId]
FROM [notifications].[NotifierHandles]
WHERE [NotifierHandle] = @notifierHandle
);
IF NOT EXISTS
(
SELECT 1
FROM [sys].[dm_exec_connections] c
JOIN [notifications].[Listeners] l
ON l.[ConnectionId] = c.[connection_id]
WHERE l.[ListenerId] = @listenerId
)
BEGIN
END CONVERSATION @notifierHandle;
DECLARE @listenerHandle uniqueidentifier =
(
SELECT [ListenerHandle]
FROM [notifications].[ListenerHandles]
WHERE [ListenerId] = @listenerId
);
IF (@listenerHandle IS NOT NULL)
BEGIN
END CONVERSATION @listenerHandle;
END
DELETE [notifications].[Listeners] WHERE [ListenerId] = @listenerId;
DELETE [notifications].[ListenerHandles] WHERE [ListenerId] = @listenerId;
DELETE [notifications].[NotifierHandles] WHERE [ListenerId] = @listenerId;
END
ELSE
BEGIN
BEGIN CONVERSATION TIMER (@notifierHandle) TIMEOUT = 3600;
END
END
ELSE IF @messageType IN
(
N'http://schemas.microsoft.com/SQL/ServiceBroker/Error',
N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
)
BEGIN
END CONVERSATION @notifierHandle;
SET @listenerId =
(
SELECT [ListenerId]
FROM [notifications].[NotifierHandles]
WHERE [NotifierHandle] = @notifierHandle
);
DELETE [notifications].[Listeners] WHERE [ListenerId] = @listenerId;
DELETE [notifications].[ListenerHandles] WHERE [ListenerId] = @listenerId;
DELETE [notifications].[NotifierHandles] WHERE [ListenerId] = @listenerId;
END
COMMIT TRANSACTION
END TRY
BEGIN CATCH
IF (XACT_STATE() <> 0)
BEGIN
ROLLBACK TRANSACTION
END
END CATCH
END
GO
ALTER QUEUE [notifications].[NotifierQueue] WITH
STATUS = ON,
ACTIVATION
(
STATUS = ON,
MAX_QUEUE_READERS = 1,
PROCEDURE_NAME = [notifications].[ActivationProcedure],
EXECUTE AS OWNER
);
GO
CREATE PROCEDURE [notifications].[Notify]
@channel sysname,
@payload nvarchar(max) = ''
WITH EXECUTE AS OWNER
AS
BEGIN
DECLARE @send nvarchar(max) =
(
SELECT DISTINCT N'''' + CAST([NotifierHandle] as nvarchar(36)) + N'''' + ', '
FROM [notifications].[Listeners] l WITH (READUNCOMMITTED)
JOIN [notifications].[NotifierHandles] nh
ON nh.[ListenerId] = l.[ListenerId]
WHERE @channel LIKE l.[Channel]
FOR XML PATH('')
);
IF (LEN(@send) > 0)
BEGIN
SET @send = SUBSTRING(@send, 0, LEN(@send));
SET @send = N'SEND ON CONVERSATION (' + @send + N') MESSAGE TYPE [notifications/Notify] (@payload)';
EXEC sp_executesql @send, N'@payload nvarchar(max)', @payload;
END
END
GO
CREATE PROCEDURE [notifications].[Listen]
@channel sysname,
@timeout int = -1
WITH EXECUTE AS OWNER
AS
BEGIN
DECLARE @connectionId uniqueidentifier =
(
SELECT TOP(1) [connection_id]
FROM [sys].[dm_exec_connections]
WHERE [session_id] = @@SPID
);
DECLARE @listenerHandle uniqueidentifier =
(
SELECT TOP(1) lh.[ListenerHandle]
FROM [notifications].[Listeners] l
JOIN [notifications].[ListenerHandles] lh
ON lh.[ListenerId] = l.[ListenerId]
WHERE l.[ConnectionId] = @connectionId
AND l.[Channel] = @channel
);
IF (@listenerHandle IS NULL)
BEGIN
BEGIN TRY
BEGIN TRANSACTION
BEGIN DIALOG CONVERSATION @listenerHandle
FROM SERVICE [notifications/ListenerService]
TO SERVICE N'notifications/NotifierService'
ON CONTRACT [notifications/Contract]
WITH ENCRYPTION = OFF;
INSERT INTO [notifications].[Listeners] (ConnectionId, Channel)
VALUES (@connectionId, @channel);
DECLARE @listenerId int = SCOPE_IDENTITY();
INSERT INTO [notifications].[ListenerHandles] (ListenerId, ListenerHandle)
VALUES (@listenerId, @listenerHandle);
SEND ON CONVERSATION @listenerHandle MESSAGE TYPE [notifications/Listen] (@listenerId);
COMMIT TRANSACTION
END TRY
BEGIN CATCH
ROLLBACK TRANSACTION
RETURN
END CATCH
END
WAITFOR
(
RECEIVE CONVERT(nvarchar(max), [message_body]) AS Payload
FROM [notifications].[ListenerQueue]
WHERE conversation_handle = @listenerHandle
),
TIMEOUT @timeout;
END
GO
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment