Instantly share code, notes, and snippets.

🎯
Focusing on code - always. 24/7

Matthias Wessendorf matzew

View GitHub Profile
View ReactiveMessaging_Reply.md

Reactive Messaging as Knative Reply Services

Knative Services are HTTP servers, that can receive input values, and their availablity and scaling is controlled by the autoscaler of Knative Serving.

Receiving CloudEvents

In Eventing such a HTTP application, can be subscribed to a channel, to receive message from that channel, as CloudEvents, over http. The yaml for that would look like:

apiVersion: eventing.knative.dev/v1alpha1
View gist:8e4588bb7fb8a6d17a140db6f94a1280

| Field
| Type | Description | Constraints | | ------------------------ | ---------------------------------- | -------------------------------------------------------------------------- | -------------------------------------- | | provisioner* | ObjectReference | The name of the provisioner to create the resources that back the Channel. | Immutable. | | arguments | runtime.RawExtension (JSON object) | Arguments to be passed to the provisioner. | | | subscribable.subscribers | ChannelSubscriberSpec[] | Information about subscriptions used to implement message forwarding. | Filled out by Subscription Controller. |

View dumpy.log
2019/01/19 12:16:09 Openshift Http Request Dumper received a message: POST / HTTP/1.1
Host: dumpy.myproject.svc.cluster.local
Accept-Encoding: gzip
Content-Length: 997
Content-Type: text/plain
User-Agent: Go-http-client/1.1
X-B3-Parentspanid: 02b1068d49abc225
X-B3-Sampled: 1
X-B3-Spanid: a238e3f2c246dac4
X-B3-Traceid: 02b1068d49abc225
View SimpleProcessor.java
package io.my.streams;
import java.util.function.Function;
@In("from-kafka")
@Out("to-http")
public class SimpleProcessor implements Function<String, String> {
@Override
public String apply(final String payload) {
View HttpProcessor.java
@ApplicationScoped
public class MyCloudEventProcessor {
@Incoming("from-http")
@Outgoing("to-http")
public HttpMessage<CloudEventMessage> process(CloudEventMessage<String> message) {
View kafka-sample.yaml
apiVersion: kafka.strimzi.io/v1alpha1
kind: Kafka
metadata:
name: my-cluster
spec:
kafka:
replicas: 1
listeners:
plain: {}
tls: {}
View ReactiveMessaging.java
package io.smallrye.reactive.messaging.example.eventclouds;
import io.smallrye.reactive.messaging.cloudevents.CloudEventMessage;
import io.smallrye.reactive.messaging.http.HttpMessage;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
View source.java
@Incoming("source")
@Outgoing("to-http")
public HttpMessage<CloudEventMessage> process(CloudEventMessage<String> message) {
return HttpMessage.HttpMessageBuilder.<CloudEventMessage>create()
.withMethod("PUT")
.withPayload(message)
.withHeader("Content-Type", "application/cloudevents+json")
.build();
View CloudEvent-java.md

Create the obj:

final CloudEvent<MyCustomEvent> cloudEvent = new CloudEventBuilder<MyCustomEvent>()
    .type("My.Cloud.Event.Type")
    .id("1234-1234-1234")
    .source("/trigger)
    .data(new MyCustomEvent(...))
    .build();
View gist:367291fcc6cb609c5f1e27b72532f3dc
User-Agent: Go-http-client/1.1
X-B3-Parentspanid: cfce8ebfb2c048ef
X-B3-Sampled: 1
X-B3-Spanid: 865946da443d23f8
X-B3-Traceid: cfce8ebfb2c048ef
X-Forwarded-For: 127.0.0.1
X-Forwarded-Proto: http
X-Request-Id: b24e72cb-a63f-9e82-aa33-3cbf5ac12dad