-
-
Notifications
You must be signed in to change notification settings - Fork 302
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add SQS Support #374
Add SQS Support #374
Conversation
Thanks a lot @tomazfernandes🔥! I will try to review it in upcoming days. Regarding Kinesis - very likely we won't do anything about it since there is a Kinesis support in Spring Cloud Stream project, but perhaps we can use it for MQTT #38. After very quick look - we've decided to drop Since this is a critical and perhaps the most important integration we have, @MatejNedic @eddumelendez please take a look when you have time :) |
if (attemptsLeft == 0) { | ||
throw new IllegalStateException("Could not retrieve properties for queue " queue, e); | ||
} | ||
Thread.sleep(8000 / attemptsLeft); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Considering that AWS clients has a built-in retry mechanisms, perhaps we don't need to repeat on failure when getting queue attributes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks a lot @tomazfernandes🔥! I will try to review it in upcoming days.
Thanks for the swift response @maciejwalkowiak!
Regarding Kinesis - very likely we won't do anything about it since there is a Kinesis support in Spring Cloud Stream project, but perhaps we can use it for MQTT #38.
Sounds good! Maybe we can work on a PoC and see how it looks like.
After very quick look - we've decided to drop
@EnableXXX
annotations and support only auto-configurations.
Sure, I had noticed the project didn't make use of such annotations but decided to leave auto-configuration out of the scope of this first draft. I can add it.
Since this is a critical and perhaps the most important integration we have, @MatejNedic @eddumelendez please take a look when you have time :)
Thanks for looking into this! Please let me know if there are any suggestions or concerns - this is meant to be a draft we can improve on rather than some final solution.
Also, there are a few points I think we should address if we're moving forward with this, for example:
-
I tried to leverage existing code as much as possible, specially since it was well adapted to the
Spring Messaging
interfaces and I wanted to have a working draft quickly. But while I think theAbstractMessageHandler
solution is great for what the previous version achieved - handling all queues from the same container and resolving methods at runtime - I'm not sure it's the best solution if we're having a single container per@SqsListener
annotation, since we can just have theMethod
and invoke it directly. -
The
Spring Messaging
project doesn't have much support forCompletableFuture
-basedasync
- I had to come up with a pretty hacky workaround to make a fullyasync
solution. It does have support forreactive
though. Maybe adding such support is something worth discussing with that team? -
I broke down the container runtime logic into many small pieces - I like this kind of design where we can easily swap components to change behavior. But if it looks too fragmented we can change it to a single larger class.
Thanks again!
Quick question @maciejwalkowiak - there seems to be an error related to setting the AWS region on the Thanks! |
@arunkpatra plans to take a look so let's keep him in the look. Regarding AWS region on CI - it is somewhat on purpose. We try to leverage Localstack as much as possible and unless we hit the wall and there will be a AWS feature unsupported by Localstack all tests should run against it. (you can take a look how other integration tests are set up). I can't wait to dig deeper into this PR! Regarding Sonar - feel free to question what it reports - we've set up a default configuration and not everything it reports necessarily is important from our POV. One more thing to keep in mind is that Spring Cloud AWS is used in https://github.com/spring-projects/spring-integration-aws - and while we don't need to be backward compatible - it should be possible to adjust |
spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/BaseSqsIntegrationTest.java
Outdated
Show resolved
Hide resolved
spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/SqsIntegrationTests.java
Outdated
Show resolved
Hide resolved
Sure, no hurry.
Sure, I'm using
Cool, looking forward for your review too!
I've actually enjoyed most of what it reported 😄 Of course, some less-than-critical things, but nothing to write home about.
I see, maybe we'll need to make a couple adjustments, but at a first glance seems simple enough. I'll keep that in mind, and maybe we can involve them when it's appropriate. Thanks! |
Well, it's been a while since I've last dealt with AWS configuration issues, and I'm not sure exactly how these properties are mapped by the framework. I'll take a better look on Monday, and also implement auto-configuration for this - probably with that we'll have a properly configured In the meantime, let me know if there are any further questions or concerns about any of this. Thanks! |
@tomazfernandes I pushed fixing tests to your branch |
Nice, thanks @maciejwalkowiak! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for your contribution @tomazfernandes ! I have left some small comments. I would be checking spring kafka during the week in order to get more familiar with it.
<name>Spring Cloud AWS Messaging Support</name> | ||
<description>Spring Cloud Support for AWS Messaging Services</description> | ||
|
||
<properties> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
those properties should be dropped. the project currently relies on spring-cloud-build and that's the place where the java version is specific
* @author Tomaz Fernandes | ||
* @since 3.0 | ||
*/ | ||
public class MessagingUtils { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
given the methods in this class are just used in one single place I would suggest to keep simple validations where they are used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I didn't really understand what you're suggesting - you mean having if (xpto != null) { ... }
etc?
This class was inspired by SK
's JavaUtils class and is used a lot there, so I think as we keep working on this we'll probably find quite a few uses for this pattern.
* @author Tomaz Fernandes | ||
* @since 3.0 | ||
*/ | ||
public class MessagingConfigUtils { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am in favor to move those constants to the places where they are used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please see #374 (comment)
<groupId>org.testcontainers</groupId> | ||
<artifactId>localstack</artifactId> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.testcontainers</groupId> | ||
<artifactId>junit-jupiter</artifactId> | ||
<scope>test</scope> | ||
</dependency> | ||
<!-- AWS SDK v1 is required by testcontainers-localstack --> | ||
<dependency> | ||
<groupId>com.amazonaws</groupId> | ||
<artifactId>aws-java-sdk-core</artifactId> | ||
<scope>test</scope> | ||
</dependency> | ||
</dependencies> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess tests are going to be implemented later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, I prefer to have the team take a look at this PR first, and if we're moving through with it I can implement the unit tests and add docs for this.
spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsMessageProducer.java
Outdated
Show resolved
Hide resolved
for (Map.Entry<MessageSystemAttributeName, String> attributeKeyValuePair : message.attributes().entrySet()) { | ||
messageHeaders.put(attributeKeyValuePair.getKey().name(), attributeKeyValuePair.getValue()); | ||
} | ||
return messageHeaders; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return messageHeaders; | |
return Collections.unmodifiableMap(messageHeaders); |
Map<String, Object> messageHeaders = new HashMap<>(); | ||
for (Map.Entry<MessageSystemAttributeName, String> attributeKeyValuePair : message.attributes().entrySet()) { | ||
messageHeaders.put(attributeKeyValuePair.getKey().name(), attributeKeyValuePair.getValue()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it can be moved to make use of streams api
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. But I think there's a good chance we'll end up doing some refactoring, or perhaps rewriting, to this and other parts I've brought from the previous version, so if that's ok I prefer deferring this change for a moment.
|
||
@Container | ||
static LocalStackContainer localstack = new LocalStackContainer( | ||
DockerImageName.parse("localstack/localstack:0.14.0")).withServices(SQS).withReuse(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we want to use withReuse
DockerImageName.parse("localstack/localstack:0.14.0")).withServices(SQS).withReuse(true); | |
DockerImageName.parse("localstack/localstack:0.14.0")).withServices(SQS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might not be related to this PR, but the question has been raised I want to share what I have in Spring Integration AWS for Localstack Testcontainers: https://github.com/spring-projects/spring-integration-aws/blob/main/src/test/java/org/springframework/integration/aws/LocalstackContainerTest.java
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just replied here
static void beforeAll() throws IOException, InterruptedException { | ||
// create needed queues in SQS | ||
// TODO: Not working as expected due to some port mapping issue - will look into in the future | ||
localstack.execInContainer("awslocal", "io/awspring/cloud/sqs", "create-queue", "--queue-name", | ||
RECEIVES_MESSAGE_QUEUE_NAME); | ||
localstack.execInContainer("awslocal", "io/awspring/cloud/sqs", "create-queue", "--queue-name", | ||
DOES_NOT_ACK_ON_ERROR_QUEUE_NAME); | ||
localstack.execInContainer("awslocal", "io/awspring/cloud/sqs", "create-queue", "--queue-name", | ||
RECEIVE_FROM_MANY_1_QUEUE_NAME); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For initialization resources we can use docker-entrypoint-initaws.d
v0.9.0: Enhance integration with Serverless; refactor CloudFormation implementation; add support for Step Functions, IAM, STS; fix CloudFormation integration; support mounting Lambda code locally; add docker-entrypoint-initaws.d dir for initializing resources; add S3Event Parser for Lambda; fix S3 chunk encoding; fix S3 multipart upload notification; add dotnetcore2.1 and ruby2.5 Lambda runtimes; fix issues with JDK 9; install ES plugins available in AWS
From changelog
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But then you can't use constants or any other variables there. As long as the number of commands is short, for me its easier to have them in the same file. I think we can start using docker-entrypoint-initaws.d
but then it should be consistent across tests - so perhaps something for a separate PR.
Thanks a lot for looking into this @eddumelendez!
Sure, I'll address them, thanks.
Sounds great! Let me know if there's anything I can help with - I've been contributing to that project for a bit more than a year now and know the codebase reasonably well at this point - and surely would enjoy discussing both that and this design. Also, I'm not sure how familiar you are with the previous version of the SQS integration - that would be a good thing to look at as I took a few classes and code from there. Although of course we can rewrite anything we feel could be different. A few quick notes:
Please let me know if there's anything else I can help with. Thanks! |
I've made a few adjustments to performance and ran some preliminary tests against For a load of Running the same test with the current Of course, that difference is kind of expected given the 10 thread / queue limitation of the previous version, but still should be a happy gain for users. The performance is further improved when there's more than 1K messages in the queue e.g. from a previously interrupted test. I think we can expect better performance from a true Also, probably there's room for performance improvement in the logic itself - we can look into that in the future. Thanks! |
I think my next move here would be taking yet another page from the With that we'd further decouple assembly time logic from runtime logic, and also be able to handle use cases the And after that, add Since that should be quite a bit of work, I think I'll let the team catch up with what we already have first, to make sure we're all in the same page. Thanks and please let me know if there's anything I can help with. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implementation looks like you're definitely know what you're doing, but I am getting lost in abstractions.
I am 100% confident we should continue with this design but we either have to simplify it (for example by merging these two modules) or document it.
- Simplification - I understand the reasoning for keeping them separate. The only possible implementation I can think of is AWS IOT but not sure if this will really fit.
- Documentation - at least on the Javadoc level, it must come anyways. But I think it would be really really great if you could describe either with words or a diagram how all of these pieces are meant to work together.
I see that container can have single interceptor - perhaps we should consider adding more? For example, adding a possibility to measure execution time and send it via Micrometer, or adding tracing with Sleuth.
Please add auto-configuration (we do not need @EnableSqs
) and tests. I imagine integration tests cover big chunk of this change and I do not mean that we have to have a unit test for each class, but I do believe some pieces here should be unit tested.
.collect(Collectors.toList()); | ||
} | ||
|
||
private static SqsMessageProducer createMessageProducer(SqsAsyncClient sqsClient, String logicalEndpointName, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As far as I can see this method is used only in line 44 and does not add more than just calling constructor. Perhaps we can remove it and inline calling constructor in line 44?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, TBH, I think there are a couple of things here to reason about and improve.
For example, I mostly relied on creating a SqsAsyncClient
bean and injecting / passing it along to all containers.
But it seems that the client has a maximum number of concurrent connections, something around 40 if I recall correctly. So I'm thinking maybe we should have a SqsMessageProducerFactory
that creates a separate SqsAsyncClient
instance per container, and the corresponding MessageProducer
.
How does that sound? We do have ConsumerFactory
and ProducerFactory
on Spring Kafka
.
* @since 3.0 | ||
*/ | ||
@SpringBootTest | ||
@DirtiesContext |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally we should be able to get rid of this before merging.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What exactly, you mean the Integration Test suite as a whole, or the DirtiesContext
annotation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean @DirtiesContext
Sounds like a reasonable thing to do. Go for it 👍 |
Yeah, well, I definitely mostly know what I'm doing 😄 The bits from
Sure, that was something in my mind too, specially given this is a subproject rather than an entire project on its own. I can add some That should give everyone a better sense on how the components are related and what role they play at
Yes, I'm aware of that. TBH, I just wanted to be able to implement the
Sure. This will come after the
Sure, I'll implement UTs and ITs after this is a bit more mature. Same with javadocs and documentation. |
@maciejwalkowiak, I've added simple javadocs to the assembly-time components just to give a jump start on this. Below is a high-level view of the design, focusing at first at assembly-time components and flows, which I think are harder to understand. Feel free to ask if you (or anyone) have any questions or suggestions. Afterwards we can dive into the runtime components, and some other aspects of the design. I hope this makes sense to everyone 😄 Component types
Abstraction levelsIn terms of abstraction I think in three levels:
In the |
Assembly time flowThis is a bit more complicated to illustrate in writing, but hopefully it'll be helpful.
The It also gathers the methods that will be invoked to handle messages at runtime. This is the part that should be replaced by the
Then the The factory creates the appropriate
The Each factory will have a The factory will combine configuration from the Then it creates the
This is the class that will be used at runtime to fetch and process the actual messages. This part is more straightforward I guess. Currently, all Again, the |
Added a bunch of unit tests. Coverage is now at ~93% combined with integration tests. |
Fixed it, you can run again, thanks! |
One quick note: I think I may have eventually committed a Not sure if it's a problem, haven't looked too deep into it and I can revert it if it is. Maybe we should add it to .gitignore and let the CI create it on the fly? Thanks |
} | ||
else if (object instanceof Collection) { | ||
if (this.parallelLifecycle) { | ||
CompletableFuture.allOf(((Collection<?>) object).stream().map( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps we could try catch here for QueueAttributesResolvingException
and add a failure analyzer for this exception so that the error massage in the console is self explanatory.
We need to merge this tomazfernandes#1 and 👍 from my side to merge. In subsequent PRs:
2 & 3 can be done in M3. |
Regarding |
One more thing I noticed that is missing is |
61b253c
to
2e49859
Compare
Resolves awspring#344 Initial draft
Improve TaskExecutor handling Add Interceptor after processing stage Move polling logic to source Add BackPressureHandler Add MessageProcessingPipeline Add Batch Processing Support Add Blocking Interfaces Add Async / Blocking adapters Review container interfaces Add SqsException and SqsListenerException Add SQS AutoConfiguration Autoconfigure SqsAsyncClient and SqsMessageListenerContainerFactory beans Import SqsBootstrapConfiguration Add method-level javadocs Add class-level javadocs Add VisibilityExtensionInterceptor Add SqsListenerCustomizer Improve startup performance Remove messaging support module Join modules Add MessageListener Add javadocs for assembly time components Add `id` to Container, Endpoint and Message Add AbstractEndpoint
Add MessageVisibilityExtendingSinkAdapter Improve Sink ThreadPool Configuration
Add Automatic Payload Conversion by Header QueueAttributes improvements Lifecycle improvements
Improve Thread Management
Improve BackPressureMode Change package for Conversion Context Improve Message Conversion Improve Manual Acknowledgement Extract AbstractMessageConvertingMessageSource Use TaskExecutor interface instead of Executor Add builders to Options, Factory and Container Add QueueNotFoundStrategy Add SQS auto-configuration to spring.factories Add SQS starter Disable formatting for some chained methods Also some ternary conditions Apply spotless Add license header where missing Begin Acknowledgement unit tests Minor improvements Adjust FIFO default acknowledgement orderings Improve extensibility Improve MessageListener creation Introduce Buffering Acknowledgement Processor Small fixes and improvements Add Async and Fail on Ack integration tests Improve BackPressure handling Add set to ContainerOptions methods Other minor improvements Lifecycle Improvements MessageProcessingPipeline fixes and improvements Change afterProcessing return type to Void
Fix code highlighting in documentation Add auto-configuration props to docs Address suggestions Add SqsListenerAnnotationBeanPostProcessorTests Add SQS properties to auto-configuration Use context ObjectMapper Improve Visibility and tests
Add package-info.java Increase Unit Test Coverage Combine Acknowledgement Interfaces Support more than 10 maxMessagesPerPoll Add AcknowledgementOrdering.ORDERED_BY_GROUP Add BatchAcknowledgement Add AsyncBatchAcknowledgement Add tests Add AcknowledgementResultCallback Add tests Replace ExpressionHelper with StringValueResolver Add maxMessagesPerPoll to SqsListener Disable container reuse for TC Use ApplicationContextRunner in tests Address review suggestions Clean up tests Fix InterceptorIntegrationTest Make ComponentFactory a collection Add Unit Tests Apply Spotless
2e49859
to
6137a10
Compare
Kudos, SonarCloud Quality Gate passed! 0 Bugs No Coverage information |
I thought I had this change, but actually this is for the
Sure, let's see how this first milestone goes and what issues appear. Let me know if there's anything else needed for M2. Thanks! |
Resolves #344
Adds support for listening to
SQS
queues and handling messages. (WIP)Features
CompletableFuture
Spring
abstractionsShortcomings (for now...)
Template
not implementedFIFO
queues not implementedI tried to follow the provided general guidelines and what I thought would make sense for the project. I'm ok with discussing and or changing any of it, both design and implementation-wise.
I'll wait for feedback before adding anything else - hopefully this will be a good enough fit for us to work together on going forward. If it doesn't look right, or perhaps another direction is preferred, no worries, we can set it aside and let the team come with something else later.
As a note, there's a
messaging support
module that provides a set of abstractions for adding support to any number of messaging systems -Kinesis
comes to mind, but there sure are others in theAWS
landscape. It was largely inspired by the design of theSpring Kafka
project - that's a battle tested design that most users and engineers should be comfortable with. If that seems like an overkill for this project, it should be easy enough merging the two modules and getting rid of some abstractions.This is a great project and I'm really thrilled to perhaps be taking a small part into building it! 😄
Thanks!