Skip to content

Instantly share code, notes, and snippets.

@phr34k

phr34k/code.cs Secret

Created February 23, 2020 06:20
Show Gist options
  • Save phr34k/3a59af154d8ce6636dfc332271052a6a to your computer and use it in GitHub Desktop.
Save phr34k/3a59af154d8ce6636dfc332271052a6a to your computer and use it in GitHub Desktop.
/// <summary>
/// Subscribes notification and state-change handlers to <paramref name="connection1"/> and
/// builds the worker thread that issues <c>LISTEN update_{siteid}</c> and then waits for
/// server notifications. The thread is returned unstarted.
/// </summary>
/// <param name="storage">Storage whose <c>Update</c> is invoked whenever the <c>update</c> flag has been raised.</param>
/// <param name="connection1">Connection used both for LISTEN/notification waiting and for <c>storage.Update</c>.</param>
/// <returns>The worker thread; this method does not call <c>Start</c> on it.</returns>
private Thread notification_connection(FileStorage storage, Npgsql.NpgsqlConnection connection1)
{
    var resetEvent = connection1Reset;
    var conn2 = connection1;
    var isOpened = false;

    // Any server notification just raises the update flag (0 -> 1); the worker thread consumes it.
    NotificationEventHandler notifyHandler = (o, e) =>
    {
        Trace.TraceInformation("Received update notification from server");
        Interlocked.CompareExchange(ref update, 1, 0);
    };

    StateChangeEventHandler stateHandler = (o, e) =>
    {
        //Connection was changed to open from a closed connection
        if (e.CurrentState == ConnectionState.Open && (e.OriginalState == ConnectionState.Closed || e.OriginalState == ConnectionState.Broken))
        {
            isOpened = true;
            connection1Since = null;
            // Force a refresh after (re)connecting and wake the worker thread.
            Interlocked.CompareExchange(ref update, 1, 0);
            resetEvent.Set();
            // Use the value returned by the atomic increment; re-reading the field afterwards
            // could observe a change made by another connection's handler in the meantime.
            var nowActive = Interlocked.Increment(ref activeConnections);
            if (connectionStateChanged != null) connectionStateChanged.Invoke(o, e);
            Trace.TraceInformation("Server connected");
            if (nowActive == 2) Publish("events.connection.changed.good");
        }
        //Connection was changed to closed or broken from any state
        else if (e.CurrentState == ConnectionState.Closed || e.CurrentState == ConnectionState.Broken)
        {
            connection1Since = DateTime.Now;
            Trace.TraceInformation("Server disconnected");
            // Park the worker thread until the connection is opened again.
            resetEvent.Reset();
            Interlocked.Decrement(ref activeConnections);
            if (connectionStateChanged != null) connectionStateChanged.Invoke(o, e);
            Publish("events.connection.changed.bad");
        }
    };

    conn2.Notification += notifyHandler;
    conn2.StateChange += stateHandler;

    Thread worker = new Thread(() =>
    {
        var handles = new WaitHandle[2] { stopped, resetEvent };
        string listenCommand = string.Format("LISTEN update_{0}", siteid);

        while (running == true)
        {
            // Blocks until either the stop handle or the "connection is open" handle is signalled.
            WaitHandle.WaitAny(handles);
            if (running == false) break;

            // isOpened is never cleared in this method, so LISTEN is re-issued on every pass;
            // per the PostgreSQL documentation a repeated LISTEN on the same channel does nothing.
            // NOTE(review): an exception thrown here is not caught inside this thread body — confirm that is intended.
            if (isOpened == true)
            {
                using (var cmd = new NpgsqlCommand(listenCommand, conn2))
                {
                    cmd.ExecuteNonQuery();
                }
            }

            //If the connection is still open & we received an async command
            if (Interlocked.CompareExchange(ref update, 0, 1) == 1)
            {
                try
                {
                    waitingMutex.WaitOne();
                    storage.Update(connection1, this, siteid);
                }
                finally
                {
                    waitingMutex.ReleaseMutex();
                    // NOTE(review): kept from the original; purpose of re-setting the flag here is unclear.
                    isOpened = true;
                }
            }

            try
            {
                while (conn2.State == ConnectionState.Open && update == 0 && running == true)
                {
                    Task wait = null;
                    Trace.TraceInformation("Waiting for listen");
                    waitingMutex.WaitOne();
                    try
                    {
                        // Inside the try so the mutex is released even if Reset() throws.
                        waitingReset.Reset();
                        wait = conn2.WaitAsync(source.Token);
                        wait.Wait();
                    }
                    catch (System.InvalidOperationException ex)
                    {
                        // An InvalidOperationException originating from Npgsql is treated as a cancelled wait.
                        if (ex.Source == "Npgsql")
                        {
                            waitingReset.Set();
                            Trace.TraceInformation("Cancelled for listen");
                            break;
                        }
                        else
                        {
                            throw;
                        }
                    }
                    catch (System.AggregateException ex)
                    {
                        // Task.Wait() wraps cancellation and faults in AggregateException.
                        if (wait != null && wait.IsCanceled == true)
                        {
                            waitingReset.Set();
                            Trace.TraceInformation("Cancelled for listen");
                            break;
                        }

                        // Non-cancellation failure: record it instead of dropping it silently, then
                        // let the loop condition re-check the connection state as before.
                        Trace.TraceWarning("Waiting for listen failed: {0}", ex.GetBaseException().Message);
                    }
                    finally
                    {
                        waitingMutex.ReleaseMutex();
                    }
                }
            }
            catch (Npgsql.NpgsqlException)
            {
                Trace.TraceInformation("Connection was broken");
            }
        }

        // Detach the handlers once the worker loop has ended.
        conn2.Notification -= notifyHandler;
        conn2.StateChange -= stateHandler;
    });

    return worker;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment