Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
409 changes: 290 additions & 119 deletions go/src/lqp/v1/transactions.pb.go

Large diffs are not rendered by default.

37 changes: 36 additions & 1 deletion go/src/print.go
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,13 @@ func (pp PrettyParams) pprint(node interface{}) {
pp.PARENS(func(pp PrettyParams) {
pp.Write("export")
pp.NEWLINE()
pp.INDENT(2, func(pp PrettyParams) { pp.pprint(export.GetCsvConfig()) })
pp.INDENT(2, func(pp PrettyParams) {
if csvConfig := export.GetCsvConfig(); csvConfig != nil {
pp.pprint(csvConfig)
} else if csvTableConfig := export.GetCsvTableConfig(); csvTableConfig != nil {
pp.pprint(csvTableConfig)
}
})
})
} else if whatIf := n.GetWhatIf(); whatIf != nil {
pp.PARENS(func(pp PrettyParams) {
Expand Down Expand Up @@ -1063,6 +1069,35 @@ func (pp PrettyParams) pprint(node interface{}) {
pp.pprint(n.GetColumnData())
})

case *pb.ExportCSVTableConfig:
	// Printed as:
	//   (export_csv_table_config
	//     (path <path>)
	//     <table_def>
	//     <config dict>)
	pp.PARENS(func(pp PrettyParams) {
		pp.Write("export_csv_table_config")
		pp.NEWLINE()
		pp.INDENT(2, func(pp PrettyParams) {
			// Destination path, wrapped in its own parenthesised form.
			pp.PARENS(func(pp PrettyParams) {
				pp.Write("path")
				pp.SPACE()
				pp.pprint(n.GetPath())
			})
			pp.NEWLINE()

			// Relation that defines the exported table.
			pp.pprint(n.GetTableDef())
			pp.NEWLINE()

			// The header-row flag is stored in the config dict as 1/0
			// rather than as a bool.
			headerRow := 0
			if n.GetSyntaxHeaderRow() {
				headerRow = 1
			}

			pp.configDictToStr(map[string]interface{}{
				"partition_size":        n.GetPartitionSize(),
				"compression":           n.GetCompression(),
				"syntax_header_row":     headerRow,
				"syntax_missing_string": n.GetSyntaxMissingString(),
				"syntax_delim":          n.GetSyntaxDelim(),
				"syntax_quotechar":      n.GetSyntaxQuotechar(),
				"syntax_escapechar":     n.GetSyntaxEscapechar(),
			})
		})
	})

case *pb.Epoch:
pp.PARENS(func(pp PrettyParams) {
pp.Write("epoch")
Expand Down
7 changes: 7 additions & 0 deletions proto/Manifest.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# This file is machine-generated - editing it directly is not advised

julia_version = "1.10.4"
manifest_format = "2.0"
project_hash = "da39a3ee5e6b4b0d3255bfef95601890afd80709"

[deps]
1 change: 1 addition & 0 deletions proto/Project.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[deps]
16 changes: 16 additions & 0 deletions proto/relationalai/lqp/v1/transactions.proto
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,21 @@ message ExportCSVColumn {
RelationId column_data = 2;
}

// Configuration for a CSV export whose contents are described by a single
// table-definition relation (table_def) rather than by a list of columns.
message ExportCSVTableConfig {
// Destination path of the export.
string path = 1;
// Relation that defines the table to export.
RelationId table_def = 2;

// Optional export settings. Each is declared `optional`, so a reader can
// distinguish "unset" from an explicit zero/empty value.
optional int64 partition_size = 3;
optional string compression = 4;
// CSV syntax settings: header row on/off, the string written for missing
// values, and the delimiter, quote and escape characters.
optional bool syntax_header_row = 5;
optional string syntax_missing_string = 6;
optional string syntax_delim = 7;
optional string syntax_quotechar = 8;
optional string syntax_escapechar = 9;

// TODO: support data integration options, e.g., private tokens for private buckets etc.
}

//
// Read operations
//
Expand All @@ -113,6 +128,7 @@ message Output {
message Export {
oneof export_config {
ExportCSVConfig csv_config = 1;
ExportCSVTableConfig csv_table_config = 2;

// TODO: support JSON export
}
Expand Down
20 changes: 19 additions & 1 deletion python-tools/src/lqp/emit.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,12 @@ def convert_output(o: ir.Output) -> transactions_pb2.Output:
return transactions_pb2.Output(**kwargs) # type: ignore

def convert_export(e: ir.Export) -> transactions_pb2.Export:
    """Convert an IR ``Export`` node into its protobuf ``Export`` message.

    The IR config is placed in the ``csv_config`` or ``csv_table_config``
    field of the message depending on its concrete type.

    Raises:
        ValueError: if ``e.config`` is neither an ``ExportCSVConfig`` nor an
            ``ExportCSVTableConfig``.
    """
    config = e.config

    if isinstance(config, ir.ExportCSVConfig):
        csv_config = convert_export_config(config)
        return transactions_pb2.Export(csv_config=csv_config)  # type: ignore

    if isinstance(config, ir.ExportCSVTableConfig):
        csv_table_config = convert_export_table_config(config)
        return transactions_pb2.Export(csv_table_config=csv_table_config)  # type: ignore

    raise ValueError(f"Unknown export config type: {type(config)}")

def convert_export_config(ec: ir.ExportCSVConfig) -> transactions_pb2.ExportCSVConfig:
return transactions_pb2.ExportCSVConfig(
Expand All @@ -489,6 +494,19 @@ def convert_export_config(ec: ir.ExportCSVConfig) -> transactions_pb2.ExportCSVC
syntax_escapechar=ec.syntax_escapechar if ec.syntax_escapechar is not None else '\\'
)

def convert_export_table_config(ec: ir.ExportCSVTableConfig) -> transactions_pb2.ExportCSVTableConfig:
    """Convert an IR ``ExportCSVTableConfig`` into its protobuf message.

    Every optional setting that is ``None`` on the IR node is replaced by a
    fixed default before the message is built, so all optional fields of the
    resulting message are always populated.
    """
    def or_default(value, default):
        # Only None triggers the default; falsy values such as 0 or "" are kept.
        return default if value is None else value

    return transactions_pb2.ExportCSVTableConfig(
        table_def=convert_relation_id(ec.table_def),
        path=ec.path,
        partition_size=or_default(ec.partition_size, 0),
        compression=or_default(ec.compression, ""),
        syntax_header_row=or_default(ec.syntax_header_row, True),  # type: ignore
        syntax_missing_string=or_default(ec.syntax_missing_string, ""),
        syntax_delim=or_default(ec.syntax_delim, ","),
        syntax_quotechar=or_default(ec.syntax_quotechar, '"'),
        syntax_escapechar=or_default(ec.syntax_escapechar, '\\'),
    )

def convert_export_csv_column(ec: ir.ExportCSVColumn) -> transactions_pb2.ExportCSVColumn:
return transactions_pb2.ExportCSVColumn(
column_name=ec.column_name,
Expand Down
18 changes: 16 additions & 2 deletions python-tools/src/lqp/ir.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,11 +492,25 @@ class ExportCSVColumn(LqpNode):
column_name: str
column_data: RelationId

# ExportCSVTableConfig(path::string, table_def::RelationId, <optional settings>)
#
# CSV export configuration that names a single table-definition relation.
# Only `path` and `table_def` are required; every other field defaults to
# None, meaning the setting was not given.
@dataclass(frozen=True)
class ExportCSVTableConfig(LqpNode):
# Required fields.
path: str
table_def: RelationId
# Optional output settings.
partition_size: Optional[int] = None
compression: Optional[str] = None

# Optional CSV syntax settings. Note that the header-row flag is typed as an
# int here, not a bool.
syntax_header_row: Optional[int] = None
syntax_missing_string: Optional[str] = None
syntax_delim: Optional[str] = None
syntax_quotechar: Optional[str] = None
syntax_escapechar: Optional[str] = None

# Export(config::Union[ExportCSVConfig, ExportCSVTableConfig])
#
# An export carries exactly one configuration object; the concrete type of
# `config` selects which kind of CSV export is described.
@dataclass(frozen=True)
class Export(LqpNode):
# TODO: Once we add a JSON export, this should be union[ExportCSVConfig, ExportCSVTableConfig, ExportJSONConfig]
config: Union[ExportCSVConfig, ExportCSVTableConfig]

# Abort(name::string?, relation_id::RelationId)
@dataclass(frozen=True)
Expand Down
19 changes: 18 additions & 1 deletion python-tools/src/lqp/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@
read: demand | output | export | abort
demand: "(demand" relation_id ")"
output: "(output" name? relation_id ")"
export: "(export" export_csv_config ")"
export: "(export" (export_csv_config | export_csv_table_config) ")"
abort: "(abort" name? relation_id ")"

export_csv_config: "(export_csv_config" export_path export_columns config_dict ")"
export_csv_table_config: "(export_csv_table_config" export_path relation_id config_dict ")"

export_columns: "(columns" export_column* ")"
export_column: "(column" STRING relation_id ")"
Expand Down Expand Up @@ -298,6 +299,22 @@ def export_csv_config(self, meta, items):
meta=self.meta(meta)
)

def export_csv_table_config(self, meta, items):
    """Build an ``ir.ExportCSVTableConfig`` from parsed children.

    ``items[0]`` is the export path and ``items[1]`` the table-definition
    relation id. Any remaining items must be dicts; the ``.value`` of each
    entry is passed to the IR node as a keyword argument under the same key.
    """
    assert len(items) >= 2, "Export CSV table config must have at least relation_id and path"

    path = items[0]
    table_def = items[1]

    # Flatten all trailing config dicts into one kwargs mapping; later dicts
    # override earlier ones on duplicate keys.
    export_fields = {}
    for config in items[2:]:
        assert isinstance(config, dict)
        export_fields.update({key: entry.value for key, entry in config.items()})

    return ir.ExportCSVTableConfig(
        path=path,
        table_def=table_def,
        **export_fields,
        meta=self.meta(meta)
    )

def export_columns(self, meta, items):
# items is a list of ExportCSVColumn objects
return items
Expand Down
27 changes: 27 additions & 0 deletions python-tools/src/lqp/print.py
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,33 @@ def line_conf_f(kw: str, field: Union[int, str]) -> str:
lqp += config_dict_to_str(config_dict, indent_level + 1, options) #type: ignore
lqp += f"{conf.RPAREN()}"

elif isinstance(node, ir.ExportCSVTableConfig):
# Renders "(export_csv_table_config", a "(path ...)" line, the table
# definition, the config dict, and a closing paren.

# Format one parenthesised "(kw body)" line, prefixed with the current
# indent plus conf.SIND().
def line(kw: str, body: str) -> str:
return f"{ind}{conf.SIND()}{conf.LPAREN()}{conf.kw(kw)} {body}{conf.RPAREN()}"

# Same as line(), but the body is the to_str() rendering of a field value.
def line_conf_f(kw: str, field: Union[int, str]) -> str:
return line(kw, to_str(field, 0, options, debug_info))

lqp += f"{ind}{conf.LPAREN()}{conf.kw('export_csv_table_config')}\n"

# The real path is printed only when PRINT_CSV_FILENAME is enabled;
# otherwise a fixed placeholder is printed in its place.
if has_option(options, PrettyOptions.PRINT_CSV_FILENAME):
lqp += line_conf_f('path', node.path) + "\n"
else:
lqp += line_conf_f('path', '<hidden filename>') + "\n"
lqp += ind + conf.SIND() + to_str(node.table_def, 0, options, debug_info) + "\n"

# Every setting is always printed; a None field is replaced by the
# default shown on its line.
config_dict: dict[str, Any] = {}
config_dict['partition_size'] = node.partition_size if node.partition_size is not None else 0
config_dict['compression'] = node.compression if node.compression is not None else "" #type: ignore
config_dict['syntax_header_row'] = node.syntax_header_row if node.syntax_header_row is not None else 1
config_dict['syntax_missing_string'] = node.syntax_missing_string if node.syntax_missing_string is not None else "" #type: ignore
config_dict['syntax_delim'] = node.syntax_delim if node.syntax_delim is not None else "," #type: ignore
config_dict['syntax_quotechar'] = node.syntax_quotechar if node.syntax_quotechar is not None else '"' #type: ignore
config_dict['syntax_escapechar'] = node.syntax_escapechar if node.syntax_escapechar is not None else '\\' #type: ignore

lqp += config_dict_to_str(config_dict, indent_level + 1, options) #type: ignore
lqp += f"{conf.RPAREN()}"

elif isinstance(node, ir.ExportCSVColumn):
lqp += f"{ind}{conf.LPAREN()}{conf.kw('column')} {to_str(node.column_name, 0, options, debug_info)} {to_str(node.column_data, 0, options, debug_info)}{conf.RPAREN()}"

Expand Down
Loading
Loading