-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathlogger.py
More file actions
214 lines (178 loc) · 7.23 KB
/
logger.py
File metadata and controls
214 lines (178 loc) · 7.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
"""
logger.py — Interaction logger for the tag-context system.
Appends message/response pairs to daily JSONL files.
Tags are intentionally excluded at log time; replay.py assigns them
via the tagger, allowing re-tagging with evolved strategies.
Log format (one JSON object per line):
{
"id": str (uuid4),
"logged_at": float (unix timestamp of logging),
"session_id": str,
"user_id": str,
"channel": str, # "telegram", "voice-pwa", "console", etc.
"interaction_at": float, # when the exchange actually happened
"user_text": str,
"assistant_text": str,
"token_count": int # estimated
}
"""
import json
import time
import uuid
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Optional
# Directory holding the daily JSONL interaction logs, resolved relative to this file.
LOG_DIR = Path(__file__).parent / "data" / "interactions"
@dataclass
class InteractionRecord:
    """One logged user/assistant exchange; serialized to one JSONL line via asdict()."""
    id: str                      # uuid4 string, unique per record
    logged_at: float             # unix timestamp when the record was written
    session_id: str              # OpenClaw session key or similar
    user_id: str                 # sender ID (Telegram user ID, etc.)
    channel: str                 # source channel: "telegram", "voice-pwa", "console", etc.
    interaction_at: float        # unix timestamp when the exchange actually happened
    user_text: str               # the user's message
    assistant_text: str          # the assistant's response
    token_count: int             # estimated token count
    is_automated: bool = False   # True for automated turns (see _is_automated_turn)
def _log_path(ts: float) -> Path:
    """Return the daily JSONL log file path for the given unix timestamp."""
    import datetime
    # Local-time calendar date names the file, e.g. "2026-02-24.jsonl".
    day = datetime.date.fromtimestamp(ts).isoformat()
    return LOG_DIR / (day + ".jsonl")
def _is_automated_turn(user_text: str) -> bool:
"""
Detect automated turns (cron jobs, heartbeats, local watcher, subagent events) by inspecting user_text.
Returns True if the message matches any of these patterns:
- Starts with "[cron:" (cron job payloads)
- Contains "Read HEARTBEAT.md if it exists" (heartbeat prompt)
- Starts with "[local-watcher]" (file watcher events)
- Starts with "[subagent" (subagent completion events)
- User text is exactly "HEARTBEAT_OK" (heartbeat acknowledgement)
- Text starts with "[WORKFLOW_AUTO" (post-compaction automated workflow)
Length guard: If text exceeds 500 characters, return False. Long messages
likely contain real content even if they start with an automated prefix.
"""
# Normalize whitespace for consistent matching
text = user_text.strip()
# Pattern 1: Cron job payloads — checked BEFORE the length guard because
# "[cron:" is an unambiguous machine prefix. Cron prompts are routinely
# 2000-4000 chars (full task instructions), so the length guard was
# incorrectly letting them through as non-automated.
if text.startswith("[cron:"):
return True
# Pattern 2: Heartbeat prompt (first 500 chars only — full-body search
# false-positives on compacted messages that embed prior heartbeat context)
if "Read HEARTBEAT.md if it exists" in text[:500]:
return True
# Pattern 3: Local watcher events
if text.startswith("[local-watcher]"):
return True
# Pattern 4: Heartbeat acknowledgement
if text == "HEARTBEAT_OK":
return True
# Pattern 5: Subagent completion events
if text.lower().startswith("[subagent"):
return True
# Pattern 6: WORKFLOW_AUTO / post-compaction detection
if text.startswith("[WORKFLOW_AUTO"):
return True
# Pattern 7: Multi-line System: prefix blocks (cron result delivery,
# X mentions reports, heartbeat system events delivered back to main session)
# These have the form "System: \nSystem: ...\nSystem: ..." throughout.
if text.startswith("System:") and text.count("\nSystem:") >= 2:
return True
# Pattern 8: Single System: line events (timestamps, model switches, etc.)
if text.startswith("System: [") and ("\n" not in text or text.count("\n") <= 2):
return True
# Pattern 9: Scheduled reminders (⏰ REMINDER: ... Handle this reminder internally)
# Only check the first 500 chars — these patterns can appear inside compacted
# summaries of prior turns embedded in otherwise-real messages.
head = text[:500]
if text.startswith("⏰ REMINDER:") or text.startswith("⏰ PARTNER CALL") or (
"REMINDER" in head[:30] and "Handle this reminder internally" in head
):
return True
# Pattern 10: OpenClaw internal action messages (first 500 chars only)
if "Handle this reminder internally. Do not relay" in head:
return True
return False
def log_interaction(
    user_text: str,
    assistant_text: str,
    session_id: str = "default",
    user_id: str = "unknown",
    channel: str = "unknown",
    interaction_at: Optional[float] = None,
    token_count: Optional[int] = None,
) -> InteractionRecord:
    """
    Append a single exchange to today's JSONL log file and return the record.

    Parameters
    ----------
    user_text       The user's message.
    assistant_text  The assistant's response.
    session_id      OpenClaw session key or similar.
    user_id         Sender ID (Telegram user ID, etc.)
    channel         Source channel: "telegram", "voice-pwa", "console", etc.
    interaction_at  Unix timestamp of the exchange (defaults to now).
    token_count     Estimated tokens; computed from word count if omitted.
    """
    logged_at = time.time()
    if token_count is None:
        # Rough estimate: ~1.3 tokens per whitespace-separated word, floor 1.
        word_total = len(f"{user_text} {assistant_text}".split())
        token_count = max(1, int(word_total * 1.3))
    record = InteractionRecord(
        id=str(uuid.uuid4()),
        logged_at=logged_at,
        session_id=session_id,
        user_id=user_id,
        channel=channel,
        interaction_at=logged_at if interaction_at is None else interaction_at,
        user_text=user_text,
        assistant_text=assistant_text,
        token_count=token_count,
        # Automated turns (cron, heartbeat, local-watcher, ...) flagged at log time.
        is_automated=_is_automated_turn(user_text),
    )
    target = _log_path(logged_at)
    target.parent.mkdir(parents=True, exist_ok=True)
    with target.open("a", encoding="utf-8") as handle:
        handle.write(json.dumps(asdict(record)) + "\n")
    return record
def iter_records(start_date: Optional[str] = None,
                 end_date: Optional[str] = None):
    """
    Yield every InteractionRecord stored in the log directory, oldest file first.

    Parameters
    ----------
    start_date  "YYYY-MM-DD" inclusive lower bound (optional)
    end_date    "YYYY-MM-DD" inclusive upper bound (optional)
    """
    for log_file in sorted(LOG_DIR.glob("*.jsonl")):
        day = log_file.stem  # filename encodes the date, e.g. "2026-02-24"
        # ISO dates compare correctly as plain strings, so lexicographic
        # bounds checks implement the inclusive date range.
        if (start_date and day < start_date) or (end_date and day > end_date):
            continue
        with log_file.open(encoding="utf-8") as fh:
            for raw in fh:
                raw = raw.strip()
                if not raw:
                    continue
                try:
                    payload = json.loads(raw)
                    # Backward compatibility: older records predate is_automated.
                    if "is_automated" not in payload:
                        payload["is_automated"] = False
                    yield InteractionRecord(**payload)
                except (json.JSONDecodeError, TypeError):
                    # Malformed line or unexpected/missing fields — skip it.
                    continue
def count_records(start_date: Optional[str] = None,
                  end_date: Optional[str] = None) -> int:
    """Return how many log records fall within the given inclusive date range."""
    total = 0
    for _record in iter_records(start_date, end_date):
        total += 1
    return total