From 05506cb4f3b4ff60a2bafa84e3ef8f526af6cfe9 Mon Sep 17 00:00:00 2001 From: HTHou Date: Wed, 11 Feb 2026 10:55:09 +0800 Subject: [PATCH 1/3] Move VerifySuccess --- client/session.go | 240 +++++++++++------- client/tablesession.go | 14 +- client/tablesessionpool.go | 24 +- example/session_example.go | 4 + example/session_pool/session_pool_example.go | 8 +- .../table/table_session_pool_example.go | 9 +- example/table/table_session_example.go | 9 +- test/e2e/e2e_table_test.go | 6 +- test/e2e/e2e_test.go | 28 +- 9 files changed, 189 insertions(+), 153 deletions(-) diff --git a/client/session.go b/client/session.go index e62f5e6..bda3d71 100644 --- a/client/session.go +++ b/client/session.go @@ -228,14 +228,17 @@ func (s *Session) Close() error { *return *error: correctness of operation */ -func (s *Session) SetStorageGroup(storageGroupId string) (r *common.TSStatus, err error) { - r, err = s.client.SetStorageGroup(context.Background(), s.sessionId, storageGroupId) +func (s *Session) SetStorageGroup(storageGroupId string) error { + r, err := s.client.SetStorageGroup(context.Background(), s.sessionId, storageGroupId) if err != nil && r == nil { if s.reconnect() { r, err = s.client.SetStorageGroup(context.Background(), s.sessionId, storageGroupId) } } - return r, err + if err != nil { + return err + } + return VerifySuccess(r) } /* @@ -245,14 +248,17 @@ func (s *Session) SetStorageGroup(storageGroupId string) (r *common.TSStatus, er *return *error: correctness of operation */ -func (s *Session) DeleteStorageGroup(storageGroupId string) (r *common.TSStatus, err error) { - r, err = s.client.DeleteStorageGroups(context.Background(), s.sessionId, []string{storageGroupId}) +func (s *Session) DeleteStorageGroup(storageGroupId string) error { + r, err := s.client.DeleteStorageGroups(context.Background(), s.sessionId, []string{storageGroupId}) if err != nil && r == nil { if s.reconnect() { r, err = s.client.DeleteStorageGroups(context.Background(), s.sessionId, []string{storageGroupId}) } } - return r, err + if err != nil { + return err + } + return VerifySuccess(r) } /* @@ -262,14 +268,17 @@ func (s *Session) DeleteStorageGroup(storageGroupId string) (r *common.TSStatus, *return *error: correctness of operation */ -func (s *Session) DeleteStorageGroups(storageGroupIds ...string) (r *common.TSStatus, err error) { - r, err = s.client.DeleteStorageGroups(context.Background(), s.sessionId, storageGroupIds) +func (s *Session) DeleteStorageGroups(storageGroupIds ...string) error { + r, err := s.client.DeleteStorageGroups(context.Background(), s.sessionId, storageGroupIds) if err != nil && r == nil { if s.reconnect() { r, err = s.client.DeleteStorageGroups(context.Background(), s.sessionId, storageGroupIds) } } - return r, err + if err != nil { + return err + } + return VerifySuccess(r) } /* @@ -282,7 +291,7 @@ func (s *Session) DeleteStorageGroups(storageGroupIds ...string) (r *common.TSSt *return *error: correctness of operation */ -func (s *Session) CreateTimeseries(path string, dataType TSDataType, encoding TSEncoding, compressor TSCompressionType, attributes map[string]string, tags map[string]string) (r *common.TSStatus, err error) { +func (s *Session) CreateTimeseries(path string, dataType TSDataType, encoding TSEncoding, compressor TSCompressionType, attributes map[string]string, tags map[string]string) error { request := rpc.TSCreateTimeseriesReq{ SessionId: s.sessionId, Path: path, DataType: int32(dataType), Encoding: int32(encoding), Compressor: int32(compressor), Attributes: attributes, Tags: tags, @@ -294,7 +303,10 @@ func (s *Session) CreateTimeseries(path string, dataType TSDataType, encoding TS status, err = s.client.CreateTimeseries(context.Background(), &request) } } - return status, err + if err != nil { + return err + } + return VerifySuccess(status) } /* @@ -309,7 +321,7 @@ func (s *Session) CreateTimeseries(path string, dataType TSDataType, encoding TS *return *error: correctness of operation */ -func (s *Session) CreateAlignedTimeseries(prefixPath string, measurements []string, dataTypes []TSDataType, encodings []TSEncoding, compressors []TSCompressionType, measurementAlias []string) (r *common.TSStatus, err error) { +func (s *Session) CreateAlignedTimeseries(prefixPath string, measurements []string, dataTypes []TSDataType, encodings []TSEncoding, compressors []TSCompressionType, measurementAlias []string) error { destTypes := make([]int32, len(dataTypes)) for i, t := range dataTypes { destTypes[i] = int32(t) @@ -341,7 +353,10 @@ func (s *Session) CreateAlignedTimeseries(prefixPath string, measurements []stri status, err = s.client.CreateAlignedTimeseries(context.Background(), &request) } } - return status, err + if err != nil { + return err + } + return VerifySuccess(status) } /* @@ -354,7 +369,7 @@ func (s *Session) CreateAlignedTimeseries(prefixPath string, measurements []stri *return *error: correctness of operation */ -func (s *Session) CreateMultiTimeseries(paths []string, dataTypes []TSDataType, encodings []TSEncoding, compressors []TSCompressionType) (r *common.TSStatus, err error) { +func (s *Session) CreateMultiTimeseries(paths []string, dataTypes []TSDataType, encodings []TSEncoding, compressors []TSCompressionType) error { destTypes := make([]int32, len(dataTypes)) for i, t := range dataTypes { destTypes[i] = int32(t) @@ -374,7 +389,7 @@ func (s *Session) CreateMultiTimeseries(paths []string, dataTypes []TSDataType, SessionId: s.sessionId, Paths: paths, DataTypes: destTypes, Encodings: destEncodings, Compressors: destCompressions, } - r, err = s.client.CreateMultiTimeseries(context.Background(), &request) + r, err := s.client.CreateMultiTimeseries(context.Background(), &request) if err != nil && r == nil { if s.reconnect() { @@ -383,7 +398,10 @@ func (s *Session) CreateMultiTimeseries(paths []string, dataTypes []TSDataType, } } - return r, err + if err != nil { + return err + } + return VerifySuccess(r) } /* @@ -393,14 +411,17 @@ func (s *Session) CreateMultiTimeseries(paths []string, dataTypes []TSDataType, *return *error: correctness of operation */ -func (s *Session) DeleteTimeseries(paths []string) (r *common.TSStatus, err error) { - r, err = s.client.DeleteTimeseries(context.Background(), s.sessionId, paths) +func (s *Session) DeleteTimeseries(paths []string) error { + r, err := s.client.DeleteTimeseries(context.Background(), s.sessionId, paths) if err != nil && r == nil { if s.reconnect() { r, err = s.client.DeleteTimeseries(context.Background(), s.sessionId, paths) } } - return r, err + if err != nil { + return err + } + return VerifySuccess(r) } /* @@ -412,16 +433,19 @@ func (s *Session) DeleteTimeseries(paths []string) (r *common.TSStatus, err erro *return *error: correctness of operation */ -func (s *Session) DeleteData(paths []string, startTime int64, endTime int64) (r *common.TSStatus, err error) { +func (s *Session) DeleteData(paths []string, startTime int64, endTime int64) error { request := rpc.TSDeleteDataReq{SessionId: s.sessionId, Paths: paths, StartTime: startTime, EndTime: endTime} - r, err = s.client.DeleteData(context.Background(), &request) + r, err := s.client.DeleteData(context.Background(), &request) if err != nil && r == nil { if s.reconnect() { request.SessionId = s.sessionId r, err = s.client.DeleteData(context.Background(), &request) } } - return r, err + if err != nil { + return err + } + return VerifySuccess(r) } /* @@ -434,19 +458,22 @@ func (s *Session) DeleteData(paths []string, startTime int64, endTime int64) (r *return *error: correctness of operation */ -func (s *Session) InsertStringRecord(deviceId string, measurements []string, values []string, timestamp int64) (r *common.TSStatus, err error) { +func (s *Session) InsertStringRecord(deviceId string, measurements []string, values []string, timestamp int64) error { request := rpc.TSInsertStringRecordReq{ SessionId: s.sessionId, PrefixPath: deviceId, Measurements: measurements, Values: values, Timestamp: timestamp, } - r, err = s.client.InsertStringRecord(context.Background(), &request) + r, err := s.client.InsertStringRecord(context.Background(), &request) if err != nil && r == nil { if s.reconnect() { request.SessionId = s.sessionId r, err = s.client.InsertStringRecord(context.Background(), &request) } } - return r, err + if err != nil { + return err + } + return VerifySuccess(r) } func (s *Session) GetTimeZone() (string, error) { @@ -457,11 +484,14 @@ func (s *Session) GetTimeZone() (string, error) { return resp.TimeZone, nil } -func (s *Session) SetTimeZone(timeZone string) (r *common.TSStatus, err error) { +func (s *Session) SetTimeZone(timeZone string) error { request := rpc.TSSetTimeZoneReq{SessionId: s.sessionId, TimeZone: timeZone} - r, err = s.client.SetTimeZone(context.Background(), &request) + r, err := s.client.SetTimeZone(context.Background(), &request) s.config.TimeZone = timeZone - return r, err + if err != nil { + return err + } + return VerifySuccess(r) } func (s *Session) ExecuteStatementWithContext(ctx context.Context, sql string) (*SessionDataSet, error) { @@ -491,7 +521,7 @@ func (s *Session) ExecuteStatement(sql string) (*SessionDataSet, error) { return s.ExecuteStatementWithContext(context.Background(), sql) } -func (s *Session) ExecuteNonQueryStatement(sql string) (r *common.TSStatus, err error) { +func (s *Session) ExecuteNonQueryStatement(sql string) error { request := rpc.TSExecuteStatementReq{ SessionId: s.sessionId, Statement: sql, @@ -507,11 +537,13 @@ func (s *Session) ExecuteNonQueryStatement(sql string) (r *common.TSStatus, err resp, err = s.client.ExecuteStatementV2(context.Background(), &request) } } + if err != nil { + return err + } if resp.IsSetDatabase() { s.changeDatabase(*resp.Database) } - - return resp.Status, err + return VerifySuccess(resp.Status) } func (s *Session) changeDatabase(database string) { @@ -648,12 +680,12 @@ func (s *Session) genTSInsertRecordReq(deviceId string, time int64, return request, nil } -func (s *Session) InsertRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r *common.TSStatus, err error) { +func (s *Session) InsertRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) error { request, err := s.genTSInsertRecordReq(deviceId, timestamp, measurements, dataTypes, values, false) if err != nil { - return nil, err + return err } - r, err = s.client.InsertRecord(context.Background(), request) + r, err := s.client.InsertRecord(context.Background(), request) if err != nil && r == nil { if s.reconnect() { @@ -662,15 +694,18 @@ func (s *Session) InsertRecord(deviceId string, measurements []string, dataTypes } } - return r, err + if err != nil { + return err + } + return VerifySuccess(r) } -func (s *Session) InsertAlignedRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r *common.TSStatus, err error) { +func (s *Session) InsertAlignedRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) error { request, err := s.genTSInsertRecordReq(deviceId, timestamp, measurements, dataTypes, values, true) if err != nil { - return nil, err + return err } - r, err = s.client.InsertRecord(context.Background(), request) + r, err := s.client.InsertRecord(context.Background(), request) if err != nil && r == nil { if s.reconnect() { @@ -679,7 +714,10 @@ func (s *Session) InsertAlignedRecord(deviceId string, measurements []string, da } } - return r, err + if err != nil { + return err + } + return VerifySuccess(r) } type deviceData struct { @@ -709,10 +747,10 @@ func (d *deviceData) Swap(i, j int) { // executeBatch, we pack some insert request in batch and send them to server. If you want improve // your performance, please see insertTablet method // Each row is independent, which could have different insertTargetName, time, number of measurements -func (s *Session) InsertRecordsOfOneDevice(deviceId string, timestamps []int64, measurementsSlice [][]string, dataTypesSlice [][]TSDataType, valuesSlice [][]interface{}, sorted bool) (r *common.TSStatus, err error) { +func (s *Session) InsertRecordsOfOneDevice(deviceId string, timestamps []int64, measurementsSlice [][]string, dataTypesSlice [][]TSDataType, valuesSlice [][]interface{}, sorted bool) error { length := len(timestamps) if len(measurementsSlice) != length || len(dataTypesSlice) != length || len(valuesSlice) != length { - return nil, errors.New("timestamps, measurementsSlice and valuesSlice's size should be equal") + return errors.New("timestamps, measurementsSlice and valuesSlice's size should be equal") } if !sorted { @@ -724,10 +762,11 @@ func (s *Session) InsertRecordsOfOneDevice(deviceId string, timestamps []int64, }) } + var err error valuesList := make([][]byte, length) for i := 0; i < length; i++ { if valuesList[i], err = valuesToBytes(dataTypesSlice[i], valuesSlice[i], measurementsSlice[i]); err != nil { - return nil, err + return err } } @@ -739,7 +778,7 @@ func (s *Session) InsertRecordsOfOneDevice(deviceId string, timestamps []int64, ValuesList: valuesList, } - r, err = s.client.InsertRecordsOfOneDevice(context.Background(), request) + r, err := s.client.InsertRecordsOfOneDevice(context.Background(), request) if err != nil && r == nil { if s.reconnect() { @@ -748,13 +787,16 @@ func (s *Session) InsertRecordsOfOneDevice(deviceId string, timestamps []int64, } } - return r, err + if err != nil { + return err + } + return VerifySuccess(r) } -func (s *Session) InsertAlignedRecordsOfOneDevice(deviceId string, timestamps []int64, measurementsSlice [][]string, dataTypesSlice [][]TSDataType, valuesSlice [][]interface{}, sorted bool) (r *common.TSStatus, err error) { +func (s *Session) InsertAlignedRecordsOfOneDevice(deviceId string, timestamps []int64, measurementsSlice [][]string, dataTypesSlice [][]TSDataType, valuesSlice [][]interface{}, sorted bool) error { length := len(timestamps) if len(measurementsSlice) != length || len(dataTypesSlice) != length || len(valuesSlice) != length { - return nil, errors.New("timestamps, measurementsSlice and valuesSlice's size should be equal") + return errors.New("timestamps, measurementsSlice and valuesSlice's size should be equal") } if !sorted { @@ -766,10 +808,11 @@ func (s *Session) InsertAlignedRecordsOfOneDevice(deviceId string, timestamps [] }) } + var err error valuesList := make([][]byte, length) for i := 0; i < length; i++ { if valuesList[i], err = valuesToBytes(dataTypesSlice[i], valuesSlice[i], measurementsSlice[i]); err != nil { - return nil, err + return err } } isAligned := true @@ -782,7 +825,7 @@ func (s *Session) InsertAlignedRecordsOfOneDevice(deviceId string, timestamps [] IsAligned: &isAligned, } - r, err = s.client.InsertRecordsOfOneDevice(context.Background(), request) + r, err := s.client.InsertRecordsOfOneDevice(context.Background(), request) if err != nil && r == nil { if s.reconnect() { @@ -791,7 +834,10 @@ func (s *Session) InsertAlignedRecordsOfOneDevice(deviceId string, timestamps [] } } - return r, err + if err != nil { + return err + } + return VerifySuccess(r) } /* @@ -808,37 +854,43 @@ func (s *Session) InsertAlignedRecordsOfOneDevice(deviceId string, timestamps [] */ func (s *Session) InsertRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{}, timestamps []int64, -) (r *common.TSStatus, err error) { +) error { request, err := s.genInsertRecordsReq(deviceIds, measurements, dataTypes, values, timestamps, false) if err != nil { - return nil, err + return err } else { - r, err = s.client.InsertRecords(context.Background(), request) + r, err := s.client.InsertRecords(context.Background(), request) if err != nil && r == nil { if s.reconnect() { request.SessionId = s.sessionId r, err = s.client.InsertRecords(context.Background(), request) } } - return r, err + if err != nil { + return err + } + return VerifySuccess(r) } } func (s *Session) InsertAlignedRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{}, timestamps []int64, -) (r *common.TSStatus, err error) { +) error { request, err := s.genInsertRecordsReq(deviceIds, measurements, dataTypes, values, timestamps, true) if err != nil { - return nil, err + return err } else { - r, err = s.client.InsertRecords(context.Background(), request) + r, err := s.client.InsertRecords(context.Background(), request) if err != nil && r == nil { if s.reconnect() { request.SessionId = s.sessionId r, err = s.client.InsertRecords(context.Background(), request) } } - return r, err + if err != nil { + return err + } + return VerifySuccess(r) } } @@ -847,63 +899,72 @@ func (s *Session) InsertAlignedRecords(deviceIds []string, measurements [][]stri *params *tablets: []*client.Tablet, list of tablets */ -func (s *Session) InsertTablets(tablets []*Tablet, sorted bool) (r *common.TSStatus, err error) { +func (s *Session) InsertTablets(tablets []*Tablet, sorted bool) error { if !sorted { for _, t := range tablets { if err := t.Sort(); err != nil { - return nil, err + return err } } } request, err := s.genInsertTabletsReq(tablets, false) if err != nil { - return nil, err + return err } - r, err = s.client.InsertTablets(context.Background(), request) + r, err := s.client.InsertTablets(context.Background(), request) if err != nil && r == nil { if s.reconnect() { request.SessionId = s.sessionId r, err = s.client.InsertTablets(context.Background(), request) } } - return r, err + if err != nil { + return err + } + return VerifySuccess(r) } -func (s *Session) InsertAlignedTablets(tablets []*Tablet, sorted bool) (r *common.TSStatus, err error) { +func (s *Session) InsertAlignedTablets(tablets []*Tablet, sorted bool) error { if !sorted { for _, t := range tablets { if err := t.Sort(); err != nil { - return nil, err + return err } } } request, err := s.genInsertTabletsReq(tablets, true) if err != nil { - return nil, err + return err } - r, err = s.client.InsertTablets(context.Background(), request) + r, err := s.client.InsertTablets(context.Background(), request) if err != nil && r == nil { if s.reconnect() { request.SessionId = s.sessionId r, err = s.client.InsertTablets(context.Background(), request) } } - return r, err + if err != nil { + return err + } + return VerifySuccess(r) } -func (s *Session) ExecuteBatchStatement(inserts []string) (r *common.TSStatus, err error) { +func (s *Session) ExecuteBatchStatement(inserts []string) error { request := rpc.TSExecuteBatchStatementReq{ SessionId: s.sessionId, Statements: inserts, } - r, err = s.client.ExecuteBatchStatement(context.Background(), &request) + r, err := s.client.ExecuteBatchStatement(context.Background(), &request) if err != nil && r == nil { if s.reconnect() { request.SessionId = s.sessionId r, err = s.client.ExecuteBatchStatement(context.Background(), &request) } } - return r, err + if err != nil { + return err + } + return VerifySuccess(r) } func (s *Session) ExecuteRawDataQuery(paths []string, startTime int64, endTime int64) (*SessionDataSet, error) { @@ -1111,17 +1172,17 @@ func valuesToBytes(dataTypes []TSDataType, values []interface{}, measurementName return buff.Bytes(), nil } -func (s *Session) insertRelationalTablet(tablet *Tablet) (r *common.TSStatus, err error) { +func (s *Session) insertRelationalTablet(tablet *Tablet) error { if tablet.Len() == 0 { - return &common.TSStatus{Code: SuccessStatus}, nil + return nil } request, err := s.genTSInsertTabletReq(tablet, true, true) if err != nil { - return nil, err + return err } request.ColumnCategories = tablet.getColumnCategories() - r, err = s.client.InsertTablet(context.Background(), request) + r, err := s.client.InsertTablet(context.Background(), request) if err != nil && r == nil { if s.reconnect() { @@ -1130,21 +1191,24 @@ func (s *Session) insertRelationalTablet(tablet *Tablet) (r *common.TSStatus, er } } - return r, err + if err != nil { + return err + } + return VerifySuccess(r) } -func (s *Session) InsertTablet(tablet *Tablet, sorted bool) (r *common.TSStatus, err error) { +func (s *Session) InsertTablet(tablet *Tablet, sorted bool) error { if !sorted { if err := tablet.Sort(); err != nil { - return nil, err + return err } } request, err := s.genTSInsertTabletReq(tablet, false, false) if err != nil { - return nil, err + return err } - r, err = s.client.InsertTablet(context.Background(), request) + r, err := s.client.InsertTablet(context.Background(), request) if err != nil && r == nil { if s.reconnect() { @@ -1153,21 +1217,24 @@ func (s *Session) InsertTablet(tablet *Tablet, sorted bool) (r *common.TSStatus, } } - return r, err + if err != nil { + return err + } + return VerifySuccess(r) } -func (s *Session) InsertAlignedTablet(tablet *Tablet, sorted bool) (r *common.TSStatus, err error) { +func (s *Session) InsertAlignedTablet(tablet *Tablet, sorted bool) error { if !sorted { if err := tablet.Sort(); err != nil { - return nil, err + return err } } request, err := s.genTSInsertTabletReq(tablet, true, false) if err != nil { - return nil, err + return err } - r, err = s.client.InsertTablet(context.Background(), request) + r, err := s.client.InsertTablet(context.Background(), request) if err != nil && r == nil { if s.reconnect() { @@ -1176,7 +1243,10 @@ func (s *Session) InsertAlignedTablet(tablet *Tablet, sorted bool) (r *common.TS } } - return r, err + if err != nil { + return err + } + return VerifySuccess(r) } func (s *Session) genTSInsertTabletReq(tablet *Tablet, isAligned bool, writeToTable bool) (*rpc.TSInsertTabletReq, error) { diff --git a/client/tablesession.go b/client/tablesession.go index cea724d..dce44f9 100644 --- a/client/tablesession.go +++ b/client/tablesession.go @@ -19,8 +19,6 @@ package client -import "github.com/apache/iotdb-client-go/v2/common" - // ITableSession defines an interface for interacting with IoTDB tables. // It supports operations such as data insertion, executing queries, and closing the session. // Implementations of this interface are expected to manage connections and ensure @@ -39,9 +37,8 @@ type ITableSession interface { // - tablet: A pointer to a Tablet containing time-series data to be inserted. // // Returns: - // - r: A pointer to TSStatus indicating the execution result. // - err: An error if an issue occurs during the operation, such as a connection error or execution failure. - Insert(tablet *Tablet) (r *common.TSStatus, err error) + Insert(tablet *Tablet) error // ExecuteNonQueryStatement executes a non-query SQL statement, such as a DDL or DML command. // @@ -49,9 +46,8 @@ type ITableSession interface { // - sql: The SQL statement to execute. // // Returns: - // - r: A pointer to TSStatus indicating the execution result. // - err: An error if an issue occurs during the operation, such as a connection error or execution failure. - ExecuteNonQueryStatement(sql string) (r *common.TSStatus, err error) + ExecuteNonQueryStatement(sql string) error // ExecuteQueryStatement executes a query SQL statement and returns the result set. // @@ -125,9 +121,8 @@ func NewClusterTableSession(clusterConfig *ClusterConfig, enableRPCCompression b // - tablet: A pointer to a Tablet containing the data to be inserted. // // Returns: -// - r: A pointer to TSStatus indicating the execution result. // - err: An error if the operation fails. -func (s *TableSession) Insert(tablet *Tablet) (r *common.TSStatus, err error) { +func (s *TableSession) Insert(tablet *Tablet) error { return s.session.insertRelationalTablet(tablet) } @@ -137,9 +132,8 @@ func (s *TableSession) Insert(tablet *Tablet) (r *common.TSStatus, err error) { // - sql: The SQL statement to be executed. // // Returns: -// - r: A pointer to TSStatus indicating the execution result. // - err: An error if the operation fails. -func (s *TableSession) ExecuteNonQueryStatement(sql string) (r *common.TSStatus, err error) { +func (s *TableSession) ExecuteNonQueryStatement(sql string) error { return s.session.ExecuteNonQueryStatement(sql) } diff --git a/client/tablesessionpool.go b/client/tablesessionpool.go index 5df5713..b078979 100644 --- a/client/tablesessionpool.go +++ b/client/tablesessionpool.go @@ -22,8 +22,6 @@ package client import ( "log" "sync/atomic" - - "github.com/apache/iotdb-client-go/v2/common" ) // TableSessionPool manages a pool of ITableSession instances, enabling efficient @@ -81,17 +79,16 @@ type PooledTableSession struct { // - tablet: A pointer to a Tablet containing time-series data to be inserted. // // Returns: -// - r: A pointer to TSStatus indicating the execution result. // - err: An error if an issue occurs during the operation. -func (s *PooledTableSession) Insert(tablet *Tablet) (r *common.TSStatus, err error) { - r, err = s.session.insertRelationalTablet(tablet) +func (s *PooledTableSession) Insert(tablet *Tablet) error { + err := s.session.insertRelationalTablet(tablet) if err == nil { - return + return nil } s.sessionPool.dropSession(s.session) atomic.StoreInt32(&s.closed, 1) s.session = Session{} - return + return err } // ExecuteNonQueryStatement executes a non-query SQL statement, such as a DDL or DML command. @@ -100,17 +97,16 @@ func (s *PooledTableSession) Insert(tablet *Tablet) (r *common.TSStatus, err err // - sql: The SQL statement to execute. // // Returns: -// - r: A pointer to TSStatus indicating the execution result. // - err: An error if an issue occurs during the operation. -func (s *PooledTableSession) ExecuteNonQueryStatement(sql string) (r *common.TSStatus, err error) { - r, err = s.session.ExecuteNonQueryStatement(sql) +func (s *PooledTableSession) ExecuteNonQueryStatement(sql string) error { + err := s.session.ExecuteNonQueryStatement(sql) if err == nil { - return + return nil } s.sessionPool.dropSession(s.session) atomic.StoreInt32(&s.closed, 1) s.session = Session{} - return + return err } // ExecuteQueryStatement executes a query SQL statement and returns the result set. @@ -140,8 +136,8 @@ func (s *PooledTableSession) ExecuteQueryStatement(sql string, timeoutInMs *int6 func (s *PooledTableSession) Close() error { if atomic.CompareAndSwapInt32(&s.closed, 0, 1) { if s.session.config.Database != s.sessionPool.config.Database && s.sessionPool.config.Database != "" { - r, err := s.session.ExecuteNonQueryStatement("use " + s.sessionPool.config.Database) - if r.Code == ExecuteStatementError || err != nil { + err := s.session.ExecuteNonQueryStatement("use " + s.sessionPool.config.Database) + if err != nil { log.Println("Failed to change back database by executing: use ", s.sessionPool.config.Database) s.session.Close() return nil diff --git a/example/session_example.go b/example/session_example.go index 87e5490..e42849a 100644 --- a/example/session_example.go +++ b/example/session_example.go @@ -20,14 +20,18 @@ package main import ( + "context" + "errors" "flag" "fmt" "log" "math/rand" + "sort" "strings" "time" "github.com/apache/iotdb-client-go/v2/client" + "github.com/apache/iotdb-client-go/v2/client/rpc" "github.com/apache/iotdb-client-go/v2/common" ) diff --git a/example/session_pool/session_pool_example.go b/example/session_pool/session_pool_example.go index 6b043d1..580ab50 100644 --- a/example/session_pool/session_pool_example.go +++ b/example/session_pool/session_pool_example.go @@ -725,14 +725,8 @@ func printDataSet2(sds *client.SessionDataSet) { } } -func checkError(status *common.TSStatus, err error) { +func checkError(err error) { if err != nil { log.Fatal(err) } - - if status != nil { - if err = client.VerifySuccess(status); err != nil { - log.Println(err) - } - } } diff --git a/example/session_pool/table/table_session_pool_example.go b/example/session_pool/table/table_session_pool_example.go index a6a85cb..e14f1cc 100644 --- a/example/session_pool/table/table_session_pool_example.go +++ b/example/session_pool/table/table_session_pool_example.go @@ -27,7 +27,6 @@ import ( "time" "github.com/apache/iotdb-client-go/v2/client" - "github.com/apache/iotdb-client-go/v2/common" ) func main() { @@ -168,14 +167,8 @@ func sessionPoolWithoutSpecificDatabaseExample() { wg.Wait() } -func checkError(status *common.TSStatus, err error) { +func checkError(err error) { if err != nil { log.Fatal(err) } - - if status != nil { - if err = client.VerifySuccess(status); err != nil { - log.Println(err) - } - } } diff --git a/example/table/table_session_example.go b/example/table/table_session_example.go index c0aed28..fc43f12 100644 --- a/example/table/table_session_example.go +++ b/example/table/table_session_example.go @@ -27,7 +27,6 @@ import ( "time" "github.com/apache/iotdb-client-go/v2/client" - "github.com/apache/iotdb-client-go/v2/common" ) func main() { @@ -159,14 +158,8 @@ func query(session client.ITableSession) { } } -func checkError(status *common.TSStatus, err error) { +func checkError(err error) { if err != nil { log.Fatal(err) } - - if status != nil { - if err = client.VerifySuccess(status); err != nil { - log.Println(err) - } - } } diff --git a/test/e2e/e2e_table_test.go b/test/e2e/e2e_table_test.go index 7f019f8..b5c092a 100644 --- a/test/e2e/e2e_table_test.go +++ b/test/e2e/e2e_table_test.go @@ -31,7 +31,6 @@ import ( "github.com/stretchr/testify/suite" "github.com/apache/iotdb-client-go/v2/client" - "github.com/apache/iotdb-client-go/v2/common" ) var ( @@ -394,9 +393,6 @@ func getValueFromDataSetByIndex(dataSet *client.SessionDataSet, columnIndex int3 return v } -func (s *e2eTableTestSuite) checkError(status *common.TSStatus, err error) { +func (s *e2eTableTestSuite) checkError(err error) { s.Require().NoError(err) - if status != nil { - s.Require().NoError(client.VerifySuccess(status)) - } } diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 2557955..8fd2167 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -31,7 +31,6 @@ import ( "github.com/stretchr/testify/suite" "github.com/apache/iotdb-client-go/v2/client" - "github.com/apache/iotdb-client-go/v2/common" ) type e2eTestSuite struct { @@ -61,20 +60,17 @@ func (s *e2eTestSuite) TearDownSuite() { } func (s *e2eTestSuite) SetupTest() { - r, err := s.session.SetStorageGroup("root.tsg1") - s.checkError(r, err) + err := s.session.SetStorageGroup("root.tsg1") + s.checkError(err) } func (s *e2eTestSuite) TearDownTest() { - r, err := s.session.DeleteStorageGroup("root.tsg1") - s.checkError(r, err) + err := s.session.DeleteStorageGroup("root.tsg1") + s.checkError(err) } -func (s *e2eTestSuite) checkError(status *common.TSStatus, err error) { +func (s *e2eTestSuite) checkError(err error) { s.Require().NoError(err) - if status != nil { - s.Require().NoError(client.VerifySuccess(status)) - } } func (s *e2eTestSuite) Test_WrongURL() { @@ -174,7 +170,7 @@ func (s *e2eTestSuite) Test_InsertRecordsWithWrongType() { values = [][]interface{}{{100.0, true}, {"aaa"}} timestamp = []int64{1, 2} ) - _, err := s.session.InsertRecords(deviceId, measurements, dataTypes, values, timestamp) + err := s.session.InsertRecords(deviceId, measurements, dataTypes, values, timestamp) assert := s.Require() assert.NotNil(err) assert.Equal("measurement s1 values[0] 100(float64) must be bool", err.Error()) @@ -255,8 +251,8 @@ func (s *e2eTestSuite) Test_InsertAlignedTablet() { var timeseries = []string{"root.ln.device1.**"} s.session.DeleteTimeseries(timeseries) if tablet, err := createTablet(12); err == nil { - status, err := s.session.InsertAlignedTablet(tablet, false) - s.checkError(status, err) + err := s.session.InsertAlignedTablet(tablet, false) + s.checkError(err) tablet.Reset() } else { log.Fatal(err) @@ -277,8 +273,8 @@ func (s *e2eTestSuite) Test_InsertAlignedTabletWithNilValue() { var timeseries = []string{"root.ln.device1.**"} s.session.DeleteTimeseries(timeseries) if tablet, err := createTabletWithNil(12); err == nil { - status, err := s.session.InsertAlignedTablet(tablet, false) - s.checkError(status, err) + err := s.session.InsertAlignedTablet(tablet, false) + s.checkError(err) tablet.Reset() } else { log.Fatal(err) @@ -499,8 +495,8 @@ func (s *e2eTestSuite) Test_QueryAllDataType() { tablet.SetValueAt("string", 9, 0) tablet.RowSize = 1 - r, err := s.session.InsertAlignedTablet(tablet, true) - s.checkError(r, err) + err = s.session.InsertAlignedTablet(tablet, true) + s.checkError(err) sessionDataSet, err := s.session.ExecuteQueryStatement("select s0, s1, s2, s3, s4, s5, s6, s7, s8, s9 from root.tsg1.d1 limit 1", nil) for { From 6c7051d14641d63c046052fb83029640b7bef74b Mon Sep 17 00:00:00 2001 From: HTHou Date: Wed, 11 Feb 2026 11:00:05 +0800 Subject: [PATCH 2/3] fix missing code --- example/session_pool/session_pool_example.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/example/session_pool/session_pool_example.go b/example/session_pool/session_pool_example.go index 580ab50..bd56471 100644 --- a/example/session_pool/session_pool_example.go +++ b/example/session_pool/session_pool_example.go @@ -28,7 +28,6 @@ import ( "time" "github.com/apache/iotdb-client-go/v2/client" - "github.com/apache/iotdb-client-go/v2/common" ) var ( @@ -410,9 +409,9 @@ func insertTablet() { defer sessionPool.PutBack(session) if err == nil { if tablet, err := createTablet(12); err == nil { - status, err := session.InsertTablet(tablet, false) + err := session.InsertTablet(tablet, false) tablet.Reset() - checkError(status, err) + checkError(err) } else { log.Fatal(err) } @@ -425,9 +424,9 @@ func insertAlignedTablet() { defer sessionPool.PutBack(session) if err == nil { if tablet, err := createTablet(12); err == nil { - status, err := session.InsertAlignedTablet(tablet, false) + err := session.InsertAlignedTablet(tablet, false) tablet.Reset() - checkError(status, err) + checkError(err) } else { log.Fatal(err) } From 580b7eb24c8f506b08b118ceb4e9564949d1b59e Mon Sep 17 00:00:00 2001 From: HTHou Date: Wed, 11 Feb 2026 11:03:02 +0800 Subject: [PATCH 3/3] fix missing code --- example/session_example.go | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/example/session_example.go b/example/session_example.go index e42849a..c169960 100644 --- a/example/session_example.go +++ b/example/session_example.go @@ -20,18 +20,14 @@ package main import ( - "context" - "errors" "flag" "fmt" "log" "math/rand" - "sort" "strings" "time" "github.com/apache/iotdb-client-go/v2/client" - "github.com/apache/iotdb-client-go/v2/client/rpc" "github.com/apache/iotdb-client-go/v2/common" ) @@ -469,9 +465,9 @@ func deleteData() { func insertTablet() { if tablet, err := createTablet(12); err == nil { - status, err := session.InsertTablet(tablet, false) + err = session.InsertTablet(tablet, false) tablet.Reset() - checkError(status, err) + checkError(err) } else { log.Fatal(err) } @@ -479,9 +475,9 @@ func insertTablet() { func insertAlignedTablet() { if tablet, err := createTablet(12); err == nil { - status, err := session.InsertAlignedTablet(tablet, false) + err = session.InsertAlignedTablet(tablet, false) tablet.Reset() - checkError(status, err) + checkError(err) } else { log.Fatal(err) } @@ -646,14 +642,8 @@ func executeBatchStatement() { } } -func checkError(status *common.TSStatus, err error) { +func checkError(err error) { if err != nil { log.Fatal(err) } - - if status != nil { - if err = client.VerifySuccess(status); err != nil { - log.Println(err) - } - } }