
@Dinduks
Created October 13, 2016 09:24
Limit Kafka batch size with Spark Streaming
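In a direct Kafka stream, each batch is bounded per partition: `spark.streaming.kafka.maxRatePerPartition` caps the records read per second from each partition, so one batch holds at most that rate times the batch interval, per partition. The gist does not show the configuration actually used. The sketch below assumes a 1 s batch interval (inferred from the estimator trace further down) and a hypothetical cap of 100000 records/s per partition, which would be consistent with the 300000-record batches over the three assigned partitions in the log. `max_records_per_batch` is an illustrative helper, not a Spark API.

```python
# Illustrative arithmetic only; not a Spark API.
# Assumed (hypothetical) setting, passed for example as:
#   spark-submit --conf spark.streaming.kafka.maxRatePerPartition=100000 ...


def max_records_per_batch(max_rate_per_partition, batch_interval_ms, num_partitions):
    """Return (records per partition, records per batch) allowed by the rate cap."""
    secs_per_batch = batch_interval_ms / 1000
    per_partition = int(secs_per_batch * max_rate_per_partition)
    return per_partition, per_partition * num_partitions


# Hypothetical cap of 100000 records/s, assumed 1 s batches, 3 partitions.
print(max_records_per_batch(100_000, 1000, 3))  # (100000, 300000)
```

Lowering the cap or the batch interval shrinks every batch proportionally; with backpressure enabled, the rate estimator can only push the effective per-partition rate below this cap, never above it.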
[info] Running com.sam4m.kafkafsconnector.App -o /tmp/k2d-tests/output --kafka-topic ad_validated_eu --kafka-bootstrap.servers localhost:9092 --kafka-lz4-compression=true --kafka-group.id 9014
[error] SLF4J: Class path contains multiple SLF4J bindings.
[error] SLF4J: Found binding in [jar:file:/home/sdindane/.ivy2/cache/org.slf4j/slf4j-log4j12/jars/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
[error] SLF4J: Found binding in [jar:file:/home/sdindane/.ivy2/cache/ch.qos.logback/logback-classic/jars/logback-classic-1.1.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
[error] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
[error] SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
[error] 16/10/13 11:16:22 INFO App: Starting the application with this configuration:
[error] Config(/tmp/k2d-tests/output,ad_validated_eu,localhost:9092,9014,true)
[error] 16/10/13 11:16:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[error] 16/10/13 11:16:22 WARN Utils: Your hostname, sdindane resolves to a loopback address: 127.0.0.1; using 192.168.63.173 instead (on interface wlp2s0)
[error] 16/10/13 11:16:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
[error] 16/10/13 11:16:23 WARN KafkaUtils: overriding enable.auto.commit to false for executor
[error] 16/10/13 11:16:23 WARN KafkaUtils: overriding auto.offset.reset to none for executor
[error] 16/10/13 11:16:23 WARN KafkaUtils: overriding executor group.id to spark-executor-9014
[error] 16/10/13 11:16:23 WARN KafkaUtils: overriding receive.buffer.bytes to 65536 see KAFKA-3135
[error] 16/10/13 11:16:23 INFO PIDRateEstimator: Created PIDRateEstimator with proportional = 1.0, integral = 0.2, derivative = 0.0, min rate = 100.0
[error] 16/10/13 11:16:24 INFO AppInfoParser: Kafka version : 0.10.0.0
[error] 16/10/13 11:16:24 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13
[error] 16/10/13 11:16:24 INFO AbstractCoordinator: Discovered coordinator localhost:9092 (id: 2147483645 rack: null) for group 9014.
[error] 16/10/13 11:16:24 INFO ConsumerCoordinator: Revoking previously assigned partitions [] for group 9014
[error] 16/10/13 11:16:24 INFO AbstractCoordinator: (Re-)joining group 9014
[error] 16/10/13 11:16:24 INFO AbstractCoordinator: Successfully joined group 9014 with generation 1
[error] 16/10/13 11:16:24 INFO ConsumerCoordinator: Setting newly assigned partitions [ad_validated_eu-0, ad_validated_eu-2, ad_validated_eu-1] for group 9014
[error] 16/10/13 11:16:25 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
[error]
[error] [Stage 0:> (0 + 0) / 48]
[error] [Stage 0:=========> (8 + 8) / 48]
[error] [Stage 0:==============> (12 + 8) / 48]
[error] [Stage 0:====================> (17 + 8) / 48]
[error] [Stage 0:=========================> (22 + 10) / 48]
[error] [Stage 0:===================================> (30 + 9) / 48]
[error] [Stage 0:===================================================> (43 + 5) / 48]
[error]
[error] 16/10/13 11:16:30 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
[error] 16/10/13 11:16:30 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
[error] 16/10/13 11:16:30 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
[error] 16/10/13 11:16:30 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
[error] 16/10/13 11:16:30 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
[error] 16/10/13 11:16:30 INFO deprecation: mapred.output.compress is deprecated. Instead, use mapreduce.output.fileoutputformat.compress
[error] 16/10/13 11:16:31 INFO deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
[error] 16/10/13 11:16:31 INFO deprecation: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class
[error] 16/10/13 11:16:31 INFO deprecation: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class
[error] 16/10/13 11:16:31 INFO AppInfoParser: Kafka version : 0.10.0.0
[error] 16/10/13 11:16:31 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13
[error] 16/10/13 11:16:31 INFO AppInfoParser: Kafka version : 0.10.0.0
[error] 16/10/13 11:16:31 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13
[error] 16/10/13 11:16:31 INFO AppInfoParser: Kafka version : 0.10.0.0
[error] 16/10/13 11:16:31 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13
[error] 16/10/13 11:16:31 INFO AbstractCoordinator: Discovered coordinator localhost:9092 (id: 2147483645 rack: null) for group spark-executor-9014.
[error] 16/10/13 11:16:31 INFO AbstractCoordinator: Discovered coordinator localhost:9092 (id: 2147483645 rack: null) for group spark-executor-9014.
[error] 16/10/13 11:16:31 INFO AbstractCoordinator: Discovered coordinator localhost:9092 (id: 2147483645 rack: null) for group spark-executor-9014.
[error]
[error] [Stage 1:> (0 + 3) / 3]
[error] 16/10/13 11:16:54 INFO FileOutputCommitter: Saved output of task 'attempt_201610131116_0001_m_000001_0' to file:/tmp/k2d-tests/output/_temporary/0/task_201610131116_0001_m_000001
[error] 16/10/13 11:16:54 INFO FileOutputCommitter: Saved output of task 'attempt_201610131116_0001_m_000002_0' to file:/tmp/k2d-tests/output/_temporary/0/task_201610131116_0001_m_000002
[error]
[error] [Stage 1:=======================================> (2 + 1) / 3]
[error] 16/10/13 11:16:55 INFO FileOutputCommitter: Saved output of task 'attempt_201610131116_0001_m_000000_0' to file:/tmp/k2d-tests/output/_temporary/0/task_201610131116_0001_m_000000
[error]
[error]
[error]
[error] [Stage 2:====================================> (31 + 8) / 48]
[error] [Stage 2:================================================> (41 + 7) / 48]
[error]
[error] 16/10/13 11:16:57 TRACE PIDRateEstimator:
[error] time = 1476350217586, # records = 300000, processing time = 32530, scheduling delay = 15
[error] 16/10/13 11:16:57 TRACE PIDRateEstimator:
[error] latestRate = -1.0, error = -9223.256378727328
[error] latestError = -1.0, historicalError = 138.33384568090992
[error] delaySinceUpdate = 1.476350217587E9, dError = -6.2466590033091995E-6
[error]
[error] 16/10/13 11:16:57 TRACE PIDRateEstimator: First run, rate estimation skipped
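The TRACE lines above come from Spark Streaming's `PIDRateEstimator`, which backpressure uses to derive a new ingestion rate after each completed batch (`time` is the epoch-millisecond time the batch finished). The Python sketch below mirrors our reading of its update rule in the Spark 2.0 source; it is not the app's own code. It uses the gains printed at startup (proportional 1.0, integral 0.2, derivative 0.0, min rate 100.0). The 1000 ms batch interval is an assumption inferred from the first trace, since `historicalError = 15 * 9222.26 / 1000 ≈ 138.33`. The estimator's initial state (`latestTime = -1`, `latestRate = -1.0`, `latestError = -1.0`) explains the odd first-run values such as `delaySinceUpdate = 1.476350217587E9`. The class name is hypothetical.

```python
class PIDRateEstimatorSketch:
    """Hypothetical Python mirror of the PID update in Spark's PIDRateEstimator."""

    def __init__(self, batch_interval_ms, proportional=1.0, integral=0.2,
                 derivative=0.0, min_rate=100.0):
        self.batch_interval_ms = batch_interval_ms
        self.kp, self.ki, self.kd = proportional, integral, derivative
        self.min_rate = min_rate
        # Initial state, which is why the first trace shows -1.0 values.
        self.first_run = True
        self.latest_time = -1
        self.latest_rate = -1.0
        self.latest_error = -1.0

    def compute(self, time_ms, num_records, processing_ms, scheduling_ms):
        """Return a new rate (records/s), or None when no rate is published."""
        if not (time_ms > self.latest_time and num_records > 0 and processing_ms > 0):
            return None  # corresponds to "Rate estimation skipped"
        delay_since_update = (time_ms - self.latest_time) / 1000  # seconds
        processing_rate = num_records / processing_ms * 1000  # records/s
        error = self.latest_rate - processing_rate
        # Backlog accumulated during the scheduling delay, expressed as a rate.
        historical_error = scheduling_ms * processing_rate / self.batch_interval_ms
        d_error = (error - self.latest_error) / delay_since_update
        new_rate = max(self.latest_rate - self.kp * error
                       - self.ki * historical_error - self.kd * d_error,
                       self.min_rate)
        self.latest_time = time_ms
        if self.first_run:
            # First batch only seeds the state ("First run, rate estimation skipped").
            self.latest_rate = processing_rate
            self.latest_error = 0.0
            self.first_run = False
            return None
        self.latest_rate = new_rate
        self.latest_error = error
        return new_rate


# Feed the first two batches from the log, assuming a 1000 ms batch interval.
est = PIDRateEstimatorSketch(batch_interval_ms=1000)
print(est.compute(1476350217586, 300000, 32530, 15))     # None (first run)
print(est.compute(1476350238534, 300000, 20946, 32495))  # 100.0
```

Feeding the later traces keeps returning 100.0: as the scheduling delay grows, the integral term (scheduling delay times processing rate) outweighs everything else and the result is clamped to the minimum rate.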
[error]
[error] [Stage 3:===========================> (23 + 9) / 48]
[error] [Stage 3:============================================> (38 + 10) / 48]
[error]
[error] 16/10/13 11:16:59 INFO App: Successfully committed offset 100000 for partition 0.
[error] 16/10/13 11:16:59 INFO App: Successfully committed offset 100000 for partition 2.
[error] 16/10/13 11:16:59 INFO App: Successfully committed offset 100000 for partition 1.
[error]
[error] [Stage 4:> (0 + 3) / 3]
[error] 16/10/13 11:17:16 INFO FileOutputCommitter: Saved output of task 'attempt_201610131116_0004_m_000002_0' to file:/tmp/k2d-tests/output/_temporary/0/task_201610131116_0004_m_000002
[error] 16/10/13 11:17:16 INFO FileOutputCommitter: Saved output of task 'attempt_201610131116_0004_m_000000_0' to file:/tmp/k2d-tests/output/_temporary/0/task_201610131116_0004_m_000000
[error]
[error] [Stage 4:=======================================> (2 + 1) / 3]
[error] 16/10/13 11:17:16 INFO FileOutputCommitter: Saved output of task 'attempt_201610131116_0004_m_000001_0' to file:/tmp/k2d-tests/output/_temporary/0/task_201610131116_0004_m_000001
[error]
[error]
[error]
[error] [Stage 5:=====================================================> (45 + 3) / 48]
[error]
[error] 16/10/13 11:17:18 TRACE PIDRateEstimator:
[error] time = 1476350238534, # records = 300000, processing time = 20946, scheduling delay = 32495
[error] 16/10/13 11:17:18 TRACE PIDRateEstimator:
[error] latestRate = 9222.256378727328, error = -5100.287305030906
[error] latestError = 0.0, historicalError = 465411.05700372386
[error] delaySinceUpdate = 20.948, dError = -243.4737113342995
[error]
[error] 16/10/13 11:17:18 TRACE PIDRateEstimator: New rate = 100.0
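The clamp to the minimum rate can be checked by hand from the second trace, using the update rule as we read it from Spark's `PIDRateEstimator`, the gains from the startup log ($k_p = 1.0$, $k_i = 0.2$, $k_d = 0.0$, $r_{\min} = 100$) and an assumed 1000 ms batch interval:

```latex
\begin{aligned}
r_{\text{proc}} &= \frac{300000}{20.946\ \text{s}} \approx 14322.54\ \text{records/s}\\
e &= r_{\text{latest}} - r_{\text{proc}} = 9222.26 - 14322.54 \approx -5100.29\\
e_{\text{hist}} &= \frac{32495\ \text{ms} \times 14322.54}{1000\ \text{ms}} \approx 465411.06\\
r_{\text{new}} &= \max\left(r_{\text{latest}} - k_p e - k_i e_{\text{hist}} - k_d \dot{e},\ r_{\min}\right)\\
&= \max\left(9222.26 + 5100.29 - 0.2 \times 465411.06 - 0,\ 100\right)\\
&\approx \max(-78759.67,\ 100) = 100
\end{aligned}
```

The integral term dominates because a 32.5 s scheduling delay is large compared with the batch interval, so the unclamped rate is strongly negative and the estimator publishes its floor of 100 records/s.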
[error]
[error] [Stage 6:=====================================================> (45 + 3) / 48]
[error]
[error] 16/10/13 11:17:20 INFO App: Successfully committed offset 200000 for partition 0.
[error] 16/10/13 11:17:20 INFO App: Successfully committed offset 200000 for partition 2.
[error] 16/10/13 11:17:20 INFO App: Successfully committed offset 200000 for partition 1.
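The committed offsets give the batch size directly: each partition contributes `untilOffset - fromOffset` records (what `OffsetRange.count` returns in Spark's Kafka 0.10 integration). Between the first and second commits each of the three partitions advances by 100000, i.e. 300000 records per batch, matching `# records = 300000` in the estimator traces. The helper names below are hypothetical.

```python
def records_per_partition(prev_commit, new_commit):
    """Records consumed per partition between two commits (until - from)."""
    return {p: new_commit[p] - prev_commit[p] for p in new_commit}


def batch_size(prev_commit, new_commit):
    """Total records in the batch covered by the two commits."""
    return sum(records_per_partition(prev_commit, new_commit).values())


# Offsets committed by the app after its first and second batches (from the log).
first = {0: 100_000, 1: 100_000, 2: 100_000}
second = {0: 200_000, 1: 200_000, 2: 200_000}
print(records_per_partition(first, second))  # {0: 100000, 1: 100000, 2: 100000}
print(batch_size(first, second))             # 300000
```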
[error]
[error] [Stage 7:> (0 + 3) / 3]
[error] 16/10/13 11:17:38 INFO FileOutputCommitter: Saved output of task 'attempt_201610131117_0007_m_000002_0' to file:/tmp/k2d-tests/output/_temporary/0/task_201610131117_0007_m_000002
[error] 16/10/13 11:17:38 INFO FileOutputCommitter: Saved output of task 'attempt_201610131117_0007_m_000000_0' to file:/tmp/k2d-tests/output/_temporary/0/task_201610131117_0007_m_000000
[error] 16/10/13 11:17:38 INFO FileOutputCommitter: Saved output of task 'attempt_201610131117_0007_m_000001_0' to file:/tmp/k2d-tests/output/_temporary/0/task_201610131117_0007_m_000001
[error]
[error]
[error]
[error] [Stage 8:================================================> (41 + 7) / 48]
[error]
[error] 16/10/13 11:17:40 TRACE PIDRateEstimator:
[error] time = 1476350260464, # records = 300000, processing time = 21930, scheduling delay = 52497
[error] 16/10/13 11:17:40 TRACE PIDRateEstimator:
[error] latestRate = 100.0, error = -13579.890560875514
[error] latestError = -5100.287305030906, historicalError = 718153.2147742818
[error] delaySinceUpdate = 21.93, dError = -386.6668151319931
[error]
[error] 16/10/13 11:17:40 TRACE PIDRateEstimator: New rate = 100.0
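Although the estimator has dropped to the 100 records/s floor, the commits that follow keep advancing by 100000 per partition. A likely reason: each batch takes 20 to 30 s to process while the scheduling delay keeps growing, so many batches are already queued, and Spark fixes a batch's offset ranges when the batch is generated, so queued batches keep their original size. The sketch below models, as we read Spark 2.0's `kafka-0-10` `DirectKafkaInputDStream.maxMessagesPerPartition`, how a published rate would be split for newly generated batches: in proportion to each partition's lag, capped by `maxRatePerPartition`, and scaled by the batch interval. The function names, lag values and 1 s interval are assumptions.

```python
import math


def java_round(x):
    """Java's Math.round: round half up (Python's round() uses banker's rounding)."""
    return math.floor(x + 0.5)


def max_messages_per_partition(lag, rate, max_rate_per_partition, batch_interval_ms):
    """Per-partition record limits for the next generated batch.

    lag: {partition: records available beyond the current offset}
    rate: latest rate published by the rate estimator (records/s), or None
    max_rate_per_partition: spark.streaming.kafka.maxRatePerPartition (0 = unset)
    """
    if rate is not None and rate > 0:
        total_lag = sum(lag.values())
        limits = {}
        for p, partition_lag in lag.items():
            share = partition_lag / total_lag if total_lag else 0.0
            backpressure_rate = java_round(share * rate)
            if max_rate_per_partition > 0:
                backpressure_rate = min(backpressure_rate, max_rate_per_partition)
            limits[p] = backpressure_rate
    else:
        # No estimate published yet: only the static per-partition cap applies.
        limits = {p: max_rate_per_partition for p in lag}
    if sum(limits.values()) <= 0:
        return None  # no limit: the batch reads up to the latest offsets
    secs_per_batch = batch_interval_ms / 1000
    return {p: int(secs_per_batch * r) for p, r in limits.items()}


lag = {0: 1_000_000, 1: 1_000_000, 2: 1_000_000}  # hypothetical equal backlog
print(max_messages_per_partition(lag, None, 100_000, 1000))  # {0: 100000, 1: 100000, 2: 100000}
print(max_messages_per_partition(lag, 100, 100_000, 1000))   # {0: 33, 1: 33, 2: 33}
```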
[error]
[error] [Stage 9:==========================================> (36 + 8) / 48]
[error]
[error] 16/10/13 11:17:42 INFO App: Successfully committed offset 300000 for partition 0.
[error] 16/10/13 11:17:42 INFO App: Successfully committed offset 300000 for partition 2.
[error] 16/10/13 11:17:42 INFO App: Successfully committed offset 300000 for partition 1.
[error]
[error] [Stage 10:> (0 + 3) / 3]
[error] 16/10/13 11:18:03 INFO FileOutputCommitter: Saved output of task 'attempt_201610131117_0010_m_000002_0' to file:/tmp/k2d-tests/output/_temporary/0/task_201610131117_0010_m_000002
[error]
[error] [Stage 10:===================> (1 + 2) / 3]
[error] 16/10/13 11:18:04 INFO FileOutputCommitter: Saved output of task 'attempt_201610131117_0010_m_000001_0' to file:/tmp/k2d-tests/output/_temporary/0/task_201610131117_0010_m_000001
[error]
[error] [Stage 10:======================================> (2 + 1) / 3]
[error] 16/10/13 11:18:04 INFO FileOutputCommitter: Saved output of task 'attempt_201610131117_0010_m_000000_0' to file:/tmp/k2d-tests/output/_temporary/0/task_201610131117_0010_m_000000
[error]
[error]
[error]
[error] [Stage 11:================================> (28 + 8) / 48]
[error] [Stage 11:=====================================================> (46 + 2) / 48]
[error]
[error] 16/10/13 11:18:06 TRACE PIDRateEstimator:
[error] time = 1476350286597, # records = 300000, processing time = 26133, scheduling delay = 73420
[error] 16/10/13 11:18:06 TRACE PIDRateEstimator:
[error] latestRate = 100.0, error = -11379.738261967628
[error] latestError = -13579.890560875514, historicalError = 842842.3831936632
[error] delaySinceUpdate = 26.133, dError = 84.19057509309631
[error]
[error] 16/10/13 11:18:06 TRACE PIDRateEstimator: New rate = 100.0
[error]
[error] [Stage 12:============================> (24 + 9) / 48]
[error] [Stage 12:====================================================> (45 + 3) / 48]
[error]
[error] 16/10/13 11:18:08 INFO App: Successfully committed offset 400000 for partition 0.
[error] 16/10/13 11:18:08 INFO App: Successfully committed offset 400000 for partition 2.
[error] 16/10/13 11:18:08 INFO App: Successfully committed offset 400000 for partition 1.
[error]
[error] [Stage 13:> (0 + 3) / 3]
[error] 16/10/13 11:18:30 INFO FileOutputCommitter: Saved output of task 'attempt_201610131118_0013_m_000001_0' to file:/tmp/k2d-tests/output/_temporary/0/task_201610131118_0013_m_000001
[error]
[error] [Stage 13:===================> (1 + 2) / 3]
[error] 16/10/13 11:18:31 INFO FileOutputCommitter: Saved output of task 'attempt_201610131118_0013_m_000000_0' to file:/tmp/k2d-tests/output/_temporary/0/task_201610131118_0013_m_000000
[error]
[error] [Stage 13:======================================> (2 + 1) / 3]
[error] 16/10/13 11:18:31 INFO FileOutputCommitter: Saved output of task 'attempt_201610131118_0013_m_000002_0' to file:/tmp/k2d-tests/output/_temporary/0/task_201610131118_0013_m_000002
[error]
[error]
[error]
[error] [Stage 14:===============================> (27 + 8) / 48]
[error] [Stage 14:==============================================> (40 + 8) / 48]
[error]
[error] 16/10/13 11:18:33 TRACE PIDRateEstimator:
[error] time = 1476350313538, # records = 300000, processing time = 26941, scheduling delay = 98558
[error] 16/10/13 11:18:33 TRACE PIDRateEstimator:
[error] latestRate = 100.0, error = -11035.444118629599
[error] latestError = -11379.738261967628, historicalError = 1097487.101443896
[error] delaySinceUpdate = 26.941, dError = 12.779560645040245
[error]
[error] 16/10/13 11:18:33 TRACE PIDRateEstimator: New rate = 100.0
[error]
[error] [Stage 15:==================================================> (43 + 5) / 48]
[error]
[error] 16/10/13 11:18:35 INFO App: Successfully committed offset 500000 for partition 0.
[error] 16/10/13 11:18:35 INFO App: Successfully committed offset 500000 for partition 2.
[error] 16/10/13 11:18:35 INFO App: Successfully committed offset 500000 for partition 1.
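Across these batches the scheduling delay grows from 15 ms to about 98.6 s, because each batch takes 20 to 33 s to process. With batches processed one at a time, the next batch's scheduling delay is roughly the current delay plus the current processing time minus the batch interval. The sketch below applies that queueing model to the logged numbers with an assumed 1000 ms interval; from the second batch onward its predictions land within about 60 ms of the traced values, while the first transition is off by about 1 s. The function name is hypothetical.

```python
def next_scheduling_delay(delay_ms, processing_ms, batch_interval_ms):
    """Scheduling delay of the next batch when batches are processed one at a time.

    The next batch can start only when the current one finishes, i.e. at
    (current batch time + delay + processing time), but its own batch time is
    one interval later than the current batch's.
    """
    return max(0, delay_ms + processing_ms - batch_interval_ms)


# (processing time ms, scheduling delay ms) per batch, copied from the TRACE lines.
batches = [(32530, 15), (20946, 32495), (21930, 52497), (26133, 73420), (26941, 98558)]

BATCH_INTERVAL_MS = 1000  # assumption, not shown in the log

for (proc, delay), (_, logged_next) in zip(batches, batches[1:]):
    predicted = next_scheduling_delay(delay, proc, BATCH_INTERVAL_MS)
    print(f"predicted {predicted:>6} ms, logged {logged_next:>6} ms")
```

The delay stops growing only once processing time fits within the batch interval, which is what a smaller batch (a lower per-partition cap) aims for.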