Skip to content

Instantly share code, notes, and snippets.

@tobi-tobsen
Created August 30, 2013 12:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tobi-tobsen/6389448 to your computer and use it in GitHub Desktop.
Save tobi-tobsen/6389448 to your computer and use it in GitHub Desktop.
A patch that lets NetMQ compile against the .NET Framework 3.5 Client Profile
diff --git a/src/NetMQ.Tests/NetMQ.Tests.csproj b/src/NetMQ.Tests/NetMQ.Tests.csproj
index 4e9d395..ab4b3dc 100644
--- a/src/NetMQ.Tests/NetMQ.Tests.csproj
+++ b/src/NetMQ.Tests/NetMQ.Tests.csproj
@@ -12,6 +12,8 @@
<AssemblyName>NetMQ.Tests</AssemblyName>
<TargetFrameworkVersion>v4.0</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
+ <TargetFrameworkProfile>
+ </TargetFrameworkProfile>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
@@ -64,7 +66,6 @@
<Compile Include="ExpectedZmqException.cs" />
<Compile Include="MessageTests.cs" />
<Compile Include="MonitorPollTests.cs" />
- <Compile Include="NetMQSchedulerTest.cs" />
<Compile Include="NetMQTestRunner.cs" />
<Compile Include="PgmTests.cs" />
<Compile Include="PollerTests.cs" />
@@ -73,7 +74,6 @@
<Compile Include="PubSubTests.cs" />
<Compile Include="PushPullTests.cs" />
<Compile Include="ReqRepTests.cs" />
- <Compile Include="Security\SecureChannelTests.cs" />
<Compile Include="SocketTests.cs" />
<Compile Include="zmq\YQueueTests.cs" />
<Compile Include="zmq\ZMQPollTests.cs" />
@@ -86,6 +86,7 @@
</ItemGroup>
<ItemGroup />
<ItemGroup>
+ <None Include="app.config" />
<None Include="NetMQ.Testing.crt">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
diff --git a/src/NetMQ/Devices/ThreadedDeviceRunner.cs b/src/NetMQ/Devices/ThreadedDeviceRunner.cs
index cdea96d..abe22eb 100644
--- a/src/NetMQ/Devices/ThreadedDeviceRunner.cs
+++ b/src/NetMQ/Devices/ThreadedDeviceRunner.cs
@@ -1,4 +1,4 @@
-using System.Threading.Tasks;
+using System.Threading;
namespace NetMQ.Devices
{
@@ -8,11 +8,16 @@ namespace NetMQ.Devices
internal class ThreadedDeviceRunner : DeviceRunner
{
public ThreadedDeviceRunner(IDevice device)
- : base(device) {
+ : base(device)
+ {
}
- public override void Start() {
- Task.Factory.StartNew(Device.Run, TaskCreationOptions.LongRunning);
+ public override void Start()
+ {
+ // The removed code requested TaskCreationOptions.LongRunning, i.e. work that should not
+ // occupy a thread-pool worker. ThreadPool.QueueUserWorkItem would do exactly that, so run
+ // the device on its own background thread instead (works on .NET 3.5).
+ new Thread(() => this.Device.Run()) { IsBackground = true }.Start();
}
}
}
\ No newline at end of file
diff --git a/src/NetMQ/Monitoring/NetMQMonitor.cs b/src/NetMQ/Monitoring/NetMQMonitor.cs
index d31337b..31f51b9 100644
--- a/src/NetMQ/Monitoring/NetMQMonitor.cs
+++ b/src/NetMQ/Monitoring/NetMQMonitor.cs
@@ -13,7 +13,8 @@ public class NetMQMonitor : IDisposable
private bool m_isOwner;
private Poller m_attachedPoller = null;
- readonly CancellationTokenSource m_cancellationTokenSource = new CancellationTokenSource();
+ // Stop flag: set by Stop() and polled by the Start() loop, typically on different threads; volatile so the write is seen.
+ private volatile bool m_cancellationTokenSource = false;
private readonly ManualResetEvent m_isStoppedEvent = new ManualResetEvent(true);
@@ -214,7 +214,7 @@ public void Start()
try
{
- while (!m_cancellationTokenSource.IsCancellationRequested)
+ while (!m_cancellationTokenSource)
{
MonitoringSocket.Poll(Timeout);
}
@@ -233,7 +233,7 @@ public void Stop()
throw new InvalidOperationException("Monitor attached to a poller, please detach from poller and don't use the stop method");
}
- m_cancellationTokenSource.Cancel();
+ m_cancellationTokenSource = true;
m_isStoppedEvent.WaitOne();
}
diff --git a/src/NetMQ/NetMQ.csproj b/src/NetMQ/NetMQ.csproj
index 74de613..9ea6830 100644
--- a/src/NetMQ/NetMQ.csproj
+++ b/src/NetMQ/NetMQ.csproj
@@ -10,8 +10,9 @@
<AppDesignerFolder>Properties</AppDesignerFolder>
<RootNamespace>NetMQ</RootNamespace>
<AssemblyName>NetMQ</AssemblyName>
- <TargetFrameworkVersion>v4.0</TargetFrameworkVersion>
+ <TargetFrameworkVersion>v3.5</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
+ <TargetFrameworkProfile>Client</TargetFrameworkProfile>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
@@ -38,7 +39,6 @@
</ItemGroup>
<ItemGroup>
<Compile Include="IReceivingSocket.cs" />
- <Compile Include="NetMQScheduler.cs" />
<Compile Include="OutgoingSocketExtensions.cs" />
<Compile Include="ErrorPollingException.cs" />
<Compile Include="NetMQFrame.cs" />
@@ -50,25 +50,6 @@
<Compile Include="NetMQSocketEventArgs.cs" />
<Compile Include="Poller.cs" />
<Compile Include="ReceivingSocketExtensions.cs" />
- <Compile Include="Security\NetMQSecurityException.cs">
- <SubType>Code</SubType>
- </Compile>
- <Compile Include="Security\CipherSuite.cs" />
- <Compile Include="Security\V0_1\ContentType.cs" />
- <Compile Include="Security\V0_1\HandshakeLayer.cs" />
- <Compile Include="Security\V0_1\HandshakeMessages\ClientHelloMessage.cs" />
- <Compile Include="Security\V0_1\HandshakeMessages\FinishedMessage.cs" />
- <Compile Include="Security\V0_1\HandshakeMessages\HandshakeMessage.cs" />
- <Compile Include="Security\V0_1\HandshakeMessages\CertificateMessage.cs" />
- <Compile Include="Security\V0_1\HandshakeMessages\ServerHelloDoneMessage.cs" />
- <Compile Include="Security\V0_1\HandshakeMessages\ServerHelloMessage.cs" />
- <Compile Include="Security\V0_1\HandshakeMessages\ClientKeyExchangeMessage.cs" />
- <Compile Include="Security\ISecureChannel.cs" />
- <Compile Include="Security\V0_1\OutgoingMessageBag.cs" />
- <Compile Include="Security\V0_1\RecordLayer.cs" />
- <Compile Include="Security\V0_1\SecureChannel.cs" />
- <Compile Include="Security\V0_1\SecurityParameters.cs" />
- <Compile Include="Security\V0_1\SHA256PRF.cs" />
<Compile Include="Sockets\DealerSocket.cs" />
<Compile Include="Devices\DeviceBase.cs" />
<Compile Include="Devices\DeviceMode.cs" />
diff --git a/src/NetMQ/NetMQScheduler.cs b/src/NetMQ/NetMQScheduler.cs
deleted file mode 100644
index 03ab10b..0000000
--- a/src/NetMQ/NetMQScheduler.cs
+++ /dev/null
@@ -1,188 +0,0 @@
-´╗┐using System;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Linq;
-using System.Runtime.InteropServices;
-using System.Text;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace NetMQ
-{
- public class NetMQScheduler : TaskScheduler, IDisposable
- {
- private readonly bool m_ownPoller;
- private readonly Poller m_poller;
-
- private static int s_schedulerCounter = 0;
-
- private readonly int m_schedulerId;
- private readonly string m_address;
-
- private readonly NetMQContext m_context;
- private readonly NetMQSocket m_serverSocket;
-
- private readonly ThreadLocal<NetMQSocket> m_clientSocket;
- private readonly ThreadLocal<bool> m_schedulerThread;
-
- private readonly ConcurrentBag<NetMQSocket> m_clientSockets;
-
- private EventHandler<NetMQSocketEventArgs> m_currentMessageHandler;
-
- public NetMQScheduler(NetMQContext context, Poller poller = null)
- {
- m_context = context;
- if (poller == null)
- {
- m_ownPoller = true;
-
- m_poller = new Poller();
- }
- else
- {
- m_ownPoller = false;
-
- m_poller = poller;
- }
-
- m_clientSockets = new ConcurrentBag<NetMQSocket>();
-
- m_schedulerId = Interlocked.Increment(ref s_schedulerCounter);
-
- m_address = string.Format("inproc://scheduler-{0}", m_schedulerId);
-
- m_serverSocket = context.CreatePullSocket();
- m_serverSocket.Options.Linger = TimeSpan.Zero;
- m_serverSocket.Bind(m_address);
-
- m_currentMessageHandler = OnMessageFirstTime;
-
- m_serverSocket.ReceiveReady += m_currentMessageHandler;
-
- m_poller.AddSocket(m_serverSocket);
-
- m_clientSocket = new ThreadLocal<NetMQSocket>(() =>
- {
- var socket = m_context.CreatePushSocket();
- socket.Connect(m_address);
-
- m_clientSockets.Add(socket);
-
- return socket;
- });
-
- m_schedulerThread = new ThreadLocal<bool>(() => false);
-
- if (m_ownPoller)
- {
- Task.Factory.StartNew(m_poller.Start, TaskCreationOptions.LongRunning);
- }
- }
-
- private void OnMessageFirstTime(object sender, NetMQSocketEventArgs e)
- {
- // set the current thread as the scheduler thread, this only happen the first time message arrived and important for the TryExecuteTaskInline
- m_schedulerThread.Value = true;
-
- // stop calling the OnMessageFirstTime and start calling OnMessage
- m_serverSocket.ReceiveReady -= m_currentMessageHandler;
- m_currentMessageHandler = OnMessage;
- m_serverSocket.ReceiveReady += m_currentMessageHandler;
-
- OnMessage(sender, e);
- }
-
- private void OnMessage(object sender, NetMQSocketEventArgs e)
- {
- byte[] data = m_serverSocket.Receive();
-
- IntPtr address;
-
- // checking if 64bit or 32 bit
- if (data.Length == 8)
- {
- address = new IntPtr(BitConverter.ToInt64(data, 0));
- }
- else
- {
- address = new IntPtr(BitConverter.ToInt32(data, 0));
- }
-
- GCHandle handle = GCHandle.FromIntPtr(address);
-
- Task task = (Task)handle.Target;
-
- TryExecuteTask(task);
-
- handle.Free();
- }
-
- protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
- {
- return m_schedulerThread.Value && TryExecuteTask(task);
- }
-
- public override int MaximumConcurrencyLevel
- {
- get { return 1; }
- }
-
- public void Dispose()
- {
- // disposing on the scheduler thread
- Task task = new Task(DisposeSynced);
- task.Start(this);
- task.Wait();
-
- // poller cannot be stopped from poller thread
- if (m_ownPoller)
- {
- m_poller.Stop();
- }
- }
-
- private void DisposeSynced()
- {
- Thread.MemoryBarrier();
-
- m_poller.RemoveSocket(m_serverSocket);
-
- m_serverSocket.ReceiveReady -= m_currentMessageHandler;
-
- foreach (var clientSocket in m_clientSockets)
- {
- clientSocket.Dispose();
- }
-
- m_serverSocket.Dispose();
- m_clientSocket.Dispose();
- }
-
- protected override IEnumerable<Task> GetScheduledTasks()
- {
- // this is not supported, also it's only important for debug propose and doesn't get called in real time
- throw new NotSupportedException();
- }
-
- protected override void QueueTask(Task task)
- {
- GCHandle handle = GCHandle.Alloc(task, GCHandleType.Normal);
-
- IntPtr address = GCHandle.ToIntPtr(handle);
-
- byte[] data;
-
- // checking if 64bit or 32 bit
- if (IntPtr.Size == 8)
- {
- data = BitConverter.GetBytes(address.ToInt64());
- }
- else
- {
- data = BitConverter.GetBytes(address.ToInt32());
- }
-
- m_clientSocket.Value.Send(data);
- }
- }
-}
diff --git a/src/NetMQ/NetMQSocket.cs b/src/NetMQ/NetMQSocket.cs
index 3ed6b49..34373b2 100644
--- a/src/NetMQ/NetMQSocket.cs
+++ b/src/NetMQ/NetMQSocket.cs
@@ -159,7 +159,7 @@ public bool Poll(TimeSpan timeout)
ZMQ.Poll(items, (int)timeout.TotalMilliseconds);
- if (item.ResultEvent.HasFlag(PollEvents.PollError) && !IgnoreErrors)
+ if (EnumFlagsHelper.HasFlag(item.ResultEvent, PollEvents.PollError) && !IgnoreErrors)
{
Errors++;
@@ -201,7 +201,7 @@ internal void InvokeEvents(object sender, PollEvents events)
{
m_socketEventArgs.Init(events);
- if (events.HasFlag(PollEvents.PollIn))
+ if (EnumFlagsHelper.HasFlag(events, PollEvents.PollIn))
{
var temp = m_receiveReady;
if (temp != null)
@@ -210,7 +210,7 @@ internal void InvokeEvents(object sender, PollEvents events)
}
}
- if (events.HasFlag(PollEvents.PollOut))
+ if (EnumFlagsHelper.HasFlag(events, PollEvents.PollOut))
{
var temp = m_sendReady;
if (temp != null)
diff --git a/src/NetMQ/NetMQSocketEventArgs.cs b/src/NetMQ/NetMQSocketEventArgs.cs
index 6eed16c..927cb6b 100644
--- a/src/NetMQ/NetMQSocketEventArgs.cs
+++ b/src/NetMQ/NetMQSocketEventArgs.cs
@@ -13,10 +13,10 @@ public NetMQSocketEventArgs(NetMQSocket socket)
Socket = socket;
}
- internal void Init(PollEvents events)
+ internal void Init(PollEvents events)
{
- this.ReceiveReady = events.HasFlag(PollEvents.PollIn);
- this.SendReady = events.HasFlag(PollEvents.PollOut);
+ this.ReceiveReady = EnumFlagsHelper.HasFlag(events, PollEvents.PollIn);
+ this.SendReady = EnumFlagsHelper.HasFlag(events, PollEvents.PollOut);
}
public NetMQSocket Socket { get; private set; }
diff --git a/src/NetMQ/Poller.cs b/src/NetMQ/Poller.cs
index 41ddedf..4b64a51 100644
--- a/src/NetMQ/Poller.cs
+++ b/src/NetMQ/Poller.cs
@@ -35,7 +35,8 @@ public NetMQSocket NetMQSocket
readonly List<NetMQTimer> m_timers = new List<NetMQTimer>();
readonly List<NetMQTimer> m_zombies = new List<NetMQTimer>();
- readonly CancellationTokenSource m_cancellationTokenSource;
+ // Stop flag: set by Stop(bool) and polled by the Start() loop; volatile so a write from another thread is seen.
+ volatile bool m_cancellationTokenSource = false;
readonly ManualResetEvent m_isStoppedEvent = new ManualResetEvent(false);
private bool m_isStarted;
@@ -44,8 +44,6 @@ public NetMQSocket NetMQSocket
public Poller()
{
PollTimeout = 1000;
-
- m_cancellationTokenSource = new CancellationTokenSource();
}
/// <summary>
@@ -163,7 +161,7 @@ public void Start()
}
}
- while (!m_cancellationTokenSource.IsCancellationRequested)
+ while (!m_cancellationTokenSource)
{
if (m_isDirty)
{
@@ -195,7 +193,7 @@ public void Start()
NetMQSocket socket = m_pollact[itemNbr];
PollItem item = m_pollset[itemNbr];
- if (item.ResultEvent.HasFlag(PollEvents.PollError) && !socket.IgnoreErrors)
+ if (EnumFlagsHelper.HasFlag(item.ResultEvent, PollEvents.PollError) && !socket.IgnoreErrors)
{
socket.Errors++;
@@ -241,7 +239,7 @@ public void Start()
/// <param name="waitForCloseToComplete">if true the method will block until the poller is fully stopped</param>
public void Stop(bool waitForCloseToComplete)
{
- m_cancellationTokenSource.Cancel();
+ m_cancellationTokenSource = true;
if (waitForCloseToComplete)
{
diff --git a/src/NetMQ/ReceivingSocketExtensions.cs b/src/NetMQ/ReceivingSocketExtensions.cs
index 30839ff..32a7070 100644
--- a/src/NetMQ/ReceivingSocketExtensions.cs
+++ b/src/NetMQ/ReceivingSocketExtensions.cs
@@ -10,7 +10,7 @@ public static class ReceivingSocketExtensions
{
public static byte[] Receive(this IReceivingSocket socket, SendReceiveOptions options, out bool hasMore)
{
- return socket.Receive(options.HasFlag(SendReceiveOptions.DontWait), out hasMore);
+ return socket.Receive(EnumFlagsHelper.HasFlag(options, SendReceiveOptions.DontWait), out hasMore);
}
public static byte[] Receive(this IReceivingSocket socket, out bool hasMore)
@@ -38,7 +38,7 @@ public static string ReceiveString(this IReceivingSocket socket, bool dontWait,
public static string ReceiveString(this IReceivingSocket socket, SendReceiveOptions options, out bool hasMore)
{
- return socket.ReceiveString(options.HasFlag(SendReceiveOptions.DontWait), out hasMore);
+ return socket.ReceiveString(EnumFlagsHelper.HasFlag(options, SendReceiveOptions.DontWait), out hasMore);
}
public static string ReceiveString(this IReceivingSocket socket, SendReceiveOptions options)
@@ -71,10 +71,10 @@ public static NetMQMessage ReceiveMessage(this NetMQSocket socket, TimeSpan time
var items = new[] { item };
ZMQ.Poll(items, (int)timeout.TotalMilliseconds);
- if (item.ResultEvent.HasFlag(PollEvents.PollError) && !socket.IgnoreErrors)
+ if (EnumFlagsHelper.HasFlag(item.ResultEvent, PollEvents.PollError) && !socket.IgnoreErrors)
throw new ErrorPollingException("Error while polling", socket);
- if (!item.ResultEvent.HasFlag(PollEvents.PollIn))
+ if (!EnumFlagsHelper.HasFlag(item.ResultEvent, PollEvents.PollIn))
return null;
var msg = socket.ReceiveMessage();
diff --git a/src/NetMQ/zmq/Address.cs b/src/NetMQ/zmq/Address.cs
index 7f2dafa..682e212 100644
--- a/src/NetMQ/zmq/Address.cs
+++ b/src/NetMQ/zmq/Address.cs
@@ -44,12 +44,8 @@ public Address(EndPoint endpoint)
{
Protocol = "tcp";
- if (endpoint is DnsEndPoint)
- {
- DnsEndPoint dnsEndpoint = endpoint as DnsEndPoint;
- AddressString = dnsEndpoint.Host + ":" + dnsEndpoint.Port;
- }
- else if (endpoint is IPEndPoint)
+ // DnsEndPoint was introduced in .NET 4, so the DNS branch cannot exist in a 3.5 build.
+ if (endpoint is IPEndPoint)
{
IPEndPoint ipEndpoint = endpoint as IPEndPoint;
AddressString = ipEndpoint.Address + ":" + ipEndpoint.Port;
diff --git a/src/NetMQ/zmq/Enums.cs b/src/NetMQ/zmq/Enums.cs
index 6ccc99f..35864fc 100644
--- a/src/NetMQ/zmq/Enums.cs
+++ b/src/NetMQ/zmq/Enums.cs
@@ -120,4 +120,27 @@ public enum PollEvents
PollOut = 0x2,
PollError = 0x4
}
+
+ /// <summary>
+ /// Stand-in for <c>Enum.HasFlag</c>, which was introduced in .NET 4 and is unavailable when targeting 3.5.
+ /// </summary>
+ /// <remarks>
+ /// One overload per enum type is needed: C# defines the <c>&amp;</c> operator for concrete enum types only,
+ /// not for <c>System.Enum</c> or an unconstrained generic type parameter.
+ /// </remarks>
+ public static class EnumFlagsHelper
+ {
+ /// <summary>Returns true when every bit set in <paramref name="flag"/> is also set in <paramref name="flags"/>.</summary>
+ public static bool HasFlag(PollEvents flags, PollEvents flag)
+ {
+ // Compare against flag (not 0) to keep Enum.HasFlag semantics for multi-bit and zero-valued flags.
+ return (flags & flag) == flag;
+ }
+
+ /// <summary>Returns true when every bit set in <paramref name="flag"/> is also set in <paramref name="flags"/>.</summary>
+ public static bool HasFlag(SendReceiveOptions flags, SendReceiveOptions flag)
+ {
+ return (flags & flag) == flag;
+ }
+ }
}
\ No newline at end of file
diff --git a/src/NetMQ/zmq/Signaler.cs b/src/NetMQ/zmq/Signaler.cs
index 85e3426..61d2975 100644
--- a/src/NetMQ/zmq/Signaler.cs
+++ b/src/NetMQ/zmq/Signaler.cs
@@ -109,7 +109,7 @@ private void MakeFDpair()
sync.ReleaseMutex();
// Release the kernel object
- sync.Dispose();
+ sync.Close();
}
public Socket FD
diff --git a/src/Samples/Load Balancing Pattern/ROUTERbrokerDEALERworkers/ROUTERbrokerDEALERworkers.csproj b/src/Samples/Load Balancing Pattern/ROUTERbrokerDEALERworkers/ROUTERbrokerDEALERworkers.csproj
index 899734b..eb6f07a 100644
--- a/src/Samples/Load Balancing Pattern/ROUTERbrokerDEALERworkers/ROUTERbrokerDEALERworkers.csproj
+++ b/src/Samples/Load Balancing Pattern/ROUTERbrokerDEALERworkers/ROUTERbrokerDEALERworkers.csproj
@@ -48,7 +48,7 @@
<None Include="App.config" />
</ItemGroup>
<ItemGroup>
- <ProjectReference Include="..\..\..\..\..\..\GitHub\netmq\src\NetMQ\NetMQ.csproj">
+ <ProjectReference Include="..\..\..\NetMQ\NetMQ.csproj">
<Project>{82934bac-07fb-41ac-ae59-46fee6026a40}</Project>
<Name>NetMQ</Name>
</ProjectReference>
diff --git a/src/Samples/Load Balancing Pattern/ROUTERbrokerREQworkers/ROUTERbrokerREQworkers.csproj b/src/Samples/Load Balancing Pattern/ROUTERbrokerREQworkers/ROUTERbrokerREQworkers.csproj
index 9e0529a..eba605d 100644
--- a/src/Samples/Load Balancing Pattern/ROUTERbrokerREQworkers/ROUTERbrokerREQworkers.csproj
+++ b/src/Samples/Load Balancing Pattern/ROUTERbrokerREQworkers/ROUTERbrokerREQworkers.csproj
@@ -48,7 +48,7 @@
<None Include="App.config" />
</ItemGroup>
<ItemGroup>
- <ProjectReference Include="..\..\..\..\..\..\GitHub\netmq\src\NetMQ\NetMQ.csproj">
+ <ProjectReference Include="..\..\..\NetMQ\NetMQ.csproj">
<Project>{82934bac-07fb-41ac-ae59-46fee6026a40}</Project>
<Name>NetMQ</Name>
</ProjectReference>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment