[improve][monitor]Add prometheusRawMetricsProvider support #17531
Force-pushed from d1838c7 to 70444ec.
labels = Collections.singletonMap(CLUSTER_NAME, conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME));

executor.scheduleAtFixedRate(() -> {
    rotateLatencyCollection();
The catchingAndLoggingThrowables wrapper was removed here.
I added it back.
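For readers wondering why the wrapper matters: `ScheduledExecutorService.scheduleAtFixedRate` suppresses all later executions once a run throws, so an unguarded `rotateLatencyCollection()` failure would silently stop the rotation. The following is a minimal sketch of such a guard. `SafeTasks` and its `FAILURES` counter are hypothetical names used for illustration only, and the actual catchingAndLoggingThrowables helper in the code base may be implemented differently.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for illustration; not the project's catchingAndLoggingThrowables.
final class SafeTasks {
    private SafeTasks() {}

    // Counts swallowed failures so that callers can observe them.
    static final AtomicInteger FAILURES = new AtomicInteger();

    // Wraps a task so that a throwable is logged instead of escaping to the executor.
    static Runnable catchingAndLogging(Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (Throwable t) {
                FAILURES.incrementAndGet();
                System.err.println("Scheduled task failed: " + t);
            }
        };
    }
}
```

A scheduled call would then pass `SafeTasks.catchingAndLogging(() -> rotateLatencyCollection())` as the task, so one failed rotation does not cancel the schedule.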
private ScheduledExecutorService executor;

public static final String PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = "prometheusStatsLatencyRolloverSeconds";
public static final int DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = 60;
public static final String CLUSTER_NAME = "cluster";
public static final String DEFAULT_CLUSTER_NAME = "pulsar";

private String cluster;
private final CachingStatsProvider cachingStatsProvider;
@hangc0276 Previously there was a caching layer, cachingStatsProvider, which stored every StatsLogger created via getStatsLogger in a map and returned the existing instance if one had already been created.
It has now been removed.
I added cachingStatsProvider back with a small change.
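The caching layer discussed in this thread can be sketched roughly as follows. This is only an illustration of the idea (one logger instance per scope, returned on repeated lookups). `ScopedLogger` and `CachingLoggerProvider` are hypothetical stand-ins, not the StatsLogger or CachingStatsProvider types touched by the PR.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical stand-in for a StatsLogger; it exists only to keep the sketch self-contained.
final class ScopedLogger {
    final String scope;

    ScopedLogger(String scope) {
        this.scope = scope;
    }
}

// Caches one logger per scope so that repeated lookups return the same instance.
final class CachingLoggerProvider {
    private final ConcurrentMap<String, ScopedLogger> cache = new ConcurrentHashMap<>();
    private final Function<String, ScopedLogger> factory;

    CachingLoggerProvider(Function<String, ScopedLogger> factory) {
        this.factory = factory;
    }

    ScopedLogger getStatsLogger(String scope) {
        // computeIfAbsent creates the logger at most once per scope, even under contention.
        return cache.computeIfAbsent(scope, factory);
    }
}
```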
opStats.forEach((name, opStatLogger) -> PrometheusTextFormatUtil.writeOpStat(writer, name, cluster,
        opStatLogger));
PrometheusTextFormat prometheusTextFormat = new PrometheusTextFormat();
PrometheusTextFormat.writeMetricsCollectedByPrometheusClient(writer, registry);
I don't get this line. Why would we want to emit all the static default Prometheus collectors into the writer when we already do that in PulsarMetricsGenerator?
I removed it.
counters.forEach((name, counter) -> PrometheusTextFormatUtil.writeCounter(writer, name, cluster, counter));
opStats.forEach((name, opStatLogger) -> PrometheusTextFormatUtil.writeOpStat(writer, name, cluster,
        opStatLogger));
PrometheusTextFormat prometheusTextFormat = new PrometheusTextFormat();
writeAllMetrics is currently called from PrometheusMetricsGenerator:
generateManagedLedgerBookieClientMetrics(pulsar, stream);
After that call we have support for PrometheusRawMetricsProvider:
if (metricsProviders != null) {
    for (PrometheusRawMetricsProvider metricsProvider : metricsProviders) {
        metricsProvider.generate(stream);
    }
}
I haven't seen any registration of PrometheusMetricsProvider as a PrometheusRawMetricsProvider other than in the test. If you do register it, you will get duplicates.
Since writing directly to SimpleTextOutputFormat is more correct, I advise the following:
- writeAllMetrics() should throw an exception and have no implementation, since it shouldn't be called at all.
- Fix the implementation of generateManagedLedgerBookieClientMetrics to call a method that uses SimpleTextOutputFormat, and drop the declaration that the class implements PrometheusRawMetricsProvider; it can just be a public method.
The only caveat I see is this: do you see any chance that we will have two different pairs of (scopeContext, gauge/metric/...) that have the same metric name, where the only difference is in the labels?
If you ditch that Writer-based method, you can get rid of PrometheusTextFormat.java.
Good point, updated the code.
Reiterating the question: the only caveat I see is this: do you see any chance that we will have two different pairs of (scopeContext, gauge/metric/...) that have the same metric name, where the only difference is in the labels?
Yes, there are many such cases. For example, a metric named publishrLatency can carry labels with different topic names, and each combination represents a different metric.
I added CachingStatsLogger to cache instances by scope and labels, which avoids creating many instances with the same scope and label names.
Yes, there are many cases. Such as a metric name is publishrLatency, and the label with a different topic name. It represents different metrics.
Well @hangc0276, in that case you must use PrometheusMetricsStreams; this class guarantees that the output is grouped by metric name, as the Prometheus text format dictates.
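The grouping requirement can be illustrated with a small sketch: samples that share a metric name but differ in labels are buffered under that name and emitted together, with the TYPE line written once per name. `GroupedMetricsWriter` is a hypothetical class written for this illustration; it is not the class named in the comment above, whose real implementation may work differently.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical buffer that only illustrates grouping samples by metric name.
final class GroupedMetricsWriter {
    private final Map<String, String> types = new LinkedHashMap<>();
    private final Map<String, List<String>> samples = new LinkedHashMap<>();

    // Records one sample; the first type seen for a metric name wins.
    void add(String name, String type, String labels, double value) {
        types.putIfAbsent(name, type);
        samples.computeIfAbsent(name, k -> new ArrayList<>())
                .add(name + "{" + labels + "} " + value);
    }

    // Emits one TYPE line per metric name, followed by all samples for that name.
    String render() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, List<String>> entry : samples.entrySet()) {
            sb.append("# TYPE ").append(entry.getKey()).append(' ')
                    .append(types.get(entry.getKey())).append('\n');
            for (String line : entry.getValue()) {
                sb.append(line).append('\n');
            }
        }
        return sb.toString();
    }
}
```

Even if samples for the same name arrive interleaved with other metrics, the rendered output keeps them contiguous.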
while ((str = reader.readLine()) != null) {
    sb.append(str);
}
Assert.assertTrue(sb.toString().contains("test_metrics"));
This doesn't test the full functionality, and I think there may be duplicates when calling and parsing.
.append(success.toString()).append("\", quantile=\"")
.append(Double.toString(quantile)).append("\"} ")
.append(Double.toString(opStat.getQuantileValue(success, quantile))).append('\n');
public static void writeMetricsCollectedByPrometheusClient(SimpleTextOutputStream w, CollectorRegistry registry) {
Do we need this? Is it called from anywhere once we remove the call discussed in the earlier comment?
Removed it.
rawMetricsProvider.getStatsLogger("test").getOpStatsLogger("test_metrics")
        .registerSuccessfulEvent(100, TimeUnit.NANOSECONDS);

getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider);
If you already have the integration built in and you are querying the real metrics endpoint, why not check that a real BK client metric now actually has the labels you expect?
This PR is not mainly intended for the BK client metrics; it is for integrating plugin metrics with the Pulsar broker.
I don't know enough about "it is for integrating plugin metrics with the Pulsar broker."
Can you please elaborate and give references?
Oh, I read your code below.
I'm still amazed that, after spending weeks reading all the metrics code, I'm still discovering new bits.
I was wondering what that weird PrometheusRawMetricsProvider was, since nobody was actually using it inside Pulsar.
So you're saying it's in fact a public API for other plugin developers to rely on? Can you please give me a reference to one?
Yes, it is a public API for other plugins which run within the Pulsar broker to expose metrics to the Pulsar broker metric port. A public plugin repo is KOP. https://github.com/streamnative/kop/blob/a9f56e9b0435429dd8d977ba948c6772e6fe5b86/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java#L280.
I'm developing another plugin, which also needs this public API to expose metrics.
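To make the plugin use case concrete, here is a self-contained sketch of the registration pattern described above. All types are stand-ins written for this illustration: `RawMetricsProvider` mimics the role of PrometheusRawMetricsProvider (whose `generate` receives the broker's output stream in the snippets quoted in this thread; a StringBuilder replaces that stream here), `MetricsEndpoint` mimics the broker side, and `PluginMetrics` with its metric name is purely hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the raw metrics provider interface; a StringBuilder replaces the real output stream.
interface RawMetricsProvider {
    void generate(StringBuilder stream);
}

// Stand-in for the broker side, which collects providers and renders the metrics output.
final class MetricsEndpoint {
    private final List<RawMetricsProvider> providers = new ArrayList<>();

    void addRawMetricsProvider(RawMetricsProvider provider) {
        providers.add(provider);
    }

    String render() {
        StringBuilder stream = new StringBuilder();
        for (RawMetricsProvider provider : providers) {
            provider.generate(stream);
        }
        return stream.toString();
    }
}

// Hypothetical plugin exposing a single counter in Prometheus text format.
final class PluginMetrics implements RawMetricsProvider {
    private long requests;

    void recordRequest() {
        requests++;
    }

    @Override
    public void generate(StringBuilder stream) {
        stream.append("# TYPE my_plugin_requests_total counter\n")
                .append("my_plugin_requests_total{cluster=\"test\"} ")
                .append(requests).append('\n');
    }
}
```

In the real broker the registration call seen in the test code of this PR is `getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider)`; the sketch only mirrors its shape.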
@hangc0276 OK, from my understanding, and correct me if I'm wrong here: this PR fixes the BK Metrics API implementation so that it supports labels. But it doesn't fix another issue: the ability of plugins to use PrometheusMetricsProvider.
So today, and in your PR, we only print the metrics registered to the PrometheusMetricsProvider instantiated in ManagedLedgerFactory, which is good, as we now get the BK client metrics with labels.
But I believe the correct solution is to instantiate PrometheusMetricsProvider in PulsarService, pass it along to ManagedLedgerFactory, and make it available for any plugin to retrieve if needed.
The way I see it, plugins should use:
- Prometheus Client library - preferred.
- BK Metrics API implementation - not the most preferred way.
- Raw metrics provider, if all other means fail.
If we're already doing this, let's at least do it right. In your case, you know you want to use the PrometheusMetricsProvider, so maybe we should just expose it and use it properly.
HttpClient httpClient = HttpClientBuilder.create().build();
final String metricsEndPoint = getPulsar().getWebServiceAddress() + "/metrics";
HttpResponse response = httpClient.execute(new HttpGet(metricsEndPoint));
InputStream inputStream = response.getEntity().getContent();
Use EntityUtils.toString.
done
while ((str = reader.readLine()) != null) {
    sb.append(str);
}
Assert.assertTrue(sb.toString().contains("test_metrics"));
Maybe use parseMetrics().
done
I will use another PR to add checks for the metrics string format.
Done @hangc0276
For future readers of this PR: Why is this PR needed? This PR aims to fix and add support for the labels given to it via the API methods.
Force-pushed from 70444ec to 2c10276.
@asafm Thanks for your explanation. Another purpose of this PR is to let plugin metrics integrate with the Pulsar broker's metrics through the interface.
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java, lines 904 to 914 in 63d4cf2
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java, lines 1673 to 1682 in 63d4cf2
@asafm Thanks for your review. I addressed all the comments; please take another look. Thanks a lot.
How do you provide such support in your PR? That I haven't figured out yet.
BTW: force-pushing is not good. I lose all context and can't see only the changes you've added.
}

@Override
public StatsLogger getStatsLogger(String scope) {
Now that we have labels, should String become ScopeContext? The scope is just the metric prefix, and in theory you can have the same metric prefix with different labels.
updated.
*/
public class PrometheusMetricsProvider implements StatsProvider {
public class PrometheusMetricsProvider implements StatsProvider, PrometheusRawMetricsProvider {
I don't see a reason to implement PrometheusRawMetricsProvider.
Refer to #17531 (comment)
Based on my reply to that comment, I think it's redundant. We should have a single PrometheusMetricsProvider in PulsarService, and its metrics should be written explicitly in PrometheusMetricsGenerator.
.write(Double.toString(opStat.getSum(success))).write('\n');
}

public static void writeMetricsCollectedByPrometheusClient(SimpleTextOutputStream w, CollectorRegistry registry) {
Do you need this method?
final String metricsEndPoint = getPulsar().getWebServiceAddress() + "/metrics";
HttpResponse response = httpClient.execute(new HttpGet(metricsEndPoint));
Multimap<String, Metric> metrics = parseMetrics(EntityUtils.toString(response.getEntity()));
if (((List<Metric>) metrics.get("test_raw_writeLatency_count"))
Let's extract ((List<Metric>) metrics.get("test_raw_writeLatency_count")).get(0) into a variable to make the next lines easier to read.
.getOpStatsLogger("writeLatency")
.registerSuccessfulEvent(100, TimeUnit.NANOSECONDS);

getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider);
Why are you doing that test?
The whole feature is about adding label support to BK client metrics. You changed PrometheusMetricsProvider to do that.
You changed the integration point of PrometheusMetricsProvider to PrometheusMetricsGenerator.
I don't see any relation to PrometheusRawMetricsProvider.
I reiterate my last comment: you should test that BK client metrics now have labels, unless I'm missing something big here.
The BK client metrics test is testManagedLedgerBookieClientStats, and I updated the test to account for this change.
pulsar/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java, lines 982 to 1038 in 90c554d
public void testManagedLedgerBookieClientStats() throws Exception {
    @Cleanup
    Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
    @Cleanup
    Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
    for (int i = 0; i < 10; i++) {
        String message = "my-message-" + i;
        p1.send(message.getBytes());
        p2.send(message.getBytes());
    }
    ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut);
    String metricsStr = statsOut.toString();
    Multimap<String, Metric> metrics = parseMetrics(metricsStr);
    metrics.entries().forEach(e ->
            System.out.println(e.getKey() + ": " + e.getValue())
    );
    List<Metric> cm = (List<Metric>) metrics.get(keyNameBySubstrings(metrics,
            "pulsar_managedLedger_client", "bookkeeper_ml_scheduler_completed_tasks"));
    assertEquals(cm.size(), 1);
    assertEquals(cm.get(0).tags.get("cluster"), "test");
    cm = (List<Metric>) metrics.get(
            keyNameBySubstrings(metrics,
                    "pulsar_managedLedger_client", "bookkeeper_ml_scheduler_queue"));
    assertEquals(cm.size(), 1);
    assertEquals(cm.get(0).tags.get("cluster"), "test");
    cm = (List<Metric>) metrics.get(
            keyNameBySubstrings(metrics,
                    "pulsar_managedLedger_client", "bookkeeper_ml_scheduler_total_tasks"));
    assertEquals(cm.size(), 1);
    assertEquals(cm.get(0).tags.get("cluster"), "test");
    cm = (List<Metric>) metrics.get(
            keyNameBySubstrings(metrics,
                    "pulsar_managedLedger_client", "bookkeeper_ml_scheduler_threads"));
    assertEquals(cm.size(), 1);
    assertEquals(cm.get(0).tags.get("cluster"), "test");
    cm = (List<Metric>) metrics.get(
            keyNameBySubstrings(metrics,
                    "pulsar_managedLedger_client", "bookkeeper_ml_scheduler_task_execution_sum"));
    assertEquals(cm.size(), 2);
    assertEquals(cm.get(0).tags.get("cluster"), "test");
    cm = (List<Metric>) metrics.get(
            keyNameBySubstrings(metrics,
                    "pulsar_managedLedger_client", "bookkeeper_ml_scheduler_max_queue_size"));
    assertEquals(cm.size(), 1);
    assertEquals(cm.get(0).tags.get("cluster"), "test");
}
testRawMetricsProvider mainly tests PrometheusRawMetricsProvider. The connection is the call getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider), which registers the raw metrics provider with the Pulsar service.
Sorry, I just rebased onto master, which requires a force push; otherwise the push fails.
This PR changes the metric names for the BK client, so it can only be released in a major version, and the docs need to be updated. /cc @Anonymitaet
} catch (IOException e) {
    // nop
}
((PrometheusMetricsProvider) statsProvider).generate(stream);
It's weird to have a cast in software you wrote; you usually see this when you use third-party software. Why can't you change the original variable type?

@Override
public OpStatsLogger getOpStatsLogger(String name) {
    return opStatsLoggers.computeIfAbsent(scopeContext(name), x -> underlying.getOpStatsLogger(name));
Why would you cache it under key = scopeContext(name) and not just name?
}

/**
 * Thread-scoped stats not currently supported.
Why is it not supported?
// Example:
// # TYPE bookie_storage_entries_count gauge
// bookie_storage_entries_count 519
writeType(w, name, "gauge");
Now that you have grouped the metrics by metric name, the type needs to be written once per metric name, so this should be called from outside.
The PR had no activity for 30 days; marked with the Stale label.
I'm just wondering why we don't use the Prometheus library. I think this implementation is hard to maintain.
The PR had no activity for 30 days; marked with the Stale label.
Motivation
Currently, the Pulsar Prometheus metrics framework can generate new metrics from PrometheusRawMetricsProvider, but the current PrometheusMetricsProvider implementation doesn't support this interface, so a newly created PrometheusMetricsProvider instance cannot integrate with the current metrics system.
Modification
Pulsar's PrometheusMetricsProvider supports the following features.
This PR just migrates it from the BookKeeper Prometheus metrics provider.
Documentation
Check the box below or label this PR directly.
Need to update docs?
doc-required
(Your PR needs to update docs and you will update later)
doc-not-needed
(Please explain why)
doc
(Your PR contains doc changes)
doc-complete
(Docs have been already added)
Matching PR in the forked repository
PR in forked repository: hangc0276#4