@mze3e
Created April 14, 2021 13:28
Python Kafka Consumer Examples
# Source: https://www.programcreek.com/python/example/98440/kafka.KafkaConsumer
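##A minimal baseline (not taken from any of the projects below): every snippet in this gist builds on
##the same pattern of constructing a kafka.KafkaConsumer, subscribing to one or more topics, and
##iterating over the records it yields. The topic name and the localhost:9092 broker address in this
##sketch are placeholder assumptions for illustration only.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my-topic',                          # placeholder topic name
    bootstrap_servers='localhost:9092',  # assumed local broker
    auto_offset_reset='earliest',        # start from the oldest available offset
    enable_auto_commit=True,
    value_deserializer=lambda v: v.decode('utf-8'))
for message in consumer:
    # each ConsumerRecord exposes topic, partition, offset, key and value
    print(message.topic, message.partition, message.offset, message.value)
consumer.close()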
##Example 1
##Project: scrapy-cluster Author: istresearch File: kafka_monitor.py License: MIT License
def _create_consumer(self):
    """Tries to establish the Kafka consumer connection"""
    try:
        brokers = self.settings['KAFKA_HOSTS']
        self.logger.debug("Creating new kafka consumer using brokers: " +
                          str(brokers) + ' and topic ' +
                          self.settings['KAFKA_INCOMING_TOPIC'])
        return KafkaConsumer(
            self.settings['KAFKA_INCOMING_TOPIC'],
            group_id=self.settings['KAFKA_GROUP'],
            bootstrap_servers=brokers,
            consumer_timeout_ms=self.settings['KAFKA_CONSUMER_TIMEOUT'],
            auto_offset_reset=self.settings['KAFKA_CONSUMER_AUTO_OFFSET_RESET'],
            auto_commit_interval_ms=self.settings['KAFKA_CONSUMER_COMMIT_INTERVAL_MS'],
            enable_auto_commit=self.settings['KAFKA_CONSUMER_AUTO_COMMIT_ENABLE'],
            max_partition_fetch_bytes=self.settings['KAFKA_CONSUMER_FETCH_MESSAGE_MAX_BYTES'])
    except KeyError as e:
        self.logger.error('Missing setting named ' + str(e),
                          {'ex': traceback.format_exc()})
    except:
        self.logger.error("Couldn't initialize kafka consumer for topic",
                          {'ex': traceback.format_exc(),
                           'topic': self.settings['KAFKA_INCOMING_TOPIC']})
        raise
##Example 2
##Project: scrapy-cluster Author: istresearch File: rest_service.py License: MIT License
def _create_consumer(self):
    """Tries to establish the Kafka consumer connection"""
    if not self.closed:
        try:
            self.logger.debug("Creating new kafka consumer using brokers: " +
                              str(self.settings['KAFKA_HOSTS']) + ' and topic ' +
                              self.settings['KAFKA_TOPIC_PREFIX'] +
                              ".outbound_firehose")
            return KafkaConsumer(
                self.settings['KAFKA_TOPIC_PREFIX'] + ".outbound_firehose",
                group_id=None,
                bootstrap_servers=self.settings['KAFKA_HOSTS'],
                consumer_timeout_ms=self.settings['KAFKA_CONSUMER_TIMEOUT'],
                auto_offset_reset=self.settings['KAFKA_CONSUMER_AUTO_OFFSET_RESET'],
                auto_commit_interval_ms=self.settings['KAFKA_CONSUMER_COMMIT_INTERVAL_MS'],
                enable_auto_commit=self.settings['KAFKA_CONSUMER_AUTO_COMMIT_ENABLE'],
                max_partition_fetch_bytes=self.settings['KAFKA_CONSUMER_FETCH_MESSAGE_MAX_BYTES'])
        except KeyError as e:
            self.logger.error('Missing setting named ' + str(e),
                              {'ex': traceback.format_exc()})
        except:
            self.logger.error("Couldn't initialize kafka consumer for topic",
                              {'ex': traceback.format_exc()})
            raise
##Example 3
##Project: open-nti Author: Juniper File: open_nti_input_syslog_lib.py License: Apache License 2.0
def check_kafka_msg(topic='events', nbr_msg=100):
    ## Collect messages from the bus
    consumer = KafkaConsumer(
        bootstrap_servers=get_external_ip() + ':' + str(KAFKA_BROKER_PORT),
        auto_offset_reset='earliest')
    consumer.subscribe([topic])
    counter = 0
    for message in consumer:
        counter = counter + 1
        if counter == nbr_msg:
            break
    return counter
##Example 4
##Project: operations-software-druid_exporter Author: wikimedia File: collector.py License: Apache License 2.0
def pull_datapoints_from_kafka(self, kafka_config, stop_threads):
    log.debug('Kafka datapoints puller thread starting..')
    consumer = KafkaConsumer(
        kafka_config['topic'],
        group_id=kafka_config['group_id'],
        bootstrap_servers=kafka_config['bootstrap_servers'])
    while True and not stop_threads.isSet():
        consumer.poll()
        for message in consumer:
            try:
                json_message = json.loads(message.value.decode())
                log.debug('Datapoint from kafka: %s', json_message)
                if type(json_message) == list:
                    for datapoint in json_message:
                        self.register_datapoint(datapoint)
                else:
                    self.register_datapoint(json_message)
            except json.JSONDecodeError:
                log.exception("Failed to decode message from Kafka, skipping..")
            except Exception as e:
                log.exception("Generic exception while pulling datapoints from Kafka")
    log.debug('Kafka datapoints puller thread shutting down..')
##Example 5
##Project: fooltrader Author: foolcage File: base_bot.py License: MIT License
def run(self):
    self.logger.info("start bot:{}".format(self))
    funcs = set(dir(self)) & self.func_map_topic.keys()
    consumer = KafkaConsumer(bootstrap_servers=[KAFKA_HOST])
    current_topics = consumer.topics()
    for func in funcs:
        topic = self.func_map_topic.get(func)
        if topic not in current_topics:
            self.logger.exception("func:{} is implemented, but its topic:{} does not exist".format(func, topic))
            continue
        self.threads.append(
            threading.Thread(target=self.consume_topic_with_func, args=(self.func_map_topic.get(func), func)))
    for the_thread in self.threads:
        the_thread.start()
    self.consume_topic_with_func(self.quote_topic, 'on_event')
    self.logger.info("finish bot:{}".format(self))
##Example 6
##Project: fooltrader Author: foolcage File: bot.py License: MIT License
def run(self):
    self.logger.info("start bot:{}".format(self))
    funcs = set(dir(self)) & self.func_map_topic.keys()
    consumer = KafkaConsumer(bootstrap_servers=[KAFKA_HOST])
    current_topics = consumer.topics()
    for func in funcs:
        topic = self.func_map_topic.get(func)
        if topic not in current_topics:
            self.logger.exception("func:{} is implemented, but its topic:{} does not exist".format(func, topic))
            continue
        self._threads.append(
            threading.Thread(target=self.consume_topic_with_func, args=(self.func_map_topic.get(func), func)))
    for the_thread in self._threads:
        the_thread.start()
    self.consume_topic_with_func(self.quote_topic, 'on_event')
    self.logger.info("finish bot:{}".format(self))
##Example 7
##Project: platypush Author: BlackLight File: __init__.py License: MIT License
def run(self):
    from kafka import KafkaConsumer
    super().run()
    self.consumer = KafkaConsumer(self.topic, bootstrap_servers=self.server)
    self.logger.info('Initialized kafka backend - server: {}, topic: {}'
                     .format(self.server, self.topic))
    try:
        for msg in self.consumer:
            self._on_record(msg)
            if self.should_stop():
                break
    except Exception as e:
        self.logger.warning('Kafka connection error, reconnecting in {} seconds'.
                            format(self._conn_retry_secs))
        self.logger.exception(e)
        time.sleep(self._conn_retry_secs)
##Example 8
##Project: py-timeexecution Author: kpn-digital File: test_kafka.py License: Apache License 2.0
def _query_backend(self):
    consumer = KafkaConsumer(
        bootstrap_servers=KAFKA_HOST,
        value_deserializer=lambda v: JSONSerializer().loads(v.decode('utf-8'))
    )
    tp = TopicPartition(self.topic, 0)
    consumer.assign([tp])
    count = consumer.position(tp)
    consumer.seek(tp, 0)
    metrics = []
    for i in range(count):
        metrics.append(next(consumer))
    return metrics
##Example 9
##Project: scrapy-kafka-export Author: TeamHG-Memex File: test_extension.py License: MIT License
def setup_class(cls):
    cls.broker = os.getenv('KAFKA_BROKER')
    if not cls.topic:
        topic = "%s-%s" % ('topic_test_', random_string(10))
        cls.topic = topic
    create_topic(cls.topic)
    cls._deserializer = ScrapyJSONDecoder()
    cls.consumer = KafkaConsumer(
        bootstrap_servers=[cls.broker],
        auto_offset_reset='earliest',
        group_id=None,
        value_deserializer=lambda x: cls._deserializer.decode(x.decode('utf8'))
    )
    cls.consumer.subscribe([cls.topic])
##Example 10
##Project: kzmonitor Author: tqlihuiqi File: client.py License: MIT License
def getOffsets(self, topic, partitions, group):
    """ Return offset data for the given topic, partitions and group """
    try:
        # Try to fetch offsets via the zookeeper-storage API first.
        # If the given group has no offsets there, UnknownTopicOrPartitionError is raised.
        tp = self.client.send_offset_fetch_request(group, [OffsetRequestPayload(topic, p, -1, 1) for p in partitions])
        offsets = {p.partition: p.offset for p in tp}
    except UnknownTopicOrPartitionError:
        # On that exception, fall back to the kafka-storage API to fetch the offsets.
        consumer = KafkaConsumer(group_id=group, bootstrap_servers=self.broker, enable_auto_commit=False)
        tp = [TopicPartition(topic, p) for p in partitions]
        consumer.assign(tp)
        offsets = {p.partition: consumer.position(p) for p in tp}
    return offsets
##Example 11
##Project: kq Author: joowani File: test_worker.py License: MIT License
def test_worker_properties(worker, hosts, topic, group):
    assert hosts in repr(worker)
    assert topic in repr(worker)
    assert group in repr(worker)
    assert worker.consumer.config['bootstrap_servers'] == hosts
    assert worker.consumer.config['group_id'] == group
    assert isinstance(worker.hosts, str) and worker.hosts == hosts
    assert isinstance(worker.topic, str) and worker.topic == topic
    assert isinstance(worker.group, str) and worker.group == group
    assert isinstance(worker.consumer, KafkaConsumer)
    assert callable(worker.deserializer)
    assert callable(worker.callback) or worker.callback is None
##Example 12
##Project: kq Author: joowani File: test_worker.py License: MIT License
# noinspection PyTypeChecker
def test_worker_initialization_with_bad_args(hosts, consumer):
    with pytest.raises(AssertionError) as e:
        Worker(topic=True, consumer=consumer)
    assert str(e.value) == 'topic must be a str'

    with pytest.raises(AssertionError) as e:
        Worker(topic='topic', consumer='bar')
    assert str(e.value) == 'bad consumer instance'

    with pytest.raises(AssertionError) as e:
        bad_consumer = KafkaConsumer(bootstrap_servers=hosts)
        Worker(topic='topic', consumer=bad_consumer)
    assert str(e.value) == 'consumer must have group_id'

    with pytest.raises(AssertionError) as e:
        Worker(topic='topic', consumer=consumer, callback=1)
    assert str(e.value) == 'callback must be a callable'

    with pytest.raises(AssertionError) as e:
        Worker(topic='topic', consumer=consumer, deserializer=1)
    assert str(e.value) == 'deserializer must be a callable'

    with pytest.raises(AssertionError) as e:
        Worker(topic='topic', consumer=consumer, logger=1)
    assert str(e.value) == 'bad logger instance'
##Example 13
##Project: Ad-Insertion-Sample Author: OpenVisualCloud File: messaging.py License: BSD 3-Clause "New" or "Revised" License
def debug(self, topic):
    c = KafkaConsumer(bootstrap_servers=kafka_hosts, client_id=self._client_id, group_id=None, api_version=(0, 10))

    # assign/subscribe topic
    partitions = c.partitions_for_topic(topic)
    if not partitions:
        raise Exception("Topic " + topic + " does not exist")
    c.assign([TopicPartition(topic, p) for p in partitions])

    # seek to beginning if needed
    c.seek_to_beginning()

    # fetch messages
    while True:
        partitions = c.poll(100)
        if partitions:
            for p in partitions:
                for msg in partitions[p]:
                    yield msg.value.decode('utf-8')
        yield ""

    c.close()
##Example 14
##Project: karapace Author: aiven File: consumer_manager.py License: Apache License 2.0
async def create_kafka_consumer(self, fetch_min_bytes, group_name, internal_name, request_data):
    while True:
        try:
            c = KafkaConsumer(
                bootstrap_servers=self.config["bootstrap_uri"],
                client_id=internal_name,
                security_protocol=self.config["security_protocol"],
                ssl_cafile=self.config["ssl_cafile"],
                ssl_certfile=self.config["ssl_certfile"],
                ssl_keyfile=self.config["ssl_keyfile"],
                group_id=group_name,
                fetch_min_bytes=fetch_min_bytes,
                fetch_max_bytes=self.config["consumer_request_max_bytes"],
                request_timeout_ms=request_data["consumer.request.timeout.ms"],
                enable_auto_commit=request_data["auto.commit.enable"],
                auto_offset_reset=request_data["auto.offset.reset"]
            )
            return c
        except:  # pylint: disable=bare-except
            self.log.exception("Unable to create consumer, retrying")
            await asyncio.sleep(1)
##Example 15
##Project: InsightAgent Author: insightfinder File: getmessages_kafka2.py License: Apache License 2.0
def start_data_processing(thread_number):
    # open consumer
    consumer = KafkaConsumer(**agent_config_vars['kafka_kwargs'])
    logger.info('Started consumer number ' + str(thread_number))
    # subscribe to given topics
    consumer.subscribe(agent_config_vars['topics'])
    logger.info('Successfully subscribed to topics ' + str(agent_config_vars['topics']))
    # start consuming messages
    parse_messages_kafka(consumer)
    consumer.close()
    logger.info('Closed consumer number ' + str(thread_number))
##Example 16
##Project: dino Author: thenetcircle File: kafka_to_rabbitmq.py License: Apache License 2.0
def run(self) -> None:
    self.create_loggers()
    logger.info('sleeping for 3 seconds before consuming')
    time.sleep(3)

    kafka_conf = self.conf.get(ConfigKeys.EXTERNAL_QUEUE)
    bootstrap_servers = kafka_conf.get(ConfigKeys.HOST)
    logger.info('bootstrapping from servers: %s' % (str(bootstrap_servers)))

    topic_name = kafka_conf.get(ConfigKeys.QUEUE)
    logger.info('consuming from topic {}'.format(topic_name))

    group_id = 'dino-kafka-to-rabbitmq'
    logger.info('using Group ID {}'.format(group_id))

    self.consumer = KafkaConsumer(
        topic_name,
        group_id=group_id,
        bootstrap_servers=bootstrap_servers,
        enable_auto_commit=True,
        connections_max_idle_ms=180 * ONE_MINUTE,  # default: 9min
        max_poll_interval_ms=10 * ONE_MINUTE,  # default: 5min
        session_timeout_ms=ONE_MINUTE,  # default: 10s
        max_poll_records=10  # default: 500
    )

    while True:
        try:
            self.try_to_read()
        except InterruptedError:
            logger.info('got interrupted, shutting down...')
            break
        except Exception as e:
            logger.error('could not read from kafka: {}'.format(str(e)))
            logger.exception(e)
            time.sleep(1)
##Example 17
##Project: scrapy-cluster Author: istresearch File: online.py License: MIT License
def setUp(self):
    self.settings = get_project_settings()
    self.settings.set('KAFKA_TOPIC_PREFIX', "demo_test")
    # set up redis
    self.redis_conn = redis.Redis(host=self.settings['REDIS_HOST'],
                                  port=self.settings['REDIS_PORT'],
                                  db=self.settings['REDIS_DB'])
    try:
        self.redis_conn.info()
    except ConnectionError:
        print("Could not connect to Redis")
        # plugin is essential to functionality
        sys.exit(1)
    # clear out older test keys if any
    keys = self.redis_conn.keys("test-spider:*")
    for key in keys:
        self.redis_conn.delete(key)
    # set up kafka to consume potential results
    self.consumer = KafkaConsumer(
        "demo_test.crawled_firehose",
        bootstrap_servers=self.settings['KAFKA_HOSTS'],
        group_id="demo-id",
        auto_commit_interval_ms=10,
        consumer_timeout_ms=5000,
        auto_offset_reset='earliest'
    )
    time.sleep(1)
##Example 18
##Project: scrapy-cluster Author: istresearch File: online.py License: MIT License
def setUp(self):
    self.redis_monitor = RedisMonitor("localsettings.py")
    self.redis_monitor.settings = self.redis_monitor.wrapper.load("localsettings.py")
    self.redis_monitor.logger = MagicMock()
    self.redis_monitor.settings['KAFKA_TOPIC_PREFIX'] = "demo_test"
    self.redis_monitor.settings['STATS_TOTAL'] = False
    self.redis_monitor.settings['STATS_PLUGINS'] = False
    self.redis_monitor.settings['PLUGINS'] = {
        'plugins.info_monitor.InfoMonitor': None,
        'plugins.stop_monitor.StopMonitor': None,
        'plugins.expire_monitor.ExpireMonitor': None,
        'tests.online.CustomMonitor': 100,
    }
    self.redis_monitor.redis_conn = redis.Redis(
        host=self.redis_monitor.settings['REDIS_HOST'],
        port=self.redis_monitor.settings['REDIS_PORT'],
        db=self.redis_monitor.settings['REDIS_DB'])
    self.redis_monitor._load_plugins()
    self.redis_monitor.stats_dict = {}
    self.consumer = KafkaConsumer(
        "demo_test.outbound_firehose",
        bootstrap_servers=self.redis_monitor.settings['KAFKA_HOSTS'],
        group_id="demo-id",
        auto_commit_interval_ms=10,
        consumer_timeout_ms=5000,
        auto_offset_reset='earliest'
    )
    sleep(1)
##Example 19
##Project: sniffer Author: threathunterX File: kafkadriver.py License: Apache License 2.0
def start(self):
    self.consumer = KafkaConsumer(self.topics, **self.config)
    self.bg_task = run_in_thread(self.bg_processing)
##Example 20
##Project: sniffer Author: threathunterX File: sniffer_nebula_test.py License: Apache License 2.0
def __init__(self, bootstrap_servers, kafkatopic):
    self.kafkatopic = kafkatopic
    self.consumer = KafkaConsumer(self.kafkatopic, bootstrap_servers=bootstrap_servers)
##Example 21
##Project: search-MjoLniR Author: wikimedia File: client.py License: MIT License
def offsets_for_times(consumer, partitions, timestamp):
    """Augment KafkaConsumer.offsets_for_times to not return None

    Parameters
    ----------
    consumer : kafka.KafkaConsumer
        This consumer must only be used for collecting metadata, and not
        consuming. APIs will be used that invalidate consuming.
    partitions : list of kafka.TopicPartition
    timestamp : number
        Timestamp, in seconds since unix epoch, to return offsets for.

    Returns
    -------
    dict from kafka.TopicPartition to integer offset
    """
    # Kafka uses millisecond timestamps
    timestamp_ms = int(timestamp * 1000)
    response = consumer.offsets_for_times({p: timestamp_ms for p in partitions})
    offsets = {}
    for tp, offset_and_timestamp in response.items():
        if offset_and_timestamp is None:
            # No messages exist after timestamp. Fetch latest offset.
            consumer.assign([tp])
            consumer.seek_to_end(tp)
            offsets[tp] = consumer.position(tp)
        else:
            offsets[tp] = offset_and_timestamp.offset
    return offsets
##Example 22
##Project: search-MjoLniR Author: wikimedia File: client.py License: MIT License
def offset_range_for_timestamp_range(brokers, start, end, topic):
    """Determine OffsetRange for a given timestamp range

    Parameters
    ----------
    brokers : str or list of str
        Kafka bootstrap servers
    start : number
        Unix timestamp in seconds
    end : number
        Unix timestamp in seconds
    topic : str
        Topic to fetch offsets for

    Returns
    -------
    list of OffsetRange or None
        Per-partition ranges of offsets to read
    """
    consumer = kafka.KafkaConsumer(bootstrap_servers=brokers)
    partitions = consumer.partitions_for_topic(topic)
    if partitions is None:
        # Topic does not exist.
        return None
    partitions = [kafka.TopicPartition(topic, p) for p in partitions]
    o_start = offsets_for_times(consumer, partitions, start)
    o_end = offsets_for_times(consumer, partitions, end)
    return [OffsetRange(tp, o_start[tp], o_end[tp]) for tp in partitions]
##Example 23
##Project: distributed_framework Author: ydf0509 File: kafka_consumer.py License: Apache License 2.0
def _shedual_task(self):
    self._producer = KafkaProducer(bootstrap_servers=frame_config.KAFKA_BOOTSTRAP_SERVERS)
    consumer = OfficialKafkaConsumer(self._queue_name, bootstrap_servers=frame_config.KAFKA_BOOTSTRAP_SERVERS,
                                     group_id='frame_group', enable_auto_commit=True)
    # REMIND: consumption here is highly concurrent (many threads, few partitions), so auto commit is
    # enabled; otherwise multiple threads committing offsets for the same partition would run ahead of
    # each other and leave the offsets inconsistent, defeating the purpose.
    # REMIND: if you need very high reliability and consistency, use rabbitmq instead.
    # REMIND: the upside is high concurrency. A topic can be re-read from any offset at any time, like
    # flipping back through a book, and multiple groups can consume the same topic with each group's
    # offsets independent of the others.
    for message in consumer:
        # note: message.value is raw bytes and needs to be decoded
        self.logger.debug(
            f'message taken from kafka topic [{message.topic}], partition {message.partition}: {message.value.decode()}')
        kw = {'consumer': consumer, 'message': message, 'body': json.loads(message.value)}
        self._submit_task(kw)
##Example 24
##Project: open-nti Author: Juniper File: open_nti_input_syslog_lib.py License: Apache License 2.0
def check_kafka_is_running():
    # Verify we can connect to Kafka
    time.sleep(2)
    consumer = KafkaConsumer(bootstrap_servers=get_external_ip() + ':' + str(KAFKA_BROKER_PORT),
                             auto_offset_reset='earliest')
    mytopic = consumer.topics()
    return 1
##Example 25
##Project: rafiki Author: nginyc File: inference_cache.py License: Apache License 2.0
def take_prediction_for_worker(self, worker_id: str, query_id: str) -> Union[Prediction, None]:
    name = f'workers_{worker_id}_{query_id}_prediction'
    prediction_consumer = KafkaConsumer(name, bootstrap_servers=self.connection_url,
                                        auto_offset_reset='earliest', group_id=PREDICTIONS_QUEUE)
    prediction = None
    try:
        prediction = next(prediction_consumer).value
        prediction_consumer.commit()
        prediction = pickle.loads(prediction)
    except KafkaError:
        pass
    prediction_consumer.close()
    logger.info(f'Took prediction for query "{query_id}" from worker "{worker_id}"')
    return prediction
##Example 26
##Project: rafiki Author: nginyc File: inference_cache.py License: Apache License 2.0
def pop_queries_for_worker(self, worker_id: str, batch_size: int) -> List[Query]:
    name = f'workers_{worker_id}_queries'
    query_consumer = KafkaConsumer(name, bootstrap_servers=self.connection_url,
                                   auto_offset_reset='earliest', group_id=QUERIES_QUEUE)
    partition = TopicPartition(name, 0)
    partitiondic = query_consumer.end_offsets([partition])
    offsetend = partitiondic.get(partition, None)
    if offsetend == 0:
        query_consumer.close()
        return []
    try:
        queries = []
        while True:
            record = next(query_consumer)
            queries.append(record.value)
            query_consumer.commit()
            if record.offset >= offsetend - 1 or len(queries) == batch_size:
                break
        queries = [pickle.loads(x) for x in queries]
        query_consumer.close()
        return queries
    except KafkaError:
        query_consumer.close()
        return []
##Example 27
##Project: fooltrader Author: foolcage File: kafka_connector.py License: MIT License
def list_topics():
    try:
        consumer = KafkaConsumer(bootstrap_servers=[KAFKA_HOST])
        return consumer.topics()
    finally:
        consumer.close()
##Example 28
##Project: fooltrader Author: foolcage File: kafka_utils.py License: MIT License
def get_latest_timestamp_order_from_topic(topic):
    consumer = KafkaConsumer(topic,
                             # client_id='fooltrader',
                             # group_id='fooltrader',
                             value_deserializer=lambda m: json.loads(m.decode('utf8')),
                             bootstrap_servers=[KAFKA_HOST])
    topic_partition = TopicPartition(topic=topic, partition=0)
    end_offset = consumer.end_offsets([topic_partition])[topic_partition]
    if end_offset > 0:
        # partition assigned after poll, and we could seek
        consumer.poll(5, 1)
        consumer.seek(topic_partition, end_offset - 1)
        message = consumer.poll(10000, 500)
        msgs = message[topic_partition]
        if len(msgs) > 0:
            record = msgs[-1]
            timestamp = to_timestamp(record.value['timestamp'])
            order = None
            if 'order' in record.value:
                order = record.value['order']
            return timestamp, order
    return None, None
##Example 29
##Project: scrapy-kafka-export Author: TeamHG-Memex File: writer.py License: MIT License
def _get_consumer(self, **kwargs):
    _kwargs = self.ssl_config.copy()
    _kwargs.update(bootstrap_servers=self.bootstrap_servers)
    _kwargs.update(kwargs)
    return KafkaConsumer(**_kwargs)
##Example 30
##Project: ozymandias Author: pambot File: ozy_app.py License: MIT License
def video_generator(topic):
    """Video streaming generator function."""
    consumer = KafkaConsumer('flask',
                             bootstrap_servers='localhost:9092',
                             auto_offset_reset='latest',
                             fetch_max_bytes=15728640,
                             max_partition_fetch_bytes=15728640,
                             group_id=topic)
    for msg in consumer:
        if msg.key == topic:
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + msg.value + b'\r\n')
        time.sleep(0.1)