Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement UringSystem and FS2 Sockets using netty io_uring API #78

Open
wants to merge 201 commits into
base: feature/jvm
Choose a base branch
from

Conversation

antoniojimeneznieto
Copy link
Collaborator

@antoniojimeneznieto antoniojimeneznieto commented May 30, 2023

This pull request aims to adapt reusable code from Scala Native to Scala JVM. To provide compatibility with the uring library in JVM, we want to import functions from the Netty io_uring (https://github.com/netty/netty-incubator-transport-io_uring).

Plan:

  • Copy all the reusable code from Native to JVM
  • Import all the Netty functions for compatibility with the uring library
  • Reimplement the Native code in Scala JVM using Netty
    • Implement UringSystem
    • Implement UringSocket
    • Implement UringSocketGroup
    • Implement UringNetwork

@antoniojimeneznieto
Copy link
Collaborator Author

If I'm not mistaken, we are going to be using netty's Native class. However, it is not a public class. Do we make a fork of Netty and make it public in our version so we can use it ?

@armanbilge
Copy link
Owner

For now, we can create a "backdoor" by adding some code in the io.netty.incubator.channel.uring package that publicly exposes those methods so that we can use them.

@antoniojimeneznieto
Copy link
Collaborator Author

antoniojimeneznieto commented Jun 5, 2023

I wanted to see if I am on the right track 🤔. The next goal is to implement the UringSystem Poller, for that we will need to replace:
Ptr[io_uring] -> RingBuffer
Ptr[io_uring_sqe] -> IOUringSubmissionQueue
Ptr[Ptr[io_uring_sqe] -> List[IOUringSubmissionQueue]

@armanbilge
Copy link
Owner

That sounds right.

Ptr[Ptr[io_uring_sqe] -> List[IOUringSubmissionQueue]

Not sure about this one. Which method is this?

@antoniojimeneznieto
Copy link
Collaborator Author

antoniojimeneznieto commented Jun 5, 2023

In the processCqes:
def processCqes(_cqes: Ptr[Ptr[io_uring_cqe]])

@armanbilge
Copy link
Owner

Ah, you can access that one with the IOUringCompletionQueue.

SQE = submission queue entry (where you submit I/O work)
CQE = completion queue entry (where you collect results of completed work)

@antoniojimeneznieto
Copy link
Collaborator Author

Maybe I'm forgetting something but we don't seem to have access to FileDescriptorPoller and FileDescriptorPollHandle. What should we do 🤔 ?

@armanbilge
Copy link
Owner

Maybe I'm forgetting something but we don't seem to have access to FileDescriptorPoller and FileDescriptorPollHandle. What should we do 🤔 ?

These only make sense for Scala Native, you don't need to implement them for JVM :) you can delete it.

@antoniojimeneznieto
Copy link
Collaborator Author

Alright I see thanks :)

@antoniojimeneznieto
Copy link
Collaborator Author

I am looking for the equivalent to io_uring_prep_cancel64 for implementing the cancel method in the ApiImpl class and I am not completely sure, is it addPollRemove from IOUringSubmissionQueue 🤔 ?

@armanbilge
Copy link
Owner

No, that's something different, I wonder if Netty is missing it 🤔

But it doesn't matter, you can implement it manually using this method:

https://github.com/netty/netty-incubator-transport-io_uring/blob/c17c887b63bef590b83cc2f241a7ebbd16a9fd56/transport-classes-io_uring/src/main/java/io/netty/incubator/channel/uring/IOUringSubmissionQueue.java#L113-L114

Here is how liburing does it.
https://github.com/axboe/liburing/blob/b4ee3108b93f7e4602430246236d14978abad085/src/include/liburing.h#L632-L638

build.sbt Outdated Show resolved Hide resolved
Copy link
Owner

@armanbilge armanbilge left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so getting serious about landing this ...

If you don't mind one last request: can we make an effort to deduplicate some of code and share between JVM/Native?

For implementation stuff, only if it's easy, I don't care too much (maybe for IOExceptionHelper, UringApp, implicits).

But ... I think we should try hard to share the tests, so we are running identical test suites on both platforms. We're not doing anything JVM-specific, hopefully?


import java.net.ProtocolFamily

private final class UringDatagramSocketGroup[F[_]] extends DatagramSocketGroup[F] {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we can work on this in a follow-up PR :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you don't mind one last request: can we make an effort to deduplicate some of code and share between JVM/Native?

Great idea! I'll handle it :)

But ... I think we should try hard to share the tests, so we are running identical test suites on both platforms. We're not doing anything JVM-specific, hopefully?

I think we aren't, so it should be easy to do.

build.sbt Outdated Show resolved Hide resolved
build.sbt Outdated Show resolved Hide resolved
Comment on lines 132 to 135
_ <- F.delay {
buffer.clear()
buffer.writeBytes(bytes.toArray)
}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we are re-using the same buffer for reading and writing? This is a problem if there are concurrent reads and writes, so we should use two buffers.

Also, I think we need logic to increase the buffer size for larger reads/writes?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absolutely, my bad. Right now we would have two buffers with a predetermined size, we could create the buffer in the read/write itself depending on the number of bytes we want to send. Another option could be to have a default size and if the message is bigger than that size increase the buffer ?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another option could be to have a default size and if the message is bigger than that size increase the buffer

Yes, this is how it's currently working in the other implementations: we keep replacing the buffer with a bigger one as-needed.

def createBuffer[F[_]: Sync](size: Int): Resource[F, ByteBuf] =
Resource.make(
Sync[F].delay(UnpooledByteBufAllocator.DEFAULT.directBuffer(size))
)(buf => Sync[F].delay(if (buf.refCnt() > 0) { val _ = buf.release() }))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, what happens if buf.refCnt() == 0, who is responsible for cleaning it up?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be wrong but I think when the counter reaches 0 it is automatically deallocated by netty and if refCnt() returns 0 it has already been deallocated. However, I realized it should be:

buf.release(buf.refCnt()) to decrease the counter to 0 and be deallocated properly.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I think that makes more sense.

Well I won't pretend to really understand this 😁 let's just make Netty go away sooner rather than later 😉

uring/jvm/src/main/scala/fs2/io/uring/unsafe/util.scala Outdated Show resolved Hide resolved
linuxSocket => closeSocket(ring, linuxSocket.fd()).to
)

// private[this] def uringOpenSocket(ring: Uring, ipv6: Boolean): Resource[F, Int] = {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the problem is that the UringSocket depends on the LinuxSocket to get the localAddress:

def localAddress: F[SocketAddress[IpAddress]] = F.delay(SocketAddress.fromInetSocketAddress(linuxSocket.getLocalAddress()))

When I implemented it I didn't find any API in netty to get the localAddress from the fd that UringOpenSocket gives us. I was planning to implement it once we remove netty but I'm going to check it, maybe this time I'll find something 🤞

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! In that case it's done in a moment. Thanks Arman 😃

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants