Skip to content

Instantly share code, notes, and snippets.

@shaunsales
Created March 30, 2019 14:26
Show Gist options
  • Save shaunsales/e9daba03d4d72c62c3552f9d23d135f1 to your computer and use it in GitHub Desktop.
Save shaunsales/e9daba03d4d72c62c3552f9d23d135f1 to your computer and use it in GitHub Desktop.
using System;
using Confluent.Kafka;
using Fraction.Services.Messaging.Common.Consumers;
using Fraction.Services.Messaging.Common.Helpers;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System.Threading.Tasks;
using Fraction.Services.Common.Enums;
using Fraction.Services.Messaging.Common.Messages;
using Fraction.Services.Messaging.Common.Producers;
using System.Threading;
using Microsoft.Extensions.Options;
namespace Fraction.Services.Messaging.Test
{
public class Program
{
private const string Topic = "orders-local";
private const string GroupId = "orders-consumer";
private const string Server = "35.241.96.140:9092";
private static async Task Main(string[] args)
{
var hostBuilder = new HostBuilder().ConfigureServices((hostContext, services) =>
{
services.Configure<ConsumerOptions>(options =>
{
options.Topic = Topic;
options.TimeoutMs = 100;
options.FrequencyMs = 1000;
options.Config = new ConsumerConfig
{
GroupId = GroupId,
BootstrapServers = Server,
AutoOffsetReset = AutoOffsetReset.Earliest,
};
});
services.AddSingleton<IOrderConsumerController, OrderConsumerController>();
services.AddHostedService<OrderConsumerBackgroundService>();
});
var task = Task.Run(() =>
{
Thread.Sleep(3000);
var options = new ProducerOptions
{
Topic = Topic,
Config = new ProducerConfig
{
BootstrapServers = Server,
SocketTimeoutMs = 5000
}
};
var orderProducer = new OrderProducer(new OptionsWrapper<ProducerOptions>(options));
var message = new OrderPlaced
{
OrderId = Guid.NewGuid().ToString(),
Symbol = "BABA-1234",
Price = 23.34,
Amount = 100,
OrderType = OrderType.Buy,
};
var result = orderProducer.ProduceAsync(message).Result;
Console.WriteLine($"Produced Message: OrderId:{message.OrderId}, Key:{result.Key}, Offset:{result.Offset}");
});
await hostBuilder.RunConsoleAsync();
Console.WriteLine("Press any key to exit...");
Console.ReadLine();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment