Skip to content

Latest commit

 

History

History
868 lines (654 loc) · 25 KB

File metadata and controls

868 lines (654 loc) · 25 KB

EnSync Node.js SDK

The official Node.js SDK for EnSync - a high-performance message-driven integration engine.

🔗 Resources


Installation

npm install ensync-client-sdk

Usage

Importing

Default (gRPC)

// Import the default engine class (gRPC)
const { EnSyncEngine } = require("ensync-client-sdk");

// Production - uses secure TLS on port 443 by default
const engine = new EnSyncEngine("grpcs://node.gms.ensync.cloud");
// Alternative Websocket: const engine = new EnSyncEngine("wss://node.gms.ensync.cloud");

// Development - uses insecure connection on port 50051 by default
// const engine = new EnSyncEngine("localhost");

// Create authenticated client
const client = await engine.createClient("your-app-key");

Both clients provide the same API for publishing and subscribing to messages.

gRPC Connection Options:

  • Production URLs automatically use secure TLS (port 443)
  • localhost automatically uses insecure connection (port 50051)
  • Explicit protocols: grpcs:// (secure) or grpc:// (insecure)
  • Custom ports: node.gms.ensync.cloud9090

API Reference

EnSyncEngine (gRPC - Default)

The main class that manages gRPC connections and client creation for the EnSync system. This is the default and recommended client for production use.

EnSyncWebSocketEngine (WebSocket - Alternative)

An alternative class that manages WebSocket connections and client creation for the EnSync system.

const engine = new EnSyncEngine(url, options);

Parameters

Parameter Type Required Description
url string Yes The URL of the EnSync server
options object No Configuration options

Options Object:

Option Type Default Description
heartbeatInterval number 30000 Heartbeat interval in ms (gRPC)
pingInterval number 30000 Ping interval in ms (WebSocket)
reconnectInterval number 5000 Reconnection interval in ms
maxReconnectAttempts number 5 Maximum reconnection attempts
enableLogging boolean false Enable/disable SDK console logs

Creating a Client

  • Initialize the engine with your server URL
  • Create a client with your app key
// Initialize the engine (gRPC with TLS)
const engine = new EnSyncEngine("grpcs://node.gms.ensync.cloud");

// Create a client
const client = await engine.createClient("your-app-key");

// With options (e.g., enable logging for debugging)
const engineVerbose = new EnSyncEngine("grpcs://node.gms.ensync.cloud", {
  enableLogging: true, // Enable SDK console logs for debugging
});
const clientVerbose = await engineVerbose.createClient("your-app-key");

Client Creation Parameters

Parameter Type Required Description
appKey string Yes Your EnSync application key
options object No Client configuration options

Options Object:

Option Type Default Description
appSecretKey string null Default key used to decrypt incoming messages

Client Returns

Returns a new EnSyncClient instance


Publishing Messages

// Basic publish
await client.publish(
  "company/service/action", // Message name
  ["recipientPublicKey"], // Recipients (base64 public keys)
  { data: "your payload" } // Message payload (must be valid JSON)
);

// With optional parameters
await client.publish(
  "company/service/action",
  ["recipientPublicKey"], // Base64 encoded public key
  { data: "your payload" },
  { persist: true, headers: { source: "order-system" } }
);

Publish Parameters

Parameter Type Required Description
messageName string Yes Name of the message (e.g., "company/service/action")
recipients string[] Yes Array of recipient public keys (base64 encoded)
payload object Yes Your message data (must be valid JSON)
metadata object No Control message persistence and add custom headers

Metadata Object:

Option Type Default Description
persist boolean false Whether to persist the message on the server
headers object {} Custom headers to include with the message

Subscribing to Messages

// Basic subscription
const subscription = await client.subscribe("company/service/action");

// Set up message handler
subscription.on(async (message) => {
  console.log("Received message:", message.payload);
  // Process the message
});

// With options
const subscription = await client.subscribe("company/service/action", {
  autoAck: false, // Manual acknowledgment
  appSecretKey: process.env.CUSTOM_DECRYPT_KEY, // Custom decryption key
});

Subscribe Parameters

Parameter Type Required Description
messageName string Yes Name of the message to subscribe to
options object No Subscription options

Options Object:

Option Type Default Description
autoAck boolean true Set to false for manual acknowledgment
appSecretKey string null Custom decryption key for this subscription (optional)

Subscription Methods

The subscription object provides several methods for message handling and flow control:

on(handler) - Add Message Handler
subscription.on(async (message) => {
  console.log("Received:", message.payload);
  // Process message...
});
ack(messageIdem, block) - Acknowledge Message

Manually acknowledge a message after successful processing (when autoAck: false).

await subscription.ack(message.idem, message.block);

Parameters:

  • messageIdem (string): Unique message identifier
  • block (string): Block identifier from the message

Returns: Promise - Acknowledgment confirmation


defer(messageIdem, delayMs, reason) - Defer Message

Defer a message for later redelivery. Useful for temporary failures or rate limiting.

// Defer for 5 seconds
await subscription.defer(message.idem, 5000, "Database temporarily unavailable");

// Defer for 1 minute
await subscription.defer(message.idem, 60000, "Rate limit exceeded");

Parameters:

  • messageIdem (string): Unique message identifier
  • delayMs (number): Delay in milliseconds before redelivery
  • reason (string): Optional reason for deferral

Returns: Promise - Defer response with delivery time

Use Cases:

  • Temporary service unavailability
  • Rate limiting
  • Retry logic with backoff
  • Resource contention

discard(messageIdem, reason) - Discard Message

Permanently discard a message that cannot be processed. The message will not be redelivered.

await subscription.discard(message.idem, "Invalid data format");
await subscription.discard(message.idem, "Duplicate message");

Parameters:

  • messageIdem (string): Unique message identifier
  • reason (string): Reason for discarding the message

Returns: Promise - Discard confirmation

Use Cases:

  • Invalid or malformed data
  • Duplicate messages
  • Messages that violate business rules
  • Poison messages that cause repeated failures

replay(messageIdem) - Replay Message

Request a specific message to be replayed/redelivered by its identifier.

const messageData = await subscription.replay("message-idem-123");
console.log("Replayed message:", messageData.payload);

Parameters:

  • messageIdem (string): Unique message identifier to replay

Returns: Promise - The replayed message data

Use Cases:

  • Retrieve historical messages
  • Reprocess specific messages
  • Audit and debugging
  • Data recovery

pause(reason) - Pause Message Delivery

Temporarily pause message delivery for this subscription. No new messages will be delivered until resumed.

await subscription.pause("Performing maintenance");
await subscription.pause("System overload");

Parameters:

  • reason (string): Optional reason for pausing

Returns: Promise - Pause confirmation

Use Cases:

  • Scheduled maintenance
  • System overload/backpressure
  • Batch processing
  • Graceful degradation

resume() - Resume Message Delivery

Resume message delivery after a pause.

await subscription.resume();

Returns: Promise - Resume confirmation


unsubscribe() - Stop Receiving Messages

Unsubscribe from the message stream and clean up resources.

await subscription.unsubscribe();

Returns: Promise


Complete Flow Control Example

const subscription = await client.subscribe("orders/process", {
  autoAck: false
});

let processingCount = 0;
const MAX_CONCURRENT = 10;

subscription.on(async (message) => {
  // Check if we're at capacity
  if (processingCount >= MAX_CONCURRENT) {
    await subscription.pause("Max concurrent processing reached");
    return;
  }

  processingCount++;

  try {
    // Process the order
    await processOrder(message.payload);
    
    // Acknowledge successful processing
    await subscription.ack(message.idem, message.block);
  } catch (error) {
    if (error.code === "TEMPORARY_ERROR") {
      // Defer for retry
      await subscription.defer(message.idem, 30000, error.message);
    } else if (error.code === "INVALID_DATA") {
      // Discard permanently
      await subscription.discard(message.idem, error.message);
    } else {
      // Unknown error, defer with longer delay
      await subscription.defer(message.idem, 120000, "Unknown error");
    }
  } finally {
    processingCount--;
    
    // Resume if we have capacity
    if (processingCount < MAX_CONCURRENT) {
      await subscription.resume();
    }
  }
});

Message Structure

When you receive a message through a subscription handler, it contains:

{
  messageName: "company/service/action",  // Message name
  idem: "abc123",                         // Unique message ID (use with ack/discard/replay)
  block: "456",                           // Block ID (use with ack)
  payload: { /* your data */ },           // Your decrypted JSON data
  timestamp: 1634567890123,               // Message timestamp (milliseconds)
  metadata: {                             // Optional metadata
    headers: { /* custom headers */ }
  },
  sender: "base64-encoded-public-key"    // Sender's public key
}

Closing Connections

// Close just this client
await client.close();

// Close client and engine (if you have no other clients)
await client.close(true);

Error Handling

The SDK throws EnSyncError for various error conditions. Always wrap your code in try-catch blocks to handle potential errors gracefully.

try {
  // Your EnSync code
} catch (e) {
  if (e instanceof EnSyncError) {
    console.error("EnSync Error:", e.message);
    // Handle specific error types
    if (e.name === "EnSyncConnectionError") {
      // Handle connection errors
    } else if (e.name === "EnSyncPublishError") {
      // Handle publishing errors
    } else if (e.name === "EnSyncSubscriptionError") {
      // Handle subscription errors
    }
  } else {
    console.error("Unexpected error:", e);
  }
}

Common error types:

Error Type Description
EnSyncConnectionError Connection or authentication issues
EnSyncPublishError Problems publishing events
EnSyncSubscriptionError Subscription-related errors
EnSyncGenericError Other errors

Complete Examples

Quick Start (gRPC - Default)

require("dotenv").config();
const { EnSyncEngine } = require("ensync-client-sdk");

async function quickStart() {
  try {
    // 1. Initialize engine and create client (gRPC by default)
    // Use grpc:// for insecure or grpcs:// for secure TLS connection
    const engine = new EnSyncEngine("grpc://localhost:50051");
    const client = await engine.createClient(process.env.ENSYNC_APP_KEY, {
      appSecretKey: process.env.ENSYNC_SECRET_KEY,
    });

    // 2. Publish an event
    await client.publish(
      "orders/status/updated",
      ["appId"], // The appId of the receiving party
      { orderId: "order-123", status: "completed" }
    );

    // 3. Subscribe to events
    const subscription = await client.subscribe("orders/status/updated");

    // 4. Handle incoming events
    subscription.on((event) => {
      console.log(`Received order update: ${event.payload.orderId} is ${event.payload.status}`);
      // Process event...
    });

    // 5. Clean up when done
    process.on("SIGINT", async () => {
      await subscription.unsubscribe();
      await client.close();
      process.exit(0);
    });
  } catch (error) {
    console.error("Error:", error.message);
  }
}

quickStart();

Note: This example uses environment variables for security. Create a .env file with:

ENSYNC_APP_KEY=your_app_key_here
ENSYNC_SECRET_KEY=your_secret_key_here

Quick Start (WebSocket)

require("dotenv").config();
const { EnSyncEngine } = require("ensync-client-sdk/websocket");

async function quickStart() {
  try {
    // 1. Initialize engine and create client (WebSocket)
    const engine = new EnSyncEngine("wss://node.gms.ensync.cloud");
    const client = await engine.createClient(process.env.ENSYNC_APP_KEY, {
      appSecretKey: process.env.ENSYNC_SECRET_KEY,
    });

    // 2. Publish an event
    await client.publish(
      "orders/status/updated",
      ["appId"], // The appId of the receiving party
      { orderId: "order-123", status: "completed" }
    );

    // 3. Subscribe to events
    const subscription = await client.subscribe("orders/status/updated");

    // 4. Handle incoming events
    subscription.on((event) => {
      console.log(`Received order update: ${event.payload.orderId} is ${event.payload.status}`);
      // Process event...
    });

    // 5. Clean up when done
    process.on("SIGINT", async () => {
      await subscription.unsubscribe();
      await client.close();
      process.exit(0);
    });
  } catch (error) {
    console.error("Error:", error.message);
  }
}

quickStart();

Publishing Example

// Create client
const engine = new EnSyncEngine("wss://node.gms.ensync.cloud");
const client = await engine.createClient(process.env.ENSYNC_APP_KEY);

// Basic publish
const result = await client.publish(
  "notifications/email/sent",
  ["appId"], // The appId of the receiving party
  { to: "user@example.com", subject: "Welcome!" }
);

// With metadata
const resultWithMetadata = await client.publish(
  "notifications/email/sent",
  ["appId"], // The appId of the receiving party
  { to: "user@example.com", subject: "Welcome!" },
  { headers: { source: "notification-service" } }
);

Subscribing Example

// Create client with decryption key
const client = await engine.createClient(process.env.ENSYNC_APP_KEY, {
  appSecretKey: process.env.ENSYNC_SECRET_KEY,
});

// Subscribe with manual acknowledgment
const subscription = await client.subscribe("payments/completed", { autoAck: false });

// Handle events
subscription.on(async (event) => {
  try {
    // Process the payment
    await updateOrderStatus(event.payload.orderId, "paid");

    // Get historical data if needed
    if (needsHistory(event.payload.orderId)) {
      const history = await subscription.replay(event.payload.previousEventId);
      console.log("Previous payment:", history);
    }

    // Acknowledge successful processing
    await subscription.ack(event.idem, event.block);
  } catch (error) {
    // Defer processing if temporary error
    if (isTemporaryError(error)) {
      await subscription.defer(event.idem, 60000, "Temporary processing error");
    } else {
      // Discard if permanent error
      await subscription.discard(event.idem, "Invalid payment data");
    }
  }
});

Best Practices

Connection Management

  • Store connection credentials securely using environment variables
  • Implement proper reconnection logic for production environments
  • Always close connections when they're no longer needed
// Using environment variables for sensitive keys
require("dotenv").config();

const engine = new EnSyncEngine(process.env.ENSYNC_URL);
const client = await engine.createClient(process.env.ENSYNC_APP_KEY);

// Implement proper error handling and reconnection
engine.on("disconnect", () => {
  console.log("Connection lost, will reconnect automatically");
});

// Close connections when done
process.on("SIGINT", async () => {
  await client.destroy(true);
  process.exit(0);
});

Event Design

  • Use hierarchical event names (e.g., domain/entity/action)
  • Keep payloads concise and well-structured
  • Consider versioning your event schemas
// Good event naming pattern
await client.publish("inventory/product/created", ["warehouse-service"], {
  productId: "prod-123",
  name: "Ergonomic Chair",
  sku: "ERG-CH-BLK",
  price: 299.99,
  createdAt: Date.now(),
});

Security Best Practices

  • Never hardcode app keys or secret keys
  • Use environment variables or secure key management solutions
  • Implement proper authentication and authorization
  • Consider encrypting sensitive payloads

Performance Optimization

  • Batch events when possible instead of sending many small messages
  • Consider message size and frequency in high-volume scenarios
  • Use appropriate TTL values for your use case
  • Implement proper error handling and retry logic

Subscription Control

The SDK provides methods to pause, continue, and replay events, which is useful for managing event processing flow.

What Pause and Continue Do

When you create a client using engine.createClient(), that client receives a unique clientId. This clientId (not the appKey) identifies your specific client instance on the EnSync server.

  • Pause: Temporarily stops the client from receiving new events from the server. The subscription remains active on the server, but events are not delivered to this specific client instance. Other clients with the same appKey but different clientId will continue receiving events normally.

  • Continue: Resumes event delivery to the paused client. Any events that occurred during the pause (depending on server settings and TTL) may be delivered once the subscription is continued.

Replaying Events

The replay command allows you to request a specific event to be sent again, even if it has already been processed. Unlike regular event handling which delivers events through the .on handler, the replay function returns the event data directly to your code. This is useful for:

  • Retrieving specific events for analysis or debugging
  • Accessing historical event data without setting up a handler
  • Examining event content without processing it
  • Getting event data synchronously in your code flow
// Request a specific event to be replayed - returns data directly
const eventData = await subscription.replay("event-idem-123");
console.log("Event data:", eventData);

// You can immediately work with the event data
processEventData(eventData);

The replay command returns the complete event object with its payload:

{
  eventName: "gms/ensync/third_party/payments/complete",
  idem: "event-idem-123",
  block: "81404",
  metadata: {
    headers: {},
    $internal: {
      replay_info: {
        isReplayed: true,
        replayTimestamp: 1758410511179,
        wasAcknowledged: false
      }
    }
  },
  payload: { /* payload data */ },
  loggedAt: 1757778462158,
  recipient: "RECIPIENT_PUBLIC_KEY_BASE64",
  isGroup: false
}

Direct Access vs Handler Processing:

Regular event subscription:

// Events come through the handler asynchronously
subscription.on((event) => {
  // Process event here
  console.log("Received event:", event);
});

Replay function:

// Get event data directly and synchronously
const event = await subscription.replay("event-idem-123");
console.log("Retrieved event:", event);

Deferring Events

The defer method allows you to postpone processing of an event for a specified period. This is useful when:

  • You need more time to prepare resources for processing
  • You want to implement a retry mechanism with increasing delays
  • You need to wait for another system to be ready
  • You want to implement rate limiting for event processing
// Defer an event for 5 seconds (5000ms)
const deferResult = await subscription.defer(
  "event-idem-123", // Event ID
  5000, // Delay in milliseconds
  "Waiting for resources to be available" // Optional reason
);
console.log("Defer result:", deferResult);

// Defer with minimum delay (immediate redelivery)
const immediateRedelivery = await subscription.defer("event-idem-123", 0);

The defer method returns an object with status information:

{
  status: "success",
  action: "deferred",
  eventIdem: "event-idem-123",
  delayMs: 5000,
  scheduledDelivery: 1757778467158, // timestamp when event will be redelivered
  timestamp: 1757778462158
}

Discarding Events

The discard method allows you to permanently reject an event without processing it. This is useful when:

  • The event contains invalid or corrupted data
  • The event is no longer relevant or has expired
  • The event was sent to the wrong recipient
  • You want to implement a filtering mechanism
// Discard an event permanently
const discardResult = await subscription.discard(
  "event-idem-123", // Event ID
  "Invalid data format" // Optional reason
);
console.log("Discard result:", discardResult);

The discard method returns an object with status information:

{
  status: "success",
  action: "discarded",
  eventIdem: "event-idem-123",
  timestamp: 1757778462158
}
// Create a subscription
const subscription = await client.subscribe("inventory/updates");

// Set up event handler
subscription.on(async (event) => {
  console.log(`Processing event: ${event.id}`);
  await processEvent(event);
});

// Pause the subscription when needed
// This will temporarily stop receiving events
await subscription.pause();
console.log("Subscription paused - no events will be received");

// Perform some operations while subscription is paused
await performMaintenance();

// Continue the subscription to resume receiving events
await subscription.continue();
console.log("Subscription continued - now receiving events again");

// Example: Implementing controlled processing with pause/continue
async function processInBatches(events) {
  // Pause subscription while processing a batch
  await subscription.pause();

  try {
    // Process events without receiving new ones
    for (const event of events) {
      await processEvent(event);
    }
  } catch (error) {
    console.error("Error processing batch:", error);
  } finally {
    // Always continue subscription when done
    await subscription.continue();
  }
}

Use cases for pause/continue:

  • Temporary maintenance or system updates
  • Rate limiting or throttling event processing
  • Implementing backpressure mechanisms
  • Batch processing of events

Implementation Details

  • Pause/continue operations are performed at the subscription level, not the client level
  • The server maintains the subscription state even when paused
  • Pausing affects only the specific subscription instance, not all subscriptions for the client
  • Events that arrive during a pause may be delivered when continued (depending on TTL settings)
  • The pause state is not persisted across client restarts or reconnections