Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions internal/cache/affinity.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,3 +370,102 @@ func extractSearchPrefix(term string) string {

return ""
}

// AddSearchHistory records a search term in the persistent search history.
// A first use inserts a fresh row; a repeat use refreshes the last-used
// timestamp and bumps the usage counter. No-op caches silently accept and
// drop the term.
func (c *Cache) AddSearchHistory(searchTerm string) error {
	if c.isNoOp() {
		return nil
	}

	searchTerm = normalizeSearchTerm(searchTerm)
	if searchTerm == "" {
		// Nothing meaningful left after normalization; skip the write.
		return nil
	}

	start := time.Now()
	defer func() { c.stats.recordOperation("AddSearchHistory", time.Since(start)) }()

	timestamp := time.Now().Unix()

	// Upsert keyed on search_term: insert on first sight, otherwise update
	// the existing row in place.
	upsert := `
	INSERT INTO search_history (search_term, last_used, use_count)
	VALUES (?, ?, 1)
	ON CONFLICT(search_term) DO UPDATE SET
		last_used = excluded.last_used,
		use_count = use_count + 1`

	logSQL(upsert, searchTerm, timestamp)

	_, err := c.db.Exec(upsert, searchTerm, timestamp)
	return err
}

// GetSearchHistory returns recent search terms, ordered by most recent first.
// At most limit terms are returned; a non-positive limit falls back to 10.
// A no-op cache yields (nil, nil).
func (c *Cache) GetSearchHistory(limit int) ([]string, error) {
	if c.isNoOp() {
		return nil, nil
	}

	if limit <= 0 {
		limit = 10 // default window when the caller passes no limit
	}

	start := time.Now()
	defer func() { c.stats.recordOperation("GetSearchHistory", time.Since(start)) }()

	query := `SELECT search_term FROM search_history ORDER BY last_used DESC LIMIT ?`
	logSQL(query, limit)

	rows, err := c.db.Query(query, limit)
	if err != nil {
		return nil, err
	}
	defer func() { _ = rows.Close() }()

	var terms []string
	for rows.Next() {
		var term string
		if scanErr := rows.Scan(&term); scanErr != nil {
			// One unreadable row should not discard the rest of the history.
			logger.Log.Warnf("Failed to scan search history: %v", scanErr)
			continue
		}
		terms = append(terms, term)
	}

	return terms, rows.Err()
}

// CleanSearchHistory removes old search history entries.
//
// Entries whose last_used timestamp is older than maxAge are deleted.
// It returns the number of rows removed; a no-op cache reports (0, nil).
func (c *Cache) CleanSearchHistory(maxAge time.Duration) (int64, error) {
	if c.isNoOp() {
		return 0, nil
	}

	start := time.Now()
	defer func() {
		c.stats.recordOperation("CleanSearchHistory", time.Since(start))
	}()

	cutoff := time.Now().Add(-maxAge).Unix()

	query := `DELETE FROM search_history WHERE last_used < ?`
	logSQL(query, cutoff)

	result, err := c.db.Exec(query, cutoff)
	if err != nil {
		return 0, err
	}

	// Propagate the RowsAffected error instead of silently discarding it
	// with a blank identifier; SQLite supports it, but a driver error here
	// would otherwise go unnoticed.
	deleted, err := result.RowsAffected()
	if err != nil {
		return 0, err
	}

	if deleted > 0 {
		logger.Log.Debugf("Cleaned %d old search history entries", deleted)
	}

	return deleted, nil
}
33 changes: 33 additions & 0 deletions internal/cache/migrations/v8_search_history.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package migrations

import (
"database/sql"
)

func init() {
	// Self-register with the migration runner at package load time.
	Register(&v8SearchHistory{})
}

// v8SearchHistory adds search history table for storing recent searches.
type v8SearchHistory struct{}

// Version reports the schema version this migration produces.
func (v *v8SearchHistory) Version() int {
	return 8
}

// Description gives a one-line human-readable summary for migration logs.
func (v *v8SearchHistory) Description() string {
	return "Add search history table for recent searches"
}

// Up creates the search_history table plus an index on last_used so that
// "most recent first" queries avoid a full table scan. Both statements are
// idempotent (IF NOT EXISTS).
func (v *v8SearchHistory) Up(db *sql.DB) error {
	createTable := `CREATE TABLE IF NOT EXISTS search_history (
		search_term TEXT PRIMARY KEY,
		last_used INTEGER NOT NULL,
		use_count INTEGER DEFAULT 1
	)`
	createIndex := `CREATE INDEX IF NOT EXISTS idx_search_history_last_used ON search_history(last_used DESC)`

	return ExecStatements(db, []string{createTable, createIndex})
}
99 changes: 99 additions & 0 deletions internal/gcp/search/connectivity_tests.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package search

import (
"context"
"fmt"

"github.com/kedare/compass/internal/gcp"
)

// ConnectivityTestClientFactory creates a connectivity test client scoped to a project.
type ConnectivityTestClientFactory func(ctx context.Context, project string) (ConnectivityTestClient, error)

// ConnectivityTestClient exposes the subset of gcp.ConnectivityClient used by
// the searcher. The filter argument is forwarded to the underlying list call;
// Search passes an empty string to list every test.
type ConnectivityTestClient interface {
	ListTests(ctx context.Context, filter string) ([]*gcp.ConnectivityTestResult, error)
}

// ConnectivityTestProvider searches connectivity tests for query matches.
// NewClient must be non-nil before Search is called; otherwise Search
// returns ErrNoProviders.
type ConnectivityTestProvider struct {
	NewClient ConnectivityTestClientFactory
}

// Kind returns the resource kind this provider handles
// (always KindConnectivityTest).
func (p *ConnectivityTestProvider) Kind() ResourceKind {
	return KindConnectivityTest
}

// Search implements the Provider interface. It lists every connectivity test
// in the project and keeps those whose name, display name, description,
// protocol, or endpoint fields (instance, IP, network) match the query.
func (p *ConnectivityTestProvider) Search(ctx context.Context, project string, query Query) ([]Result, error) {
	if p == nil || p.NewClient == nil {
		return nil, fmt.Errorf("%s: %w", project, ErrNoProviders)
	}

	client, err := p.NewClient(ctx, project)
	if err != nil {
		return nil, fmt.Errorf("failed to create client for %s: %w", project, err)
	}

	// An empty filter lists all tests in the project.
	tests, err := client.ListTests(ctx, "")
	if err != nil {
		return nil, fmt.Errorf("failed to list connectivity tests in %s: %w", project, err)
	}

	found := make([]Result, 0, len(tests))
	for _, ct := range tests {
		if ct == nil {
			continue
		}

		// Gather every textual field worth matching against.
		fields := []string{ct.Name, ct.DisplayName, ct.Description, ct.Protocol}
		if src := ct.Source; src != nil {
			fields = append(fields, src.Instance, src.IPAddress, src.Network)
		}
		if dst := ct.Destination; dst != nil {
			fields = append(fields, dst.Instance, dst.IPAddress, dst.Network)
		}

		if !query.MatchesAny(fields...) {
			continue
		}

		found = append(found, Result{
			Type:     KindConnectivityTest,
			Name:     ct.Name,
			Project:  ct.ProjectID,
			Location: "global", // connectivity tests are global resources
			Details:  connectivityTestDetails(ct),
		})
	}

	return found, nil
}

// connectivityTestDetails extracts display metadata for a connectivity test:
// display name, protocol, and — when available — the reachability result and
// source/destination IP addresses.
//
// A nil test yields a nil map instead of panicking, so callers need not
// pre-check (Search currently filters nils, but this keeps the helper safe
// on its own).
func connectivityTestDetails(test *gcp.ConnectivityTestResult) map[string]string {
	if test == nil {
		return nil
	}

	details := map[string]string{
		"displayName": test.DisplayName,
		"protocol":    test.Protocol,
	}

	if test.ReachabilityDetails != nil {
		details["result"] = test.ReachabilityDetails.Result
	}

	if test.Source != nil && test.Source.IPAddress != "" {
		details["source"] = test.Source.IPAddress
	}

	if test.Destination != nil && test.Destination.IPAddress != "" {
		details["destination"] = test.Destination.IPAddress
	}

	return details
}
126 changes: 72 additions & 54 deletions internal/gcp/search/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,46 +239,75 @@ func (e *Engine) SearchStreaming(ctx context.Context, projects []string, query Q
var results []Result
var warnings []SearchWarning

var wg sync.WaitGroup
for _, project := range trimmed {
project := project
wg.Add(1)
go func() {
defer wg.Done()
// Split projects into priority batches for better affinity-based ordering
// Search top projects first to show relevant results faster
priorityBatchSize := 5 // Search top 5 projects first
if len(trimmed) < priorityBatchSize {
priorityBatchSize = len(trimmed)
}

sem <- struct{}{}
defer func() { <-sem }()
searchProjectBatch := func(projects []string) {
var wg sync.WaitGroup
for _, project := range projects {
project := project
wg.Add(1)
go func() {
defer wg.Done()

// Check if context was cancelled
if ctx.Err() != nil {
return
}
sem <- struct{}{}
defer func() { <-sem }()

for _, provider := range activeProviders {
// Check cancellation before each provider
// Check if context was cancelled
if ctx.Err() != nil {
return
}

providerResults, err := provider.Search(ctx, project, query)
for _, provider := range activeProviders {
// Check cancellation before each provider
if ctx.Err() != nil {
return
}

// Update completed count regardless of result
mu.Lock()
completedRequests++
currentCompleted := completedRequests
mu.Unlock()
providerResults, err := provider.Search(ctx, project, query)

if err != nil {
// Record warning but continue with other providers
// Update completed count regardless of result
mu.Lock()
warnings = append(warnings, SearchWarning{
Project: project,
Provider: provider.Kind(),
Err: err,
})
completedRequests++
currentCompleted := completedRequests
mu.Unlock()

if err != nil {
// Record warning but continue with other providers
mu.Lock()
warnings = append(warnings, SearchWarning{
Project: project,
Provider: provider.Kind(),
Err: err,
})
mu.Unlock()

// Still send progress update even on error
if callback != nil {
progress := SearchProgress{
TotalRequests: totalRequests,
CompletedRequests: currentCompleted,
PendingRequests: totalRequests - currentCompleted,
CurrentProject: project,
CurrentProvider: string(provider.Kind()),
}
_ = callback(nil, progress)
}
continue
}

// Call the callback with new results and progress
mu.Lock()
results = append(results, providerResults...)
currentResults := make([]Result, len(providerResults))
copy(currentResults, providerResults)
mu.Unlock()

// Still send progress update even on error
// Send progress update with new results
if callback != nil {
progress := SearchProgress{
TotalRequests: totalRequests,
Expand All @@ -287,38 +316,27 @@ func (e *Engine) SearchStreaming(ctx context.Context, projects []string, query Q
CurrentProject: project,
CurrentProvider: string(provider.Kind()),
}
_ = callback(nil, progress)
if err := callback(currentResults, progress); err != nil {
// Callback requested stop - just return
return
}
}
continue
}
}()
}
wg.Wait()
}

// Call the callback with new results and progress
mu.Lock()
results = append(results, providerResults...)
currentResults := make([]Result, len(providerResults))
copy(currentResults, providerResults)
mu.Unlock()
// Search high-priority projects first
priorityBatch := trimmed[:priorityBatchSize]
searchProjectBatch(priorityBatch)

// Send progress update with new results
if callback != nil {
progress := SearchProgress{
TotalRequests: totalRequests,
CompletedRequests: currentCompleted,
PendingRequests: totalRequests - currentCompleted,
CurrentProject: project,
CurrentProvider: string(provider.Kind()),
}
if err := callback(currentResults, progress); err != nil {
// Callback requested stop - just return
return
}
}
}
}()
// Then search remaining projects
if len(trimmed) > priorityBatchSize {
remainingBatch := trimmed[priorityBatchSize:]
searchProjectBatch(remainingBatch)
}

wg.Wait()

// Sort results for consistent output
sort.Slice(results, func(i, j int) bool {
if results[i].Project != results[j].Project {
Expand Down
Loading