Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions database/dynamodb/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package dynamodb

import (
"github.com/aws/aws-sdk-go-v2/aws/awserr"
awsdynamodb "github.com/aws/aws-sdk-go-v2/service/dynamodb"
)

// CheckConditionalCheckFailed maps a DynamoDB ConditionalCheckFailedException
// to the provided error. This is the DynamoDB equivalent of checking for a
// unique constraint violation.
func CheckConditionalCheckFailed(inErr, outErr error) error {
if inErr != nil {
if awsErr, ok := inErr.(awserr.Error); ok {
if awsErr.Code() == awsdynamodb.ErrCodeConditionalCheckFailedException {
return outErr
}
}
}
return inErr
}

// IsConditionalCheckFailed returns true if the error is a DynamoDB
// ConditionalCheckFailedException.
func IsConditionalCheckFailed(err error) bool {
if err == nil {
return false
}

if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == awsdynamodb.ErrCodeConditionalCheckFailedException {
return true
}
}

return false
}
65 changes: 65 additions & 0 deletions database/dynamodb/test/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package test

import (
"context"
"fmt"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/defaults"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/ory/dockertest/v3"
"github.com/pkg/errors"

"github.com/code-payments/ocp-server/retry"
"github.com/code-payments/ocp-server/retry/backoff"
)

const (
containerRepository = "amazon/dynamodb-local"
containerTag = "latest"
containerAutoKill = 120 // seconds

port = 8000
)

// StartDynamoDB starts a Docker container using the amazon/dynamodb-local image and returns a DynamoDB client for testing purposes.
func StartDynamoDB(pool *dockertest.Pool) (client *dynamodb.Client, closeFunc func(), err error) {
closeFunc = func() {}

resource, err := pool.RunWithOptions(&dockertest.RunOptions{
Repository: containerRepository,
Tag: containerTag,
Cmd: []string{"-jar", "DynamoDBLocal.jar", "-inMemory"},
})
if err != nil {
return nil, closeFunc, errors.Wrap(err, "failed to start resource")
}

resource.Expire(containerAutoKill)

endpoint := fmt.Sprintf("http://localhost:%s", resource.GetPort(fmt.Sprintf("%d/tcp", port)))

cfg := defaults.Config()
cfg.Region = "us-east-1"
cfg.EndpointResolver = aws.ResolveWithEndpointURL(endpoint)
cfg.Credentials = aws.NewStaticCredentialsProvider("dummy", "dummy", "")

client = dynamodb.New(cfg)

// Wait for the container to be ready by issuing a ListTables request.
_, err = retry.Retry(
func() error {
_, listErr := client.ListTablesRequest(&dynamodb.ListTablesInput{}).Send(context.Background())
return listErr
},
retry.Limit(50),
retry.Backoff(backoff.Constant(500*time.Millisecond), 500*time.Second),
)
if err != nil {
resource.Close()
return nil, closeFunc, errors.Wrap(err, "timed out waiting for dynamodb container to become available")
}

return client, func() { resource.Close() }, nil
}
25 changes: 0 additions & 25 deletions ocp/data/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"sync"
"time"

"github.com/google/uuid"
"github.com/jmoiron/sqlx"

"github.com/code-payments/ocp-server/cache"
Expand All @@ -27,7 +26,6 @@ import (
"github.com/code-payments/ocp-server/ocp/data/deposit"
"github.com/code-payments/ocp-server/ocp/data/fulfillment"
"github.com/code-payments/ocp-server/ocp/data/intent"
"github.com/code-payments/ocp-server/ocp/data/messaging"
"github.com/code-payments/ocp-server/ocp/data/nonce"
"github.com/code-payments/ocp-server/ocp/data/rendezvous"
"github.com/code-payments/ocp-server/ocp/data/swap"
Expand All @@ -45,7 +43,6 @@ import (
deposit_memory_client "github.com/code-payments/ocp-server/ocp/data/deposit/memory"
fulfillment_memory_client "github.com/code-payments/ocp-server/ocp/data/fulfillment/memory"
intent_memory_client "github.com/code-payments/ocp-server/ocp/data/intent/memory"
messaging_memory_client "github.com/code-payments/ocp-server/ocp/data/messaging/memory"
nonce_memory_client "github.com/code-payments/ocp-server/ocp/data/nonce/memory"
rendezvous_memory_client "github.com/code-payments/ocp-server/ocp/data/rendezvous/memory"
swap_memory_client "github.com/code-payments/ocp-server/ocp/data/swap/memory"
Expand All @@ -63,7 +60,6 @@ import (
deposit_postgres_client "github.com/code-payments/ocp-server/ocp/data/deposit/postgres"
fulfillment_postgres_client "github.com/code-payments/ocp-server/ocp/data/fulfillment/postgres"
intent_postgres_client "github.com/code-payments/ocp-server/ocp/data/intent/postgres"
messaging_postgres_client "github.com/code-payments/ocp-server/ocp/data/messaging/postgres"
nonce_postgres_client "github.com/code-payments/ocp-server/ocp/data/nonce/postgres"
rendezvous_postgres_client "github.com/code-payments/ocp-server/ocp/data/rendezvous/postgres"
swap_postgres_client "github.com/code-payments/ocp-server/ocp/data/swap/postgres"
Expand Down Expand Up @@ -182,12 +178,6 @@ type DatabaseData interface {
GetTransactedAmountForAntiMoneyLaundering(ctx context.Context, owner string, since time.Time) (uint64, float64, error)
GetUsdCostBasis(ctx context.Context, owner string, mint string) (float64, error)

// Messaging
// --------------------------------------------------------------------------------
CreateMessage(ctx context.Context, record *messaging.Record) error
GetMessages(ctx context.Context, account string) ([]*messaging.Record, error)
DeleteMessage(ctx context.Context, account string, messageID uuid.UUID) error

// Nonces
// --------------------------------------------------------------------------------
GetNonce(ctx context.Context, address string) (*nonce.Record, error)
Expand Down Expand Up @@ -276,7 +266,6 @@ type DatabaseProvider struct {
deposits deposit.Store
fulfillments fulfillment.Store
intents intent.Store
messages messaging.Store
nonces nonce.Store
rendezvous rendezvous.Store
swaps swap.Store
Expand Down Expand Up @@ -322,7 +311,6 @@ func NewDatabaseProvider(dbConfig *pg.Config) (DatabaseData, error) {
deposits: deposit_postgres_client.New(db),
fulfillments: fulfillment_postgres_client.New(db),
intents: intent_postgres_client.New(db),
messages: messaging_postgres_client.New(db),
nonces: nonce_postgres_client.New(db),
rendezvous: rendezvous_postgres_client.New(db),
swaps: swap_postgres_client.New(db),
Expand All @@ -349,7 +337,6 @@ func NewTestDatabaseProvider() DatabaseData {
deposits: deposit_memory_client.New(),
fulfillments: fulfillment_memory_client.New(),
intents: intent_memory_client.New(),
messages: messaging_memory_client.New(),
nonces: nonce_memory_client.New(),
rendezvous: rendezvous_memory_client.New(),
swaps: swap_memory_client.New(),
Expand Down Expand Up @@ -677,18 +664,6 @@ func (dp *DatabaseProvider) GetUsdCostBasis(ctx context.Context, owner string, m
return dp.intents.GetUsdCostBasis(ctx, owner, mint)
}

// Messaging
// --------------------------------------------------------------------------------
func (dp *DatabaseProvider) CreateMessage(ctx context.Context, record *messaging.Record) error {
return dp.messages.Insert(ctx, record)
}
func (dp *DatabaseProvider) GetMessages(ctx context.Context, account string) ([]*messaging.Record, error) {
return dp.messages.Get(ctx, account)
}
func (dp *DatabaseProvider) DeleteMessage(ctx context.Context, account string, messageID uuid.UUID) error {
return dp.messages.Delete(ctx, account, messageID)
}

// Nonces
// --------------------------------------------------------------------------------
func (dp *DatabaseProvider) GetNonce(ctx context.Context, address string) (*nonce.Record, error) {
Expand Down
128 changes: 128 additions & 0 deletions ocp/data/messaging/dynamodb/model.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package dynamodb

import (
"context"
"fmt"
"strconv"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/google/uuid"
"github.com/pkg/errors"

dynamodbutil "github.com/code-payments/ocp-server/database/dynamodb"

"github.com/code-payments/ocp-server/ocp/data/messaging"
)

type model struct {
Account string
MessageID string
Message []byte
CreatedAt time.Time
}

func toModel(record *messaging.Record) (*model, error) {
if err := record.Validate(); err != nil {
return nil, err
}

return &model{
Account: record.Account,
MessageID: record.MessageID.String(),
Message: record.Message,
// The only time we call toModel is on create, so it's fine to default
// to UTC now.
CreatedAt: time.Now().UTC(),
}, nil
}

func fromModel(m *model) (*messaging.Record, error) {
parsedMessageID, err := uuid.Parse(m.MessageID)
if err != nil {
return nil, errors.Wrap(err, "failure parsing message id")
}

return &messaging.Record{
Account: m.Account,
MessageID: parsedMessageID,
Message: m.Message,
}, nil
}

func (m *model) dbPut(ctx context.Context, client *dynamodb.Client, tableName string) error {
req := client.PutItemRequest(&dynamodb.PutItemInput{
TableName: aws.String(tableName),
Item: map[string]dynamodb.AttributeValue{
"account": {S: aws.String(m.Account)},
"message_id": {S: aws.String(m.MessageID)},
"message": {B: m.Message},
"created_at": {N: aws.String(fmt.Sprintf("%d", m.CreatedAt.Unix()))},
},
ConditionExpression: aws.String("attribute_not_exists(account) AND attribute_not_exists(message_id)"),
})

_, err := req.Send(ctx)
if err != nil {
return dynamodbutil.CheckConditionalCheckFailed(err, messaging.ErrDuplicateMessageID)
}

return nil
}

func dbGetAllForAccount(ctx context.Context, client *dynamodb.Client, tableName string, account string) ([]*model, error) {
req := client.QueryRequest(&dynamodb.QueryInput{
TableName: aws.String(tableName),
KeyConditionExpression: aws.String("account = :account"),
ExpressionAttributeValues: map[string]dynamodb.AttributeValue{
":account": {S: aws.String(account)},
},
})

resp, err := req.Send(ctx)
if err != nil {
return nil, errors.Wrap(err, "failed to query messages")
}

models := make([]*model, len(resp.Items))
for i, item := range resp.Items {
m := &model{}
if v, ok := item["account"]; ok && v.S != nil {
m.Account = *v.S
}
if v, ok := item["message_id"]; ok && v.S != nil {
m.MessageID = *v.S
}
if v, ok := item["message"]; ok {
m.Message = v.B
}
if v, ok := item["created_at"]; ok && v.N != nil {
seconds, err := strconv.ParseInt(*v.N, 10, 64)
if err != nil {
return nil, errors.Wrap(err, "failed to parse created_at")
}
m.CreatedAt = time.Unix(seconds, 0).UTC()
}
models[i] = m
}

return models, nil
}

func dbDelete(ctx context.Context, client *dynamodb.Client, tableName string, account, messageID string) error {
req := client.DeleteItemRequest(&dynamodb.DeleteItemInput{
TableName: aws.String(tableName),
Key: map[string]dynamodb.AttributeValue{
"account": {S: aws.String(account)},
"message_id": {S: aws.String(messageID)},
},
})

_, err := req.Send(ctx)
if err != nil {
return errors.Wrap(err, "failed to delete message")
}

return nil
}
Original file line number Diff line number Diff line change
@@ -1,25 +1,24 @@
package postgres
package dynamodb

import (
"context"
"database/sql"

"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/google/uuid"
"github.com/jmoiron/sqlx"

"github.com/code-payments/ocp-server/ocp/data/messaging"
)

// todo: This doesn't support TTL expiries, which is fine for now. We can
// manually delete old entries while in an invite-only testing phase.
type store struct {
db *sqlx.DB
client *dynamodb.Client
tableName string
}

// New returns a postgres backed messaging.Store.
func New(db *sql.DB) messaging.Store {
// New returns a DynamoDB backed messaging.Store.
func New(client *dynamodb.Client, tableName string) messaging.Store {
return &store{
db: sqlx.NewDb(db, "pgx"),
client: client,
tableName: tableName,
}
}

Expand All @@ -30,27 +29,25 @@ func (s *store) Insert(ctx context.Context, record *messaging.Record) error {
return err
}

return model.dbSave(ctx, s.db)
return model.dbPut(ctx, s.client, s.tableName)
}

// Delete implements messaging.Store.Delete.
func (s *store) Delete(ctx context.Context, account string, messageID uuid.UUID) error {
return dbDelete(ctx, s.db, account, messageID.String())
return dbDelete(ctx, s.client, s.tableName, account, messageID.String())
}

// Get implements messaging.Store.Get.
func (s *store) Get(ctx context.Context, account string) ([]*messaging.Record, error) {
models, err := dbGetAllForAccount(ctx, s.db, account)
models, err := dbGetAllForAccount(ctx, s.client, s.tableName, account)
if err != nil {
return nil, err
}

records := make([]*messaging.Record, len(models))
for i, model := range models {
record, err := fromModel(model)
for i, m := range models {
record, err := fromModel(m)
if err != nil {
// todo(safety): this is the equivalent QoS brick case, although should be less problematic.
// we could have a valve to ignore, and also to delete
return nil, err
}
records[i] = record
Expand Down
Loading
Loading