Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.25.6
require (
cdr.dev/slog/v3 v3.0.0-rc1
github.com/coder/coder/v2 v2.30.1
github.com/coder/retry v1.5.1
github.com/docker/docker v28.5.2+incompatible
github.com/docker/go-connections v0.6.0
github.com/google/uuid v1.6.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ github.com/coder/coder/v2 v2.30.1 h1:5dxGKxWx80xb6lNd958y8Y4h3fBbQubDgIooHTTv/RQ
github.com/coder/coder/v2 v2.30.1/go.mod h1:w40ThqnpVr727SVnu3wwUrK2woxNx1MrV1zVxxABimk=
github.com/coder/pretty v0.0.0-20230908205945-e89ba86370e0 h1:3A0ES21Ke+FxEM8CXx9n47SZOKOpgSE1bbJzlE4qPVs=
github.com/coder/pretty v0.0.0-20230908205945-e89ba86370e0/go.mod h1:5UuS2Ts+nTToAMeOjNlnHFkPahrtDkmpydBen/3wgZc=
github.com/coder/retry v1.5.1 h1:iWu8YnD8YqHs3XwqrqsjoBTAVqT9ml6z9ViJ2wlMiqc=
github.com/coder/retry v1.5.1/go.mod h1:blHMk9vs6LkoRT9ZHyuZo360cufXEhrxqvEzeMtRGoY=
github.com/coder/serpent v0.13.0 h1:6EoWjpEypkb8cS6i0eCF4qoAv9vrEVaX26RW+3FMMvo=
github.com/coder/serpent v0.13.0/go.mod h1:7OIvFBYMd+OqarMy5einBl8AtRr8LliopVU7pyrwucY=
github.com/coder/terraform-provider-coder/v2 v2.13.1 h1:dtPaJUvueFm+XwBPUMWQCc5Z1QUQBW4B4RNyzX4h4y8=
Expand Down
95 changes: 59 additions & 36 deletions internal/provider/template_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import (
"io"
"slices"
"strings"
"time"

"cdr.dev/slog/v3"
"github.com/coder/coder/v2/coderd/util/ptr"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/provisionersdk"
"github.com/coder/retry"
"github.com/coder/terraform-provider-coderd/internal/codersdkvalidator"
"github.com/google/uuid"
"github.com/hashicorp/terraform-plugin-framework-validators/listvalidator"
Expand Down Expand Up @@ -1104,49 +1106,70 @@ func uploadDirectory(ctx context.Context, client *codersdk.Client, logger slog.L

func waitForJob(ctx context.Context, client *codersdk.Client, version *codersdk.TemplateVersion) ([]codersdk.ProvisionerJobLog, error) {
const maxRetries = 3
var jobLogs []codersdk.ProvisionerJobLog
for retries := 0; retries < maxRetries; retries++ {
logs, closer, err := client.TemplateVersionLogsAfter(ctx, version.ID, 0)
var allLogs []codersdk.ProvisionerJobLog
var lastLogID int64

for attempts, retrier := 0, retry.New(500*time.Millisecond, 5*time.Second); attempts < maxRetries && retrier.Wait(ctx); attempts++ {
logs, done, err := waitForJobOnce(ctx, client, version, lastLogID)
allLogs = append(allLogs, logs...)
if len(logs) > 0 {
lastLogID = logs[len(logs)-1].ID
}
if err != nil {
return jobLogs, fmt.Errorf("begin streaming logs: %w", err)
return allLogs, err
}
defer func() {
if err := closer.Close(); err != nil {
tflog.Warn(ctx, "error closing template version log stream", map[string]any{
"error": err,
})
}
}()
for {
logs, ok := <-logs
if !ok {
break
}
tflog.Info(ctx, logs.Output, map[string]interface{}{
"job_id": logs.ID,
"job_stage": logs.Stage,
"log_source": logs.Source,
"level": logs.Level,
"created_at": logs.CreatedAt,
})
if logs.Output != "" {
jobLogs = append(jobLogs, logs)
}
if done {
return allLogs, nil
}
latestResp, err := client.TemplateVersion(ctx, version.ID)
if err != nil {
return jobLogs, err
tflog.Warn(ctx, fmt.Sprintf("provisioner job still active, retrying (attempt %d/%d)", attempts+1, maxRetries))
}

if err := ctx.Err(); err != nil {
return allLogs, err
}
return allLogs, fmt.Errorf("provisioner job did not complete after %d retries", maxRetries)
}

func waitForJobOnce(ctx context.Context, client *codersdk.Client, version *codersdk.TemplateVersion, after int64) ([]codersdk.ProvisionerJobLog, bool, error) {
logCh, closer, err := client.TemplateVersionLogsAfter(ctx, version.ID, after)
if err != nil {
return nil, false, fmt.Errorf("begin streaming logs: %w", err)
}
defer func() {
if err := closer.Close(); err != nil {
tflog.Warn(ctx, "error closing template version log stream", map[string]any{
"error": err,
})
}
if latestResp.Job.Status.Active() {
tflog.Warn(ctx, fmt.Sprintf("provisioner job still active, continuing to wait...: %s", latestResp.Job.Status))
continue
}()
var jobLogs []codersdk.ProvisionerJobLog
for {
logMsg, ok := <-logCh
if !ok {
break
}
if latestResp.Job.Status != codersdk.ProvisionerJobSucceeded {
return jobLogs, fmt.Errorf("provisioner job did not succeed: %s (%s)", latestResp.Job.Status, latestResp.Job.Error)
tflog.Info(ctx, logMsg.Output, map[string]interface{}{
"job_id": logMsg.ID,
"job_stage": logMsg.Stage,
"log_source": logMsg.Source,
"level": logMsg.Level,
"created_at": logMsg.CreatedAt,
})
if logMsg.Output != "" {
jobLogs = append(jobLogs, logMsg)
}
return jobLogs, nil
}
return jobLogs, fmt.Errorf("provisioner job did not complete after %d retries", maxRetries)
latestResp, err := client.TemplateVersion(ctx, version.ID)
if err != nil {
return jobLogs, false, err
}
if latestResp.Job.Status.Active() {
return jobLogs, false, nil
}
if latestResp.Job.Status != codersdk.ProvisionerJobSucceeded {
return jobLogs, false, fmt.Errorf("provisioner job did not succeed: %s (%s)", latestResp.Job.Status, latestResp.Job.Error)
}
return jobLogs, true, nil
}

type newVersionRequest struct {
Expand Down
249 changes: 249 additions & 0 deletions internal/provider/wait_for_job_test.go
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding tests! I tried checking to see if they trigger the panic by temporarily reverting your changes, and they don't seem to do so. Can you double check?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right — the existing tests don't reproduce the original v0.0.12 panic (defer before error check), since that was already fixed in v0.0.13.

I've added TestWaitForJob_ClosesConnectionBetweenRetries which specifically tests the defer-in-loop issue this PR fixes. It tracks the maximum number of concurrently open WebSocket connections during retries. With the old defer-in-loop code, closers accumulate and connections stay open across retries (maxOpenConns > 1). With the extracted function, each connection is closed before the next retry starts (maxOpenConns == 1).

Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
package provider

import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"sync/atomic"
"testing"

"github.com/coder/coder/v2/codersdk"
"github.com/coder/websocket"
"github.com/coder/websocket/wsjson"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
)

// TestWaitForJobOnce_Success verifies that a single wait iteration streams
// the available log lines and reports done=true once the job has succeeded.
func TestWaitForJobOnce_Success(t *testing.T) {
	t.Parallel()
	templateVersionID := uuid.New()

	mux := http.NewServeMux()
	mux.HandleFunc("/api/v2/templateversions/", func(w http.ResponseWriter, r *http.Request) {
		// Log-stream requests carry a "follow" query parameter: serve one
		// log line over WebSocket, then close cleanly.
		if strings.Contains(r.URL.RawQuery, "follow") {
			ws, acceptErr := websocket.Accept(w, r, nil)
			if acceptErr != nil {
				http.Error(w, acceptErr.Error(), http.StatusInternalServerError)
				return
			}
			_ = wsjson.Write(r.Context(), ws, codersdk.ProvisionerJobLog{
				ID:     1,
				Output: "test log line",
			})
			_ = ws.Close(websocket.StatusNormalClosure, "done")
			return
		}
		// Status poll: report the job as succeeded.
		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(codersdk.TemplateVersion{
			ID: templateVersionID,
			Job: codersdk.ProvisionerJob{
				Status: codersdk.ProvisionerJobSucceeded,
			},
		})
	})

	server := httptest.NewServer(mux)
	t.Cleanup(server.Close)
	parsedURL, parseErr := url.Parse(server.URL)
	require.NoError(t, parseErr)
	sdk := codersdk.New(parsedURL)

	logs, done, err := waitForJobOnce(context.Background(), sdk, &codersdk.TemplateVersion{ID: templateVersionID}, 0)
	require.NoError(t, err)
	require.True(t, done)
	require.Len(t, logs, 1)
	require.Equal(t, "test log line", logs[0].Output)
}

// TestWaitForJobOnce_JobFailed verifies that a terminally failed job surfaces
// an error containing both the generic message and the server's error detail,
// with done=false.
func TestWaitForJobOnce_JobFailed(t *testing.T) {
	t.Parallel()
	templateVersionID := uuid.New()

	mux := http.NewServeMux()
	mux.HandleFunc("/api/v2/templateversions/", func(w http.ResponseWriter, r *http.Request) {
		// The log stream yields nothing: close immediately so the caller
		// falls through to polling the job status.
		if strings.Contains(r.URL.RawQuery, "follow") {
			ws, acceptErr := websocket.Accept(w, r, nil)
			if acceptErr != nil {
				http.Error(w, acceptErr.Error(), http.StatusInternalServerError)
				return
			}
			_ = ws.Close(websocket.StatusNormalClosure, "done")
			return
		}
		// Status poll: report a terminal failure with an error message.
		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(codersdk.TemplateVersion{
			ID: templateVersionID,
			Job: codersdk.ProvisionerJob{
				Status: codersdk.ProvisionerJobFailed,
				Error:  "something went wrong",
			},
		})
	})

	server := httptest.NewServer(mux)
	t.Cleanup(server.Close)
	parsedURL, parseErr := url.Parse(server.URL)
	require.NoError(t, parseErr)
	sdk := codersdk.New(parsedURL)

	_, done, err := waitForJobOnce(context.Background(), sdk, &codersdk.TemplateVersion{ID: templateVersionID}, 0)
	require.Error(t, err)
	require.False(t, done)
	require.Contains(t, err.Error(), "provisioner job did not succeed")
	require.Contains(t, err.Error(), "something went wrong")
}

// TestWaitForJobOnce_StillActive verifies that a job still in progress yields
// no error and done=false, signalling the caller to retry.
func TestWaitForJobOnce_StillActive(t *testing.T) {
	t.Parallel()
	templateVersionID := uuid.New()

	mux := http.NewServeMux()
	mux.HandleFunc("/api/v2/templateversions/", func(w http.ResponseWriter, r *http.Request) {
		// Empty log stream: close right away so the status poll decides.
		if strings.Contains(r.URL.RawQuery, "follow") {
			ws, acceptErr := websocket.Accept(w, r, nil)
			if acceptErr != nil {
				http.Error(w, acceptErr.Error(), http.StatusInternalServerError)
				return
			}
			_ = ws.Close(websocket.StatusNormalClosure, "done")
			return
		}
		// Status poll: the job is still running.
		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(codersdk.TemplateVersion{
			ID: templateVersionID,
			Job: codersdk.ProvisionerJob{
				Status: codersdk.ProvisionerJobRunning,
			},
		})
	})

	server := httptest.NewServer(mux)
	t.Cleanup(server.Close)
	parsedURL, parseErr := url.Parse(server.URL)
	require.NoError(t, parseErr)
	sdk := codersdk.New(parsedURL)

	_, done, err := waitForJobOnce(context.Background(), sdk, &codersdk.TemplateVersion{ID: templateVersionID}, 0)
	require.NoError(t, err)
	require.False(t, done)
}

// TestWaitForJob_UsesAfterCursorAcrossRetries verifies that after the log
// stream closes while the job is still running, the retry resumes streaming
// with an "after" cursor equal to the last log ID already received, so no log
// line is duplicated or lost.
func TestWaitForJob_UsesAfterCursorAcrossRetries(t *testing.T) {
	t.Parallel()
	templateVersionID := uuid.New()
	var statusCalls atomic.Int32
	var streamCalls atomic.Int32
	// Records the "after" query parameter of the second log-stream request.
	afterParamCh := make(chan string, 1)

	mux := http.NewServeMux()
	mux.HandleFunc("/api/v2/templateversions/", func(w http.ResponseWriter, r *http.Request) {
		if strings.Contains(r.URL.RawQuery, "follow") {
			streamCall := streamCalls.Add(1)
			if streamCall == 2 {
				afterParamCh <- r.URL.Query().Get("after")
			}

			ws, acceptErr := websocket.Accept(w, r, nil)
			if acceptErr != nil {
				http.Error(w, acceptErr.Error(), http.StatusInternalServerError)
				return
			}
			ctx := r.Context()
			// The first stream serves logs 1-3; the retry serves logs 4-5.
			switch streamCall {
			case 1:
				_ = wsjson.Write(ctx, ws, codersdk.ProvisionerJobLog{ID: 1, Output: "log 1"})
				_ = wsjson.Write(ctx, ws, codersdk.ProvisionerJobLog{ID: 2, Output: "log 2"})
				_ = wsjson.Write(ctx, ws, codersdk.ProvisionerJobLog{ID: 3, Output: "log 3"})
			default:
				_ = wsjson.Write(ctx, ws, codersdk.ProvisionerJobLog{ID: 4, Output: "log 4"})
				_ = wsjson.Write(ctx, ws, codersdk.ProvisionerJobLog{ID: 5, Output: "log 5"})
			}
			_ = ws.Close(websocket.StatusNormalClosure, "done")
			return
		}

		// The first status poll reports the job still running to force a
		// retry; subsequent polls report success.
		status := codersdk.ProvisionerJobRunning
		if statusCalls.Add(1) >= 2 {
			status = codersdk.ProvisionerJobSucceeded
		}
		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(codersdk.TemplateVersion{
			ID:  templateVersionID,
			Job: codersdk.ProvisionerJob{Status: status},
		})
	})

	server := httptest.NewServer(mux)
	t.Cleanup(server.Close)
	parsedURL, parseErr := url.Parse(server.URL)
	require.NoError(t, parseErr)
	sdk := codersdk.New(parsedURL)

	logs, err := waitForJob(context.Background(), sdk, &codersdk.TemplateVersion{ID: templateVersionID})
	require.NoError(t, err)
	require.Len(t, logs, 5)
	for i := range logs {
		require.Equal(t, int64(i+1), logs[i].ID)
	}
	require.Equal(t, int32(2), streamCalls.Load())
	select {
	case after := <-afterParamCh:
		require.Equal(t, "3", after)
	default:
		t.Fatal("missing second after cursor")
	}
}

// TestWaitForJob_ContextCanceledDuringBackoff verifies that when the context
// is canceled while waitForJob is between attempts, the function returns
// context.Canceled instead of continuing to retry.
func TestWaitForJob_ContextCanceledDuringBackoff(t *testing.T) {
	t.Parallel()
	templateVersionID := uuid.New()
	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)
	var statusCalls atomic.Int32
	firstStatusSeen := make(chan struct{}, 1)
	// Cancel the context once the first status poll has been answered, so
	// waitForJob observes cancellation while waiting to retry.
	go func() {
		<-firstStatusSeen
		cancel()
	}()

	mux := http.NewServeMux()
	mux.HandleFunc("/api/v2/templateversions/", func(w http.ResponseWriter, r *http.Request) {
		// Empty log stream: close immediately every time.
		if strings.Contains(r.URL.RawQuery, "follow") {
			ws, acceptErr := websocket.Accept(w, r, nil)
			if acceptErr != nil {
				http.Error(w, acceptErr.Error(), http.StatusInternalServerError)
				return
			}
			_ = ws.Close(websocket.StatusNormalClosure, "done")
			return
		}

		// Always report the job as running so waitForJob keeps retrying.
		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(codersdk.TemplateVersion{
			ID:  templateVersionID,
			Job: codersdk.ProvisionerJob{Status: codersdk.ProvisionerJobRunning},
		})
		if statusCalls.Add(1) == 1 {
			firstStatusSeen <- struct{}{}
		}
	})

	server := httptest.NewServer(mux)
	t.Cleanup(server.Close)
	parsedURL, parseErr := url.Parse(server.URL)
	require.NoError(t, parseErr)
	sdk := codersdk.New(parsedURL)

	_, err := waitForJob(ctx, sdk, &codersdk.TemplateVersion{ID: templateVersionID})
	require.ErrorIs(t, err, context.Canceled)
}