1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
|
// SPDX-License-Identifier: GPL-3.0-or-later
#include "../libnetdata.h"
#ifndef NETDATA_BUFFERED_READER_H
#define NETDATA_BUFFERED_READER_H
struct buffered_reader {
ssize_t read_len;
ssize_t pos;
char read_buffer[PLUGINSD_LINE_MAX + 1];
};
static inline void buffered_reader_init(struct buffered_reader *reader) {
reader->read_buffer[0] = '\0';
reader->read_len = 0;
reader->pos = 0;
}
typedef enum {
BUFFERED_READER_READ_OK = 0,
BUFFERED_READER_READ_FAILED = -1,
BUFFERED_READER_READ_BUFFER_FULL = -2,
BUFFERED_READER_READ_POLLERR = -3,
BUFFERED_READER_READ_POLLHUP = -4,
BUFFERED_READER_READ_POLLNVAL = -5,
BUFFERED_READER_READ_POLL_UNKNOWN = -6,
BUFFERED_READER_READ_POLL_TIMEOUT = -7,
BUFFERED_READER_READ_POLL_CANCELLED = -8,
} buffered_reader_ret_t;
static inline buffered_reader_ret_t buffered_reader_read(struct buffered_reader *reader, int fd) {
#ifdef NETDATA_INTERNAL_CHECKS
if(reader->read_buffer[reader->read_len] != '\0')
fatal("read_buffer does not start with zero");
#endif
char *read_at = reader->read_buffer + reader->read_len;
ssize_t remaining = sizeof(reader->read_buffer) - reader->read_len - 1;
if(unlikely(remaining <= 0))
return BUFFERED_READER_READ_BUFFER_FULL;
ssize_t bytes_read = read(fd, read_at, remaining);
if(unlikely(bytes_read <= 0))
return BUFFERED_READER_READ_FAILED;
reader->read_len += bytes_read;
reader->read_buffer[reader->read_len] = '\0';
return BUFFERED_READER_READ_OK;
}
static inline buffered_reader_ret_t buffered_reader_read_timeout(struct buffered_reader *reader, int fd, int timeout_ms, bool log_error) {
short int revents = 0;
switch(wait_on_socket_or_cancel_with_timeout(
#ifdef ENABLE_HTTPS
NULL,
#endif
fd, timeout_ms, POLLIN, &revents)) {
case 0: // data are waiting
return buffered_reader_read(reader, fd);
case 1: // timeout reached
if(log_error)
netdata_log_error("PARSER: timeout while waiting for data.");
return BUFFERED_READER_READ_POLL_TIMEOUT;
case -1: // thread cancelled
netdata_log_error("PARSER: thread cancelled while waiting for data.");
return BUFFERED_READER_READ_POLL_CANCELLED;
default:
case 2: // error on socket
if(revents & POLLERR) {
if(log_error)
netdata_log_error("PARSER: read failed: POLLERR.");
return BUFFERED_READER_READ_POLLERR;
}
if(revents & POLLHUP) {
if(log_error)
netdata_log_error("PARSER: read failed: POLLHUP.");
return BUFFERED_READER_READ_POLLHUP;
}
if(revents & POLLNVAL) {
if(log_error)
netdata_log_error("PARSER: read failed: POLLNVAL.");
return BUFFERED_READER_READ_POLLNVAL;
}
}
if(log_error)
netdata_log_error("PARSER: poll() returned positive number, but POLLIN|POLLERR|POLLHUP|POLLNVAL are not set.");
return BUFFERED_READER_READ_POLL_UNKNOWN;
}
/* Produce a full line if one exists, statefully return where we start next time.
* When we hit the end of the buffer with a partial line move it to the beginning for the next fill.
*/
static inline bool buffered_reader_next_line(struct buffered_reader *reader, BUFFER *dst) {
buffer_need_bytes(dst, reader->read_len - reader->pos + 2);
size_t start = reader->pos;
char *ss = &reader->read_buffer[start];
char *se = &reader->read_buffer[reader->read_len];
char *ds = &dst->buffer[dst->len];
char *de = &ds[dst->size - dst->len - 2];
if(ss >= se) {
*ds = '\0';
reader->pos = 0;
reader->read_len = 0;
reader->read_buffer[reader->read_len] = '\0';
return false;
}
// copy all bytes to buffer
while(ss < se && ds < de && *ss != '\n') {
*ds++ = *ss++;
dst->len++;
}
// if we have a newline, return the buffer
if(ss < se && ds < de && *ss == '\n') {
// newline found in the r->read_buffer
*ds++ = *ss++; // copy the newline too
dst->len++;
*ds = '\0';
reader->pos = ss - reader->read_buffer;
return true;
}
reader->pos = 0;
reader->read_len = 0;
reader->read_buffer[reader->read_len] = '\0';
return false;
}
#endif //NETDATA_BUFFERED_READER_H
|