@liammclennan
Created April 28, 2015 00:05
Stream processing via Redis
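void Main()
{
    // Run the two-stage topology (split, then count) against the Redis server at this address.
    var sub = Topology.Run("192.168.85.128" /* redis address */, new Action<StreamSource, StreamSink>[] {
        Topology.SplitToWords, Topology.WordCounter
    });
    // Sample text chunks to feed into the topology.
    var data = new List<string> {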
"You can also send events to an Event Hub asynchronously. Sending asynchronously can increase the rate at which a client is able to send events. Both the Send and SendBatch methods are available in asynchronous versions that return a Task object. While this technique can increase throughput, it can also cause the client to continue to send events even while it is being throttled by the Event Hubs service and can result in the client experiencing failures or lost messages if not properly implemented. In addition, you can use the RetryPolicy property on the client to control client retry options.",
"Although it is most common to send events to an Event Hub with a partition key, in some cases you might want to send events directly to a given partition. For example",
"Event Hubs has two primary models for event consumption: direct receivers and higher-level abstractions, such as EventProcessorHost. Direct receivers are responsible for their own coordination of access to partitions within a consumer group.",
"The most direct way to read from a partition within a consumer group is to use the EventHubReceiver class. To create an instance of this class, you must use an instance of the EventHubConsumerGroup class. In the following code example, the partition ID must be specified when creating the receiver for the consumer group.",
"The CreateReceiver method has several overloads that facilitate control over the reader being created. These methods include specifying an offset as either a string or timestamp, and the ability to specify whether to include this specified offset in the returned stream, or start after it. After you create the receiver, you can start receiving events on the returned object. The Receive method has four overloads that control the receive operation parameters, such as batch size and wait time. You can use the asynchronous versions of these methods to increase the throughput of a consumer. For example",
"With respect to a specific partition, the messages are received in the order in which they were sent to the Event Hub. The offset is a string token used to identify a message in a partition. It is important to note that a single partition within a consumer group cannot have more than five concurrent readers connected at any time. As readers connect or become disconnected, their sessions might stay active for several minutes before the service recognizes that they have disconnected. During this time, reconnecting to a partition may fail. For an example of writing a direct receiver for Event Hubs, see the Service Bus Event Hubs Direct Receivers sample."
    };
    // Publish each chunk as JSON to the "chunks" stream, which SplitToWords reads from.
    data.ForEach(c => sub.Publish("chunks", JsonConvert.SerializeObject(c)));
}
public static class Topology {
    // Stage 1: read text chunks and write each space-separated word to the "words" stream.
    public static void SplitToWords(StreamSource source, StreamSink sink) {
        var chunks = source.Get<string>("chunks");
        var words = chunks.SelectMany(c => c.Split(new[] { ' ' }));
        sink.Write("words", words);
    }

    // Stage 2: keep a running count per word and dump the counts periodically.
    public static void WordCounter(StreamSource source, StreamSink sink) {
        var words = source.Get<string>("words");
        var counts = new Dictionary<string, int>();
        words.Subscribe(w => {
            if (!counts.ContainsKey(w)) {
                counts[w] = 1;
            } else {
                counts[w] += 1;
            }
        });
        // Dump at most once per 5-second window, and only if a word arrived in that window.
        words.Sample(TimeSpan.FromSeconds(5)).Subscribe(s => counts.Dump());
    }
}
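The two stages above can also be tried without Redis or Rx. What follows is only a minimal in-memory sketch, not the gist's actual plumbing: the class name `WordCountSketch`, the `CountWords` helper, and the two short sample strings are hypothetical, and only the splitting and counting logic mirrors `SplitToWords` and `WordCounter`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the topology: same logic, plain in-memory sequences.
public static class WordCountSketch
{
    // Stage 1: split each chunk on single spaces, as SplitToWords does.
    public static IEnumerable<string> SplitToWords(IEnumerable<string> chunks) =>
        chunks.SelectMany(c => c.Split(new[] { ' ' }));

    // Stage 2: tally occurrences per word, as WordCounter does.
    public static Dictionary<string, int> CountWords(IEnumerable<string> words)
    {
        var counts = new Dictionary<string, int>();
        foreach (var w in words)
        {
            counts.TryGetValue(w, out var n); // n stays 0 when w has not been seen yet
            counts[w] = n + 1;
        }
        return counts;
    }

    public static void Main()
    {
        // Made-up sample input, not the gist's data.
        var chunks = new[] { "to be or not to be", "to send events" };
        var counts = CountWords(SplitToWords(chunks));
        foreach (var pair in counts)
            Console.WriteLine($"{pair.Key} {pair.Value}");
    }
}
```

As in the original, the split is case-sensitive and keeps punctuation, which is why the dumps list `events` and `events.`, or `The` and `the`, as separate keys.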
Dictionary<String,Int32> (220 items)
Key Value
You 10
can 40
also 10
send 25
events 25
to 100
an 30
Event 35
Hub 10
asynchronously. 5
Sending 5
asynchronously 5
increase 15
the 130
rate 5
at 10
which 10
a 85
client 25
is 30
able 5
events. 5
Both 5
Send 5
and 25
SendBatch 5
methods 15
are 15
available 5
in 35
asynchronous 10
versions 10
that 25
return 5
Task 5
object. 10
While 5
this 20
technique 5
throughput, 5
it 15
cause 5
continue 5
even 5
while 5
being 10
throttled 5
by 5
Hubs 15
service 10
result 5
experiencing 5
failures 5
or 20
lost 5
messages 10
if 5
not 5
properly 5
implemented. 5
In 10
addition, 5
you 25
use 20
RetryPolicy 5
property 5
on 10
control 15
retry 5
options. 5
Although 5
most 10
common 5
with 5
partition 25
key, 5
some 5
cases 5
might 10
want 5
directly 5
given 5
partition. 10
For 15
example 15
has 15
two 5
primary 5
models 5
for 25
event 5
consumption: 5
direct 15
receivers 10
higher-level 5
abstractions, 5
such 10
as 15
EventProcessorHost. 5
Direct 10
responsible 5
their 10
own 5
coordination 5
of 30
access 5
partitions 5
within 15
consumer 20
group. 10
The 20
way 5
read 5
from 5
group 10
EventHubReceiver 5
class. 10
To 5
create 10
instance 10
class, 5
must 10
EventHubConsumerGroup 5
following 5
code 5
example, 5
ID 5
be 5
specified 10
when 5
creating 5
receiver 10
CreateReceiver 5
method 10
several 10
overloads 10
facilitate 5
over 5
reader 5
created. 5
These 5
include 10
specifying 5
offset 15
either 5
string 10
timestamp, 5
ability 5
specify 5
whether 5
returned 10
stream, 5
start 10
after 5
it. 5
After 5
receiver, 5
receiving 5
Receive 5
four 5
receive 5
operation 5
parameters, 5
batch 5
size 5
wait 5
time. 10
these 5
throughput 5
consumer. 5
With 5
respect 5
specific 5
partition, 5
received 5
order 5
they 10
were 5
sent 5
Hub. 5
token 5
used 5
identify 5
message 5
It 5
important 5
note 5
single 5
cannot 5
have 10
more 5
than 5
five 5
concurrent 5
readers 10
connected 5
any 5
As 5
connect 5
become 5
disconnected, 5
sessions 5
stay 5
active 5
minutes 5
before 5
recognizes 5
disconnected. 5
During 5
time, 5
reconnecting 5
may 5
fail. 5
writing 5
Hubs, 5
see 5
Service 5
Bus 5
Receivers 5
sample. 5
2130
Dictionary<String,Int32> (220 items)
Key Value
You 12
can 48
also 12
send 30
events 30
to 120
an 36
Event 42
Hub 12
asynchronously. 6
Sending 6
asynchronously 6
increase 18
the 156
rate 6
at 12
which 12
a 102
client 30
is 36
able 6
events. 6
Both 6
Send 6
and 30
SendBatch 6
methods 18
are 18
available 6
in 42
asynchronous 12
versions 12
that 30
return 6
Task 6
object. 12
While 6
this 24
technique 6
throughput, 6
it 18
cause 6
continue 6
even 6
while 6
being 12
throttled 6
by 6
Hubs 18
service 12
result 6
experiencing 6
failures 6
or 24
lost 6
messages 12
if 6
not 6
properly 6
implemented. 6
In 12
addition, 6
you 30
use 24
RetryPolicy 6
property 6
on 12
control 18
retry 6
options. 6
Although 6
most 12
common 6
with 6
partition 30
key, 6
some 6
cases 6
might 12
want 6
directly 6
given 6
partition. 12
For 18
example 18
has 18
two 6
primary 6
models 6
for 30
event 6
consumption: 6
direct 18
receivers 12
higher-level 6
abstractions, 6
such 12
as 18
EventProcessorHost. 6
Direct 12
responsible 6
their 12
own 6
coordination 6
of 36
access 6
partitions 6
within 18
consumer 24
group. 12
The 24
way 6
read 6
from 6
group 12
EventHubReceiver 6
class. 12
To 6
create 12
instance 12
class, 6
must 12
EventHubConsumerGroup 6
following 6
code 6
example, 6
ID 6
be 6
specified 12
when 6
creating 6
receiver 12
CreateReceiver 6
method 12
several 12
overloads 12
facilitate 6
over 6
reader 6
created. 6
These 6
include 12
specifying 6
offset 18
either 6
string 12
timestamp, 6
ability 6
specify 6
whether 6
returned 12
stream, 6
start 12
after 6
it. 6
After 6
receiver, 6
receiving 6
Receive 6
four 6
receive 6
operation 6
parameters, 6
batch 6
size 6
wait 6
time. 12
these 6
throughput 6
consumer. 6
With 6
respect 6
specific 6
partition, 6
received 6
order 6
they 12
were 6
sent 6
Hub. 6
token 6
used 6
identify 6
message 6
It 6
important 6
note 6
single 6
cannot 6
have 12
more 6
than 6
five 6
concurrent 6
readers 12
connected 6
any 6
As 6
connect 6
become 6
disconnected, 6
sessions 6
stay 6
active 6
minutes 6
before 6
recognizes 6
disconnected. 6
During 6
time, 6
reconnecting 6
may 6
fail. 6
writing 6
Hubs, 6
see 6
Service 6
Bus 6
Receivers 6
sample. 6
2556
Dictionary<String,Int32> (220 items)
Key Value
You 6
can 24
also 6
send 15
events 15
to 60
an 18
Event 21
Hub 6
asynchronously. 3
Sending 3
asynchronously 3
increase 9
the 78
rate 3
at 6
which 6
a 51
client 15
is 18
able 3
events. 3
Both 3
Send 3
and 15
SendBatch 3
methods 9
are 9
available 3
in 21
asynchronous 6
versions 6
that 15
return 3
Task 3
object. 6
While 3
this 12
technique 3
throughput, 3
it 9
cause 3
continue 3
even 3
while 3
being 6
throttled 3
by 3
Hubs 9
service 6
result 3
experiencing 3
failures 3
or 12
lost 3
messages 6
if 3
not 3
properly 3
implemented. 3
In 6
addition, 3
you 15
use 12
RetryPolicy 3
property 3
on 6
control 9
retry 3
options. 3
Although 3
most 6
common 3
with 3
partition 15
key, 3
some 3
cases 3
might 6
want 3
directly 3
given 3
partition. 6
For 9
example 9
has 9
two 3
primary 3
models 3
for 15
event 3
consumption: 3
direct 9
receivers 6
higher-level 3
abstractions, 3
such 6
as 9
EventProcessorHost. 3
Direct 6
responsible 3
their 6
own 3
coordination 3
of 18
access 3
partitions 3
within 9
consumer 12
group. 6
The 12
way 3
read 3
from 3
group 6
EventHubReceiver 3
class. 6
To 3
create 6
instance 6
class, 3
must 6
EventHubConsumerGroup 3
following 3
code 3
example, 3
ID 3
be 3
specified 6
when 3
creating 3
receiver 6
CreateReceiver 3
method 6
several 6
overloads 6
facilitate 3
over 3
reader 3
created. 3
These 3
include 6
specifying 3
offset 9
either 3
string 6
timestamp, 3
ability 3
specify 3
whether 3
returned 6
stream, 3
start 6
after 3
it. 3
After 3
receiver, 3
receiving 3
Receive 3
four 3
receive 3
operation 3
parameters, 3
batch 3
size 3
wait 3
time. 6
these 3
throughput 3
consumer. 3
With 3
respect 3
specific 3
partition, 3
received 3
order 3
they 6
were 3
sent 3
Hub. 3
token 3
used 3
identify 3
message 3
It 3
important 3
note 3
single 3
cannot 3
have 6
more 3
than 3
five 3
concurrent 3
readers 6
connected 3
any 3
As 3
connect 3
become 3
disconnected, 3
sessions 3
stay 3
active 3
minutes 3
before 3
recognizes 3
disconnected. 3
During 3
time, 3
reconnecting 3
may 3
fail. 3
writing 3
Hubs, 3
see 3
Service 3
Bus 3
Receivers 3
sample. 3
1278