-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathindexer.py
More file actions
182 lines (146 loc) · 7.6 KB
/
indexer.py
File metadata and controls
182 lines (146 loc) · 7.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
import math
import threading
import time
from mongodb import MongoDB
class Indexer:
def __init__(self, threads_num=4):
self.docs_count = 0
self.doc_lengths = {}
self.document_ids = []
self.mongo_connection = MongoDB.connect_to_db()
self.threads_num = threads_num
self.thread_pool = []
def build_index(self):
print("> Building index...")
tic = time.perf_counter()
# Reset index related database collections
self.mongo_connection.reset_index()
# Copy all records from "Crawler Records" db collection to "Documents" db collection.
self.mongo_connection.build_documents_db()
# Get the documents total count
self.docs_count = self.mongo_connection.get_documents_count()
# Get all document IDs from the database
self.document_ids = self.mongo_connection.find_all_document_record_ids()
doc_counter = 0 # Keeps count of the amount of documents processed.
for document_id in self.document_ids:
doc_counter += 1
print("> Index Builder: Processing document {counter} of {total}...".format(counter=doc_counter,
total=self.docs_count))
document = self.mongo_connection.find_document_record(document_id)
bag = document["bag"]
for word in bag:
while True: # it will stay here till a thread is available
active = 0
for thread in self.thread_pool:
if thread.isAlive():
active += 1
if active < self.threads_num:
break
else:
time.sleep(0.5)
new_task = threading.Thread(target=self.process_word, args=(document, word, ))
new_task.start()
self.thread_pool.append(new_task)
# Wait until all threads in the thread poll have finished
for thread in self.thread_pool:
while thread.isAlive():
time.sleep(0.5)
toc = time.perf_counter()
print("> Index builder execution time: " + "{:.2f}".format(toc - tic) + " secs")
print("> Index building complete!")
# Calculate the document lengths of the given document collection, and store them as a new property
# of each document record.
print("> Calculating document lengths...")
tic = time.perf_counter()
self.calculate_doc_lengths()
# Wait until all threads in the thread poll have finished
for thread in self.thread_pool:
while thread.isAlive():
time.sleep(0.5)
toc = time.perf_counter()
print("> Document lengths calculation execution time: " + "{:.2f}".format(toc - tic) + " secs")
print("> Document lengths calculation complete!")
# Update Query Handler's DB collections with the new index and document data.
print("> Updating Query Handler's DB collections using Index DB collections...")
self.mongo_connection.update_query_handler_db()
print("> Query Handler's DB collections update complete!")
def process_word(self, document, word):
doc_id = document["_id"]
url = document["url"]
title = document["title"]
bag = document["bag"]
# Check if the word already exists in the inverted index
if self.mongo_connection.index_entry_exists(word):
# If the word exists in the index, update the word entry's data
self.mongo_connection.update_index_entry(word, {"_id": doc_id,
"title": title,
"url": url,
"w_d_freq": bag[word]
})
else:
# If the word does not exist in the index, create a new entry for it
# ATTENTION: word must be converted to LOWERCASE for the purposes of querying.
self.mongo_connection.add_index_entry({"word": word.lower(),
"w_freq": 1,
"documents": [{"_id": doc_id,
"title": title,
"url": url,
"w_d_freq": bag[word]}]
})
def calculate_doc_lengths(self):
# Reset thread pool
self.thread_pool = []
doc_counter = 0 # Keeps count of the amount of documents processed.
for document_id in self.document_ids:
doc_counter += 1
print("> Document lengths calculation: Processing document {counter} of {total}...".format(
counter=doc_counter, total=self.docs_count))
document = self.mongo_connection.find_document_record(document_id)
# Wait until thread pool has an available thread
while True:
active = 0
for thread in self.thread_pool:
if thread.isAlive():
active += 1
if active < self.threads_num:
break
else:
time.sleep(0.5)
new_task = threading.Thread(target=self.calculate_doc_length, args=(document,))
new_task.start()
self.thread_pool.append(new_task)
# Given a document id, searches for the word-document frequency on a given term's document list.
def search_w_d_freq(self, doc_id, doc_list):
for document in doc_list:
if document["_id"] == doc_id:
return document["w_d_freq"]
# If no w_d_freq data was found, the document is not present in the target term's
# document list, so we return 0.
return 0
def calculate_doc_length(self, document):
doc_id = document["_id"]
bag = document["bag"]
# Initialize the score accumulator for the current document to 0
squared_weights_sum = 0
# Find maximum word-document frequency value (used in normalization) for the current document
max_w_d_freq = 1
for keyword in bag:
term_record = self.mongo_connection.find_index_entry_by_keyword(keyword)
w_d_freq = self.search_w_d_freq(doc_id, term_record["documents"])
if w_d_freq > max_w_d_freq:
max_w_d_freq = w_d_freq
for keyword in bag:
term_record = self.mongo_connection.find_index_entry_by_keyword(keyword)
w_d_freq = self.search_w_d_freq(doc_id, term_record["documents"])
w_freq = term_record["w_freq"]
norm_w_d_freq = w_d_freq / max_w_d_freq
if self.docs_count > 1:
norm_inv_d_freq = math.log(self.docs_count / w_freq) / math.log(self.docs_count)
else:
# If the document collections consists of just 1 document, set inverted document frequency
# variable to 1 (thus ignoring inverted document frequency scoring)
norm_inv_d_freq = 1
squared_weights_sum += math.pow(norm_w_d_freq * norm_inv_d_freq, 2)
# Calculate current document's length and add the document:length pair to the doc_lengths dictionary.
doc_len = math.sqrt(squared_weights_sum)
self.mongo_connection.add_length_to_document(doc_id, doc_len)