Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmake/Dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ FetchContent_MakeAvailable(googletest)
FetchContent_Declare(
liblsl
GIT_REPOSITORY https://github.com/sccn/liblsl.git
GIT_TAG v1.16.2
GIT_TAG v1.17.7
)
set(LSL_BUILD_EXAMPLES OFF CACHE BOOL "" FORCE)
FetchContent_MakeAvailable(liblsl)
Expand Down
2 changes: 1 addition & 1 deletion include/data_structures/Context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include <vector>

struct Context {
double timestamp;
double timestamp = 0.0;
std::vector<std::string>& markers;
};

Expand Down
4 changes: 2 additions & 2 deletions include/data_structures/EEGData.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
#include <vector>

struct EEGData {
double timestamp;
std::vector<double> channelValues;
double timestamp = 0.0;
std::vector<double> channels;
};

#endif // EEGDATA_HPP
4 changes: 2 additions & 2 deletions include/data_structures/Marker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
#include <string>

struct Marker {
std::string name;
double timestamp;
std::string eventName;
double timestamp = 0.0;
};

#endif // MARKER_HPP
24 changes: 24 additions & 0 deletions include/datawriter/CSVFormatStrategy.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#ifndef CSVFORMATSTRATEGY_HPP
#define CSVFORMATSTRATEGY_HPP

#include <datawriter/IDataFormatStrategy.hpp>
#include <fstream>
#include <string>

class CSVFormatStrategy : public IDataFormatStrategy {
public:
CSVFormatStrategy() = default;
~CSVFormatStrategy() override;

void open(const std::string& filepath) override;
void close() override;

void writeHeader() override;
void writeEEGData(const EEGData& data) override;
void writeMarker(const Marker& marker) override;

private:
std::ofstream outputFile;
};

#endif // CSVFORMATSTRATEGY_HPP
39 changes: 39 additions & 0 deletions include/datawriter/DataWriter.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#ifndef DATAWRITER_HPP
#define DATAWRITER_HPP

#include <concurrentqueue.h>

#include <datawriter/IDataFormatStrategy.hpp>

class EEGData;
class Marker;

#include <memory>
#include <string>
#include <thread>

class DataWriter {
public:
explicit DataWriter(std::unique_ptr<IDataFormatStrategy> strategy);
~DataWriter();

DataWriter(const DataWriter&) = delete;
DataWriter& operator=(const DataWriter&) = delete;
DataWriter(DataWriter&&) = delete;
DataWriter& operator=(DataWriter&&) = delete;

void start(const std::string& filePath,
std::shared_ptr<moodycamel::ConcurrentQueue<EEGData>> eegQueue,
std::shared_ptr<moodycamel::ConcurrentQueue<Marker>> markerQueue);
void stop();

private:
void writeLoop(const std::stop_token& stopToken);

std::unique_ptr<IDataFormatStrategy> formatStrategy;
std::shared_ptr<moodycamel::ConcurrentQueue<EEGData>> eegQueue;
std::shared_ptr<moodycamel::ConcurrentQueue<Marker>> markerQueue;
std::jthread writerThread;
};

#endif // DATAWRITER_HPP
27 changes: 27 additions & 0 deletions include/datawriter/IDataFormatStrategy.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#ifndef IDATAFORMATSTRATEGY_HPP
#define IDATAFORMATSTRATEGY_HPP

#include <string>

class EEGData;
struct Marker;

class IDataFormatStrategy {
public:
IDataFormatStrategy() = default;
virtual ~IDataFormatStrategy() = default;

IDataFormatStrategy(const IDataFormatStrategy&) = delete;
IDataFormatStrategy& operator=(const IDataFormatStrategy&) = delete;
IDataFormatStrategy(IDataFormatStrategy&&) = delete;
IDataFormatStrategy& operator=(IDataFormatStrategy&&) = delete;

virtual void open(const std::string& filepath) = 0;
virtual void close() = 0;

virtual void writeHeader() = 0;
virtual void writeEEGData(const EEGData& data) = 0;
virtual void writeMarker(const Marker& marker) = 0;
};

#endif // IDATAFORMATSTRATEGY_HPP
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS ${CMAKE_CURRENT_SOURCE_DIR}/../proto
add_library(runtime_core OBJECT
Runtime.cpp
parser/Parser.cpp
datawriter/CSVFormatStrategy.cpp
datawriter/DataWriter.cpp
scene/components/ComponentRegistry.cpp
scene/components/BlinkComponent.cpp
scene/SceneObject.cpp
Expand Down
37 changes: 37 additions & 0 deletions src/datawriter/CSVFormatStrategy.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#include <data_structures/EEGData.hpp>
#include <data_structures/Marker.hpp>
#include <datawriter/CSVFormatStrategy.hpp>
#include <stdexcept>

CSVFormatStrategy::~CSVFormatStrategy() { close(); }

void CSVFormatStrategy::open(const std::string& filepath) {
outputFile.open(filepath, std::ios::out | std::ios::trunc);
if (!outputFile.is_open()) {
throw std::runtime_error("Failed to open CSV output file: " + filepath);
}
}

void CSVFormatStrategy::close() {
if (outputFile.is_open()) {
outputFile.flush();
outputFile.close();
}
}

void CSVFormatStrategy::writeHeader() { outputFile << "type,timestamp,payload\n"; }

void CSVFormatStrategy::writeEEGData(const EEGData& data) {
outputFile << "eeg," << data.timestamp << ",\"";
for (std::size_t index = 0; index < data.channels.size(); ++index) {
if (index > 0) {
outputFile << ',';
}
outputFile << data.channels[index];
}
outputFile << '"' << '\n';
}

void CSVFormatStrategy::writeMarker(const Marker& marker) {
outputFile << "marker," << marker.timestamp << ",\"" << marker.eventName << '"' << '\n';
}
79 changes: 79 additions & 0 deletions src/datawriter/DataWriter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#include <chrono>
#include <data_structures/EEGData.hpp>
#include <data_structures/Marker.hpp>
#include <datawriter/DataWriter.hpp>
#include <stdexcept>
#include <thread>
#include <utility>

constexpr auto kWriteLoopSleep = std::chrono::milliseconds(10);

template <typename QueueT, typename ItemT, typename WriteFn>
bool drainQueue(const std::shared_ptr<QueueT>& queue, WriteFn&& writeFn) {
bool wroteData = false;

if (queue) {
ItemT item;
while (queue->try_dequeue(item)) {
writeFn(item);
wroteData = true;
}
}

return wroteData;
}

DataWriter::DataWriter(std::unique_ptr<IDataFormatStrategy> strategy)
: formatStrategy(std::move(strategy)) {}

DataWriter::~DataWriter() { stop(); }

void DataWriter::start(const std::string& filePath,
std::shared_ptr<moodycamel::ConcurrentQueue<EEGData>> eegQueue,
std::shared_ptr<moodycamel::ConcurrentQueue<Marker>> markerQueue) {
stop();

if (!formatStrategy) {
throw std::runtime_error("No data format strategy set for DataWriter.");
}

this->eegQueue = std::move(eegQueue);
this->markerQueue = std::move(markerQueue);

formatStrategy->open(filePath);
formatStrategy->writeHeader();
writerThread = std::jthread([this](const std::stop_token& stopToken) { writeLoop(stopToken); });
}

void DataWriter::stop() {
if (writerThread.joinable()) {
writerThread.request_stop();
}

if (writerThread.joinable()) {
writerThread.join();
}

if (formatStrategy) {
formatStrategy->close();
}
}

void DataWriter::writeLoop(const std::stop_token& stopToken) {
while (!stopToken.stop_requested()) {
bool wroteData = drainQueue<moodycamel::ConcurrentQueue<EEGData>, EEGData>(
eegQueue, [this](const EEGData& eegData) { formatStrategy->writeEEGData(eegData); });

wroteData |= drainQueue<moodycamel::ConcurrentQueue<Marker>, Marker>(
markerQueue, [this](const Marker& marker) { formatStrategy->writeMarker(marker); });

if (!wroteData) {
std::this_thread::sleep_for(kWriteLoopSleep);
}
}

drainQueue<moodycamel::ConcurrentQueue<EEGData>, EEGData>(
eegQueue, [this](const EEGData& eegData) { formatStrategy->writeEEGData(eegData); });
drainQueue<moodycamel::ConcurrentQueue<Marker>, Marker>(
markerQueue, [this](const Marker& marker) { formatStrategy->writeMarker(marker); });
}
2 changes: 1 addition & 1 deletion tests/unit_tests/RendererTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ TEST(RendererTest, RenderLoop_QueuesMarkersFromComponents) {
bool dequeued = markerQueue->try_dequeue(m);
EXPECT_TRUE(dequeued);
if (dequeued) {
EXPECT_EQ(m.name, "test_marker");
EXPECT_EQ(m.eventName, "test_marker");
EXPECT_FALSE(markerQueue->try_dequeue(m));
}

Expand Down
86 changes: 86 additions & 0 deletions tests/unit_tests/data_writer_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#include <gtest/gtest.h>

#include <chrono>
#include <data_structures/EEGData.hpp>
#include <data_structures/Marker.hpp>
#include <datawriter/CSVFormatStrategy.hpp>
#include <datawriter/DataWriter.hpp>
#include <filesystem>
#include <fstream>
#include <memory>
#include <string>
#include <thread>
#include <vector>

namespace {
namespace fs = std::filesystem;

constexpr float kChannelOne = 1.25F;
constexpr float kChannelTwo = 2.5F;
constexpr float kChannelThree = 3.75F;
constexpr double kEegTimestamp = 12.5;
constexpr double kMarkerTimestamp = 13.25;
constexpr auto kWriteWait = std::chrono::milliseconds(50);

fs::path makeTempFilePath(const std::string& nameStem) {
const auto uniqueSuffix =
std::to_string(std::chrono::steady_clock::now().time_since_epoch().count());
return fs::temp_directory_path() / (nameStem + "_" + uniqueSuffix + ".csv");
}

std::vector<std::string> readAllLines(const fs::path& filePath) {
std::ifstream input(filePath);
std::vector<std::string> lines;
std::string line;

while (std::getline(input, line)) {
lines.push_back(line);
}

return lines;
}
} // namespace

TEST(DataWriterTest, WritesCsvHeaderOnStart) {
const auto filePath = makeTempFilePath("datawriter_header");

auto eegQueue = std::make_shared<moodycamel::ConcurrentQueue<EEGData>>();
auto markerQueue = std::make_shared<moodycamel::ConcurrentQueue<Marker>>();

{
DataWriter writer(std::make_unique<CSVFormatStrategy>());
writer.start(filePath.string(), eegQueue, markerQueue);
writer.stop();
}

const auto lines = readAllLines(filePath);
ASSERT_FALSE(lines.empty());
EXPECT_EQ(lines.front(), "type,timestamp,payload");

fs::remove(filePath);
}

TEST(DataWriterTest, FlushesEegAndMarkerRecords) {
const auto filePath = makeTempFilePath("datawriter_records");

auto eegQueue = std::make_shared<moodycamel::ConcurrentQueue<EEGData>>();
auto markerQueue = std::make_shared<moodycamel::ConcurrentQueue<Marker>>();

eegQueue->enqueue(EEGData{kEegTimestamp, {kChannelOne, kChannelTwo, kChannelThree}});
markerQueue->enqueue(Marker{"stimulus_on", kMarkerTimestamp});

{
DataWriter writer(std::make_unique<CSVFormatStrategy>());
writer.start(filePath.string(), eegQueue, markerQueue);
std::this_thread::sleep_for(kWriteWait);
writer.stop();
}

const auto lines = readAllLines(filePath);
ASSERT_GE(lines.size(), 3U);
EXPECT_EQ(lines[0], "type,timestamp,payload");
EXPECT_EQ(lines[1], "eeg,12.5,\"1.25,2.5,3.75\"");
EXPECT_EQ(lines[2], "marker,13.25,\"stimulus_on\"");

fs::remove(filePath);
}
Loading