Skip to content

Instantly share code, notes, and snippets.

@zpqrtbnk
Created August 31, 2021 18:13
Show Gist options
  • Save zpqrtbnk/07ca326fca1a0a77ed5fe56f59915a26 to your computer and use it in GitHub Desktop.
Save zpqrtbnk/07ca326fca1a0a77ed5fe56f59915a26 to your computer and use it in GitHub Desktop.
HazelcastIssue466
// Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
using System.Threading;
using System.Threading.Tasks;
using Hazelcast.Core;
using Hazelcast.Serialization;
using Hazelcast.Testing;
using Hazelcast.Testing.Logging;
using Microsoft.Extensions.Logging;
using NUnit.Framework;
namespace Hazelcast.Tests.Sandbox
{
// define extension methods, but don't leak them
using SupportTests_;
namespace SupportTests_
{
public static class Extensions
{
public static SerializationOptions AddSerializer<TSerialized, TSerializer>(this SerializationOptions options)
where TSerializer : ISerializer, new()
{
options.Serializers.Add(new SerializerOptions { SerializedType = typeof(TSerialized), Creator = () => new TSerializer() });
return options;
}
}
}
[TestFixture]
public class SupportTests : SingleMemberRemoteTestBase
{
// items for the queue
private class QueueItem { }
// dummy serializer for the queue items
private class QueueItemSerializer : IStreamSerializer<QueueItem>
{
public void Dispose()
{ }
public int TypeId { get; } = 666;
public QueueItem Read(IObjectDataInput input)
=> new QueueItem();
public void Write(IObjectDataOutput output, QueueItem obj)
{ }
}
// log to HConsole
protected override ILoggerFactory CreateLoggerFactory() =>
Microsoft.Extensions.Logging.LoggerFactory.Create(builder => builder.AddHConsole());
[Test]
[Timeout(20_000)]
public async Task TryToReproduceIssue()
{
var queueName = "queue" + CreateUniqueName();
var queueSubscribed = 0;
var queueEventsCount = 0;
using var _ = HConsole.Capture(o => o
.ClearAll()
.Configure<HConsoleLoggerProvider>().SetPrefix("LOG").SetMaxLevel()
.Configure().SetMinLevel()
.Configure(this).SetPrefix("TEST").SetMaxLevel()
);
var options = new HazelcastOptionsBuilder()
.With(o =>
{
// our test environment provides a cluster, and we need to configure the client accordingly
o.ClusterName = RcCluster.Id;
// our cluster lives on localhost
o.Networking.Addresses.Clear();
o.Networking.Addresses.Add("127.0.0.1:5701");
// fail fast, default timeout is infinite
o.Networking.ConnectionRetry.ClusterConnectionTimeoutMilliseconds = 4000;
// our test environment provides a logger factory
o.LoggerFactory.Creator = () => LoggerFactory;
// we need to be able to (de)serialize queue items
o.Serialization.AddSerializer<QueueItem, QueueItemSerializer>();
// subscribe
o.AddSubscriber(events => events
.StateChanged(async (c, a) =>
{
if (a.State == ClientState.Connected)
{
if (Interlocked.CompareExchange(ref queueSubscribed, 1, 0) == 1) return; // only once!
HConsole.WriteLine(this, "State == Connected");
var q = await c.GetQueueAsync<QueueItem>(queueName).ConfigureAwait(false);
await q.SubscribeAsync(queueEvents => queueEvents
.ItemAdded((xq, xa) =>
{
HConsole.WriteLine(this, "Received item");
Interlocked.Increment(ref queueEventsCount);
})).ConfigureAwait(false);
}
}));
})
.Build();
HConsole.WriteLine(this, "Start new client...");
var client = await HazelcastClientFactory.StartNewClientAsync(options).ConfigureAwait(false);
HConsole.WriteLine(this, "Get queue...");
var queue = await client.GetQueueAsync<QueueItem>(queueName).ConfigureAwait(false);
HConsole.WriteLine(this, "Offer items...");
const int itemsCount = 12;
for (var i = 0; i < itemsCount; i++)
{
Assert.That(await queue.OfferAsync(new QueueItem()).ConfigureAwait(false), Is.True);
}
// eventually, the event count will match
HConsole.WriteLine(this, "Count events...");
await AssertEx.SucceedsEventually(() => Assert.That(queueEventsCount, Is.EqualTo(itemsCount)), 4000, 200).ConfigureAwait(false);
HConsole.WriteLine(this, "Success!");
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment