protected void testKafkaSourcePulsarFunction(TopicName pulsarTopic) throws Exception {
final String tenantName = pulsarTopic.getTenant();
final String namespaceName = pulsarTopic.getNamespacePortion();
// create kafka topic
final String kafkaTopic = "test-kafka-source-pulsar-function-" + Base58.randomString(8);
final NewTopic newKafkaTopic = new NewTopic(kafkaTopic, 1, (short) 1);
kafkaAdmin.createTopics(
Sets.newHashSet(newKafkaTopic),
new CreateTopicsOptions()