From 415e013adb36c05926c1b32c5d5f6ec699cf46ae Mon Sep 17 00:00:00 2001 From: Michael Weibel Date: Fri, 27 Mar 2026 11:21:35 +0100 Subject: [PATCH] fix: prevent silent volume migration in ControllerPublishVolume ControllerPublishVolume previously overwrote ServerUUIDs unconditionally, silently moving an attached volume to a new node. When the subsequent ControllerUnpublishVolume for the old node found the volume no longer there, it returned success without detaching, leaving a stale VolumeAttachment that caused hours-long Multi-Attach deadlocks in production. Fix: fetch the volume before attaching. If it is attached to a different node return FailedPrecondition so the external-attacher waits for the old detach to complete first. If already attached to the requested node return idempotent success. Also add volume locks (TryAcquire/Release, already used on the node side) to DeleteVolume, ControllerPublishVolume, ControllerUnpublishVolume, and ControllerExpandVolume to prevent TOCTOU races between concurrent mutating operations on the same volume. Backport of f7ffaa4f8f39f419261db800f1ba78ab3dea5061 to release/3.6.x. --- driver/controller.go | 63 ++++++++++- driver/driver_test.go | 256 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 314 insertions(+), 5 deletions(-) diff --git a/driver/controller.go b/driver/controller.go index 09df2c45..358a904e 100644 --- a/driver/controller.go +++ b/driver/controller.go @@ -199,6 +199,11 @@ func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) return nil, status.Error(codes.InvalidArgument, "DeleteVolume Volume ID must be provided") } + if acquired := d.volumeLocks.TryAcquire(req.VolumeId); !acquired { + return nil, status.Errorf(codes.Aborted, "an operation with the given Volume ID %s already exists", req.VolumeId) + } + defer d.volumeLocks.Release(req.VolumeId) + ll := d.log.WithFields(logrus.Fields{ "volume_id": req.VolumeId, "method": "delete_volume", @@ -248,6 +253,11 @@ func (d *Driver) ControllerPublishVolume(ctx context.Context, req *csi.Controlle return nil, status.Error(codes.AlreadyExists, "read only Volumes are not supported") } + if acquired := d.volumeLocks.TryAcquire(req.VolumeId); !acquired { + return nil, status.Errorf(codes.Aborted, "an operation with the given Volume ID %s already exists", req.VolumeId) + } + defer d.volumeLocks.Release(req.VolumeId) + ll := d.log.WithFields(logrus.Fields{ "volume_id": req.VolumeId, "node_id": req.NodeId, @@ -255,10 +265,46 @@ func (d *Driver) ControllerPublishVolume(ctx context.Context, req *csi.Controlle }) ll.Info("controller publish volume called") + // Check current attachment state before modifying it. This prevents + // silently moving a volume that is still attached to another node, + // which would cause a stale VolumeAttachment and Multi-Attach errors. + volume, err := d.cloudscaleClient.Volumes.Get(ctx, req.VolumeId) + if err != nil { + return nil, reraiseNotFound(err, ll, "fetch volume for publish") + } + + if volume.ServerUUIDs != nil && len(*volume.ServerUUIDs) > 0 { + alreadyAttachedToRequestedNode := false + for _, serverUUID := range *volume.ServerUUIDs { + if serverUUID == req.NodeId { + alreadyAttachedToRequestedNode = true + break + } + } + + if alreadyAttachedToRequestedNode { + ll.Info("volume is already attached to the requested node") + return &csi.ControllerPublishVolumeResponse{ + PublishContext: map[string]string{ + PublishInfoVolumeName: volume.Name, + LuksEncryptedAttribute: req.VolumeContext[LuksEncryptedAttribute], + LuksCipherAttribute: req.VolumeContext[LuksCipherAttribute], + LuksKeySizeAttribute: req.VolumeContext[LuksKeySizeAttribute], + }, + }, nil + } + + ll.WithField("current_server_uuids", *volume.ServerUUIDs). + Warn("volume is already attached to a different node") + return nil, status.Errorf(codes.FailedPrecondition, + "volume %s is already attached to server(s) %v, must be detached first", + req.VolumeId, *volume.ServerUUIDs) + } + attachRequest := &cloudscale.VolumeRequest{ ServerUUIDs: &[]string{req.NodeId}, } - err := d.cloudscaleClient.Volumes.Update(ctx, req.VolumeId, attachRequest) + err = d.cloudscaleClient.Volumes.Update(ctx, req.VolumeId, attachRequest) if err != nil { if maxVolumesPerServerErrorMessageRe.MatchString(err.Error()) { return nil, status.Error(codes.ResourceExhausted, err.Error()) @@ -268,10 +314,6 @@ func (d *Driver) ControllerPublishVolume(ctx context.Context, req *csi.Controlle } ll.Info("volume is attached") - volume, err := d.cloudscaleClient.Volumes.Get(ctx, req.VolumeId) - if err != nil { - return nil, reraiseNotFound(err, ll, "fetch volume") - } return &csi.ControllerPublishVolumeResponse{ PublishContext: map[string]string{ PublishInfoVolumeName: volume.Name, @@ -288,6 +330,11 @@ func (d *Driver) ControllerUnpublishVolume(ctx context.Context, req *csi.Control return nil, status.Error(codes.InvalidArgument, "ControllerPublishVolume Volume ID must be provided") } + if acquired := d.volumeLocks.TryAcquire(req.VolumeId); !acquired { + return nil, status.Errorf(codes.Aborted, "an operation with the given Volume ID %s already exists", req.VolumeId) + } + defer d.volumeLocks.Release(req.VolumeId) + ll := d.log.WithFields(logrus.Fields{ "volume_id": req.VolumeId, "node_id": req.NodeId, @@ -511,6 +558,12 @@ func (d *Driver) ControllerExpandVolume(ctx context.Context, req *csi.Controller if len(volID) == 0 { return nil, status.Error(codes.InvalidArgument, "ControllerExpandVolume volume ID missing in request") } + + if acquired := d.volumeLocks.TryAcquire(volID); !acquired { + return nil, status.Errorf(codes.Aborted, "an operation with the given Volume ID %s already exists", volID) + } + defer d.volumeLocks.Release(volID) + volume, err := d.cloudscaleClient.Volumes.Get(ctx, volID) if err != nil { return nil, status.Errorf(codes.Internal, "ControllerExpandVolume could not retrieve existing volume: %v", err) diff --git a/driver/driver_test.go b/driver/driver_test.go index eaf5b848..76fc8421 100644 --- a/driver/driver_test.go +++ b/driver/driver_test.go @@ -706,3 +706,259 @@ func TestNodeOperations_CrossOperationLocking(t *testing.T) { execStage <- struct{}{} <-respStage } + +func assertAbortedError(t *testing.T, err error, opName string) { + t.Helper() + if err == nil { + t.Errorf("%s: expected Aborted error when volume is locked, got nil", opName) + return + } + st, ok := status.FromError(err) + if !ok { + t.Errorf("%s: expected gRPC status error, got: %v", opName, err) + return + } + if st.Code() != codes.Aborted { + t.Errorf("%s: expected codes.Aborted, got %v: %v", opName, st.Code(), err) + } +} + +// TestControllerPublishVolume_RejectsWhenAttachedToDifferentNode verifies that +// ControllerPublishVolume returns FailedPrecondition when the volume is already +// attached to a different node, preventing silent volume migration. +func TestControllerPublishVolume_RejectsWhenAttachedToDifferentNode(t *testing.T) { + serverA := "server-a-uuid" + serverB := "server-b-uuid" + initialServers := map[string]*cloudscale.Server{ + serverA: {UUID: serverA}, + serverB: {UUID: serverB}, + } + cloudscaleClient := NewFakeClient(initialServers) + driver := &Driver{ + endpoint: "unix:///tmp/csi-test.sock", + serverId: serverA, + zone: DefaultZone.Slug, + cloudscaleClient: cloudscaleClient, + mounter: &fakeMounter{mounted: map[string]string{}}, + log: logrus.New().WithField("test_enabled", true), + volumeLocks: NewVolumeLocks(), + } + + ctx := context.Background() + createResp, err := driver.CreateVolume(ctx, &csi.CreateVolumeRequest{ + Name: "test-vol-multiattach", + CapacityRange: &csi.CapacityRange{RequiredBytes: 1 * GB}, + VolumeCapabilities: []*csi.VolumeCapability{{ + AccessMode: &csi.VolumeCapability_AccessMode{Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER}, + AccessType: &csi.VolumeCapability_Mount{Mount: &csi.VolumeCapability_MountVolume{}}, + }}, + Parameters: map[string]string{"type": "ssd"}, + }) + if err != nil { + t.Fatalf("CreateVolume failed: %v", err) + } + volID := createResp.Volume.VolumeId + + // Attach to server A + _, err = driver.ControllerPublishVolume(ctx, &csi.ControllerPublishVolumeRequest{ + VolumeId: volID, + NodeId: serverA, + VolumeCapability: &csi.VolumeCapability{ + AccessMode: &csi.VolumeCapability_AccessMode{Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER}, + AccessType: &csi.VolumeCapability_Mount{Mount: &csi.VolumeCapability_MountVolume{}}, + }, + }) + if err != nil { + t.Fatalf("Failed to publish volume to server A: %v", err) + } + + // Try to attach the same volume to server B — should be rejected + _, err = driver.ControllerPublishVolume(ctx, &csi.ControllerPublishVolumeRequest{ + VolumeId: volID, + NodeId: serverB, + VolumeCapability: &csi.VolumeCapability{ + AccessMode: &csi.VolumeCapability_AccessMode{Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER}, + AccessType: &csi.VolumeCapability_Mount{Mount: &csi.VolumeCapability_MountVolume{}}, + }, + }) + if err == nil { + t.Fatal("Expected FailedPrecondition error when publishing to different node, got nil") + } + st, ok := status.FromError(err) + if !ok { + t.Fatalf("Expected gRPC status error, got: %v", err) + } + if st.Code() != codes.FailedPrecondition { + t.Errorf("Expected codes.FailedPrecondition, got %v: %v", st.Code(), err) + } +} + +// TestControllerPublishVolume_IdempotentSameNode tests that calling +// ControllerPublishVolume for a volume already attached to the same node +// returns success without error. +func TestControllerPublishVolume_IdempotentSameNode(t *testing.T) { + serverA := "server-a-uuid" + initialServers := map[string]*cloudscale.Server{ + serverA: {UUID: serverA}, + } + cloudscaleClient := NewFakeClient(initialServers) + driver := &Driver{ + endpoint: "unix:///tmp/csi-test.sock", + serverId: serverA, + zone: DefaultZone.Slug, + cloudscaleClient: cloudscaleClient, + mounter: &fakeMounter{mounted: map[string]string{}}, + log: logrus.New().WithField("test_enabled", true), + volumeLocks: NewVolumeLocks(), + } + + ctx := context.Background() + createResp, err := driver.CreateVolume(ctx, &csi.CreateVolumeRequest{ + Name: "test-vol-idempotent", + CapacityRange: &csi.CapacityRange{RequiredBytes: 1 * GB}, + VolumeCapabilities: []*csi.VolumeCapability{{ + AccessMode: &csi.VolumeCapability_AccessMode{Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER}, + AccessType: &csi.VolumeCapability_Mount{Mount: &csi.VolumeCapability_MountVolume{}}, + }}, + Parameters: map[string]string{"type": "ssd"}, + }) + if err != nil { + t.Fatalf("CreateVolume failed: %v", err) + } + + publishReq := &csi.ControllerPublishVolumeRequest{ + VolumeId: createResp.Volume.VolumeId, + NodeId: serverA, + VolumeCapability: &csi.VolumeCapability{ + AccessMode: &csi.VolumeCapability_AccessMode{Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER}, + AccessType: &csi.VolumeCapability_Mount{Mount: &csi.VolumeCapability_MountVolume{}}, + }, + VolumeContext: map[string]string{LuksEncryptedAttribute: "false"}, + } + + resp1, err := driver.ControllerPublishVolume(ctx, publishReq) + if err != nil { + t.Fatalf("First publish failed: %v", err) + } + resp2, err := driver.ControllerPublishVolume(ctx, publishReq) + if err != nil { + t.Fatalf("Second publish (idempotent) failed: %v", err) + } + if resp1.PublishContext[PublishInfoVolumeName] != resp2.PublishContext[PublishInfoVolumeName] { + t.Errorf("Publish context mismatch: %v vs %v", resp1.PublishContext, resp2.PublishContext) + } +} + +// TestControllerPublishVolume_SucceedsWhenNotAttached tests that +// ControllerPublishVolume works normally when the volume is not attached. +func TestControllerPublishVolume_SucceedsWhenNotAttached(t *testing.T) { + serverA := "server-a-uuid" + initialServers := map[string]*cloudscale.Server{ + serverA: {UUID: serverA}, + } + cloudscaleClient := NewFakeClient(initialServers) + driver := &Driver{ + endpoint: "unix:///tmp/csi-test.sock", + serverId: serverA, + zone: DefaultZone.Slug, + cloudscaleClient: cloudscaleClient, + mounter: &fakeMounter{mounted: map[string]string{}}, + log: logrus.New().WithField("test_enabled", true), + volumeLocks: NewVolumeLocks(), + } + + ctx := context.Background() + createResp, err := driver.CreateVolume(ctx, &csi.CreateVolumeRequest{ + Name: "test-vol-attach", + CapacityRange: &csi.CapacityRange{RequiredBytes: 1 * GB}, + VolumeCapabilities: []*csi.VolumeCapability{{ + AccessMode: &csi.VolumeCapability_AccessMode{Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER}, + AccessType: &csi.VolumeCapability_Mount{Mount: &csi.VolumeCapability_MountVolume{}}, + }}, + Parameters: map[string]string{"type": "ssd"}, + }) + if err != nil { + t.Fatalf("CreateVolume failed: %v", err) + } + + resp, err := driver.ControllerPublishVolume(ctx, &csi.ControllerPublishVolumeRequest{ + VolumeId: createResp.Volume.VolumeId, + NodeId: serverA, + VolumeCapability: &csi.VolumeCapability{ + AccessMode: &csi.VolumeCapability_AccessMode{Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER}, + AccessType: &csi.VolumeCapability_Mount{Mount: &csi.VolumeCapability_MountVolume{}}, + }, + }) + if err != nil { + t.Fatalf("Publish failed: %v", err) + } + if resp.PublishContext[PublishInfoVolumeName] == "" { + t.Error("Expected non-empty volume name in publish context") + } +} + +// TestControllerOperations_VolumeLocks tests that concurrent controller +// operations on the same volume are properly serialized with volume locks. +func TestControllerOperations_VolumeLocks(t *testing.T) { + initialServers := map[string]*cloudscale.Server{} + cloudscaleClient := NewFakeClient(initialServers) + driver := &Driver{ + mounter: &fakeMounter{}, + log: logrus.New().WithField("test_enabled", true), + cloudscaleClient: cloudscaleClient, + volumeLocks: NewVolumeLocks(), + } + + ctx := context.Background() + createResp, err := driver.CreateVolume(ctx, &csi.CreateVolumeRequest{ + Name: "test-vol-locks", + CapacityRange: &csi.CapacityRange{RequiredBytes: 1 * GB}, + VolumeCapabilities: []*csi.VolumeCapability{{ + AccessMode: &csi.VolumeCapability_AccessMode{Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER}, + AccessType: &csi.VolumeCapability_Mount{Mount: &csi.VolumeCapability_MountVolume{}}, + }}, + Parameters: map[string]string{"type": "ssd"}, + }) + if err != nil { + t.Fatalf("CreateVolume failed: %v", err) + } + volID := createResp.Volume.VolumeId + + // Pre-acquire the volume lock + if !driver.volumeLocks.TryAcquire(volID) { + t.Fatal("Failed to pre-acquire volume lock") + } + + // ControllerPublishVolume should return Aborted + _, err = driver.ControllerPublishVolume(ctx, &csi.ControllerPublishVolumeRequest{ + VolumeId: volID, + NodeId: "some-node", + VolumeCapability: &csi.VolumeCapability{ + AccessMode: &csi.VolumeCapability_AccessMode{Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER}, + AccessType: &csi.VolumeCapability_Mount{Mount: &csi.VolumeCapability_MountVolume{}}, + }, + }) + assertAbortedError(t, err, "ControllerPublishVolume") + + // ControllerUnpublishVolume should return Aborted + _, err = driver.ControllerUnpublishVolume(ctx, &csi.ControllerUnpublishVolumeRequest{ + VolumeId: volID, + NodeId: "some-node", + }) + assertAbortedError(t, err, "ControllerUnpublishVolume") + + // DeleteVolume should return Aborted + _, err = driver.DeleteVolume(ctx, &csi.DeleteVolumeRequest{ + VolumeId: volID, + }) + assertAbortedError(t, err, "DeleteVolume") + + // ControllerExpandVolume should return Aborted + _, err = driver.ControllerExpandVolume(ctx, &csi.ControllerExpandVolumeRequest{ + VolumeId: volID, + CapacityRange: &csi.CapacityRange{RequiredBytes: 10 * GB}, + }) + assertAbortedError(t, err, "ControllerExpandVolume") + + driver.volumeLocks.Release(volID) +}