From f7ffaa4f8f39f419261db800f1ba78ab3dea5061 Mon Sep 17 00:00:00 2001
From: Michael Weibel
Date: Fri, 27 Mar 2026 10:57:29 +0100
Subject: [PATCH] fix: prevent silent volume migration in ControllerPublishVolume

ControllerPublishVolume previously overwrote ServerUUIDs unconditionally,
silently moving an attached volume to a new node. When the subsequent
ControllerUnpublishVolume for the old node found that the volume was no
longer attached there, it returned success without detaching, leaving a
stale VolumeAttachment that caused Multi-Attach deadlocks in production.

Fix: fetch the volume before attaching. If it is attached to a different
node, return FailedPrecondition so the external-attacher waits for the
old detach to complete first. If it is already attached to the requested
node, return success (idempotent).

Also add volume locks (TryAcquire/Release, already used on the node side)
to DeleteVolume, ControllerPublishVolume, ControllerUnpublishVolume,
CreateSnapshot, and ControllerExpandVolume to prevent TOCTOU races
between concurrent mutating operations on the same volume.
--- driver/controller.go | 68 ++++++++++- driver/driver_test.go | 259 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 322 insertions(+), 5 deletions(-) diff --git a/driver/controller.go b/driver/controller.go index b80dcaa9..8e220b95 100644 --- a/driver/controller.go +++ b/driver/controller.go @@ -398,6 +398,11 @@ func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) return nil, status.Error(codes.InvalidArgument, "DeleteVolume Volume ID must be provided") } + if acquired := d.volumeLocks.TryAcquire(req.VolumeId); !acquired { + return nil, status.Errorf(codes.Aborted, "an operation with the given Volume ID %s already exists", req.VolumeId) + } + defer d.volumeLocks.Release(req.VolumeId) + ll := d.log.WithFields(logrus.Fields{ "volume_id": req.VolumeId, "method": "delete_volume", @@ -461,6 +466,11 @@ func (d *Driver) ControllerPublishVolume(ctx context.Context, req *csi.Controlle return nil, status.Error(codes.AlreadyExists, "read only Volumes are not supported") } + if acquired := d.volumeLocks.TryAcquire(req.VolumeId); !acquired { + return nil, status.Errorf(codes.Aborted, "an operation with the given Volume ID %s already exists", req.VolumeId) + } + defer d.volumeLocks.Release(req.VolumeId) + ll := d.log.WithFields(logrus.Fields{ "volume_id": req.VolumeId, "node_id": req.NodeId, @@ -468,10 +478,46 @@ func (d *Driver) ControllerPublishVolume(ctx context.Context, req *csi.Controlle }) ll.Info("controller publish volume called") + // Check current attachment state before modifying it. This prevents + // silently moving a volume that is still attached to another node, + // which would cause a stale VolumeAttachment and Multi-Attach errors. 
+ volume, err := d.cloudscaleClient.Volumes.Get(ctx, req.VolumeId) + if err != nil { + return nil, reraiseNotFound(err, ll, "fetch volume for publish") + } + + if volume.ServerUUIDs != nil && len(*volume.ServerUUIDs) > 0 { + alreadyAttachedToRequestedNode := false + for _, serverUUID := range *volume.ServerUUIDs { + if serverUUID == req.NodeId { + alreadyAttachedToRequestedNode = true + break + } + } + + if alreadyAttachedToRequestedNode { + ll.Info("volume is already attached to the requested node") + return &csi.ControllerPublishVolumeResponse{ + PublishContext: map[string]string{ + PublishInfoVolumeName: volume.Name, + LuksEncryptedAttribute: req.VolumeContext[LuksEncryptedAttribute], + LuksCipherAttribute: req.VolumeContext[LuksCipherAttribute], + LuksKeySizeAttribute: req.VolumeContext[LuksKeySizeAttribute], + }, + }, nil + } + + ll.WithField("current_server_uuids", *volume.ServerUUIDs). + Warn("volume is already attached to a different node") + return nil, status.Errorf(codes.FailedPrecondition, + "volume %s is already attached to server(s) %v, must be detached first", + req.VolumeId, *volume.ServerUUIDs) + } + attachRequest := &cloudscale.VolumeUpdateRequest{ ServerUUIDs: &[]string{req.NodeId}, } - err := d.cloudscaleClient.Volumes.Update(ctx, req.VolumeId, attachRequest) + err = d.cloudscaleClient.Volumes.Update(ctx, req.VolumeId, attachRequest) if err != nil { if maxVolumesPerServerErrorMessageRe.MatchString(err.Error()) { return nil, status.Error(codes.ResourceExhausted, err.Error()) @@ -481,10 +527,6 @@ func (d *Driver) ControllerPublishVolume(ctx context.Context, req *csi.Controlle } ll.Info("volume is attached") - volume, err := d.cloudscaleClient.Volumes.Get(ctx, req.VolumeId) - if err != nil { - return nil, reraiseNotFound(err, ll, "fetch volume") - } return &csi.ControllerPublishVolumeResponse{ PublishContext: map[string]string{ PublishInfoVolumeName: volume.Name, @@ -501,6 +543,11 @@ func (d *Driver) ControllerUnpublishVolume(ctx context.Context, 
req *csi.Control return nil, status.Error(codes.InvalidArgument, "ControllerPublishVolume Volume ID must be provided") } + if acquired := d.volumeLocks.TryAcquire(req.VolumeId); !acquired { + return nil, status.Errorf(codes.Aborted, "an operation with the given Volume ID %s already exists", req.VolumeId) + } + defer d.volumeLocks.Release(req.VolumeId) + ll := d.log.WithFields(logrus.Fields{ "volume_id": req.VolumeId, "node_id": req.NodeId, @@ -710,6 +757,11 @@ func (d *Driver) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequ return nil, status.Error(codes.InvalidArgument, "CreateSnapshotRequest Source Volume Id must be provided") } + if acquired := d.volumeLocks.TryAcquire(req.SourceVolumeId); !acquired { + return nil, status.Errorf(codes.Aborted, "an operation with the given Volume ID %s already exists", req.SourceVolumeId) + } + defer d.volumeLocks.Release(req.SourceVolumeId) + ll := d.log.WithFields(logrus.Fields{ "source_volume_id": req.SourceVolumeId, "name": req.Name, @@ -941,6 +993,12 @@ func (d *Driver) ControllerExpandVolume(ctx context.Context, req *csi.Controller if len(volID) == 0 { return nil, status.Error(codes.InvalidArgument, "ControllerExpandVolume volume ID missing in request") } + + if acquired := d.volumeLocks.TryAcquire(volID); !acquired { + return nil, status.Errorf(codes.Aborted, "an operation with the given Volume ID %s already exists", volID) + } + defer d.volumeLocks.Release(volID) + volume, err := d.cloudscaleClient.Volumes.Get(ctx, volID) if err != nil { return nil, status.Errorf(codes.Internal, "ControllerExpandVolume could not retrieve existing volume: %v", err) diff --git a/driver/driver_test.go b/driver/driver_test.go index 420640b4..e1af211c 100644 --- a/driver/driver_test.go +++ b/driver/driver_test.go @@ -1337,3 +1337,262 @@ func TestCreateVolumeFromSnapshot_Idempotent_NeedsExpansion(t *testing.T) { assert.NoError(t, err) assert.Equal(t, expandedSizeGiB, vol.SizeGB) } + +// 
TestControllerPublishVolume_RejectsWhenAttachedToDifferentNode tests that +// ControllerPublishVolume returns FailedPrecondition when the volume is +// already attached to a different node, preventing silent volume migration. +func TestControllerPublishVolume_RejectsWhenAttachedToDifferentNode(t *testing.T) { + serverA := "server-a-uuid" + serverB := "server-b-uuid" + initialServers := map[string]*cloudscale.Server{ + serverA: {UUID: serverA}, + serverB: {UUID: serverB}, + } + cloudscaleClient := NewFakeClient(initialServers) + + driver := &Driver{ + endpoint: "unix:///tmp/csi-test.sock", + serverId: serverA, + zone: DefaultZone.Slug, + cloudscaleClient: cloudscaleClient, + mounter: &fakeMounter{mounted: map[string]string{}}, + log: logrus.New().WithField("test_enabled", true), + volumeLocks: NewVolumeLocks(), + } + + ctx := context.Background() + volumeID := createVolumeForTest(t, driver, "test-vol-multiattach") + + // Attach volume to server A + _, err := driver.ControllerPublishVolume(ctx, &csi.ControllerPublishVolumeRequest{ + VolumeId: volumeID, + NodeId: serverA, + VolumeCapability: &csi.VolumeCapability{ + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, + }, + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{}, + }, + }, + }) + if err != nil { + t.Fatalf("Failed to publish volume to server A: %v", err) + } + + // Try to attach the same volume to server B — should be rejected + _, err = driver.ControllerPublishVolume(ctx, &csi.ControllerPublishVolumeRequest{ + VolumeId: volumeID, + NodeId: serverB, + VolumeCapability: &csi.VolumeCapability{ + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, + }, + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{}, + }, + }, + }) + if err == nil { + t.Fatal("Expected FailedPrecondition error when publishing to different node, got nil") + } + + 
st, ok := status.FromError(err) + if !ok { + t.Fatalf("Expected gRPC status error, got: %v", err) + } + if st.Code() != codes.FailedPrecondition { + t.Errorf("Expected codes.FailedPrecondition, got %v: %v", st.Code(), err) + } + + // Verify the volume is still attached to server A (not silently moved) + vol, err := cloudscaleClient.Volumes.Get(ctx, volumeID) + if err != nil { + t.Fatalf("Failed to get volume: %v", err) + } + if len(*vol.ServerUUIDs) != 1 || (*vol.ServerUUIDs)[0] != serverA { + t.Errorf("Volume should still be attached to server A, got ServerUUIDs=%v", *vol.ServerUUIDs) + } +} + +// TestControllerPublishVolume_IdempotentSameNode tests that calling +// ControllerPublishVolume for a volume already attached to the same node +// returns success without error. +func TestControllerPublishVolume_IdempotentSameNode(t *testing.T) { + serverA := "server-a-uuid" + initialServers := map[string]*cloudscale.Server{ + serverA: {UUID: serverA}, + } + cloudscaleClient := NewFakeClient(initialServers) + + driver := &Driver{ + endpoint: "unix:///tmp/csi-test.sock", + serverId: serverA, + zone: DefaultZone.Slug, + cloudscaleClient: cloudscaleClient, + mounter: &fakeMounter{mounted: map[string]string{}}, + log: logrus.New().WithField("test_enabled", true), + volumeLocks: NewVolumeLocks(), + } + + ctx := context.Background() + volumeID := createVolumeForTest(t, driver, "test-vol-idempotent") + + publishReq := &csi.ControllerPublishVolumeRequest{ + VolumeId: volumeID, + NodeId: serverA, + VolumeCapability: &csi.VolumeCapability{ + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, + }, + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{}, + }, + }, + VolumeContext: map[string]string{ + LuksEncryptedAttribute: "false", + }, + } + + // First publish + resp1, err := driver.ControllerPublishVolume(ctx, publishReq) + if err != nil { + t.Fatalf("First publish failed: %v", err) + } + + // 
Second publish to same node — should succeed (idempotent) + resp2, err := driver.ControllerPublishVolume(ctx, publishReq) + if err != nil { + t.Fatalf("Second publish (idempotent) failed: %v", err) + } + + // Both responses should have the same publish context + if resp1.PublishContext[PublishInfoVolumeName] != resp2.PublishContext[PublishInfoVolumeName] { + t.Errorf("Publish context mismatch: %v vs %v", resp1.PublishContext, resp2.PublishContext) + } +} + +// TestControllerPublishVolume_SucceedsWhenNotAttached tests that +// ControllerPublishVolume works normally when the volume is not attached. +func TestControllerPublishVolume_SucceedsWhenNotAttached(t *testing.T) { + serverA := "server-a-uuid" + initialServers := map[string]*cloudscale.Server{ + serverA: {UUID: serverA}, + } + cloudscaleClient := NewFakeClient(initialServers) + + driver := &Driver{ + endpoint: "unix:///tmp/csi-test.sock", + serverId: serverA, + zone: DefaultZone.Slug, + cloudscaleClient: cloudscaleClient, + mounter: &fakeMounter{mounted: map[string]string{}}, + log: logrus.New().WithField("test_enabled", true), + volumeLocks: NewVolumeLocks(), + } + + ctx := context.Background() + volumeID := createVolumeForTest(t, driver, "test-vol-attach") + + resp, err := driver.ControllerPublishVolume(ctx, &csi.ControllerPublishVolumeRequest{ + VolumeId: volumeID, + NodeId: serverA, + VolumeCapability: &csi.VolumeCapability{ + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, + }, + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{}, + }, + }, + }) + if err != nil { + t.Fatalf("Publish failed: %v", err) + } + + if resp.PublishContext[PublishInfoVolumeName] == "" { + t.Error("Expected non-empty volume name in publish context") + } + + // Verify volume is attached to the server + vol, err := cloudscaleClient.Volumes.Get(ctx, volumeID) + if err != nil { + t.Fatalf("Failed to get volume: %v", err) + } + if 
len(*vol.ServerUUIDs) != 1 || (*vol.ServerUUIDs)[0] != serverA { + t.Errorf("Expected volume attached to server A, got ServerUUIDs=%v", *vol.ServerUUIDs) + } +} + +// TestControllerOperations_VolumeLocks tests that concurrent controller +// operations on the same volume are properly serialized with volume locks. +func TestControllerOperations_VolumeLocks(t *testing.T) { + driver := createDriverForTest(t) + ctx := context.Background() + volumeID := createVolumeForTest(t, driver, "test-vol-locks") + + // Pre-acquire the volume lock + if !driver.volumeLocks.TryAcquire(volumeID) { + t.Fatal("Failed to pre-acquire volume lock") + } + + // ControllerPublishVolume should return Aborted + _, err := driver.ControllerPublishVolume(ctx, &csi.ControllerPublishVolumeRequest{ + VolumeId: volumeID, + NodeId: "some-node", + VolumeCapability: &csi.VolumeCapability{ + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, + }, + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{}, + }, + }, + }) + assertAbortedError(t, err, "ControllerPublishVolume") + + // ControllerUnpublishVolume should return Aborted + _, err = driver.ControllerUnpublishVolume(ctx, &csi.ControllerUnpublishVolumeRequest{ + VolumeId: volumeID, + NodeId: "some-node", + }) + assertAbortedError(t, err, "ControllerUnpublishVolume") + + // DeleteVolume should return Aborted + _, err = driver.DeleteVolume(ctx, &csi.DeleteVolumeRequest{ + VolumeId: volumeID, + }) + assertAbortedError(t, err, "DeleteVolume") + + // ControllerExpandVolume should return Aborted + _, err = driver.ControllerExpandVolume(ctx, &csi.ControllerExpandVolumeRequest{ + VolumeId: volumeID, + CapacityRange: &csi.CapacityRange{RequiredBytes: 10 * GB}, + }) + assertAbortedError(t, err, "ControllerExpandVolume") + + // CreateSnapshot should return Aborted (locks on source volume ID) + _, err = driver.CreateSnapshot(ctx, &csi.CreateSnapshotRequest{ + Name: "snap-locked", 
+ SourceVolumeId: volumeID, + }) + assertAbortedError(t, err, "CreateSnapshot") + + driver.volumeLocks.Release(volumeID) +} + +func assertAbortedError(t *testing.T, err error, opName string) { + t.Helper() + if err == nil { + t.Errorf("%s: expected Aborted error when volume is locked, got nil", opName) + return + } + st, ok := status.FromError(err) + if !ok { + t.Errorf("%s: expected gRPC status error, got: %v", opName, err) + return + } + if st.Code() != codes.Aborted { + t.Errorf("%s: expected codes.Aborted, got %v: %v", opName, st.Code(), err) + } +}