
@mitch-seymour
Last active April 8, 2019 01:58
Proposal for a guest post to Confluent's blog

Guest blog post proposal

I'd love to contribute a guest blog post to highlight a simpler method for getting started with KSQL UDFs / UDAFs. We would need to merge KSQL #2272 first, potentially add a closing section to this article, and get the content reviewed by Confluent of course :)

Author Mitch Seymour
Organization Mailchimp

KSQL UDF / UDAF Bootstrap

One of the most powerful features of KSQL is the ability for users to build their own KSQL functions for processing real-time streams of data. These functions can be invoked on individual messages (UDFs) or used to perform aggregations on groups of messages (UDAFs). A previous blog post, How to Build a UDF and/or UDAF in KSQL 5.0, discussed the key steps for building and deploying a custom KSQL UDF / UDAF. With Confluent Platform 5.2.0, building custom KSQL functions is even easier: a new Maven archetype allows you to quickly bootstrap your own UDF / UDAF project without copying and pasting example code, adding the boilerplate for building an uber JAR, or performing the other tedious tasks that setting up a new project would otherwise require.

In addition to discussing how this new Maven archetype can be used, we will also demonstrate how to convert the generated Maven project to a Gradle project with a simple command. So, without further ado, let's get started.

Using the archetype

To use the new Maven archetype for bootstrapping our custom KSQL UDF / UDAF, we first need Maven installed. You can check whether Maven is installed by running the following command:

$ mvn --version

If Maven is not installed, follow the official Maven installation instructions.

Once Maven is installed, generating a new UDF / UDAF project is simple. First, run the following command. Be sure to replace the last 3 values (groupId, artifactId, and version) with the appropriate values for your project.

$ mvn archetype:generate \
    -DarchetypeGroupId=io.confluent.ksql \
    -DarchetypeArtifactId=ksql-udf-quickstart \
    -DarchetypeVersion=5.2.0 \
    -DgroupId=com.example.ksql.functions \
    -DartifactId=my-udf \
    -Dversion=0.1.0

You may be asked to confirm the configuration you provided above, e.g.

Confirm properties configuration:
groupId: com.example.ksql.functions
artifactId: my-udf
version: 0.1.0
package: com.example.ksql.functions
 Y: :

Once you've confirmed the configuration (e.g. by simply hitting <ENTER>), the above command will create a new project with the following directory structure (Note: the actual directory structure may vary, depending on the groupId and artifactId params you specified earlier).

my-udf/
├── pom.xml
└── src
    └── main
        ├── java
        │   └── com
        │       └── example
        │           └── ksql
        │               └── functions
        │                   ├── ReverseUdf.java
        │                   └── SummaryStatsUdaf.java
        └── resources

In the next section, we will explore the example KSQL functions that were generated by the archetype, and learn how to deploy these functions to our KSQL server with just a couple of steps.

Example UDF / UDAF

The archetype includes one example UDF (REVERSE) and one example UDAF (SUMMARY_STATS), which are defined in the following files, respectively: ReverseUdf.java and SummaryStatsUdaf.java. First, let's take a look at ReverseUdf.java:

ReverseUdf.java

/*
 * Copyright 2018 Confluent Inc.
 *
 * Licensed under the Confluent Community License (the "License"); you may not use
 * this file except in compliance with the License.  You may obtain a copy of the
 * License at
 *
 * http://www.confluent.io/confluent-community-license
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OF ANY KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations under the License.
 */

package com.example.ksql.functions;

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;

@UdfDescription(
    name = "reverse",
    description = "Example UDF that reverses an object",
    version = "0.1.0",
    author = "mitch"
)
public class ReverseUdf {

  @Udf(description = "Reverse a string")
  public String reverseString(
      @UdfParameter(value = "source", description = "the value to reverse")
      final String source
  ) {
    return new StringBuilder(source).reverse().toString();
  }

  @Udf(description = "Reverse an integer")
  public String reverseInt(
      @UdfParameter(value = "source", description = "the value to reverse")
      final Integer source
  ) {
    return new StringBuilder(source.toString()).reverse().toString();
  }

  @Udf(description = "Reverse a long")
  public String reverseLong(
      @UdfParameter(value = "source", description = "the value to reverse")
      final Long source
  ) {
    return new StringBuilder(source.toString()).reverse().toString();
  }

  @Udf(description = "Reverse a double")
  public String reverseDouble(
      @UdfParameter(value = "source", description = "the value to reverse")
      final Double source
  ) {
    return new StringBuilder(source.toString()).reverse().toString();
  }
}

This example UDF can be used for reversing strings and numerics, and is already fully functional and ready to deploy. One key item this particular UDF showcases is the ability for a KSQL function to support multiple method signatures. Our REVERSE function (defined above) can reverse a String, Long, Integer or Double since we provided methods for each of these operations. This example UDF is somewhat trivial, but the point of this archetype is to allow you to easily replace the code here with your own code, and then just follow the build and deployment steps described later in this article to start using your own UDF.
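Since every overload above boils down to the same StringBuilder.reverse() call, you can sanity-check the logic in plain Java before deploying anything. Here is a minimal standalone sketch (the KSQL annotations are dropped, and the ReverseCheck class name is just for illustration):

```java
// Plain-Java sanity check of the reversal logic from ReverseUdf.
// The KSQL @Udf annotations are omitted so this runs standalone.
public class ReverseCheck {

  // Mirrors the body shared by each ReverseUdf overload
  static String reverse(final Object source) {
    return new StringBuilder(source.toString()).reverse().toString();
  }

  public static void main(final String[] args) {
    System.out.println(reverse("hello")); // olleh
    System.out.println(reverse(1234));    // 4321
    System.out.println(reverse(56.78));   // 87.65
  }
}
```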

As mentioned earlier, the archetype also includes an example UDAF. Unlike UDFs, which operate on a single row at a time, UDAFs can be used for computing aggregates against multiple rows of data. Let's take a look at the example UDAF (called SUMMARY_STATS) and see how it works.

SummaryStatsUdaf.java

/*
 * Copyright 2018 Confluent Inc.
 *
 * Licensed under the Confluent Community License (the "License"); you may not use
 * this file except in compliance with the License.  You may obtain a copy of the
 * License at
 *
 * http://www.confluent.io/confluent-community-license
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OF ANY KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations under the License.
 */

package com.example.ksql.functions;

import io.confluent.ksql.function.udaf.Udaf;
import io.confluent.ksql.function.udaf.UdafDescription;
import io.confluent.ksql.function.udaf.UdafFactory;
import java.util.HashMap;
import java.util.Map;

/**
 * In this example, we implement a UDAF for computing some summary statistics for a stream
 * of doubles.
 *
 * <p>Example query usage:
 *
 * <pre>{@code
 * CREATE STREAM api_responses (username VARCHAR, response_code INT, response_time DOUBLE) \
 * WITH (kafka_topic='api_logs', value_format='JSON');
 *
 * SELECT username, SUMMARY_STATS(response_time) \
 * FROM api_responses \
 * GROUP BY username ;
 * }</pre>
 */
@UdafDescription(
    name = "summary_stats",
    description = "Example UDAF that computes some summary stats for a stream of doubles",
    version = "0.1.0",
    author = "mitch"
)
public final class SummaryStatsUdaf {

  private SummaryStatsUdaf() {
  }

  @UdafFactory(description = "compute summary stats for doubles")
  // Can be used with stream aggregations. The input of our aggregation will be doubles,
  // and the output will be a map
  public static Udaf<Double, Map<String, Double>> createUdaf() {

    return new Udaf<Double, Map<String, Double>>() {

      /**
       * Specify an initial value for our aggregation
       *
       * @return the initial state of the aggregate.
       */
      @Override
      public Map<String, Double> initialize() {
        final Map<String, Double> stats = new HashMap<>();
        stats.put("mean", 0.0);
        stats.put("sample_size", 0.0);
        stats.put("sum", 0.0);
        return stats;
      }

      /**
       * Perform the aggregation whenever a new record appears in our stream.
       *
       * @param newValue the new value to add to the {@code aggregateValue}.
       * @param aggregateValue the current aggregate.
       * @return the new aggregate value.
       */
      @Override
      public Map<String, Double> aggregate(
          final Double newValue,
          final Map<String, Double> aggregateValue
      ) {
        final Double sampleSize = 1.0 + aggregateValue
            .getOrDefault("sample_size", 0.0);

        final Double sum = newValue + aggregateValue
            .getOrDefault("sum", 0.0);
  
        // calculate the new aggregate
        aggregateValue.put("mean", sum / sampleSize);
        aggregateValue.put("sample_size", sampleSize);
        aggregateValue.put("sum", sum);
        return aggregateValue;
      }

      /**
       * Called to merge two aggregates together.
       *
       * @param aggOne the first aggregate
       * @param aggTwo the second aggregate
       * @return the merged result
       */
      @Override
      public Map<String, Double> merge(
          final Map<String, Double> aggOne,
          final Map<String, Double> aggTwo
      ) {
        final Double sampleSize =
            aggOne.getOrDefault("sample_size", 0.0) + aggTwo.getOrDefault("sample_size", 0.0);
        final Double sum =
            aggOne.getOrDefault("sum", 0.0) + aggTwo.getOrDefault("sum", 0.0);

        // calculate the new aggregate
        final Map<String, Double> newAggregate = new HashMap<>();
        newAggregate.put("mean", sum / sampleSize);
        newAggregate.put("sample_size", sampleSize);
        newAggregate.put("sum", sum);
        return newAggregate;
      }
    };
  }
}

This UDAF may seem complicated at first, but it's really just performing some basic math and adding the results to a Map object. Returning a Map is one way to return multiple values from a KSQL function. When adapting the example above for your own UDAF, take note of the following methods:

  • initialize: specifies the initial value of your aggregation;
  • aggregate: performs the actual aggregation by combining the current row's value (i.e. the newValue argument) with the current aggregate (i.e. the aggregateValue argument) to produce a new aggregate;
  • merge: describes how to merge two aggregations into one (e.g. when using session windows).
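To make the lifecycle concrete, here is the same initialize → aggregate → merge flow as a standalone sketch. The KSQL Udaf interface is stripped out so the snippet compiles without the Confluent jars, and the class name SummaryStatsWalkthrough is just for illustration; the math mirrors SummaryStatsUdaf:

```java
import java.util.HashMap;
import java.util.Map;

// Standalone walk-through of the UDAF lifecycle (initialize -> aggregate -> merge).
// The KSQL Udaf interface is stripped out so this compiles without Confluent jars.
public class SummaryStatsWalkthrough {

  static Map<String, Double> initialize() {
    final Map<String, Double> stats = new HashMap<>();
    stats.put("mean", 0.0);
    stats.put("sample_size", 0.0);
    stats.put("sum", 0.0);
    return stats;
  }

  static Map<String, Double> aggregate(final Double newValue, final Map<String, Double> agg) {
    final double sampleSize = 1.0 + agg.getOrDefault("sample_size", 0.0);
    final double sum = newValue + agg.getOrDefault("sum", 0.0);
    agg.put("mean", sum / sampleSize);
    agg.put("sample_size", sampleSize);
    agg.put("sum", sum);
    return agg;
  }

  static Map<String, Double> merge(final Map<String, Double> a, final Map<String, Double> b) {
    final double sampleSize = a.get("sample_size") + b.get("sample_size");
    final double sum = a.get("sum") + b.get("sum");
    final Map<String, Double> merged = new HashMap<>();
    merged.put("mean", sum / sampleSize);
    merged.put("sample_size", sampleSize);
    merged.put("sum", sum);
    return merged;
  }

  public static void main(final String[] args) {
    // Two response times arrive for one grouping key...
    Map<String, Double> agg = initialize();
    agg = aggregate(400.0, agg);
    agg = aggregate(900.0, agg);  // mean=650.0, sample_size=2.0, sum=1300.0

    // ...and a second aggregate (e.g. from another session window) is merged in.
    final Map<String, Double> other = aggregate(700.0, initialize());
    System.out.println(merge(agg, other));  // sum=2000.0, sample_size=3.0, mean~=666.67
  }
}
```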

Building and deploying

Once you've added your own UDF / UDAF logic (or just stuck with the example functions for the rest of this tutorial), it's time to deploy your KSQL functions to a KSQL server. First, build the project by running the following command in the project root directory.

$ mvn clean package

The above command will drop a JAR in the target/ directory. For example, if your artifactId is my-udf, then the command will have created a file named target/my-udf-0.1.0.jar.

Now, simply copy this JAR file to the KSQL extension directory (see the ksql.extension.dir property in the ksql-server.properties file) and restart your KSQL server so that it picks up the new JAR containing your custom KSQL functions. Once KSQL has finished restarting, you can verify that the new functions exist by running the DESCRIBE FUNCTION command from the CLI, e.g.

ksql> DESCRIBE FUNCTION REVERSE ;

Name        : REVERSE
Overview    : Example UDF that reverses an object
Type        : scalar
Jar         : /tmp/ext/my-udf-0.1.0.jar
Variations  :

	Variation   : REVERSE(VARCHAR)
	Returns     : VARCHAR
	Description : Reverse a string

	Variation   : REVERSE(INT)
	Returns     : VARCHAR
	Description : Reverse an integer

	Variation   : REVERSE(BIGINT)
	Returns     : VARCHAR
	Description : Reverse a long

	Variation   : REVERSE(DOUBLE)
	Returns     : VARCHAR
	Description : Reverse a double
	

ksql> DESCRIBE FUNCTION SUMMARY_STATS ;

Name        : SUMMARY_STATS
Overview    : Example UDAF that computes some summary stats for a stream of doubles
Type        : aggregate
Jar         : /tmp/ext/my-udf-0.1.0.jar
Variations  :

	Variation   : SUMMARY_STATS(DOUBLE)
	Returns     : MAP<VARCHAR,DOUBLE>
	Description : compute summary stats for doubles

Finally, let's see how to invoke our new UDF / UDAF. For this example, we'll assume there's a topic named api_logs in our Kafka cluster. You can create this dummy topic by using the kafka-topics console script:

# assumes the `kafka-topics` script is on your $PATH
$ kafka-topics --create \
    --zookeeper localhost:2181 \
    --topic api_logs \
    --replication-factor 1 \
    --partitions 4

Created topic "api_logs".

With the api_logs topic created, we can now create a KSQL STREAM using the following command:

ksql> CREATE STREAM api_logs (username VARCHAR, endpoint VARCHAR, response_code INT, response_time DOUBLE) \
      WITH (kafka_topic='api_logs', value_format='JSON');

 Message
----------------
 Stream created
----------------

Now, invoking our UDF / UDAF is simply a matter of adding it to our KSQL query. e.g.

ksql> SELECT username, REVERSE(username), endpoint, SUMMARY_STATS(response_time) \
FROM api_logs \
GROUP BY username, REVERSE(username), endpoint ;

The above command will execute a continuous query in the KSQL CLI. In another tab, we can produce some dummy records to the api_logs topic using the kafkacat utility.

$ echo '{"username": "mseymour", "endpoint": "index.html", "response_code": 200, "response_time": 400}' | kafkacat -q -b localhost:9092 -t api_logs -P

$ echo '{"username": "mseymour", "endpoint": "index.html", "response_code": 200, "response_time": 900}' | kafkacat -q -b localhost:9092 -t api_logs -P

Back inside the KSQL CLI, you should see the following output:

mseymour | ruomyesm | index.html | {sample_size=1.0, mean=400.0, sum=400.0}
mseymour | ruomyesm | index.html | {sample_size=2.0, mean=650.0, sum=1300.0}

Automated Testing of KSQL UDFs

In addition to including example implementations of a UDF and UDAF, the KSQL UDF / UDAF Quickstart includes unit tests that demonstrate how to test your custom KSQL functions. These tests live in the src/test/java/ directory and rely on the JUnit 5 testing platform, which is automatically included when you create a project from the quickstart archetype. Whenever you update the example KSQL functions with your own code, it is necessary to also update the included unit tests.

Before we learn how to execute the tests, let’s first see what they look like. The unit test in the ReverseUdfTests.java file below ensures that the REVERSE UDF returns the expected results when reversing strings.

package com.example.ksql.functions;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

/**
 * Example class that demonstrates how to unit test UDFs.
 */
public class ReverseUdfTests {

  @ParameterizedTest(name = "reverse({0})= {1}")
  @CsvSource({
    "hello, olleh",
    "world, dlrow",
  })
  void reverseString(final String source, final String expectedResult) {
    final ReverseUdf reverse = new ReverseUdf();
    final String actualResult = reverse.reverseString(source);
    assertEquals(expectedResult, actualResult, source + " reversed should equal " + expectedResult);
  }
}

The testing methodology in the test above is relatively straightforward. First, we use a parameter provider called @CsvSource, which is included in the JUnit 5 library, to specify multiple test cases with their corresponding parameters and expected result values. The first value in each CSV string (hello and world) represents the parameter that we want to pass to our UDF (ReverseUdf). The second value in each CSV string represents the expected result of the test (since this UDF is responsible for reversing objects, the expected result in this test case is a reversed string).

Now that we've defined our parameters, we simply instantiate a ReverseUdf instance, invoke the appropriate method (reverseString) for each parameter, and check the result with assertEquals. This method of instantiating a KSQL function and invoking the appropriate methods directly in a test is a good way to prevent accidental regressions as you iterate on your code in the future.

A keen eye may have noticed that our ReverseUdf is capable of reversing many types of objects, yet the included unit tests only cover the reversal of strings. We will leave the additional test implementations as an exercise for the reader.

Let’s move on to the unit tests for SummaryStatsUdaf.

package com.example.ksql.functions;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.params.provider.Arguments.arguments;

import io.confluent.ksql.function.udaf.Udaf;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

/**
 * Example class that demonstrates how to unit test UDAFs.
 */
public class SummaryStatsUdafTests {

  @Test
  void mergeAggregates() {
    final Udaf<Double, Map<String, Double>> udaf = SummaryStatsUdaf.createUdaf();
    final Map<String, Double> mergedAggregate = udaf.merge(
      // (sample_size, sum, mean)
      aggregate(3.0, 3300.0, 1100.0),
      aggregate(7.0, 6700.0, 957.143)
    );

    final Map<String, Double> expectedResult = aggregate(10.0, 10000.0, 1000.0);
    assertEquals(expectedResult, mergedAggregate);
  }

  @ParameterizedTest
  @MethodSource("aggSources")
  void calculateSummaryStats(
      final Double newValue,
      final Map<String, Double> currentAggregate,
      final Map<String, Double> expectedResult
    ) {
    final Udaf<Double, Map<String, Double>> udaf = SummaryStatsUdaf.createUdaf();
    assertEquals(expectedResult, udaf.aggregate(newValue, currentAggregate));
  }

  static Stream<Arguments> aggSources() {
    return Stream.of(
      // sample: 400
      arguments(
        // new value
        400.0,
        // current aggregate
        aggregate(0.0, 0.0, 0.0),
        // expected new aggregate
        aggregate(1.0, 400.0, 400.0)
      ),
      // sample: 400, 900
      arguments(
        // new value
        900.0,
        // current aggregate
        aggregate(1.0, 400.0, 400.0),
        // expected new aggregate
        aggregate(2.0, 1300.0, 650.0)
      )
    );
  }

  /**
   * Helper method for building an aggregate that mimics what KSQL would pass
   * to our UDAF instance.
   */
  static Map<String, Double> aggregate(
      final Double sampleSize,
      final Double sum,
      final Double mean
  ) {

    final Map<String, Double> result = new HashMap<>();
    result.put("mean", mean);
    result.put("sample_size", sampleSize);
    result.put("sum", sum);
    return result;
  }
}

The testing methodology is similar for UDAFs: we instantiate a UDAF instance and invoke the methods we want to test. In this case, we want to test both the aggregate and merge methods of SummaryStatsUdaf. There are some additional helper methods included in this test (e.g. aggSources for defining test cases), and we use a different parameter provider (@MethodSource instead of @CsvSource). However, these are simply implementation details. The important takeaway is that your custom KSQL functions should include unit tests, and JUnit 5 gives us a lot of flexibility in creating them.

Running tests

Finally, run the following command to execute the tests:

$ mvn test

Converting to a Gradle project

Gradle is the preferred build tool for many developers and organizations, and bootstrapping your custom KSQL functions from Confluent's Maven archetype doesn't mean you have to use Maven as your build tool. In fact, converting the generated Maven project to a Gradle project is easy: simply run the following command in the root project directory to generate a build.gradle file for your project.

$ gradle init --type pom

Now, feel free to delete the pom.xml and make all future build modifications to build.gradle instead.

What's next?

Now that you know how to quickly bootstrap your next KSQL UDF / UDAF project, you can start building your own custom KSQL functions with minimum effort. A couple of next steps you may want to pursue include adding unit tests for your new code, and, if your function might be useful to others, consider sharing it with the community.
