Package

com.spingo.op_rabbit

stream

package stream

Batteries not included

To use this package, you must add 'op-rabbit-akka-stream' to your dependencies.

Overview

See also

Akka Stream Specs on GitHub

Type Members

  1. class MessageNacked extends Exception

    Used by MessagePublisherSink to fail elements when the RabbitMQ broker negatively acknowledges ("nacks") a published message.

Value Members

  1. object MessagePublisherSink

    A MessagePublisherSink (an AckedSink) publishes each input Message, and either acks or fails the upstream element, depending on ConfirmResponse.

    Using a RabbitSource with a MessagePublisherSink is a great way to get persistent, recoverable streams.

    Note: MessagePublisherSink uses ActorPublisher and, due to Akka Streams limitations, it DOES NOT abide by your configured supervisor strategy.

    Message.ConfirmResponse handling

    After the sink publishes the Message, it listens for the Message.ConfirmResponse, and handles it accordingly:

    • On Message.Ack, ack the upstream element.
    • On Message.Nack, fail the upstream element with MessageNacked. Does not throw a stream exception. Processing continues.
    • On Message.Fail, fail the upstream element with the publisher's exception. Does not throw a stream exception. Processing continues.
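
    The three cases above can be modelled with plain Scala promises. The sketch below is only an illustration: ConfirmResponse, Ack, Nack, Fail and MessageNacked are hypothetical stand-ins defined locally, not the op-rabbit types, and settle is an assumed helper name.

```scala
import scala.concurrent.Promise

object ConfirmHandlingSketch {
  // Hypothetical stand-ins for Message.Ack / Message.Nack / Message.Fail.
  // These are NOT the op-rabbit types, only a model of the three cases.
  sealed trait ConfirmResponse
  case object Ack extends ConfirmResponse
  case object Nack extends ConfirmResponse
  final case class Fail(exception: Throwable) extends ConfirmResponse

  // Hypothetical stand-in for the MessageNacked exception described on this page.
  final class MessageNacked(id: Long)
      extends Exception(s"Published message $id was nacked")

  // Settle the upstream element's promise according to the broker's response.
  // Nothing is thrown here, so a surrounding stream would keep processing.
  def settle(id: Long, upstream: Promise[Unit], response: ConfirmResponse): Unit =
    response match {
      case Ack      => upstream.success(())
      case Nack     => upstream.failure(new MessageNacked(id))
      case Fail(ex) => upstream.failure(ex)
    }
}
```

    In this model a failure is recorded on the element's own promise rather than raised, which mirrors the "does not throw a stream exception" behaviour described above.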

    Future[Unit] materialized type:

    This sink's materialized type is Future[Unit]. The following applies:

    • It yields any upstream failure as soon as it reaches the sink (potentially before messages are confirmed).
    • After the stream completes and all Message.ConfirmResponse values have been processed, the Future[Unit] is completed.
  2. object RabbitSource

    Creates an op-rabbit consumer whose messages are delivered through an AckedSource. Message order guarantees are maintained.

    IMPORTANT NOTE ON ACKNOWLEDGED MESSAGES

    (If you see unacknowledged messages accumulating, followed by stream progress halting, pay special attention to this.)

    The 'Acked' variety of streams provides type-safe guarantees that acknowledgments aren't dropped on the floor. Filtering a message from the stream (via collect, filter, or a mapConcat that returns Nil), for example, will cause the incoming RabbitMQ message to be acknowledged.
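
    To illustrate why filtering must acknowledge, here is a minimal sketch that uses only the Scala standard library and represents each element as a (Promise[Unit], T) pair. ackedFilter is a hypothetical helper written for this example; it is not the library's implementation.

```scala
import scala.concurrent.Promise

object AckedFilterSketch {
  // Keep elements that satisfy `keep`; acknowledge the ones that are dropped
  // so that their promises are never left unresolved.
  def ackedFilter[T](elements: List[(Promise[Unit], T)])(
      keep: T => Boolean): List[(Promise[Unit], T)] =
    elements.filter { case (promise, value) =>
      val retained = keep(value)
      if (!retained) promise.success(()) // dropped element: ack immediately
      retained
    }
}
```

    Elements that pass the predicate keep their unresolved promise, so a later stage remains responsible for acknowledging them.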

    If you begin constructing your own Acked components and interacting with the AckTup[T] type directly (that is, (Promise[Unit], T)), you must take care that the promise is not dropped on the floor. This means that every time an exception could be thrown, you must catch it and propagate it to the Promise by calling promise.failure(ex).
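
    A minimal sketch of that rule, assuming the element is a plain (Promise[Unit], T) tuple. safeMap is a hypothetical helper name for this example and not part of the library.

```scala
import scala.concurrent.Promise
import scala.util.control.NonFatal

object AckTupSketch {
  // Apply `f` to the payload while keeping the promise attached.
  // If `f` throws, fail the promise so the message is not left unacknowledged,
  // and return None so the caller can skip the element.
  def safeMap[T, U](tup: (Promise[Unit], T))(f: T => U): Option[(Promise[Unit], U)] = {
    val (promise, value) = tup
    try Some((promise, f(value)))
    catch {
      case NonFatal(ex) =>
        promise.failure(ex)
        None
    }
  }
}
```

    On success the promise is passed along untouched, so whichever stage finally consumes the element still has to complete it.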

    If you integrate with a stream component that does not support acknowledged streams, you will probably want to acknowledge the message before sending it on (by using the .acked acked-stream operation). If it is important that you acknowledge messages after the flow is complete, and the library doesn't provide a reliable way to propagate element exceptions, you will likely want exceptions to crash the stream (i.e., don't resume the stream on exception). Otherwise, if you use a resuming decider, you risk elements being dropped and unacknowledged messages accumulating.

    Example:

    import com.spingo.op_rabbit.Directives._
    import com.spingo.op_rabbit.stream.RabbitSource
    RabbitSource(
      rabbitControl,
      channel(qos = 3),
      consume(queue("such-queue", durable = true, exclusive = false, autoDelete = false)),
      body(as[Person])).
      runForeach { person =>
        greet(person)
      } // after each successful iteration the message is acknowledged.
  3. object UnconfirmedPublisherSink
