diff --git a/Makefile b/Makefile index 1f152d0..40eaf25 100644 --- a/Makefile +++ b/Makefile @@ -184,4 +184,4 @@ clean-test-services: stop-test-services @docker compose rm -f localai postgres || true run-e2e: - @E2E=true LOCALAI_ENDPOINT=http://localhost:8081 LOCALRECALL_ENDPOINT=http://localhost:8080 go test -v ./test/e2e/... + @E2E=true LOCALAI_ENDPOINT=http://localhost:8081 LOCALRECALL_ENDPOINT=http://localhost:8080 go test -v -timeout 30m ./test/e2e/... diff --git a/pkg/client/client.go b/pkg/client/client.go index 5dc8108..3e3f9d8 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -66,13 +66,20 @@ func (c *Client) ListCollections() ([]string, error) { return nil, errors.New("failed to list collections") } - var collections []string - err = json.NewDecoder(resp.Body).Decode(&collections) + var apiResp struct { + Success bool `json:"success"` + Message string `json:"message"` + Data struct { + Collections []string `json:"collections"` + Count int `json:"count"` + } `json:"data"` + } + err = json.NewDecoder(resp.Body).Decode(&apiResp) if err != nil { return nil, err } - return collections, nil + return apiResp.Data.Collections, nil } // ListEntries lists all entries in a collection @@ -132,7 +139,8 @@ func (c *Client) GetEntryContent(collection, entry string) ([]EntryChunk, error) var result struct { Data struct { - Chunks []EntryChunk `json:"chunks"` + Content string `json:"content"` + ChunkCount int `json:"chunk_count"` } `json:"data"` } err = json.NewDecoder(resp.Body).Decode(&result) @@ -140,7 +148,11 @@ func (c *Client) GetEntryContent(collection, entry string) ([]EntryChunk, error) return nil, err } - return result.Data.Chunks, nil + if result.Data.Content == "" { + return nil, nil + } + + return []EntryChunk{{Content: result.Data.Content}}, nil } // GetEntryRawFile returns the original uploaded binary file as a ReadCloser. @@ -196,13 +208,19 @@ func (c *Client) DeleteEntry(collection, entry string) ([]string, error) { return nil, errors.New("failed to delete collection: " + bodyResult.String()) } - var results []string - err = json.NewDecoder(resp.Body).Decode(&results) + var apiResp struct { + Success bool `json:"success"` + Message string `json:"message"` + Data struct { + RemainingEntries []string `json:"remaining_entries"` + } `json:"data"` + } + err = json.NewDecoder(resp.Body).Decode(&apiResp) if err != nil { return nil, err } - return results, nil + return apiResp.Data.RemainingEntries, nil } // Search searches a collection @@ -229,13 +247,19 @@ func (c *Client) Search(collection, query string, maxResults int) ([]types.Resul return nil, errors.New("failed to search collection") } - var results []types.Result - err = json.NewDecoder(resp.Body).Decode(&results) + var apiResp struct { + Success bool `json:"success"` + Message string `json:"message"` + Data struct { + Results []types.Result `json:"results"` + } `json:"data"` + } + err = json.NewDecoder(resp.Body).Decode(&apiResp) if err != nil { return nil, err } - return results, nil + return apiResp.Data.Results, nil } func (c *Client) Reset(collection string) error { diff --git a/rag/engine.go b/rag/engine.go index 6887aa1..f39a8f2 100644 --- a/rag/engine.go +++ b/rag/engine.go @@ -14,4 +14,5 @@ type Engine interface { Count() int Delete(where map[string]string, whereDocuments map[string]string, ids ...string) error GetByID(id string) (types.Result, error) + GetBySource(source string) ([]types.Result, error) } diff --git a/rag/engine/chromem.go b/rag/engine/chromem.go index a80311f..283c92f 100644 --- a/rag/engine/chromem.go +++ b/rag/engine/chromem.go @@ -167,6 +167,32 @@ func (c *ChromemDB) GetByID(id string) (types.Result, error) { return types.Result{ID: res.ID, Metadata: res.Metadata, Content: res.Content}, nil } +func (c *ChromemDB) GetBySource(source string) ([]types.Result, error) { + ctx := context.Background() + count := c.collection.Count() + if count == 0 { + return nil, nil + } + + // Use Query with a where filter to find documents by source metadata. + // We use a dummy query and request all documents, relying on the where + // filter to narrow results. + res, err := c.collection.Query(ctx, ".", count, map[string]string{"source": source}, nil) + if err != nil { + return nil, fmt.Errorf("error querying by source: %v", err) + } + + var results []types.Result + for _, r := range res { + results = append(results, types.Result{ + ID: r.ID, + Metadata: r.Metadata, + Content: r.Content, + }) + } + return results, nil +} + func (c *ChromemDB) Search(s string, similarEntries int) ([]types.Result, error) { res, err := c.collection.Query(context.Background(), s, similarEntries, nil, nil) if err != nil { diff --git a/rag/engine/localai.go b/rag/engine/localai.go index 0fafe0f..4c81201 100644 --- a/rag/engine/localai.go +++ b/rag/engine/localai.go @@ -88,6 +88,10 @@ func (db *LocalAIRAGDB) GetByID(id string) (types.Result, error) { return types.Result{}, fmt.Errorf("not implemented") } +func (db *LocalAIRAGDB) GetBySource(source string) ([]types.Result, error) { + return nil, fmt.Errorf("not implemented") +} + func (db *LocalAIRAGDB) Search(s string, similarEntries int) ([]types.Result, error) { resp, err := db.openaiClient.CreateEmbeddings(context.TODO(), openai.EmbeddingRequestStrings{ diff --git a/rag/engine/mock.go b/rag/engine/mock.go new file mode 100644 index 0000000..92bffe3 --- /dev/null +++ b/rag/engine/mock.go @@ -0,0 +1,156 @@ +package engine + +import ( + "fmt" + "strings" + "sync" + + "github.com/mudler/localrecall/rag/types" +) + +// MockEngine is a simple in-memory engine for testing. It requires no +// external dependencies (no LocalAI, no embeddings). +type MockEngine struct { + mu sync.Mutex + docs map[string]types.Result + index int +} + +func NewMockEngine() *MockEngine { + return &MockEngine{ + docs: make(map[string]types.Result), + index: 1, + } +} + +func (m *MockEngine) Store(s string, metadata map[string]string) (Result, error) { + results, err := m.StoreDocuments([]string{s}, metadata) + if err != nil { + return Result{}, err + } + return results[0], nil +} + +func (m *MockEngine) StoreDocuments(s []string, metadata map[string]string) ([]Result, error) { + m.mu.Lock() + defer m.mu.Unlock() + + if len(s) == 0 { + return nil, fmt.Errorf("empty input") + } + + results := make([]Result, len(s)) + for i, content := range s { + id := fmt.Sprintf("%d", m.index) + // Copy metadata so each doc has its own map + meta := make(map[string]string, len(metadata)) + for k, v := range metadata { + meta[k] = v + } + m.docs[id] = types.Result{ + ID: id, + Content: content, + Metadata: meta, + } + results[i] = Result{ID: id} + m.index++ + } + return results, nil +} + +func (m *MockEngine) Search(s string, similarEntries int) ([]types.Result, error) { + m.mu.Lock() + defer m.mu.Unlock() + + var results []types.Result + for _, doc := range m.docs { + if strings.Contains(strings.ToLower(doc.Content), strings.ToLower(s)) { + results = append(results, doc) + } + } + // If no substring match, return all (useful for generic searches) + if len(results) == 0 { + for _, doc := range m.docs { + results = append(results, doc) + } + } + if len(results) > similarEntries { + results = results[:similarEntries] + } + return results, nil +} + +func (m *MockEngine) Delete(where map[string]string, whereDocuments map[string]string, ids ...string) error { + m.mu.Lock() + defer m.mu.Unlock() + + // Delete by IDs + if len(ids) > 0 { + for _, id := range ids { + delete(m.docs, id) + } + return nil + } + + // Delete by metadata where filter + if len(where) > 0 { + for id, doc := range m.docs { + match := true + for k, v := range where { + if doc.Metadata[k] != v { + match = false + break + } + } + if match { + delete(m.docs, id) + } + } + } + + return nil +} + +func (m *MockEngine) GetByID(id string) (types.Result, error) { + m.mu.Lock() + defer m.mu.Unlock() + + doc, ok := m.docs[id] + if !ok { + return types.Result{}, fmt.Errorf("document not found: %s", id) + } + return doc, nil +} + +func (m *MockEngine) GetBySource(source string) ([]types.Result, error) { + m.mu.Lock() + defer m.mu.Unlock() + + var results []types.Result + for _, doc := range m.docs { + if doc.Metadata["source"] == source { + results = append(results, doc) + } + } + return results, nil +} + +func (m *MockEngine) Count() int { + m.mu.Lock() + defer m.mu.Unlock() + + return len(m.docs) +} + +func (m *MockEngine) Reset() error { + m.mu.Lock() + defer m.mu.Unlock() + + m.docs = make(map[string]types.Result) + m.index = 1 + return nil +} + +func (m *MockEngine) GetEmbeddingDimensions() (int, error) { + return 384, nil +} diff --git a/rag/engine/postgres.go b/rag/engine/postgres.go index b3fc59d..36bd49d 100644 --- a/rag/engine/postgres.go +++ b/rag/engine/postgres.go @@ -586,6 +586,45 @@ func (p *PostgresDB) GetByID(id string) (types.Result, error) { return result, nil } +func (p *PostgresDB) GetBySource(source string) ([]types.Result, error) { + ctx := context.Background() + + rows, err := p.pool.Query(ctx, fmt.Sprintf(` + SELECT id::text, COALESCE(title, '') as title, content, metadata + FROM %s WHERE metadata->>'source' = $1 + `, p.tableName), source) + if err != nil { + return nil, fmt.Errorf("failed to query by source: %w", err) + } + defer rows.Close() + + var results []types.Result + for rows.Next() { + var r types.Result + var title string + var metadataJSON []byte + + if err := rows.Scan(&r.ID, &title, &r.Content, &metadataJSON); err != nil { + return nil, fmt.Errorf("failed to scan row: %w", err) + } + + r.Metadata = make(map[string]string) + if len(metadataJSON) > 0 { + json.Unmarshal(metadataJSON, &r.Metadata) + } + if title != "" { + r.Metadata["title"] = title + } + results = append(results, r) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("row iteration error: %w", err) + } + + return results, nil +} + func (p *PostgresDB) Search(s string, similarEntries int) ([]types.Result, error) { ctx := context.Background() diff --git a/rag/persistency.go b/rag/persistency.go index e30a1ee..eeaa486 100644 --- a/rag/persistency.go +++ b/rag/persistency.go @@ -23,8 +23,7 @@ import ( // CollectionState represents the persistent state of a collection type CollectionState struct { - ExternalSources []*ExternalSource `json:"external_sources"` - Index map[string][]engine.Result `json:"index"` + ExternalSources []*ExternalSource `json:"external_sources"` } type PersistentKB struct { @@ -35,8 +34,6 @@ type PersistentKB struct { maxChunkSize int chunkOverlap int sources []*ExternalSource - - index map[string][]engine.Result } func loadDB(path string) (*CollectionState, error) { @@ -48,13 +45,24 @@ func loadDB(path string) (*CollectionState, error) { state := &CollectionState{} err = json.Unmarshal(data, state) if err != nil { + // Handle legacy format (just an array of strings or old format with Index) + // Try to extract only external_sources from the raw JSON + var raw map[string]json.RawMessage + if err2 := json.Unmarshal(data, &raw); err2 == nil { + if esRaw, ok := raw["external_sources"]; ok { + var es []*ExternalSource + if err3 := json.Unmarshal(esRaw, &es); err3 == nil { + state.ExternalSources = es + return state, nil + } + } + } // Handle legacy format (just an array of strings) var legacyFiles []string if err := json.Unmarshal(data, &legacyFiles); err != nil { return nil, err } state.ExternalSources = []*ExternalSource{} - state.Index = map[string][]engine.Result{} } return state, nil @@ -75,7 +83,6 @@ func NewPersistentCollectionKB(stateFile, assetDir string, store Engine, maxChun maxChunkSize: maxChunkSize, chunkOverlap: chunkOverlap, sources: []*ExternalSource{}, - index: map[string][]engine.Result{}, } persistentKB.Lock() defer persistentKB.Unlock() @@ -93,36 +100,85 @@ func NewPersistentCollectionKB(stateFile, assetDir string, store Engine, maxChun chunkOverlap: chunkOverlap, assetDir: assetDir, sources: state.ExternalSources, - index: state.Index, } - // Migrate flat index keys (no "/" in key) to UUID subdirectory layout. + // Migrate flat files in assetDir (files not in UUID subdirectories) to UUID layout. if err := db.migrateToUUIDLayout(); err != nil { xlog.Error("Failed to migrate to UUID layout", err) return nil, err } - // TODO: Automatically repopulate if embeddings dimensions are mismatching. - // To check if dimensions are mismatching, we can check the number of dimensions of the first embedding in the index if is the same as the - // dimension that the embedding model returns. - resp, err := llmClient.CreateEmbeddings(context.Background(), - openai.EmbeddingRequestStrings{ - Input: []string{"test"}, - Model: openai.EmbeddingModel(embeddingModel), - }, - ) - if err == nil && len(resp.Data) > 0 { - embedding := resp.Data[0].Embedding - embeddingDimensions, err := db.Engine.GetEmbeddingDimensions() - if err == nil && len(embedding) != embeddingDimensions { - xlog.Info("Embedding dimensions mismatch, repopulating", "embeddingDimensions", embeddingDimensions, "embedding", embedding) - return db, db.Repopulate() + // Automatically repopulate if embeddings dimensions are mismatching. + if llmClient != nil { + resp, err := llmClient.CreateEmbeddings(context.Background(), + openai.EmbeddingRequestStrings{ + Input: []string{"test"}, + Model: openai.EmbeddingModel(embeddingModel), + }, + ) + if err == nil && len(resp.Data) > 0 { + embedding := resp.Data[0].Embedding + embeddingDimensions, err := db.Engine.GetEmbeddingDimensions() + if err == nil && len(embedding) != embeddingDimensions { + xlog.Info("Embedding dimensions mismatch, repopulating", "embeddingDimensions", embeddingDimensions, "embedding", embedding) + return db, db.Repopulate() + } } } return db, nil } +// listDocumentKeys scans assetDir for UUID subdirectories containing files +// and returns the keys in "uuid/filename" format. +func (db *PersistentKB) listDocumentKeys() []string { + entries, err := os.ReadDir(db.assetDir) + if err != nil { + return nil + } + + var keys []string + for _, entry := range entries { + if !entry.IsDir() { + continue + } + uuidDir := entry.Name() + subEntries, err := os.ReadDir(filepath.Join(db.assetDir, uuidDir)) + if err != nil { + continue + } + for _, sub := range subEntries { + if !sub.IsDir() { + keys = append(keys, filepath.Join(uuidDir, sub.Name())) + } + } + } + return keys +} + +// findEntryKey finds the filesystem key for a given entry name. +// It checks for exact key match first, then falls back to base filename match. +func (db *PersistentKB) findEntryKey(entry string) (string, bool) { + keys := db.listDocumentKeys() + + // Direct key match + for _, k := range keys { + if k == entry { + return k, true + } + } + + // Fall back to base filename match + base := filepath.Base(entry) + for _, k := range keys { + if filepath.Base(k) == base { + return k, true + } + } + + return "", false +} + func (db *PersistentKB) Search(s string, similarEntries int) ([]types.Result, error) { db.Lock() defer db.Unlock() @@ -132,11 +188,9 @@ func (db *PersistentKB) Search(s string, similarEntries int) ([]types.Result, er func (db *PersistentKB) Reset() error { db.Lock() - for f := range db.index { - os.RemoveAll(filepath.Join(db.assetDir, filepath.Dir(f))) - } + os.RemoveAll(db.assetDir) + os.MkdirAll(db.assetDir, 0755) db.sources = []*ExternalSource{} - db.index = map[string][]engine.Result{} db.save() db.Unlock() if err := db.Engine.Reset(); err != nil { @@ -149,7 +203,6 @@ func (db *PersistentKB) Reset() error { func (db *PersistentKB) save() error { state := &CollectionState{ ExternalSources: db.sources, - Index: db.index, } data, err := json.Marshal(state) if err != nil { @@ -168,18 +221,23 @@ func (db *PersistentKB) Count() int { // repopulate reinitializes the persistent knowledge base with the files that were added to it. func (db *PersistentKB) repopulate() error { - if err := db.Engine.Reset(); err != nil { return fmt.Errorf("failed to reset engine: %w", err) } - keys := []string{} - for f := range db.index { - keys = append(keys, f) + keys := db.listDocumentKeys() + // Only repopulate chunkable files + var chunkableKeys []string + for _, k := range keys { + if isChunkableFile(k) { + chunkableKeys = append(chunkableKeys, k) + } } - if _, err := db.store(map[string]string{}, keys...); err != nil { - return fmt.Errorf("failed to store files: %w", err) + if len(chunkableKeys) > 0 { + if _, err := db.store(map[string]string{}, chunkableKeys...); err != nil { + return fmt.Errorf("failed to store files: %w", err) + } } return nil @@ -193,61 +251,25 @@ func (db *PersistentKB) Repopulate() error { } // ListDocuments returns the list of documents in the knowledge base. -// Each entry includes both the index key (uuid/filename) and the original filename. +// Each entry includes the key (uuid/filename). func (db *PersistentKB) ListDocuments() []string { db.Lock() defer db.Unlock() - files := []string{} - - for f := range db.index { - files = append(files, f) - } - return files + return db.listDocumentKeys() } -// EntryExists checks if an entry with the given name exists in the index. -// It searches by the full index key first, then falls back to matching by base filename. +// EntryExists checks if an entry with the given name exists. +// It searches by the full key first, then falls back to matching by base filename. func (db *PersistentKB) EntryExists(entry string) bool { db.Lock() defer db.Unlock() - // Direct key match - if _, ok := db.index[entry]; ok { - return true - } - - // Fall back to base filename match - base := filepath.Base(entry) - for e := range db.index { - if filepath.Base(e) == base { - return true - } - } - - return false -} - -// findEntryKey finds the index key for a given entry name. -// It checks for exact key match first, then falls back to base filename match. -func (db *PersistentKB) findEntryKey(entry string) (string, bool) { - // Direct key match - if _, ok := db.index[entry]; ok { - return entry, true - } - - // Fall back to base filename match - base := filepath.Base(entry) - for e := range db.index { - if filepath.Base(e) == base { - return e, true - } - } - - return "", false + _, ok := db.findEntryKey(entry) + return ok } // GetEntryContent returns all chunks (content, id, metadata) for the given entry. -// It uses the in-memory index and Engine.GetByID to resolve full chunk data. +// It uses Engine.GetBySource to find chunks by source metadata. func (db *PersistentKB) GetEntryContent(entry string) ([]types.Result, error) { db.Lock() defer db.Unlock() @@ -256,15 +278,10 @@ func (db *PersistentKB) GetEntryContent(entry string) ([]types.Result, error) { if !ok { return nil, fmt.Errorf("entry not found: %s", entry) } - chunkResults := db.index[key] - results := make([]types.Result, 0, len(chunkResults)) - for _, r := range chunkResults { - full, err := db.Engine.GetByID(r.ID) - if err != nil { - return nil, fmt.Errorf("failed to get chunk %s: %w", r.ID, err) - } - results = append(results, full) + results, err := db.Engine.GetBySource(key) + if err != nil { + return nil, fmt.Errorf("failed to get chunks for %s: %w", key, err) } return results, nil } @@ -295,8 +312,12 @@ func (db *PersistentKB) GetEntryFileContent(entry string) (content string, chunk if !ok { return "", 0, fmt.Errorf("entry not found: %s", entry) } - chunkResults := db.index[key] - chunkCount = len(chunkResults) + + results, err := db.Engine.GetBySource(key) + if err != nil { + return "", 0, fmt.Errorf("failed to get chunks for %s: %w", key, err) + } + chunkCount = len(results) fpath := filepath.Join(db.assetDir, key) content, err = fileToText(fpath) @@ -340,7 +361,6 @@ func (db *PersistentKB) storeFile(entry string, metadata map[string]string) erro // via GetEntryFilePath(), but no semantic chunks are created. if !isChunkableFile(fileName) { xlog.Info("Storing as raw-only entry (not semantically indexed)", "entry", entry, "indexKey", indexKey) - db.index[indexKey] = nil return db.save() } @@ -365,28 +385,19 @@ func (db *PersistentKB) StoreOrReplace(entry string, metadata map[string]string) // Find the existing key by base filename (if any) oldKey, hadExisting := db.findEntryKey(fileName) if hadExisting { - oldResults := db.index[oldKey] - xlog.Info("Removing old chunks before storing new ones", "entry", oldKey, "old_chunk_count", len(oldResults)) + xlog.Info("Removing old chunks before storing new ones", "entry", oldKey) - // Delete old chunks by their IDs before storing new ones - oldIDsToDelete := make([]string, 0, len(oldResults)) - for _, oldResult := range oldResults { - oldIDsToDelete = append(oldIDsToDelete, oldResult.ID) - } - - if len(oldIDsToDelete) > 0 { - beforeDeleteCount := db.Engine.Count() - if err := db.Engine.Delete(map[string]string{}, map[string]string{}, oldIDsToDelete...); err != nil { - xlog.Error("Failed to delete old chunks", "ids_count", len(oldIDsToDelete), "error", err) - return fmt.Errorf("failed to delete old chunks: %w", err) - } - afterDeleteCount := db.Engine.Count() - xlog.Info("Deleted old chunks", "entry", oldKey, "deleted_count", len(oldIDsToDelete), "count_before", beforeDeleteCount, "count_after", afterDeleteCount) + // Delete old chunks by source metadata + beforeDeleteCount := db.Engine.Count() + if err := db.Engine.Delete(map[string]string{"source": oldKey}, map[string]string{}); err != nil { + xlog.Error("Failed to delete old chunks", "error", err) + return fmt.Errorf("failed to delete old chunks: %w", err) } + afterDeleteCount := db.Engine.Count() + xlog.Info("Deleted old chunks", "entry", oldKey, "count_before", beforeDeleteCount, "count_after", afterDeleteCount) // Remove old file and UUID subdirectory os.RemoveAll(filepath.Join(db.assetDir, filepath.Dir(oldKey))) - delete(db.index, oldKey) } // Now store the new chunks with a new UUID subdir @@ -411,9 +422,9 @@ func (db *PersistentKB) StoreOrReplace(entry string, metadata map[string]string) afterStoreCount := db.Engine.Count() xlog.Info("Stored new chunks", "entry", indexKey, "new_chunk_count", len(results), "count_before", beforeCount, "count_after", afterStoreCount) - // Save the index + // Save state if err := db.save(); err != nil { - return fmt.Errorf("failed to save index: %w", err) + return fmt.Errorf("failed to save state: %w", err) } return nil @@ -443,7 +454,6 @@ func (db *PersistentKB) store(metadata map[string]string, indexKeys ...string) ( return nil, fmt.Errorf("stored %d chunks but expected %d for file: %s", len(res), len(pieces), key) } results = append(results, res...) - db.index[key] = results } return results, nil @@ -460,7 +470,7 @@ func (db *PersistentKB) removeFileEntry(entry string) error { xlog.Info("Removing entry", "entry", entry) - // Resolve the actual index key for this entry + // Resolve the actual key for this entry key, found := db.findEntryKey(entry) if !found { return fmt.Errorf("entry not found: %s", entry) @@ -471,32 +481,16 @@ func (db *PersistentKB) removeFileEntry(entry string) error { // Get count before deletion for logging beforeCount := db.Engine.Count() - xlog.Info("Deleting entry from engine", "entry", key, "chunks_in_index", len(db.index[key]), "total_count_before", beforeCount) + xlog.Info("Deleting entry from engine", "entry", key, "total_count_before", beforeCount) if err := db.Engine.Delete(map[string]string{"source": key}, map[string]string{}); err != nil { xlog.Error("Error deleting by source metadata", "error", err, "entry", key) return err } - // Make sure entries are deleted - for _, id := range db.index[key] { - res, err := db.Engine.GetByID(id.ID) - if err == nil { - xlog.Debug("Found remaining result", "result", res) - err := db.Engine.Delete(map[string]string{}, map[string]string{}, res.ID) - if err != nil { - xlog.Error("Error deleting by ID", "error", err, "id", res.ID) - return err - } - } - } - afterCount := db.Engine.Count() xlog.Info("Deleted entry", "entry", key, "count_before", beforeCount, "count_after", afterCount, "deleted_count", beforeCount-afterCount) - xlog.Info("Deleting entry from index", "entry", key) - delete(db.index, key) - xlog.Info("Removing entry from disk", "file", e) os.Remove(e) // Remove the UUID subdirectory @@ -509,7 +503,6 @@ func (db *PersistentKB) removeFileEntry(entry string) error { // Remove the file and its UUID subdir os.RemoveAll(filepath.Join(db.assetDir, filepath.Dir(key))) - delete(db.index, key) // TODO: this is suboptimal, but currently chromem does not support deleting single entities return db.repopulate() @@ -599,13 +592,17 @@ func chunkFile(fpath string, maxchunksize, chunkOverlap int) ([]string, error) { return chunks, nil } -// migrateToUUIDLayout migrates flat index keys (e.g. "report.pdf") to UUID -// subdirectory layout (e.g. "a1b2c3d4-.../report.pdf"). This is a one-time -// migration that runs on load if any flat keys are detected. +// migrateToUUIDLayout migrates flat files in assetDir (files not in UUID +// subdirectories) to UUID subdirectory layout. This is a one-time migration. func (db *PersistentKB) migrateToUUIDLayout() error { + entries, err := os.ReadDir(db.assetDir) + if err != nil { + return nil // no assetDir yet + } + needsMigration := false - for key := range db.index { - if !strings.Contains(key, string(os.PathSeparator)) && !strings.Contains(key, "/") { + for _, entry := range entries { + if !entry.IsDir() { needsMigration = true break } @@ -614,44 +611,35 @@ func (db *PersistentKB) migrateToUUIDLayout() error { return nil } - xlog.Info("Migrating flat index keys to UUID layout", "asset_dir", db.assetDir) - newIndex := make(map[string][]engine.Result, len(db.index)) + xlog.Info("Migrating flat files to UUID layout", "asset_dir", db.assetDir) - for key, results := range db.index { - // Skip keys that already have UUID layout - if strings.Contains(key, string(os.PathSeparator)) || strings.Contains(key, "/") { - newIndex[key] = results + for _, entry := range entries { + if entry.IsDir() { continue } - + fileName := entry.Name() fileUUID := uuid.New().String() - newKey := filepath.Join(fileUUID, key) uuidDir := filepath.Join(db.assetDir, fileUUID) if err := os.MkdirAll(uuidDir, 0755); err != nil { return fmt.Errorf("failed to create UUID dir during migration: %w", err) } - oldPath := filepath.Join(db.assetDir, key) - newPath := filepath.Join(uuidDir, key) + oldPath := filepath.Join(db.assetDir, fileName) + newPath := filepath.Join(uuidDir, fileName) - if _, err := os.Stat(oldPath); err == nil { - data, err := os.ReadFile(oldPath) - if err != nil { - return fmt.Errorf("failed to read file during migration: %w", err) - } - if err := os.WriteFile(newPath, data, 0644); err != nil { - return fmt.Errorf("failed to write file during migration: %w", err) - } - os.Remove(oldPath) + data, err := os.ReadFile(oldPath) + if err != nil { + return fmt.Errorf("failed to read file during migration: %w", err) } - - newIndex[newKey] = results - xlog.Info("Migrated entry", "old_key", key, "new_key", newKey) + if err := os.WriteFile(newPath, data, 0644); err != nil { + return fmt.Errorf("failed to write file during migration: %w", err) + } + os.Remove(oldPath) + xlog.Info("Migrated entry", "old_key", fileName, "new_key", filepath.Join(fileUUID, fileName)) } - db.index = newIndex - return db.save() + return nil } // GetExternalSources returns the list of external sources for this collection diff --git a/rag/persistency_mock_test.go b/rag/persistency_mock_test.go new file mode 100644 index 0000000..b3bd340 --- /dev/null +++ b/rag/persistency_mock_test.go @@ -0,0 +1,337 @@ +package rag_test + +import ( + "os" + "path/filepath" + + . "github.com/mudler/localrecall/rag" + "github.com/mudler/localrecall/rag/engine" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +// newMockKB creates a PersistentKB backed by a MockEngine. +// It writes a minimal state file so the constructor skips the embedding check. +func newMockKB(stateFile, assetDir string, eng *engine.MockEngine) (*PersistentKB, error) { + return NewPersistentCollectionKB(stateFile, assetDir, eng, 1000, 0, nil, "") +} + +var _ = Describe("PersistentKB with MockEngine", func() { + var ( + tempDir string + stateFile string + assetDir string + eng *engine.MockEngine + ) + + BeforeEach(func() { + var err error + tempDir, err = os.MkdirTemp("", "mock_persistency_test_*") + Expect(err).ToNot(HaveOccurred()) + + stateFile = filepath.Join(tempDir, "state.json") + assetDir = filepath.Join(tempDir, "assets") + eng = engine.NewMockEngine() + }) + + AfterEach(func() { + if tempDir != "" { + os.RemoveAll(tempDir) + } + }) + + // Helper: create a temp .txt file with given content, return its path. + createTxtFile := func(name, content string) string { + p := filepath.Join(tempDir, name) + Expect(os.WriteFile(p, []byte(content), 0644)).To(Succeed()) + return p + } + + Describe("NewPersistentCollectionKB", func() { + It("creates a new KB and state file", func() { + kb, err := newMockKB(stateFile, assetDir, eng) + Expect(err).ToNot(HaveOccurred()) + Expect(kb).ToNot(BeNil()) + Expect(stateFile).To(BeAnExistingFile()) + }) + + It("returns an empty document list for a fresh KB", func() { + kb, err := newMockKB(stateFile, assetDir, eng) + Expect(err).ToNot(HaveOccurred()) + Expect(kb.ListDocuments()).To(BeEmpty()) + }) + }) + + Describe("Store + ListDocuments", func() { + It("lists the stored document after Store", func() { + kb, err := newMockKB(stateFile, assetDir, eng) + Expect(err).ToNot(HaveOccurred()) + + f := createTxtFile("hello.txt", "hello world") + Expect(kb.Store(f, map[string]string{})).To(Succeed()) + + docs := kb.ListDocuments() + Expect(docs).To(HaveLen(1)) + Expect(filepath.Base(docs[0])).To(Equal("hello.txt")) + }) + + It("increases Count after Store", func() { + kb, err := newMockKB(stateFile, assetDir, eng) + Expect(err).ToNot(HaveOccurred()) + + Expect(kb.Count()).To(Equal(0)) + f := createTxtFile("count.txt", "some content for counting") + Expect(kb.Store(f, map[string]string{})).To(Succeed()) + Expect(kb.Count()).To(BeNumerically(">", 0)) + }) + }) + + Describe("EntryExists", func() { + It("returns false for a missing entry", func() { + kb, err := newMockKB(stateFile, assetDir, eng) + Expect(err).ToNot(HaveOccurred()) + Expect(kb.EntryExists("nope.txt")).To(BeFalse()) + }) + + It("returns true after Store (by base filename)", func() { + kb, err := newMockKB(stateFile, assetDir, eng) + Expect(err).ToNot(HaveOccurred()) + + f := createTxtFile("exists.txt", "data") + Expect(kb.Store(f, map[string]string{})).To(Succeed()) + Expect(kb.EntryExists("exists.txt")).To(BeTrue()) + }) + + It("returns false after Remove", func() { + kb, err := newMockKB(stateFile, assetDir, eng) + Expect(err).ToNot(HaveOccurred()) + + f := createTxtFile("gone.txt", "data") + Expect(kb.Store(f, map[string]string{})).To(Succeed()) + Expect(kb.EntryExists("gone.txt")).To(BeTrue()) + Expect(kb.RemoveEntry("gone.txt")).To(Succeed()) + Expect(kb.EntryExists("gone.txt")).To(BeFalse()) + }) + }) + + Describe("GetEntryContent", func() { + It("returns error for missing entry", func() { + kb, err := newMockKB(stateFile, assetDir, eng) + Expect(err).ToNot(HaveOccurred()) + + _, err = kb.GetEntryContent("missing.txt") + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("entry not found")) + }) + + It("returns chunks after Store", func() { + kb, err := newMockKB(stateFile, assetDir, eng) + Expect(err).ToNot(HaveOccurred()) + + f := createTxtFile("content.txt", "the quick brown fox jumps over the lazy dog") + Expect(kb.Store(f, map[string]string{})).To(Succeed()) + + results, err := kb.GetEntryContent("content.txt") + Expect(err).ToNot(HaveOccurred()) + Expect(results).ToNot(BeEmpty()) + + var combined string + for _, r := range results { + combined += r.Content + } + Expect(combined).To(ContainSubstring("quick brown fox")) + }) + }) + + Describe("GetEntryFileContent", func() { + It("returns error for missing entry", func() { + kb, err := newMockKB(stateFile, assetDir, eng) + Expect(err).ToNot(HaveOccurred()) + + _, _, err = kb.GetEntryFileContent("nope.txt") + Expect(err).To(HaveOccurred()) + }) + + It("returns content and chunk count after Store", func() { + kb, err := newMockKB(stateFile, assetDir, eng) + Expect(err).ToNot(HaveOccurred()) + + f := createTxtFile("filecontent.txt", "hello file content test") + Expect(kb.Store(f, map[string]string{})).To(Succeed()) + + content, chunks, err := kb.GetEntryFileContent("filecontent.txt") + Expect(err).ToNot(HaveOccurred()) + Expect(content).To(ContainSubstring("hello file content test")) + Expect(chunks).To(BeNumerically(">", 0)) + }) + }) + + Describe("GetEntryFilePath", func() { + It("returns error for missing entry", func() { + kb, err := newMockKB(stateFile, assetDir, eng) + Expect(err).ToNot(HaveOccurred()) + + _, err = kb.GetEntryFilePath("missing.txt") + Expect(err).To(HaveOccurred()) + }) + + It("returns a valid path after Store", func() { + kb, err := newMockKB(stateFile, assetDir, eng) + Expect(err).ToNot(HaveOccurred()) + + f := createTxtFile("pathtest.txt", "path content") + Expect(kb.Store(f, map[string]string{})).To(Succeed()) + + p, err := kb.GetEntryFilePath("pathtest.txt") + Expect(err).ToNot(HaveOccurred()) + Expect(p).To(BeAnExistingFile()) + }) + }) + + Describe("StoreOrReplace", func() { + It("replaces existing entry, old chunks deleted", func() { + kb, err := newMockKB(stateFile, assetDir, eng) + Expect(err).ToNot(HaveOccurred()) + + f1 := createTxtFile("replace.txt", "original content here") + Expect(kb.Store(f1, map[string]string{})).To(Succeed()) + countAfterFirst := kb.Count() + + // Overwrite the temp file with new content + f2 := createTxtFile("replace.txt", "replaced content now") + Expect(kb.StoreOrReplace(f2, map[string]string{})).To(Succeed()) + + docs := kb.ListDocuments() + Expect(docs).To(HaveLen(1)) + Expect(filepath.Base(docs[0])).To(Equal("replace.txt")) + + // Count should be roughly the same (old chunks removed, new added) + Expect(kb.Count()).To(BeNumerically("~", countAfterFirst, countAfterFirst)) + + results, err := kb.GetEntryContent("replace.txt") + Expect(err).ToNot(HaveOccurred()) + var combined string + for _, r := range results { + combined += r.Content + } + Expect(combined).To(ContainSubstring("replaced content now")) + }) + }) + + Describe("RemoveEntry", func() { + It("removes entry from listing and decreases count", func() { + kb, err := newMockKB(stateFile, assetDir, eng) + Expect(err).ToNot(HaveOccurred()) + + f := createTxtFile("removeme.txt", "remove me please") + Expect(kb.Store(f, map[string]string{})).To(Succeed()) + Expect(kb.ListDocuments()).ToNot(BeEmpty()) + Expect(kb.Count()).To(BeNumerically(">", 0)) + + Expect(kb.RemoveEntry("removeme.txt")).To(Succeed()) + Expect(kb.ListDocuments()).To(BeEmpty()) + Expect(kb.Count()).To(Equal(0)) + }) + }) + + Describe("Reset", func() { + It("clears all entries", func() { + kb, err := newMockKB(stateFile, assetDir, eng) + Expect(err).ToNot(HaveOccurred()) + + f := createTxtFile("reset.txt", "to be reset") + Expect(kb.Store(f, map[string]string{})).To(Succeed()) + Expect(kb.ListDocuments()).ToNot(BeEmpty()) + + Expect(kb.Reset()).To(Succeed()) + // After reset, create a fresh KB from the same paths + kb2, err := newMockKB(stateFile, assetDir, eng) + Expect(err).ToNot(HaveOccurred()) + Expect(kb2.ListDocuments()).To(BeEmpty()) + Expect(kb2.Count()).To(Equal(0)) + }) + }) + + Describe("Repopulate", func() { + It("re-stores all entries in the engine", func() { + kb, err := newMockKB(stateFile, assetDir, eng) + Expect(err).ToNot(HaveOccurred()) + + f := createTxtFile("repop.txt", "repopulate me") + Expect(kb.Store(f, map[string]string{})).To(Succeed()) + countBefore := kb.Count() + Expect(countBefore).To(BeNumerically(">", 0)) + + Expect(kb.Repopulate()).To(Succeed()) + Expect(kb.Count()).To(Equal(countBefore)) + }) + }) + + Describe("Search", func() { + It("returns results after Store", func() { + kb, err := newMockKB(stateFile, assetDir, eng) + Expect(err).ToNot(HaveOccurred()) + + f := createTxtFile("searchme.txt", "unique searchable content xyzzy") + Expect(kb.Store(f, map[string]string{})).To(Succeed()) + + results, err := kb.Search("xyzzy", 10) + Expect(err).ToNot(HaveOccurred()) + Expect(results).ToNot(BeEmpty()) + }) + }) + + Describe("ExternalSources", func() { + It("adds, gets, and removes external sources", func() { + kb, err := newMockKB(stateFile, assetDir, eng) + Expect(err).ToNot(HaveOccurred()) + + Expect(kb.GetExternalSources()).To(BeEmpty()) + + src := &ExternalSource{URL: "https://example.com/feed"} + Expect(kb.AddExternalSource(src)).To(Succeed()) + Expect(kb.GetExternalSources()).To(HaveLen(1)) + + Expect(kb.RemoveExternalSource("https://example.com/feed")).To(Succeed()) + Expect(kb.GetExternalSources()).To(BeEmpty()) + }) + + It("persists sources across reloads", func() { + kb, err := newMockKB(stateFile, assetDir, eng) + Expect(err).ToNot(HaveOccurred()) + + src := &ExternalSource{URL: "https://example.com/persist"} + Expect(kb.AddExternalSource(src)).To(Succeed()) + + // Reload + kb2, err := newMockKB(stateFile, assetDir, eng) + Expect(err).ToNot(HaveOccurred()) + Expect(kb2.GetExternalSources()).To(HaveLen(1)) + Expect(kb2.GetExternalSources()[0].URL).To(Equal("https://example.com/persist")) + }) + }) + + Describe("Raw-only (non-chunkable) files", func() { + It("stores and lists a .png file without creating chunks", func() { + kb, err := newMockKB(stateFile, assetDir, eng) + Expect(err).ToNot(HaveOccurred()) + + // Create a fake image file + f := filepath.Join(tempDir, "photo.png") + Expect(os.WriteFile(f, []byte("fakepng"), 0644)).To(Succeed()) + + Expect(kb.Store(f, map[string]string{})).To(Succeed()) + docs := kb.ListDocuments() + Expect(docs).To(HaveLen(1)) + Expect(filepath.Base(docs[0])).To(Equal("photo.png")) + + // No chunks in engine for raw-only file + Expect(kb.Count()).To(Equal(0)) + + // GetEntryFilePath should work + p, err := kb.GetEntryFilePath("photo.png") + Expect(err).ToNot(HaveOccurred()) + Expect(p).To(BeAnExistingFile()) + }) + }) +}) diff --git a/rag/source_manager.go b/rag/source_manager.go index 592cfb7..3a05ac7 100644 --- a/rag/source_manager.go +++ b/rag/source_manager.go @@ -102,7 +102,10 @@ func (sm *SourceManager) RemoveSource(collectionName, url string) error { } if err := collection.RemoveEntry(fmt.Sprintf("source-%s-%s.txt", collectionName, sanitizeURL(url))); err != nil { - return err + // Ignore error if entry doesn't exist — content may never have been fetched + if !strings.Contains(err.Error(), "entry not found") { + return err + } } // Remove from in-memory sources diff --git a/test/e2e/persistency_test.go b/test/e2e/persistency_test.go index 0a8ab0f..a8b8b4a 100644 --- a/test/e2e/persistency_test.go +++ b/test/e2e/persistency_test.go @@ -82,7 +82,7 @@ var _ = Describe("Persistency", func() { docs := kb.ListDocuments() Expect(docs).To(HaveLen(1)) - Expect(docs[0]).To(Equal("test.txt")) + Expect(docs[0]).To(ContainSubstring("test.txt")) }) It("should remove an entry", func() { @@ -111,7 +111,7 @@ var _ = Describe("Persistency", func() { docs := kb.ListDocuments() Expect(docs).To(HaveLen(1)) - Expect(docs[0]).To(Equal("test.txt")) + Expect(docs[0]).To(ContainSubstring("test.txt")) }) It("should get entry content", func() {