-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathTFIDF.java
More file actions
243 lines (186 loc) · 7.45 KB
/
TFIDF.java
File metadata and controls
243 lines (186 loc) · 7.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
/*=============================================================================
| Assignment: Final Project - Multiple Document Summarization
| Author: Group7 - (Sampath, Ajay, Visesh)
| Grader: Walid Shalaby
|
| Course: ITCS 6190
| Instructor: Srinivas Akella
|
| Language: Java
| Version : 1.8.0_101
|
| Deficiencies: No logical errors.
*===========================================================================*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/*
* Calculate TFIDF and filter top terms in each file
*/
public class TFIDF extends Configured implements Tool {
private static final Logger LOG = Logger.getLogger(TFIDF.class);
public int run(String[] args) throws Exception {
Job job = Job.getInstance(getConf(), " tfidf ");
Configuration configuration = job.getConfiguration(); // create a
// configuration
// reference
// get file count in the input folder
FileSystem fs = FileSystem.get(configuration);
Path path = new Path(args[1]);
ContentSummary cs = fs.getContentSummary(path);
long fileCount = cs.getFileCount();
configuration.setInt("totalDocuments", (int) fileCount); // use
// configuration
// object to
// pass file
// count
job.setJarByClass(this.getClass());
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
FileInputFormat.addInputPath(job, new Path(args[2]));
FileOutputFormat.setOutputPath(job, new Path(args[3]));
// Explicitly set key and value types of map and reduce output
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
int success = job.waitForCompletion(true) ? 0 : 1;
// Job to convert document to a vector of top 49 TFIDFs
if (success == 0) {
Job docVectorJob = Job.getInstance(getConf(), " tfidf2 ");
docVectorJob.setJarByClass(this.getClass());
docVectorJob.setMapperClass(MapDocVector.class);
docVectorJob.setReducerClass(ReduceDocVector.class);
FileInputFormat.addInputPath(docVectorJob, new Path(args[3]));
FileOutputFormat.setOutputPath(docVectorJob, new Path(args[4]));
// Explicitly set key and value types of map and reduce output
docVectorJob.setOutputKeyClass(Text.class);
docVectorJob.setOutputValueClass(Text.class);
docVectorJob.setMapOutputKeyClass(Text.class);
docVectorJob.setMapOutputValueClass(Text.class);
docVectorJob.setNumReduceTasks(1);
success = docVectorJob.waitForCompletion(true) ? 0 : 1;
}
return success;
}
public static class Map extends Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable offset, Text lineText, Context context) throws IOException, InterruptedException {
Text currentWord = new Text();
String[] lineInputSplit = lineText.toString().split("#####"); // split
// line
// input
// to
// get
// the
// word
currentWord = new Text(lineInputSplit[0]);
// reformat and combine the filename and term frequency of word in
// that file
String value = lineInputSplit[1].replaceAll("\\s+", "=");
context.write(currentWord, new Text(value));
}
}
public static class Reduce extends Reducer<Text, Text, Text, DoubleWritable> {
@Override
public void reduce(Text word, Iterable<Text> postingsList, Context context)
throws IOException, InterruptedException {
// hashmap to store filename/key and term frequency/value
// Hashmap has been used because it was difficult get the output
// by looping through the Iterable twice
HashMap<String, String> hashMap = new HashMap<>();
int documentFrequency = 0; // number of documents that contain the
// word
boolean isValid = false;
// Loop through postings list to accumulate and find the document
// frequency
for (Text count : postingsList) {
String[] temp = count.toString().split("=");
hashMap.put(temp[0], temp[1]);
documentFrequency++;
}
// Loop through hashmap to get filename and calculate corresponding
// TFIDF from TF.
for (String key : hashMap.keySet()) {
// get termfrequency with filename from map
double termFrequency = Double.parseDouble(hashMap.get(key));
// calculate IDF as per formula
double IDF = Math.log10(1.0 + (Double.valueOf(context.getConfiguration().get("totalDocuments"))
/ Double.valueOf(documentFrequency)));
double tfidf = termFrequency * IDF; // calculate TFIDF
String wordString = word.toString();
// reducer output key to include word and filename as required
String reducerOutputKey = wordString + "#####" + key;
// check if all chars in word are alphanumeric
for (int i = 0; i < wordString.length(); i++) {
if (!Character.isLetterOrDigit(wordString.charAt(i))) {
isValid = false;
break;
} else {
isValid = true;
}
}
// write to output
if (wordString.length() > 1 && isValid) // to remove punctuation
// and other crap
context.write(new Text(reducerOutputKey), new DoubleWritable(tfidf));
}
}
}
/*
* Map and Reduce represent files in terms of top terms
*/
public static class MapDocVector extends Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable offset, Text lineText, Context context) throws IOException, InterruptedException {
// Split input to get filename and tfidf
String[] lineInputSplit = lineText.toString().split("#####");
String fileNamePlusTFIDF = lineInputSplit[1];
String[] fileNameAndTFIDFArray = fileNamePlusTFIDF.split("\\s+");
String fileName = fileNameAndTFIDFArray[0];
String TFIDF = fileNameAndTFIDFArray[1];
context.write(new Text(fileName), new Text(TFIDF));
}
}
public static class ReduceDocVector extends Reducer<Text, Text, Text, Text> {
@Override
public void reduce(Text word, Iterable<Text> vectorIterable, Context context)
throws IOException, InterruptedException {
ArrayList<Double> vectorList = new ArrayList<>();
String finalVectorList = "";
for (Text tfidf : vectorIterable) {
vectorList.add(Double.valueOf(tfidf.toString()));
}
Collections.sort(vectorList);
Collections.reverse(vectorList);
for (int i = 0; i < 49; i++) {
String tfidf = String.valueOf(vectorList.get(i));
if (i == 0) {
finalVectorList += "[" + tfidf + ",";
} else {
finalVectorList += tfidf + ",";
}
}
finalVectorList += "]";
String output = word.toString() + "=" + finalVectorList;
context.write(new Text(""), new Text(output));
}
}
}