This is a Flask-based Python backend that leverages Snowflake Cortex AI to perform intelligent data warehouse analysis with dynamic credential management and selective table analysis. The system uses multiple AI agents to analyze schema metadata, generate KPIs, create visualizations, assess data quality, and provide natural language insights about your Snowflake database.
- UI-driven connection setup - No need to edit .env files
- Secure .pem file upload - Upload private keys directly through API
- Temporary credential handling - Credentials are used once and cleaned up
- Session isolation - Each analysis uses its own Snowflake session
- Table listing endpoint - Fetch all available tables from a schema
- Filtered metadata extraction - Analyze only selected tables
- Optimized performance - Skip unnecessary table scans
- Date/datetime serialization - Proper JSON conversion for temporal types
- Null value handling - Graceful handling of missing KPI values
- Detailed logging - Step-by-step pipeline progress indicators
The backend implements a multi-agent architecture where specialized agents handle different aspects of data analysis:
- BaseAgent: Foundation class for all AI agents that interact with Snowflake Cortex (see the sketch after this list)
- MetadataAgent: Retrieves schema information with optional table filtering
- DataProfilerAgent: Profiles tables (row counts, basic statistics)
- DataQualityScopeAgent: Uses AI to determine which columns should be checked for quality issues
- DataQualityProfiler: Executes SQL checks for missing values, duplicates, and invalid dates
- DataQualityAgent: Analyzes quality signals and generates actionable recommendations
- RelationshipAgent: Infers table relationships based on column naming patterns
- KPIGeneratorAgent: AI-generates relevant KPIs based on available data
- KPIExecutionAgent: Executes KPI SQL queries and returns results
- ChartGeneratorAgent: Creates chart definitions with appropriate visualizations
- ChartDataAgent: Executes chart queries with intelligent fallback mechanisms
- NarrativeInsightAgent: Generates executive-level summary insights
- ChatAgent: Provides conversational AI interface for querying insights
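A minimal sketch of the BaseAgent foundation, assuming a Snowpark `Session`; the actual class in this codebase carries more prompt and response-parsing plumbing:

```python
class BaseAgent:
    """Base class for agents that call Snowflake Cortex through SQL."""

    def __init__(self, session, model="mistral-large2"):
        self.session = session  # Snowpark Session
        self.model = model      # Cortex model name (configurable)

    def cortex(self, prompt):
        # Bind the model and prompt as parameters to avoid quoting issues.
        row = self.session.sql(
            "SELECT SNOWFLAKE.CORTEX.COMPLETE(?, ?)",
            params=[self.model, prompt],
        ).collect()[0]
        return row[0]  # raw model response; callers extract JSON from it
```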
The `run_pipeline()` function orchestrates the complete analysis with enhanced logging (an example invocation follows the outline below):

```
🚀 STARTING DATA ANALYSIS PIPELINE
============================================================
 1. Connect to Snowflake (dynamic or .env credentials)
 2. Extract metadata (filtered by selected tables if provided)
 3. Profile data (row counts with visual output)
 4. Infer relationships (FK patterns)
 5. Generate and execute KPIs (with null handling)
 6. Generate and execute charts (with dynamic repair)
 7. Determine data quality scope (AI-driven)
 8. Execute quality checks (SQL-based validation)
 9. Analyze quality issues (AI insights)
10. Generate narrative insights
11. Store complete report in CLEAN_INSIGHTS_STORE
============================================================
✨ PIPELINE COMPLETED SUCCESSFULLY
```
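A hypothetical invocation (the session object and table names are illustrative; both parameters are optional, as described next):

```python
# Reuse a pre-configured Snowpark session and analyze only two tables.
# `my_session` and the table names are placeholders.
report = run_pipeline(session=my_session, selected_tables=["ORDERS", "CUSTOMERS"])
```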
New Parameters:

```python
def run_pipeline(session=None, selected_tables=None):
    # session: optional pre-configured Snowflake session
    # selected_tables: optional list of table names to analyze
    ...
```

The `repair_chart_sql()` function provides an intelligent fallback when Cortex-generated SQL fails, as sketched after this list:
- Automatically selects safe dimension columns (DATE, DEVICE_TYPE, CHANNEL)
- Identifies numeric columns for aggregation
- Constructs valid GROUP BY queries with appropriate limits
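A minimal sketch of that fallback logic, assuming table metadata is available as `(column_name, data_type)` pairs; the real `repair_chart_sql()` in `app.py` is schema-aware and may differ:

```python
# Dimension columns considered safe to group by.
SAFE_DIMENSIONS = {"DATE", "DEVICE_TYPE", "CHANNEL"}

def repair_chart_sql(table, columns):
    """Build a safe GROUP BY query when Cortex-generated SQL fails.

    `columns` is a list of (column_name, data_type) pairs from metadata.
    """
    dims = [name for name, dtype in columns if name.upper() in SAFE_DIMENSIONS]
    nums = [name for name, dtype in columns
            if dtype.upper() in ("NUMBER", "FLOAT", "INTEGER")]
    if not dims or not nums:
        return None  # nothing safe to chart; caller degrades gracefully
    dim, measure = dims[0], nums[0]
    return (
        f"SELECT {dim}, SUM({measure}) AS VALUE "
        f"FROM {table} GROUP BY {dim} ORDER BY VALUE DESC LIMIT 20"
    )
```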
All AI agents use Snowflake Cortex via the `SNOWFLAKE.CORTEX.COMPLETE()` function (a direct SQL example follows this list):
- Model: `mistral-large2` (configurable)
- Returns structured JSON responses
- Handles prompt engineering for data-specific tasks
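The same call can be reproduced directly in SQL, which is a quick way to verify Cortex availability (the prompt text here is illustrative):

```sql
-- Direct SQL equivalent of what the agents execute; model name is configurable.
SELECT SNOWFLAKE.CORTEX.COMPLETE(
    'mistral-large2',
    'Return JSON of the form {"result": ""} summarizing: sales rose 12% QoQ.'
) AS response;
```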
- Private key authentication (PKCS#8 DER format)
- Environment-based configuration for legacy endpoints
- Dynamic credential support with temporary file cleanup
- No credentials stored permanently when using UI configuration
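A minimal sketch of key-pair authentication with `snowflake-connector-python` and `cryptography`, using the placeholder values from the `.env` example below; the backend's actual connection code may differ:

```python
from cryptography.hazmat.primitives import serialization
import snowflake.connector

# Load the PEM private key and convert it to PKCS#8 DER,
# which is the format the Snowflake connector expects.
with open("CLARITY_SERVICE_ACCOUNT.pem", "rb") as f:
    p_key = serialization.load_pem_private_key(
        f.read(),
        password=b"your_passphrase",  # use None if the key is unencrypted
    )

der_key = p_key.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption(),
)

conn = snowflake.connector.connect(
    user="your_username",
    account="your_account.region",
    private_key=der_key,  # DER bytes, not the PEM text
    warehouse="your_warehouse",
    database="your_database",
    schema="your_schema",
    role="your_role",
)
```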
The enhanced `sanitize_for_json()` function handles (see the sketch after this list):
- Decimal to float conversion
- Date and datetime to ISO string conversion
- Nested dict/list sanitization
- Null value preservation
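A minimal sketch consistent with those behaviors (the production function may cover additional Snowflake types):

```python
from datetime import date, datetime
from decimal import Decimal

def sanitize_for_json(value):
    """Recursively convert values into JSON-serializable equivalents."""
    if isinstance(value, Decimal):
        return float(value)
    if isinstance(value, (date, datetime)):
        return value.isoformat()  # ISO string for temporal types
    if isinstance(value, dict):
        return {k: sanitize_for_json(v) for k, v in value.items()}
    if isinstance(value, list):
        return [sanitize_for_json(v) for v in value]
    return value  # None and other primitives pass through unchanged
```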
`GET /`

Health check endpoint.
Response:

```json
{
  "service": "Snowflake Cortex Data Intelligence API",
  "endpoints": [
    "/run-analysis",
    "/list-tables",
    "/clean-report",
    "/clean-report/runs",
    "/clean-report/<load_id>"
  ]
}
```

`POST /list-tables`

Fetches all tables from a Snowflake schema using the provided credentials.
Request (multipart/form-data):
- `account`: Snowflake account identifier (e.g., "WB19670-C2GPARTNERS")
- `user`: Username
- `role`: Role name
- `warehouse`: Warehouse name
- `database`: Database name
- `schema`: Schema name
- `private_key_file`: .pem file upload
- `private_key_passphrase`: Optional passphrase
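For illustration, such a request could be issued with curl's multipart flags (all values are placeholders):

```bash
curl -X POST http://localhost:8080/list-tables \
  -F account="WB19670-C2GPARTNERS" \
  -F user="your_username" \
  -F role="your_role" \
  -F warehouse="your_warehouse" \
  -F database="your_database" \
  -F schema="your_schema" \
  -F private_key_file=@CLARITY_SERVICE_ACCOUNT.pem
```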
Response:

```json
{
  "status": "success",
  "tables": ["TABLE1", "TABLE2", "TABLE3"],
  "count": 3
}
```

Error response:

```json
{
  "status": "error",
  "message": "Error description"
}
```

`POST /run-analysis`

Triggers analysis with dynamic credentials and table selection.
Request (multipart/form-data):
- `account`: Snowflake account
- `user`: Username
- `role`: Role name
- `warehouse`: Warehouse name
- `database`: Database name
- `schema`: Schema name
- `private_key_file`: .pem file upload
- `private_key_passphrase`: Optional passphrase
- `tables`: JSON array of table names (e.g., `["TABLE1", "TABLE2"]`)
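A hypothetical invocation with curl (field values are placeholders):

```bash
curl -X POST http://localhost:8080/run-analysis \
  -F account="WB19670-C2GPARTNERS" \
  -F user="your_username" \
  -F role="your_role" \
  -F warehouse="your_warehouse" \
  -F database="your_database" \
  -F schema="your_schema" \
  -F private_key_file=@CLARITY_SERVICE_ACCOUNT.pem \
  -F tables='["TABLE1", "TABLE2"]'
```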
Response:

```json
{
  "status": "success",
  "data": {
    "meta": {
      "load_id": "uuid",
      "generated_at": "ISO timestamp",
      "schema_analyzed": "SCHEMA_NAME"
    },
    "summary": { ... },
    "understanding": { ... },
    "kpis": [ ... ],
    "charts": [ ... ],
    "data_quality": { ... },
    "transformations": [ ... ],
    "insights": { ... }
  }
}
```

`GET /run-analysis` (legacy)

Triggers analysis using `.env` credentials for all tables.
Process:
- Analyzes all tables in configured schema
- Generates 4 KPIs
- Creates 4 charts with data
- Performs data quality checks
- Stores the report with a unique `load_id`
Response:

```json
{
  "status": "success",
  "data": {
    "meta": {
      "load_id": "uuid",
      "generated_at": "ISO timestamp",
      "schema_analyzed": "SCHEMA_NAME"
    },
    "summary": { ... },
    "understanding": { ... },
    "kpis": [ ... ],
    "charts": [ ... ],
    "data_quality": { ... },
    "transformations": [ ... ],
    "insights": { ... }
  }
}
```

`GET /clean-report` and `GET /clean-report/<load_id>`

Retrieves the latest or a specific analysis report.
Response:

```json
{
  "status": "success",
  "load_id": "uuid",
  "load_datetime": "timestamp",
  "data": { ... }
}
```

`GET /clean-report/runs`

Lists all available report runs.
Response:

```json
[
  {
    "load_id": "uuid",
    "load_datetime": "timestamp"
  }
]
```

`POST /chat`

Conversational AI interface for querying insights.
Request:

```json
{
  "message": "What are the top KPIs?"
}
```

Response:

```json
{
  "status": "success",
  "load_id": "uuid",
  "load_datetime": "timestamp",
  "question": "What are the top KPIs?",
  "answer": "Based on the latest insights, the top KPIs are..."
}
```

Environment configuration (`.env`):

```
# Snowflake Connection
SNOWFLAKE_USER=your_username
SNOWFLAKE_ACCOUNT=your_account.region
SNOWFLAKE_WAREHOUSE=your_warehouse
SNOWFLAKE_DATABASE=your_database
SNOWFLAKE_SCHEMA=your_schema
SNOWFLAKE_ROLE=your_role
# Authentication
PRIVATE_KEY_PATH=./CLARITY_SERVICE_ACCOUNT.pem
PRIVATE_KEY_PASSPHRASE=your_passphrase
```

The report store table:

```sql
CREATE TABLE CLEAN_INSIGHTS_STORE (
    LOAD_ID VARCHAR(255) PRIMARY KEY,
    LOAD_DATETIME TIMESTAMP_NTZ,
    CLEAN_JSON VARIANT
);
```

Prerequisites:

- Python 3.8+
- Snowflake account with Cortex enabled
- Private key file (.pem format)
- Install dependencies:

```bash
pip install flask flask-cors snowflake-connector-python snowflake-snowpark-python cryptography
```

- Configure environment: create a `.env` file with your Snowflake credentials.
- Generate a private key (if needed):

```bash
# Generate private key
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8

# Extract public key
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
```

```sql
-- Add the public key to the Snowflake user
ALTER USER your_username SET RSA_PUBLIC_KEY='...';
```

- Create the required Snowflake objects:

```sql
CREATE TABLE CLEAN_INSIGHTS_STORE (
    LOAD_ID VARCHAR(255) PRIMARY KEY,
    LOAD_DATETIME TIMESTAMP_NTZ,
    CLEAN_JSON VARIANT
);
```

- Run the server:

```bash
python app.py
```

The server will start on http://0.0.0.0:8080.
Trigger an analysis (legacy, `.env` credentials):

```bash
curl http://localhost:8080/run-analysis
```

Fetch the latest report:

```bash
curl http://localhost:8080/clean-report
```

Ask the chat endpoint a question:

```bash
curl -X POST http://localhost:8080/chat \
  -H "Content-Type: application/json" \
  -d '{"message": "What is the data quality score?"}'
```

Architecture overview:

```
┌─────────────────────────────────────────────────────────────────┐
│ Client Request │
└────────────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Flask Application (app.py) │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Agent Orchestration │ │
│ │ │ │
│ │ 1. MetadataAgent → Schema Info │ │
│ │ 2. DataProfilerAgent → Row Counts │ │
│ │ 3. RelationshipAgent → Table Relationships │ │
│ │ 4. KPIGeneratorAgent + KPIExecutionAgent → KPIs │ │
│ │ 5. ChartGeneratorAgent + ChartDataAgent → Charts │ │
│ │ 6. DataQualityScopeAgent → Quality Targets │ │
│ │ 7. DataQualityProfiler → SQL Validation │ │
│ │ 8. DataQualityAgent → Quality Analysis │ │
│ │ 9. NarrativeInsightAgent → Summary │ │
│ └──────────────────────────────────────────────────────────┘ │
└────────────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Snowflake Cortex AI │
│ • Mistral-Large2 Model │
│ • INFORMATION_SCHEMA queries │
│ • Data table queries │
│ • CLEAN_INSIGHTS_STORE (persistence) │
└─────────────────────────────────────────────────────────────────┘
```
- Extract JSON: Robust parsing with regex fallback for malformed Cortex responses (sketched after this list)
- Sanitize for JSON: Converts Snowflake types (Decimal, VARIANT) to JSON-serializable formats
- Dynamic SQL Repair: Automatically repairs invalid chart SQL with schema-aware fallbacks
- Try-catch blocks: Graceful degradation when agents fail
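A minimal sketch of the regex-fallback idea behind the JSON extraction utility (the function name and details here are illustrative; the production version likely handles more edge cases):

```python
import json
import re

def extract_json(text):
    """Parse a Cortex response as JSON, falling back to the first
    {...} block found via regex when the response contains extra prose."""
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        match = re.search(r"\{.*\}", text, re.DOTALL)
        if match:
            try:
                return json.loads(match.group(0))
            except json.JSONDecodeError:
                pass
    return None  # caller degrades gracefully
```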
Add a new agent:

```python
class NewAgent(BaseAgent):
    def run(self, context):
        return self.cortex(f"""
        Your prompt here
        Context: {json.dumps(context)}
        Format: {{ "result": "" }}
        """)
```

Change the model in agent initialization:

```python
agent = BaseAgent(session, model="llama3-70b")
```

Add a new endpoint:

```python
@app.route("/new-endpoint", methods=["GET", "POST"])
def new_endpoint():
    # Your logic here
    return jsonify({"status": "success"})
```

Performance notes:

- Metadata queries: Cached per pipeline run
- Table profiling: Limited to 15 tables
- Chart data: Limited to 20 rows per chart
- Cortex calls: Synchronous, ~2-5 seconds each
- Total pipeline time: 30-60 seconds for complete analysis
Cortex errors:
- Check Cortex availability in your region
- Verify the model name is correct
- Ensure the role has Cortex privileges
Authentication failures:
- Verify the key format (PKCS#8 DER)
- Check the passphrase
- Ensure the public key is added to the Snowflake user
Chart SQL failures:
- Dynamic repair should handle most cases
- Check table/column names for special characters
- Review the `repair_chart_sql()` logic
Proprietary - Data Insights Hub
For issues and questions, contact the development team.