RocketLogger 2.1.0
rl_socket.c
Go to the documentation of this file.
1
32#include <errno.h>
33#include <stdint.h>
34#include <stdlib.h>
35#include <string.h>
36
37#include <zmq.h>
38
39#include "log.h"
40#include "rl.h"
41#include "rl_file.h"
42#include "sensor/sensor.h"
43#include "util.h"
44
45#include "rl_socket.h"
46
// size of the metadata_json character buffer holding the data socket metadata
47#define RL_SOCKET_METADATA_SIZE 1000
48
// the ZeroMQ context for data publishing
52void *zmq_data_context = NULL;
// the ZeroMQ data socket
54void *zmq_data_socket = NULL;
55
56int rl_socket_init(void) {
57 // open and bind zmq data socket
58 zmq_data_context = zmq_ctx_new();
59 zmq_data_socket = zmq_socket(zmq_data_context, ZMQ_PUB);
60 int zmq_res = zmq_bind(zmq_data_socket, RL_ZMQ_DATA_SOCKET);
61 if (zmq_res < 0) {
63 "failed binding zeromq data socket; %d message: %s", errno,
64 strerror(errno));
65 return ERROR;
66 }
67
68 return SUCCESS;
69}
70
/**
 * Close the ZeroMQ data socket, destroy its context and reset both module
 * handles (zmq_data_socket, zmq_data_context) to NULL.
 *
 * NOTE(review): the function header line is absent from this listing; the
 * symbol index gives the signature as `int rl_socket_deinit(void)`.
 *
 * @return SUCCESS in all cases; the results of zmq_close() and
 * zmq_ctx_destroy() are not checked
 */
72 // close and destroy zmq data socket
73 zmq_close(zmq_data_socket);
74 zmq_ctx_destroy(zmq_data_context);
75
76 zmq_data_socket = NULL;
77 zmq_data_context = NULL;
78
79 return SUCCESS;
80}
81
/**
 * Build the JSON metadata describing the published data channels.
 *
 * The format strings visible here emit the data rate (config->sample_rate)
 * followed by a "channels" array with one entry per enabled analog channel,
 * per available ambient sensor (if config->ambient_enable), per digital
 * input (if config->digital_enable) and for the hidden I1L_valid/I2L_valid
 * bits. A trailing comma left by the last array entry is trimmed from
 * metadata_json before the JSON is closed.
 *
 * NOTE(review): several statement lines are absent from this listing (the
 * calls consuming the format strings, the conditions guarding the
 * I1L_valid/I2L_valid entries and the statement closing the JSON);
 * presumably these are snprintf/snprintfcat calls writing to metadata_json
 * -- confirm against the original file.
 *
 * @param config Measurement configuration the metadata is generated for
 * @return SUCCESS (the only return visible in this listing)
 */
82int rl_socket_metadata(rl_config_t const *const config) {
83 // data rate and channel info init
// NOTE(review): sample_rate is listed as uint32_t while the format uses %d
// -- confirm the conversion specifier
85 "{\"data_rate\":%d,\"channels\":[", config->sample_rate);
86
87 // analog channel metadata
88 for (int ch = 0; ch < RL_CHANNEL_COUNT; ch++) {
89 if (!config->channel_enable[ch]) {
90 continue;
91 }
92
// unit and scale depend on the channel kind: low current 1e-11 A,
// other current 1e-9 A, voltage 1e-8 V, anything else has a null unit
95 if (is_current(ch)) {
96 if (is_low_current(ch)) {
98 "\"unit\":\"A\",\"scale\":1e-11},");
99 } else {
101 "\"unit\":\"A\",\"scale\":1e-9},");
102 }
103 } else if (is_voltage(ch)) {
105 "\"unit\":\"V\",\"scale\":1e-8},");
106 } else {
108 "\"unit\":null},");
109 }
110 }
111
112 // ambient channel metadata
113 if (config->ambient_enable) {
114 for (int ch = 0; ch < SENSOR_REGISTRY_SIZE; ch++) {
115 if (rl_status.sensor_available[ch]) {
// scale is emitted as a power-of-ten exponent ("1e%d")
117 "{\"name\":\"%s\",\"unit\":\"%s\",\"scale\":1e%d},",
118 SENSOR_REGISTRY[ch].name,
120 SENSOR_REGISTRY[ch].scale);
121 }
122 }
123 }
124
125 // digital channel metadata
126 if (config->digital_enable) {
127 for (int i = 0; i < RL_CHANNEL_DIGITAL_COUNT; i++) {
// inputs are named DI1, DI2, ... and mapped to bit 0, 1, ...
129 "{\"name\":\"DI%d\",\"unit\":\"binary\",\"bit\":%d},",
130 i + 1, i);
131 }
132 }
133
134 // current range valid metadata and JSON end
137 "{\"name\":\"I1L_valid\",\"unit\":\"binary\",\"bit\":6,"
138 "\"hidden\":true},");
139 }
142 "{\"name\":\"I2L_valid\",\"unit\":\"binary\",\"bit\":7,"
143 "\"hidden\":true},");
144 }
145
146 // trim trailing comma of last array entry
// NOTE(review): indexes strlen() - 1, so this relies on metadata_json being
// non-empty at this point
147 if (metadata_json[strlen(metadata_json) - 1] == ',') {
148 metadata_json[strlen(metadata_json) - 1] = 0;
149 }
150
151 // JSON end
153
154 return SUCCESS;
155}
156
157int rl_socket_handle_data(int32_t const *analog_buffer,
158 uint32_t const *digital_buffer,
159 int32_t const *ambient_buffer, size_t buffer_size,
160 size_t ambient_buffer_size,
161 rl_timestamp_t const *const timestamp_realtime,
162 rl_timestamp_t const *const timestamp_monotonic,
163 rl_config_t const *const config) {
164
165 // publish metadata to socket
166 int zmq_res = zmq_send(zmq_data_socket, metadata_json,
167 strlen(metadata_json), ZMQ_SNDMORE);
168 if (zmq_res < 0) {
169 rl_log(RL_LOG_ERROR, "failed publishing analog data; %d message: %s",
170 errno, strerror(errno));
171 return ERROR;
172 }
173
174 // publish raw timestamp values
175 rl_timestamp_t timestamps[2] = {
176 *timestamp_realtime,
177 *timestamp_monotonic,
178 };
179
180 // publish timestamp data to socket
181 zmq_res =
182 zmq_send(zmq_data_socket, timestamps, sizeof(timestamps), ZMQ_SNDMORE);
183 if (zmq_res < 0) {
184 rl_log(RL_LOG_ERROR, "failed publishing timestamps; %d message: %s",
185 errno, strerror(errno));
186 return ERROR;
187 }
188
189 // process analog channels
190 int32_t *const data_buffer = malloc(buffer_size * sizeof(int32_t));
191 for (int ch = 0; ch < RL_CHANNEL_COUNT; ch++) {
192 if (!config->channel_enable[ch]) {
193 continue;
194 }
195
196 // copy standalone channel buffer
197 for (size_t i = 0; i < buffer_size; i++) {
198 data_buffer[i] = *(analog_buffer + i * RL_CHANNEL_COUNT + ch);
199 }
200
201 // publish channel data to socket
202 int zmq_res = zmq_send(zmq_data_socket, data_buffer,
203 buffer_size * sizeof(int32_t), ZMQ_SNDMORE);
204 if (zmq_res < 0) {
206 "failed publishing analog data; %d message: %s", errno,
207 strerror(errno));
208 // free data send buffer and exit
209 free(data_buffer);
210 return ERROR;
211 }
212 }
213 free(data_buffer);
214
215 // publish ambient data to socket
216 if (config->ambient_enable) {
217 for (int ch = 0; ch < rl_status.sensor_count; ch++) {
218 // publish channel data to socket
219 int zmq_res = zmq_send(zmq_data_socket, &ambient_buffer[ch],
220 sizeof(int32_t) * (ambient_buffer_size > 0),
221 ZMQ_SNDMORE);
222 if (zmq_res < 0) {
224 "failed publishing sensor data; %d message: %s", errno,
225 strerror(errno));
226 return ERROR;
227 }
228 }
229 }
230
231 // publish digital data (or empty if none available) to socket
232 if (config->digital_enable ||
235 zmq_res = zmq_send(zmq_data_socket, digital_buffer,
236 buffer_size * sizeof(uint32_t), 0);
237 } else {
238 zmq_res = zmq_send(zmq_data_socket, "", 0, 0);
239 }
240 if (zmq_res < 0) {
241 rl_log(RL_LOG_ERROR, "failed publishing digital data; %d message: %s",
242 errno, strerror(errno));
243 return ERROR;
244 }
245
246 return SUCCESS;
247}
int rl_log(rl_log_level_t log_level, char const *const format,...)
Definition: log.c:82
@ RL_LOG_ERROR
Error.
Definition: log.h:49
char const *const RL_CHANNEL_NAMES[RL_CHANNEL_COUNT]
RocketLogger channel names.
Definition: rl.c:95
#define RL_CONFIG_CHANNEL_I2L
Definition: rl.h:89
#define ERROR
Function return value for errors (use errno to indicate the error)
Definition: rl.h:46
#define SUCCESS
Function return value for successful completion.
Definition: rl.h:44
#define RL_CHANNEL_DIGITAL_COUNT
Number of RocketLogger digital channels.
Definition: rl.h:60
#define RL_CONFIG_CHANNEL_I1L
Definition: rl.h:87
#define RL_CHANNEL_COUNT
Number of RocketLogger analog channels.
Definition: rl.h:56
#define RL_ZMQ_DATA_SOCKET
ZeroMQ socket identifier for data publishing status.
Definition: rl.h:123
char * rl_unit_to_string(rl_unit_t const unit)
Definition: rl_file.c:86
int rl_socket_init(void)
Definition: rl_socket.c:56
int rl_socket_handle_data(int32_t const *analog_buffer, uint32_t const *digital_buffer, int32_t const *ambient_buffer, size_t buffer_size, size_t ambient_buffer_size, rl_timestamp_t const *const timestamp_realtime, rl_timestamp_t const *const timestamp_monotonic, rl_config_t const *const config)
Definition: rl_socket.c:157
char metadata_json[RL_SOCKET_METADATA_SIZE]
data socket metadata in JSON format
Definition: rl_socket.c:50
void * zmq_data_context
the ZeroMQ context for data publishing
Definition: rl_socket.c:52
int rl_socket_metadata(rl_config_t const *const config)
Definition: rl_socket.c:82
#define RL_SOCKET_METADATA_SIZE
Definition: rl_socket.c:47
void * zmq_data_socket
the ZeroMQ data socket
Definition: rl_socket.c:54
int rl_socket_deinit(void)
Definition: rl_socket.c:71
const rl_sensor_t SENSOR_REGISTRY[SENSOR_REGISTRY_SIZE]
Definition: sensor.c:63
#define SENSOR_REGISTRY_SIZE
Number of sensors registered.
Definition: sensor.h:47
Definition: rl.h:154
bool ambient_enable
Enable logging of ambient sensor.
Definition: rl.h:180
uint32_t sample_rate
Sampling rate.
Definition: rl.h:164
bool channel_enable[RL_CHANNEL_COUNT]
Channels to sample.
Definition: rl.h:168
bool digital_enable
Enable digital inputs.
Definition: rl.h:174
Definition: rl.h:201
uint16_t sensor_count
Number of sensors found connected to the system.
Definition: rl.h:221
bool sensor_available[RL_SENSOR_COUNT_MAX]
Identifiers of sensors found.
Definition: rl.h:223
bool is_current(int index)
Definition: util.c:49
bool is_low_current(int index)
Definition: util.c:57
bool is_voltage(int index)
Definition: util.c:64
int snprintfcat(char *const buffer, size_t length, char const *format,...)
Definition: util.c:191