Skip to content

Instantly share code, notes, and snippets.

@kkadir
Last active March 1, 2024 11:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kkadir/a67eb3d2443ce1564a63cb5cc04c69bc to your computer and use it in GitHub Desktop.
Save kkadir/a67eb3d2443ce1564a63cb5cc04c69bc to your computer and use it in GitHub Desktop.
Sample Receive and Push
using System;
using System.Collections.Generic;
using Gevasys.DataCore.Client;
using Gevasys.DataCore.Core;
using Gevasys.DataCore.Core.Protocol;
using Gevasys.DataCore.Core.Protocol.Binary;
using Gevasys.DataCore.Core.Protocol.Binary.Decoding;
using Gevasys.DataCore.Tools.Protocol;
using Gevasys.DataCore.RequestData.ProtoBuf.V2;
namespace DataManagerConnect.SampleApplication
{
class SamplePushClient : PushClient
{
public SamplePushClient(string name)
: base(name)
{
}
public override void ProcessUpdates(List<Gevasys.DataCore.Core.Protocol.UpdateMessage> updates)
{
foreach (UpdateMessage update in updates)
{
switch (update.MessageType)
{
case BaseBinaryMessage.MessageType.Proto:
//Get the Message ID
StringField sf = (StringField)update.GetField(1);
//News
if (sf.Value.Equals(MessageIdentifier.NewsMessage))
{
NewsMessage nm = ProtobufMessageHelper.DeserializeNewsMessage(update);
}
break;
case BaseBinaryMessage.MessageType.PartialRecap:
case BaseBinaryMessage.MessageType.Recap:
case BaseBinaryMessage.MessageType.Update:
ProcessUpdateOrRecap(update);
break;
case BaseBinaryMessage.MessageType.RequestStatus:
if (((RequestStatusMessage)update).Status == RequestStatusType.NotDefined)
{
Console.WriteLine("Instrument not definied: " + update.GetInstrument());
}
else if (((RequestStatusMessage)update).Status == RequestStatusType.NoPermission)
{
Console.WriteLine("No permission for instrument: " + update.GetInstrument());
}
else if (((RequestStatusMessage)update).Status == RequestStatusType.NotFound)
{
Console.WriteLine("Instrument not found: " + update.GetInstrument());
}
else if (((RequestStatusMessage)update).Status == RequestStatusType.ServiceNotAvailable)
{
Console.WriteLine("Underlying data service is not available");
}
break;
default:
break;
}
//Attention! Possible heavy CPU usage due console output.
Console.WriteLine(update.toMachineReadableOutput());
}
}
private void ProcessUpdateOrRecap(UpdateMessage update)
{
//Iterate through all field of update
foreach (BaseField field in update.FieldDict.Values)
{
ProcessFieldUpdate(field);
}
//And/or
//Direct access to fields (see vwd Fieldmap.xml)
BaseField outField = null;
if (update.TryGetField(80, out outField))
{
ProcessFieldUpdate(outField);
}
}
private void ProcessFieldUpdate(BaseField field)
{
short fieldId = field.FieldId;
switch (field.DataTypeValue)
{
case BaseBinaryMessage.DataType.String:
string stringValue = ((StringField)field).Value;
break;
case BaseBinaryMessage.DataType.DateTime:
DateTime timeValue = ((DateTimeField)field).Value;
break;
case BaseBinaryMessage.DataType.Double:
double doubleValue = ((DoubleField)field).Value;
break;
case BaseBinaryMessage.DataType.Float:
float floatValue = ((FloatField)field).Value;
break;
case BaseBinaryMessage.DataType.Int32:
int intValue = ((IntField)field).Value;
break;
case BaseBinaryMessage.DataType.Long:
long longValue = ((LongField)field).Value;
break;
case BaseBinaryMessage.DataType.Short:
short shortField = ((ShortField)field).Value;
break;
}
}
public override void Update(IConsumer source, ConsumerStateChangedEventArgs arg)
{
if (arg.IsOnline)
{
//Consumer is online. Feed data will be received.
}
else
{
//Consumer is offline. No feed data will be received.
}
}
}
}
using System;
using System.Security.Cryptography;
using kx;
using NLog;
namespace FeedDemo
{
static class Program
{
private static readonly ILogger Logger = LogManager.GetCurrentClassLogger();
private const string QFunc = ".u.upd";
private const string TableName = "mytable";
static void Main()
{
string host = "localhost";
int port = 5001;
string usernamePassword = $"{Environment.UserName}:mypassword";
c connection = null;
try
{
connection = new c(host, port, usernamePassword);
//Example of 10 single row inserts to a table
InsertRows(connection);
//Example of bulk inserts to a table to improve throughput
BulkInsertRows(connection);
}
catch (Exception ex)
{
Logger.Error($"Error occurred running Feed-Demo. \r\n{ex}");
}
finally
{
if (connection != null)
{
connection.Close();
}
}
}
private static void InsertRows(c connection)
{
// Single row insert - not as efficient as bulk insert
Logger.Info("Populating '{0}' table on kdb server with 10 rows...", TableName);
for(int i = 0; i < 10; i++)
{
// Assumes a remote schema of mytable:([]time:`timespan$();sym:`symbol$();price:`float$();size:`long$())
object[] row = new object[]
{
new c.KTimespan(100),
"SYMBOL",
93.5,
300L
};
connection.ks(QFunc, TableName, row);
}
Logger.Info("Successfully inserted 10 rows to {0}", TableName);
}
private static void BulkInsertRows(c connection)
{
// Bulk row insert - more efficient
string[] syms = new[] { "ABC", "DEF", "GHI", "JKL" };
c.KTimespan[] times = CreateTestArray(i => new c.KTimespan(i), 10);
string[] symbols = CreateTestArray(i => syms[RandomNumberGenerator.GetInt32(syms.Length)], 10);
double[] prices = CreateTestArray(i => i * 1.1, 10);
long[] sizes = CreateTestArray(i => (long)(i * 100), 10);
Logger.Info("Bulk populating '{0}' table on kdb server without using column names", TableName);
connection.ks(QFunc, TableName, new object[] { times, symbols, prices, sizes });
Logger.Info("Bulk populating '{0}' table on kdb server using column names", TableName);
connection.ks(QFunc, TableName, new c.Flip(new c.Dict(new string[] { "time", "sym", "price", "size" }, new object[] { times, symbols, prices, sizes })));
//block until all messages are processed
connection.k(string.Empty);
}
private static T[] CreateTestArray<T>(Func<int, T> elementBuilder, int arraySize)
{
T[] array = new T[arraySize];
for (int i = 0; i < arraySize; i++)
{
array[i] = elementBuilder(i);
}
return array;
}
}
}
using System;
using System.IO;
using System.Threading;
using Gevasys.DataCore.Configuration;
using Gevasys.DataCore.Core.Logging;
using Gevasys.DataCore.Common;
using Gevasys.DataCore.Consumer.Manager;
namespace DataManagerConnect.SampleApplication
{
class Sample
{
/// <summary>
/// Main entry point
/// </summary>
/// <param name="args"></param>
public static void Main(string[] args)
{
Sample dmcClientSample = new Sample();
if (!dmcClientSample.Initialize())
return;
dmcClientSample.Start();
Console.WriteLine("Press any key to exit ...");
Console.ReadKey();
}
/// <summary>
/// Initialize the Data Manager Connect (DMC) library
/// Note that DMC configuration is taken from App.config via .NET configuration mechanism
/// </summary>
/// <returns></returns>
public bool Initialize()
{
//Initialize the logging system (hint the log4net configuration file 'log_config.xml')
string logConfigFilePath = Path.Combine(Environment.CurrentDirectory, "Configuration");
Logger.Initialise(logConfigFilePath, "BaseLogger", "log_config.xml");
//Read the configuration from App.config
if (!ConfigurationHelper.Initialize())
{
Logger.Log.Error("Failed to initialize DMC library! Please check logfiles and your configuration!");
Console.WriteLine("Failed to initialize DMC library! Either wrong credentials or perhaps network connectivity problems.");
return false;
}
// Everything successfully initialized, we're ready to work with DMC
return true;
}
public void Start()
{
Console.WriteLine("======= Testing Push Data samples (Streaming - ConsumerStore) ==========");
new SamplePushData().GetStreamingData();
Console.WriteLine("Finished!");
}
}
public class SamplePushData
{
// Please NOTE: create fields as private for Push Data
private ConsumerStore consumerStore = null;
private SamplePushClient pushClient = null;
public void GetStreamingData()
{
// Initialization
pushClient = new SamplePushClient("Sample");
pushClient.Start();
consumerStore = ConfigurationHelper.CreateConsumer(pushClient);
// GMS -> BND, CER, FND, IND, WNT
consumerStore.AddFilter(new FilterElement { ExchangeCode = "GMS", SecurityType = SecurityType.Bond });
// Wait 60 seconds and stop
Thread.Sleep(60000);
StopStreaming();
}
// Helper method to stop streaming data store and push client
public void StopStreaming()
{
if (consumerStore != null)
{
Console.WriteLine("Stopping consumerStore...");
consumerStore.Stop();
// Waiting to give consumerStore time to stop
Thread.Sleep(3000);
Console.WriteLine("Stopping pushClient sample...");
pushClient.Stop();
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment