Skip to content

Instantly share code, notes, and snippets.

@AlexCuse
Created June 30, 2011 01:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save AlexCuse/1055462 to your computer and use it in GitHub Desktop.
Save AlexCuse/1055462 to your computer and use it in GitHub Desktop.
F# subscription
namespace BrewTelligence.Search.IndexingService
#light
open System.Threading
open System.Configuration
open RabbitMQ.Client
open RabbitMQ.Client.MessagePatterns
// TODO: should a single connection/channel be kept open for the lifetime of the Subscriber, instead of creating a new one on every call?
/// Consumes and publishes messages of type 'T over RabbitMQ.
/// The queue is named after the type ('T's simple name) and is bound to the
/// configured exchange using the queue name as the routing key.
/// Connection settings and the exchange name are read from AppSettings.
type Subscriber<'T>() =
    // Factory configured once per Subscriber instance from AppSettings.
    let connectionFactory =
        let cf = new ConnectionFactory()
        cf.Protocol <- Protocols.AMQP_0_9
        cf.HostName <- ConfigurationManager.AppSettings.Get("rabbitmq-server-address")
        cf.UserName <- ConfigurationManager.AppSettings.Get("rabbitmq-username")
        cf.Password <- ConfigurationManager.AppSettings.Get("rabbitmq-password")
        cf

    let serializer = new Serializer<'T>()
    let queueName = typeof<'T>.Name
    let exchangeName = ConfigurationManager.AppSettings.Get("rabbitmq-exchange-name")

    // Queue declaration flags.
    let durable = true
    let exclusive = false
    let autoDelete = false
    // noAck = false: deliveries are acknowledged explicitly via sub.Ack below.
    let noAck = false

    /// Opens a channel on the given connection, declares the direct exchange
    /// and the queue, binds them, and returns the channel.
    /// The caller owns the returned channel and must dispose it.
    let setupChannel (connection:IConnection) =
        let channel = connection.CreateModel()
        channel.ExchangeDeclare (exchangeName, ExchangeType.Direct)
        channel.QueueDeclare (queueName, durable, exclusive, autoDelete, null) |> ignore
        channel.QueueBind (queueName, exchangeName, queueName)
        channel

    /// Returns a lazy sequence of deserialized messages taken from the queue.
    ///
    /// ct:       checked before each receive; once cancellation is requested
    ///           the sequence ends. NOTE(review): sub.Next() is called without
    ///           a timeout, so cancellation is presumably only observed after
    ///           the next delivery arrives - confirm against the client docs.
    /// finished: signalled when enumeration ends for any reason (normal
    ///           completion, an exception, or early disposal of the enumerator).
    ///
    /// Each message is acknowledged only after the consumer asks for the next
    /// element, i.e. after it has finished handling the yielded one.
    /// Raises an exception (via failwith) if the subscription returns null.
    member x.ReceivedMessages(ct:CancellationToken, finished:ManualResetEvent) =
        seq {
            try
                // These bindings must live INSIDE the sequence expression.
                // The sequence is lazy, so `use` bindings placed outside it
                // are disposed when this method returns - before the first
                // element is ever requested.
                use connection = connectionFactory.CreateConnection()
                use channel = setupChannel connection
                use sub = new Subscription (channel, queueName, noAck)
                while not ct.IsCancellationRequested do
                    match sub.Next() with
                    | null -> failwith "Don't think null items will be returned by subscription"
                    | result ->
                        yield serializer.Deserialize (result.Body)
                        // Runs when the consumer pulls the next element.
                        sub.Ack (result)
            finally
                // Runs after the `use` resources above have been disposed, so
                // waiters are released even when enumeration ends abnormally.
                finished.Set() |> ignore
        }

    /// Serializes the message and publishes it to the configured exchange with
    /// the queue name as routing key. Opens a new connection and channel for
    /// the call and disposes both before returning.
    member x.Return(message:'T) =
        use connection = connectionFactory.CreateConnection()
        use channel = setupChannel connection
        let body = serializer.Serialize (message)
        channel.BasicPublish (exchangeName, queueName, null, body)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment