-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement UringSystem and FS2 Sockets using netty io_uring API #78
base: feature/jvm
Are you sure you want to change the base?
Implement UringSystem and FS2 Sockets using netty io_uring API #78
Conversation
If I'm not mistaken, we are going to be using netty's Native class. However, it is not a public class. Do we make a fork of Netty and make it public in our version so we can use it ? |
For now, we can create a "backdoor" by adding some code in the |
I wanted to see if I am on the right track 🤔. The next goal is to implement the UringSystem Poller, for that we will need to replace: |
That sounds right.
Not sure about this one. Which method is this? |
In the processCqes: |
Ah, you can access that one with the SQE = submission queue entry (where you submit I/O work) |
Maybe I'm forgetting something but we don't seem to have access to FileDescriptorPoller and FileDescriptorPollHandle. What should we do 🤔 ? |
These only make sense for Scala Native, you don't need to implement them for JVM :) you can delete it. |
Alright I see thanks :) |
I am looking for the equivalent to |
No, that's something different, I wonder if Netty is missing it 🤔 But it doesn't matter, you can implement it manually using this method: Here is how liburing does it. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, so getting serious about landing this ...
If you don't mind one last request: can we make an effort to deduplicate some of code and share between JVM/Native?
For implementation stuff, only if it's easy, I don't care too much (maybe for IOExceptionHelper
, UringApp
, implicits
).
But ... I think we should try hard to share the tests, so we are running identical test suites on both platforms. We're not doing anything JVM-specific, hopefully?
|
||
import java.net.ProtocolFamily | ||
|
||
private final class UringDatagramSocketGroup[F[_]] extends DatagramSocketGroup[F] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess we can work on this in a follow-up PR :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you don't mind one last request: can we make an effort to deduplicate some of code and share between JVM/Native?
Great idea! I'll handle it :)
But ... I think we should try hard to share the tests, so we are running identical test suites on both platforms. We're not doing anything JVM-specific, hopefully?
I think we aren't, so it should be easy to do.
_ <- F.delay { | ||
buffer.clear() | ||
buffer.writeBytes(bytes.toArray) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we are re-using the same buffer for reading and writing? This is a problem if there are concurrent reads and writes, so we should use two buffers.
Also, I think we need logic to increase the buffer size for larger reads/writes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Absolutely, my bad. Right now we would have two buffers with a predetermined size, we could create the buffer in the read/write itself depending on the number of bytes we want to send. Another option could be to have a default size and if the message is bigger than that size increase the buffer ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another option could be to have a default size and if the message is bigger than that size increase the buffer
Yes, this is how it's currently working in the other implementations: we keep replacing the buffer with a bigger one as-needed.
def createBuffer[F[_]: Sync](size: Int): Resource[F, ByteBuf] = | ||
Resource.make( | ||
Sync[F].delay(UnpooledByteBufAllocator.DEFAULT.directBuffer(size)) | ||
)(buf => Sync[F].delay(if (buf.refCnt() > 0) { val _ = buf.release() })) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, what happens if buf.refCnt() == 0
, who is responsible for cleaning it up?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I might be wrong but I think when the counter reaches 0 it is automatically deallocated by netty and if refCnt()
returns 0 it has already been deallocated. However, I realized it should be:
buf.release(buf.refCnt())
to decrease the counter to 0 and be deallocated properly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, I think that makes more sense.
Well I won't pretend to really understand this 😁 let's just make Netty go away sooner rather than later 😉
linuxSocket => closeSocket(ring, linuxSocket.fd()).to | ||
) | ||
|
||
// private[this] def uringOpenSocket(ring: Uring, ipv6: Boolean): Resource[F, Int] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the problem is that the UringSocket
depends on the LinuxSocket
to get the localAddress:
def localAddress: F[SocketAddress[IpAddress]] = F.delay(SocketAddress.fromInetSocketAddress(linuxSocket.getLocalAddress()))
When I implemented it I didn't find any API in netty to get the localAddress from the fd that UringOpenSocket
gives us. I was planning to implement it once we remove netty but I'm going to check it, maybe this time I'll find something 🤞
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, but we can make a LinuxSocket
directly from the fd
. Well, we need to hack the package 😅
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point! In that case it's done in a moment. Thanks Arman 😃
Co-authored-by: Arman Bilge <[email protected]>
This pull request aims to adapt reusable code from Scala Native to Scala JVM. To provide compatibility with the uring library in JVM, we want to import functions from the Netty io_uring (https://github.com/netty/netty-incubator-transport-io_uring).
Plan: