Pulsar message send/receive flow

Broker startup

Main class: org.apache.pulsar.PulsarStandaloneStarter
VM Options: -Dlog4j2.configurationFile=D:\opensource\pulsar\conf\log4j2.xml
Program arguments: --config conf/standalone.conf --no-functions-worker --no-stream-storage --wipe-data

Check whether the process started correctly:

jps -l

Check whether ports 6650, 8080, 2181, 3181 and 4181 are listening (with --no-stream-storage enabled, port 4181 is not used):

netstat -ano|findstr "6650"
netstat -ano|findstr "8080"
netstat -ano|findstr "2181"
netstat -ano|findstr "3181"
netstat -ano|findstr "4181"
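
The netstat/findstr commands above are Windows-specific; on Linux, an equivalent check (assuming ss is available) would be:

ss -ltn | grep -E "6650|8080|2181|3181|4181"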

Produce messages

Main class: org.apache.pulsar.client.cli.PulsarClientTool
Program arguments: conf/client.conf --url pulsar://localhost:6650 produce --messages "hello pulsar" --num-produce 10 t1
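
The same produce step can also be done with the Pulsar Java client API; a minimal sketch, with the service URL, topic t1, message body and count taken from the arguments above (the class name ProduceDemo is only illustrative):

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class ProduceDemo {
    public static void main(String[] args) throws Exception {
        // Connect to the standalone broker started above
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // Equivalent of: produce --messages "hello pulsar" --num-produce 10 t1
        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic("t1")
                .create();
        for (int i = 0; i < 10; i++) {
            producer.send("hello pulsar");
        }

        producer.close();
        client.close();
    }
}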

Consume messages

Main class: org.apache.pulsar.client.cli.PulsarClientTool
Program arguments: conf/client.conf --url pulsar://localhost:6650 consume --subscription-name s1 --subscription-type Shared --num-messages 0 t1
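
Likewise, a minimal Java client sketch of the consume step, using subscription s1 of type Shared on topic t1 as in the arguments above (the class name ConsumeDemo is only illustrative):

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;

public class ConsumeDemo {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // Equivalent of: consume --subscription-name s1 --subscription-type Shared --num-messages 0 t1
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("t1")
                .subscriptionName("s1")
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();

        // --num-messages 0 means keep consuming indefinitely
        while (true) {
            Message<String> msg = consumer.receive();
            System.out.println("Received: " + msg.getValue());
            consumer.acknowledge(msg);
        }
    }
}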

Admin

Main class: org.apache.pulsar.admin.cli.PulsarAdminTool
Program arguments: conf/client.conf --admin-url http://localhost:8080/ tenants list
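
The same tenants list call can be made with the Java admin client, assuming the standalone HTTP port 8080 as above (the class name AdminDemo is only illustrative):

import org.apache.pulsar.client.admin.PulsarAdmin;

public class AdminDemo {
    public static void main(String[] args) throws Exception {
        // Equivalent of: tenants list against http://localhost:8080/
        PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")
                .build();
        admin.tenants().getTenants().forEach(System.out::println);
        admin.close();
    }
}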
log4j2.xml (referenced by the -Dlog4j2.configurationFile VM option above):

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="info">
    <Appenders>
        <Console name="console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d %-5p [%C{1}#%L] [%t] %m%n"/>
        </Console>
    </Appenders>
    <Loggers>
        <Root level="info">
            <AppenderRef ref="console"/>
        </Root>
    </Loggers>
</Configuration>

Broker side: search for ServerBootstrap to find the entry point.

The Netty ServerBootstrap is started inside BrokerService.start(); focus on PulsarChannelInitializer, the ChannelInitializer subclass.

BrokerService

public void start() throws Exception {
        this.producerNameGenerator = new DistributedIdGenerator(pulsar.getZkClient(), producerNameGeneratorPath,
                pulsar.getConfiguration().getClusterName());

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        bootstrap.group(acceptorGroup, workerGroup);
        bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
        bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
                new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024));

        bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(workerGroup));
        EventLoopUtil.enableTriggeredMode(bootstrap);

        ServiceConfiguration serviceConfig = pulsar.getConfiguration();

        bootstrap.childHandler(new PulsarChannelInitializer(pulsar, false));

        Optional<Integer> port = serviceConfig.getBrokerServicePort();
        if (port.isPresent()) {
            // Bind and start to accept incoming connections.
            InetSocketAddress addr = new InetSocketAddress(pulsar.getBindAddress(), port.get());
            try {
                bootstrap.bind(addr).sync();
            } catch (Exception e) {
                throw new IOException("Failed to bind Pulsar broker on " + addr, e);
            }
            log.info("Started Pulsar Broker service on port {}", port.get());
        }

        Optional<Integer> tlsPort = serviceConfig.getBrokerServicePortTls();
        if (tlsPort.isPresent()) {
            ServerBootstrap tlsBootstrap = bootstrap.clone();
            tlsBootstrap.childHandler(new PulsarChannelInitializer(pulsar, true));
            tlsBootstrap.bind(new InetSocketAddress(pulsar.getBindAddress(), tlsPort.get())).sync();
            log.info("Started Pulsar Broker TLS service on port {} - TLS provider: {}", tlsPort.get(),
                    SslContext.defaultServerProvider());
        }

        // start other housekeeping functions
        this.startStatsUpdater(
                serviceConfig.getStatsUpdateInitialDelayInSecs(),
                serviceConfig.getStatsUpdateFrequencyInSecs());
        this.startInactivityMonitor();
        this.startMessageExpiryMonitor();
        this.startCompactionMonitor();
        this.startBacklogQuotaChecker();
        // register listener to capture zk-latency
        ClientCnxnAspect.addListener(zkStatsListener);
        ClientCnxnAspect.registerExecutor(pulsar.getExecutor());
    }

PulsarChannelInitializer

    protected void initChannel(SocketChannel ch) throws Exception {
      if (this.enableTls) {
          ch.pipeline().addLast(TLS_HANDLER, sslCtxRefresher.get().newHandler(ch.alloc()));
          ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
      } else {
          ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
      }

      ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
          brokerConf.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
      ch.pipeline().addLast("handler", new ServerCnx(pulsar));
  }

ServerCnx extends PulsarHandler, which extends PulsarDecoder, which extends ChannelInboundHandlerAdapter; look at the SEND branch of cmd.getType() in the channelRead method.

protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
      checkArgument(state == State.Connected);

      CompletableFuture<Producer> producerFuture = producers.get(send.getProducerId());

      if (producerFuture == null || !producerFuture.isDone() || producerFuture.isCompletedExceptionally()) {
          log.warn("[{}] Producer had already been closed: {}", remoteAddress, send.getProducerId());
          return;
      }

      Producer producer = producerFuture.getNow(null);
      if (log.isDebugEnabled()) {
          printSendCommandDebug(send, headersAndPayload);
      }

      if (producer.isNonPersistentTopic()) {
          // avoid processing non-persist message if reached max concurrent-message limit
          if (nonPersistentPendingMessages > MaxNonPersistentPendingMessages) {
              final long producerId = send.getProducerId();
              final long sequenceId = send.getSequenceId();
              service.getTopicOrderedExecutor().executeOrdered(producer.getTopic().getName(), SafeRun.safeRun(() -> {
                  ctx.writeAndFlush(Commands.newSendReceipt(producerId, sequenceId, -1, -1), ctx.voidPromise());
              }));
              producer.recordMessageDrop(send.getNumMessages());
              return;
          } else {
              nonPersistentPendingMessages++;
          }
      }

      startSendOperation();

      // Persist the message
      producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload, send.getNumMessages());
  }

Producer

public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize) {
      if (isClosed) {
          cnx.ctx().channel().eventLoop().execute(() -> {
              cnx.ctx().writeAndFlush(Commands.newSendError(producerId, sequenceId, ServerError.PersistenceError,
                      "Producer is closed"));
              cnx.completedSendOperation(isNonPersistentTopic);
          });

          return;
      }

      if (!verifyChecksum(headersAndPayload)) {
          cnx.ctx().channel().eventLoop().execute(() -> {
              cnx.ctx().writeAndFlush(
                      Commands.newSendError(producerId, sequenceId, ServerError.ChecksumError, "Checksum failed on the broker"));
              cnx.completedSendOperation(isNonPersistentTopic);
          });
          return;
      }

      if (topic.isEncryptionRequired()) {

          headersAndPayload.markReaderIndex();
          MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
          headersAndPayload.resetReaderIndex();

          // Check whether the message is encrypted or not
          if (msgMetadata.getEncryptionKeysCount() < 1) {
              log.warn("[{}] Messages must be encrypted", getTopic().getName());
              cnx.ctx().channel().eventLoop().execute(() -> {
                  cnx.ctx().writeAndFlush(Commands.newSendError(producerId, sequenceId, ServerError.MetadataError,
                          "Messages must be encrypted"));
                  cnx.completedSendOperation(isNonPersistentTopic);
              });
              return;
          }
      }

      startPublishOperation();
      topic.publishMessage(headersAndPayload,
              MessagePublishContext.get(this, sequenceId, msgIn, headersAndPayload.readableBytes(), batchSize,
                      System.nanoTime()));
  }

PersistentTopic

public void publishMessage(ByteBuf headersAndPayload, PublishContext publishContext) {
        if (messageDeduplication.shouldPublishNextMessage(publishContext, headersAndPayload)) {
            ledger.asyncAddEntry(headersAndPayload, this, publishContext);
        } else {
            // Immediately acknowledge duplicated message
            publishContext.completed(null, -1, -1);
        }
    }

ManagedLedgerImpl uses the managed-ledger (mledger) API on top of BookKeeper to write the entry to disk.

public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state);
        }

        OpAddEntry addOperation = OpAddEntry.create(this, buffer, callback, ctx);

        // Jump to specific thread to avoid contention from writers writing from different threads
        executor.executeOrdered(name, safeRun(() -> {
            pendingAddEntries.add(addOperation);

            internalAsyncAddEntry(addOperation);
        }));
    }

Client side: search for Bootstrap to find the entry point.

ConnectionPool

public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup,
            Supplier<ClientCnx> clientCnxSupplier) throws PulsarClientException {
        this.eventLoopGroup = eventLoopGroup;
        this.maxConnectionsPerHosts = conf.getConnectionsPerBroker();

        pool = new ConcurrentHashMap<>();
        bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup);
        bootstrap.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));

        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.getConnectionTimeoutMs());
        bootstrap.option(ChannelOption.TCP_NODELAY, conf.isUseTcpNoDelay());
        bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

        try {
            bootstrap.handler(new PulsarChannelInitializer(conf, clientCnxSupplier));
        } catch (Exception e) {
            log.error("Failed to create channel initializer");
            throw new PulsarClientException(e);
        }

        this.dnsResolver = new DnsNameResolverBuilder(eventLoopGroup.next()).traceEnabled(true)
                .channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup)).build();
    }

PulsarChannelInitializer

public void initChannel(SocketChannel ch) throws Exception {
        if (sslCtx != null) {
            ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
            ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
        } else {
            ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
        }

        ch.pipeline()
          .addLast("frameDecoder",
                   new LengthFieldBasedFrameDecoder(
                       Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING,
                       0, 4, 0, 4));
        ch.pipeline().addLast("handler", clientCnxSupplier.get());
    }