Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,6 @@ project/plugins/project/
*.iml

.wercker/

# Build tools (should be installed separately)
sbt/
26 changes: 18 additions & 8 deletions modules/core/src/main/scala/fluflu/Client.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package fluflu

import java.nio.ByteBuffer
import java.time.Instant
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicBoolean
Expand All @@ -10,11 +11,11 @@ import fluflu.msgpack.Ack
import fluflu.msgpack.MOption
import fluflu.msgpack.Packer
import fluflu.msgpack.Unpacker
import org.msgpack.core.MessageBufferPacker
import org.msgpack.core.MessagePack
import org.msgpack.core.MessagePack.PackerConfig

import scala.concurrent.duration.*
import scala.util.control.NonFatal

trait Client {
final def emit[A: Packer](tag: String, record: A): Either[Exception, Unit] =
Expand Down Expand Up @@ -48,7 +49,7 @@ object Client {
Executors.newScheduledThreadPool(1, Utils.namedThreadFactory(name))

private val running = new AtomicBoolean()
private val queue = new ConcurrentLinkedQueue[(String, MessageBufferPacker => Unit)]
private val queue = new ConcurrentLinkedQueue[(String, ByteBuffer)]
private val consumer = new ForwardConsumer(maximumPulls, connection, queue, packerConfig)
private val worker = scheduler("fluflu-scheduler")

Expand Down Expand Up @@ -76,12 +77,21 @@ object Client {
if (closed || worker.isShutdown) Left(new Exception("Client executor was already shutdown"))
else {
logger.trace(s"Queueing message: ${(tag, record, time)}")
val fa = (p: MessageBufferPacker) => Packer[(A, Instant)].apply((record, time), p)
if (queue.offer(tag -> fa))
if (!running.get && running.compareAndSet(false, true)) Worker.start()
else Right(())
else
Left(new Exception("The queue no space is currently available"))
// Encode the data before enqueuing
try {
val packer = packerConfig.newBufferPacker()
try {
Packer[(A, Instant)].apply((record, time), packer)
val encodedData = packer.toMessageBuffer.sliceAsByteBuffer()
if (queue.offer(tag -> encodedData))
if (!running.get && running.compareAndSet(false, true)) Worker.start()
else Right(())
else
Left(new Exception("The queue no space is currently available"))
} finally packer.close()
} catch {
case NonFatal(e) => Left(new Exception(s"Failed to encode message: $e"))
}
}

def close(): Unit =
Expand Down
20 changes: 11 additions & 9 deletions modules/core/src/main/scala/fluflu/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ trait Consumer {
final class ForwardConsumer private[fluflu] (
maximumPulls: Int,
connection: Connection,
val msgQueue: util.Queue[(String, MessageBufferPacker => Unit)],
val msgQueue: util.Queue[(String, ByteBuffer)],
packerConfig: PackerConfig = MessagePack.DEFAULT_PACKER_CONFIG
)(implicit
PS: Packer[String],
Expand All @@ -49,27 +49,29 @@ final class ForwardConsumer private[fluflu] (

private val b64e = Base64.getEncoder

type E = (String, MessageBufferPacker => Unit)
type E = (String, ByteBuffer)

def retrieveElements(): Map[String, ListBuffer[MessageBufferPacker => Unit]] =
def retrieveElements(): Map[String, ListBuffer[ByteBuffer]] =
Iterator
.continually(msgQueue.poll())
.take(maximumPulls)
.takeWhile(_ != null)
.foldLeft(mutable.Map.empty[String, ListBuffer[MessageBufferPacker => Unit]]) { case (acc, (k, f)) =>
acc += k -> (acc.getOrElse(k, ListBuffer.empty) += f)
.foldLeft(mutable.Map.empty[String, ListBuffer[ByteBuffer]]) { case (acc, (k, buffer)) =>
acc += k -> (acc.getOrElse(k, ListBuffer.empty) += buffer)
}
.toMap

def makeMessage(s: String, fs: ListBuffer[MessageBufferPacker => Unit]): Option[(String, ByteBuffer)] = {
def makeMessage(s: String, buffers: ListBuffer[ByteBuffer]): Option[(String, ByteBuffer)] = {
val packer = mPacker.get()
try {
val chunk = b64e.encodeToString(UUID.randomUUID().toString.getBytes(UTF_8))
logger.trace(s"tag: $s, chunk: $chunk")
packer.packArrayHeader(3)
PS.apply(s, packer)
packer.packArrayHeader(fs.size)
fs.foreach(_(packer))
packer.packArrayHeader(buffers.size)
buffers.foreach { buffer =>
packer.writePayload(buffer.array(), buffer.position(), buffer.remaining())
}
PM.apply(MOption(chunk = Some(chunk)), packer)
Some(chunk -> packer.toMessageBuffer.sliceAsByteBuffer())
} catch {
Expand All @@ -79,7 +81,7 @@ final class ForwardConsumer private[fluflu] (
} finally packer.clear()
}

def makeMessages(m: Map[String, ListBuffer[MessageBufferPacker => Unit]]): Iterator[(String, ByteBuffer)] =
def makeMessages(m: Map[String, ListBuffer[ByteBuffer]]): Iterator[(String, ByteBuffer)] =
m.iterator.map { case (a, b) => makeMessage(a, b) }.collect { case Some(v) => v }

private def send(chunk: String, msg: ByteBuffer): Unit =
Expand Down
104 changes: 104 additions & 0 deletions modules/core/src/test/scala/fluflu/EncodingTimingSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package fluflu

import java.nio.ByteBuffer
import java.time.Instant
import java.util.concurrent.ConcurrentLinkedQueue

import fluflu.msgpack.*
import org.msgpack.core.MessagePacker
import org.scalatest.funspec.AnyFunSpec

import scala.util.Success

class EncodingTimingSpec extends AnyFunSpec {

// Test case to demonstrate that encoding happens at emit time
describe("Encoding timing") {
it("should encode data at emit time, not at consume time") {
// Mock connection that always succeeds
implicit val connection: Connection = new Connection {
override def writeAndRead(message: ByteBuffer): scala.util.Try[ByteBuffer] =
Success(ByteBuffer.allocate(1))
override def isClosed: Boolean = false
override def close(): scala.util.Try[Unit] = Success(())
}

// Valid packer instances for string and MOption
implicit val packString: Packer[String] = new Packer[String] {
def apply(a: String, packer: MessagePacker): Unit = packer.packString(a)
}

implicit val packInstant: Packer[Instant] = new Packer[Instant] {
def apply(a: Instant, packer: MessagePacker): Unit = packer.packLong(a.toEpochMilli)
}

implicit val packMOption: Packer[MOption] = new Packer[MOption] {
def apply(a: MOption, packer: MessagePacker): Unit = () // Simple mock
}

implicit val unpackAck: Unpacker[Option[Ack]] = new Unpacker[Option[Ack]] {
def apply(bytes: ByteBuffer): Either[Throwable, Option[Ack]] = Right(Some(Ack("test")))
}

val client = Client()

// This should succeed because the string is encodable
val result1 = client.emit("test.tag", "valid string")
assert(result1.isRight)
assert(client.size > 0, "Queue should contain encoded message after emit")

// Test that encoding errors are caught at emit time
// Create a packer that always fails
implicit val failingPacker: Packer[Int] = new Packer[Int] {
def apply(a: Int, packer: MessagePacker): Unit =
throw new RuntimeException("Encoding failed")
}

val result2 = client.emit("test.tag", 42)
assert(result2.isLeft, "Should fail at emit time due to encoding error")
assert(result2.left.toOption.get.getMessage.contains("Failed to encode message"))

client.close()
}

it("should store encoded ByteBuffers in queue, not functions") {
val queue = new ConcurrentLinkedQueue[(String, ByteBuffer)]()

// Mock connection
implicit val connection: Connection = new Connection {
override def writeAndRead(message: ByteBuffer): scala.util.Try[ByteBuffer] =
Success(ByteBuffer.allocate(1))
override def isClosed: Boolean = false
override def close(): scala.util.Try[Unit] = Success(())
}

// Required implicit instances for ForwardConsumer
implicit val packString: Packer[String] = new Packer[String] {
def apply(a: String, packer: MessagePacker): Unit = packer.packString(a)
}

implicit val packMOption: Packer[MOption] = new Packer[MOption] {
def apply(a: MOption, packer: MessagePacker): Unit = () // Simple mock
}

implicit val unpackAck: Unpacker[Option[Ack]] = new Unpacker[Option[Ack]] {
def apply(bytes: ByteBuffer): Either[Throwable, Option[Ack]] = Right(Some(Ack("test")))
}

// Create a ForwardConsumer with our test queue
val consumer = new ForwardConsumer(10, connection, queue)

// Add some encoded data to the queue directly
val testData = ByteBuffer.wrap("test data".getBytes())
queue.offer("test.tag" -> testData)

// Verify that the queue contains ByteBuffers, not functions
val retrieved = consumer.retrieveElements()
assert(retrieved.contains("test.tag"))
assert(retrieved("test.tag").size === 1)

val retrievedBuffer = retrieved("test.tag").head
assert(retrievedBuffer.isInstanceOf[ByteBuffer])
}
}
}
62 changes: 41 additions & 21 deletions modules/core/src/test/scala/fluflu/ForwardConsumerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService

import fluflu.msgpack.*
import org.msgpack.core.MessageBufferPacker
import org.msgpack.core.MessagePack
import org.msgpack.core.MessagePacker
import org.scalatest.BeforeAndAfterEach
Expand All @@ -16,7 +15,7 @@ import scala.util.Success
import scala.util.Try

class ForwardConsumerSpec extends AnyFunSpec with BeforeAndAfterEach with MsgpackHelper {
type Elem = (String, MessageBufferPacker => Unit)
type Elem = (String, ByteBuffer)

var scheduler: ScheduledExecutorService = _
var connection: Connection = _
Expand All @@ -43,15 +42,27 @@ class ForwardConsumerSpec extends AnyFunSpec with BeforeAndAfterEach with Msgpac

describe("consume") {
it("should consume messages") {
val queue = new ArrayBlockingQueue[Elem](6)
(1 to 6).foreach(_ => queue.offer(("tag", (m: MessageBufferPacker) => m.packNil())))
val queue = new ArrayBlockingQueue[Elem](6)
val packer = MessagePack.DEFAULT_PACKER_CONFIG.newBufferPacker()
(1 to 6).foreach { _ =>
packer.packNil()
val buffer = packer.toMessageBuffer.sliceAsByteBuffer()
queue.offer(("tag", buffer))
packer.clear()
}
val consumer = new ForwardConsumer(10, connection, queue)
consumer.consume()
assert(queue.size() === 0)
}
it("should consume max-pulls messages") {
val queue = new ArrayBlockingQueue[Elem](6)
(1 to 6).foreach(_ => queue.offer(("tag", (m: MessageBufferPacker) => m.packNil())))
val queue = new ArrayBlockingQueue[Elem](6)
val packer = MessagePack.DEFAULT_PACKER_CONFIG.newBufferPacker()
(1 to 6).foreach { _ =>
packer.packNil()
val buffer = packer.toMessageBuffer.sliceAsByteBuffer()
queue.offer(("tag", buffer))
packer.clear()
}
val consumer = new ForwardConsumer(5, connection, queue)
consumer.consume()
assert(queue.size() === 1)
Expand All @@ -60,34 +71,43 @@ class ForwardConsumerSpec extends AnyFunSpec with BeforeAndAfterEach with Msgpac

describe("retrieveElements") {
it("should create Map with tag as key") {
val queue = new ArrayBlockingQueue[Elem](3)
queue.offer(("a", (m: MessageBufferPacker) => m.writePayload(Array(1.toByte))))
queue.offer(("b", (m: MessageBufferPacker) => m.writePayload(Array(2.toByte))))
queue.offer(("b", (m: MessageBufferPacker) => m.writePayload(Array(3.toByte))))
val queue = new ArrayBlockingQueue[Elem](3)
val packer = MessagePack.DEFAULT_PACKER_CONFIG.newBufferPacker()

// Create buffer for tag "a"
packer.writePayload(Array(1.toByte))
val bufferA = packer.toMessageBuffer.sliceAsByteBuffer()
queue.offer(("a", bufferA))
packer.clear()

// Create buffer for tag "b" - first element
packer.writePayload(Array(2.toByte))
val bufferB1 = packer.toMessageBuffer.sliceAsByteBuffer()
queue.offer(("b", bufferB1))
packer.clear()

// Create buffer for tag "b" - second element
packer.writePayload(Array(3.toByte))
val bufferB2 = packer.toMessageBuffer.sliceAsByteBuffer()
queue.offer(("b", bufferB2))
packer.clear()

val consumer = new ForwardConsumer(5, connection, queue)
val m = consumer.retrieveElements()

val p = MessagePack.DEFAULT_PACKER_CONFIG.newBufferPacker()

{
val a = m("a")
assert(a.size === 1)
val List(aa) = a.toList
aa(p)
assert(p.toByteArray === Array(1.toByte))
assert(aa.array() === Array(1.toByte))
}

p.clear()

{
val b = m("b")
assert(b.size === 2)
val List(bb, bbb) = b.toList
bb(p)
assert(p.toByteArray === Array(2.toByte))
p.clear()
bbb(p)
assert(p.toByteArray === Array(3.toByte))
assert(bb.array() === Array(2.toByte))
assert(bbb.array() === Array(3.toByte))
}
}
}
Expand Down