Skip to content

Instantly share code, notes, and snippets.

Created August 31, 2021 18:13
Show Gist options
  • Save zpqrtbnk/07ca326fca1a0a77ed5fe56f59915a26 to your computer and use it in GitHub Desktop.
Save zpqrtbnk/07ca326fca1a0a77ed5fe56f59915a26 to your computer and use it in GitHub Desktop.
// Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
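// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and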
// limitations under the License.
using System.Threading;
using System.Threading.Tasks;
using Hazelcast.Core;
using Hazelcast.Serialization;
using Hazelcast.Testing;
using Hazelcast.Testing.Logging;
using Microsoft.Extensions.Logging;
using NUnit.Framework;
namespace Hazelcast.Tests.Sandbox
// define extension methods, but don't leak them
using SupportTests_;
namespace SupportTests_
/// <summary>
/// Extension methods for <see cref="SerializationOptions"/> used by the tests in this file.
/// </summary>
public static class Extensions
{
    /// <summary>
    /// Registers <typeparamref name="TSerializer"/> as the serializer for <typeparamref name="TSerialized"/>.
    /// </summary>
    /// <typeparam name="TSerialized">The type handled by the serializer.</typeparam>
    /// <typeparam name="TSerializer">The serializer type; it is instantiated through its parameterless constructor.</typeparam>
    /// <param name="options">The serialization options to add the serializer to.</param>
    /// <returns>The same <paramref name="options"/> instance, so that calls can be chained.</returns>
    /// <exception cref="System.ArgumentNullException"><paramref name="options"/> is <c>null</c>.</exception>
    public static SerializationOptions AddSerializer<TSerialized, TSerializer>(this SerializationOptions options)
        where TSerializer : ISerializer, new()
    {
        // an extension method can be invoked on a null receiver: fail with a meaningful
        // exception instead of a NullReferenceException on the dereference below
        if (options == null) throw new System.ArgumentNullException(nameof(options));

        options.Serializers.Add(new SerializerOptions
        {
            SerializedType = typeof(TSerialized),
            // each invocation of the creator produces a new serializer instance
            Creator = () => new TSerializer()
        });

        return options;
    }
}
public class SupportTests : SingleMemberRemoteTestBase
// Type of the items offered to the queue by the test; it declares no members,
// so an instance carries no state.
private class QueueItem { }
// Stub stream serializer for QueueItem: it writes nothing for an item and
// produces a brand new QueueItem on read, ignoring the input.
private class QueueItemSerializer : IStreamSerializer<QueueItem>
{
    // identifier of this serializer
    public int TypeId => 666;

    // nothing is read from the input: a new, empty item is returned
    public QueueItem Read(IObjectDataInput input)
    {
        return new QueueItem();
    }

    // nothing is written to the output
    public void Write(IObjectDataOutput output, QueueItem obj)
    {
    }

    // nothing to release
    public void Dispose()
    {
    }
}
// Creates the logger factory used by the test: a factory whose logging builder
// has HConsole added to it. The type name is fully qualified because this class
// also references a member named LoggerFactory.
protected override ILoggerFactory CreateLoggerFactory()
{
    return Microsoft.Extensions.Logging.LoggerFactory.Create(logging => logging.AddHConsole());
}
// Attempts to reproduce an issue (the issue itself is not described in this file).
// Flow: configure a client whose state-changed subscriber subscribes to item-added events
// of a uniquely named queue the first time the client reports the Connected state; start
// the client; offer a fixed number of items to that queue; then wait until the number of
// received item-added events equals the number of offered items.
// NOTE(review): this copy of the method appears to be incomplete - block delimiters are
// absent and some statements are cut off (flagged below). Compare with the original source.
public async Task TryToReproduceIssue()
// queue name: fixed prefix + whatever CreateUniqueName() returns
var queueName = "queue" + CreateUniqueName();
// 0 = not subscribed yet, 1 = a Connected notification has claimed the subscription
var queueSubscribed = 0;
// number of item-added events received so far; incremented by the event handler
var queueEventsCount = 0;
// the object returned by Capture is disposed when the method ends (using declaration)
// NOTE(review): the configuration lambda is cut off after 'o => o' - the rest of the call is not visible
using var _ = HConsole.Capture(o => o
var options = new HazelcastOptionsBuilder()
.With(o =>
// our test environment provides a cluster, and we need to configure the client accordingly
o.ClusterName = RcCluster.Id;
// our cluster lives on localhost
// NOTE(review): no statement configuring an address follows the comment above - possibly lost, confirm
// fail fast, default timeout is infinite
o.Networking.ConnectionRetry.ClusterConnectionTimeoutMilliseconds = 4000;
// our test environment provides a logger factory
o.LoggerFactory.Creator = () => LoggerFactory;
// we need to be able to (de)serialize queue items
o.Serialization.AddSerializer<QueueItem, QueueItemSerializer>();
// subscribe
o.AddSubscriber(events => events
.StateChanged(async (c, a) =>
// only react to the Connected state
if (a.State == ClientState.Connected)
// CompareExchange returns the previous value: 1 means an earlier notification already
// switched the flag from 0 to 1, so this one must not subscribe again
if (Interlocked.CompareExchange(ref queueSubscribed, 1, 0) == 1) return; // only once!
HConsole.WriteLine(this, "State == Connected");
// get the queue from the client that raised the event, and subscribe to its item-added events
var q = await c.GetQueueAsync<QueueItem>(queueName).ConfigureAwait(false);
await q.SubscribeAsync(queueEvents => queueEvents
.ItemAdded((xq, xa) =>
HConsole.WriteLine(this, "Received item");
// atomic increment of the received-events counter
Interlocked.Increment(ref queueEventsCount);
// NOTE(review): the end of the subscriber and of the options builder chain is not visible here
HConsole.WriteLine(this, "Start new client...");
// NOTE(review): the client is not disposed anywhere in the visible code - confirm whether
// it should be (e.g. 'await using')
var client = await HazelcastClientFactory.StartNewClientAsync(options).ConfigureAwait(false);
HConsole.WriteLine(this, "Get queue...");
// same queue name as the one used by the state-changed subscriber above
var queue = await client.GetQueueAsync<QueueItem>(queueName).ConfigureAwait(false);
HConsole.WriteLine(this, "Offer items...");
const int itemsCount = 12;
// every offer is asserted to return true
for (var i = 0; i < itemsCount; i++)
Assert.That(await queue.OfferAsync(new QueueItem()).ConfigureAwait(false), Is.True);
// eventually, the event count will match
HConsole.WriteLine(this, "Count events...");
// presumably 4000 is the overall timeout and 200 the retry interval, both in milliseconds - confirm against AssertEx
await AssertEx.SucceedsEventually(() => Assert.That(queueEventsCount, Is.EqualTo(itemsCount)), 4000, 200).ConfigureAwait(false);
HConsole.WriteLine(this, "Success!");
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment