/**
 * Reporter used by {@code put} to hand off records that failed processing.
 * Assigned in {@code start}; {@code null} when no reporter is available
 * (DLQ not enabled, or the Connect runtime is older than 2.6), in which
 * case {@code put} fails on the first errant record instead.
 */
private ErrantRecordReporter reporter;
 
/**
 * Starts the task and looks up the errant record reporter from the task context.
 *
 * <p>The reporter is left {@code null} when the context returns {@code null}
 * (DLQ not enabled) or when the running Connect runtime does not provide the
 * reporter API at all.
 *
 * @param props configuration properties supplied to the task
 */
@Override
public void start(Map<String, String> props) {
  ...
  try {
    reporter = context.errantRecordReporter(); // may be null if DLQ not enabled
  } catch (NoSuchMethodError | NoClassDefFoundError e) {
    // Will occur in Connect runtimes earlier than 2.6. A method or class that is
    // missing at link time surfaces as a LinkageError (NoSuchMethodError /
    // NoClassDefFoundError), not as the checked, reflection-only
    // NoSuchMethodException, which this call can never throw.
    reporter = null;
  }
}
 
/**
 * Processes each record in turn; a record whose processing throws is handed to
 * the errant record reporter when one is available, otherwise the task fails.
 *
 * <p>After reporting, this method blocks until the returned future completes,
 * so that the failure has been recorded before the next record is processed.
 *
 * @param records the records to send to the data sink
 * @throws ConnectException if a record fails and there is no reporter, if
 *     reporting the failure itself fails, or if the thread is interrupted
 *     while waiting for the report to complete
 */
@Override
public void put(Collection<SinkRecord> records) {
  for (SinkRecord record : records) {
    try {
      // attempt to send record to data sink
      process(record);
    } catch (Exception e) {
      if (reporter == null) {
        // There's no error reporter, so fail
        throw new ConnectException("Failed on record", e);
      }
      // Send errant record to error reporter
      Future<Void> future = reporter.report(record, e);
      try {
        // Wait till the failure's been recorded in Kafka
        future.get();
      } catch (InterruptedException interrupted) {
        // Restore the interrupt flag so callers further up can observe it.
        Thread.currentThread().interrupt();
        ConnectException failure =
            new ConnectException("Interrupted while reporting errant record", interrupted);
        failure.addSuppressed(e); // keep the original processing failure visible
        throw failure;
      } catch (java.util.concurrent.ExecutionException reportFailure) {
        ConnectException failure =
            new ConnectException("Failed to report errant record", reportFailure);
        failure.addSuppressed(e); // keep the original processing failure visible
        throw failure;
      }
    }
  }
}