diff --git a/CMakeLists.txt b/CMakeLists.txt index 3843439b..c0644300 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -38,6 +38,7 @@ endif() option(USE_IMGUI_GL3 "Use OpenGL3+ imgui implementation" ON) option(WITH_UART "Compile with libserialport" ON) +option(WITH_ZMQ "Compile with cppzmq" ON) option(USE_VSDEV_CAMERA "Use VS_Dev camera preview implementation" OFF) set(SOURCES @@ -189,6 +190,11 @@ if (WITH_UART) set(SOURCES ${SOURCES} "uart.cpp") endif() +if (WITH_ZMQ) + add_definitions(-DWITH_ZMQ) + set(SOURCES ${SOURCES} "zmq_input.cpp") +endif() + if (USE_VSDEV_CAMERA) add_definitions(-DUSE_VSDEV_CAMERA) set(SOURCES ${SOURCES} "extras/VS_Dev.cpp" "widgets/cameraview_vsdev.cpp") @@ -305,6 +311,10 @@ if (WIN32) target_link_libraries(${PROJECT_NAME} ws2_32) endif() +if (WITH_ZMQ) + target_link_libraries(${PROJECT_NAME} zmq) +endif() + #target_compile_options(${PROJECT_NAME} PRIVATE -flto) #target_link_libraries(${PROJECT_NAME} -flto) diff --git a/Classes.h b/Classes.h index 8c6a684a..184ce6e2 100644 --- a/Classes.h +++ b/Classes.h @@ -44,7 +44,6 @@ class particle_manager; struct dictionary_source; class trainset_desc; class scenery_desc; - namespace scene { struct node_data; class basic_node; diff --git a/Globals.cpp b/Globals.cpp index f14cfcf9..d2fccccc 100644 --- a/Globals.cpp +++ b/Globals.cpp @@ -712,6 +712,12 @@ global_settings::ConfigParse(cParser &Parser) { Parser >> uart_conf.debug; } #endif +#ifdef WITH_ZMQ + else if( token == "zmq.address" ) { + Parser.getTokens( 1 ); + Parser >> zmq_address; + } +#endif #ifdef USE_EXTCAM_CAMERA else if( token == "extcam.cmd" ) { Parser.getTokens( 1 ); diff --git a/Globals.h b/Globals.h index f04e4f0a..cc5500a2 100644 --- a/Globals.h +++ b/Globals.h @@ -20,6 +20,9 @@ http://mozilla.org/MPL/2.0/. #ifdef WITH_UART #include "uart.h" #endif +#ifdef WITH_ZMQ +#include "zmq_input.h" +#endif struct global_settings { // members @@ -176,6 +179,9 @@ struct global_settings { int iPoKeysPWM[ 7 ] = { 0, 1, 2, 3, 4, 5, 6 }; // numery wejść dla PWM #ifdef WITH_UART uart_input::conf_t uart_conf; +#endif +#ifdef WITH_ZMQ + std::string zmq_address; #endif // multiplayer int iMultiplayer{ 0 }; // blokada działania niektórych eventów na rzecz kominikacji diff --git a/drivermode.cpp b/drivermode.cpp index 7bb299c2..90d4bdaf 100644 --- a/drivermode.cpp +++ b/drivermode.cpp @@ -51,6 +51,11 @@ driver_mode::drivermode_input::poll() { uart->poll(); } #endif +#ifdef WITH_ZMQ + if( zmq != nullptr ) { + zmq->poll(); + } +#endif /* // TBD, TODO: wrap current command in object, include other input sources? input::command = ( @@ -75,6 +80,11 @@ driver_mode::drivermode_input::init() { uart = std::make_unique(); uart->init(); } +#endif +#ifdef WITH_ZMQ + if (!Global.zmq_address.empty()) { + zmq = std::make_unique(); + } #endif if (Global.motiontelemetry_conf.enable) telemetry = std::make_unique(); diff --git a/drivermode.h b/drivermode.h index 86044961..b596bfe9 100644 --- a/drivermode.h +++ b/drivermode.h @@ -20,6 +20,9 @@ http://mozilla.org/MPL/2.0/. #ifdef WITH_UART #include "uart.h" #endif +#ifdef WITH_ZMQ +#include "zmq_input.h" +#endif class driver_mode : public application_mode { @@ -81,6 +84,9 @@ private: #endif #ifdef WITH_UART std::unique_ptr uart; +#endif +#ifdef WITH_ZMQ + std::unique_ptr zmq; #endif std::unique_ptr telemetry; diff --git a/zmq_input.cpp b/zmq_input.cpp new file mode 100644 index 00000000..505c1587 --- /dev/null +++ b/zmq_input.cpp @@ -0,0 +1,316 @@ +#include "stdafx.h" +#include "zmq_input.h" +#include "Globals.h" +#include "Logs.h" +#include "simulation.h" +#include "simulationtime.h" +#include "Train.h" + +zmq_input::zmq_input() +{ + sock.emplace(ctx, zmq::socket_type::router); + sock->bind(Global.zmq_address); + + // build helper translation tables + std::size_t commandid = 0; + for( auto const &description : simulation::Commands_descriptions ) { + nametocommandmap.emplace( + description.name, + static_cast( commandid ) ); + ++commandid; + } +} + +float zmq_input::unpack_float(const zmq::message_t &msg) { + if (msg.size() < 4) + return 0.0f; + + uint8_t *buf = (uint8_t*)msg.data(); + uint32_t v = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3]; + return reinterpret_cast(v); +} + +zmq::message_t zmq_input::pack_float(float f) { + uint32_t v = reinterpret_cast(f); + uint8_t buf[4]; + buf[3] = v; + buf[2] = v >> 8; + buf[1] = v >> 16; + buf[0] = v >> 24; + return zmq::message_t(buf, 4); +} + +void zmq_input::poll() +{ + zmq::multipart_t multipart; + bool ok; + while ((ok = multipart.recv(*sock, ZMQ_DONTWAIT))) { + if (multipart.size() < 3) + continue; + + if (multipart[0].size() != 5) + continue; + + uint8_t* buf = (uint8_t*)multipart[0].data(); + uint32_t peer_id = (buf[1] << 24) | (buf[2] << 16) | (buf[3] << 8) | buf[4]; + + auto peer_it = peers.find(peer_id); + if (peer_it == peers.end()) { + peer_it = peers.emplace(peer_id, peer_state()).first; + peer_it->second.update_interval = 60.0f; + peer_it->second.last_update = std::chrono::high_resolution_clock::now(); + } + + if (multipart[1] == zmq::message_t("REG_SOPI", 8)) { + if (multipart.size() < 4) + continue; + + peer_it->second.update_interval = unpack_float(multipart[2]); + peer_it->second.sopi_list.clear(); + + for (size_t i = 3; i < multipart.size(); i++) { + std::string chan_name((char*)multipart[i].data(), multipart[i].size()); + + auto chan_it = output_fields_map.find(chan_name); + if (chan_it != output_fields_map.end()) + peer_it->second.sopi_list.push_back(chan_it->second); + } + } + if (multipart[1] == zmq::message_t("REG_SIPO", 8)) { + peer_it->second.sipo_list.clear(); + + for (size_t i = 2; i < multipart.size(); i++) { + std::string chan_name((char*)multipart[i].data(), multipart[i].size()); + + std::istringstream stream(chan_name); + std::string id1, id2; + std::getline(stream, id1, '/'); + std::getline(stream, id2, '/'); + + input_type type = input_type::none; + user_command cmd1 = user_command::none, cmd2 = user_command::none; + + auto cmd1_it = nametocommandmap.find(id1); + if (cmd1_it != nametocommandmap.end()) + cmd1 = cmd1_it->second; + + if (id2 == "" || id2 == "v") + type = input_type::value; + else if (id2 == "i") + type = input_type::impulse; + else { + type = input_type::toggle; + auto cmd2_it = nametocommandmap.find(id2); + if (cmd2_it != nametocommandmap.end()) + cmd2 = cmd2_it->second; + } + + if (cmd1 == user_command::none) + type = input_type::none; + + peer_it->second.sipo_list.push_back(std::make_tuple(type, cmd1, cmd2, false)); + } + } + if (multipart[1] == zmq::message_t("SIPO_DATA", 9)) { + if (peer_it->second.sipo_list.size() != multipart.size() - 2) { + zmq::multipart_t msg; + + zmq::message_t addr; + addr.copy(multipart[0]); + + msg.add(std::move(addr)); + msg.addstr("UNCF_SIPO"); + msg.send(*sock, ZMQ_DONTWAIT); + + continue; + } + + size_t i = 1; + for (auto &entry : peer_it->second.sipo_list) { + i++; + + float value = unpack_float(multipart[i]); + input_type type = std::get<0>(entry); + + if (type == input_type::value) { + relay.post(std::get<1>(entry), value, 0, GLFW_PRESS, 0); + continue; + } + else if (type == input_type::none) + continue; + + bool state = (value > 0.5f); + bool changed = (state != std::get<3>(entry)); + + if (!changed) + continue; + + auto const action { ( + type != input_type::impulse ? + GLFW_PRESS : + ( state ? + GLFW_PRESS : + GLFW_RELEASE ) ) }; + + auto const command { ( + type != input_type::toggle ? + std::get<1>( entry ) : + ( state ? + std::get<1>( entry ) : + std::get<2>( entry ) ) ) }; + + std::get<3>(entry) = state; + + relay.post(command, 0, 0, action, 0); + } + } + } + + auto now = std::chrono::high_resolution_clock::now(); + + for (auto peer = peers.begin(); peer != peers.end();) { + if (std::chrono::duration(now - peer->second.last_update).count() < peer->second.update_interval) { + ++peer; + continue; + } + + peer->second.last_update = now; + + if (peer->second.sopi_list.empty()) { + ++peer; + continue; + } + + uint8_t peerbuf[5] = { 0, (uint8_t)(peer->first >> 24), (uint8_t)(peer->first >> 16), (uint8_t)(peer->first >> 8), (uint8_t)(peer->first) }; + zmq::multipart_t msg; + + msg.addmem(peerbuf, sizeof(peerbuf)); + msg.addstr("SOPI_DATA"); + + for (output_fields field : peer->second.sopi_list) + msg.add(pack_field(field)); + + if (!msg.send(*sock, ZMQ_DONTWAIT)) + peer = peers.erase(peer); + else + ++peer; + } +} + +std::unordered_map zmq_input::output_fields_map = { + { "shp", output_fields::shp }, + { "alerter", output_fields::alerter }, + { "radio_stop", output_fields::radio_stop }, + { "motor_resistors", output_fields::motor_resistors }, + { "line_breaker", output_fields::line_breaker }, + { "motor_overload", output_fields::motor_overload }, + { "motor_connectors", output_fields::motor_connectors }, + { "wheelslip", output_fields::wheelslip }, + { "converter_overload", output_fields::converter_overload }, + { "converter_off", output_fields::converter_off }, + { "compressor_overload", output_fields::compressor_overload }, + { "ventilator_overload", output_fields::ventilator_overload }, + { "motor_overload_threshold", output_fields::motor_overload_threshold }, + { "train_heating", output_fields::train_heating }, + { "cab", output_fields::cab }, + { "recorder_braking", output_fields::recorder_braking }, + { "recorder_power", output_fields::recorder_power }, + { "alerter_sound",output_fields:: alerter_sound }, + { "coupled_hv_voltage_relays", output_fields::coupled_hv_voltage_relays }, + { "velocity", output_fields::velocity }, + { "reservoir_pressure", output_fields::reservoir_pressure }, + { "pipe_pressure", output_fields::pipe_pressure }, + { "brake_pressure", output_fields::brake_pressure }, + { "hv_voltage", output_fields::hv_voltage }, + { "hv_current_1", output_fields::hv_current_1 }, + { "hv_current_2", output_fields::hv_current_2 }, + { "hv_current_3", output_fields::hv_current_3 }, + { "lv_voltage", output_fields::lv_voltage }, + { "distance", output_fields::distance }, + { "radio_channel", output_fields::radio_channel }, + { "springbrake_active", output_fields::springbrake_active }, + { "time_month_of_era", output_fields::time_month_of_era }, + { "time_minute_of_month", output_fields::time_minute_of_month }, + { "time_millisecond_of_day", output_fields::time_millisecond_of_day } +}; + +zmq::message_t zmq_input::pack_field(zmq_input::output_fields f) { + const SYSTEMTIME time = simulation::Time.data(); + + if (f == output_fields::time_month_of_era) + return pack_float((time.wYear - 1) * 12 + time.wMonth - 1); + if (f == output_fields::time_minute_of_month) + return pack_float((time.wDay - 1) * 1440 + time.wHour * 60 + time.wMinute); + if (f == output_fields::time_millisecond_of_day) + return pack_float(time.wSecond * 1000 + time.wMilliseconds); + + TTrain *train = simulation::Train; + if (!train) + return pack_float(0.0f); + const TTrain::state_t state = train->get_state(); + + if (f == output_fields::shp) + return pack_float(state.shp); + if (f == output_fields::alerter) + return pack_float(state.alerter); + if (f == output_fields::radio_stop) + return pack_float(state.radio_stop); + if (f == output_fields::motor_resistors) + return pack_float(state.motor_resistors); + if (f == output_fields::line_breaker) + return pack_float(state.line_breaker); + if (f == output_fields::motor_overload) + return pack_float(state.motor_overload); + if (f == output_fields::motor_connectors) + return pack_float(state.motor_connectors); + if (f == output_fields::wheelslip) + return pack_float(state.wheelslip); + if (f == output_fields::converter_overload) + return pack_float(state.converter_overload); + if (f == output_fields::converter_off) + return pack_float(state.converter_off); + if (f == output_fields::compressor_overload) + return pack_float(state.compressor_overload); + if (f == output_fields::ventilator_overload) + return pack_float(state.ventilator_overload); + if (f == output_fields::motor_overload_threshold) + return pack_float(state.motor_overload_threshold); + if (f == output_fields::train_heating) + return pack_float(state.train_heating); + if (f == output_fields::cab) + return pack_float(state.cab); + if (f == output_fields::recorder_braking) + return pack_float(state.recorder_braking); + if (f == output_fields::recorder_power) + return pack_float(state.recorder_power); + if (f == output_fields::alerter_sound) + return pack_float(state.alerter_sound); + if (f == output_fields::coupled_hv_voltage_relays) + return pack_float(state.coupled_hv_voltage_relays); + if (f == output_fields::velocity) + return pack_float(state.velocity); + if (f == output_fields::reservoir_pressure) + return pack_float(state.reservoir_pressure); + if (f == output_fields::pipe_pressure) + return pack_float(state.pipe_pressure); + if (f == output_fields::brake_pressure) + return pack_float(state.brake_pressure); + if (f == output_fields::hv_voltage) + return pack_float(state.hv_voltage); + if (f == output_fields::hv_current_1) + return pack_float(state.hv_current[0]); + if (f == output_fields::hv_current_2) + return pack_float(state.hv_current[1]); + if (f == output_fields::hv_current_3) + return pack_float(state.hv_current[2]); + if (f == output_fields::lv_voltage) + return pack_float(state.lv_voltage); + if (f == output_fields::distance) + return pack_float(state.distance); + if (f == output_fields::radio_channel) + return pack_float(state.radio_channel); + if (f == output_fields::springbrake_active) + return pack_float(state.springbrake_active); + + return pack_float(0.0f); +} diff --git a/zmq_input.h b/zmq_input.h new file mode 100644 index 00000000..d96df3b5 --- /dev/null +++ b/zmq_input.h @@ -0,0 +1,77 @@ +#pragma once + +#include +#include "command.h" + +class zmq_input +{ + enum class output_fields { + shp, + alerter, + radio_stop, + motor_resistors, + line_breaker, + motor_overload, + motor_connectors, + wheelslip, + converter_overload, + converter_off, + compressor_overload, + ventilator_overload, + motor_overload_threshold, + train_heating, + cab, + recorder_braking, + recorder_power, + alerter_sound, + coupled_hv_voltage_relays, + velocity, + reservoir_pressure, + pipe_pressure, + brake_pressure, + hv_voltage, + hv_current_1, + hv_current_2, + hv_current_3, + lv_voltage, + distance, + radio_channel, + springbrake_active, + time_month_of_era, + time_minute_of_month, + time_millisecond_of_day + }; + + zmq::context_t ctx; + std::optional sock; + + enum class input_type + { + none, + toggle, // two commands, each mapped to one state; press event on state change + impulse, // one command; press event when set, release when cleared + value // one command; press event, value of specified byte passed as param1 + }; + + struct peer_state { + float update_interval; + std::vector sopi_list; + std::vector> sipo_list;; + std::chrono::time_point last_update; + }; + + std::map peers; + + float unpack_float(const zmq::message_t &); + zmq::message_t pack_float(float f); + zmq::message_t pack_field(output_fields f); + + std::unordered_map nametocommandmap; + + static std::unordered_map output_fields_map; + command_relay relay; + +public: + zmq_input(); + void poll(); +};