Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions lsm-tree/bench/macro/lsm-tree-bench-lookups.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE OverloadedRecordDot #-}

module Main ( main ) where

Expand Down Expand Up @@ -384,9 +385,9 @@ lookupsEnv runSizes keyRng0 hfs hbio refCtx caching = do

-- return runs
runs <- V.fromList <$> mapM (Run.fromBuilder refCtx) rbs
let blooms = V.map (\(DeRef r) -> Run.runFilter r) runs
indexes = V.map (\(DeRef r) -> Run.runIndex r) runs
handles = V.map (\(DeRef r) -> Run.runKOpsFile r) runs
let blooms = V.map (\(DeRef r) -> r.bloomFilter ) runs
indexes = V.map (\(DeRef r) -> r.index ) runs
handles = V.map (\(DeRef r) -> r.kOpsFile ) runs
Comment on lines +388 to +390
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let blooms = V.map (\(DeRef r) -> r.bloomFilter ) runs
indexes = V.map (\(DeRef r) -> r.index ) runs
handles = V.map (\(DeRef r) -> r.kOpsFile ) runs
let blooms = V.map (\(DeRef r) -> r.bloomFilter) runs
indexes = V.map (\(DeRef r) -> r.index ) runs
handles = V.map (\(DeRef r) -> r.kOpsFile ) runs

pure $!! (runs, blooms, indexes, handles)

genLookupBatch :: StdGen -> Int -> (V.Vector SerialisedKey, StdGen)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{-# OPTIONS_GHC -Wno-orphans #-}
{-# LANGUAGE OverloadedRecordDot #-}

module Bench.Database.LSMTree.Internal.Lookup (benchmarks) where

Expand Down Expand Up @@ -91,9 +92,9 @@ benchSalt = 4
benchLookups :: Config -> Benchmark
benchLookups conf@Config{name} =
withEnv $ \ ~(_dir, arenaManager, _hasFS, hasBlockIO, _refCtx, wbblobs, rs, ks) ->
env ( pure ( V.map (\(DeRef r) -> Run.runFilter r) rs
, V.map (\(DeRef r) -> Run.runIndex r) rs
, V.map (\(DeRef r) -> Run.runKOpsFile r) rs
env ( pure ( V.map (\(DeRef r) -> r.bloomFilter ) rs
, V.map (\(DeRef r) -> r.index ) rs
, V.map (\(DeRef r) -> r.kOpsFile ) rs
Comment on lines +95 to +97
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
env ( pure ( V.map (\(DeRef r) -> r.bloomFilter ) rs
, V.map (\(DeRef r) -> r.index ) rs
, V.map (\(DeRef r) -> r.kOpsFile ) rs
env ( pure ( V.map (\(DeRef r) -> r.bloomFilter) rs
, V.map (\(DeRef r) -> r.index ) rs
, V.map (\(DeRef r) -> r.kOpsFile ) rs

)
) $ \ ~(blooms, indexes, kopsFiles) ->
bgroup name [
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedRecordDot #-}

{-# OPTIONS_HADDOCK not-home #-}

-- Definitions for override table config options.
Expand Down Expand Up @@ -103,7 +105,7 @@ instance Override TableConfigOverride SnapshotMetaData where

instance Override MergeBatchSize SnapshotMetaData where
override mbs smd =
smd { snapMetaConfig = override mbs (snapMetaConfig smd) }
smd { snapMetaConfig = override mbs smd.snapMetaConfig }

instance Override MergeBatchSize TableConfig where
override confMergeBatchSize' tc =
Expand Down
41 changes: 21 additions & 20 deletions lsm-tree/src-core/Database/LSMTree/Internal/MergeSchedule.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE UnboxedTuples #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE UnboxedTuples #-}
{-# OPTIONS_HADDOCK not-home #-}

-- TODO: establish that this implementation matches up with the ScheduledMerges
Expand Down Expand Up @@ -216,9 +217,9 @@ mkLevelsCache reg lvls = do
lvls
pure $! LevelsCache_ {
cachedRuns = rs
, cachedFilters = mapStrict (\(DeRef r) -> Run.runFilter r) rs
, cachedIndexes = mapStrict (\(DeRef r) -> Run.runIndex r) rs
, cachedKOpsFiles = mapStrict (\(DeRef r) -> Run.runKOpsFile r) rs
, cachedFilters = mapStrict (\(DeRef r) -> r.bloomFilter ) rs
, cachedIndexes = mapStrict (\(DeRef r) -> r.index) rs
, cachedKOpsFiles = mapStrict (\(DeRef r) -> r.kOpsFile) rs
}
where
dupRun r = withRollback reg (dupRef r) releaseRef
Expand Down Expand Up @@ -283,7 +284,7 @@ duplicateLevelsCache ::
-> LevelsCache m h
-> m (LevelsCache m h)
duplicateLevelsCache reg cache = do
rs' <- forMStrict (cachedRuns cache) $ \r ->
rs' <- forMStrict cache.cachedRuns $ \r ->
withRollback reg (dupRef r) releaseRef
pure cache { cachedRuns = rs' }

Expand All @@ -297,7 +298,7 @@ releaseLevelsCache ::
-> LevelsCache m h
-> m ()
releaseLevelsCache reg cache =
V.forM_ (cachedRuns cache) $ \r ->
V.forM_ cache.cachedRuns $ \r ->
delayedCommit reg (releaseRef r)

{-------------------------------------------------------------------------------
Expand Down Expand Up @@ -494,14 +495,14 @@ updatesWithInterleavedFlushes ::
-> TableContent m h
-> m (TableContent m h)
updatesWithInterleavedFlushes tr conf resolve hfs hbio refCtx root salt uc es reg tc = do
let wb = tableWriteBuffer tc
wbblobs = tableWriteBufferBlobs tc
let wb = tc.tableWriteBuffer
wbblobs = tc.tableWriteBufferBlobs
(wb', es') <- addWriteBufferEntries hfs resolve wbblobs maxn wb es
-- Supply credits before flushing, so that we complete merges in time. The
-- number of supplied credits is based on the size increase of the write
-- buffer, not the number of processed entries @length es' - length es@.
let numAdded = unNumEntries (WB.numEntries wb') - unNumEntries (WB.numEntries wb)
supplyCredits refCtx conf (NominalCredits numAdded) (tableLevels tc)
supplyCredits refCtx conf (NominalCredits numAdded) tc.tableLevels
let tc' = tc { tableWriteBuffer = wb' }
if WB.numEntries wb' < maxn then do
pure $! tc'
Expand Down Expand Up @@ -593,10 +594,10 @@ flushWriteBuffer ::
-> TableContent m h
-> m (TableContent m h)
flushWriteBuffer tr conf resolve hfs hbio refCtx root salt uc reg tc
| WB.null (tableWriteBuffer tc) = pure tc
| WB.null tc.tableWriteBuffer = pure tc
| otherwise = do
!uniq <- incrUniqCounter uc
let !size = WB.numEntries (tableWriteBuffer tc)
let !size = WB.numEntries tc.tableWriteBuffer
!ln = LevelNo 1
(!runParams,
runPaths) = mergingRunParamsForLevel
Expand All @@ -608,23 +609,23 @@ flushWriteBuffer tr conf resolve hfs hbio refCtx root salt uc reg tc
(Run.fromWriteBuffer
hfs hbio refCtx salt
runParams runPaths
(tableWriteBuffer tc)
(tableWriteBufferBlobs tc))
tc.tableWriteBuffer
tc.tableWriteBufferBlobs)
releaseRef
delayedCommit reg (releaseRef (tableWriteBufferBlobs tc))
delayedCommit reg (releaseRef tc.tableWriteBufferBlobs)
wbblobs' <- withRollback reg (WBB.new hfs refCtx (Paths.tableBlobPath root uniq))
releaseRef
levels' <- addRunToLevels tr conf resolve hfs hbio refCtx root salt uc r reg
(tableLevels tc)
(tableUnionLevel tc)
tableCache' <- rebuildCache reg (tableCache tc) levels'
tc.tableLevels
tc.tableUnionLevel
tableCache' <- rebuildCache reg tc.tableCache levels'
pure $! TableContent {
tableWriteBuffer = WB.empty
, tableWriteBufferBlobs = wbblobs'
, tableLevels = levels'
, tableCache = tableCache'
-- TODO: move into regular levels if merge completed and size fits
, tableUnionLevel = tableUnionLevel tc
, tableUnionLevel = tc.tableUnionLevel
}

{-# SPECIALISE addRunToLevels ::
Expand Down
93 changes: 58 additions & 35 deletions lsm-tree/src-core/Database/LSMTree/Internal/Run.hs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE DerivingVia #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE DerivingVia #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE NoFieldSelectors #-}
{-# LANGUAGE OverloadedRecordDot #-}

{-# OPTIONS_HADDOCK not-home #-}

-- | Runs of sorted key\/value data.
module Database.LSMTree.Internal.Run (
-- * Run
Run (Run, runIndex, runHasFS, runHasBlockIO, runRunDataCaching,
runBlobFile, runFilter, runKOpsFile)
Run (Run, index, hasFS, hasBlockIO, dataCaching,
blobFile, bloomFilter, kOpsFile)
, RunFsPaths
, size
, sizeInPages
Expand Down Expand Up @@ -68,75 +71,77 @@ import System.FS.BlockIO.API (HasBlockIO)
-- | The in-memory representation of a completed LSM run.
--
data Run m h = Run {
runNumEntries :: !NumEntries
numEntries :: !NumEntries
-- | The reference count for the LSM run. This counts the
-- number of references from LSM handles to this run. When
-- this drops to zero the open files will be closed.
, runRefCounter :: !(RefCounter m)
, refCounter :: !(RefCounter m)
-- | The file system paths for all the files used by the run.
, runRunFsPaths :: !RunFsPaths
, fsPaths :: !RunFsPaths
-- | The bloom filter for the set of keys in this run.
, runFilter :: !(Bloom SerialisedKey)
, bloomFilter :: !(Bloom SerialisedKey)
-- | The in-memory index mapping keys to page numbers in the
-- Key\/Ops file. In future we may support alternative index
-- representations.
, runIndex :: !Index
, index :: !Index
-- | The file handle for the Key\/Ops file. This file is opened
-- read-only and is accessed in a page-oriented way, i.e. only
-- reading whole pages, at page offsets. It will be opened with
-- 'O_DIRECT' on supported platforms.
, runKOpsFile :: !(FS.Handle h)
, kOpsFile :: !(FS.Handle h)
-- | The file handle for the BLOBs file. This file is opened
-- read-only and is accessed in a normal style using buffered
-- I\/O, reading arbitrary file offset and length spans.
, runBlobFile :: !(Ref (BlobFile m h))
, runRunDataCaching :: !RunDataCaching
, runHasFS :: !(HasFS m h)
, runHasBlockIO :: !(HasBlockIO m h)
, blobFile :: !(Ref (BlobFile m h))
, dataCaching :: !RunDataCaching
, hasFS :: !(HasFS m h)
, hasBlockIO :: !(HasBlockIO m h)
}

-- | Shows only the 'fsPaths' field.
instance Show (Run m h) where
showsPrec _ run = showString "Run { runRunFsPaths = " . showsPrec 0 (runRunFsPaths run) . showString " }"
showsPrec _ run = showString "Run { fsPaths = " . showsPrec 0 run.fsPaths . showString " }"

instance NFData h => NFData (Run m h) where
rnf (Run a b c d e f g h i j) =
rnf a `seq` rwhnf b `seq` rnf c `seq` rnf d `seq` rnf e `seq`
rnf f `seq` rnf g `seq` rnf h `seq` rwhnf i `seq` rwhnf j
rnf (Run numEntries refCounter fsPaths bloomFilter index kOpsFile blobFile dataCaching hasFS hasBlockIO) =
rnf numEntries `seq` rwhnf refCounter `seq` rnf fsPaths `seq`
rnf bloomFilter `seq` rnf index `seq` rnf kOpsFile `seq`
rnf blobFile `seq` rnf dataCaching `seq` rwhnf hasFS `seq` rwhnf hasBlockIO

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change


instance RefCounted m (Run m h) where
getRefCounter = runRefCounter
getRefCounter r = r.refCounter

size :: Ref (Run m h) -> NumEntries
size (DeRef run) = runNumEntries run
size (DeRef run) = run.numEntries

sizeInPages :: Ref (Run m h) -> NumPages
sizeInPages (DeRef run) = Index.sizeInPages (runIndex run)
sizeInPages (DeRef run) = Index.sizeInPages run.index

runFsPaths :: Ref (Run m h) -> RunFsPaths
runFsPaths (DeRef r) = runRunFsPaths r
runFsPaths (DeRef r) = r.fsPaths

-- | The 'RunNumber' obtained by applying 'Paths.runNumber' to the run's
-- file system paths (see 'runFsPaths').
runFsPathsNumber :: Ref (Run m h) -> RunNumber
runFsPathsNumber ref = Paths.runNumber (runFsPaths ref)

-- | See 'openFromDisk'
runIndexType :: Ref (Run m h) -> IndexType
runIndexType (DeRef r) = Index.indexToIndexType (runIndex r)
runIndexType (DeRef r) = Index.indexToIndexType r.index

-- | See 'openFromDisk'
runDataCaching :: Ref (Run m h) -> RunDataCaching
runDataCaching (DeRef r) = runRunDataCaching r
runDataCaching (DeRef r) = r.dataCaching


-- | Helper function to make a 'RawBlobRef' that points into a 'Run'.
mkRawBlobRef :: Run m h -> BlobSpan -> RawBlobRef m h
mkRawBlobRef Run{runBlobFile} blobspan =
BlobRef.mkRawBlobRef runBlobFile blobspan
mkRawBlobRef run =
BlobRef.mkRawBlobRef run.blobFile

-- | Helper function to make a 'WeakBlobRef' that points into a 'Run'.
mkWeakBlobRef :: Ref (Run m h) -> BlobSpan -> WeakBlobRef m h
mkWeakBlobRef (DeRef Run{runBlobFile}) blobspan =
BlobRef.mkWeakBlobRef runBlobFile blobspan
mkWeakBlobRef (DeRef run) blobspan =
BlobRef.mkWeakBlobRef run.blobFile blobspan

{-# SPECIALISE finaliser ::
HasFS IO h
Expand Down Expand Up @@ -227,7 +232,18 @@ fromBuilder refCtx builder = do
setRunDataCaching runHasBlockIO runKOpsFile runRunDataCaching
newRef refCtx
(finaliser runHasFS runKOpsFile runBlobFile runRunFsPaths)
(\runRefCounter -> Run { .. })
(\refCounter -> Run {
numEntries = runNumEntries
, refCounter = refCounter
, fsPaths = runRunFsPaths
Comment on lines +236 to +238
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
numEntries = runNumEntries
, refCounter = refCounter
, fsPaths = runRunFsPaths
numEntries = runNumEntries
, refCounter = refCounter
, fsPaths = runRunFsPaths

, bloomFilter = runFilter
, index = runIndex
, kOpsFile = runKOpsFile
, blobFile = runBlobFile
, dataCaching = runRunDataCaching
, hasFS = runHasFS
, hasBlockIO = runHasBlockIO
})
Comment thread
jorisdral marked this conversation as resolved.

{-# SPECIALISE fromWriteBuffer ::
HasFS IO h
Expand Down Expand Up @@ -336,9 +352,16 @@ openFromDisk fs hbio refCtx runRunDataCaching indexType expectedSalt runRunFsPat
setRunDataCaching hbio runKOpsFile runRunDataCaching
newRef refCtx (finaliser fs runKOpsFile runBlobFile runRunFsPaths) $ \runRefCounter ->
Run {
runHasFS = fs
, runHasBlockIO = hbio
, ..
numEntries = runNumEntries
, refCounter = runRefCounter
, fsPaths = runRunFsPaths
, bloomFilter = runFilter
, index = runIndex
, kOpsFile = runKOpsFile
, blobFile = runBlobFile
, dataCaching = runRunDataCaching
, hasFS = fs
, hasBlockIO = hbio
}
where
-- Note: all file data for this path is evicted from the page cache /if/ the
Expand Down
Loading
Loading