Skip to content

Instantly share code, notes, and snippets.

@MinimaJack
Created November 18, 2019 06:29
Show Gist options
  • Save MinimaJack/8573b375a1f5121a06e5b03c53c6f6aa to your computer and use it in GitHub Desktop.
Save MinimaJack/8573b375a1f5121a06e5b03c53c6f6aa to your computer and use it in GitHub Desktop.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import java.util.Map;
import java.util.Properties;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
/**
 * Fire-and-forget Kafka publisher backed by a single lazily started worker thread.
 *
 * <p>Callers hand records to {@link #mainInt(String, String)}, which only enqueues them. A
 * background worker creates a {@link KafkaProducer}, drains the queue, and shuts itself down
 * (closing the producer) once the queue is empty and nothing has been enqueued or sent for
 * {@link #IDLE_TIMEOUT_MS} milliseconds. The next call to {@code mainInt} starts a fresh worker.
 *
 * <p>Thread-safety: the worker's lifecycle ({@code workingThread}) is guarded by the class
 * monitor. Enqueueing and the worker's "go idle" decision both happen under that monitor, so a
 * record can never be enqueued for a worker that has already decided to exit.
 */
public class KafkaExampleProducer
{
    /** Topic every record (including the final "Producer closed" marker) is sent to. */
    private static final String TOPIC = "testTopic";

    /** How long the worker stays alive without any activity before shutting down. */
    private static final long IDLE_TIMEOUT_MS = 10000L;

    /** Pause between queue polls while the queue is empty. */
    private static final long POLL_INTERVAL_MS = 20L;

    /**
     * Native logging hook. Its implementation lives outside this file and it is not called
     * from any code in this class.
     */
    private native void log( String info );

    /** Timestamp (ms) of the last enqueue or send; volatile because both threads write it. */
    private static volatile long lastUsage = 0L;

    /**
     * Producer created by the most recently started worker. It is not reset when the worker
     * closes it, so it may refer to an already closed producer.
     */
    public static volatile KafkaProducer<String, String> producer = null;

    /** The running worker, or {@code null} if none. Guarded by the class monitor. */
    private static Thread workingThread;

    /** Records waiting to be sent by the worker. */
    private static final ConcurrentLinkedQueue<ProducerRecord<String, String>> dataToSend =
        new ConcurrentLinkedQueue<ProducerRecord<String, String>>();

    /**
     * Queues a record for {@value #TOPIC} and makes sure a worker thread is running to send it.
     *
     * <p>This method does not wait for the record to be sent.
     *
     * @param key   record key; the key of the call that starts a worker is also reused for that
     *              worker's final "Producer closed" record
     * @param value record value
     * @return always {@code 0}
     * @throws InterruptedException never thrown by this implementation; kept in the signature
     *                              for compatibility with existing callers
     * @throws ExecutionException   never thrown by this implementation; kept in the signature
     *                              for compatibility with existing callers
     */
    public static int mainInt( String key, String value )
        throws InterruptedException, ExecutionException
    {
        // Enqueue and worker start happen under the same lock the worker takes before it
        // decides to exit, so the record is always seen by a live worker.
        synchronized ( KafkaExampleProducer.class )
        {
            lastUsage = System.currentTimeMillis();
            dataToSend.add( new ProducerRecord<String, String>( TOPIC, key, value ) );
            if ( workingThread == null )
            {
                final String closingKey = key;
                Thread worker = new Thread( new Runnable()
                {
                    @Override
                    public void run()
                    {
                        runWorker( closingKey );
                    }
                }, "kafka-example-producer" );
                worker.start();
                // Published only after a successful start so a failed start() cannot leave a
                // dead thread registered as the worker.
                workingThread = worker;
            }
        }
        return 0;
    }

    /**
     * Worker body: creates the producer, drains the queue until idle, sends a final
     * "Producer closed" record, and always closes the producer and unregisters itself.
     *
     * @param closingKey key used for the final "Producer closed" record
     */
    private static void runWorker( String closingKey )
    {
        KafkaProducer<String, String> localProducer = null;
        boolean interrupted = false;
        try
        {
            Properties props = new Properties();
            props.put( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" );
            // Optional tuning knobs, left disabled:
            // props.put( ProducerConfig.LINGER_MS_CONFIG, 100 );
            // props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16_384 * 4);
            // props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
            props.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                       "org.apache.kafka.common.serialization.StringSerializer" );
            props.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                       "org.apache.kafka.common.serialization.StringSerializer" );
            localProducer = new KafkaProducer<String, String>( props );
            producer = localProducer;

            while ( true )
            {
                ProducerRecord<String, String> data = dataToSend.poll();
                if ( data != null )
                {
                    localProducer.send( data );
                    lastUsage = System.currentTimeMillis();
                    continue;
                }

                synchronized ( KafkaExampleProducer.class )
                {
                    // Decide to exit and unregister atomically with respect to mainInt().
                    if ( dataToSend.isEmpty()
                        && System.currentTimeMillis() - lastUsage >= IDLE_TIMEOUT_MS )
                    {
                        workingThread = null;
                        break;
                    }
                }

                try
                {
                    Thread.sleep( POLL_INTERVAL_MS );
                }
                catch ( InterruptedException e )
                {
                    // Keep draining so queued records are still sent; the interrupt status is
                    // restored once the worker has finished.
                    interrupted = true;
                }
            }

            localProducer.send( new ProducerRecord<String, String>( TOPIC, closingKey, "Producer closed" ) );
        }
        finally
        {
            try
            {
                if ( localProducer != null )
                {
                    localProducer.close();
                }
            }
            finally
            {
                synchronized ( KafkaExampleProducer.class )
                {
                    // Only relevant when the worker died from an exception: on the normal path
                    // it already unregistered itself and a newer worker may be registered.
                    if ( workingThread == Thread.currentThread() )
                    {
                        workingThread = null;
                    }
                }
                if ( interrupted )
                {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Demo entry point: enqueues a single record with key and value {@code "1"}, then prints
     * the return value of {@link #mainInt(String, String)} and the elapsed time in milliseconds.
     *
     * @param args unused
     * @throws InterruptedException declared because {@code mainInt} declares it
     * @throws ExecutionException   declared because {@code mainInt} declares it
     */
    public static void main( String[] args )
        throws InterruptedException, ExecutionException
    {
        long time = System.currentTimeMillis();
        for ( int i = 1; i <= 1; i++ )
        {
            System.out.println( mainInt( String.valueOf( i ), String.valueOf( i ) ) );
        }
        System.out.println( System.currentTimeMillis() - time );
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment