-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbasic_usage.rs
More file actions
169 lines (146 loc) · 5.74 KB
/
basic_usage.rs
File metadata and controls
169 lines (146 loc) · 5.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
//! Basic usage example for Timeplus Rust client
//!
//! This example demonstrates:
//! - Connecting to Timeplus
//! - Creating a stream
//! - Inserting data
//! - Querying data
//! - Cleaning up
//!
//! Run with: `cargo run --example basic_usage`
use timeplus_client::{TimeplusClient, utils};
use serde_json::json;
use std::collections::HashMap;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("π Timeplus Basic Usage Example");
println!("{}", "=".repeat(50));
// Create client using utility function
let client = TimeplusClient::with_config(utils::local_config())?;
// Test connection
println!("\nπ Testing connection...");
match client.health_check().await {
Ok(true) => {
let version = client.version().await?;
println!("β
Connected to Timeplus version: {}", version);
}
Ok(false) => {
println!("β Health check failed");
return Ok(());
}
Err(e) => {
println!("β Connection error: {}", e);
println!("π‘ Make sure Timeplus server is running:");
println!(" docker run -d --name timeplus-server \\");
println!(" -p 8000:8000 -p 8123:8123 -p 8463:8463 -p 3218:3218 \\");
println!(" -v /tmp/timeplus_data:/timeplus/data \\");
println!(" docker.timeplus.com/timeplus/timeplus-enterprise:2.7.9");
return Ok(());
}
}
// Create a stream
println!("\nπ Creating stream...");
let stream_name = "basic_example_stream";
let schema = "id uint32, name string, value float64, category string, timestamp datetime64(9) DEFAULT now64()";
// Clean up first (in case stream exists)
let _ = client.streams().drop(stream_name).await;
client.streams().create(stream_name, schema).await?;
println!("β
Stream '{}' created", stream_name);
// Insert some data
println!("\nπ Inserting data...");
// Method 1: Insert single record with key-value pairs
let data = vec![
("id", json!(1)),
("name", json!("Alice")),
("value", json!(10.5)),
("category", json!("A")),
];
client.streams().insert_data(stream_name, &data).await?;
println!("β
Inserted single record");
// Method 2: Insert batch records
let records = vec![
HashMap::from([
("id".to_string(), json!(2)),
("name".to_string(), json!("Bob")),
("value".to_string(), json!(20.3)),
("category".to_string(), json!("B")),
]),
HashMap::from([
("id".to_string(), json!(3)),
("name".to_string(), json!("Charlie")),
("value".to_string(), json!(15.7)),
("category".to_string(), json!("A")),
]),
HashMap::from([
("id".to_string(), json!(4)),
("name".to_string(), json!("Diana")),
("value".to_string(), json!(25.1)),
("category".to_string(), json!("C")),
]),
];
client.streams().insert_batch(stream_name, &records).await?;
// Query the data
println!("\nπ Querying data...");
// Get all records
let result = client.streams().query(stream_name, None).await?;
println!("β
Found {} records in stream", result.row_count());
// Display first few records
for (i, row) in result.rows().iter().take(3).enumerate() {
println!(" Record {}: {}", i + 1, serde_json::to_string_pretty(row)?);
}
// Get stream statistics
println!("\nπ Stream statistics...");
let stats = client.streams().stats(stream_name).await?;
for (key, value) in stats {
println!(" {}: {}", key, value);
}
// Advanced queries
println!("\nπ Advanced queries...");
// Aggregation query
let agg_result = client.query(&format!(
"SELECT category, count(*) as count, avg(value) as avg_value FROM table({}) GROUP BY category ORDER BY count DESC",
stream_name
)).await?;
println!("β
Aggregation results:");
for row in agg_result.rows() {
if let (Some(cat), Some(count), Some(avg)) = (
row.get("category").and_then(|v| v.as_str()),
row.get("count").and_then(|v| v.as_u64()),
row.get("avg_value").and_then(|v| v.as_f64())
) {
println!(" Category {}: {} records, avg value: {:.2}", cat, count, avg);
}
}
// Filter query
let filter_result = client.query(&format!(
"SELECT * FROM table({}) WHERE value > 20.0 ORDER BY value DESC",
stream_name
)).await?;
println!("β
High-value records (value > 20.0): {} found", filter_result.row_count());
for row in filter_result.rows() {
if let (Some(name), Some(value)) = (
row.get("name").and_then(|v| v.as_str()),
row.get("value").and_then(|v| v.as_f64())
) {
println!(" {}: {:.1}", name, value);
}
}
// List all streams
println!("\nπ Listing all streams...");
let streams = client.streams().list().await?;
println!("β
Found {} streams:", streams.len());
for stream in streams.iter().take(5) {
println!(" - {}: {}", stream.name,
stream.schema.chars().take(50).collect::<String>());
}
// Clean up
println!("\nπ§Ή Cleaning up...");
client.streams().drop(stream_name).await?;
println!("β
Stream '{}' dropped", stream_name);
println!("\nπ Basic usage example completed successfully!");
println!("π‘ Next steps:");
println!(" - Try the streaming_callbacks example for real-time processing");
println!(" - Try the python_udfs example for custom functions");
println!(" - Try the advanced_queries example for complex SQL");
Ok(())
}