Skip to content

haraka/message-stream

Repository files navigation

CI Test Status Qlty Test Coverage

haraka-message-stream

A dual-mode message buffer for SMTP: accepts lines as they arrive from the wire, spools to disk when a size threshold is exceeded, and replays the message to one or more downstream consumers via a standard stream pipeline.

Used By

  • Haraka/transaction.js
  • haraka/test-fixtures/transaction

Construction

const MessageStream = require('haraka-message-stream')

const ms = new MessageStream(cfg, uuid, headers)
Parameter Type Description
cfg object Haraka config object. Reads cfg.main.spool_after (bytes) and cfg.main.spool_dir (path, default /tmp).
uuid string Required. Used as the spool filename base (<spool_dir>/<uuid>.eml).
headers string[] Optional. Pre-parsed header lines. When supplied they are emitted first and the raw header block in stored data is skipped.

Writing

Feed lines in as they arrive. Call add_line_end() when the DATA terminator (.) is received.

ms.add_line('From: sender@example.com\r\n')
ms.add_line('To: rcpt@example.com\r\n')
ms.add_line('\r\n') // end-of-headers blank line
ms.add_line('Hello, world.\r\n')

ms.add_line_end(() => {
  // writing is complete; safe to start reading
})

add_line_end(cb) is optional — the callback fires once all data has been flushed to memory or disk.

Reading

pipe(destination, options) — legacy event-driven style

Replays the stored message to any writable stream. Can be called sequentially (not concurrently).

const { createWriteStream } = require('fs')

ms.pipe(createWriteStream('/tmp/out.eml'))

Pipe options

Option Type Default Description
line_endings '\r\n' | '\n' '\r\n' Output line endings. Use '\n' to strip \r for local processing.
dot_stuffed boolean true When true, removes SMTP dot-stuffing (..line.line). Set to false when the stored data is already unstuffed.
ending_dot boolean false Append .\r\n (or .\n) after the last body line — required when forwarding over SMTP.
clamd_style boolean false Prefix each chunk with a 4-byte big-endian length; append a 4-byte zero terminator. Used with ClamAV's INSTREAM protocol.
skip_headers boolean false Omit the header block from output (emit body only).
buffer_size number 65536 Internal read chunk size in bytes.
// Forward message over SMTP — unstuff dots, add terminating dot
ms.pipe(socket, { dot_stuffed: true, ending_dot: true })

// Local virus scan — bare LF, no dot manipulation, clamd framing
ms.pipe(clamdSocket, {
  line_endings: '\n',
  dot_stuffed: false,
  clamd_style: true,
})

// Body-only, no \r, sequential second pass
ms.pipe(bodyParser, { skip_headers: true, line_endings: '\n' })

get_data([options,] callback) — collect into a Buffer

ms.get_data((buf) => {
  console.log(buf.toString())
})

// with pipe options
ms.get_data({ line_endings: '\n', dot_stuffed: true }, (buf) => {
  process(buf)
})

for await...of — modern async iterator

The stream implements Symbol.asyncIterator, so you can consume it with a for await loop. All pipe options are supported.

// Default: CRLF, dot_stuffed: true
for await (const chunk of ms) {
  process(chunk)
}

// With options
for await (const chunk of ms[Symbol.asyncIterator]({ line_endings: '\n' })) {
  process(chunk)
}

Spooling

When total_buffered exceeds cfg.main.spool_after, the stream transparently spools to <spool_dir>/<uuid>.eml. The consumer API is identical in both cases. The spool file is not removed automatically — call ms.destroy() when done.

const cfg = {
  main: {
    spool_after: 1024 * 1024, // spool messages larger than 1 MB
    spool_dir: '/var/spool/haraka',
  },
}

const ms = new MessageStream(cfg, transaction.uuid)
// ... write lines ...
ms.add_line_end(async () => {
  for await (const chunk of ms) {
    await forward(chunk)
  }
  ms.destroy() // close fd and remove spool file
})

Flow control

ms.pause() // suspend the active pipe's source readable
ms.resume() // resume it
ms.destroy() // close the spool file descriptor and delete the file

Pipeline internals (2.x)

The 2.x refactor separated concerns into two composable Transform stages:

stored data
    │
    ▼
HeaderSkipper          (strips raw header block when constructor headers are used)
    │
    ▼
LineTransformer        (dot-unstuffing · CRLF normalisation · ending dot · clamd framing)
    │
    ▼
destination

ChunkEmitter is still exported for write-side batching used by add_line.

const { ChunkEmitter } = require('haraka-message-stream')

Error handling

Errors from disk I/O (spool open, write drain, read) are emitted on the MessageStream instance.

ms.on('error', (err) => {
  console.error('message-stream error', err)
})

REFERENCES

This module is used by the Haraka Transaction.

About

Haraka Message Stream

Topics

Resources

License

Stars

Watchers

Forks

Contributors