-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconvert_to_json.py
More file actions
187 lines (171 loc) · 7.39 KB
/
convert_to_json.py
File metadata and controls
187 lines (171 loc) · 7.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
"""Convert Documents to JSON
A program to convert documents (PDFs, TIFFs, Word and Excel documents) to a standard JSON format.
"""
import os
import argparse
import shutil
import glob
import logging
import datetime
from processors import pdf, xlsx, word_docx
# Set up file logging: every run writes to its own timestamped file in LOG_DIR.
LOG_DIR = "./logs"
os.makedirs(LOG_DIR, exist_ok=True)
# str(datetime.now()) contains a space and ':' characters; ':' is not allowed
# in Windows file names, so the name is built from an explicit portable format
# (microseconds are kept so that the name stays as unique as before).
LOG_FILE = os.path.join(
    LOG_DIR,
    datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S_%f") + ".log",
)
logging.basicConfig(
    filename=LOG_FILE,
    format='%(levelname)s (%(asctime)s): %(message)s',
    level=logging.INFO,
)

# File extensions (without the leading dot) used when the caller does not
# select specific formats.
ALLOWED_EXTENSIONS = ["pdf", "doc", "docx",
                      "xls", "xlsx", "xlsm", "tiff", "tif"]
def get_files(src, formats):
    """Find files in the source directory matching the given extensions.

    The search is recursive and also covers files located directly in ``src``.

    Arguments:
        src {str} -- Path to the source directory
        formats {list} -- List of file extensions (without the leading dot)

    Returns:
        [list] -- Paths of the matching regular files, each starting with
        ``src`` (so they are absolute when ``src`` is absolute). Every path
        appears once, in the order in which it was first found.
    """
    # Escape glob metacharacters ("[", "*", "?") in the directory itself so
    # that only the extension part of the pattern is treated as a wildcard.
    root = glob.escape(src)
    seen = set()
    files = []
    for extension in formats:
        pattern = os.path.join(root, "**/*." + extension)
        for path in glob.glob(pattern, recursive=True):
            # A directory may be named like "scans.pdf"; only real files can
            # be converted. Repeated formats must not yield repeated paths.
            if path in seen or not os.path.isfile(path):
                continue
            seen.add(path)
            files.append(path)
    return files
def process_files(files, dst, tessdata, oem, overwrite, cleanup, store_results, debug_segmentation, restore_results):
    """Process files and convert them to JSON.

    Each file is copied into its own sub-directory of ``dst`` (named after the
    file, with spaces replaced by underscores) and the copy is then handed to
    the processor that matches the file extension.

    Arguments:
        files {list} -- List of absolute paths to files
        dst {str} -- Absolute path of output directory
        tessdata {str} -- Absolute path to Tesseract model. Required only for PDF formats.
        oem {str} -- OEM Mode to use for Tesseract. Options: v4 (LSTM) or v3 (LEGACY TESSERACT). Required only for PDF formats.
        overwrite {bool} -- Overwrite existing results. If True, will perform the entire conversion process from scratch.
        cleanup {bool} -- Cleanup files/directories after conversion.
        store_results {bool} -- Store intermediate results. Useful for debugging or resuming a process later on.
        debug_segmentation {bool} -- Forwarded to the PDF/TIFF processor only.
        restore_results {bool} -- Forwarded to the PDF/TIFF processor only.

    Raises:
        Exception -- If a file has an extension that no processor handles.
    """
    total = len(files)
    logging.info("Total Files: %d", total)
    for num, file in enumerate(files, 1):
        filename = os.path.basename(file)
        name, ext = os.path.splitext(filename)
        # Dispatch on the lower-cased extension so that "REPORT.PDF" is
        # handled like "report.pdf" instead of being rejected as unknown.
        ext = ext.lower()

        # Sanitize the file name. Replace whitespaces.
        clean_name = name.replace(" ", "_")
        clean_filename = filename.replace(" ", "_")

        # Make the output directory and work on a copy of the source file.
        output_dir = os.path.join(dst, clean_name)
        os.makedirs(output_dir, exist_ok=True)
        source_file = os.path.join(output_dir, clean_filename)
        shutil.copy2(file, source_file)

        logging.info("Processing File: %s (%d/%d)", file, num, total)
        if ext == ".pdf" or ext.startswith(".tif"):
            job = pdf.Processor(source_file, output_dir,
                                tessdata, overwrite, cleanup, oem, store_results,
                                debug_segmentation, restore_results)
            job.run()
        elif ext == ".doc":
            # No processor exists for legacy .doc files; say so in the log
            # instead of skipping the file without a trace.
            logging.warning(
                "Skipping File: %s (.doc conversion is not implemented)", file)
        elif ext == ".docx":
            job = word_docx.Processor(source_file, output_dir, overwrite, cleanup)
            job.run()
        elif ext.startswith(".xls"):
            job = xlsx.Processor(source_file, output_dir, overwrite, cleanup)
            job.run()
        else:
            raise Exception("Unknown extension (%s)" % file)
def run(src, dst, formats, tessdata, oem, overwrite, cleanup, store_results, debug_segmentation, restore_results):
    """Validate the inputs, collect the source files and convert them to JSON.

    Arguments:
        src {str} -- Path to a source directory (searched recursively) or to a single file
        dst {str} -- Path of the output directory. Created if it does not exist.
        formats {list} -- File extensions (without the leading dot) to process. All allowed extensions are used when empty or None.
        tessdata {str} -- Path to the Tesseract model, or None.
        oem {str} -- OEM Mode for Tesseract, passed through to the file processing step.
        overwrite {int|bool} -- Converted to bool and passed through.
        cleanup {int|bool} -- Converted to bool and passed through.
        store_results {int|bool} -- Converted to bool and passed through.
        debug_segmentation {int|bool} -- Converted to bool and passed through.
        restore_results {int|bool} -- Converted to bool and passed through.

    Raises:
        Exception -- If the source or the tessdata path does not exist, if the
        source is neither a directory nor a regular file, or if no file with
        one of the requested extensions is found.
    """
    src = os.path.abspath(src)
    if not os.path.exists(src):
        raise Exception("Source directory (%s) does not exist." %
                        (src))
    # The command line documents the source as "directory or single file",
    # so a regular file is accepted in addition to a directory.
    is_single_file = os.path.isfile(src)
    if not is_single_file and not os.path.isdir(src):
        raise Exception("Source (%s) is not a directory." % (src))

    formats = formats if formats else ALLOWED_EXTENSIONS

    tessdata = os.path.abspath(tessdata) if tessdata else None
    if tessdata is not None and not os.path.exists(tessdata):
        raise Exception(
            "Tessdata directory (%s) does not exist." % (tessdata))

    overwrite = bool(overwrite)
    cleanup = bool(cleanup)
    debug_segmentation = bool(debug_segmentation)
    store_results = bool(store_results)
    restore_results = bool(restore_results)

    if is_single_file:
        # normcase lower-cases on Windows and is a no-op elsewhere, so the
        # extension is compared with the platform's file-name case rules.
        extension = os.path.normcase(os.path.splitext(src)[1])[1:]
        wanted = {os.path.normcase(fmt) for fmt in formats}
        files = [src] if extension in wanted else []
    else:
        files = get_files(src, formats)
    if not files:
        raise Exception("Found 0 files in %s" % (src))

    dst = os.path.abspath(dst)
    os.makedirs(dst, exist_ok=True)
    process_files(files, dst, tessdata,
                  oem, overwrite, cleanup, store_results, debug_segmentation, restore_results)
if __name__ == '__main__':
    # The text must be passed as ``description``: the first positional
    # parameter of ArgumentParser is ``prog``, which would put the sentence
    # into the usage line in place of the program name.
    PARSER = argparse.ArgumentParser(
        description="Command line arguments for Document Conversion to JSON")
    PARSER.add_argument("-s",
                        "--src",
                        type=str,
                        required=True,
                        dest="src",
                        help="Source directory of files/Path to a single file")
    PARSER.add_argument("-d",
                        "--dst",
                        type=str,
                        required=True,
                        dest="dst",
                        help="Destination directory")
    PARSER.add_argument("-f",
                        "--formats",
                        nargs="*",
                        type=str,
                        choices=["pdf", "tiff", "tif", "xls",
                                 "xlsx", "xlsm", "doc", "docx"],
                        dest="formats",
                        help="File formats to process. Leave empty for all formats.")
    PARSER.add_argument("-t",
                        "--tessdata",
                        type=str,
                        dest="tessdata",
                        required=True,
                        help="Path to Tessdata model (v4) for tesserocr. Applies to PDF/TIFF/Images")
    PARSER.add_argument("-oem",
                        type=str,
                        dest="oem",
                        choices=["v4", "v3"],
                        default="v4",
                        help="OEM Mode for Tesseract")
    # The switches below take 0 or 1 on the command line; run() converts
    # them to booleans.
    PARSER.add_argument("-o",
                        "--overwrite",
                        type=int,
                        choices=[0, 1],
                        default=0,
                        dest="overwrite",
                        help="Overwrite files")
    PARSER.add_argument("-c",
                        "--cleanup",
                        type=int,
                        choices=[0, 1],
                        default=1,
                        dest="cleanup",
                        help="Clean up temporary files/directories")
    PARSER.add_argument("-st",
                        "--store_results",
                        type=int,
                        choices=[0, 1],
                        default=1,
                        dest="store_results",
                        help="Store intermediate results (useful for debugging)")
    PARSER.add_argument("-ds",
                        "--debug_segmentation",
                        type=int,
                        choices=[0, 1],
                        default=0,
                        dest="debug_segmentation",
                        help="Debug segmentation results")
    PARSER.add_argument("-rst",
                        "--restore_results",
                        type=int,
                        choices=[0, 1],
                        default=0,
                        dest="restore_results",
                        help="Restore intermediate results (useful for debugging)")
    FLAGS = PARSER.parse_args()
    try:
        # Every ``dest`` above matches a parameter name of run().
        run(**vars(FLAGS))
    except Exception:
        # Logging goes to a file, so record the failure (with traceback)
        # there as well, then re-raise so the process still fails visibly.
        logging.exception("Conversion failed")
        raise