summaryrefslogtreecommitdiffstats
path: root/libnetdata/circular_buffer/circular_buffer.c
blob: 7ffe6b8bc79a7e2616b3dd6737636cd2c4177c48 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#include "../libnetdata.h"

struct circular_buffer *cbuffer_new(size_t initial, size_t max, size_t *statistics) {
    struct circular_buffer *buf = mallocz(sizeof(struct circular_buffer));
    buf->size = initial;
    buf->data = mallocz(initial);
    buf->write = 0;
    buf->read = 0;
    buf->max_size = max;
    buf->statistics = statistics;

    if(buf->statistics)
        __atomic_add_fetch(buf->statistics, sizeof(struct circular_buffer) + buf->size, __ATOMIC_RELAXED);

    return buf;
}

void cbuffer_free(struct circular_buffer *buf) {
    if (unlikely(!buf))
        return;

    if(buf->statistics)
        __atomic_sub_fetch(buf->statistics, sizeof(struct circular_buffer) + buf->size, __ATOMIC_RELAXED);

    freez(buf->data);
    freez(buf);
}

static int cbuffer_realloc_unsafe(struct circular_buffer *buf) {
    // Check that we can grow
    if (buf->size >= buf->max_size)
        return 1;

    size_t old_size = buf->size;
    size_t new_size = buf->size * 2;
    if (new_size > buf->max_size)
        new_size = buf->max_size;

    // We know that: size < new_size <= max_size
    // For simplicity align the current data at the bottom of the new buffer
    char *new_data = mallocz(new_size);
    if (buf->read == buf->write)
        buf->write = 0; // buffer is empty
    else if (buf->read < buf->write) {
        memcpy(new_data, buf->data + buf->read, buf->write - buf->read);
        buf->write -= buf->read;
    } else {
        size_t top_part = buf->size - buf->read;
        memcpy(new_data, buf->data + buf->read, top_part);
        memcpy(new_data + top_part, buf->data, buf->write);
        buf->write = top_part + buf->write;
    }
    buf->read = 0;

    // Switch buffers
    freez(buf->data);
    buf->data = new_data;
    buf->size = new_size;

    if(buf->statistics)
        __atomic_add_fetch(buf->statistics, new_size - old_size, __ATOMIC_RELAXED);

    return 0;
}

size_t cbuffer_available_size_unsafe(struct circular_buffer *buf) {
    size_t len = (buf->write >= buf->read) ? (buf->write - buf->read) : (buf->size - buf->read + buf->write);
    return buf->max_size - len;
}

int cbuffer_add_unsafe(struct circular_buffer *buf, const char *d, size_t d_len) {
    size_t len = (buf->write >= buf->read) ? (buf->write - buf->read) : (buf->size - buf->read + buf->write);
    while (d_len + len >= buf->size) {
        if (cbuffer_realloc_unsafe(buf)) {
            return 1;
        }
    }
    // Guarantee: write + d_len cannot hit read
    if (buf->write + d_len < buf->size) {
        memcpy(buf->data + buf->write, d, d_len);
        buf->write += d_len;
    }
    else {
        size_t top_part = buf->size - buf->write;
        memcpy(buf->data + buf->write, d, top_part);
        memcpy(buf->data, d + top_part, d_len - top_part); 
        buf->write = d_len - top_part;
    }
    return 0;
}

// Assume caller does not remove too many bytes (i.e. read will jump over write)
void cbuffer_remove_unsafe(struct circular_buffer *buf, size_t num) {
    buf->read += num;
    // Assume num < size (i.e. caller cannot remove more bytes than are in the buffer)
    if (buf->read >= buf->size)
        buf->read -= buf->size;
}

size_t cbuffer_next_unsafe(struct circular_buffer *buf, char **start) {
    if (start != NULL)
      *start = buf->data + buf->read;

    if (buf->read <= buf->write) {
        return buf->write - buf->read;      // Includes empty case
    }
    return buf->size - buf->read;
}

void cbuffer_flush(struct circular_buffer*buf) {
    buf->write = 0;
    buf->read = 0;
}