@sijie
sijie / geo-replication-demo-without-a-global-zookeeper.md
Last active September 5, 2023 14:01
Geo Replication Demo without a global zookeeper

This doc demonstrates how to set up geo-replication across multiple Pulsar clusters without a global configuration store (ZooKeeper).

The demo uses docker-compose to start three Pulsar clusters. Each cluster has one ZooKeeper (zk) node, one BookKeeper (bk) node, and one broker. The docker-compose file can be found at https://gist.github.com/sijie/63737459112471a82957ae20bd78adb5.

The following table lists the configuration of the three clusters:

|         | zk         | configuration store | broker         |
|---------|------------|---------------------|----------------|
| beijing | zk-beijing | zk-beijing          | broker-beijing |
@sijie
sijie / standalone-docker-tls.md
Created May 9, 2019 13:46
Run Pulsar standalone in docker with TLS enabled
  1. Create a directory named certs.
  2. Generate the TLS keys under the certs directory.
  3. Run the docker command.
docker run \
	-p 6650:6650 -p 8080:8080 \
	-p 8081:8081 -p 6651:6651 \
	-p 8443:8443 \
	-v $PWD/certs:/certs \
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
@sijie
sijie / kafka-source-exclamation-function.md
Created August 2, 2019 08:26
Kafka source with a String function example
protected void testKafkaSourcePulsarFunction(TopicName pulsarTopic) throws Exception {
    final String tenantName = pulsarTopic.getTenant();
    final String namespaceName = pulsarTopic.getNamespacePortion();

    // create kafka topic
    final String kafkaTopic = "test-kafka-source-pulsar-function-" + Base58.randomString(8);
    final NewTopic newKafkaTopic = new NewTopic(kafkaTopic, 1, (short) 1);
    kafkaAdmin.createTopics(
        Sets.newHashSet(newKafkaTopic),
        new CreateTopicsOptions()

@sijie
sijie / delayed-message-code-example.md
Last active July 9, 2019 08:27
How to deliver messages after 3 minutes
producer.newMessage()
    .deliverAfter(3L, TimeUnit.MINUTES)
    .value("Hello Pulsar after 3 minutes!")
    .send();
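Conceptually, a relative delay like this corresponds to an absolute delivery time of "now plus the delay". The following is a minimal, JDK-only sketch of that arithmetic. The class and method names (`DelayMath`, `deliverAtMillis`) are hypothetical and not part of the Pulsar API, and the sketch assumes the delay is measured against the client's wall clock.

```java
import java.util.concurrent.TimeUnit;

public class DelayMath {
    // Hypothetical helper (not a Pulsar API): converts a relative delay
    // into an absolute timestamp in epoch milliseconds.
    static long deliverAtMillis(long nowMillis, long delay, TimeUnit unit) {
        return nowMillis + unit.toMillis(delay);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long target = deliverAtMillis(now, 3L, TimeUnit.MINUTES);
        // The difference is exactly the delay expressed in milliseconds.
        System.out.println("Deliver in " + (target - now) + " ms"); // prints "Deliver in 180000 ms"
    }
}
```

Note that the `TimeUnit` constants are plural (`MINUTES`, `SECONDS`), which is easy to get wrong when typing the call from memory.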
@sijie
sijie / scheduled-message-example.md
Last active July 9, 2019 08:23
How to deliver messages at a given time
producer.newMessage()
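    // deliverAt expects epoch milliseconds; java.time (LocalDateTime, ZoneId) avoids
    // the 1900-based year and 0-based month of the deprecated Date constructor.
    .deliverAt(LocalDateTime.of(2019, 6, 27, 23, 0, 0)
        .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli())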
    .value("Hello Pulsar at 11pm on 06/27/2019!")
    .send();
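`deliverAt` takes the delivery time as epoch milliseconds, so the main work is turning a calendar date into that number. The deprecated `java.util.Date(year, month, ...)` constructor expects the year minus 1900 and a zero-based month, which makes it easy to schedule the wrong instant. Below is a minimal, JDK-only sketch of the conversion using `java.time`. The names `ScheduleMath` and `epochMillis` are hypothetical, and the choice of UTC is an assumption made only to keep the output deterministic.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class ScheduleMath {
    // Hypothetical helper (not a Pulsar API): wall-clock time in a given
    // zone converted to epoch milliseconds.
    static long epochMillis(LocalDateTime when, ZoneId zone) {
        return when.atZone(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        // Months are 1-based in java.time, so 6 means June.
        LocalDateTime elevenPm = LocalDateTime.of(2019, 6, 27, 23, 0, 0);
        long utcMillis = epochMillis(elevenPm, ZoneOffset.UTC);
        System.out.println(Instant.ofEpochMilli(utcMillis)); // prints 2019-06-27T23:00:00Z
    }
}
```

Passing the zone explicitly makes it clear which "11pm" is meant; `ZoneId.systemDefault()` would use the time zone of the machine running the producer.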
@sijie
sijie / key-value-schema-example.md
Created July 9, 2019 07:39
KeyValue Schema Example
Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
    Schema.INT32,
    Schema.STRING,
    KeyValueEncodingType.SEPARATED
);
@sijie
sijie / auto-consume-example.md
Created July 9, 2019 07:38
AutoConsume Example
Consumer<GenericRecord> consumer = client.newConsumer(Schema.AUTO_CONSUME())
    …
    .subscribe();
Message<GenericRecord> msg = consumer.receive();
GenericRecord record = msg.getValue();
@sijie
sijie / generic-record-example.md
Created July 9, 2019 07:37
Generic Record Example
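// GenericSchema (unlike the plain Schema interface) exposes newRecordBuilder().
GenericSchema<GenericRecord> schema = Schema.generic(schemaInfo);
Producer<GenericRecord> producer = client.newProducer(schema).create();
producer.newMessage()
    .value(schema.newRecordBuilder()
        .set("intField", 32)
        .build())
    .send();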
@sijie
sijie / generic-schema-example.md
Created July 9, 2019 07:36
Generic Schema Example
RecordSchemaBuilder recordSchemaBuilder = SchemaBuilder.record("schemaName");
recordSchemaBuilder
    .field("intField")
    .type(SchemaType.INT32);
SchemaInfo schemaInfo = recordSchemaBuilder.build(SchemaType.AVRO);
GenericSchema<GenericRecord> schema = Schema.generic(schemaInfo);