Skip to content

Commit

Permalink
Introduced RingBuffer.
Browse files Browse the repository at this point in the history
  • Loading branch information
shajen committed Jan 6, 2023
1 parent a6f7b1a commit d1e4da8
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 29 deletions.
66 changes: 37 additions & 29 deletions sources/radio/hackrf_sdr_device.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 9,33 @@
#include <thread>

constexpr uint32_t HACKRF_MIN_SAMPLES_READ_COUNT = 262144;
constexpr uint32_t HACKRF_BUFFER_SIZE = 40 * 1024 * 1024; // 40MB

struct CallbackData {
CallbackData(uint32_t totalSamples, PerformanceLogger &performanceLogger) : buffer(totalSamples), samplesReceived(0), performanceLogger(performanceLogger) {}
std::mutex mutex;
CallbackData(uint32_t windowSize, RingBuffer &buffer, PerformanceLogger &performanceLogger)
: windowSize(windowSize), readSize(0), buffer(buffer), performanceLogger(performanceLogger), initialized(false) {}
uint32_t windowSize;
uint32_t readSize;
std::condition_variable cv;
std::vector<uint8_t> buffer;
uint32_t samplesReceived;
RingBuffer &buffer;
PerformanceLogger &performanceLogger;
bool initialized;
};

int HackRfCallbackStream(hackrf_transfer *transfer) {
Logger::debug("HackRf", "read bytes: {}", transfer->valid_length);
CallbackData *callbackData = reinterpret_cast<CallbackData *>(transfer->rx_ctx);

std::unique_lock<std::mutex> lock(callbackData->mutex);
memcpy(callbackData->buffer.data() callbackData->samplesReceived, transfer->buffer, transfer->valid_length);
callbackData->samplesReceived = transfer->valid_length;
if (callbackData->buffer.size() <= callbackData->samplesReceived) {
Logger::debug("HackRf", "total read bytes: {}", callbackData->samplesReceived);
callbackData->performanceLogger.newSample();
callbackData->samplesReceived = 0;
callbackData->cv.notify_all();
Logger::trace("HackRf", "read samples: {}", transfer->valid_length);
CallbackData *data = reinterpret_cast<CallbackData *>(transfer->rx_ctx);
if (!data->initialized) {
data->initialized = true;
setThreadParams("hackrf_reader", PRIORITY::MEDIUM);
}
data->buffer.push(transfer->buffer, transfer->valid_length);
data->readSize = transfer->valid_length;
if (data->windowSize <= data->readSize && data->windowSize <= data->buffer.availableDataSize()) {
Logger::trace("HackRf", "read samples: window completed");
data->performanceLogger.newSample();
data->readSize -= data->windowSize;
data->cv.notify_one();
}
return 0;
}
Expand All @@ -57,7 62,7 @@ HackRfInitializer::~HackRfInitializer() {
}
}

HackrfSdrDevice::HackrfSdrDevice(const Config &config, const std::string &serial) : SdrDevice("HackRF"), m_config(config), m_serial(serial) {
HackrfSdrDevice::HackrfSdrDevice(const Config &config, const std::string &serial) : SdrDevice("HackRF"), m_config(config), m_serial(serial), m_buffer(HACKRF_BUFFER_SIZE) {
Logger::info("HackRf", "open device, serial: {}", m_serial);
if (hackrf_open_by_serial(m_serial.c_str(), &m_device) != HACKRF_SUCCESS) {
throw std::runtime_error("can not open hackrf device");
Expand Down Expand Up @@ -104,19 109,19 @@ void HackrfSdrDevice::startStream(const FrequencyRange &frequencyRange, Callback
setup(frequencyRange);
const auto samples = std::max(getSamplesCount(frequencyRange.sampleRate, m_config.frequencyRangeScanningTime()), HACKRF_MIN_SAMPLES_READ_COUNT);
Logger::info("HackRf", "start stream, samples: {}", samples);
CallbackData callbackData(samples, m_performanceLogger);
if (hackrf_start_rx(m_device, HackRfCallbackStream, &callbackData) != HACKRF_SUCCESS) {
CallbackData data(samples, m_buffer, m_performanceLogger);
if (hackrf_start_rx(m_device, HackRfCallbackStream, &data) != HACKRF_SUCCESS) {
throw std::runtime_error("can not start stream");
}

while (true) {
std::unique_lock lock(callbackData.mutex);
callbackData.cv.wait(lock);
if (!callback(std::move(callbackData.buffer))) {
break;
std::unique_lock lock(m_mutex);
data.cv.wait(lock);
if (samples <= data.buffer.availableDataSize()) {
if (!callback(data.buffer.pop(samples))) {
break;
}
}
callbackData.buffer = {};
callbackData.buffer.resize(samples);
}
Logger::info("HackRf", "stop stream");
if (hackrf_stop_rx(m_device) != HACKRF_SUCCESS) {
Expand All @@ -132,18 137,20 @@ std::vector<uint8_t> HackrfSdrDevice::readData(const FrequencyRange &frequencyRa
setup(frequencyRange);
const auto samples = std::max(getSamplesCount(frequencyRange.sampleRate, m_config.frequencyRangeScanningTime()), HACKRF_MIN_SAMPLES_READ_COUNT);
Logger::debug("HackRf", "start read data, samples: {}", samples);
CallbackData callbackData(samples, m_performanceLogger);
if (hackrf_start_rx(m_device, HackRfCallbackStream, &callbackData) != HACKRF_SUCCESS) {
CallbackData data(samples, m_buffer, m_performanceLogger);
if (hackrf_start_rx(m_device, HackRfCallbackStream, &data) != HACKRF_SUCCESS) {
throw std::runtime_error("can not start read data");
}

std::unique_lock lock(callbackData.mutex);
callbackData.cv.wait(lock);
while (data.buffer.availableDataSize() < samples) {
std::unique_lock lock(m_mutex);
data.cv.wait(lock);
}
Logger::debug("HackRf", "stop read data");
if (hackrf_stop_rx(m_device) != HACKRF_SUCCESS) {
throw std::runtime_error("can not stop read data");
}
return callbackData.buffer;
return data.buffer.pop(samples);
}

void HackrfSdrDevice::setup(const FrequencyRange &frequencyRange) {
Expand All @@ -161,4 168,5 @@ void HackrfSdrDevice::setup(const FrequencyRange &frequencyRange) {
}
m_frequency = frequencyRange.center();
}
m_buffer.clear();
}
3 changes: 3 additions & 0 deletions sources/radio/hackrf_sdr_device.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 3,7 @@
#include <config.h>
#include <libhackrf/hackrf.h>
#include <radio/sdr_device.h>
#include <ring_buffer.h>

class HackRfInitializer {
public:
Expand Down Expand Up @@ -30,4 31,6 @@ class HackrfSdrDevice : public SdrDevice {
hackrf_device* m_device;
Frequency m_frequency;
Frequency m_sampleRate;
RingBuffer m_buffer;
std::mutex m_mutex;
};
70 changes: 70 additions & 0 deletions sources/ring_buffer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 1,70 @@
#include "ring_buffer.h"

#include <logger.h>

#include <cstring>

constexpr auto PRINT_DEBUG_INTERVAL = 100;

RingBuffer::RingBuffer(uint32_t bufferSize) : m_bufferSize(bufferSize), m_writePosition(0), m_readPosition(0), m_pushDataSize(0), m_popDataSize(0), m_buffer(bufferSize) {
Logger::info("RingBuffer", "init, buffer size: {}", m_bufferSize);
}

void RingBuffer::clear() {
m_writePosition = 0;
m_readPosition = 0;
}

uint32_t RingBuffer::availableDataSize() const {
if (m_readPosition <= m_writePosition) {
return m_writePosition - m_readPosition;
} else {
return m_bufferSize - (m_readPosition - m_writePosition);
}
}

uint32_t RingBuffer::availableSpaceSize() const { return m_bufferSize - availableDataSize(); }

void RingBuffer::push(uint8_t *data, uint32_t size) {
m_pushDataSize = size;

bool overflow = false;
if (availableSpaceSize() < size) {
Logger::warn("RingBuffer", "overflow");
overflow = true;
}
if (m_writePosition size < m_bufferSize) {
memcpy(m_buffer.data() m_writePosition, data, size);
m_writePosition = size;
} else {
const auto endSize = m_bufferSize - m_writePosition;
memcpy(m_buffer.data() m_writePosition, data, endSize);
memcpy(m_buffer.data(), data endSize, size - endSize);
m_writePosition = (m_writePosition size) % m_bufferSize;
}
if (overflow) {
m_readPosition = m_writePosition.load();
}
}

std::vector<uint8_t> RingBuffer::pop(uint32_t size) {
m_popDataSize = size;
m_popCount ;

if (m_popCount % PRINT_DEBUG_INTERVAL == 0) {
const auto ratio = static_cast<float>(m_popDataSize) / static_cast<float>(m_pushDataSize);
Logger::info("RingBuffer", "pop/push: {}/{} ({:.2f})", m_popDataSize, m_pushDataSize, ratio);
}

std::vector<uint8_t> data(size);
if (m_readPosition size < m_bufferSize) {
memcpy(data.data(), m_buffer.data() m_readPosition, size);
m_readPosition = size;
} else {
const auto endSize = m_bufferSize - m_readPosition;
memcpy(data.data(), m_buffer.data() m_readPosition, endSize);
memcpy(data.data() endSize, m_buffer.data(), size - endSize);
m_readPosition = (m_readPosition size) % m_bufferSize;
}
return data;
}
25 changes: 25 additions & 0 deletions sources/ring_buffer.h
Original file line number Diff line number Diff line change
@@ -0,0 1,25 @@
#pragma once

#include <atomic>
#include <cstdint>
#include <vector>

class RingBuffer {
public:
RingBuffer(uint32_t bufferSize);

void clear();
uint32_t availableDataSize() const;
uint32_t availableSpaceSize() const;
void push(uint8_t* data, uint32_t size);
std::vector<uint8_t> pop(uint32_t size);

private:
const uint32_t m_bufferSize;
std::atomic_uint32_t m_writePosition;
std::atomic_uint32_t m_readPosition;
uint64_t m_pushDataSize;
uint64_t m_popDataSize;
std::vector<uint8_t> m_buffer;
uint32_t m_popCount;
};
65 changes: 65 additions & 0 deletions tests/test_ring_buffer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 1,65 @@
#include <gtest/gtest.h>
#include <ring_buffer.h>

TEST(RingBufferTest, Empty) {
RingBuffer buffer(100);
std::vector<uint8_t> tmp(20);

EXPECT_EQ(buffer.availableDataSize(), 0);
EXPECT_EQ(buffer.availableSpaceSize(), 100);

buffer.push(tmp.data(), tmp.size());
EXPECT_EQ(buffer.availableDataSize(), 20);
EXPECT_EQ(buffer.availableSpaceSize(), 80);

buffer.clear();
EXPECT_EQ(buffer.availableDataSize(), 0);
EXPECT_EQ(buffer.availableSpaceSize(), 100);
}

TEST(RingBufferTest, Overflow) {
constexpr auto SIZE = 31;
constexpr auto ITERATION = 83;

RingBuffer buffer(ITERATION * SIZE - 1);
std::vector<uint8_t> tmp(SIZE);
for (int i = 1; i < ITERATION; i) {
buffer.push(tmp.data(), tmp.size());
EXPECT_EQ(buffer.availableDataSize(), i * SIZE);
}
for (int i = 0; i < ITERATION / 2; i) {
buffer.pop(SIZE);
buffer.push(tmp.data(), tmp.size());
}
EXPECT_EQ(buffer.availableSpaceSize(), SIZE - 1);
buffer.push(tmp.data(), tmp.size());
EXPECT_EQ(buffer.availableDataSize(), 0);
}

TEST(RingBufferTest, Round) {
std::vector<uint8_t> push;
std::vector<uint8_t> poped;

constexpr auto PUSH_SIZE = 1229;
constexpr auto POP_SIZE = 1231;
constexpr auto BUFFER_SIZE = 1277 * 10;

for (int i = 0; i < 100 * PUSH_SIZE * POP_SIZE; i) {
push.push_back(rand());
}

RingBuffer buffer(BUFFER_SIZE);
uint32_t pushed = 0;
while (poped.size() < push.size()) {
while (buffer.availableDataSize() < POP_SIZE && pushed < push.size()) {
buffer.push(push.data() pushed, PUSH_SIZE);
pushed = PUSH_SIZE;
}
while (POP_SIZE <= buffer.availableDataSize()) {
const auto tmp = buffer.pop(POP_SIZE);
std::copy(tmp.begin(), tmp.end(), std::back_inserter(poped));
}
}

EXPECT_EQ(push, poped);
}

0 comments on commit d1e4da8

Please sign in to comment.