RocketLogger  2.0.0
rl_socket.c
Go to the documentation of this file.
1 
32 #include <errno.h>
33 #include <stdint.h>
34 #include <stdlib.h>
35 #include <string.h>
36 
37 #include <zmq.h>
38 
39 #include "log.h"
40 #include "rl.h"
41 #include "rl_file.h"
42 #include "sensor/sensor.h"
43 #include "util.h"
44 
45 #include "rl_socket.h"
46 
/// size of the buffer holding the data socket metadata, in bytes
#define RL_SOCKET_METADATA_SIZE 1000

/// data socket metadata in JSON format
char metadata_json[RL_SOCKET_METADATA_SIZE];

/// the ZeroMQ context for data publishing
void *zmq_data_context = NULL;
/// the ZeroMQ data socket
void *zmq_data_socket = NULL;
55 
56 int rl_socket_init(void) {
57  // open and bind zmq data socket
58  zmq_data_context = zmq_ctx_new();
59  zmq_data_socket = zmq_socket(zmq_data_context, ZMQ_PUB);
60  int zmq_res = zmq_bind(zmq_data_socket, RL_ZMQ_DATA_SOCKET);
61  if (zmq_res < 0) {
63  "failed binding zeromq data socket; %d message: %s", errno,
64  strerror(errno));
65  return ERROR;
66  }
67 
68  return SUCCESS;
69 }
70 
71 int rl_socket_deinit(void) {
72  // close and destroy zmq data socket
73  zmq_close(zmq_data_socket);
74  zmq_ctx_destroy(zmq_data_context);
75 
76  zmq_data_socket = NULL;
77  zmq_data_context = NULL;
78 
79  return SUCCESS;
80 }
81 
82 int rl_socket_metadata(rl_config_t const *const config) {
83  // data rate and channel info init
85  "{\"data_rate\":%d,\"channels\":[", config->sample_rate);
86 
87  // analog channel metadata
88  for (int ch = 0; ch < RL_CHANNEL_COUNT; ch++) {
89  if (!config->channel_enable[ch]) {
90  continue;
91  }
92 
94  RL_CHANNEL_NAMES[ch]);
95  if (is_current(ch)) {
96  if (is_low_current(ch)) {
98  "\"unit\":\"A\",\"scale\":1e-11},");
99  } else {
101  "\"unit\":\"A\",\"scale\":1e-9},");
102  }
103  } else if (is_voltage(ch)) {
105  "\"unit\":\"V\",\"scale\":1e-8},");
106  } else {
108  "\"unit\":null},");
109  }
110  }
111 
112  // digital channel metadata
113  if (config->digital_enable) {
114  for (int i = 0; i < RL_CHANNEL_DIGITAL_COUNT; i++) {
116  "{\"name\":\"DI%d\",\"unit\":\"binary\",\"bit\":%d},",
117  i + 1, i);
118  }
119  }
120 
121  // ambient channel metadata
122  if (config->ambient_enable) {
123  for (int ch = 0; ch < SENSOR_REGISTRY_SIZE; ch++) {
124  if (rl_status.sensor_available[ch]) {
126  "{\"name\":\"%s\",\"unit\":\"%s\",\"scale\":1e%d},",
127  SENSOR_REGISTRY[ch].name,
129  SENSOR_REGISTRY[ch].scale);
130  }
131  }
132  }
133 
134  // current range valid metadata and JSON end
135  snprintfcat(
137  "{\"name\":\"I1L_valid\",\"unit\":\"binary\",\"bit\":6,\"hidden\":true},"
138  "{\"name\":\"I2L_valid\",\"unit\":\"binary\",\"bit\":7,\"hidden\":true}]"
139  "}");
140 
141  return SUCCESS;
142 }
143 
144 int rl_socket_handle_data(int32_t const *analog_buffer,
145  uint32_t const *digital_buffer,
146  int32_t const *ambient_buffer, size_t buffer_size,
147  size_t ambient_buffer_size,
148  rl_timestamp_t const *const timestamp_realtime,
149  rl_timestamp_t const *const timestamp_monotonic,
150  rl_config_t const *const config) {
151 
152  // publish metadata to socket
153  int zmq_res = zmq_send(zmq_data_socket, metadata_json,
154  strlen(metadata_json), ZMQ_SNDMORE);
155  if (zmq_res < 0) {
156  rl_log(RL_LOG_ERROR, "failed publishing analog data; %d message: %s",
157  errno, strerror(errno));
158  return ERROR;
159  }
160 
161  // publish raw timestamp values
162  rl_timestamp_t timestamps[2] = {
163  *timestamp_realtime,
164  *timestamp_monotonic,
165  };
166 
167  // publish timestamp data to socket
168  zmq_res =
169  zmq_send(zmq_data_socket, timestamps, sizeof(timestamps), ZMQ_SNDMORE);
170  if (zmq_res < 0) {
171  rl_log(RL_LOG_ERROR, "failed publishing timestamps; %d message: %s",
172  errno, strerror(errno));
173  return ERROR;
174  }
175 
176  // process analog channels
177  int32_t *const data_buffer = malloc(buffer_size * sizeof(int32_t));
178  for (int ch = 0; ch < RL_CHANNEL_COUNT; ch++) {
179  if (!config->channel_enable[ch]) {
180  continue;
181  }
182 
183  // copy standalone channel buffer
184  for (size_t i = 0; i < buffer_size; i++) {
185  data_buffer[i] = *(analog_buffer + i * RL_CHANNEL_COUNT + ch);
186  }
187 
188  // publish channel data to socket
189  int zmq_res = zmq_send(zmq_data_socket, data_buffer,
190  buffer_size * sizeof(int32_t), ZMQ_SNDMORE);
191  if (zmq_res < 0) {
193  "failed publishing analog data; %d message: %s", errno,
194  strerror(errno));
195  // free data send buffer and exit
196  free(data_buffer);
197  return ERROR;
198  }
199  }
200  free(data_buffer);
201 
202  // publish ambient data to socket
203  if (config->ambient_enable) {
204  for (int ch = 0; ch < rl_status.sensor_count; ch++) {
205  // publish channel data to socket
206  int zmq_res = zmq_send(zmq_data_socket, &ambient_buffer[ch],
207  sizeof(int32_t) * (ambient_buffer_size > 0),
208  ZMQ_SNDMORE);
209  if (zmq_res < 0) {
211  "failed publishing sensor data; %d message: %s", errno,
212  strerror(errno));
213  return ERROR;
214  }
215  }
216  }
217 
218  // publish digital data to socket
219  zmq_res = zmq_send(zmq_data_socket, digital_buffer,
220  buffer_size * sizeof(uint32_t), 0);
221  if (zmq_res < 0) {
222  rl_log(RL_LOG_ERROR, "failed publishing digital data; %d message: %s",
223  errno, strerror(errno));
224  return ERROR;
225  }
226 
227  return SUCCESS;
228 }
int rl_log(rl_log_level_t log_level, char const *const format,...)
Definition: log.c:82
@ RL_LOG_ERROR
Error.
Definition: log.h:49
char const *const RL_CHANNEL_NAMES[RL_CHANNEL_COUNT]
RocketLogger channel names.
Definition: rl.c:95
#define ERROR
Function return value for errors (use errno to indicate the error)
Definition: rl.h:46
#define SUCCESS
Function return value for successful completion.
Definition: rl.h:44
#define RL_CHANNEL_DIGITAL_COUNT
Number of RocketLogger digital channels.
Definition: rl.h:60
#define RL_CHANNEL_COUNT
Number of RocketLogger analog channels.
Definition: rl.h:56
#define RL_ZMQ_DATA_SOCKET
ZeroMQ socket identifier for data publishing status.
Definition: rl.h:123
char * rl_unit_to_string(rl_unit_t const unit)
Definition: rl_file.c:86
int rl_socket_init(void)
Definition: rl_socket.c:56
int rl_socket_handle_data(int32_t const *analog_buffer, uint32_t const *digital_buffer, int32_t const *ambient_buffer, size_t buffer_size, size_t ambient_buffer_size, rl_timestamp_t const *const timestamp_realtime, rl_timestamp_t const *const timestamp_monotonic, rl_config_t const *const config)
Definition: rl_socket.c:144
char metadata_json[RL_SOCKET_METADATA_SIZE]
data socket metadata in JSON format
Definition: rl_socket.c:50
void * zmq_data_context
the ZeroMQ context for data publishing
Definition: rl_socket.c:52
int rl_socket_metadata(rl_config_t const *const config)
Definition: rl_socket.c:82
#define RL_SOCKET_METADATA_SIZE
Definition: rl_socket.c:47
void * zmq_data_socket
the ZeroMQ data socket
Definition: rl_socket.c:54
int rl_socket_deinit(void)
Definition: rl_socket.c:71
const rl_sensor_t SENSOR_REGISTRY[SENSOR_REGISTRY_SIZE]
Definition: sensor.c:63
#define SENSOR_REGISTRY_SIZE
Number of sensors registered.
Definition: sensor.h:47
Definition: rl.h:154
bool ambient_enable
Enable logging of ambient sensor.
Definition: rl.h:180
uint32_t sample_rate
Sampling rate.
Definition: rl.h:164
bool channel_enable[RL_CHANNEL_COUNT]
Channels to sample.
Definition: rl.h:168
bool digital_enable
Enable digital inputs.
Definition: rl.h:174
Definition: rl.h:201
uint16_t sensor_count
Number of sensors found connected to the system.
Definition: rl.h:221
bool sensor_available[RL_SENSOR_COUNT_MAX]
Identifiers of sensors found.
Definition: rl.h:223
bool is_current(int index)
Definition: util.c:49
bool is_low_current(int index)
Definition: util.c:57
bool is_voltage(int index)
Definition: util.c:64
int snprintfcat(char *const buffer, size_t length, char const *format,...)
Definition: util.c:191