Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 58 additions & 5 deletions driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,11 @@ func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest)
return nil, status.Error(codes.InvalidArgument, "DeleteVolume Volume ID must be provided")
}

if acquired := d.volumeLocks.TryAcquire(req.VolumeId); !acquired {
return nil, status.Errorf(codes.Aborted, "an operation with the given Volume ID %s already exists", req.VolumeId)
}
defer d.volumeLocks.Release(req.VolumeId)

ll := d.log.WithFields(logrus.Fields{
"volume_id": req.VolumeId,
"method": "delete_volume",
Expand Down Expand Up @@ -248,17 +253,58 @@ func (d *Driver) ControllerPublishVolume(ctx context.Context, req *csi.Controlle
return nil, status.Error(codes.AlreadyExists, "read only Volumes are not supported")
}

if acquired := d.volumeLocks.TryAcquire(req.VolumeId); !acquired {
return nil, status.Errorf(codes.Aborted, "an operation with the given Volume ID %s already exists", req.VolumeId)
}
defer d.volumeLocks.Release(req.VolumeId)

ll := d.log.WithFields(logrus.Fields{
"volume_id": req.VolumeId,
"node_id": req.NodeId,
"method": "controller_publish_volume",
})
ll.Info("controller publish volume called")

// Check current attachment state before modifying it. This prevents
// silently moving a volume that is still attached to another node,
// which would cause a stale VolumeAttachment and Multi-Attach errors.
volume, err := d.cloudscaleClient.Volumes.Get(ctx, req.VolumeId)
if err != nil {
return nil, reraiseNotFound(err, ll, "fetch volume for publish")
}

if volume.ServerUUIDs != nil && len(*volume.ServerUUIDs) > 0 {
alreadyAttachedToRequestedNode := false
for _, serverUUID := range *volume.ServerUUIDs {
if serverUUID == req.NodeId {
alreadyAttachedToRequestedNode = true
break
}
}

if alreadyAttachedToRequestedNode {
ll.Info("volume is already attached to the requested node")
return &csi.ControllerPublishVolumeResponse{
PublishContext: map[string]string{
PublishInfoVolumeName: volume.Name,
LuksEncryptedAttribute: req.VolumeContext[LuksEncryptedAttribute],
LuksCipherAttribute: req.VolumeContext[LuksCipherAttribute],
LuksKeySizeAttribute: req.VolumeContext[LuksKeySizeAttribute],
},
}, nil
}

ll.WithField("current_server_uuids", *volume.ServerUUIDs).
Warn("volume is already attached to a different node")
return nil, status.Errorf(codes.FailedPrecondition,
"volume %s is already attached to server(s) %v, must be detached first",
req.VolumeId, *volume.ServerUUIDs)
}

attachRequest := &cloudscale.VolumeRequest{
ServerUUIDs: &[]string{req.NodeId},
}
err := d.cloudscaleClient.Volumes.Update(ctx, req.VolumeId, attachRequest)
err = d.cloudscaleClient.Volumes.Update(ctx, req.VolumeId, attachRequest)
if err != nil {
if maxVolumesPerServerErrorMessageRe.MatchString(err.Error()) {
return nil, status.Error(codes.ResourceExhausted, err.Error())
Expand All @@ -268,10 +314,6 @@ func (d *Driver) ControllerPublishVolume(ctx context.Context, req *csi.Controlle
}

ll.Info("volume is attached")
volume, err := d.cloudscaleClient.Volumes.Get(ctx, req.VolumeId)
if err != nil {
return nil, reraiseNotFound(err, ll, "fetch volume")
}
return &csi.ControllerPublishVolumeResponse{
PublishContext: map[string]string{
PublishInfoVolumeName: volume.Name,
Expand All @@ -288,6 +330,11 @@ func (d *Driver) ControllerUnpublishVolume(ctx context.Context, req *csi.Control
return nil, status.Error(codes.InvalidArgument, "ControllerPublishVolume Volume ID must be provided")
}

if acquired := d.volumeLocks.TryAcquire(req.VolumeId); !acquired {
return nil, status.Errorf(codes.Aborted, "an operation with the given Volume ID %s already exists", req.VolumeId)
}
defer d.volumeLocks.Release(req.VolumeId)

ll := d.log.WithFields(logrus.Fields{
"volume_id": req.VolumeId,
"node_id": req.NodeId,
Expand Down Expand Up @@ -511,6 +558,12 @@ func (d *Driver) ControllerExpandVolume(ctx context.Context, req *csi.Controller
if len(volID) == 0 {
return nil, status.Error(codes.InvalidArgument, "ControllerExpandVolume volume ID missing in request")
}

if acquired := d.volumeLocks.TryAcquire(volID); !acquired {
return nil, status.Errorf(codes.Aborted, "an operation with the given Volume ID %s already exists", volID)
}
defer d.volumeLocks.Release(volID)

volume, err := d.cloudscaleClient.Volumes.Get(ctx, volID)
if err != nil {
return nil, status.Errorf(codes.Internal, "ControllerExpandVolume could not retrieve existing volume: %v", err)
Expand Down
256 changes: 256 additions & 0 deletions driver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,3 +706,259 @@ func TestNodeOperations_CrossOperationLocking(t *testing.T) {
execStage <- struct{}{}
<-respStage
}

func assertAbortedError(t *testing.T, err error, opName string) {
t.Helper()
if err == nil {
t.Errorf("%s: expected Aborted error when volume is locked, got nil", opName)
return
}
st, ok := status.FromError(err)
if !ok {
t.Errorf("%s: expected gRPC status error, got: %v", opName, err)
return
}
if st.Code() != codes.Aborted {
t.Errorf("%s: expected codes.Aborted, got %v: %v", opName, st.Code(), err)
}
}

// TestControllerPublishVolume_RejectsWhenAttachedToDifferentNode verifies that
// ControllerPublishVolume returns FailedPrecondition when the volume is already
// attached to a different node, preventing silent volume migration.
func TestControllerPublishVolume_RejectsWhenAttachedToDifferentNode(t *testing.T) {
serverA := "server-a-uuid"
serverB := "server-b-uuid"
initialServers := map[string]*cloudscale.Server{
serverA: {UUID: serverA},
serverB: {UUID: serverB},
}
cloudscaleClient := NewFakeClient(initialServers)
driver := &Driver{
endpoint: "unix:///tmp/csi-test.sock",
serverId: serverA,
zone: DefaultZone.Slug,
cloudscaleClient: cloudscaleClient,
mounter: &fakeMounter{mounted: map[string]string{}},
log: logrus.New().WithField("test_enabled", true),
volumeLocks: NewVolumeLocks(),
}

ctx := context.Background()
createResp, err := driver.CreateVolume(ctx, &csi.CreateVolumeRequest{
Name: "test-vol-multiattach",
CapacityRange: &csi.CapacityRange{RequiredBytes: 1 * GB},
VolumeCapabilities: []*csi.VolumeCapability{{
AccessMode: &csi.VolumeCapability_AccessMode{Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER},
AccessType: &csi.VolumeCapability_Mount{Mount: &csi.VolumeCapability_MountVolume{}},
}},
Parameters: map[string]string{"type": "ssd"},
})
if err != nil {
t.Fatalf("CreateVolume failed: %v", err)
}
volID := createResp.Volume.VolumeId

// Attach to server A
_, err = driver.ControllerPublishVolume(ctx, &csi.ControllerPublishVolumeRequest{
VolumeId: volID,
NodeId: serverA,
VolumeCapability: &csi.VolumeCapability{
AccessMode: &csi.VolumeCapability_AccessMode{Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER},
AccessType: &csi.VolumeCapability_Mount{Mount: &csi.VolumeCapability_MountVolume{}},
},
})
if err != nil {
t.Fatalf("Failed to publish volume to server A: %v", err)
}

// Try to attach the same volume to server B — should be rejected
_, err = driver.ControllerPublishVolume(ctx, &csi.ControllerPublishVolumeRequest{
VolumeId: volID,
NodeId: serverB,
VolumeCapability: &csi.VolumeCapability{
AccessMode: &csi.VolumeCapability_AccessMode{Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER},
AccessType: &csi.VolumeCapability_Mount{Mount: &csi.VolumeCapability_MountVolume{}},
},
})
if err == nil {
t.Fatal("Expected FailedPrecondition error when publishing to different node, got nil")
}
st, ok := status.FromError(err)
if !ok {
t.Fatalf("Expected gRPC status error, got: %v", err)
}
if st.Code() != codes.FailedPrecondition {
t.Errorf("Expected codes.FailedPrecondition, got %v: %v", st.Code(), err)
}
}

// TestControllerPublishVolume_IdempotentSameNode tests that calling
// ControllerPublishVolume for a volume already attached to the same node
// returns success without error.
func TestControllerPublishVolume_IdempotentSameNode(t *testing.T) {
serverA := "server-a-uuid"
initialServers := map[string]*cloudscale.Server{
serverA: {UUID: serverA},
}
cloudscaleClient := NewFakeClient(initialServers)
driver := &Driver{
endpoint: "unix:///tmp/csi-test.sock",
serverId: serverA,
zone: DefaultZone.Slug,
cloudscaleClient: cloudscaleClient,
mounter: &fakeMounter{mounted: map[string]string{}},
log: logrus.New().WithField("test_enabled", true),
volumeLocks: NewVolumeLocks(),
}

ctx := context.Background()
createResp, err := driver.CreateVolume(ctx, &csi.CreateVolumeRequest{
Name: "test-vol-idempotent",
CapacityRange: &csi.CapacityRange{RequiredBytes: 1 * GB},
VolumeCapabilities: []*csi.VolumeCapability{{
AccessMode: &csi.VolumeCapability_AccessMode{Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER},
AccessType: &csi.VolumeCapability_Mount{Mount: &csi.VolumeCapability_MountVolume{}},
}},
Parameters: map[string]string{"type": "ssd"},
})
if err != nil {
t.Fatalf("CreateVolume failed: %v", err)
}

publishReq := &csi.ControllerPublishVolumeRequest{
VolumeId: createResp.Volume.VolumeId,
NodeId: serverA,
VolumeCapability: &csi.VolumeCapability{
AccessMode: &csi.VolumeCapability_AccessMode{Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER},
AccessType: &csi.VolumeCapability_Mount{Mount: &csi.VolumeCapability_MountVolume{}},
},
VolumeContext: map[string]string{LuksEncryptedAttribute: "false"},
}

resp1, err := driver.ControllerPublishVolume(ctx, publishReq)
if err != nil {
t.Fatalf("First publish failed: %v", err)
}
resp2, err := driver.ControllerPublishVolume(ctx, publishReq)
if err != nil {
t.Fatalf("Second publish (idempotent) failed: %v", err)
}
if resp1.PublishContext[PublishInfoVolumeName] != resp2.PublishContext[PublishInfoVolumeName] {
t.Errorf("Publish context mismatch: %v vs %v", resp1.PublishContext, resp2.PublishContext)
}
}

// TestControllerPublishVolume_SucceedsWhenNotAttached tests that
// ControllerPublishVolume works normally when the volume is not attached.
func TestControllerPublishVolume_SucceedsWhenNotAttached(t *testing.T) {
serverA := "server-a-uuid"
initialServers := map[string]*cloudscale.Server{
serverA: {UUID: serverA},
}
cloudscaleClient := NewFakeClient(initialServers)
driver := &Driver{
endpoint: "unix:///tmp/csi-test.sock",
serverId: serverA,
zone: DefaultZone.Slug,
cloudscaleClient: cloudscaleClient,
mounter: &fakeMounter{mounted: map[string]string{}},
log: logrus.New().WithField("test_enabled", true),
volumeLocks: NewVolumeLocks(),
}

ctx := context.Background()
createResp, err := driver.CreateVolume(ctx, &csi.CreateVolumeRequest{
Name: "test-vol-attach",
CapacityRange: &csi.CapacityRange{RequiredBytes: 1 * GB},
VolumeCapabilities: []*csi.VolumeCapability{{
AccessMode: &csi.VolumeCapability_AccessMode{Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER},
AccessType: &csi.VolumeCapability_Mount{Mount: &csi.VolumeCapability_MountVolume{}},
}},
Parameters: map[string]string{"type": "ssd"},
})
if err != nil {
t.Fatalf("CreateVolume failed: %v", err)
}

resp, err := driver.ControllerPublishVolume(ctx, &csi.ControllerPublishVolumeRequest{
VolumeId: createResp.Volume.VolumeId,
NodeId: serverA,
VolumeCapability: &csi.VolumeCapability{
AccessMode: &csi.VolumeCapability_AccessMode{Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER},
AccessType: &csi.VolumeCapability_Mount{Mount: &csi.VolumeCapability_MountVolume{}},
},
})
if err != nil {
t.Fatalf("Publish failed: %v", err)
}
if resp.PublishContext[PublishInfoVolumeName] == "" {
t.Error("Expected non-empty volume name in publish context")
}
}

// TestControllerOperations_VolumeLocks tests that concurrent controller
// operations on the same volume are properly serialized with volume locks.
func TestControllerOperations_VolumeLocks(t *testing.T) {
initialServers := map[string]*cloudscale.Server{}
cloudscaleClient := NewFakeClient(initialServers)
driver := &Driver{
mounter: &fakeMounter{},
log: logrus.New().WithField("test_enabled", true),
cloudscaleClient: cloudscaleClient,
volumeLocks: NewVolumeLocks(),
}

ctx := context.Background()
createResp, err := driver.CreateVolume(ctx, &csi.CreateVolumeRequest{
Name: "test-vol-locks",
CapacityRange: &csi.CapacityRange{RequiredBytes: 1 * GB},
VolumeCapabilities: []*csi.VolumeCapability{{
AccessMode: &csi.VolumeCapability_AccessMode{Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER},
AccessType: &csi.VolumeCapability_Mount{Mount: &csi.VolumeCapability_MountVolume{}},
}},
Parameters: map[string]string{"type": "ssd"},
})
if err != nil {
t.Fatalf("CreateVolume failed: %v", err)
}
volID := createResp.Volume.VolumeId

// Pre-acquire the volume lock
if !driver.volumeLocks.TryAcquire(volID) {
t.Fatal("Failed to pre-acquire volume lock")
}

// ControllerPublishVolume should return Aborted
_, err = driver.ControllerPublishVolume(ctx, &csi.ControllerPublishVolumeRequest{
VolumeId: volID,
NodeId: "some-node",
VolumeCapability: &csi.VolumeCapability{
AccessMode: &csi.VolumeCapability_AccessMode{Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER},
AccessType: &csi.VolumeCapability_Mount{Mount: &csi.VolumeCapability_MountVolume{}},
},
})
assertAbortedError(t, err, "ControllerPublishVolume")

// ControllerUnpublishVolume should return Aborted
_, err = driver.ControllerUnpublishVolume(ctx, &csi.ControllerUnpublishVolumeRequest{
VolumeId: volID,
NodeId: "some-node",
})
assertAbortedError(t, err, "ControllerUnpublishVolume")

// DeleteVolume should return Aborted
_, err = driver.DeleteVolume(ctx, &csi.DeleteVolumeRequest{
VolumeId: volID,
})
assertAbortedError(t, err, "DeleteVolume")

// ControllerExpandVolume should return Aborted
_, err = driver.ControllerExpandVolume(ctx, &csi.ControllerExpandVolumeRequest{
VolumeId: volID,
CapacityRange: &csi.CapacityRange{RequiredBytes: 10 * GB},
})
assertAbortedError(t, err, "ControllerExpandVolume")

driver.volumeLocks.Release(volID)
}
Loading