Skip to content

Instantly share code, notes, and snippets.

@tnayanam
Created March 27, 2020 11:20
Show Gist options
  • Save tnayanam/f0e29ad5b35b8f0ab8c84a49b745ce02 to your computer and use it in GitHub Desktop.
Save tnayanam/f0e29ad5b35b8f0ab8c84a49b745ce02 to your computer and use it in GitHub Desktop.
using System;
using System.IO;
using System.Text;
using Newtonsoft.Json;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;
using Amazon.DynamoDBv2.Model;
using Microsoft.Azure.ServiceBus;
using System.Threading.Tasks;
using Amazon.DynamoDBv2;
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))]
namespace corona
{
public class Function
{
private static readonly JsonSerializer _jsonSerializer = new JsonSerializer();
public async Task FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)
{
context.Logger.LogLine($"Beginning to process {dynamoEvent.Records.Count} records...");
try
{
foreach (var record in dynamoEvent.Records)
{
try
{
if (record.EventName == OperationType.REMOVE)
{
string streamRecordJson = SerializeStreamRecord(record.Dynamodb);
await Send(streamRecordJson, context);
}
}
catch (Exception ex)
{
throw ex;
}
}
}
catch (Exception ex)
{
context.Logger.LogLine("Exception Occurred" + ex.Message);
context.Logger.LogLine("Inner Exception Occurred" + ex.InnerException);
}
context.Logger.LogLine("Stream processing complete.");
}
private static async Task Send(string stream, ILambdaContext context)
{
try
{
const string connectionString = "QUEUE_END_POINT";
string queueName = "QUEUE_NAME";
ServiceBusConnectionStringBuilder svc = new ServiceBusConnectionStringBuilder(connectionString);
ServiceBusConnection svc1 = new ServiceBusConnection(svc);
var client = new QueueClient(svc1, queueName, ReceiveMode.PeekLock, RetryPolicy.Default);
var message = new Message(Encoding.UTF8.GetBytes(stream));
await client.SendAsync(message);
}
catch (Exception ex)
{
throw ex;
}
}
private string SerializeStreamRecord(StreamRecord streamRecord)
{
try
{
using (var writer = new StringWriter())
{
_jsonSerializer.Serialize(writer, streamRecord);
return writer.ToString();
}
}
catch (Exception ex)
{
throw ex;
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment