-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsql_query_generator.py
More file actions
83 lines (63 loc) · 3.1 KB
/
sql_query_generator.py
File metadata and controls
83 lines (63 loc) · 3.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import json
import re
class SQLQueryGenerator:
    """Build ``create table`` / ``insert`` SQL statements from JSON data.

    The JSON input is either a single object (one row) or an array of
    objects (several rows). Column names and column types are derived from
    the first row. The emitted type names (``bigint``, ``jsonb``, ...)
    look PostgreSQL-flavoured.

    NOTE(review): JSON keys are interpolated into the SQL verbatim as column
    names; they are neither validated nor quoted. Do not feed untrusted keys
    into this class without checking them first.
    """

    # A name starts and ends with a lowercase latin letter and may contain
    # underscores in between.
    __NAME_PATTERN = re.compile(r'[a-z](?:[a-z_]*[a-z])?')

    # Single message used for every "the JSON is unusable" condition.
    __BAD_JSON_MESSAGE = "JSON указан в некорректном формате!"

    @staticmethod
    def __get_sql_type(value):
        """Return the SQL column type name matching a decoded JSON value.

        The lookup uses the exact type, so ``bool`` maps to ``boolean`` even
        though it is a subclass of ``int``. Anything not listed (dict, list)
        maps to ``jsonb``.
        """
        type_mapping = {
            float: "decimal",
            bool: "boolean",
            int: "bigint",
            str: "text",
            type(None): "text",
        }
        return type_mapping.get(type(value), "jsonb")

    @staticmethod
    def __validate_input(table_name, schema_name):
        """Validate the schema and table names.

        An empty schema name is allowed and means "no schema prefix".

        Raises:
            ValueError: if either name does not match the name pattern.
        """
        pattern = SQLQueryGenerator.__NAME_PATTERN
        if schema_name and not pattern.fullmatch(schema_name):
            raise ValueError('Название схемы указано в некорректном формате!')
        if not pattern.fullmatch(table_name):
            raise ValueError('Название таблицы указано в некорректном формате!')

    @staticmethod
    def __get_columns(data):
        """Return ``"<name> <type>"`` column definitions for one row dict."""
        return [
            f"{key} {SQLQueryGenerator.__get_sql_type(value)}"
            for key, value in data.items()
        ]

    @staticmethod
    def __get_json(json_input):
        """Parse *json_input* into a non-empty list of row dicts.

        A single JSON object is wrapped into a one-element list.

        Raises:
            ValueError: if the text is not valid JSON, or if it decodes to
                anything other than an object or a non-empty array of
                objects.
        """
        try:
            data = json.loads(json_input)
        except json.JSONDecodeError as err:
            raise ValueError(SQLQueryGenerator.__BAD_JSON_MESSAGE) from err

        rows = [data] if isinstance(data, dict) else data
        is_row_list = (
            isinstance(rows, list)
            and bool(rows)
            and all(isinstance(row, dict) for row in rows)
        )
        if not is_row_list:
            # Without this check an empty array or a scalar would surface as
            # an IndexError / TypeError further down.
            raise ValueError(SQLQueryGenerator.__BAD_JSON_MESSAGE)
        return rows

    @staticmethod
    def __get_table_full_name(table_name, schema_name):
        """Return ``schema.table``, or just ``table`` when no schema is given."""
        return f"{schema_name}.{table_name}" if schema_name else table_name

    @staticmethod
    def __format_value(value):
        """Render one decoded JSON value as a SQL literal.

        ``None`` becomes ``null``; dicts/lists are serialised back to JSON
        text; everything is emitted as a single-quoted literal with embedded
        single quotes doubled.
        """
        if value is None:
            return "null"
        if SQLQueryGenerator.__get_sql_type(value) == "jsonb":
            value = json.dumps(value, ensure_ascii=False)
        escaped = str(value).replace("'", "''")
        return f"'{escaped}'"

    @staticmethod
    def __get_insert_values(data, columns=None):
        """Return the comma-separated SQL literals for one row.

        Args:
            data: the row as a dict.
            columns: optional column order to follow; keys missing from
                *data* are rendered as ``null``. Defaults to the row's own
                key order.

        The row dict itself is not modified.
        """
        keys = data.keys() if columns is None else columns
        return ', '.join(
            SQLQueryGenerator.__format_value(data.get(key)) for key in keys
        )

    @staticmethod
    def get_create_table_query(table_name, schema_name, json_input):
        """Return a ``create table if not exists`` statement.

        Columns and their types are taken from the first row of the JSON.

        Raises:
            ValueError: if a name is malformed or the JSON is unusable.
        """
        SQLQueryGenerator.__validate_input(table_name, schema_name)
        rows = SQLQueryGenerator.__get_json(json_input)
        columns = SQLQueryGenerator.__get_columns(rows[0])
        table_full_name = SQLQueryGenerator.__get_table_full_name(table_name, schema_name)
        return f"create table if not exists {table_full_name} ({', '.join(columns)});"

    @staticmethod
    def get_insert_table_query(table_name, schema_name, json_input):
        """Return one ``insert`` statement per row, joined by newlines.

        The column list comes from the first row; every row's values are
        emitted in that same column order so names and values always line up.

        Raises:
            ValueError: if a name is malformed or the JSON is unusable.
        """
        SQLQueryGenerator.__validate_input(table_name, schema_name)
        rows = SQLQueryGenerator.__get_json(json_input)
        columns = list(rows[0])
        columns_names = ', '.join(columns)
        table_full_name = SQLQueryGenerator.__get_table_full_name(table_name, schema_name)
        return '\n'.join(
            f"insert into {table_full_name} ({columns_names}) "
            f"values ({SQLQueryGenerator.__get_insert_values(row, columns)});"
            for row in rows
        )