Skip to content

monix/monix-nio

monix-nio

Java NIO utilities for usage with Monix

Build Status Coverage Status

Join chat: Gitter

Overview

Monix-nio can be used to have the power of Monix combined with underlying Java-nio libraries. For the moment the following support has been added:

  • Read/Write async to a file (combined with utf8 encoding/decoding if necessary)
  • Read/Write async to TCP
  • File system watcher

Benchmarks

Benchmark                                          Mode  Cnt     Score    Error  Units
ReadWriteFileBenchmark.read50MiB                     ss   40   191.350 ± 27.611  ms/op
ReadWriteFileBenchmark.read50MiBJavaNio              ss   40    59.288 ±  5.068  ms/op
ReadWriteFileBenchmark.read50MiBWith100KiBChunks     ss   40    44.501 ±  3.135  ms/op
ReadWriteFileBenchmark.read50MiBWith1KiBChunks       ss   40  1066.369 ± 43.585  ms/op
ReadWriteFileBenchmark.read50MiBWith1MiBChunks       ss   40    48.777 ±  3.725  ms/op
ReadWriteFileBenchmark.write50MiB                    ss   40   259.731 ± 10.388  ms/op
ReadWriteFileBenchmark.write50MiBJavaNio             ss   40   304.585 ± 15.283  ms/op
ReadWriteFileBenchmark.write50MiBWith100KiBChunks    ss   40   258.719 ± 20.431  ms/op
ReadWriteFileBenchmark.write50MiBWith1KiBChunks      ss   40  2414.812 ± 23.128  ms/op
ReadWriteFileBenchmark.write50MiBWith1MiBChunks      ss   40   255.028 ±  9.261  ms/op
  • lower is better

Usage

WARNING: Can break backwards compatibility!

Adding dependency to SBT

libraryDependencies  = "io.monix" %% "monix-nio" % "0.1.0"

Read from a text file

import monix.nio.text.UTF8Codec._
import monix.nio.file._
  
implicit val ctx = monix.execution.Scheduler.Implicits.global
val from = java.nio.file.Paths.get("/myFile.txt")
  
readAsync(from, 30)
  .pipeThrough(utf8Decode) // decode utf8, If you need Array[Byte] just skip the decoding
  .foreach(Console.print)  // print each char

Write to a file

import monix.reactive.Observable
import monix.nio.file._
  
implicit val ctx = monix.execution.Scheduler.Implicits.global
val to = java.nio.file.Paths.get("/out.txt")
val bytes = "Hello world!".getBytes.grouped(3)
  
Observable
  .fromIterator(bytes)
  .consumeWith(writeAsync(to))
  .runAsync

Copy a file (text with decode and encode utf8)

import monix.eval.Callback
import monix.nio.text.UTF8Codec._
import monix.nio.file._
  
val from = java.nio.file.Paths.get("from.txt")
val to = java.nio.file.Paths.get("to.txt")
  
val consumer = writeAsync(to)
  
val callback = new Callback[Long] {
  override def onSuccess(value: Long): Unit = println(s"Copied $value bytes.")
  override def onError(ex: Throwable): Unit = println(ex)
}
readAsync(from, 3)
  .pipeThrough(utf8Decode)
  .map { str =>
    Console.println(str) // do something with it
    str
  }
  .pipeThrough(utf8Encode)
  .consumeWith(consumer)
  .runAsync(callback)

File system watcher

import java.nio.file.{ Paths, WatchEvent }
import monix.nio.file._
  
implicit val ctx = monix.execution.Scheduler.Implicits.global
  
val path = Paths.get("/tmp")
  
def printEvent(event: WatchEvent[_]): Unit = {
  val name = event.context().toString
  val fullPath = path.resolve(name)
  println(s"${event.kind().name()} - $fullPath")
}
  
watchAsync(path)
  .foreach(p => p.foreach(printEvent))

Read from TCP

$ echo 'monix-tcp' | nc -l -k 9500
import monix.reactive.Consumer
import monix.nio.tcp._
  
implicit val ctx = monix.execution.Scheduler.Implicits.global
  
val callback = new monix.eval.Callback[Unit] {
  override def onSuccess(value: Unit): Unit = println("Completed")
  override def onError(ex: Throwable): Unit = println(ex)
}
readAsync("localhost", 9500)
  .consumeWith(Consumer.foreach(c => Console.out.print(new String(c))))
  .runAsync(callback)

Write to TCP

$ nc -l -k 9500
import monix.reactive.Observable
import monix.nio.tcp._
  
implicit val ctx = monix.execution.Scheduler.Implicits.global
  
val tcpConsumer = writeAsync("localhost", 9500)
val chunkSize = 2
  
val callback = new monix.eval.Callback[Long] {
  override def onSuccess(value: Long): Unit = println(s"Sent $value bytes.")
  override def onError(ex: Throwable): Unit = println(ex)
}
Observable
  .fromIterator("Hello world!".getBytes.grouped(chunkSize))
  .consumeWith(tcpConsumer)
  .runAsync(callback)

Create a TCP server and/or client (TCP server-client echo example)

import monix.reactive.Observable
import monix.eval. { Callback, Task }
import monix.execution.Ack.Continue
import monix.nio.tcp._
  
implicit val ctx = monix.execution.Scheduler.Implicits.global
  
val serverProgramT = for {
  server <- asyncServer(java.net.InetAddress.getByName(null).getHostName, 9001)
  socket <- server.accept()
  
  conn <- Task.now(readWriteAsync(socket))
  reader <- conn.tcpObservable
  writer <- conn.tcpConsumer
  
  echoedLen <- reader.doOnTerminateEval(_ => conn.stopWriting()).consumeWith(writer)
  _ <- conn.close()
  _ <- server.close()
} yield {
  echoedLen
}
  
val client = readWriteAsync("localhost", 9001, 256 * 1024)
val clientProgramT = for {
  writer <- client.tcpConsumer
  _ <- Observable.fromIterable(Array("Hello world!".getBytes())).consumeWith(writer)
  _ <- client.stopWriting()
  reader <- client.tcpObservable
  _ <- Task.now(reader
    .doOnTerminateEval(_ => client.close())
    .subscribe(
      bytes => { 
        println(new String(bytes))
        Continue 
      },
      err => println(err),
      () => println("Echo received.")
    ))
} yield {}
  
serverProgramT.runAsync(new Callback[Long] {
  override def onSuccess(value: Long): Unit = println(s"Echoed $value bytes.")
  override def onError(ex: Throwable): Unit = println(ex)
})
clientProgramT.runAsync

Make a raw HTTP request

import monix.reactive.Observable
import monix.eval.Callback
import monix.nio.tcp._
  
implicit val ctx = monix.execution.Scheduler.Implicits.global
  
val asyncTcpClient = readWriteAsync("httpbin.org", 80, 256 * 1024)
val request = 
  "GET /get?tcp=monix HTTP/1.1\r\nHost: httpbin.org\r\nConnection: keep-alive\r\n\r\n"
   
val callbackR = new Callback[Unit] {
  override def onSuccess(value: Unit): Unit = println("OK")
  override def onError(ex: Throwable): Unit = println(ex)
}
asyncTcpClient
  .tcpObservable
  .map { reader =>
    reader
    .doOnTerminateEval(_ => asyncTcpClient.close()) // clean
    .subscribe(
      (bytes: Array[Byte]) => {
        println(new String(bytes, "UTF-8"))
        monix.execution.Ack.Stop 
      },
      err => println(err),
      () => println("Completed")
    )
    ()
  }
  .runAsync(callbackR)
  
val callbackW = new Callback[Long] {
  override def onSuccess(value: Long): Unit = println(s"Sent $value bytes")
  override def onError(ex: Throwable): Unit = println(ex)
}   
asyncTcpClient
  .tcpConsumer
  .flatMap { writer =>
    val data = request.getBytes("UTF-8").grouped(256 * 1024).toArray
    Observable
      .fromIterable(data)
      .consumeWith(writer)
  }
  .runAsync(callbackW)

Maintainers

The current maintainers (people who can help you) are:

Contributing

The Monix project welcomes contributions from anybody wishing to participate. All code or documentation that is provided must be licensed with the same license that Monix is licensed with (Apache 2.0, see LICENSE.txt).

People are expected to follow the Typelevel Code of Conduct when discussing Monix on the Github page, Gitter channel, or other venues.

Feel free to open an issue if you notice a bug, have an idea for a feature, or have a question about the code. Pull requests are also gladly accepted. For more information, check out the contributor guide.

License

All code in this repository is licensed under the Apache License, Version 2.0. See LICENCE.txt.