SPARK-13809 introduced a framework for state management for computing Streaming Aggregates. The default implementation was in-memory hashmap which was backed up in HDFS complaint file system at the end of every micro-batch.
Current implementation suffers from Performance and Latency Issues. It uses Executor JVM memory to store the states. State store size is limited by the size of the executor memory. Also Executor JVM memory is shared by state storage and other tasks operations. State storage size will impact the performance of task execution
Moreover, GC pauses, executor failures, OOM issues are common when the size of state storage increases which increases overall latency of a micro-batch
RocksDB is a storage engine with key/value interface based on levelDB. New writes are inserted into the memtable; when memtable fills up, it flushes the data on local storage. It supports both point lookups and range scans, and provides different types of ACID guarantees and is optimized for flash storage. Rocksdb based state storage for Structured streaming provides major performance improvements for stateful stream processing.
Discussion on the PR raised can be found here
The connector is available from the Maven Central repository. It can be used using the --packages option or the spark.jars.packages configuration property. Use the following connector artifact
com.qubole.spark/spark-rocksdb-state-store_2.11/1.0.0
Used following repo for the benchmark
Setup
- Used Qubole's distribution of Apache Spark 2.4.0 for my tests.
- Master Instance Type = i3.xlarge
- Driver Memory = 2g
- num-executors = 1
- max-executors = 1
- spark.sql.shuffle.partitions = 8
- Run time = 30 mins
- Source = Rate Source
- executor Memory = 7g
- spark.executor.memoryOverhead=3g
- Processing Time = 30 sec
Executor Instance type = i3.xlarge cores per executor = 4 ratePerSec = 20k
State Storage Type | Mode | Total Trigger Execution Time | Records Processed | Total State Rows | Comments |
---|---|---|---|---|---|
memory | Append | ~7 mins | 8.6 million | 2 million | Application failed before 30 mins |
RockSB | Append | ~30 minutes | 34.6 million | 7 million |
Executor Instance type = C5d.2xlarge cores per executor = 8 ratePerSec = 30k
State Storage Type | Mode | Total Trigger Execution Time | Records Processed | Total State Rows | Comments |
---|---|---|---|---|---|
memory | Append | 8 mins | 12.6 million | 3.1 million | Application was stuck because of GC |
RockSB | Complete | ~30 minutes | 47.34 million | 12.5 million |
Executor info when memory based state storage is used
Longevity run results
Executor Instance type = C5d.2xlarge cores per executor = 8 ratePerSec = 20k
State Storage Type | Mode | Total Trigger Execution Time | Records Processed | Total State Rows | Number of Micro-batch | Comments |
---|---|---|---|---|---|---|
RockSB | Append | ~1.5 hrs | 104.3 million | 10.5 million | 114 |