Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 63 additions & 5 deletions driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,11 @@ func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest)
return nil, status.Error(codes.InvalidArgument, "DeleteVolume Volume ID must be provided")
}

if acquired := d.volumeLocks.TryAcquire(req.VolumeId); !acquired {
return nil, status.Errorf(codes.Aborted, "an operation with the given Volume ID %s already exists", req.VolumeId)
}
defer d.volumeLocks.Release(req.VolumeId)

ll := d.log.WithFields(logrus.Fields{
"volume_id": req.VolumeId,
"method": "delete_volume",
Expand Down Expand Up @@ -461,17 +466,58 @@ func (d *Driver) ControllerPublishVolume(ctx context.Context, req *csi.Controlle
return nil, status.Error(codes.AlreadyExists, "read only Volumes are not supported")
}

if acquired := d.volumeLocks.TryAcquire(req.VolumeId); !acquired {
return nil, status.Errorf(codes.Aborted, "an operation with the given Volume ID %s already exists", req.VolumeId)
}
defer d.volumeLocks.Release(req.VolumeId)

ll := d.log.WithFields(logrus.Fields{
"volume_id": req.VolumeId,
"node_id": req.NodeId,
"method": "controller_publish_volume",
})
ll.Info("controller publish volume called")

// Check current attachment state before modifying it. This prevents
// silently moving a volume that is still attached to another node,
// which would cause a stale VolumeAttachment and Multi-Attach errors.
volume, err := d.cloudscaleClient.Volumes.Get(ctx, req.VolumeId)
if err != nil {
return nil, reraiseNotFound(err, ll, "fetch volume for publish")
}

if volume.ServerUUIDs != nil && len(*volume.ServerUUIDs) > 0 {
alreadyAttachedToRequestedNode := false
for _, serverUUID := range *volume.ServerUUIDs {
if serverUUID == req.NodeId {
alreadyAttachedToRequestedNode = true
break
}
}

if alreadyAttachedToRequestedNode {
ll.Info("volume is already attached to the requested node")
return &csi.ControllerPublishVolumeResponse{
PublishContext: map[string]string{
PublishInfoVolumeName: volume.Name,
LuksEncryptedAttribute: req.VolumeContext[LuksEncryptedAttribute],
LuksCipherAttribute: req.VolumeContext[LuksCipherAttribute],
LuksKeySizeAttribute: req.VolumeContext[LuksKeySizeAttribute],
},
}, nil
}

ll.WithField("current_server_uuids", *volume.ServerUUIDs).
Warn("volume is already attached to a different node")
return nil, status.Errorf(codes.FailedPrecondition,
"volume %s is already attached to server(s) %v, must be detached first",
req.VolumeId, *volume.ServerUUIDs)
}

attachRequest := &cloudscale.VolumeUpdateRequest{
ServerUUIDs: &[]string{req.NodeId},
}
err := d.cloudscaleClient.Volumes.Update(ctx, req.VolumeId, attachRequest)
err = d.cloudscaleClient.Volumes.Update(ctx, req.VolumeId, attachRequest)
if err != nil {
if maxVolumesPerServerErrorMessageRe.MatchString(err.Error()) {
return nil, status.Error(codes.ResourceExhausted, err.Error())
Expand All @@ -481,10 +527,6 @@ func (d *Driver) ControllerPublishVolume(ctx context.Context, req *csi.Controlle
}

ll.Info("volume is attached")
volume, err := d.cloudscaleClient.Volumes.Get(ctx, req.VolumeId)
if err != nil {
return nil, reraiseNotFound(err, ll, "fetch volume")
}
return &csi.ControllerPublishVolumeResponse{
PublishContext: map[string]string{
PublishInfoVolumeName: volume.Name,
Expand All @@ -501,6 +543,11 @@ func (d *Driver) ControllerUnpublishVolume(ctx context.Context, req *csi.Control
return nil, status.Error(codes.InvalidArgument, "ControllerPublishVolume Volume ID must be provided")
}

if acquired := d.volumeLocks.TryAcquire(req.VolumeId); !acquired {
return nil, status.Errorf(codes.Aborted, "an operation with the given Volume ID %s already exists", req.VolumeId)
}
defer d.volumeLocks.Release(req.VolumeId)

ll := d.log.WithFields(logrus.Fields{
"volume_id": req.VolumeId,
"node_id": req.NodeId,
Expand Down Expand Up @@ -710,6 +757,11 @@ func (d *Driver) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequ
return nil, status.Error(codes.InvalidArgument, "CreateSnapshotRequest Source Volume Id must be provided")
}

if acquired := d.volumeLocks.TryAcquire(req.SourceVolumeId); !acquired {
return nil, status.Errorf(codes.Aborted, "an operation with the given Volume ID %s already exists", req.SourceVolumeId)
}
defer d.volumeLocks.Release(req.SourceVolumeId)

ll := d.log.WithFields(logrus.Fields{
"source_volume_id": req.SourceVolumeId,
"name": req.Name,
Expand Down Expand Up @@ -941,6 +993,12 @@ func (d *Driver) ControllerExpandVolume(ctx context.Context, req *csi.Controller
if len(volID) == 0 {
return nil, status.Error(codes.InvalidArgument, "ControllerExpandVolume volume ID missing in request")
}

if acquired := d.volumeLocks.TryAcquire(volID); !acquired {
return nil, status.Errorf(codes.Aborted, "an operation with the given Volume ID %s already exists", volID)
}
defer d.volumeLocks.Release(volID)

volume, err := d.cloudscaleClient.Volumes.Get(ctx, volID)
if err != nil {
return nil, status.Errorf(codes.Internal, "ControllerExpandVolume could not retrieve existing volume: %v", err)
Expand Down
259 changes: 259 additions & 0 deletions driver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1337,3 +1337,262 @@ func TestCreateVolumeFromSnapshot_Idempotent_NeedsExpansion(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, expandedSizeGiB, vol.SizeGB)
}

// TestControllerPublishVolume_RejectsWhenAttachedToDifferentNode verifies that
// ControllerPublishVolume returns codes.FailedPrecondition when the volume is
// already attached to a different node, and that the existing attachment is
// left untouched (the volume must not be silently moved).
func TestControllerPublishVolume_RejectsWhenAttachedToDifferentNode(t *testing.T) {
	serverA := "server-a-uuid"
	serverB := "server-b-uuid"
	initialServers := map[string]*cloudscale.Server{
		serverA: {UUID: serverA},
		serverB: {UUID: serverB},
	}
	cloudscaleClient := NewFakeClient(initialServers)

	driver := &Driver{
		endpoint:         "unix:///tmp/csi-test.sock",
		serverId:         serverA,
		zone:             DefaultZone.Slug,
		cloudscaleClient: cloudscaleClient,
		mounter:          &fakeMounter{mounted: map[string]string{}},
		log:              logrus.New().WithField("test_enabled", true),
		volumeLocks:      NewVolumeLocks(),
	}

	ctx := context.Background()
	volumeID := createVolumeForTest(t, driver, "test-vol-multiattach")

	// publishTo issues a publish request for the test volume; the two calls
	// below differ only in the target node.
	publishTo := func(nodeID string) error {
		_, err := driver.ControllerPublishVolume(ctx, &csi.ControllerPublishVolumeRequest{
			VolumeId: volumeID,
			NodeId:   nodeID,
			VolumeCapability: &csi.VolumeCapability{
				AccessMode: &csi.VolumeCapability_AccessMode{
					Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
				},
				AccessType: &csi.VolumeCapability_Mount{
					Mount: &csi.VolumeCapability_MountVolume{},
				},
			},
		})
		return err
	}

	// Attach volume to server A
	if err := publishTo(serverA); err != nil {
		t.Fatalf("Failed to publish volume to server A: %v", err)
	}

	// Try to attach the same volume to server B — should be rejected
	err := publishTo(serverB)
	if err == nil {
		t.Fatal("Expected FailedPrecondition error when publishing to different node, got nil")
	}

	st, ok := status.FromError(err)
	if !ok {
		t.Fatalf("Expected gRPC status error, got: %v", err)
	}
	if st.Code() != codes.FailedPrecondition {
		t.Errorf("Expected codes.FailedPrecondition, got %v: %v", st.Code(), err)
	}

	// Verify the volume is still attached to server A (not silently moved)
	vol, err := cloudscaleClient.Volumes.Get(ctx, volumeID)
	if err != nil {
		t.Fatalf("Failed to get volume: %v", err)
	}
	// ServerUUIDs is a pointer; guard it so a missing attachment list is
	// reported as a test failure instead of a nil-dereference panic.
	if vol.ServerUUIDs == nil {
		t.Fatal("Volume should still be attached to server A, got ServerUUIDs=nil")
	}
	if len(*vol.ServerUUIDs) != 1 || (*vol.ServerUUIDs)[0] != serverA {
		t.Errorf("Volume should still be attached to server A, got ServerUUIDs=%v", *vol.ServerUUIDs)
	}
}

// TestControllerPublishVolume_IdempotentSameNode tests that calling
// ControllerPublishVolume for a volume already attached to the same node
// returns success without error.
func TestControllerPublishVolume_IdempotentSameNode(t *testing.T) {
serverA := "server-a-uuid"
initialServers := map[string]*cloudscale.Server{
serverA: {UUID: serverA},
}
cloudscaleClient := NewFakeClient(initialServers)

driver := &Driver{
endpoint: "unix:///tmp/csi-test.sock",
serverId: serverA,
zone: DefaultZone.Slug,
cloudscaleClient: cloudscaleClient,
mounter: &fakeMounter{mounted: map[string]string{}},
log: logrus.New().WithField("test_enabled", true),
volumeLocks: NewVolumeLocks(),
}

ctx := context.Background()
volumeID := createVolumeForTest(t, driver, "test-vol-idempotent")

publishReq := &csi.ControllerPublishVolumeRequest{
VolumeId: volumeID,
NodeId: serverA,
VolumeCapability: &csi.VolumeCapability{
AccessMode: &csi.VolumeCapability_AccessMode{
Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
},
AccessType: &csi.VolumeCapability_Mount{
Mount: &csi.VolumeCapability_MountVolume{},
},
},
VolumeContext: map[string]string{
LuksEncryptedAttribute: "false",
},
}

// First publish
resp1, err := driver.ControllerPublishVolume(ctx, publishReq)
if err != nil {
t.Fatalf("First publish failed: %v", err)
}

// Second publish to same node — should succeed (idempotent)
resp2, err := driver.ControllerPublishVolume(ctx, publishReq)
if err != nil {
t.Fatalf("Second publish (idempotent) failed: %v", err)
}

// Both responses should have the same publish context
if resp1.PublishContext[PublishInfoVolumeName] != resp2.PublishContext[PublishInfoVolumeName] {
t.Errorf("Publish context mismatch: %v vs %v", resp1.PublishContext, resp2.PublishContext)
}
}

// TestControllerPublishVolume_SucceedsWhenNotAttached verifies that
// ControllerPublishVolume succeeds for a volume that is not attached yet,
// returns a non-empty volume name in the publish context, and leaves the
// volume attached to exactly the requested server.
func TestControllerPublishVolume_SucceedsWhenNotAttached(t *testing.T) {
	serverA := "server-a-uuid"
	initialServers := map[string]*cloudscale.Server{
		serverA: {UUID: serverA},
	}
	cloudscaleClient := NewFakeClient(initialServers)

	driver := &Driver{
		endpoint:         "unix:///tmp/csi-test.sock",
		serverId:         serverA,
		zone:             DefaultZone.Slug,
		cloudscaleClient: cloudscaleClient,
		mounter:          &fakeMounter{mounted: map[string]string{}},
		log:              logrus.New().WithField("test_enabled", true),
		volumeLocks:      NewVolumeLocks(),
	}

	ctx := context.Background()
	volumeID := createVolumeForTest(t, driver, "test-vol-attach")

	resp, err := driver.ControllerPublishVolume(ctx, &csi.ControllerPublishVolumeRequest{
		VolumeId: volumeID,
		NodeId:   serverA,
		VolumeCapability: &csi.VolumeCapability{
			AccessMode: &csi.VolumeCapability_AccessMode{
				Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
			},
			AccessType: &csi.VolumeCapability_Mount{
				Mount: &csi.VolumeCapability_MountVolume{},
			},
		},
	})
	if err != nil {
		t.Fatalf("Publish failed: %v", err)
	}

	if resp.PublishContext[PublishInfoVolumeName] == "" {
		t.Error("Expected non-empty volume name in publish context")
	}

	// Verify volume is attached to the server
	vol, err := cloudscaleClient.Volumes.Get(ctx, volumeID)
	if err != nil {
		t.Fatalf("Failed to get volume: %v", err)
	}
	// ServerUUIDs is a pointer; guard it so a volume without an attachment
	// list fails the test cleanly instead of panicking on dereference.
	if vol.ServerUUIDs == nil {
		t.Fatal("Expected volume attached to server A, got ServerUUIDs=nil")
	}
	if len(*vol.ServerUUIDs) != 1 || (*vol.ServerUUIDs)[0] != serverA {
		t.Errorf("Expected volume attached to server A, got ServerUUIDs=%v", *vol.ServerUUIDs)
	}
}

// TestControllerOperations_VolumeLocks verifies that controller operations
// targeting a volume whose lock is already held are rejected with
// codes.Aborted. The test takes the lock itself and then calls each
// lock-guarded operation in turn.
func TestControllerOperations_VolumeLocks(t *testing.T) {
	driver := createDriverForTest(t)
	ctx := context.Background()
	volumeID := createVolumeForTest(t, driver, "test-vol-locks")

	// Pre-acquire the volume lock
	if !driver.volumeLocks.TryAcquire(volumeID) {
		t.Fatal("Failed to pre-acquire volume lock")
	}
	// Release on every exit path, not only when the last line is reached.
	defer driver.volumeLocks.Release(volumeID)

	// Each entry invokes one lock-guarded operation and returns its error.
	operations := []struct {
		name string
		call func() error
	}{
		{
			name: "ControllerPublishVolume",
			call: func() error {
				_, err := driver.ControllerPublishVolume(ctx, &csi.ControllerPublishVolumeRequest{
					VolumeId: volumeID,
					NodeId:   "some-node",
					VolumeCapability: &csi.VolumeCapability{
						AccessMode: &csi.VolumeCapability_AccessMode{
							Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
						},
						AccessType: &csi.VolumeCapability_Mount{
							Mount: &csi.VolumeCapability_MountVolume{},
						},
					},
				})
				return err
			},
		},
		{
			name: "ControllerUnpublishVolume",
			call: func() error {
				_, err := driver.ControllerUnpublishVolume(ctx, &csi.ControllerUnpublishVolumeRequest{
					VolumeId: volumeID,
					NodeId:   "some-node",
				})
				return err
			},
		},
		{
			name: "DeleteVolume",
			call: func() error {
				_, err := driver.DeleteVolume(ctx, &csi.DeleteVolumeRequest{
					VolumeId: volumeID,
				})
				return err
			},
		},
		{
			name: "ControllerExpandVolume",
			call: func() error {
				_, err := driver.ControllerExpandVolume(ctx, &csi.ControllerExpandVolumeRequest{
					VolumeId:      volumeID,
					CapacityRange: &csi.CapacityRange{RequiredBytes: 10 * GB},
				})
				return err
			},
		},
		{
			// CreateSnapshot locks on the source volume ID.
			name: "CreateSnapshot",
			call: func() error {
				_, err := driver.CreateSnapshot(ctx, &csi.CreateSnapshotRequest{
					Name:           "snap-locked",
					SourceVolumeId: volumeID,
				})
				return err
			},
		},
	}

	for _, op := range operations {
		assertAbortedError(t, op.call(), op.name)
	}
}

// assertAbortedError reports a test error unless err is a gRPC status error
// carrying codes.Aborted. opName prefixes every failure message so the
// offending operation can be identified.
func assertAbortedError(t *testing.T, err error, opName string) {
	t.Helper()

	// A nil error means the operation was not rejected at all.
	if err == nil {
		t.Errorf("%s: expected Aborted error when volume is locked, got nil", opName)
		return
	}

	grpcStatus, isStatus := status.FromError(err)
	switch {
	case !isStatus:
		t.Errorf("%s: expected gRPC status error, got: %v", opName, err)
	case grpcStatus.Code() != codes.Aborted:
		t.Errorf("%s: expected codes.Aborted, got %v: %v", opName, grpcStatus.Code(), err)
	}
}
Loading