-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprocess_sources.py
More file actions
347 lines (271 loc) · 11.8 KB
/
process_sources.py
File metadata and controls
347 lines (271 loc) · 11.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
import yaml
import hashlib
import os
import lancedb
import pyarrow as pa
import json
from typing import List
import pandas as pd
from typing import Dict, List, Any
from defs.etl_convert import convert_document
from baml_client.sync_client import b
from baml_client.types import Idea
from baml_client.types import Assertion
from baml_client.types import Nanograph
from dataclasses import dataclass
from defs import etl_gliner # Add import for etl_gliner function
@dataclass
class NodeEntry:
    # One extracted text item, tagged with its source file, the JSON key
    # (node) it came from, and its position within that node's list.
    filename: str
    node_name: str
    index: int
    text: str
def parse_json(jd, filename) -> List[NodeEntry]:
    """Flatten a JSON document of {node: [texts]} into NodeEntry records.

    Args:
        jd: JSON string whose top level is an object; values that are
            lists are enumerated, all other values are skipped.
        filename: Source filename attached to every produced entry.

    Returns:
        List[NodeEntry]: one entry per (node, position, text) triple.
    """
    document = json.loads(jd)
    return [
        NodeEntry(filename=filename, node_name=key, index=position, text=item)
        for key, values in document.items()
        if isinstance(values, list)
        for position, item in enumerate(values)
    ]
def generate_content_hash(content):
    """Return the MD5 hex digest of *content*.

    Used as a stable, content-derived identifier (deduplication), not
    for anything security-sensitive.

    Args:
        content (str): The content to hash.

    Returns:
        str: 32-character hexadecimal MD5 digest.
    """
    digest = hashlib.md5()
    digest.update(content.encode('utf-8'))
    return digest.hexdigest()
def workOnIdea(mdtext: str) -> Idea:
    """Extract an Idea from markdown text via the BAML sync client."""
    return b.ExtractIdea(mdtext)
def workOnAssertion(mdtext: str) -> Assertion:
    """Extract an Assertion from markdown text via the BAML sync client."""
    return b.ExtractAssertion(mdtext)
def workOnNanograph(mdtext: str) -> Nanograph:
    """Extract a Nanograph (nanopub triples) from markdown via the BAML sync client."""
    return b.ExtractNanopubs(mdtext)
def process_sources():
    """Convert every source in stores/sourcelist.yaml to markdown and store it in LanceDB.

    Locations starting with "file://" are converted from the local filesystem;
    all other locations are treated as URLs. The "sources" table is dropped
    and recreated on every run, so reruns never accumulate duplicate rows.
    """
    # Read the source list; safe_load returns None for an empty file, so fall
    # back to an empty mapping instead of crashing on .items() below.
    with open('stores/sourcelist.yaml', 'r') as file:
        sources = yaml.safe_load(file) or {}
    # Initialize LanceDB
    db = lancedb.connect("./lancedb")
    # Schema for the sources table.
    schema = pa.schema([
        pa.field("filename", pa.string()),
        pa.field("id", pa.string()),
        pa.field("location", pa.string()),
        pa.field("markdown", pa.string())
    ])
    # Always recreate the table to avoid duplicates across runs.
    if "sources" in db.table_names():
        db.drop_table("sources")
    table = db.create_table("sources", schema=schema)
    # Process each source
    for source_name, location in sources.items():
        print(f"Processing {source_name}: {location}")
        # Determine if it's a local file or URL
        if location.startswith("file://"):
            # Strip the "file://" prefix to get the local path.
            local_path = location[len("file://"):]
            markdown = convert_document(local_file=local_path)
            filename = os.path.basename(local_path)
        else:
            # It's a URL
            markdown = convert_document(url=location)
            filename = source_name  # Use the source name as filename for URLs
        if markdown:
            # A content hash makes the ID deterministic for identical documents.
            content_id = generate_content_hash(markdown)
            # Store in LanceDB. (This is a plain append; duplicates are avoided
            # by recreating the table above, not by upserting.)
            table.add([{
                "filename": filename,
                "id": content_id,
                "location": location,
                "markdown": markdown
            }])
            print(f"Successfully processed {source_name} with ID {content_id}")
        else:
            print(f"Failed to process {source_name}")
def process_claims():
    """Extract assertion and idea claims from every stored source into a "claims" table.

    For each row of the "sources" table, runs the BAML assertion and idea
    extractors over the markdown, flattens the results with parse_json, and
    appends them to a freshly created "claims" table.
    """
    db = lancedb.connect("./lancedb")
    if "sources" not in db.table_names():
        print("Sources table not found!")
        return
    # Schema mirrors the fields of the NodeEntry dataclass.
    claims_schema = pa.schema([
        pa.field("filename", pa.string()),
        pa.field("node_name", pa.string()),
        pa.field("index", pa.int32()),
        pa.field("text", pa.string())
    ])
    # Drop and recreate so reruns start from a clean table.
    if "claims" in db.table_names():
        db.drop_table("claims")
    claims_table = db.create_table("claims", schema=claims_schema)
    # Process source data
    records = db.open_table("sources").to_pandas()
    print(f"Total records: {len(records)}")
    for _, row in records.iterrows():
        markdown = row['markdown']
        # Assertions first, then ideas — same insertion order as before.
        assertion_entries = parse_json(workOnAssertion(markdown).model_dump_json(), row['filename'])
        idea_entries = parse_json(workOnIdea(markdown).model_dump_json(), row['filename'])
        # Convert NodeEntry objects into plain dicts for LanceDB.
        rows_to_add = [
            {
                "filename": entry.filename,
                "node_name": entry.node_name,
                "index": entry.index,
                "text": entry.text
            }
            for entry in assertion_entries + idea_entries
        ]
        # Only write (and report) when something was extracted.
        if rows_to_add:
            claims_table.add(rows_to_add)
            print(f"Added {len(rows_to_add)} claims from {row['filename']}")
def process_entities():
    """Run GLiNER entity extraction over every claim and store results in an "entities" table.

    Each claim's text is passed to etl_gliner.process; the resulting entity
    rows are tagged with the originating claim's filename/node/index and
    written to a freshly created "entities" table whose schema is inferred
    from the pandas dtypes.
    """
    db = lancedb.connect("./lancedb")
    if "claims" not in db.table_names():
        print("Claims table not found!")
        return
    records = db.open_table("claims").to_pandas()
    print(f"Total records in claims: {len(records)}")
    frames = []  # per-claim entity DataFrames, concatenated at the end
    for _, row in records.iterrows():
        entities = etl_gliner.process(str(row['text']))
        frame = pd.DataFrame(entities)
        if frame.empty:
            # etl_gliner found no entities for this claim — skip it.
            continue
        # Tag every entity with the claim it was extracted from.
        # NOTE: the column is 'nodename' (no underscore), unlike the claims
        # table's 'node_name' — kept as-is for downstream compatibility.
        frame['filename'] = row['filename']
        frame['nodename'] = row['node_name']
        frame['index'] = row['index']
        frames.append(frame)
    if not frames:
        print("No entities processed. 'entities' table will not be created.")
        return
    final_df = pd.concat(frames, ignore_index=True)
    print(f"Total entities processed: {len(final_df)}")
    if not final_df.empty:
        # Infer a pyarrow schema from the pandas dtypes; any column that is
        # not integer/float/bool is stored as a string.
        fields = []
        for column, dtype in final_df.dtypes.items():
            if pd.api.types.is_integer_dtype(dtype):
                fields.append(pa.field(column, pa.int64()))
            elif pd.api.types.is_float_dtype(dtype):
                fields.append(pa.field(column, pa.float64()))
            elif pd.api.types.is_bool_dtype(dtype):
                fields.append(pa.field(column, pa.bool_()))
            else:
                fields.append(pa.field(column, pa.string()))
        entities_schema = pa.schema(fields)
        # Drop and recreate so reruns start from a clean table.
        if "entities" in db.table_names():
            db.drop_table("entities")
        entities_table = db.create_table("entities", schema=entities_schema)
        print(entities_table.schema)
        entities_table.add(final_df)
        print(f"Successfully created 'entities' table and added {len(final_df)} records.")
    else:
        print("Final DataFrame is empty. 'entities' table not created.")
def triples_to_dataframe(data: Any) -> pd.DataFrame:
    """Convert a model carrying a 'triples' list into a pandas DataFrame.

    The previous annotation claimed a plain dict, but the implementation
    requires a Pydantic-style object: it calls ``data.model_dump_json()``
    to serialize nested model types into plain JSON first.

    Args:
        data: Any object exposing ``model_dump_json()`` (e.g. a BAML
            Nanograph) whose JSON form has a top-level "triples" array of
            flat {key: value} records.

    Returns:
        pd.DataFrame: one row per triple; empty if "triples" is absent.
    """
    data_dict = json.loads(data.model_dump_json())
    # Missing "triples" yields an empty list, hence an empty DataFrame.
    triples = data_dict.get('triples', [])
    return pd.DataFrame(triples)
def process_nanopubs():
    """Extract nanopub triples from every claim into a "nanopubs" table.

    Reads the "claims" table, runs the BAML nanopub extractor on each claim's
    text, tags the resulting triples with the claim's source metadata, and
    writes them into a freshly created "nanopubs" table.
    """
    db = lancedb.connect("./lancedb")
    # BUG FIX: the input is the "claims" table. The old code checked for
    # "nanopubs" — the table this function *creates* — so on a fresh DB it
    # printed "not found" and returned without ever producing output.
    if "claims" not in db.table_names():
        print("Claims table not found!")
        return
    table = db.open_table("claims")  # pull the claims to process the nanopubs
    records = table.to_pandas()
    print(f"Total records in claims: {len(records)}")
    triple_frames = []  # one DataFrame of triples per successfully processed claim
    for i, record in records.iterrows():
        try:
            result = workOnNanograph(str(record['text']))
            df = triples_to_dataframe(result)
            if not df.empty:
                # Tag each triple with the claim it was extracted from.
                # NOTE: column is 'nodename' (no underscore), matching the
                # entities table rather than the claims table's 'node_name'.
                df['filename'] = record['filename']
                df['nodename'] = record['node_name']
                df['index'] = record['index']
                triple_frames.append(df)
        except Exception as e:
            # Catches BamlValidationError and any other per-record failure so
            # one bad claim does not abort the whole run.
            print(f"Error processing record {i} from {record['filename']}: {type(e).__name__}: {str(e)}")
            continue
    # BUG FIX: pd.concat raises ValueError on an empty list; bail out cleanly
    # when no triples were extracted at all.
    if not triple_frames:
        print("Final DataFrame is empty. 'nanopubs' table not created.")
        return
    final_df = pd.concat(triple_frames, ignore_index=True)
    print(f"Total entities processed: {len(final_df)}")
    # Infer a pyarrow schema from the pandas dtypes; any column that is not
    # integer/float/bool is stored as a string.
    pa_fields = []
    for column_name, dtype in final_df.dtypes.items():
        if pd.api.types.is_integer_dtype(dtype):
            pa_fields.append(pa.field(column_name, pa.int64()))
        elif pd.api.types.is_float_dtype(dtype):
            pa_fields.append(pa.field(column_name, pa.float64()))
        elif pd.api.types.is_bool_dtype(dtype):
            pa_fields.append(pa.field(column_name, pa.bool_()))
        else:
            pa_fields.append(pa.field(column_name, pa.string()))
    nanopubs_schema = pa.schema(pa_fields)
    # Drop and recreate so reruns start from a clean table.
    if "nanopubs" in db.table_names():
        db.drop_table("nanopubs")
    nanopubs_table = db.create_table("nanopubs", schema=nanopubs_schema)
    print(nanopubs_table.schema)
    nanopubs_table.add(final_df)
    print(f"Successfully created 'nanopubs' table and added {len(final_df)} records.")
if __name__ == "__main__":
# process_sources()
# process_claims()
# process_entities()
process_nanopubs()