Created
April 28, 2015 00:05
Stream processing via Redis
// LINQPad query: Dump() is LINQPad's, JsonConvert is Json.NET, and Sample/Subscribe are Rx operators.
// Topology.Run, StreamSource and StreamSink are not shown in this listing.
void Main()
{
    // Start the topology against the Redis server; each stage is an Action<StreamSource,StreamSink>.
    var sub = Topology.Run("192.168.85.128" /* Redis address */, new Action<StreamSource,StreamSink>[] {
        Topology.SplitToWords, Topology.WordCounter
    });
    var data = new List<string> {
        "You can also send events to an Event Hub asynchronously. Sending asynchronously can increase the rate at which a client is able to send events. Both the Send and SendBatch methods are available in asynchronous versions that return a Task object. While this technique can increase throughput, it can also cause the client to continue to send events even while it is being throttled by the Event Hubs service and can result in the client experiencing failures or lost messages if not properly implemented. In addition, you can use the RetryPolicy property on the client to control client retry options.",
        "Although it is most common to send events to an Event Hub with a partition key, in some cases you might want to send events directly to a given partition. For example",
        "Event Hubs has two primary models for event consumption: direct receivers and higher-level abstractions, such as EventProcessorHost. Direct receivers are responsible for their own coordination of access to partitions within a consumer group.",
        "The most direct way to read from a partition within a consumer group is to use the EventHubReceiver class. To create an instance of this class, you must use an instance of the EventHubConsumerGroup class. In the following code example, the partition ID must be specified when creating the receiver for the consumer group.",
        "The CreateReceiver method has several overloads that facilitate control over the reader being created. These methods include specifying an offset as either a string or timestamp, and the ability to specify whether to include this specified offset in the returned stream, or start after it. After you create the receiver, you can start receiving events on the returned object. The Receive method has four overloads that control the receive operation parameters, such as batch size and wait time. You can use the asynchronous versions of these methods to increase the throughput of a consumer. For example",
        "With respect to a specific partition, the messages are received in the order in which they were sent to the Event Hub. The offset is a string token used to identify a message in a partition. It is important to note that a single partition within a consumer group cannot have more than five concurrent readers connected at any time. As readers connect or become disconnected, their sessions might stay active for several minutes before the service recognizes that they have disconnected. During this time, reconnecting to a partition may fail. For an example of writing a direct receiver for Event Hubs, see the Service Bus Event Hubs Direct Receivers sample."
    };
    // Publish each chunk of text, JSON-encoded, to the "chunks" stream.
    data.ForEach(c => sub.Publish("chunks", JsonConvert.SerializeObject(c)));
}

public static class Topology {
    // Stage 1: split each chunk on single spaces; punctuation stays attached to the word.
    public static void SplitToWords(StreamSource source, StreamSink sink) {
        var chunks = source.Get<string>("chunks");
        var words = chunks.SelectMany(c => c.Split(new [] {' '}));
        sink.Write("words", words);
    }

    // Stage 2: keep a running count per word and dump the counts on a 5-second sample.
    public static void WordCounter(StreamSource source, StreamSink sink) {
        var words = source.Get<string>("words");
        var counts = new Dictionary<string,int>();
        words.Subscribe(w => {
            if (!counts.ContainsKey(w)) {
                counts[w] = 1;
            } else {
                counts[w] += 1;
            }
        });
        // Sample emits at most once per 5-second window, and only if a word arrived in that window.
        words.Sample(TimeSpan.FromSeconds(5)).Subscribe(s => counts.Dump());
    }
}
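For readers who want to check the counting logic without Redis or Rx, the two stages can be sketched as plain in-memory functions. This is only an illustration, not part of the gist: `LocalWordCount`, its method names, and the two sample chunks are hypothetical. It applies the same split rule as `Topology.SplitToWords`, which is why keys such as `events` and `events.` show up as separate entries in the output.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for the two topology stages (no Redis, no Rx).
public static class LocalWordCount
{
    // Same split rule as Topology.SplitToWords: single spaces only,
    // so trailing punctuation stays attached to the word.
    public static IEnumerable<string> SplitToWords(IEnumerable<string> chunks) =>
        chunks.SelectMany(c => c.Split(new[] { ' ' }));

    // Same counting rule as Topology.WordCounter: case-sensitive running count per word.
    public static Dictionary<string, int> CountWords(IEnumerable<string> words)
    {
        var counts = new Dictionary<string, int>();
        foreach (var w in words)
        {
            counts.TryGetValue(w, out var n); // n is 0 when the key is missing
            counts[w] = n + 1;
        }
        return counts;
    }

    public static void Main()
    {
        // Illustrative input only; the gist publishes six longer chunks.
        var chunks = new[] { "send events to an Event Hub", "send events. to a partition" };
        foreach (var kv in CountWords(SplitToWords(chunks)))
            Console.WriteLine($"{kv.Key} {kv.Value}");
    }
}
```

`TryGetValue` replaces the `ContainsKey` check with a single lookup; the resulting counts are the same.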
Dictionary<String,Int32> (220 items)
Key Value
You 10
can 40
also 10
send 25
events 25
to 100
an 30
Event 35
Hub 10
asynchronously. 5
Sending 5
asynchronously 5
increase 15
the 130
rate 5
at 10
which 10
a 85
client 25
is 30
able 5
events. 5
Both 5
Send 5
and 25
SendBatch 5
methods 15
are 15
available 5
in 35
asynchronous 10
versions 10
that 25
return 5
Task 5
object. 10
While 5
this 20
technique 5
throughput, 5
it 15
cause 5
continue 5
even 5
while 5
being 10
throttled 5
by 5
Hubs 15
service 10
result 5
experiencing 5
failures 5
or 20
lost 5
messages 10
if 5
not 5
properly 5
implemented. 5
In 10
addition, 5
you 25
use 20
RetryPolicy 5
property 5
on 10
control 15
retry 5
options. 5
Although 5
most 10
common 5
with 5
partition 25
key, 5
some 5
cases 5
might 10
want 5
directly 5
given 5
partition. 10
For 15
example 15
has 15
two 5
primary 5
models 5
for 25
event 5
consumption: 5
direct 15
receivers 10
higher-level 5
abstractions, 5
such 10
as 15
EventProcessorHost. 5
Direct 10
responsible 5
their 10
own 5
coordination 5
of 30
access 5
partitions 5
within 15
consumer 20
group. 10
The 20
way 5
read 5
from 5
group 10
EventHubReceiver 5
class. 10
To 5
create 10
instance 10
class, 5
must 10
EventHubConsumerGroup 5
following 5
code 5
example, 5
ID 5
be 5
specified 10
when 5
creating 5
receiver 10
CreateReceiver 5
method 10
several 10
overloads 10
facilitate 5
over 5
reader 5
created. 5
These 5
include 10
specifying 5
offset 15
either 5
string 10
timestamp, 5
ability 5
specify 5
whether 5
returned 10
stream, 5
start 10
after 5
it. 5
After 5
receiver, 5
receiving 5
Receive 5
four 5
receive 5
operation 5
parameters, 5
batch 5
size 5
wait 5
time. 10
these 5
throughput 5
consumer. 5
With 5
respect 5
specific 5
partition, 5
received 5
order 5
they 10
were 5
sent 5
Hub. 5
token 5
used 5
identify 5
message 5
It 5
important 5
note 5
single 5
cannot 5
have 10
more 5
than 5
five 5
concurrent 5
readers 10
connected 5
any 5
As 5
connect 5
become 5
disconnected, 5
sessions 5
stay 5
active 5
minutes 5
before 5
recognizes 5
disconnected. 5
During 5
time, 5
reconnecting 5
may 5
fail. 5
writing 5
Hubs, 5
see 5
Service 5
Bus 5
Receivers 5
sample. 5
Total 2130
Dictionary<String,Int32> (220 items)
Key Value
You 12
can 48
also 12
send 30
events 30
to 120
an 36
Event 42
Hub 12
asynchronously. 6
Sending 6
asynchronously 6
increase 18
the 156
rate 6
at 12
which 12
a 102
client 30
is 36
able 6
events. 6
Both 6
Send 6
and 30
SendBatch 6
methods 18
are 18
available 6
in 42
asynchronous 12
versions 12
that 30
return 6
Task 6
object. 12
While 6
this 24
technique 6
throughput, 6
it 18
cause 6
continue 6
even 6
while 6
being 12
throttled 6
by 6
Hubs 18
service 12
result 6
experiencing 6
failures 6
or 24
lost 6
messages 12
if 6
not 6
properly 6
implemented. 6
In 12
addition, 6
you 30
use 24
RetryPolicy 6
property 6
on 12
control 18
retry 6
options. 6
Although 6
most 12
common 6
with 6
partition 30
key, 6
some 6
cases 6
might 12
want 6
directly 6
given 6
partition. 12
For 18
example 18
has 18
two 6
primary 6
models 6
for 30
event 6
consumption: 6
direct 18
receivers 12
higher-level 6
abstractions, 6
such 12
as 18
EventProcessorHost. 6
Direct 12
responsible 6
their 12
own 6
coordination 6
of 36
access 6
partitions 6
within 18
consumer 24
group. 12
The 24
way 6
read 6
from 6
group 12
EventHubReceiver 6
class. 12
To 6
create 12
instance 12
class, 6
must 12
EventHubConsumerGroup 6
following 6
code 6
example, 6
ID 6
be 6
specified 12
when 6
creating 6
receiver 12
CreateReceiver 6
method 12
several 12
overloads 12
facilitate 6
over 6
reader 6
created. 6
These 6
include 12
specifying 6
offset 18
either 6
string 12
timestamp, 6
ability 6
specify 6
whether 6
returned 12
stream, 6
start 12
after 6
it. 6
After 6
receiver, 6
receiving 6
Receive 6
four 6
receive 6
operation 6
parameters, 6
batch 6
size 6
wait 6
time. 12
these 6
throughput 6
consumer. 6
With 6
respect 6
specific 6
partition, 6
received 6
order 6
they 12
were 6
sent 6
Hub. 6
token 6
used 6
identify 6
message 6
It 6
important 6
note 6
single 6
cannot 6
have 12
more 6
than 6
five 6
concurrent 6
readers 12
connected 6
any 6
As 6
connect 6
become 6
disconnected, 6
sessions 6
stay 6
active 6
minutes 6
before 6
recognizes 6
disconnected. 6
During 6
time, 6
reconnecting 6
may 6
fail. 6
writing 6
Hubs, 6
see 6
Service 6
Bus 6
Receivers 6
sample. 6
Total 2556
Dictionary<String,Int32> (220 items)
Key Value
You 6
can 24
also 6
send 15
events 15
to 60
an 18
Event 21
Hub 6
asynchronously. 3
Sending 3
asynchronously 3
increase 9
the 78
rate 3
at 6
which 6
a 51
client 15
is 18
able 3
events. 3
Both 3
Send 3
and 15
SendBatch 3
methods 9
are 9
available 3
in 21
asynchronous 6
versions 6
that 15
return 3
Task 3
object. 6
While 3
this 12
technique 3
throughput, 3
it 9
cause 3
continue 3
even 3
while 3
being 6
throttled 3
by 3
Hubs 9
service 6
result 3
experiencing 3
failures 3
or 12
lost 3
messages 6
if 3
not 3
properly 3
implemented. 3
In 6
addition, 3
you 15
use 12
RetryPolicy 3
property 3
on 6
control 9
retry 3
options. 3
Although 3
most 6
common 3
with 3
partition 15
key, 3
some 3
cases 3
might 6
want 3
directly 3
given 3
partition. 6
For 9
example 9
has 9
two 3
primary 3
models 3
for 15
event 3
consumption: 3
direct 9
receivers 6
higher-level 3
abstractions, 3
such 6
as 9
EventProcessorHost. 3
Direct 6
responsible 3
their 6
own 3
coordination 3
of 18
access 3
partitions 3
within 9
consumer 12
group. 6
The 12
way 3
read 3
from 3
group 6
EventHubReceiver 3
class. 6
To 3
create 6
instance 6
class, 3
must 6
EventHubConsumerGroup 3
following 3
code 3
example, 3
ID 3
be 3
specified 6
when 3
creating 3
receiver 6
CreateReceiver 3
method 6
several 6
overloads 6
facilitate 3
over 3
reader 3
created. 3
These 3
include 6
specifying 3
offset 9
either 3
string 6
timestamp, 3
ability 3
specify 3
whether 3
returned 6
stream, 3
start 6
after 3
it. 3
After 3
receiver, 3
receiving 3
Receive 3
four 3
receive 3
operation 3
parameters, 3
batch 3
size 3
wait 3
time. 6
these 3
throughput 3
consumer. 3
With 3
respect 3
specific 3
partition, 3
received 3
order 3
they 6
were 3
sent 3
Hub. 3
token 3
used 3
identify 3
message 3
It 3
important 3
note 3
single 3
cannot 3
have 6
more 3
than 3
five 3
concurrent 3
readers 6
connected 3
any 3
As 3
connect 3
become 3
disconnected, 3
sessions 3
stay 3
active 3
minutes 3
before 3
recognizes 3
disconnected. 3
During 3
time, 3
reconnecting 3
may 3
fail. 3
writing 3
Hubs, 3
see 3
Service 3
Bus 3
Receivers 3
sample. 3
Total 1278