Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
{-# HLINT ignore "Use newtype instead of data" #-}
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
Expand Down Expand Up @@ -81,16 +80,16 @@ create tr mbCfg = do
runPruner handle = threadLabelMe "timeseries-pruner-thread" >> do
forever $ do
cfg <- readTVarIO handle.config
case cfg.pruningPeriodSec of
case cfg.pruningPeriodMillis of
Nothing ->
-- If the current configuration doesn't specify a pruning period, we block
-- the thread until a reconfiguration happens.
takeMVar handle.reconfigured
Just period -> do
Just periodMs -> do
prune handle
-- Wait for the given period or wake up on a reconfiguration.
race_
(threadDelay (fromIntegral period * 1_000_000))
(threadDelay (fromIntegral periodMs * 1000))
(takeMVar handle.reconfigured)

-- | Reconfigure the store. The new parameters are applied immediately.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ type QueryId = Word64
data TimeseriesConfig = TimeseriesConfig {
-- | How long the store entries are retained for (ms).
retentionMillis :: Word64
-- | How often the pruner thread shall prune the store (sec), if enabled.
, pruningPeriodSec :: Maybe Word64
-- | How often the pruner thread shall prune the store (ms), if enabled.
, pruningPeriodMillis :: Maybe Word64
-- | Parameters of timeseries query interpretation.
, interpCfg :: Interp.Config
} deriving (Show, Generic, ToJSON)
Expand Down
4 changes: 4 additions & 0 deletions cardano-tracer/cardano-tracer.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ library
Cardano.Tracer.Handlers.Metrics.Monitoring
Cardano.Tracer.Handlers.Metrics.Prometheus
Cardano.Tracer.Handlers.Metrics.Servers
Cardano.Tracer.Handlers.Metrics.TimeseriesServer
Cardano.Tracer.Handlers.Metrics.Utils

Cardano.Tracer.Handlers.Notifications.Check
Expand All @@ -147,6 +148,8 @@ library

other-modules: Cardano.Tracer.Handlers.Logs.Journal.NoSystemd
Cardano.Tracer.Handlers.Notifications.Timer
Cardano.Tracer.Time

Paths_cardano_tracer

autogen-modules: Paths_cardano_tracer
Expand Down Expand Up @@ -201,6 +204,7 @@ library
, warp ^>= 3.4
, warp-tls
, yaml
, cardano-timeseries-io

if os(windows)
build-depends: Win32
Expand Down
5 changes: 3 additions & 2 deletions cardano-tracer/src/Cardano/Tracer/Acceptors/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import Data.Word (Word32)
import qualified Network.Mux as Mux
import qualified Network.Socket as Socket
import qualified System.Metrics.Configuration as EKGF
import System.Metrics.Network.Acceptor (acceptEKGMetricsInit)
import System.Metrics.Network.Acceptor (acceptMetricsInit)

import qualified Trace.Forward.Configuration.DataPoint as DPF
import qualified Trace.Forward.Configuration.TraceObject as TF
Expand Down Expand Up @@ -192,9 +192,10 @@ runEKGAcceptorInit
respoinderCtx
LBS.ByteString IO () Void
runEKGAcceptorInit tracerEnv ekgConfig errorHandler =
acceptEKGMetricsInit
acceptMetricsInit
ekgConfig
(prepareMetricsStores tracerEnv . micConnectionId)
(store tracerEnv . connIdToNodeId . micConnectionId)
(errorHandler . micConnectionId)

runTraceObjectsAcceptorInit
Expand Down
15 changes: 7 additions & 8 deletions cardano-tracer/src/Cardano/Tracer/Acceptors/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ module Cardano.Tracer.Acceptors.Server
( runAcceptorsServer
) where

import "contra-tracer" Control.Tracer (nullTracer)

import Cardano.Logging (TraceObject)
import qualified Cardano.Logging.Types as Net
import Cardano.Tracer.Acceptors.Utils
Expand All @@ -25,25 +23,25 @@ import Ouroboros.Network.Mux (MiniProtocol (..), MiniProtocolLimits (.
miniProtocolNum, miniProtocolRun)
import Ouroboros.Network.Protocol.Handshake (Handshake, HandshakeArguments (..))
import qualified Ouroboros.Network.Protocol.Handshake as Handshake
import qualified Ouroboros.Network.Server.Simple as Server
import Ouroboros.Network.Snocket (LocalAddress, LocalSocket, Snocket,
localAddressFromPath, localSnocket, makeLocalBearer, makeSocketBearer,
socketSnocket)
import Ouroboros.Network.Socket (ConnectionId (..),
SomeResponderApplication (..))
import qualified Ouroboros.Network.Server.Simple as Server
import Ouroboros.Network.Socket (ConnectionId (..), SomeResponderApplication (..))

import Codec.CBOR.Term (Term)
import Control.Concurrent.Async (wait)
import "contra-tracer" Control.Tracer (nullTracer)
import qualified Data.ByteString.Lazy as LBS
import Data.Functor (void)
import Data.List.NonEmpty (NonEmpty ((:|)))
import qualified Data.Text as Text
import Data.Functor (void)
import Data.Void (Void)
import Data.Word (Word32)
import qualified Network.Mux as Mux
import qualified Network.Socket as Socket
import qualified System.Metrics.Configuration as EKGF
import System.Metrics.Network.Acceptor (acceptEKGMetricsResp)
import System.Metrics.Network.Acceptor (acceptMetricsResp)

import qualified Trace.Forward.Configuration.DataPoint as DPF
import qualified Trace.Forward.Configuration.TraceObject as TF
Expand Down Expand Up @@ -184,9 +182,10 @@ runEKGAcceptor
-> (ConnectionId addr -> IO ())
-> RunMiniProtocol 'Mux.ResponderMode initiatorCtx (ResponderContext addr) LBS.ByteString IO Void ()
runEKGAcceptor tracerEnv ekgConfig errorHandler =
acceptEKGMetricsResp
acceptMetricsResp
ekgConfig
(prepareMetricsStores tracerEnv . rcConnectionId)
(store tracerEnv . connIdToNodeId . rcConnectionId)
(errorHandler . rcConnectionId)

runTraceObjectsAcceptor
Expand Down
52 changes: 37 additions & 15 deletions cardano-tracer/src/Cardano/Tracer/Acceptors/Utils.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE ViewPatterns #-}

{-# OPTIONS_GHC -Wno-redundant-constraints #-}

Expand All @@ -9,31 +10,41 @@ module Cardano.Tracer.Acceptors.Utils
, prepareMetricsStores
, removeDisconnectedNode
, notifyAboutNodeDisconnected
, store
) where

#if RTVIEW
import Cardano.Logging (SeverityS (..))
#endif
import qualified Cardano.Timeseries.Component as Timeseries
import Cardano.Timeseries.Domain.Types (MetricIdentifier)
import Cardano.Tracer.Environment
#if RTVIEW
import Cardano.Tracer.Handlers.Notifications.Types
import Cardano.Tracer.Handlers.Notifications.Utils
#endif
import Cardano.Tracer.Environment
import Cardano.Tracer.Time (getTimeMs)
import Cardano.Tracer.Types
import Cardano.Tracer.Utils
import Ouroboros.Network.Socket (ConnectionId (..))

import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TVar (TVar, modifyTVar', newTVarIO)
import qualified Data.Bimap as BM
import Data.Foldable
import qualified Data.Map.Strict as M
import Data.Maybe (mapMaybe)
import qualified Data.Set as S
import Data.Time.Clock.POSIX (getPOSIXTime)
#if RTVIEW
import Data.Time.Clock.System (getSystemTime, systemToUTCTime)
#endif
import qualified System.Metrics as EKG
import System.Metrics.Store.Acceptor (MetricsLocalStore, emptyMetricsLocalStore)
import System.Metrics.ReqResp
import System.Metrics.Store.Acceptor (MetricsLocalStore, emptyMetricsLocalStore,
storeMetrics)

import Trace.Forward.Utils.DataPoint (DataPointRequestor, initDataPointRequestor)
import qualified Data.Text.Read as Text

prepareDataPointRequestor
:: Show addr
Expand All @@ -54,26 +65,17 @@ prepareMetricsStores
-> IO (EKG.Store, TVar MetricsLocalStore)
prepareMetricsStores TracerEnv{teConnectedNodes, teAcceptedMetrics} connId = do
addConnectedNode teConnectedNodes connId
store <- EKG.newStore
st <- EKG.newStore

EKG.registerCounter "ekg.server_timestamp_ms" getTimeMs store
storesForNewNode <- (store ,) <$> newTVarIO emptyMetricsLocalStore
EKG.registerCounter "ekg.server_timestamp_ms" getTimeMs st
storesForNewNode <- (st ,) <$> newTVarIO emptyMetricsLocalStore

atomically do
modifyTVar' teAcceptedMetrics do
M.insert (connIdToNodeId connId) storesForNewNode

return storesForNewNode

where
-- forkServer definition of `getTimeMs'. The ekg frontend relies
-- on the "ekg.server_timestamp_ms" metric being in every
-- store. While forkServer adds that that automatically we must
-- manually add it.
-- url
-- + https://github.com/tvh/ekg-wai/blob/master/System/Remote/Monitoring/Wai.hs#L237-L238
getTimeMs = (round . (* 1000)) `fmap` getPOSIXTime

addConnectedNode
:: Show addr
=> ConnectedNodes
Expand Down Expand Up @@ -115,3 +117,23 @@ notifyAboutNodeDisconnected TracerEnvRTView{teEventsQueues} connId = do
#else
notifyAboutNodeDisconnected _ _ = pure ()
#endif

store :: TracerEnv -> NodeId -> (EKG.Store, TVar MetricsLocalStore) -> Response -> IO ()
store tracerEnv (NodeId nodeId) (ekgStore, localStore) resp@(ResponseMetrics ms) = do
storeMetrics resp ekgStore localStore
for_ (teTimeseriesHandle tracerEnv) $ \h -> do
ts <- getTimeMs
Timeseries.insert h "node_id" nodeId (fromIntegral ts) (mapMaybe parseMetric ms)

where
numeralOnly :: MetricValue -> Maybe Double
numeralOnly (GaugeValue x) = Just (fromIntegral x)
numeralOnly (CounterValue x) = Just (fromIntegral x)
-- If the label is readable as double, accept it
numeralOnly (LabelValue (Text.double -> Right (x, ""))) = Just x
numeralOnly _ = Nothing

parseMetric :: (MetricName, MetricValue) -> Maybe (MetricIdentifier, Double)
parseMetric (k, numeralOnly -> Just v) = Just (k, v)
parseMetric _ = Nothing

1 change: 1 addition & 0 deletions cardano-tracer/src/Cardano/Tracer/Configuration.hs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ data TracerConfig = TracerConfig
, hasEKG :: !(Maybe Endpoint) -- ^ Endpoint for EKG web-page.
, hasPrometheus :: !(Maybe Endpoint) -- ^ Endpoint for Prometheus web-page.
, hasRTView :: !(Maybe Endpoint) -- ^ Endpoint for RTView web-page.
, hasTimeseries :: !(Maybe Endpoint)
, tlsCertificate :: !(Maybe Certificate)
-- | Socket for tracer's to reforward on. Second member of the triplet is the list of prefixes to reforward.
-- Third member of the triplet is the forwarder config.
Expand Down
2 changes: 2 additions & 0 deletions cardano-tracer/src/Cardano/Tracer/Environment.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ module Cardano.Tracer.Environment
) where

import Cardano.Logging.Types
import Cardano.Timeseries.Component (TimeseriesHandle)
import Cardano.Tracer.Configuration
#if RTVIEW
import Cardano.Tracer.Handlers.Notifications.Types
Expand Down Expand Up @@ -36,6 +37,7 @@ data TracerEnv = TracerEnv
, teRegistry :: !HandleRegistry
, teStateDir :: !(Maybe FilePath)
, teMetricsHelp :: ![(Text, Builder)]
, teTimeseriesHandle :: !(Maybe TimeseriesHandle)
}

#if RTVIEW
Expand Down
12 changes: 8 additions & 4 deletions cardano-tracer/src/Cardano/Tracer/Handlers/Metrics/Servers.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE NumericUnderscores #-}

module Cardano.Tracer.Handlers.Metrics.Servers
( runMetricsServers
Expand All @@ -10,17 +9,19 @@ import Cardano.Tracer.Configuration
import Cardano.Tracer.Environment
import Cardano.Tracer.Handlers.Metrics.Monitoring
import Cardano.Tracer.Handlers.Metrics.Prometheus
import Cardano.Tracer.Handlers.Metrics.TimeseriesServer (runTimeseriesServer)
import qualified Cardano.Tracer.Handlers.Metrics.Utils as Utils
import Cardano.Tracer.Utils (sequenceConcurrently_)

import Control.AutoUpdate
import Data.Maybe (catMaybes)
import Control.Monad (unless)
import Data.Maybe (catMaybes)

-- | Runs metrics servers if needed:
--
-- 1. Prometheus exporter.
-- 2. EKG monitoring web-page.
-- 3. Timeseries query server.
--
runMetricsServers
:: TracerEnv
Expand All @@ -44,8 +45,11 @@ runMetricsServers tracerEnv = do
servers = catMaybes
[ runPrometheusServer tracerEnv <$> hasPrometheus
, runMonitoringServer tracerEnv <$> hasEKG
, const <$> (runTimeseriesServer teTracer cfg <$> hasTimeseries <*> teTimeseriesHandle)
]

TracerEnv
{ teConfig = TracerConfig { hasPrometheus, hasEKG }
{ teConfig = cfg@TracerConfig { hasPrometheus, hasEKG, hasTimeseries },
teTracer,
teTimeseriesHandle
} = tracerEnv
Loading
Loading