Skip to content

Instantly share code, notes, and snippets.

@gom
Last active April 12, 2018 03:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gom/8c84ac7583c137a60d0aefb1554bb65f to your computer and use it in GitHub Desktop.
Save gom/8c84ac7583c137a60d0aefb1554bb65f to your computer and use it in GitHub Desktop.
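Flink's PrometheusReporter generates "Invalid metric name" errors. Cause: AbstractMetricGroup computes the logical scope string once, caches it, and ignores the delimiter passed on later calls. Once a scope has been built with '.' (presumably by another caller or reporter), it leaks into the Prometheus metric name (e.g. "jobmanager.Status.JVM.CPU"), and Prometheus rejects names containing dots.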
2018-04-04 15:30:58,215 WARN org.apache.flink.runtime.metrics.MetricRegistryImpl - Error while registering metric.
java.lang.IllegalArgumentException: Invalid metric name: flink_jobmanager.Status.JVM.CPU_Time
at org.apache.flink.shaded.io.prometheus.client.Collector.checkMetricName(Collector.java:171)
at org.apache.flink.shaded.io.prometheus.client.SimpleCollector.<init>(SimpleCollector.java:163)
at org.apache.flink.shaded.io.prometheus.client.Gauge.<init>(Gauge.java:68)
at org.apache.flink.shaded.io.prometheus.client.Gauge$Builder.create(Gauge.java:74)
at org.apache.flink.metrics.prometheus.PrometheusReporter.createCollector(PrometheusReporter.java:170)
at org.apache.flink.metrics.prometheus.PrometheusReporter.notifyOfAddedMetric(PrometheusReporter.java:146)
at org.apache.flink.runtime.metrics.MetricRegistryImpl.register(MetricRegistryImpl.java:319)
at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:370)
at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.gauge(AbstractMetricGroup.java:314)
at org.apache.flink.runtime.metrics.util.MetricUtils.instantiateCPUMetrics(MetricUtils.java:260)
at org.apache.flink.runtime.metrics.util.MetricUtils.instantiateStatusMetrics(MetricUtils.java:102)
at org.apache.flink.runtime.metrics.util.MetricUtils.instantiateJobManagerMetricGroup(MetricUtils.java:68)
at org.apache.flink.runtime.jobmanager.JobManager$.createJobManagerComponents(JobManager.scala:2515)
at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2612)
at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2557)
at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2210)
at org.apache.flink.runtime.jobmanager.JobManager$.liftedTree3$1(JobManager.scala:2021)
at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2020)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply$mcV$sp(JobManager.scala:2115)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2093)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2093)
at scala.util.Try$.apply(Try.scala:192)
diff --git a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java b/flink-metrics/flink-metrics-prometheus/s
index 48fd8a42b5..6f0ef3792b 100644
--- a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
+++ b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
@@ -128,7 +128,7 @@ public class PrometheusReporter implements MetricReporter {
for (final Map.Entry<String, String> dimension : group.getAllVariables().entrySet()) {
final String key = dimension.getKey();
dimensionKeys.add(CHARACTER_FILTER.filterCharacters(key.substring(1, key.length() - 1)));
- dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue()));
+ dimensionValues.add(dimension.getValue());
}
final String scopedMetricName = getScopedName(metricName, group);
@@ -156,7 +156,7 @@ public class PrometheusReporter implements MetricReporter {
}
private static String getScopedName(String metricName, MetricGroup group) {
- return SCOPE_PREFIX + getLogicalScope(group) + SCOPE_SEPARATOR + CHARACTER_FILTER.filterCharacters(metricName);
+ return SCOPE_PREFIX + CHARACTER_FILTER.filterCharacters(getLogicalScope(group)) + SCOPE_SEPARATOR + CHARACTER_FILTER.filterCharacters(metricName);
}
private static Collector createCollector(Metric metric, List<String> dimensionKeys, List<String> dimensionValues, String scopedMetricName, String helpString) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/g
index 6d9c7d95a3..bd25f98890 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -90,9 +90,9 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
* For example: "host-7.taskmanager-2.window_word_count.my-mapper" */
private final String[] scopeStrings;
- /** The logical metrics scope represented by this group, as a concatenated string, lazily computed.
+ /** The map containing the logical metrics scope represented by this group, as a concatenated string, lazily computed.
* For example: "taskmanager.job.task" */
- private String logicalScopeString;
+ private final Map<Character, String> logicalScopeStrings = new HashMap<>();
/** The metrics query service scope represented by this group, lazily computed. */
protected QueryScopeInfo queryServiceScopeInfo;
@@ -151,14 +151,14 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
* @return logical scope
*/
public String getLogicalScope(CharacterFilter filter, char delimiter) {
- if (logicalScopeString == null) {
+ if (logicalScopeStrings.size() == 0 || !logicalScopeStrings.containsKey(delimiter)) {
if (parent == null) {
- logicalScopeString = getGroupName(filter);
+ logicalScopeStrings.put(delimiter, getGroupName(filter));
} else {
- logicalScopeString = parent.getLogicalScope(filter, delimiter) + delimiter + getGroupName(filter);
+ logicalScopeStrings.put(delimiter, parent.getLogicalScope(filter, delimiter) + delimiter + getGroupName(filter));
}
}
- return logicalScopeString;
+ return logicalScopeStrings.get(delimiter);
}
/**
@gom
Copy link
Author

gom commented Apr 4, 2018

Works fine with the patch applied. Metric names are now valid and use '_' throughout, e.g.:

flink_jobmanager_Status_JVM_Memory_NonHeap_Used{host="HOSTNAME",} 6.4014688E7

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment