Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 155 additions & 85 deletions client/session.go

Large diffs are not rendered by default.

14 changes: 4 additions & 10 deletions client/tablesession.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

package client

import "github.com/apache/iotdb-client-go/v2/common"

// ITableSession defines an interface for interacting with IoTDB tables.
// It supports operations such as data insertion, executing queries, and closing the session.
// Implementations of this interface are expected to manage connections and ensure
Expand All @@ -39,19 +37,17 @@ type ITableSession interface {
// - tablet: A pointer to a Tablet containing time-series data to be inserted.
//
// Returns:
// - r: A pointer to TSStatus indicating the execution result.
// - err: An error if an issue occurs during the operation, such as a connection error or execution failure.
Insert(tablet *Tablet) (r *common.TSStatus, err error)
Insert(tablet *Tablet) error

// ExecuteNonQueryStatement executes a non-query SQL statement, such as a DDL or DML command.
//
// Parameters:
// - sql: The SQL statement to execute.
//
// Returns:
// - r: A pointer to TSStatus indicating the execution result.
// - err: An error if an issue occurs during the operation, such as a connection error or execution failure.
ExecuteNonQueryStatement(sql string) (r *common.TSStatus, err error)
ExecuteNonQueryStatement(sql string) error

// ExecuteQueryStatement executes a query SQL statement and returns the result set.
//
Expand Down Expand Up @@ -125,9 +121,8 @@ func NewClusterTableSession(clusterConfig *ClusterConfig, enableRPCCompression b
// - tablet: A pointer to a Tablet containing the data to be inserted.
//
// Returns:
// - r: A pointer to TSStatus indicating the execution result.
// - err: An error if the operation fails.
func (s *TableSession) Insert(tablet *Tablet) (r *common.TSStatus, err error) {
func (s *TableSession) Insert(tablet *Tablet) error {
return s.session.insertRelationalTablet(tablet)
}

Expand All @@ -137,9 +132,8 @@ func (s *TableSession) Insert(tablet *Tablet) (r *common.TSStatus, err error) {
// - sql: The SQL statement to be executed.
//
// Returns:
// - r: A pointer to TSStatus indicating the execution result.
// - err: An error if the operation fails.
func (s *TableSession) ExecuteNonQueryStatement(sql string) (r *common.TSStatus, err error) {
func (s *TableSession) ExecuteNonQueryStatement(sql string) error {
return s.session.ExecuteNonQueryStatement(sql)
}

Expand Down
24 changes: 10 additions & 14 deletions client/tablesessionpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ package client
import (
"log"
"sync/atomic"

"github.com/apache/iotdb-client-go/v2/common"
)

// TableSessionPool manages a pool of ITableSession instances, enabling efficient
Expand Down Expand Up @@ -81,17 +79,16 @@ type PooledTableSession struct {
// - tablet: A pointer to a Tablet containing time-series data to be inserted.
//
// Returns:
// - r: A pointer to TSStatus indicating the execution result.
// - err: An error if an issue occurs during the operation.
func (s *PooledTableSession) Insert(tablet *Tablet) (r *common.TSStatus, err error) {
r, err = s.session.insertRelationalTablet(tablet)
func (s *PooledTableSession) Insert(tablet *Tablet) error {
err := s.session.insertRelationalTablet(tablet)
if err == nil {
return
return nil
}
s.sessionPool.dropSession(s.session)
atomic.StoreInt32(&s.closed, 1)
s.session = Session{}
return
return err
Comment on lines +83 to +91
Copy link

Copilot AI Feb 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PooledTableSession.Insert now drops/invalidates the pooled session on any returned error. Since insertRelationalTablet now calls VerifySuccess internally, non-success server status codes (e.g., user/SQL/data errors) will also cause the session to be dropped even though the connection is still healthy. Consider only dropping the session for transport/connection errors, or otherwise preserve a way to distinguish retryable/connection failures from application-level status errors.

Copilot uses AI. Check for mistakes.
}

// ExecuteNonQueryStatement executes a non-query SQL statement, such as a DDL or DML command.
Expand All @@ -100,17 +97,16 @@ func (s *PooledTableSession) Insert(tablet *Tablet) (r *common.TSStatus, err err
// - sql: The SQL statement to execute.
//
// Returns:
// - r: A pointer to TSStatus indicating the execution result.
// - err: An error if an issue occurs during the operation.
func (s *PooledTableSession) ExecuteNonQueryStatement(sql string) (r *common.TSStatus, err error) {
r, err = s.session.ExecuteNonQueryStatement(sql)
func (s *PooledTableSession) ExecuteNonQueryStatement(sql string) error {
err := s.session.ExecuteNonQueryStatement(sql)
if err == nil {
return
return nil
}
s.sessionPool.dropSession(s.session)
atomic.StoreInt32(&s.closed, 1)
s.session = Session{}
return
return err
Comment on lines +101 to +109
Copy link

Copilot AI Feb 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PooledTableSession.ExecuteNonQueryStatement drops the underlying session for any error returned by Session.ExecuteNonQueryStatement. After this refactor, that error can come from VerifySuccess (i.e., a non-success TSStatus) and not necessarily a broken connection. Dropping the session for application-level errors can cause unnecessary churn in the pool; consider only dropping on connection/transport failures.

Copilot uses AI. Check for mistakes.
}

// ExecuteQueryStatement executes a query SQL statement and returns the result set.
Expand Down Expand Up @@ -140,8 +136,8 @@ func (s *PooledTableSession) ExecuteQueryStatement(sql string, timeoutInMs *int6
func (s *PooledTableSession) Close() error {
if atomic.CompareAndSwapInt32(&s.closed, 0, 1) {
if s.session.config.Database != s.sessionPool.config.Database && s.sessionPool.config.Database != "" {
r, err := s.session.ExecuteNonQueryStatement("use " + s.sessionPool.config.Database)
if r.Code == ExecuteStatementError || err != nil {
err := s.session.ExecuteNonQueryStatement("use " + s.sessionPool.config.Database)
if err != nil {
log.Println("Failed to change back database by executing: use ", s.sessionPool.config.Database)
s.session.Close()
return nil
Expand Down
16 changes: 5 additions & 11 deletions example/session_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,19 +465,19 @@ func deleteData() {

func insertTablet() {
if tablet, err := createTablet(12); err == nil {
status, err := session.InsertTablet(tablet, false)
err = session.InsertTablet(tablet, false)
tablet.Reset()
checkError(status, err)
checkError(err)
} else {
log.Fatal(err)
}
}

func insertAlignedTablet() {
if tablet, err := createTablet(12); err == nil {
status, err := session.InsertAlignedTablet(tablet, false)
err = session.InsertAlignedTablet(tablet, false)
tablet.Reset()
checkError(status, err)
checkError(err)
} else {
log.Fatal(err)
}
Expand Down Expand Up @@ -642,14 +642,8 @@ func executeBatchStatement() {
}
}

func checkError(status *common.TSStatus, err error) {
func checkError(err error) {
if err != nil {
log.Fatal(err)
}

if status != nil {
if err = client.VerifySuccess(status); err != nil {
log.Println(err)
}
}
}
17 changes: 5 additions & 12 deletions example/session_pool/session_pool_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"time"

"github.com/apache/iotdb-client-go/v2/client"
"github.com/apache/iotdb-client-go/v2/common"
)

var (
Expand Down Expand Up @@ -410,9 +409,9 @@ func insertTablet() {
defer sessionPool.PutBack(session)
if err == nil {
if tablet, err := createTablet(12); err == nil {
status, err := session.InsertTablet(tablet, false)
err := session.InsertTablet(tablet, false)
tablet.Reset()
checkError(status, err)
checkError(err)
} else {
log.Fatal(err)
}
Expand All @@ -425,9 +424,9 @@ func insertAlignedTablet() {
defer sessionPool.PutBack(session)
if err == nil {
if tablet, err := createTablet(12); err == nil {
status, err := session.InsertAlignedTablet(tablet, false)
err := session.InsertAlignedTablet(tablet, false)
tablet.Reset()
checkError(status, err)
checkError(err)
} else {
log.Fatal(err)
}
Expand Down Expand Up @@ -725,14 +724,8 @@ func printDataSet2(sds *client.SessionDataSet) {
}
}

func checkError(status *common.TSStatus, err error) {
func checkError(err error) {
if err != nil {
log.Fatal(err)
}

if status != nil {
if err = client.VerifySuccess(status); err != nil {
log.Println(err)
}
}
}
9 changes: 1 addition & 8 deletions example/session_pool/table/table_session_pool_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"time"

"github.com/apache/iotdb-client-go/v2/client"
"github.com/apache/iotdb-client-go/v2/common"
)

func main() {
Expand Down Expand Up @@ -168,14 +167,8 @@ func sessionPoolWithoutSpecificDatabaseExample() {
wg.Wait()
}

func checkError(status *common.TSStatus, err error) {
func checkError(err error) {
if err != nil {
log.Fatal(err)
}

if status != nil {
if err = client.VerifySuccess(status); err != nil {
log.Println(err)
}
}
}
9 changes: 1 addition & 8 deletions example/table/table_session_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"time"

"github.com/apache/iotdb-client-go/v2/client"
"github.com/apache/iotdb-client-go/v2/common"
)

func main() {
Expand Down Expand Up @@ -159,14 +158,8 @@ func query(session client.ITableSession) {
}
}

func checkError(status *common.TSStatus, err error) {
func checkError(err error) {
if err != nil {
log.Fatal(err)
}

if status != nil {
if err = client.VerifySuccess(status); err != nil {
log.Println(err)
}
}
}
6 changes: 1 addition & 5 deletions test/e2e/e2e_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/stretchr/testify/suite"

"github.com/apache/iotdb-client-go/v2/client"
"github.com/apache/iotdb-client-go/v2/common"
)

var (
Expand Down Expand Up @@ -394,9 +393,6 @@ func getValueFromDataSetByIndex(dataSet *client.SessionDataSet, columnIndex int3
return v
}

func (s *e2eTableTestSuite) checkError(status *common.TSStatus, err error) {
func (s *e2eTableTestSuite) checkError(err error) {
s.Require().NoError(err)
if status != nil {
s.Require().NoError(client.VerifySuccess(status))
}
}
28 changes: 12 additions & 16 deletions test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/stretchr/testify/suite"

"github.com/apache/iotdb-client-go/v2/client"
"github.com/apache/iotdb-client-go/v2/common"
)

type e2eTestSuite struct {
Expand Down Expand Up @@ -61,20 +60,17 @@ func (s *e2eTestSuite) TearDownSuite() {
}

func (s *e2eTestSuite) SetupTest() {
r, err := s.session.SetStorageGroup("root.tsg1")
s.checkError(r, err)
err := s.session.SetStorageGroup("root.tsg1")
s.checkError(err)
}

func (s *e2eTestSuite) TearDownTest() {
r, err := s.session.DeleteStorageGroup("root.tsg1")
s.checkError(r, err)
err := s.session.DeleteStorageGroup("root.tsg1")
s.checkError(err)
}

func (s *e2eTestSuite) checkError(status *common.TSStatus, err error) {
func (s *e2eTestSuite) checkError(err error) {
s.Require().NoError(err)
if status != nil {
s.Require().NoError(client.VerifySuccess(status))
}
}

func (s *e2eTestSuite) Test_WrongURL() {
Expand Down Expand Up @@ -174,7 +170,7 @@ func (s *e2eTestSuite) Test_InsertRecordsWithWrongType() {
values = [][]interface{}{{100.0, true}, {"aaa"}}
timestamp = []int64{1, 2}
)
_, err := s.session.InsertRecords(deviceId, measurements, dataTypes, values, timestamp)
err := s.session.InsertRecords(deviceId, measurements, dataTypes, values, timestamp)
assert := s.Require()
assert.NotNil(err)
assert.Equal("measurement s1 values[0] 100(float64) must be bool", err.Error())
Expand Down Expand Up @@ -255,8 +251,8 @@ func (s *e2eTestSuite) Test_InsertAlignedTablet() {
var timeseries = []string{"root.ln.device1.**"}
s.session.DeleteTimeseries(timeseries)
if tablet, err := createTablet(12); err == nil {
status, err := s.session.InsertAlignedTablet(tablet, false)
s.checkError(status, err)
err := s.session.InsertAlignedTablet(tablet, false)
s.checkError(err)
tablet.Reset()
} else {
log.Fatal(err)
Expand All @@ -277,8 +273,8 @@ func (s *e2eTestSuite) Test_InsertAlignedTabletWithNilValue() {
var timeseries = []string{"root.ln.device1.**"}
s.session.DeleteTimeseries(timeseries)
if tablet, err := createTabletWithNil(12); err == nil {
status, err := s.session.InsertAlignedTablet(tablet, false)
s.checkError(status, err)
err := s.session.InsertAlignedTablet(tablet, false)
s.checkError(err)
tablet.Reset()
} else {
log.Fatal(err)
Expand Down Expand Up @@ -499,8 +495,8 @@ func (s *e2eTestSuite) Test_QueryAllDataType() {
tablet.SetValueAt("string", 9, 0)
tablet.RowSize = 1

r, err := s.session.InsertAlignedTablet(tablet, true)
s.checkError(r, err)
err = s.session.InsertAlignedTablet(tablet, true)
s.checkError(err)

sessionDataSet, err := s.session.ExecuteQueryStatement("select s0, s1, s2, s3, s4, s5, s6, s7, s8, s9 from root.tsg1.d1 limit 1", nil)
for {
Expand Down
Loading