Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion record/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func NewJsonStringConverter(r io.Reader, s *schema.IntermediateSchema, recordTyp
inner = newMsgpackInnerDecoder(r)

case RecordTypeTsv:
inner, err = newCsvInnerDecoder(r, s, TsvDelimiter)
inner, err = newTsvInnerDecoder(r, s)

default:
return nil, fmt.Errorf("unsupported record type %s: %w", recordType, ErrUnsupportedRecord)
Expand Down
76 changes: 76 additions & 0 deletions record/tsv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package record

import (
"bufio"
"fmt"
"io"
"errors"
"os"
"strings"
"strconv"

"github.com/reproio/columnify/schema"
)

// tsvInnerDecoder decodes tab-separated input one line at a time into
// map records keyed by field name.
type tsvInnerDecoder struct {
// scanner yields the input one line per Scan call.
scanner *bufio.Scanner
// names holds the field names derived from the schema; the i-th name
// labels the i-th tab-separated column of every line.
names []string
}

// ErrTooFewFields is the sentinel wrapped by the TSV decoder's Decode when an
// input line has fewer tab-separated fields than there are field names.
// Match it with errors.Is.
var ErrTooFewFields = errors.New("too few fields")

// newTsvInnerDecoder builds a decoder that reads tab-separated records from r,
// one record per line, and labels the columns with the field names derived
// from s.
//
// It returns an error when r or s is nil, or when the field names cannot be
// obtained from the schema.
func newTsvInnerDecoder(r io.Reader, s *schema.IntermediateSchema) (*tsvInnerDecoder, error) {
	if s == nil || r == nil {
		return nil, fmt.Errorf("invalid input: schema and reader must not be nil")
	}

	names, err := getFieldNamesFromSchema(s)
	if err != nil {
		return nil, err
	}

	// By default bufio.Scanner refuses any line longer than
	// bufio.MaxScanTokenSize (64 KiB) and reports bufio.ErrTooLong, which a
	// single large column (e.g. an embedded JSON document) can easily exceed.
	// Raise the ceiling; the buffer still starts small and only grows on
	// demand, and the cap keeps a malformed input from exhausting memory.
	const maxLineBytes = 16 * 1024 * 1024
	scanner := bufio.NewScanner(r)
	scanner.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), maxLineBytes)

	return &tsvInnerDecoder{scanner: scanner, names: names}, nil
}

// Decode consumes the next input line, splits it on tab characters and stores
// the resulting record in *r, keyed by the decoder's field names.
//
// It returns io.EOF once the input is exhausted, the scanner's error if
// reading failed, and an error wrapping ErrTooFewFields when the line has
// fewer columns than there are field names. Surplus columns are dropped after
// a warning is written to standard error.
//
// Column values are typed from their text alone: a value strconv.ParseBool
// accepts (other than "0" and "1") becomes a bool, otherwise a base-10 integer
// becomes an int64, otherwise a parsable float becomes a float64, and anything
// else stays a string.
func (d *tsvInnerDecoder) Decode(r *map[string]interface{}) error {
	if !d.scanner.Scan() {
		// Scan reports false both at end of input and on failure; Err is
		// nil only in the former case.
		scanErr := d.scanner.Err()
		if scanErr == nil {
			return io.EOF
		}
		return scanErr
	}

	columns := strings.Split(d.scanner.Text(), "\t")
	want, got := len(d.names), len(columns)
	switch {
	case got < want:
		return fmt.Errorf("%w: expected %d, got %d", ErrTooFewFields, want, got)
	case got > want:
		fmt.Fprintf(os.Stderr, "warning: extra fields in TSV input, ignoring: %v\n", columns[want:])
	}

	decoded := make(map[string]interface{}, want)
	for idx, name := range d.names {
		raw := columns[idx]
		// "0" and "1" are valid ParseBool inputs, but they must be treated
		// as integers, so they are excluded from the bool branch.
		binaryDigit := raw == "0" || raw == "1"

		if b, err := strconv.ParseBool(raw); !binaryDigit && err == nil {
			decoded[name] = b
		} else if n, err := strconv.ParseInt(raw, 10, 64); err == nil {
			decoded[name] = n
		} else if f, err := strconv.ParseFloat(raw, 64); err == nil {
			decoded[name] = f
		} else {
			// Not a recognised scalar: keep the text verbatim.
			decoded[name] = raw
		}
	}

	*r = decoded
	return nil
}
179 changes: 179 additions & 0 deletions record/tsv_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@

package record

import (
"bytes"
"io"
"reflect"
"testing"
"errors"
"github.com/apache/arrow/go/arrow"
"github.com/reproio/columnify/schema"
)

func TestTsvInnerDecoder_Primitives(t *testing.T) {
schemaObj := schema.NewIntermediateSchema(
arrow.NewSchema([]arrow.Field{
{Name: "boolean", Type: arrow.FixedWidthTypes.Boolean, Nullable: false},
{Name: "int", Type: arrow.PrimitiveTypes.Uint32, Nullable: false},
{Name: "long", Type: arrow.PrimitiveTypes.Uint64, Nullable: false},
{Name: "float", Type: arrow.PrimitiveTypes.Float32, Nullable: false},
{Name: "double", Type: arrow.PrimitiveTypes.Float64, Nullable: false},
{Name: "bytes", Type: arrow.BinaryTypes.Binary, Nullable: false},
{Name: "string", Type: arrow.BinaryTypes.String, Nullable: false},
}, nil), "primitives")

cases := []struct {
name string
input []byte
expected []map[string]interface{}
isErr bool
errIs error
errMsg string
}{
{ name: "Primitives",
input: []byte("false\t1\t1\t1.1\t1.1\tfoo\tfoo\n" +
"true\t2\t2\t2.2\t2.2\tbar\tbar\n"),
expected: []map[string]interface{}{
{
"boolean": false,
"int": int64(1),
"long": int64(1),
"float": float64(1.1),
"double": float64(1.1),
"bytes": "foo",
"string": "foo",
},
{
"boolean": true,
"int": int64(2),
"long": int64(2),
"float": float64(2.2),
"double": float64(2.2),
"bytes": "bar",
"string": "bar",
},
},
isErr: false,
errIs: nil,
errMsg: "",
},
{
name: "QuotedField",
input: []byte("false\t1\t1\t1.1\t1.1\t\"quoted\"\t\"quoted string\"\n"),
expected: []map[string]interface{}{
{
"boolean": false,
"int": int64(1),
"long": int64(1),
"float": float64(1.1),
"double": float64(1.1),
"bytes": "\"quoted\"",
"string": "\"quoted string\"",
},
},
isErr: false,
errIs: nil,
errMsg: "",
},
{
name: "TooFewFields",
input: []byte("true\t2\t2\n"),
expected: nil,
isErr: true,
errIs: ErrTooFewFields,
errMsg: "too few fields: expected 7, got 3",
},
{
name: "JsonField",
input: []byte("false\t1\t1\t1.1\t1.1\t{\"foo\":123}\tbar\n"),
expected: []map[string]interface{}{
{
"boolean": false,
"int": int64(1),
"long": int64(1),
"float": float64(1.1),
"double": float64(1.1),
"bytes": "{\"foo\":123}",
"string": "bar",
},
},
isErr: false,
errIs: nil,
errMsg: "",
},
{
name: "JsonFieldWithJsonEncodedValue",
input: []byte("false\t1\t1\t1.1\t1.1\t\"{\\\"foo\\\":123}\"\tbar\n"),
expected: []map[string]interface{}{
{
"boolean": false,
"int": int64(1),
"long": int64(1),
"float": float64(1.1),
"double": float64(1.1),
"bytes": "\"{\\\"foo\\\":123}\"",
"string": "bar",
},
},
isErr: false,
errIs: nil,
errMsg: "",
},
{
name: "JsonFieldWithNestedJsonEncodedValue",
input: []byte("false\t1\t1\t1.1\t1.1\t{\"foo\":\"{\\\"bar\\\":123}\"}\tbar\n"),
expected: []map[string]interface{}{
{
"boolean": false,
"int": int64(1),
"long": int64(1),
"float": float64(1.1),
"double": float64(1.1),
"bytes": "{\"foo\":\"{\\\"bar\\\":123}\"}",
"string": "bar",
},
},
isErr: false,
errIs: nil,
errMsg: "",
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
buf := bytes.NewReader(c.input)
d, err := newTsvInnerDecoder(buf, schemaObj)
if err != nil {
t.Fatal(err)
}
actual := make([]map[string]interface{}, 0)
for {
var v map[string]interface{}
err = d.Decode(&v)
if err != nil {
break
}
actual = append(actual, v)
}
if c.isErr {
if err == nil || err == io.EOF {
t.Fatalf("expected error, got: %v", err)
}
if c.errIs != nil && !errors.Is(err, c.errIs) {
t.Fatalf("expected error to match errors.Is: %v, got: %v", c.errIs, err)
}
if c.errMsg != "" && err.Error() != c.errMsg {
t.Fatalf("expected error message: %q, got: %q", c.errMsg, err.Error())
}
return
}
if err != io.EOF {
t.Fatalf("expected EOF, got: %v", err)
}
if !reflect.DeepEqual(actual, c.expected) {
t.Errorf("expected: %v, got: %v", c.expected, actual)
}
})
}
}