diff options
Diffstat (limited to 'streaming/receiver.c')
-rw-r--r-- | streaming/receiver.c | 48 |
1 files changed, 22 insertions, 26 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c index 3ff022e97..10ef8b7d3 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -226,53 +226,47 @@ static inline bool receiver_read_compressed(struct receiver_state *r) { /* Produce a full line if one exists, statefully return where we start next time. * When we hit the end of the buffer with a partial line move it to the beginning for the next fill. */ -inline char *buffered_reader_next_line(struct buffered_reader *reader, char *dst, size_t dst_size) { +inline bool buffered_reader_next_line(struct buffered_reader *reader, BUFFER *dst) { + buffer_need_bytes(dst, reader->read_len - reader->pos + 2); + size_t start = reader->pos; char *ss = &reader->read_buffer[start]; char *se = &reader->read_buffer[reader->read_len]; - char *ds = dst; - char *de = &dst[dst_size - 2]; + char *ds = &dst->buffer[dst->len]; + char *de = &ds[dst->size - dst->len - 2]; if(ss >= se) { *ds = '\0'; reader->pos = 0; reader->read_len = 0; reader->read_buffer[reader->read_len] = '\0'; - return NULL; + return false; } // copy all bytes to buffer - while(ss < se && ds < de && *ss != '\n') + while(ss < se && ds < de && *ss != '\n') { *ds++ = *ss++; + dst->len++; + } // if we have a newline, return the buffer if(ss < se && ds < de && *ss == '\n') { // newline found in the r->read_buffer *ds++ = *ss++; // copy the newline too - *ds = '\0'; + dst->len++; - reader->pos = ss - reader->read_buffer; - return dst; - } - - // if the destination is full, oops! - if(ds == de) { - netdata_log_error("STREAM: received line exceeds %d bytes. Truncating it.", PLUGINSD_LINE_MAX); *ds = '\0'; + reader->pos = ss - reader->read_buffer; - return dst; + return true; } - // no newline found in the r->read_buffer - // move everything to the beginning - memmove(reader->read_buffer, &reader->read_buffer[start], reader->read_len - start); - reader->read_len -= (int)start; - reader->read_buffer[reader->read_len] = '\0'; - *ds = '\0'; reader->pos = 0; - return NULL; + reader->read_len = 0; + reader->read_buffer[reader->read_len] = '\0'; + return false; } bool plugin_is_enabled(struct plugind *cd); @@ -342,10 +336,10 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i buffered_reader_init(&rpt->reader); - char buffer[PLUGINSD_LINE_MAX + 2] = ""; + BUFFER *buffer = buffer_create(sizeof(rpt->reader.read_buffer), NULL); while(!receiver_should_stop(rpt)) { - if(!buffered_reader_next_line(&rpt->reader, buffer, PLUGINSD_LINE_MAX + 2)) { + if(!buffered_reader_next_line(&rpt->reader, buffer)) { bool have_new_data = compressed_connection ? receiver_read_compressed(rpt) : receiver_read_uncompressed(rpt); if(unlikely(!have_new_data)) { @@ -356,13 +350,15 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i continue; } - if (unlikely(parser_action(parser, buffer))) { - internal_error(true, "parser_action() failed on keyword '%s'.", buffer); + if (unlikely(parser_action(parser, buffer->buffer))) { receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, false); break; } - } + buffer->len = 0; + buffer->buffer[0] = '\0'; + } + buffer_free(buffer); result = parser->user.data_collections_count; // free parser with the pop function |