Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 66 additions & 52 deletions sei-cosmos/store/multiversion/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,26 @@
var _ MultiVersionStore = (*Store)(nil)

type Store struct {
// map that stores the key string -> MultiVersionValue mapping for accessing from a given key
multiVersionMap *sync.Map
// TODO: do we need to support iterators as well similar to how cachekv does it - yes
multiVersionMap *sync.Map // key string -> MultiVersionValue (lock-free reads for OCC workers)

txWritesetKeys *sync.Map // map of tx index -> writeset keys []string
txReadSets *sync.Map // map of tx index -> readset ReadSet
txIterateSets *sync.Map // map of tx index -> iterateset Iterateset
writesetKeysMtx sync.RWMutex
txWritesetKeys map[int][]string

readSetsMtx sync.RWMutex
txReadSets map[int]ReadSet

iterateSetsMtx sync.RWMutex
txIterateSets map[int]Iterateset

parentStore types.KVStore
}

func NewMultiVersionStore(parentStore types.KVStore) *Store {
return &Store{
multiVersionMap: &sync.Map{},
txWritesetKeys: &sync.Map{},
txReadSets: &sync.Map{},
txIterateSets: &sync.Map{},
txWritesetKeys: make(map[int][]string),
txReadSets: make(map[int]ReadSet),
txIterateSets: make(map[int]Iterateset),
parentStore: parentStore,
}
}
Expand Down Expand Up @@ -98,7 +101,6 @@

// Has implements MultiVersionStore. It checks if the key exists in the multiversion store at or before the specified index.
func (s *Store) Has(index int, key []byte) bool {

keyString := string(key)
mvVal, found := s.multiVersionMap.Load(keyString)
// if the key doesn't exist in the overall map, return nil
Expand All @@ -109,16 +111,17 @@
return foundVal
}

// removeOldWriteset must be called with writesetKeysMtx held for writing.
func (s *Store) removeOldWriteset(index int, newWriteSet WriteSet) {
writeset := make(map[string][]byte)
if newWriteSet != nil {
// if non-nil writeset passed in, we can use that to optimize removals
writeset = newWriteSet
}
// if there is already a writeset existing, we should remove that fully
oldKeys, loaded := s.txWritesetKeys.LoadAndDelete(index)
keys, loaded := s.txWritesetKeys[index]
if loaded {
keys := oldKeys.([]string)
delete(s.txWritesetKeys, index)
// we need to delete all of the keys in the writeset from the multiversion store
for _, key := range keys {
// small optimization to check if the new writeset is going to write this key, if so, we can leave it behind
Expand All @@ -128,7 +131,7 @@
}
// remove from the appropriate item if present in multiVersionMap
mvVal, found := s.multiVersionMap.Load(key)
// if the key doesn't exist in the overall map, return nil
// if the key doesn't exist in the overall map, continue
if !found {
continue
}
Expand All @@ -138,38 +141,37 @@
}

// SetWriteset sets a writeset for a transaction index, and also writes all of the multiversion items in the writeset to the multiversion store.
// TODO: returns a list of NEW keys added
func (s *Store) SetWriteset(index int, incarnation int, writeset WriteSet) {
// TODO: add telemetry spans
// remove old writeset if it exists
s.writesetKeysMtx.Lock()
s.removeOldWriteset(index, writeset)
s.writesetKeysMtx.Unlock()

writeSetKeys := make([]string, 0, len(writeset))
for key, value := range writeset {
writeSetKeys = append(writeSetKeys, key)
loadVal, _ := s.multiVersionMap.LoadOrStore(key, NewMultiVersionItem()) // init if necessary
loadVal, _ := s.multiVersionMap.LoadOrStore(key, NewMultiVersionItem())
mvVal := loadVal.(MultiVersionValue)
if value == nil {
// delete if nil value
// TODO: sync map
mvVal.Delete(index, incarnation)
} else {
mvVal.Set(index, incarnation, value)
}
}
sort.Strings(writeSetKeys) // TODO: if we're sorting here anyways, maybe we just put it into a btree instead of a slice
s.txWritesetKeys.Store(index, writeSetKeys)
sort.Strings(writeSetKeys)
s.writesetKeysMtx.Lock()
s.txWritesetKeys[index] = writeSetKeys
s.writesetKeysMtx.Unlock()
}

// InvalidateWriteset iterates over the writeset keys stored for the given index and replaces each with an ESTIMATE for that index and incarnation.
func (s *Store) InvalidateWriteset(index int, incarnation int) {
keysAny, found := s.txWritesetKeys.Load(index)
s.writesetKeysMtx.RLock()
keys, found := s.txWritesetKeys[index]
s.writesetKeysMtx.RUnlock()
if !found {
return
}
keys := keysAny.([]string)
for _, key := range keys {
// invalidate all of the writeset items - is this suboptimal? - we could potentially do concurrently if slow because locking is on an item specific level
val, _ := s.multiVersionMap.LoadOrStore(key, NewMultiVersionItem())
val.(MultiVersionValue).SetEstimate(index, incarnation)
}
Expand All @@ -178,84 +180,95 @@

// SetEstimatedWriteset is used to directly write estimates instead of writing a writeset and later invalidating
func (s *Store) SetEstimatedWriteset(index int, incarnation int, writeset WriteSet) {
// remove old writeset if it exists
s.writesetKeysMtx.Lock()
s.removeOldWriteset(index, writeset)
s.writesetKeysMtx.Unlock()

writeSetKeys := make([]string, 0, len(writeset))
// still need to save the writeset so we can remove the elements later:
for key := range writeset {
writeSetKeys = append(writeSetKeys, key)

mvVal, _ := s.multiVersionMap.LoadOrStore(key, NewMultiVersionItem()) // init if necessary
mvVal, _ := s.multiVersionMap.LoadOrStore(key, NewMultiVersionItem())
mvVal.(MultiVersionValue).SetEstimate(index, incarnation)
}
sort.Strings(writeSetKeys)
s.txWritesetKeys.Store(index, writeSetKeys)
s.writesetKeysMtx.Lock()
s.txWritesetKeys[index] = writeSetKeys
s.writesetKeysMtx.Unlock()
}

// GetAllWritesetKeys implements MultiVersionStore.
func (s *Store) GetAllWritesetKeys() map[int][]string {
writesetKeys := make(map[int][]string)
// TODO: is this safe?
s.txWritesetKeys.Range(func(key, value interface{}) bool {
index := key.(int)
keys := value.([]string)
s.writesetKeysMtx.RLock()
writesetKeys := make(map[int][]string, len(s.txWritesetKeys))
for index, keys := range s.txWritesetKeys {
writesetKeys[index] = keys
return true
})

}
Comment on lines +205 to +207

Check warning

Code scanning / CodeQL

Iteration over map Warning

Iteration over map may be a possible source of non-determinism
s.writesetKeysMtx.RUnlock()
return writesetKeys
}

func (s *Store) SetReadset(index int, readset ReadSet) {
s.txReadSets.Store(index, readset)
s.readSetsMtx.Lock()
s.txReadSets[index] = readset
s.readSetsMtx.Unlock()
}

func (s *Store) GetReadset(index int) ReadSet {
readsetAny, found := s.txReadSets.Load(index)
s.readSetsMtx.RLock()
readset, found := s.txReadSets[index]
s.readSetsMtx.RUnlock()
if !found {
return nil
}
return readsetAny.(ReadSet)
return readset
}

func (s *Store) SetIterateset(index int, iterateset Iterateset) {
s.txIterateSets.Store(index, iterateset)
s.iterateSetsMtx.Lock()
s.txIterateSets[index] = iterateset
s.iterateSetsMtx.Unlock()
}

func (s *Store) GetIterateset(index int) Iterateset {
iteratesetAny, found := s.txIterateSets.Load(index)
s.iterateSetsMtx.RLock()
iterateset, found := s.txIterateSets[index]
s.iterateSetsMtx.RUnlock()
if !found {
return nil
}
return iteratesetAny.(Iterateset)
return iterateset
}

func (s *Store) ClearReadset(index int) {
s.txReadSets.Delete(index)
s.readSetsMtx.Lock()
delete(s.txReadSets, index)
s.readSetsMtx.Unlock()
}

func (s *Store) ClearIterateset(index int) {
s.txIterateSets.Delete(index)
s.iterateSetsMtx.Lock()
delete(s.txIterateSets, index)
s.iterateSetsMtx.Unlock()
}

// CollectIteratorItems implements MultiVersionStore. It will return a memDB containing all of the keys present in the multiversion store within the iteration range prior to (exclusive of) the index.
func (s *Store) CollectIteratorItems(index int) *db.MemDB {
sortedItems := db.NewMemDB()

s.writesetKeysMtx.RLock()
// get all writeset keys prior to index
for i := 0; i < index; i++ {
writesetAny, found := s.txWritesetKeys.Load(i)
indexedWriteset, found := s.txWritesetKeys[i]
if !found {
continue
}
indexedWriteset := writesetAny.([]string)
// TODO: do we want to exclude keys out of the range or just let the iterator handle it?
for _, key := range indexedWriteset {
// TODO: inefficient because (logn) for each key + rebalancing? maybe theres a better way to add to a tree to reduce rebalancing overhead
sortedItems.Set([]byte(key), []byte{})
}
}
s.writesetKeysMtx.RUnlock()
return sortedItems
}

Expand Down Expand Up @@ -319,13 +332,13 @@

func (s *Store) checkIteratorAtIndex(index int) bool {
valid := true
iterateSetAny, found := s.txIterateSets.Load(index)
s.iterateSetsMtx.RLock()
iterateset, found := s.txIterateSets[index]
s.iterateSetsMtx.RUnlock()
if !found {
return true
}
iterateset := iterateSetAny.(Iterateset)
for _, iterationTracker := range iterateset {
// TODO: if the value of the key is nil maybe we need to exclude it? - actually it should
iteratorValid := s.validateIterator(index, *iterationTracker)
valid = valid && iteratorValid
}
Expand All @@ -336,11 +349,12 @@
conflictSet := make(map[int]struct{})
valid := true

readSetAny, found := s.txReadSets.Load(index)
s.readSetsMtx.RLock()
readset, found := s.txReadSets[index]
s.readSetsMtx.RUnlock()
if !found {
return true, []int{}
}
readset := readSetAny.(ReadSet)
// iterate over readset and check if the value is the same as the latest value relative to txIndex in the multiversion store
for key, valueArr := range readset {
if len(valueArr) != 1 {
Expand Down
Loading