Last active
March 1, 2024 11:00
-
-
Save kkadir/a67eb3d2443ce1564a63cb5cc04c69bc to your computer and use it in GitHub Desktop.
Sample Receive and Push
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using Gevasys.DataCore.Client; | |
using Gevasys.DataCore.Core; | |
using Gevasys.DataCore.Core.Protocol; | |
using Gevasys.DataCore.Core.Protocol.Binary; | |
using Gevasys.DataCore.Core.Protocol.Binary.Decoding; | |
using Gevasys.DataCore.Tools.Protocol; | |
using Gevasys.DataCore.RequestData.ProtoBuf.V2; | |
namespace DataManagerConnect.SampleApplication | |
{ | |
/// <summary>
/// Sample push client: receives batches of update messages, dispatches them by
/// message type and echoes every message to the console.
/// </summary>
class SamplePushClient : PushClient
{
    /// <summary>Field id that carries the message identifier of a Proto message.</summary>
    private const int MessageIdFieldId = 1;

    /// <summary>Field id used to demonstrate direct field access (see vwd Fieldmap.xml).</summary>
    private const int DirectAccessFieldId = 80;

    /// <summary>
    /// Creates the sample client.
    /// </summary>
    /// <param name="name">Name forwarded unchanged to the <c>PushClient</c> base constructor.</param>
    public SamplePushClient(string name)
        : base(name)
    {
    }

    /// <summary>
    /// Handles a batch of messages: Proto messages are checked for news content,
    /// recaps/updates are walked field by field, and request-status messages are
    /// reported on the console. Every message is finally printed in its
    /// machine-readable form.
    /// </summary>
    /// <param name="updates">Batch of messages to process; a null batch is ignored.</param>
    public override void ProcessUpdates(List<Gevasys.DataCore.Core.Protocol.UpdateMessage> updates)
    {
        if (updates == null)
        {
            return;
        }

        foreach (UpdateMessage update in updates)
        {
            if (update == null)
            {
                continue;
            }

            switch (update.MessageType)
            {
                case BaseBinaryMessage.MessageType.Proto:
                    ProcessProtoMessage(update);
                    break;
                case BaseBinaryMessage.MessageType.PartialRecap:
                case BaseBinaryMessage.MessageType.Recap:
                case BaseBinaryMessage.MessageType.Update:
                    ProcessUpdateOrRecap(update);
                    break;
                case BaseBinaryMessage.MessageType.RequestStatus:
                    ReportRequestStatus(update);
                    break;
                default:
                    break;
            }

            //Attention! Possible heavy CPU usage due console output.
            Console.WriteLine(update.toMachineReadableOutput());
        }
    }

    /// <summary>
    /// Reads the message id of a Proto message and deserializes it when it is a news message.
    /// </summary>
    private void ProcessProtoMessage(UpdateMessage update)
    {
        // 'as' instead of a hard cast: a missing (null) or non-string id field is
        // skipped instead of raising InvalidCastException/NullReferenceException.
        StringField messageId = update.GetField(MessageIdFieldId) as StringField;
        if (messageId == null || messageId.Value == null)
        {
            return;
        }

        //News
        if (messageId.Value.Equals(MessageIdentifier.NewsMessage))
        {
            // Sample only: hand 'nm' over to the application logic here.
            NewsMessage nm = ProtobufMessageHelper.DeserializeNewsMessage(update);
        }
    }

    /// <summary>
    /// Prints a diagnostic line for the request states this sample knows about.
    /// </summary>
    private void ReportRequestStatus(UpdateMessage update)
    {
        // Cast once; a message that is not a RequestStatusMessage is ignored.
        RequestStatusMessage statusMessage = update as RequestStatusMessage;
        if (statusMessage == null)
        {
            return;
        }

        if (statusMessage.Status == RequestStatusType.NotDefined)
        {
            Console.WriteLine("Instrument not definied: " + update.GetInstrument());
        }
        else if (statusMessage.Status == RequestStatusType.NoPermission)
        {
            Console.WriteLine("No permission for instrument: " + update.GetInstrument());
        }
        else if (statusMessage.Status == RequestStatusType.NotFound)
        {
            Console.WriteLine("Instrument not found: " + update.GetInstrument());
        }
        else if (statusMessage.Status == RequestStatusType.ServiceNotAvailable)
        {
            Console.WriteLine("Underlying data service is not available");
        }
    }

    /// <summary>
    /// Demonstrates the two ways of reading fields from an update or recap:
    /// iterating over all fields and accessing a single field by id.
    /// </summary>
    private void ProcessUpdateOrRecap(UpdateMessage update)
    {
        //Iterate through all fields of the update
        foreach (BaseField field in update.FieldDict.Values)
        {
            ProcessFieldUpdate(field);
        }

        //And/or
        //Direct access to fields (see vwd Fieldmap.xml)
        BaseField outField = null;
        if (update.TryGetField(DirectAccessFieldId, out outField))
        {
            ProcessFieldUpdate(outField);
        }
    }

    /// <summary>
    /// Extracts the typed value of a single field. The locals are placeholders
    /// showing which field class belongs to which data type; the sample does not
    /// use the values any further.
    /// </summary>
    private void ProcessFieldUpdate(BaseField field)
    {
        if (field == null)
        {
            return;
        }

        short fieldId = field.FieldId;
        switch (field.DataTypeValue)
        {
            case BaseBinaryMessage.DataType.String:
                string stringValue = ((StringField)field).Value;
                break;
            case BaseBinaryMessage.DataType.DateTime:
                DateTime timeValue = ((DateTimeField)field).Value;
                break;
            case BaseBinaryMessage.DataType.Double:
                double doubleValue = ((DoubleField)field).Value;
                break;
            case BaseBinaryMessage.DataType.Float:
                float floatValue = ((FloatField)field).Value;
                break;
            case BaseBinaryMessage.DataType.Int32:
                int intValue = ((IntField)field).Value;
                break;
            case BaseBinaryMessage.DataType.Long:
                long longValue = ((LongField)field).Value;
                break;
            case BaseBinaryMessage.DataType.Short:
                short shortValue = ((ShortField)field).Value;
                break;
        }
    }

    /// <summary>
    /// Called with the consumer's state; <c>arg.IsOnline</c> tells whether it is online.
    /// The sample takes no action in either case.
    /// </summary>
    /// <param name="source">Consumer the state change refers to.</param>
    /// <param name="arg">State information; only <c>IsOnline</c> is inspected.</param>
    public override void Update(IConsumer source, ConsumerStateChangedEventArgs arg)
    {
        if (arg.IsOnline)
        {
            //Consumer is online. Feed data will be received.
        }
        else
        {
            //Consumer is offline. No feed data will be received.
        }
    }
}
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Security.Cryptography; | |
using kx; | |
using NLog; | |
namespace FeedDemo | |
{ | |
/// <summary>
/// Feed demo: connects to a kdb+ process and publishes sample rows to a table,
/// first row by row and then in bulk.
/// </summary>
static class Program
{
    private static readonly ILogger Logger = LogManager.GetCurrentClassLogger();

    /// <summary>Name of the q function invoked for every insert.</summary>
    private const string QFunc = ".u.upd";

    /// <summary>Name of the target table.</summary>
    private const string TableName = "mytable";

    /// <summary>Number of rows produced by each insert example.</summary>
    private const int RowCount = 10;

    /// <summary>
    /// Opens the connection, runs both insert examples and always closes the
    /// connection again. Any failure is logged and reported to the caller through
    /// a non-zero process exit code.
    /// </summary>
    static void Main()
    {
        string host = "localhost";
        int port = 5001;
        // NOTE(review): demo credentials only - do not hard-code a real password in source.
        string usernamePassword = $"{Environment.UserName}:mypassword";
        c connection = null;
        try
        {
            connection = new c(host, port, usernamePassword);

            //Example of 10 single row inserts to a table
            InsertRows(connection);

            //Example of bulk inserts to a table to improve throughput
            BulkInsertRows(connection);
        }
        catch (Exception ex)
        {
            Logger.Error($"Error occurred running Feed-Demo. \r\n{ex}");

            // Void Main would otherwise exit with 0 and hide the failure from scripts.
            Environment.ExitCode = 1;
        }
        finally
        {
            connection?.Close();
        }
    }

    /// <summary>
    /// Sends <see cref="RowCount"/> identical rows one at a time via <c>ks</c>.
    /// </summary>
    /// <param name="connection">Open connection to the kdb+ process.</param>
    private static void InsertRows(c connection)
    {
        // Single row insert - not as efficient as bulk insert
        Logger.Info("Populating '{0}' table on kdb server with {1} rows...", TableName, RowCount);
        for (int i = 0; i < RowCount; i++)
        {
            // Assumes a remote schema of mytable:([]time:`timespan$();sym:`symbol$();price:`float$();size:`long$())
            object[] row = new object[]
            {
                new c.KTimespan(100),
                "SYMBOL",
                93.5,
                300L
            };
            connection.ks(QFunc, TableName, row);
        }
        Logger.Info("Successfully inserted {0} rows to {1}", RowCount, TableName);
    }

    /// <summary>
    /// Sends <see cref="RowCount"/> rows as column arrays, once as a plain list of
    /// columns and once as a flip with column names, then issues a synchronous
    /// call so the method returns only after the messages were processed.
    /// </summary>
    /// <param name="connection">Open connection to the kdb+ process.</param>
    private static void BulkInsertRows(c connection)
    {
        // Bulk row insert - more efficient
        string[] syms = new[] { "ABC", "DEF", "GHI", "JKL" };
        c.KTimespan[] times = CreateTestArray(i => new c.KTimespan(i), RowCount);
        string[] symbols = CreateTestArray(i => syms[RandomNumberGenerator.GetInt32(syms.Length)], RowCount);
        double[] prices = CreateTestArray(i => i * 1.1, RowCount);
        long[] sizes = CreateTestArray(i => (long)(i * 100), RowCount);

        Logger.Info("Bulk populating '{0}' table on kdb server without using column names", TableName);
        connection.ks(QFunc, TableName, new object[] { times, symbols, prices, sizes });

        Logger.Info("Bulk populating '{0}' table on kdb server using column names", TableName);
        connection.ks(QFunc, TableName, new c.Flip(new c.Dict(new string[] { "time", "sym", "price", "size" }, new object[] { times, symbols, prices, sizes })));

        //block until all messages are processed
        connection.k(string.Empty);
    }

    /// <summary>
    /// Builds an array of <paramref name="arraySize"/> elements where element
    /// <c>i</c> is <c>elementBuilder(i)</c>.
    /// </summary>
    /// <typeparam name="T">Element type of the array.</typeparam>
    /// <param name="elementBuilder">Maps a zero-based index to the element stored at that index.</param>
    /// <param name="arraySize">Number of elements to create.</param>
    /// <returns>The populated array.</returns>
    private static T[] CreateTestArray<T>(Func<int, T> elementBuilder, int arraySize)
    {
        T[] array = new T[arraySize];
        for (int i = 0; i < arraySize; i++)
        {
            array[i] = elementBuilder(i);
        }
        return array;
    }
}
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.IO; | |
using System.Threading; | |
using Gevasys.DataCore.Configuration; | |
using Gevasys.DataCore.Core.Logging; | |
using Gevasys.DataCore.Common; | |
using Gevasys.DataCore.Consumer.Manager; | |
namespace DataManagerConnect.SampleApplication | |
{ | |
/// <summary>
/// Console entry point of the Data Manager Connect (DMC) sample: initializes
/// logging and configuration, then runs the push data sample.
/// </summary>
class Sample
{
    /// <summary>
    /// Main entry point. Exits with a non-zero exit code when initialization fails;
    /// otherwise runs the sample and waits for a key press.
    /// </summary>
    /// <param name="args">Command line arguments (not used).</param>
    public static void Main(string[] args)
    {
        Sample dmcClientSample = new Sample();
        if (!dmcClientSample.Initialize())
        {
            // Void Main would otherwise exit with 0 although nothing was run.
            Environment.ExitCode = 1;
            return;
        }

        dmcClientSample.Start();
        Console.WriteLine("Press any key to exit ...");
        Console.ReadKey();
    }

    /// <summary>
    /// Initialize the Data Manager Connect (DMC) library.
    /// Note that DMC configuration is taken from App.config via .NET configuration mechanism.
    /// </summary>
    /// <returns>
    /// <c>true</c> when <c>ConfigurationHelper.Initialize()</c> succeeded;
    /// <c>false</c> after the failure was logged and printed to the console.
    /// </returns>
    public bool Initialize()
    {
        //Initialize the logging system (hint the log4net configuration file 'log_config.xml')
        // NOTE(review): the "Configuration" folder is resolved against the current
        // working directory, not the executable's directory - confirm this is intended.
        string logConfigFilePath = Path.Combine(Environment.CurrentDirectory, "Configuration");
        Logger.Initialise(logConfigFilePath, "BaseLogger", "log_config.xml");

        //Read the configuration from App.config
        if (!ConfigurationHelper.Initialize())
        {
            Logger.Log.Error("Failed to initialize DMC library! Please check logfiles and your configuration!");
            Console.WriteLine("Failed to initialize DMC library! Either wrong credentials or perhaps network connectivity problems.");
            return false;
        }

        // Everything successfully initialized, we're ready to work with DMC
        return true;
    }

    /// <summary>
    /// Runs the push data sample and returns once
    /// <c>SamplePushData.GetStreamingData()</c> has returned.
    /// </summary>
    public void Start()
    {
        Console.WriteLine("======= Testing Push Data samples (Streaming - ConsumerStore) ==========");
        new SamplePushData().GetStreamingData();
        Console.WriteLine("Finished!");
    }
}
/// <summary>
/// Push data sample: starts a <c>SamplePushClient</c>, attaches a consumer store
/// with one filter, lets it run for a fixed time and shuts both down again.
/// </summary>
public class SamplePushData
{
    /// <summary>How long the sample keeps running before it stops, in milliseconds.</summary>
    private const int StreamingDurationMs = 60000;

    /// <summary>Pause after stopping the consumer store before the push client is stopped, in milliseconds.</summary>
    private const int ConsumerStopGraceMs = 3000;

    // Please NOTE: create fields as private for Push Data
    private ConsumerStore consumerStore = null;
    private SamplePushClient pushClient = null;

    /// <summary>
    /// Starts the push client and the consumer store, waits 60 seconds and stops
    /// both. The shutdown also runs when the setup or the wait fails, so a started
    /// push client is never left running.
    /// </summary>
    public void GetStreamingData()
    {
        try
        {
            // Initialization
            // The field is assigned only after Start() succeeded, so cleanup never
            // tries to stop a client that was not started.
            SamplePushClient client = new SamplePushClient("Sample");
            client.Start();
            pushClient = client;

            consumerStore = ConfigurationHelper.CreateConsumer(pushClient);

            // GMS -> BND, CER, FND, IND, WNT
            consumerStore.AddFilter(new FilterElement { ExchangeCode = "GMS", SecurityType = SecurityType.Bond });

            // Wait 60 seconds and stop
            Thread.Sleep(StreamingDurationMs);
        }
        finally
        {
            StopStreaming();
        }
    }

    /// <summary>
    /// Helper method to stop the streaming data store and the push client.
    /// Each is stopped only if it exists and is cleared afterwards, so calling
    /// this method repeatedly is harmless.
    /// </summary>
    public void StopStreaming()
    {
        try
        {
            if (consumerStore != null)
            {
                Console.WriteLine("Stopping consumerStore...");
                consumerStore.Stop();
                consumerStore = null;

                // Waiting to give consumerStore time to stop
                Thread.Sleep(ConsumerStopGraceMs);
            }
        }
        finally
        {
            // Stop the push client even when no consumer store was created or
            // stopping the store failed.
            if (pushClient != null)
            {
                Console.WriteLine("Stopping pushClient sample...");
                pushClient.Stop();
                pushClient = null;
            }
        }
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment