
SQS using FS2.


  • Lightweight wrapper over the raw Java AWS SDK. It doesn't wrap the SDK types, so you keep the full power of the AWS SDK and aren't forced to work around a leaky abstraction.
  • Unopinionated primitive building blocks.
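Because the SDK types are passed through unwrapped, requests can be configured with the SDK's own fluent setters before they are handed to the library. The following is a minimal sketch, assuming the AWS SDK for Java 1.x model classes in `com.amazonaws.services.sqs.model`; the object name, the queue URL and the chosen values are placeholders for illustration only.

```scala
import com.amazonaws.services.sqs.model.{ReceiveMessageRequest, SendMessageRequest}

object SdkRequestsSketch {
  // Placeholder queue URL, for illustration only
  val queueUrl: String = "https://sqs.us-east-1.amazonaws.com/123456789012/example-queue"

  // Long-poll for up to 20 seconds and fetch up to 10 messages per call
  val receiveRequest: ReceiveMessageRequest =
    new ReceiveMessageRequest(queueUrl)
      .withWaitTimeSeconds(20)
      .withMaxNumberOfMessages(10)

  // Delay delivery of the message by 5 seconds
  val sendRequest: SendMessageRequest =
    new SendMessageRequest(queueUrl, "123")
      .withDelaySeconds(5)
}
```

Requests built this way are ordinary SDK objects, so any option the SDK exposes on them remains available.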


  • Automatic, hands off ability to perform batching of messages to reduce costs
  • Dead lettering of failed messages

Quick examples

Explicit types are included for the sake of clarity.

Publishing messages

// Construct an infinite Stream of SendMessageRequests, all with the same body "123".
// Stream.constant is already infinite, so no further .repeat is needed.
val messageRequestsStream: Stream[Task, SendMessageRequest] =
  Stream.constant(new SendMessageRequest(queueUrl, "123"))

// Construct a publish pipe that turns SendMessageRequests into SendMessageResults
val publishPipe: Pipe[Task, SendMessageRequest, SendMessageResult] = FS2SQS.publishPipe(client)

// A sink that prints every element it receives
def loggingSink[A]: Sink[Task, A] = { s =>
  s.map { i =>
    println(i)
  }
}

// Compose our stream, pipe and sink.
val effect = messageRequestsStream
  .through(publishPipe)
  .to(loggingSink)
  .onError(e => Stream.emit(println("Error: " + e.getMessage)))

// Lift our effect into a Task, and run it.
effect.run.unsafeRun()

Consuming messages

// Construct a request to get messages from SQS
val messageRequest = new ReceiveMessageRequest(queueUrl)

// Construct an infinite stream of Messages from SQS
val messagesStream: Stream[Task, Message] = FS2SQS.messageStream(client, messageRequest)

// A sink that can acknowledge Messages using a MessageAction
val ackSink: Sink[Task, (Message, (Message) => MessageAction)] = FS2SQS.ackSink(client)

// A pipe that either deletes or requeues the message
val workPipe: Pipe[Task, Message, (Message, (Message) => MessageAction)] = { messages =>
  messages.map { message =>
    if (message.getBody == "DOM") {
      // Acknowledge the message by deleting it
      (message, (m: Message) => Right(new DeleteMessageRequest(queueUrl, m.getReceiptHandle)))
    } else {
      // Requeue the message by sending its body again
      (message, (m: Message) => Left(new SendMessageRequest(queueUrl, m.getBody)))
    }
  }
}

// Compose our stream, work pipe and ack sink
val effect: Stream[Task, Unit] = messagesStream
  .through(workPipe)
  .to(ackSink)

// Lift our effect into a Task, and run it.
effect.run.unsafeRun()

Pipes, Streams and Sinks

The FS2 primitives are provided in FS2SQS.scala. You can use these as building blocks around SQS.

publishPipe Pipe[Task, SendMessageRequest, SendMessageResult]

Publishes messages to SQS.

messageStream Stream[Task, Message]

An infinite stream of SQS messages.

ackSink Sink[Task, (Message, (Message => MessageAction))]

A sink that accepts pairs of a Message and a function Message => MessageAction. MessageAction is a type alias for Either[SendMessageRequest, DeleteMessageRequest]. In other words, the function maps a Message to either a SendMessageRequest (Left, for requeuing) or a DeleteMessageRequest (Right, for successful acknowledgement).
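The shape of the values this sink consumes can be sketched as follows. This is an illustration only, assuming the AWS SDK for Java 1.x model classes: the alias is redefined locally so the snippet stands alone, and `MessageActionSketch`, `requeue`, `delete`, `decide` and the queue URL are hypothetical names that are not part of the library.

```scala
import com.amazonaws.services.sqs.model.{DeleteMessageRequest, Message, SendMessageRequest}

object MessageActionSketch {
  // Same shape as the alias described above, redefined locally for this sketch
  type MessageAction = Either[SendMessageRequest, DeleteMessageRequest]

  // Placeholder queue URL, for illustration only
  val queueUrl: String = "https://sqs.us-east-1.amazonaws.com/123456789012/example-queue"

  // Hypothetical helper: requeue by sending the body again (Left)
  def requeue(m: Message): MessageAction =
    Left(new SendMessageRequest(queueUrl, m.getBody))

  // Hypothetical helper: acknowledge by deleting the message (Right)
  def delete(m: Message): MessageAction =
    Right(new DeleteMessageRequest(queueUrl, m.getReceiptHandle))

  // Pair each message with the action to apply to it
  def decide(m: Message): (Message, Message => MessageAction) =
    if (m.getBody == "DOM") (m, delete _) else (m, requeue _)
}
```

Mapping a stream of messages through a function like `decide` yields elements of the type the sink expects, so the decision logic can be tested without touching SQS.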


As a library:

Add this to your build.sbt:

libraryDependencies += "com.imageintelligence" %% "fs2-sqs" % "1.0.0"

As a project to work on

Clone the repository:

git clone


sbt compile


sbt test


Please see the examples directory.