在 **BrokerService.start()** 内启动 Netty 的 ServerBootstrap,关注其 ChannelInitializer 子类 PulsarChannelInitializer 即可。
/**
 * Starts the broker's network service: creates the distributed producer-name
 * generator, configures a Netty {@code ServerBootstrap}, binds the plaintext
 * broker port and (if configured) the TLS port, then launches the periodic
 * housekeeping tasks (stats updater, inactivity / expiry / compaction monitors,
 * backlog-quota checker) and registers the ZooKeeper latency listener.
 *
 * @throws IOException if binding either service port fails
 * @throws Exception   if the producer-name generator cannot be created
 */
public void start() throws Exception {
    this.producerNameGenerator = new DistributedIdGenerator(pulsar.getZkClient(), producerNameGeneratorPath,
            pulsar.getConfiguration().getClusterName());

    ServerBootstrap bootstrap = new ServerBootstrap();
    // Pooled allocator avoids per-read heap allocation for network buffers.
    bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    bootstrap.group(acceptorGroup, workerGroup);
    bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
    // Receive buffer adapts between 1 KiB and 1 MiB based on observed read sizes.
    bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
            new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024));
    bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(workerGroup));
    EventLoopUtil.enableTriggeredMode(bootstrap);

    ServiceConfiguration serviceConfig = pulsar.getConfiguration();

    bootstrap.childHandler(new PulsarChannelInitializer(pulsar, false));

    Optional<Integer> port = serviceConfig.getBrokerServicePort();
    if (port.isPresent()) {
        // Bind and start to accept incoming connections.
        InetSocketAddress addr = new InetSocketAddress(pulsar.getBindAddress(), port.get());
        try {
            bootstrap.bind(addr).sync();
        } catch (Exception e) {
            throw new IOException("Failed to bind Pulsar broker on " + addr, e);
        }
        log.info("Started Pulsar Broker service on port {}", port.get());
    }

    Optional<Integer> tlsPort = serviceConfig.getBrokerServicePortTls();
    if (tlsPort.isPresent()) {
        ServerBootstrap tlsBootstrap = bootstrap.clone();
        tlsBootstrap.childHandler(new PulsarChannelInitializer(pulsar, true));
        InetSocketAddress tlsAddr = new InetSocketAddress(pulsar.getBindAddress(), tlsPort.get());
        try {
            // Wrap bind failures the same way as the plaintext port so the
            // error message carries the address that could not be bound.
            tlsBootstrap.bind(tlsAddr).sync();
        } catch (Exception e) {
            throw new IOException("Failed to bind Pulsar broker TLS port on " + tlsAddr, e);
        }
        log.info("Started Pulsar Broker TLS service on port {} - TLS provider: {}", tlsPort.get(),
                SslContext.defaultServerProvider());
    }

    // start other housekeeping functions
    this.startStatsUpdater(
            serviceConfig.getStatsUpdateInitialDelayInSecs(),
            serviceConfig.getStatsUpdateFrequencyInSecs());
    this.startInactivityMonitor();
    this.startMessageExpiryMonitor();
    this.startCompactionMonitor();
    this.startBacklogQuotaChecker();

    // register listener to capture zk-latency
    ClientCnxnAspect.addListener(zkStatsListener);
    ClientCnxnAspect.registerExecutor(pulsar.getExecutor());
}
/**
 * Assembles the pipeline for a newly accepted broker connection. Order matters:
 * the (optional) TLS handler runs first, followed by the ByteBufPair encoder,
 * the length-field frame decoder, and finally the ServerCnx protocol handler.
 */
protected void initChannel(SocketChannel ch) throws Exception {
    if (enableTls) {
        ch.pipeline().addLast(TLS_HANDLER, sslCtxRefresher.get().newHandler(ch.alloc()));
    }
    // TLS connections use the copying encoder; plaintext uses the zero-copy one.
    ch.pipeline().addLast("ByteBufPairEncoder",
            enableTls ? ByteBufPair.COPYING_ENCODER : ByteBufPair.ENCODER);
    // Frames are prefixed with a 4-byte length field which is stripped after decoding.
    ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
            brokerConf.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
    ch.pipeline().addLast("handler", new ServerCnx(pulsar));
}
ServerCnx 继承 PulsarHandler,后者继承 PulsarDecoder,再继承 ChannelInboundHandlerAdapter;因此可在 channelRead 方法中查看 cmd.getType() 的 SEND 分支。
/**
 * Handles an incoming SEND command from a producer client.
 *
 * Looks up the producer by id; if it is unknown or not yet fully created the
 * message is dropped with a warning. For non-persistent topics a pending-message
 * limit is enforced: over the limit the message is dropped but a send receipt
 * (with invalid ledger/entry ids of -1) is still returned so the client does not
 * block. Otherwise the message is handed to the producer for publishing.
 *
 * @param send              the SEND command (producer id, sequence id, message count)
 * @param headersAndPayload the serialized message headers and payload
 */
protected void handleSend (CommandSend send , ByteBuf headersAndPayload ) {
checkArgument (state == State .Connected );
CompletableFuture <Producer > producerFuture = producers .get (send .getProducerId ());
// Producer must exist and its creation must have completed successfully.
if (producerFuture == null || !producerFuture .isDone () || producerFuture .isCompletedExceptionally ()) {
log .warn ("[{}] Producer had already been closed: {}" , remoteAddress , send .getProducerId ());
return ;
}
Producer producer = producerFuture .getNow (null );
if (log .isDebugEnabled ()) {
printSendCommandDebug (send , headersAndPayload );
}
if (producer .isNonPersistentTopic ()) {
// avoid processing non-persist message if reached max concurrent-message limit
if (nonPersistentPendingMessages > MaxNonPersistentPendingMessages ) {
final long producerId = send .getProducerId ();
final long sequenceId = send .getSequenceId ();
// Still ack the client (with -1/-1 ids) on the topic's ordered executor so it does not stall,
// even though the message itself is dropped and recorded as such.
service .getTopicOrderedExecutor ().executeOrdered (producer .getTopic ().getName (), SafeRun .safeRun (() -> {
ctx .writeAndFlush (Commands .newSendReceipt (producerId , sequenceId , -1 , -1 ), ctx .voidPromise ());
}));
producer .recordMessageDrop (send .getNumMessages ());
return ;
} else {
nonPersistentPendingMessages ++;
}
}
startSendOperation ();
// Persist the message
producer .publishMessage (send .getProducerId (), send .getSequenceId (), headersAndPayload , send .getNumMessages ());
}
/**
 * Validates and publishes one message on behalf of this producer.
 *
 * Rejection paths (closed producer, checksum failure, missing encryption on a
 * topic that requires it) each reply with a SendError on the connection's event
 * loop and complete the pending send operation, so the per-connection operation
 * accounting stays balanced. Valid messages are forwarded to the topic.
 *
 * @param producerId        id of the producer sending the message
 * @param sequenceId        producer-assigned sequence id of the message
 * @param headersAndPayload serialized message headers and payload
 * @param batchSize         number of messages in the batch
 */
public void publishMessage (long producerId , long sequenceId , ByteBuf headersAndPayload , long batchSize ) {
if (isClosed ) {
// Reply on the channel's event loop to keep writes single-threaded.
cnx .ctx ().channel ().eventLoop ().execute (() -> {
cnx .ctx ().writeAndFlush (Commands .newSendError (producerId , sequenceId , ServerError .PersistenceError ,
"Producer is closed" ));
cnx .completedSendOperation (isNonPersistentTopic );
});
return ;
}
if (!verifyChecksum (headersAndPayload )) {
cnx .ctx ().channel ().eventLoop ().execute (() -> {
cnx .ctx ().writeAndFlush (
Commands .newSendError (producerId , sequenceId , ServerError .ChecksumError , "Checksum failed on the broker" ));
cnx .completedSendOperation (isNonPersistentTopic );
});
return ;
}
if (topic .isEncryptionRequired ()) {
// Peek at the metadata without consuming the buffer: mark, parse, reset.
headersAndPayload .markReaderIndex ();
MessageMetadata msgMetadata = Commands .parseMessageMetadata (headersAndPayload );
headersAndPayload .resetReaderIndex ();
// Check whether the message is encrypted or not
if (msgMetadata .getEncryptionKeysCount () < 1 ) {
log .warn ("[{}] Messages must be encrypted" , getTopic ().getName ());
cnx .ctx ().channel ().eventLoop ().execute (() -> {
cnx .ctx ().writeAndFlush (Commands .newSendError (producerId , sequenceId , ServerError .MetadataError ,
"Messages must be encrypted" ));
cnx .completedSendOperation (isNonPersistentTopic );
});
return ;
}
}
startPublishOperation ();
// Hand off to the topic; the publish context carries the stats and completion callback.
topic .publishMessage (headersAndPayload ,
MessagePublishContext .get (this , sequenceId , msgIn , headersAndPayload .readableBytes (), batchSize ,
System .nanoTime ()));
}
/**
 * Publishes a message payload to this topic. A message flagged as a duplicate
 * by the deduplication check is acknowledged immediately (with -1/-1 position);
 * otherwise the entry is appended asynchronously to the managed ledger.
 */
public void publishMessage (ByteBuf headersAndPayload , PublishContext publishContext ) {
    boolean isDuplicate =
            !messageDeduplication .shouldPublishNextMessage (publishContext , headersAndPayload );
    if (isDuplicate ) {
        // Duplicated message: ack right away without persisting.
        publishContext .completed (null , -1 , -1 );
        return ;
    }
    ledger .asyncAddEntry (headersAndPayload , this , publishContext );
}
ManagedLedgerImpl 通过 BookKeeper 的 managed-ledger API 将消息写入磁盘。
/**
 * Appends an entry to this managed ledger asynchronously. The add operation is
 * created up front, then hopped onto the ledger's ordered executor (keyed by
 * ledger name) so writers on different threads do not contend.
 *
 * @param buffer   serialized entry payload
 * @param callback invoked when the add completes or fails
 * @param ctx      opaque context passed back to the callback
 */
public void asyncAddEntry (ByteBuf buffer , AddEntryCallback callback , Object ctx ) {
    if (log .isDebugEnabled ()) {
        log .debug ("[{}] asyncAddEntry size={} state={}" , name , buffer .readableBytes (), state );
    }
    final OpAddEntry op = OpAddEntry .create (this , buffer , callback , ctx );
    // Jump to specific thread to avoid contention from writers writing from different threads
    executor .executeOrdered (name , safeRun (() -> {
        pendingAddEntries .add (op );
        internalAsyncAddEntry (op );
    }));
}