This class is a data transfer object that captures metric values at a specific + * timestamp for export to Timeline/Ambari Metrics Collector. It contains three types + * of metrics:
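+ * <p>Only the timestamp, hostname and application ID are fixed at construction; metric
+ * values are added afterwards through {@code addCounter}, {@code addGauge} and
+ * {@code addSummary}, and the populated snapshot is then sent to the
+ * Timeline sink for persistence and visualization.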
+ * + * @see TimelineMetricsProvider + * @see TimelineMetricsSink + */ +public class MetricSnapshot { + + private final long timestamp; + private final String hostname; + private final String appId; + + // Separate collections for different metric types + private final MapCounters represent monotonically increasing values such as total requests, + * total bytes received, etc.
+     *
+     * @param name the metric name
+     * @param value the counter value
+     */
+    public void addCounter(String name, long value) {
+        // Stores the value under the given name in the counters collection.
+        counters.put(name, value);
+    }
+
+    /**
+     * Adds a gauge metric to the snapshot.
+     *
+     * <p>Gauges represent current values that can increase or decrease, such as
+     * number of active connections, queue size, etc.
+     *
+     * @param name the metric name
+     * @param value the gauge value
+     */
+    public void addGauge(String name, double value) {
+        // Stores the value under the given name in the gauges collection.
+        gauges.put(name, value);
+    }
+
+    /**
+     * Adds a summary metric to the snapshot.
+     *
+     * <p>Summaries represent computed statistics such as averages, minimums, maximums,
+     * and percentiles. The existing {@link org.apache.zookeeper.server.metric.AvgMinMaxCounter}
+     * and {@link org.apache.zookeeper.server.metric.AvgMinMaxPercentileCounter} classes
+     * already compute these values and provide them as separate metrics (e.g., "latency_avg",
+     * "latency_min", "latency_max", "latency_p99").
+ * + * @param name the metric name (e.g., "request_latency_avg") + * @param value the computed statistic value + */ + public void addSummary(String name, double value) { + summaries.put(name, value); + } + + /** + * Returns the total number of metrics in this snapshot. + * + * @return the sum of counters, gauges, and summaries + */ + public int getMetricCount() { + return counters.size() + gauges.size() + summaries.size(); + } + + /** + * Returns the timestamp of this snapshot. + * + * @return timestamp in milliseconds since epoch + */ + public long getTimestamp() { + return timestamp; + } + + /** + * Returns the hostname of the ZooKeeper server. + * + * @return the hostname + */ + public String getHostname() { + return hostname; + } + + /** + * Returns the application ID. + * + * @return the application ID (typically "zookeeper") + */ + public String getAppId() { + return appId; + } + + /** + * Returns all counter metrics in this snapshot. + * + * @return an unmodifiable view of the counters map + */ + public MapThis method is useful for debugging and logging. It prints all counters, + * gauges, and summaries in a human-readable format.
+ * + * @return a formatted string containing all metrics + */ + public String printAllMetrics() { + StringBuilder sb = new StringBuilder(); + sb.append(repeatChar('=', 80)).append("\n"); + sb.append("MetricSnapshot Details\n"); + sb.append(repeatChar('=', 80)).append("\n"); + sb.append(String.format("Timestamp: %d (%s)%n", timestamp, + new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new java.util.Date(timestamp)))); + sb.append(String.format("Hostname: %s%n", hostname)); + sb.append(String.format("AppId: %s%n", appId)); + sb.append(String.format("Total Metrics: %d (Counters: %d, Gauges: %d, Summaries: %d)%n", + getMetricCount(), counters.size(), gauges.size(), summaries.size())); + sb.append(repeatChar('=', 80)).append("\n\n"); + + // Print Counters + if (!counters.isEmpty()) { + sb.append("COUNTERS (").append(counters.size()).append("):\n"); + sb.append(repeatChar('-', 80)).append("\n"); + counters.entrySet().stream() + .sorted(Map.Entry.comparingByKey()) + .forEach(entry -> sb.append(String.format(" %-50s : %,d%n", + entry.getKey(), entry.getValue()))); + sb.append("\n"); + } + + // Print Gauges + if (!gauges.isEmpty()) { + sb.append("GAUGES (").append(gauges.size()).append("):\n"); + sb.append(repeatChar('-', 80)).append("\n"); + gauges.entrySet().stream() + .sorted(Map.Entry.comparingByKey()) + .forEach(entry -> sb.append(String.format(" %-50s : %.2f%n", + entry.getKey(), entry.getValue()))); + sb.append("\n"); + } + + // Print Summaries + if (!summaries.isEmpty()) { + sb.append("SUMMARIES (").append(summaries.size()).append("):\n"); + sb.append(repeatChar('-', 80)).append("\n"); + summaries.entrySet().stream() + .sorted(Map.Entry.comparingByKey()) + .forEach(entry -> sb.append(String.format(" %-50s : %.2f%n", + entry.getKey(), entry.getValue()))); + sb.append("\n"); + } + + sb.append(repeatChar('=', 80)).append("\n"); + return sb.toString(); + } +} diff --git 
a/zookeeper-metrics-providers/zookeeper-timeline-metrics/src/main/java/org/apache/zookeeper/metrics/timeline/TimelineMetricsProvider.java b/zookeeper-metrics-providers/zookeeper-timeline-metrics/src/main/java/org/apache/zookeeper/metrics/timeline/TimelineMetricsProvider.java new file mode 100644 index 00000000000..1eac1ab6802 --- /dev/null +++ b/zookeeper-metrics-providers/zookeeper-timeline-metrics/src/main/java/org/apache/zookeeper/metrics/timeline/TimelineMetricsProvider.java @@ -0,0 +1,605 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.metrics.timeline; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; + +import org.apache.zookeeper.metrics.Counter; +import org.apache.zookeeper.metrics.CounterSet; +import org.apache.zookeeper.metrics.Gauge; +import org.apache.zookeeper.metrics.GaugeSet; +import org.apache.zookeeper.metrics.MetricsContext; +import org.apache.zookeeper.metrics.MetricsProvider; +import org.apache.zookeeper.metrics.MetricsProviderLifeCycleException; +import org.apache.zookeeper.metrics.Summary; +import org.apache.zookeeper.metrics.SummarySet; +import org.apache.zookeeper.server.metric.AvgMinMaxCounter; +import org.apache.zookeeper.server.metric.AvgMinMaxCounterSet; +import org.apache.zookeeper.server.metric.AvgMinMaxPercentileCounter; +import org.apache.zookeeper.server.metric.AvgMinMaxPercentileCounterSet; +import org.apache.zookeeper.server.metric.SimpleCounter; +import org.apache.zookeeper.server.metric.SimpleCounterSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MetricsProvider implementation that sends ZooKeeper metrics to Timeline collectors. + * + *This provider periodically samples metrics from its internal {@link MetricsContext} + * and sends them to an external Timeline metrics sink (such as Ambari Metrics Collector). + * The sink implementation is loaded dynamically at runtime, allowing ZooKeeper to + * remain independent of specific metrics collection systems.
+ * + *Configuration:
+ *This provider is configured via zoo.cfg with the following properties:
+ *+ * # Enable Timeline metrics provider + * metricsProvider.className=org.apache.zookeeper.metrics.timeline.TimelineMetricsProvider + * + * # Sink class (loaded from external JAR on classpath) + * metricsProvider.timeline.sink.class=org.apache.hadoop.metrics2.sink.timeline.ZooKeeperTimelineMetricsSink + * + * # Collection settings + * metricsProvider.timeline.collection.period=60 + * metricsProvider.timeline.hostname=zk1.example.com + * metricsProvider.timeline.appId=zookeeper + * + * # All other metricsProvider.timeline.* properties are passed to the sink + * metricsProvider.timeline.collector.hosts=collector1.example.com,collector2.example.com + * metricsProvider.timeline.collector.protocol=http + * metricsProvider.timeline.collector.port=6188 + *+ * + *
Lifecycle:
+ *This method loads the sink class dynamically and configures it with + * all properties that start with "metricsProvider.timeline.". The sink class must be + * available on the classpath (typically from an external JAR).
+ * + * @param configuration Properties from zoo.cfg + * @throws MetricsProviderLifeCycleException if configuration fails + */ + @Override + public void configure(Properties configuration) throws MetricsProviderLifeCycleException { + try { + // Load basic configuration + this.collectionPeriodSeconds = Integer.parseInt( + configuration.getProperty(COLLECTION_PERIOD_PROPERTY, + String.valueOf(DEFAULT_COLLECTION_PERIOD_SECONDS))); + + this.appId = configuration.getProperty(APP_ID_PROPERTY, DEFAULT_APP_ID); + + this.hostname = configuration.getProperty(HOSTNAME_PROPERTY); + if (hostname == null || hostname.trim().isEmpty()) { + this.hostname = getLocalHostname(); + } + + LOG.info("Configuring TimelineMetricsProvider: hostname={}, appId={}, collectionPeriod={} seconds", + hostname, appId, collectionPeriodSeconds); + + // Try to load and configure sink - but don't fail if it's not available + String sinkClassName = configuration.getProperty(SINK_CLASS_PROPERTY, DEFAULT_SINK_CLASS); + try { + this.sink = loadSink(sinkClassName); + this.sink.configure(configuration); + LOG.info("Successfully configured TimelineMetricsProvider with sink: {}", sinkClassName); + } catch (ClassNotFoundException e) { + LOG.warn("Timeline sink class not found: {}. Timeline metrics will be disabled. " + + "To enable Timeline metrics, ensure the sink implementation JAR is available on the classpath. " + + "ZooKeeper will continue to operate normally without Timeline metrics.", sinkClassName); + this.sink = null; + } catch (Exception e) { + LOG.warn("Failed to configure Timeline sink: {}. Timeline metrics will be disabled. " + + "ZooKeeper will continue to operate normally without Timeline metrics.", e.getMessage(), e); + this.sink = null; + } + + } catch (Exception e) { + LOG.error("Failed to configure TimelineMetricsProvider", e); + throw new MetricsProviderLifeCycleException("Configuration failed", e); + } + } + + /** + * Start the provider and begin periodic metric collection. 
+ * + *This method creates a scheduled executor that collects metrics + * every N seconds (configured via metricsProvider.timeline.collection.period). The + * collection runs on a daemon thread to avoid blocking ZooKeeper shutdown.
+ * + * @throws MetricsProviderLifeCycleException if startup fails + */ + @Override + public void start() throws MetricsProviderLifeCycleException { + if (started) { + LOG.warn("TimelineMetricsProvider already started"); + return; + } + + // If sink is not available, don't start the scheduler + if (sink == null) { + LOG.warn("Timeline sink not configured. Metric collection will not start. " + + "ZooKeeper will continue to operate normally without Timeline metrics."); + return; + } + + try { + // Create scheduler with daemon thread + this.scheduler = Executors.newScheduledThreadPool(1, r -> { + Thread t = new Thread(r, "TimelineMetricsCollector"); + t.setDaemon(true); + return t; + }); + + // Schedule periodic collection + scheduler.scheduleAtFixedRate( + this::collectAndSend, + 0, // Initial delay + collectionPeriodSeconds, + TimeUnit.SECONDS + ); + + started = true; + LOG.info("Started TimelineMetricsProvider - collecting metrics every {} seconds", + collectionPeriodSeconds); + + } catch (Exception e) { + LOG.error("Failed to start TimelineMetricsProvider", e); + throw new MetricsProviderLifeCycleException("Startup failed", e); + } + } + + /** + * Stop the provider and release all resources. + * + *This method shuts down the scheduler, closes the sink, and releases + * all resources. It can be called multiple times safely.
+ */ + @Override + public void stop() { + if (!started) { + return; + } + + LOG.info("Stopping TimelineMetricsProvider"); + + // Shutdown scheduler + if (scheduler != null) { + scheduler.shutdown(); + try { + if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + } + } catch (InterruptedException e) { + scheduler.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + // Close sink + if (sink != null) { + try { + sink.close(); + } catch (Exception e) { + LOG.error("Error closing Timeline sink", e); + } + } + + // Clear all metrics from context + rootContext.clear(); + + started = false; + LOG.info("Stopped TimelineMetricsProvider"); + } + + /** + * Returns the root metrics context. + * + *This provider maintains its own {@link TimelineMetricsContext} that stores + * all registered metrics. Components can register counters, gauges, summaries, etc. + * which will be automatically collected and sent to Timeline.
+     *
+     * @return the root metrics context
+     */
+    @Override
+    public MetricsContext getRootContext() {
+        // Hands out the provider's own context instance.
+        return rootContext;
+    }
+
+    /**
+     * Dumps all current metric values.
+     *
+     * <p>This method is called by legacy monitoring commands. It iterates through
+     * all metrics stored in the context and provides their current values.
+ * + * @param sink the receiver of metric name-value pairs + */ + @Override + public void dump(BiConsumerThis resets all counters and summaries to their initial state. + * Gauges are not reset as they represent current values.
+     */
+    @Override
+    public void resetAllValues() {
+        // Delegates to the root context's reset().
+        rootContext.reset();
+    }
+
+    /**
+     * Collects metrics from the context and sends to sink.
+     *
+     * <p>This method is called periodically by the scheduler. It creates a snapshot
+     * of all current metric values and sends it to the configured sink.
+ * + *Exceptions are caught and logged to prevent them from stopping + * the scheduled collection.
+ */ + private void collectAndSend() { + try { + if (sink == null) { + LOG.debug("Timeline sink is null, skipping metric collection"); + return; + } + + // Create snapshot + MetricSnapshot snapshot = new MetricSnapshot( + System.currentTimeMillis(), + hostname, + appId + ); + + // Dump all metrics from context to snapshot + rootContext.dumpToSnapshot(snapshot); + + // Send to Timeline + sink.send(snapshot); + + if (LOG.isDebugEnabled()) { + LOG.debug("Sent {} metrics to Timeline", snapshot.getMetricCount()); + LOG.debug("{}", snapshot.printAllMetrics()); + } + + } catch (Exception e) { + LOG.error("Failed to collect and send metrics", e); + } + } + + /** + * Loads the Timeline sink class dynamically via reflection. + * + *The sink class must be available on the classpath (typically from + * an external JAR). This allows ZooKeeper to remain independent of + * specific metrics collection systems.
+ * + * @param className the fully qualified class name of the sink + * @return an instance of the sink + * @throws ClassNotFoundException if the class cannot be found + * @throws Exception if the class cannot be instantiated + */ + private TimelineMetricsSink loadSink(String className) throws ClassNotFoundException, Exception { + LOG.info("Loading Timeline sink class: {}", className); + + try { + Class> clazz = Class.forName(className); + Object instance = clazz.getDeclaredConstructor().newInstance(); + + if (!(instance instanceof TimelineMetricsSink)) { + throw new IllegalArgumentException( + "Class " + className + " does not implement TimelineMetricsSink"); + } + + return (TimelineMetricsSink) instance; + + } catch (ClassNotFoundException e) { + // Re-throw ClassNotFoundException so it can be caught separately in configure() + throw e; + } catch (Exception e) { + throw new Exception("Failed to instantiate Timeline sink: " + className, e); + } + } + + /** + * Gets the local hostname. + * + * @return the hostname, or "unknown" if it cannot be determined + */ + private String getLocalHostname() { + try { + return InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + LOG.warn("Unable to determine hostname, using 'unknown'", e); + return "unknown"; + } + } + + /** + * Internal MetricsContext implementation that stores all metrics. + * + *This context reuses existing metric implementations from zookeeper-server + * (SimpleCounter, AvgMinMaxCounter, etc.) to ensure consistent behavior with + * other metrics providers.
+ */ + private static class TimelineMetricsContext implements MetricsContext { + + private final ConcurrentMapThis interface defines the contract between ZooKeeper's metrics collection + * system and external Timeline metrics collectors (such as Ambari Metrics Collector). + * Implementations of this interface are loaded dynamically at runtime, allowing + * ZooKeeper to remain independent of specific metrics collection systems.
+ * + *The typical lifecycle is:
+ * <p>Example implementation in external JAR:
+ * <pre>{@code
+ * public class MyTimelineSink implements TimelineMetricsSink {
+ *     public void configure(Properties config) throws Exception {
+ *         // Initialize HTTP client, load collector addresses, etc.
+ *     }
+ *
+ *     public void send(MetricSnapshot snapshot) throws Exception {
+ *         // Transform snapshot to target format and send via HTTP
+ *     }
+ *
+ *     public void close() throws Exception {
+ *         // Cleanup resources
+ *     }
+ * }
+ * }</pre>
+ *
+ * @see TimelineMetricsProvider
+ * @see MetricSnapshot
+ */
+public interface TimelineMetricsSink {
+
+    /**
+     * Configure the sink with the provided properties.
+     *
+     * <p>This method is called once during initialization, before any metrics
+     * are sent. Implementations should use this method to prepare everything
+     * they need for sending, for example initializing an HTTP client and
+     * loading the collector addresses.
+     *
+     * @param configuration the properties handed to the metrics provider
+     * @throws Exception if the sink cannot be configured
+     */
+    void configure(java.util.Properties configuration) throws Exception;
+
+    /**
+     * Send a snapshot of metrics to the collector.
+     *
+     * <p>This method is called periodically (typically every 60 seconds) with
+     * a snapshot of all current metric values. Implementations should transform
+     * the snapshot into the format expected by the target system and transmit it.
+     *
+     * <p>Note: This method may be called from a scheduled executor thread.
+     * Implementations should be thread-safe and avoid blocking operations
+     * that could delay subsequent metric collections.
+     *
+     * @param snapshot A snapshot of all metrics at a specific point in time.
+     *                 Contains counters, gauges, and summary statistics.
+     * @throws Exception if sending fails. Exceptions are logged but do not
+     *                   stop metric collection. The next snapshot will be
+     *                   attempted on schedule.
+     */
+    void send(MetricSnapshot snapshot) throws Exception;
+
+    /**
+     * Close the sink and release all resources.
+     *
+     * <p>This method is called during ZooKeeper shutdown. Implementations should
+     * clean up whatever resources they acquired.
+     *
+     * <p>This method should complete quickly (within a few seconds) to avoid
+     * delaying ZooKeeper shutdown.
+     *
+     * @throws Exception if cleanup fails. Exceptions are logged but do not
+     *                   prevent ZooKeeper shutdown.
+     */
+    void close() throws Exception;
+}