Skip to content

Instantly share code, notes, and snippets.

@sijie
Created August 2, 2019 08:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sijie/297e3de42f8184350e651f11db91408b to your computer and use it in GitHub Desktop.
Save sijie/297e3de42f8184350e651f11db91408b to your computer and use it in GitHub Desktop.
Kafka source with a String function example
/**
 * Runs a Kafka source and the exclamation function back to back and checks the
 * function output.
 *
 * <p>Steps: create a Kafka topic, register {@code Schema.STRING} on
 * {@code pulsarTopic}, subscribe to {@code pulsarTopic + "-output"}, create the
 * Kafka source and the exclamation function, publish {@code numMessages} values
 * to Kafka, then expect each value with a trailing {@code "!"} on the output
 * topic, in publish order.
 *
 * @param pulsarTopic the Pulsar topic the Kafka source writes to and the
 *                    function reads from; its tenant and namespace are reused
 *                    for the source and the function
 * @throws Exception if any admin, Kafka or Pulsar client call fails, or if an
 *                   expected output message does not arrive in time
 */
protected void testKafkaSourcePulsarFunction(TopicName pulsarTopic) throws Exception {
    final String tenantName = pulsarTopic.getTenant();
    final String namespaceName = pulsarTopic.getNamespacePortion();

    // Upper bounds for the blocking calls below, so a broken setup fails the
    // test instead of hanging it.
    final int kafkaOpTimeoutSeconds = 30;
    final int receiveTimeoutSeconds = 60;

    // create kafka topic
    final String kafkaTopic = "test-kafka-source-pulsar-function-" + Base58.randomString(8);
    final NewTopic newKafkaTopic = new NewTopic(kafkaTopic, 1, (short) 1);
    // Topic creation is asynchronous: wait for it to complete (and surface any
    // failure) before the source is pointed at the topic.
    kafkaAdmin.createTopics(
        Sets.newHashSet(newKafkaTopic),
        new CreateTopicsOptions()
    ).all().get(kafkaOpTimeoutSeconds, java.util.concurrent.TimeUnit.SECONDS);

    // register Schema.STRING to the pulsar topic. because Kafka source is running
    // first, it will trigger creating the pulsar topic with Schema.BYTES. Hence when
    // running ExclamationFunction, the function will fail to run because it attempts
    // to consume the topic (with Schema.BYTES) using Schema.STRING.
    admin.schemas().createSchema(pulsarTopic.toString(), Schema.STRING.getSchemaInfo());

    // create pulsar output topic
    final String outputPulsarTopic = pulsarTopic.toString() + "-output";
    @Cleanup
    final Consumer<String> pulsarConsumer = client.newConsumer(Schema.STRING)
        .subscriptionName("test-sub")
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .topic(outputPulsarTopic)
        .subscribe();

    // create source
    final String sourceName = "test-kafka-source";
    createKafkaSource(
        tenantName,
        namespaceName,
        sourceName,
        kafkaTopic,
        pulsarTopic.toString()
    );

    // create function
    final String functionName = "test-exclamation-function";
    createExclamationFunction(
        tenantName,
        namespaceName,
        functionName,
        pulsarTopic.toString(),
        outputPulsarTopic
    );

    final int numMessages = 10;
    // once both the kafka source and the function are running, produce the data.
    try (Producer<String, String> producer = newKafkaProducer(
        new StringSerializer(), new StringSerializer()
    )) {
        for (int i = 0; i < numMessages; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(
                kafkaTopic,
                "value-" + i
            );
            // Wait for each send so a publish failure is reported here rather
            // than showing up later as a missing output message.
            producer.send(record).get(kafkaOpTimeoutSeconds, java.util.concurrent.TimeUnit.SECONDS);
        }
    }

    // receive messages from pulsar output topic (written by Exclamation Function)
    for (int i = 0; i < numMessages; i++) {
        // Bounded receive: returns null on timeout instead of blocking forever.
        Message<String> message = pulsarConsumer.receive(
            receiveTimeoutSeconds, java.util.concurrent.TimeUnit.SECONDS);
        if (message == null) {
            throw new AssertionError("Timed out after " + receiveTimeoutSeconds
                + "s waiting for message " + i + " on " + outputPulsarTopic);
        }
        assertEquals("value-" + i + "!", message.getValue());
    }
}

/**
 * Decides whether a single source instance counts as running.
 *
 * <p>This implementation simply reports {@code status.isRunning()}; being
 * {@code protected}, subclasses may override it.
 *
 * @param status the status data of one source instance
 * @return the value of {@code status.isRunning()}
 */
protected boolean assertSourceInstanceRunning(SourceInstanceStatusData status) {
    final boolean running = status.isRunning();
    return running;
}

/**
 * Decides whether a single function instance counts as running.
 *
 * <p>This implementation simply reports {@code status.isRunning()}; being
 * {@code protected}, subclasses may override it.
 *
 * @param status the status data of one function instance
 * @return the value of {@code status.isRunning()}
 */
protected boolean assertFunctionInstanceRunning(FunctionInstanceStatusData status) {
    final boolean running = status.isRunning();
    return running;
}

/**
 * Creates the built-in Kafka source and waits until it reports as running.
 *
 * <p>The connector config comes from {@code buildKafkaSourceConfig}, given a
 * {@code PLAINTEXT://<cluster>-<kafka service>:9092} address, the Kafka topic
 * and the group id {@code "test-group"}. After creation the stored source
 * config is fetched and logged, then the source status is checked through
 * {@code TestUtils.retryUntil} (presumably re-polling until the predicate
 * returns {@code true} — confirm against that helper).
 *
 * @param tenantName      tenant to create the source in
 * @param namespaceName   namespace to create the source in
 * @param sourceName      name of the source
 * @param kafkaTopic      Kafka topic the source reads from
 * @param pulsarTopicName Pulsar topic the source writes to
 * @throws Exception if source creation or any later admin call fails; a
 *                   {@code PulsarAdminException} from creation is logged and
 *                   rethrown unchanged
 */
protected void createKafkaSource(String tenantName,
                                 String namespaceName,
                                 String sourceName,
                                 String kafkaTopic,
                                 String pulsarTopicName) throws Exception {
    final String bootstrapServers = "PLAINTEXT://" + pulsarService().getClusterName()
        + "-" + ExternalServices.KAFKA + ":9092";
    final Map<String, Object> connectorSettings =
        buildKafkaSourceConfig(bootstrapServers, kafkaTopic, "test-group");
    log.info("Source config : {}", connectorSettings);

    try {
        final SourceConfig requested = SourceConfig.builder()
            .archive("builtin://kafka")
            .tenant(tenantName)
            .namespace(namespaceName)
            .name(sourceName)
            .parallelism(1)
            .configs(connectorSettings)
            .topicName(pulsarTopicName)
            .build();
        admin.sources().createSource(requested, requested.getArchive());
    } catch (PulsarAdminException pae) {
        log.error("Failed to create a kafka source : {}", pae.getHttpError(), pae);
        throw pae;
    }

    // Read the config back purely for the log; the value is not used afterwards.
    final SourceConfig stored = admin.sources().getSource(tenantName, namespaceName, sourceName);
    log.info("Fetch the source config for {}/{}/{}: {}",
        tenantName, namespaceName, sourceName, stored);

    TestUtils.<Void, SourceStatus>retryUntil(
        null,
        ignored -> admin.sources().getSourceStatus(tenantName, namespaceName, sourceName),
        status -> {
            log.info("Get source status : {}", status);
            // Not ready while nothing is reported as running.
            if (status.getNumRunning() < 1) {
                return false;
            }
            // Every listed instance has to pass the per-instance check.
            for (SourceInstanceStatus instance : status.getInstances()) {
                final boolean running = assertSourceInstanceRunning(instance.getStatus());
                if (!running) {
                    return false;
                }
            }
            return true;
        }
    );
}

/**
 * Registers the example {@code ExclamationFunction} and waits until it reports
 * as running.
 *
 * <p>The function is created by running {@code pulsar-admin functions create}
 * through {@code execCmd} on the {@code PulsarComponent.BROKER} component; the
 * command must exit with code 0. The function status is then checked through
 * {@code TestUtils.retryUntil} (presumably re-polling until the predicate
 * returns {@code true} — confirm against that helper).
 *
 * @param tenantName        tenant to create the function in
 * @param namespaceName     namespace to create the function in
 * @param functionName      name of the function
 * @param inputPulsarTopic  topic passed as {@code --inputs}
 * @param outputPulsarTopic topic passed as {@code --output}
 * @throws Exception if the command cannot be executed or a status lookup fails
 */
protected void createExclamationFunction(String tenantName,
                                         String namespaceName,
                                         String functionName,
                                         String inputPulsarTopic,
                                         String outputPulsarTopic) throws Exception {
    final PulsarContainerServiceBase service = (PulsarContainerServiceBase) pulsarService();
    final String[] createCommand = {
        "/pulsar/bin/pulsar-admin", "functions", "create",
        "--tenant", tenantName,
        "--namespace", namespaceName,
        "--name", functionName,
        "--inputs", inputPulsarTopic,
        "--output", outputPulsarTopic,
        "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction",
        "--jar", "/pulsar/examples/api-examples.jar"
    };
    final ExecResult outcome = service.execCmd(PulsarComponent.BROKER, createCommand);
    assertEquals(0, outcome.getExitCode());

    TestUtils.<Void, FunctionStatus>retryUntil(
        null,
        ignored -> admin.functions().getFunctionStatus(tenantName, namespaceName, functionName),
        status -> {
            log.info("Get function status : {}", status);
            // Not ready while nothing is reported as running.
            if (status.getNumRunning() < 1) {
                return false;
            }
            // Every listed instance has to pass the per-instance check.
            for (FunctionInstanceStatus instance : status.getInstances()) {
                final boolean running = assertFunctionInstanceRunning(instance.getStatus());
                if (!running) {
                    return false;
                }
            }
            return true;
        }
    );
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment