Skip to content

Instantly share code, notes, and snippets.

@antiduh
Created March 8, 2017 21:43
Show Gist options
  • Save antiduh/27bf516c745195c5ab6af71fb7e50f6f to your computer and use it in GitHub Desktop.
Save antiduh/27bf516c745195c5ab6af71fb7e50f6f to your computer and use it in GitHub Desktop.
Work-in-progress cross-process events
using System;
using System.Collections.Generic;
using System.IO.MemoryMappedFiles;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;
namespace EventTest
{
/// <summary>
/// One subscriber to a <see cref="SharedEvent"/>. Each listener owns a personal
/// semaphore; whenever that semaphore is released, <see cref="Occurred"/> is raised
/// on a thread-pool thread.
/// </summary>
public class EventListener : IDisposable
{
    private readonly Semaphore waitHandle;
    private readonly SharedEvent parent;

    // Non-null while the thread-pool wait is registered; doubles as the "not yet disposed" flag.
    private RegisteredWaitHandle handle;

    /// <summary>
    /// Creates the listener and immediately registers its semaphore with the thread pool.
    /// </summary>
    /// <param name="waitHandle">The semaphore this listener waits on. The listener takes ownership and disposes it.</param>
    /// <param name="memberId">The id under which this listener is recorded in the parent's pool.</param>
    /// <param name="parent">The event that created this listener; notified when the listener is disposed.</param>
    internal EventListener( Semaphore waitHandle, int memberId, SharedEvent parent )
    {
        this.waitHandle = waitHandle;
        this.MemberId = memberId;
        this.parent = parent;

        RegisterThreadPool();
    }

    /// <summary>The id under which this listener is recorded in the parent's pool.</summary>
    internal int MemberId { get; private set; }

    /// <summary>Raised on a thread-pool thread each time the listener's semaphore is released.</summary>
    public event Action Occurred;

    /// <summary>
    /// Removes this listener from the parent's pool, cancels the thread-pool wait and
    /// disposes the semaphore. Calling it again after it has run is a no-op.
    /// </summary>
    public void Dispose()
    {
        RegisteredWaitHandle registration = this.handle;
        if( registration == null )
        {
            return;
        }

        this.handle = null;

        try
        {
            // Deregister from the pool first so that no further triggers are aimed at us.
            this.parent.Destroy( this );
        }
        finally
        {
            // Always release the local resources, even if the pool could not be updated.
            UnregisterThreadPool( registration );
            this.waitHandle.Dispose();
        }
    }

    // Registers a repeating wait (executeOnlyOnce = false) with no timeout (-1).
    private void RegisterThreadPool()
    {
        this.handle = ThreadPool.RegisterWaitForSingleObject( this.waitHandle, Callback, null, -1, false );
    }

    // Cancels the thread-pool wait. The argument to Unregister is an optional handle
    // to be signaled once unregistration completes - it is NOT the handle that was
    // being waited on - so no completion handle is supplied here.
    private static void UnregisterThreadPool( RegisteredWaitHandle registration )
    {
        registration.Unregister( null );
    }

    // Thread-pool callback; both arguments are unused because the wait never times out
    // and no state object is registered.
    private void Callback( object state, bool timedOut )
    {
        Occurred?.Invoke();
    }
}
/// <summary>
/// A named event that can be triggered and listened to through named system objects
/// (a semaphore-based lock, a memory-mapped file and one semaphore per listener).
/// </summary>
/// <remarks>
/// A pool consists of N listeners. Each listener has a personal wait handle. When the
/// pool is triggered, the set of registered listeners is read, each listener's wait
/// handle is looked up by name, and each handle is released individually.
///
/// Every read and write of the pool's shared memory happens while holding one shared
/// lock, so that the pool is not corrupted and readers get a consistent view.
///
/// Shared memory layout:
///   4 bytes  NumMembers
///   4 bytes  Member1Id
///   4 bytes  Member2Id
///   ...
///
/// So a pool is:
///  - A single shared lock that protects all reads and writes.
///  - A single shared memory region recording who is registered.
///  - N listeners, each with a registration and their own personal wait handle.
/// </remarks>
public class SharedEvent : IDisposable
{
    // Size of the NumMembers header and of each member id entry, in bytes.
    private const int HeaderSize = sizeof( int );
    private const int EntrySize = sizeof( int );

    private Semaphore poolLock;
    private MemoryMappedFile pool;

    /// <summary>
    /// Creates or opens the named lock and the named shared memory region for the event.
    /// </summary>
    /// <param name="name">Name of the event; combined with a fixed prefix to form the system object names.</param>
    /// <param name="maxListeners">Number of member id slots to request when creating the shared memory region.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxListeners"/> is negative.</exception>
    public SharedEvent( string name, int maxListeners = 1024 )
    {
        if( maxListeners < 0 )
        {
            throw new ArgumentOutOfRangeException( nameof( maxListeners ), "The listener capacity cannot be negative." );
        }

        this.Name = name;
        this.poolLock = new Semaphore( 1, 1, LockName() );

        try
        {
            this.pool = MemoryMappedFile.CreateOrOpen(
                ShmemName(),
                // Computed as a long so that a very large maxListeners cannot overflow.
                HeaderSize + (long)maxListeners * EntrySize,
                MemoryMappedFileAccess.ReadWrite,
                MemoryMappedFileOptions.None,
                null, System.IO.HandleInheritability.None
            );
        }
        catch
        {
            // Do not leak the lock handle when the memory region cannot be created.
            this.poolLock.Dispose();
            this.poolLock = null;
            throw;
        }
    }

    /// <summary>The name this event was constructed with.</summary>
    public string Name { get; private set; }

    /// <summary>The prefixed name from which all system object names are derived.</summary>
    public string GlobalName
    {
        get
        {
            return "Antiduh.EventPool_" + this.Name;
        }
    }

    /// <summary>
    /// Registers a new listener in the pool and returns it.
    /// </summary>
    /// <returns>A listener whose <c>Occurred</c> event is raised when the pool is triggered.</returns>
    /// <exception cref="InvalidOperationException">The shared memory region has no room for another member.</exception>
    public EventListener GetListener()
    {
        this.poolLock.WaitOne();
        try
        {
            List<int> memberIds = ReadMemberIds();
            int nextId = FindNextMemberId( memberIds );

            memberIds.Add( nextId );
            memberIds.Sort();

            // Publish the id before building the listener: if the pool is full the write
            // fails before any thread-pool registration exists that would need undoing.
            WriteMemberIds( memberIds );

            Semaphore memberHandle = null;
            try
            {
                memberHandle = CreateMemberWaitHandle( nextId );
                return new EventListener( memberHandle, nextId, this );
            }
            catch
            {
                // Roll the pool back so it does not advertise a listener that was never created.
                if( memberHandle != null )
                {
                    memberHandle.Dispose();
                }

                memberIds.Remove( nextId );
                WriteMemberIds( memberIds );
                throw;
            }
        }
        finally
        {
            this.poolLock.Release();
        }
    }

    /// <summary>
    /// Releases the wait handle of every member currently recorded in the pool.
    /// </summary>
    public void Trigger()
    {
        this.poolLock.WaitOne();
        try
        {
            foreach( int member in ReadMemberIds() )
            {
                // The using block closes our handle even if Release throws.
                using( Semaphore memberHandle = CreateMemberWaitHandle( member ) )
                {
                    memberHandle.Release();
                }
            }
        }
        finally
        {
            this.poolLock.Release();
        }
    }

    /// <summary>
    /// Closes this instance's handles to the shared memory region and the pool lock.
    /// Calling it more than once is a no-op.
    /// </summary>
    public void Dispose()
    {
        if( this.pool != null )
        {
            this.pool.Dispose();
            this.pool = null;

            this.poolLock.Dispose();
            this.poolLock = null;
        }
    }

    /// <summary>
    /// Removes the given listener's member id from the pool.
    /// </summary>
    /// <param name="child">The listener being disposed.</param>
    internal void Destroy( EventListener child )
    {
        this.poolLock.WaitOne();
        try
        {
            List<int> memberIds = ReadMemberIds();
            memberIds.Remove( child.MemberId );
            WriteMemberIds( memberIds );
        }
        finally
        {
            this.poolLock.Release();
        }
    }

    // Creates or opens the named semaphore belonging to the given member.
    private Semaphore CreateMemberWaitHandle( int id )
    {
        return new Semaphore( 0, int.MaxValue - 1, MemberName( id ) );
    }

    // Reads the member id list from shared memory. Callers must hold poolLock.
    private List<int> ReadMemberIds()
    {
        using( MemoryMappedViewAccessor view = this.pool.CreateViewAccessor() )
        {
            int numMembers = view.ReadInt32( 0 );

            // Reject a count that cannot possibly fit in the view instead of allocating
            // from, or reading past, a corrupt header.
            long maxEntries = ( view.Capacity - HeaderSize ) / EntrySize;
            if( numMembers < 0 || numMembers > maxEntries )
            {
                throw new InvalidOperationException( "The shared event pool contains an invalid member count." );
            }

            int[] memberIds = new int[numMembers];
            view.ReadArray( HeaderSize, memberIds, 0, numMembers );

            return new List<int>( memberIds );
        }
    }

    // Writes the member id list to shared memory. Callers must hold poolLock.
    // Throws InvalidOperationException, before touching the pool, if the list does not fit.
    private void WriteMemberIds( List<int> memberIds )
    {
        using( MemoryMappedViewAccessor view = this.pool.CreateViewAccessor() )
        {
            long required = HeaderSize + (long)memberIds.Count * EntrySize;
            if( required > view.Capacity )
            {
                throw new InvalidOperationException( "The shared event pool has no room for another listener." );
            }

            view.Write( 0, (int)memberIds.Count );

            long position = HeaderSize;
            foreach( int id in memberIds )
            {
                view.Write( position, (int)id );
                position += EntrySize;
            }
        }
    }

    // Picks the id for a new member. The list is expected to be sorted ascending.
    //   Empty list      -> 0.
    //   {1, 2, 4}       -> 3 (first gap after the smallest id).
    //   {1, 2, 3}       -> 4 (no gap, so one past the largest id).
    // Ids below the smallest existing id are not reused.
    private int FindNextMemberId( List<int> ids )
    {
        if( ids.Count == 0 )
        {
            return 0;
        }

        for( int i = 1; i < ids.Count; i++ )
        {
            int expected = ids[i - 1] + 1;
            if( ids[i] > expected )
            {
                // Found a hole.
                return expected;
            }
        }

        return ids[ids.Count - 1] + 1;
    }

    // Name of the semaphore used as the pool-wide lock.
    private string LockName()
    {
        return this.GlobalName + ".PoolLock";
    }

    // Name of the memory-mapped file holding the member list.
    private string ShmemName()
    {
        return this.GlobalName + ".Memory";
    }

    // Name of the personal semaphore of the given member.
    private string MemberName( int memberId )
    {
        return string.Format( "{0}.Member{1}", this.GlobalName, memberId );
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment