/**
 * Reporter used to hand off records that failed processing. It is {@code null} when the
 * runtime supplies no reporter or does not offer the reporter API at all.
 */
private ErrantRecordReporter reporter;

/**
 * Starts the task and looks up the errant record reporter from the task context.
 *
 * @param props the task configuration
 */
@Override
public void start(Map<String, String> props) {
    // Remaining task setup is elided in this excerpt.
    ...
    try {
        reporter = context.errantRecordReporter(); // may be null if DLQ not enabled
    } catch (NoSuchMethodError | NoClassDefFoundError e) {
        // Will occur in Connect runtimes earlier than 2.6.
        // A method missing at runtime surfaces as the linkage error NoSuchMethodError;
        // NoSuchMethodException is a checked reflection exception that a direct call
        // never throws, so catching it here would neither compile nor help.
        reporter = null;
    }
}

/**
 * Processes each record in turn. When a record fails and a reporter is available, the
 * failure is reported and this method waits for the report to complete before moving on
 * to the next record. Without a reporter the failure is rethrown.
 *
 * @param records the records to process
 * @throws ConnectException if a record fails and there is no reporter, if reporting the
 *     failure itself fails, or if the thread is interrupted while waiting for the report
 */
@Override
public void put(Collection<SinkRecord> records) {
    for (SinkRecord record : records) {
        try {
            // attempt to send record to data sink
            process(record);
        } catch (Exception e) {
            if (reporter == null) {
                // There's no error reporter, so fail
                throw new ConnectException("Failed on record", e);
            }

            // Send errant record to error reporter
            Future<Void> future = reporter.report(record, e);
            try {
                // Optionally wait till the failure's been recorded in Kafka
                future.get();
            } catch (InterruptedException interrupted) {
                // Restore the interrupt flag so code further up the stack can observe it.
                Thread.currentThread().interrupt();
                ConnectException failure =
                        new ConnectException("Interrupted while reporting errant record", interrupted);
                failure.addSuppressed(e);
                throw failure;
            } catch (java.util.concurrent.ExecutionException reportFailure) {
                // The report itself failed; keep the original processing error attached.
                ConnectException failure =
                        new ConnectException("Failed to report errant record", reportFailure);
                failure.addSuppressed(e);
                throw failure;
            }
        }
    }
}