Skip to content

Instantly share code, notes, and snippets.

@kellabyte
Created September 9, 2011 03:15
Show Gist options
  • Save kellabyte/1205406 to your computer and use it in GitHub Desktop.
Save kellabyte/1205406 to your computer and use it in GitHub Desktop.
using System;
using System.Linq;
using System.Text;
using Disruptor;
using Elasticity.Domain;
using Elasticity.Events;
namespace Elasticity.Domain
{
/// <summary>
/// Represents an entry in the RingBuffer.
/// </summary>
public sealed class JobEntry
{
public SchedulerJob Job { get; set; }
}
/// <summary>
/// Domain service which handles job and task scheduling that gets sent to the Agents.
/// SchedulerService behaves passively by receiving domain events it subscribes to and
/// will generate commands which will go back to the command handlers when needed.
/// </summary>
public class SchedulerService : IBatchHandler<JobEntry>,
IHandle<JobCreated>,
IHandle<JobActivated>,
IHandle<JobDisabled>
{
private IEventAggregator eventAggregator;
private IRepository<SchedulerJob> jobRepository;
private RingBuffer<JobEntry> jobs;
private IProducerBarrier<JobEntry> producer;
public SchedulerService(IEventAggregator eventAggregator, IRepository<SchedulerJob> jobRepository)
{
this.eventAggregator = eventAggregator;
this.jobRepository = jobRepository;
this.eventAggregator.Subscribe(this);
// Initialize the RingBuffer
this.jobs = new RingBuffer<JobEntry>(
() => new JobEntry(), // JobEntry factory
1000, // Size of RingBuffer
ClaimStrategyFactory.ClaimStrategyOption.SingleThreaded, // Single producer
WaitStrategyFactory.WaitStrategyOption.Yielding
);
// Set this object as the consumer.
jobs.ConsumeWith(this);
// Initialize the producer.
producer = jobs.CreateProducerBarrier();
// Run the consumer in a new Thread
jobs.StartConsumers();
}
/// <summary>
/// Processes a new entry as it becomes available from the RingBuffer.
/// </summary>
/// <param name="sequence">Sequence index in the RingBuffer this istem was slotted in.</param>
/// <param name="job">JobEntry that was slotted in the RingBuffer</param>
public void OnAvailable(long sequence, JobEntry job)
{
// TODO: Still deciding what role the RingBuffer plays in here.
// Perhaps it just becomes a queue for what gets processed for
// sending out to the agents.
}
/// <summary>
/// Handles the end of batch from the RingBuffer.
/// </summary>
public void OnEndOfBatch()
{
}
public void Handle(JobCreated message)
{
SchedulerJob job = jobRepository.GetById(message.JobId);
JobEntry data;
long sequence = producer.NextEntry(out data);
data.Job = job;
producer.Commit(sequence);
}
public void Handle(JobActivated message)
{
// TODO: Not sure if I need to handle this or not yet.
}
public void Handle(JobDisabled message)
{
// TODO: Haven't figured out what disabling a job means in the
// context of a RingBuffer.
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment