Skip to content

Instantly share code, notes, and snippets.

@sixeyed
Created July 23, 2014 15:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sixeyed/adb156ae3b91091e98a2 to your computer and use it in GitHub Desktop.
Save sixeyed/adb156ae3b91091e98a2 to your computer and use it in GitHub Desktop.
SoakController - a simple WebAPI controller for remote-controlling a Kafka soak test
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="kafka-net" version="0.8.0.31-beta" targetFramework="net45" />
</packages>
using KafkaNet;
using KafkaNet.Model;
using KafkaNet.Protocol;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using System.Web.Http;
namespace KafkaScratchpad.Api.Controllers
{
/// <summary>
/// WebAPI controller for remote-controlling a Kafka soak test.
/// Each call to <c>soak/start</c> adds one background producer that publishes a
/// message roughly every <see cref="SendIntervalMilliseconds"/> ms until
/// <c>soak/stopall</c> is called. State is static, so it is shared by all requests.
/// </summary>
public class SoakController : ApiController
{
    /// <summary>Pause between two sends of a single producer, in milliseconds.</summary>
    private const int SendIntervalMilliseconds = 100;

    /// <summary>Guards <see cref="_SoakTasks"/> and <see cref="_CancellationTokenSource"/>.</summary>
    private static readonly object _SyncRoot = new object();

    private static List<Task> _SoakTasks = new List<Task>();
    private static CancellationTokenSource _CancellationTokenSource = new CancellationTokenSource();
    private static int _MessageCount;
    private static int _ProducerCount;

    /// <summary>
    /// Starts one additional background producer.
    /// </summary>
    /// <returns>200 OK once the producer task has been scheduled.</returns>
    [Route("soak/start")]
    [HttpPost]
    public IHttpActionResult Start()
    {
        Debug.WriteLine("SoakController.Start called");

        lock (_SyncRoot)
        {
            // Capture the token now: reading the static field lazily inside the lambda
            // could pick up the replacement source installed by a concurrent StopAll.
            var token = _CancellationTokenSource.Token;

            // LongRunning: the worker loops and sleeps until cancelled, so it should
            // not occupy a thread-pool thread.
            var worker = Task.Factory.StartNew(
                () => SendMessages(token),
                token,
                TaskCreationOptions.LongRunning,
                TaskScheduler.Default);

            _SoakTasks.Add(worker);
        }

        return Ok();
    }

    /// <summary>
    /// Reports the running totals since the last <c>soak/stopall</c>.
    /// </summary>
    /// <returns>200 OK with <c>MessagesSent</c> and <c>ProducerCount</c>.</returns>
    [Route("soak/stats")]
    [HttpGet]
    public IHttpActionResult GetStats()
    {
        return Ok(new
        {
            MessagesSent = Volatile.Read(ref _MessageCount),
            ProducerCount = Volatile.Read(ref _ProducerCount)
        });
    }

    /// <summary>
    /// Signals every running producer to stop and resets the counters.
    /// Producers notice the request at their next cancellation check, so a
    /// producer may still send (and count) one message after this returns.
    /// </summary>
    /// <returns>200 OK once cancellation has been requested.</returns>
    [Route("soak/stopall")]
    [HttpPost]
    public IHttpActionResult StopAll()
    {
        CancellationTokenSource previousSource;
        int taskCount;

        lock (_SyncRoot)
        {
            taskCount = _SoakTasks.Count;
            previousSource = _CancellationTokenSource;

            // Swap in fresh state so producers started from now on are unaffected.
            _CancellationTokenSource = new CancellationTokenSource();
            _SoakTasks = new List<Task>();
        }

        Debug.WriteLine("SoakController.StopAll called. Task Count: {0}, MessageCount: {1}", taskCount, Volatile.Read(ref _MessageCount));

        // The old source is deliberately not disposed: running workers still poll its token.
        previousSource.Cancel();

        Interlocked.Exchange(ref _MessageCount, 0);
        Interlocked.Exchange(ref _ProducerCount, 0);
        return Ok();
    }

    /// <summary>
    /// Producer loop: connects to the broker named by the <c>KafkaUrl</c> app setting and
    /// publishes to the <c>KafkaTopic</c> topic until <paramref name="token"/> is cancelled.
    /// Any exception is logged and ends this producer; it is not rethrown.
    /// </summary>
    /// <param name="token">Token polled after each send to decide whether to stop.</param>
    private static void SendMessages(CancellationToken token)
    {
        BrokerRouter router = null;
        Producer client = null;

        try
        {
            // Use the value returned by Increment; re-reading the field could
            // observe another producer's increment and hand out duplicate ids.
            var producerId = Interlocked.Increment(ref _ProducerCount);
            Debug.WriteLine("SoakController.SendMessages - starting");

            var options = new KafkaOptions(new Uri(ConfigurationManager.AppSettings["KafkaUrl"]))
            {
                Log = new DebugLog()
            };
            router = new BrokerRouter(options);
            client = new Producer(router);
            var topic = ConfigurationManager.AppSettings["KafkaTopic"];
            Debug.WriteLine("SoakController.SendMessages - client initialised");

            do
            {
                var messageNumber = Interlocked.Increment(ref _MessageCount);
                var message = string.Format("producer-{0}:msg-{1}:{2}", producerId, messageNumber, Guid.NewGuid());
                Debug.WriteLine("SoakController.SendMessages - sending: " + message);

                client.SendMessageAsync(topic, new[] { new Message { Value = message } })
                    .ContinueWith(x =>
                    {
                        // Touching Result on a failed send would throw inside the
                        // continuation and leave the error unobserved.
                        if (x.IsFaulted)
                        {
                            Debug.WriteLine(string.Format("SoakController.SendMessages - send failed: {0}", x.Exception));
                            return;
                        }
                        if (x.IsCanceled)
                        {
                            Debug.WriteLine("SoakController.SendMessages - send cancelled");
                            return;
                        }

                        var responses = x.Result;
                        if (responses == null || responses.Count == 0)
                        {
                            Debug.WriteLine("SoakController.SendMessages - got result, response count: 0");
                            return;
                        }

                        Debug.WriteLine("SoakController.SendMessages - got result, response count: {0}, first -partitionId: {1}, offset: {2}", responses.Count, responses[0].PartitionId, responses[0].Offset);
                    });

                Thread.Sleep(SendIntervalMilliseconds);
            }
            while (!token.IsCancellationRequested);
        }
        catch (Exception ex)
        {
            Debug.WriteLine("SoakController.SendMessages errored. MessageCount: {0}, ex: {1}", Volatile.Read(ref _MessageCount), ex);
        }
        finally
        {
            // Release the Kafka connections on every exit path, not only on clean shutdown.
            if (client != null)
            {
                client.Dispose();
            }
            if (router != null)
            {
                router.Dispose();
            }
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment