- Lightweight wrapper over the raw Java AWS SDK. It doesn't wrap the SDK types, so you keep the full power of the AWS SDK and aren't forced to work around a leaky abstraction.
- Unopinionated primitive building blocks.
- Automatic, hands-off batching of messages to reduce costs
- Dead lettering of failed messages
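
To illustrate why batching reduces costs: the SQS batch APIs accept at most 10 entries per request, so grouping messages cuts the number of API calls. The following is a minimal plain-Scala sketch of that grouping step only. The `BatchingSketch` object and its `toBatches` helper are hypothetical names made up for this illustration; they are not part of fs2-sqs and do not show how the library batches internally.

```scala
// Hypothetical helper, NOT part of fs2-sqs: groups messages into
// batches no larger than the SQS per-request limit of 10 entries.
object BatchingSketch {
  val MaxBatchSize = 10

  def toBatches[A](messages: Seq[A], size: Int = MaxBatchSize): List[List[A]] =
    messages.grouped(size).map(_.toList).toList

  def main(args: Array[String]): Unit = {
    val bodies = (1 to 25).map(i => s"message-$i")
    val batches = toBatches(bodies)
    // 25 messages fit into 3 batch requests instead of 25 single sends
    println(batches.map(_.size)) // prints List(10, 10, 5)
  }
}
```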
The examples below include explicit types for the sake of clarity.
...
// Construct an infinite Stream of SendMessageRequests, with the same body "123"
val messageRequestsStream: Stream[Task, SendMessageRequest] = Stream.constant(new SendMessageRequest(queueUrl, "123")).repeat

// Construct a Publish pipe that can turn SendMessageRequests into SendMessageResults
val publishPipe: Pipe[Task, SendMessageRequest, SendMessageResult] = FS2SQS.publishPipe(client)

// A sink that prints every element it receives
def loggingSink[A]: Sink[Task, A] = { s =>
  s.map { i =>
    println(i)
  }
}

// Compose our stream and pipe.
val effect = messageRequestsStream
  .through(publishPipe)
  .to(loggingSink)
  .onError(e => Stream.emit(println("Error: " + e.getMessage)))

// Lift our effect into a Task, and run it.
effect.run.unsafeRun()
...
// Construct a request to get messages from SQS
val messageRequest = new ReceiveMessageRequest(queueUrl)
  .withMaxNumberOfMessages(1)
  .withWaitTimeSeconds(10)
// Construct an infinite stream of Messages from SQS
val messagesStream: Stream[Task, Message] = FS2SQS.messageStream(client, messageRequest)
// A sink that can acknowledge Messages using a MessageAction
val ackSink: Sink[Task, (Message, (Message) => MessageAction)] = FS2SQS.ackSink(client)
// A pipe that either deletes or requeues the message
val workPipe: Pipe[Task, Message, (Message, (Message) => MessageAction)] = { messages =>
  messages.map { message =>
    if (message.getBody == "DOM") {
      (message, (m: Message) => Right(new DeleteMessageRequest(queueUrl, m.getReceiptHandle)))
    } else {
      (message, (m: Message) => Left(new SendMessageRequest(queueUrl, m.getBody)))
    }
  }
}
// Compose our stream, work pipe and ack sink
val effect: Stream[Task, Unit] = messagesStream
  .through(workPipe)
  .through(ackSink)
// Lift our effect into a Task, and run it.
effect.run.unsafeRun()
The FS2 primitives are provided in FS2SQS.scala. You can use these as building blocks around SQS.
- publishPipe: publishes messages to SQS.
- messageStream: an infinite stream of SQS messages.
- ackSink: a sink that accepts each Message paired with a function from Message => MessageAction. MessageAction is a type alias for Either[SendMessageRequest, DeleteMessageRequest]. In other words, the function maps a Message to either a SendMessageRequest (for requeuing) or a DeleteMessageRequest (for successful acknowledgement).
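
The MessageAction alias can be modelled in plain Scala to show how the two sides of the Either are meant to be read. This is only a sketch: the case classes below are hypothetical stand-ins for the AWS SDK classes of the same names (so the snippet runs without the SDK), and `decide`, `describe`, and the placeholder queue URL are made up for illustration.

```scala
object MessageActionSketch {
  // Hypothetical stand-ins for the AWS SDK classes, NOT the real SDK types.
  final case class Message(body: String, receiptHandle: String)
  final case class SendMessageRequest(queueUrl: String, body: String)
  final case class DeleteMessageRequest(queueUrl: String, receiptHandle: String)

  // Same shape as the alias described above: Left requeues, Right acknowledges.
  type MessageAction = Either[SendMessageRequest, DeleteMessageRequest]

  val queueUrl = "https://example.invalid/my-queue" // placeholder URL

  // Decide what to do with a message, following the README example's rule.
  def decide(message: Message): MessageAction =
    if (message.body == "DOM") Right(DeleteMessageRequest(queueUrl, message.receiptHandle))
    else Left(SendMessageRequest(queueUrl, message.body))

  // Interpret an action as a human-readable description.
  def describe(action: MessageAction): String = action match {
    case Left(send)    => s"requeue ${send.body}"
    case Right(delete) => s"delete ${delete.receiptHandle}"
  }

  def main(args: Array[String]): Unit = {
    println(describe(decide(Message("DOM", "handle-1"))))   // prints delete handle-1
    println(describe(decide(Message("other", "handle-2")))) // prints requeue other
  }
}
```

Using Either keeps the acknowledgement decision as plain data, so the work pipe stays free of side effects and only the sink talks to SQS.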
Add this to your build.sbt:
libraryDependencies += "com.imageintelligence" %% "fs2-sqs" % "1.0.0"
Clone the repository:
git clone https://github.com/ImageIntelligence/fs2-sqs.git
Compile:
sbt compile
Test:
sbt test
Please see the examples directory.