summaryrefslogtreecommitdiffstats
path: root/streaming/receiver.c
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/receiver.c')
-rw-r--r--streaming/receiver.c48
1 files changed, 22 insertions, 26 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index 3ff022e97..10ef8b7d3 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -226,53 +226,47 @@ static inline bool receiver_read_compressed(struct receiver_state *r) {
/* Produce a full line if one exists, statefully return where we start next time.
* When we hit the end of the buffer with a partial line move it to the beginning for the next fill.
*/
-inline char *buffered_reader_next_line(struct buffered_reader *reader, char *dst, size_t dst_size) {
+inline bool buffered_reader_next_line(struct buffered_reader *reader, BUFFER *dst) {
+ buffer_need_bytes(dst, reader->read_len - reader->pos + 2);
+
size_t start = reader->pos;
char *ss = &reader->read_buffer[start];
char *se = &reader->read_buffer[reader->read_len];
- char *ds = dst;
- char *de = &dst[dst_size - 2];
+ char *ds = &dst->buffer[dst->len];
+ char *de = &ds[dst->size - dst->len - 2];
if(ss >= se) {
*ds = '\0';
reader->pos = 0;
reader->read_len = 0;
reader->read_buffer[reader->read_len] = '\0';
- return NULL;
+ return false;
}
// copy all bytes to buffer
- while(ss < se && ds < de && *ss != '\n')
+ while(ss < se && ds < de && *ss != '\n') {
*ds++ = *ss++;
+ dst->len++;
+ }
// if we have a newline, return the buffer
if(ss < se && ds < de && *ss == '\n') {
// newline found in the r->read_buffer
*ds++ = *ss++; // copy the newline too
- *ds = '\0';
+ dst->len++;
- reader->pos = ss - reader->read_buffer;
- return dst;
- }
-
- // if the destination is full, oops!
- if(ds == de) {
- netdata_log_error("STREAM: received line exceeds %d bytes. Truncating it.", PLUGINSD_LINE_MAX);
*ds = '\0';
+
reader->pos = ss - reader->read_buffer;
- return dst;
+ return true;
}
- // no newline found in the r->read_buffer
- // move everything to the beginning
- memmove(reader->read_buffer, &reader->read_buffer[start], reader->read_len - start);
- reader->read_len -= (int)start;
- reader->read_buffer[reader->read_len] = '\0';
- *ds = '\0';
reader->pos = 0;
- return NULL;
+ reader->read_len = 0;
+ reader->read_buffer[reader->read_len] = '\0';
+ return false;
}
bool plugin_is_enabled(struct plugind *cd);
@@ -342,10 +336,10 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
buffered_reader_init(&rpt->reader);
- char buffer[PLUGINSD_LINE_MAX + 2] = "";
+ BUFFER *buffer = buffer_create(sizeof(rpt->reader.read_buffer), NULL);
while(!receiver_should_stop(rpt)) {
- if(!buffered_reader_next_line(&rpt->reader, buffer, PLUGINSD_LINE_MAX + 2)) {
+ if(!buffered_reader_next_line(&rpt->reader, buffer)) {
bool have_new_data = compressed_connection ? receiver_read_compressed(rpt) : receiver_read_uncompressed(rpt);
if(unlikely(!have_new_data)) {
@@ -356,13 +350,15 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
continue;
}
- if (unlikely(parser_action(parser, buffer))) {
- internal_error(true, "parser_action() failed on keyword '%s'.", buffer);
+ if (unlikely(parser_action(parser, buffer->buffer))) {
receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, false);
break;
}
- }
+ buffer->len = 0;
+ buffer->buffer[0] = '\0';
+ }
+ buffer_free(buffer);
result = parser->user.data_collections_count;
// free parser with the pop function