-
-
Save phr34k/3a59af154d8ce6636dfc332271052a6a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Builds (but does not start) the worker thread that listens for PostgreSQL
/// <c>update_{siteid}</c> notifications on <paramref name="connection1"/> and
/// calls <c>storage.Update</c> whenever the shared <c>update</c> flag is raised.
/// </summary>
/// <param name="storage">Storage whose <c>Update</c> method is invoked when an update is pending.</param>
/// <param name="connection1">Connection used both for LISTEN and for the storage update.</param>
/// <returns>The configured, not yet started, thread. The caller is responsible for starting it.</returns>
/// <remarks>
/// The method subscribes to the connection's <c>Notification</c> and <c>StateChange</c>
/// events immediately; the handlers are removed again when the thread's main loop
/// exits normally (i.e. once <c>running</c> becomes false).
/// </remarks>
private Thread notification_connection(FileStorage storage, Npgsql.NpgsqlConnection connection1)
{
    var resetEvent = connection1Reset;
    var conn2 = connection1;

    // Captured by both the state handler and the worker thread.
    // NOTE(review): nothing visible here ever sets this back to false, so LISTEN is
    // re-issued on every outer loop iteration once the connection has opened — confirm intended.
    var isOpened = false;

    // A server notification only raises the "update pending" flag; the worker thread does the work.
    NotificationEventHandler notifyHandler = (o, e) =>
    {
        Trace.TraceInformation("Received update notification from server");
        Interlocked.CompareExchange(ref update, 1, 0);
    };

    StateChangeEventHandler stateHandler = (o, e) =>
    {
        //Connection was changed to open from a closed connection
        if (e.CurrentState == ConnectionState.Open && (e.OriginalState == ConnectionState.Closed || e.OriginalState == ConnectionState.Broken))
        {
            isOpened = true;
            connection1Since = null;
            // Force a refresh after (re)connecting, then release the worker thread.
            Interlocked.CompareExchange(ref update, 1, 0);
            resetEvent.Set();

            // Use the value returned by the atomic increment: re-reading the shared counter
            // afterwards races with other connections changing state at the same time.
            var active = Interlocked.Increment(ref activeConnections);

            // Snapshot the delegate so a concurrent unsubscribe cannot null it between check and call.
            var changed = connectionStateChanged;
            if (changed != null)
            {
                changed.Invoke(o, e);
            }

            Trace.TraceInformation("Server connected");
            if (active == 2)
            {
                Publish("events.connection.changed.good");
            }
        }
        //Connection was changed to closed or broken from any state
        else if (e.CurrentState == ConnectionState.Closed || e.CurrentState == ConnectionState.Broken)
        {
            connection1Since = DateTime.Now;
            Trace.TraceInformation("Server disconnected");
            // Park the worker thread until the connection comes back.
            resetEvent.Reset();
            Interlocked.Decrement(ref activeConnections);

            var changed = connectionStateChanged;
            if (changed != null)
            {
                changed.Invoke(o, e);
            }

            Publish("events.connection.changed.bad");
        }
    };

    conn2.Notification += notifyHandler;
    conn2.StateChange += stateHandler;

    Thread y = new Thread(() =>
    {
        // Wake up either on shutdown (stopped) or when the connection is available (resetEvent).
        var handles = new WaitHandle[2] { stopped, resetEvent };
        string queryFormat = string.Format("LISTEN update_{0}", siteid);

        while (running == true)
        {
            WaitHandle.WaitAny(handles);
            if (running == false) break;

            if (isOpened)
            {
                // The using block disposes the command; no explicit Dispose call is needed.
                using (var cmd = new NpgsqlCommand(queryFormat, conn2))
                {
                    cmd.ExecuteNonQuery();
                }
            }

            //If the connection is still open & we received an async command
            if (Interlocked.CompareExchange(ref update, 0, 1) == 1)
            {
                // Acquire outside the try so that a failed acquire never reaches ReleaseMutex
                // (releasing a mutex this thread does not own throws and would mask the real error).
                waitingMutex.WaitOne();
                try
                {
                    storage.Update(connection1, this, siteid);
                }
                finally
                {
                    waitingMutex.ReleaseMutex();
                    isOpened = true;
                }
            }

            try
            {
                // Block on the connection until a notification arrives, the wait is cancelled,
                // the connection leaves the Open state, or shutdown is requested.
                while (conn2.State == ConnectionState.Open && update == 0 && running == true)
                {
                    Task wait = null;
                    Trace.TraceInformation("Waiting for listen");
                    waitingMutex.WaitOne();
                    waitingReset.Reset();
                    try
                    {
                        wait = conn2.WaitAsync(source.Token);
                        wait.Wait();
                    }
                    catch (System.InvalidOperationException ex)
                    {
                        // Only InvalidOperationExceptions raised by Npgsql are treated as a cancelled wait.
                        if (ex.Source == "Npgsql")
                        {
                            waitingReset.Set();
                            Trace.TraceInformation("Cancelled for listen");
                            break;
                        }
                        else
                        {
                            throw;
                        }
                    }
                    catch (System.AggregateException ex)
                    {
                        if (wait.IsCanceled == true)
                        {
                            waitingReset.Set();
                            Trace.TraceInformation("Cancelled for listen");
                            break;
                        }

                        // Not a cancellation: keep the original best-effort behaviour (loop again
                        // while the connection is still open) but no longer hide the failure.
                        Trace.TraceWarning("Listen wait failed: {0}", ex.GetBaseException().Message);
                    }
                    finally
                    {
                        waitingMutex.ReleaseMutex();
                    }
                }
            }
            catch (Npgsql.NpgsqlException)
            {
                Trace.TraceInformation("Connection was broken");
            }
        }

        conn2.Notification -= notifyHandler;
        conn2.StateChange -= stateHandler;
    });

    return y;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment