summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/avro/src/io.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-03-09 13:19:48 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-03-09 13:20:02 +0000
commit58daab21cd043e1dc37024a7f99b396788372918 (patch)
tree96771e43bb69f7c1c2b0b4f7374cb74d7866d0cb /fluent-bit/lib/avro/src/io.c
parentReleasing debian version 1.43.2-1. (diff)
downloadnetdata-58daab21cd043e1dc37024a7f99b396788372918.tar.xz
netdata-58daab21cd043e1dc37024a7f99b396788372918.zip
Merging upstream version 1.44.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/lib/avro/src/io.c')
-rw-r--r--fluent-bit/lib/avro/src/io.c447
1 files changed, 447 insertions, 0 deletions
diff --git a/fluent-bit/lib/avro/src/io.c b/fluent-bit/lib/avro/src/io.c
new file mode 100644
index 000000000..c1e2f5dc9
--- /dev/null
+++ b/fluent-bit/lib/avro/src/io.c
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+#include "avro/allocation.h"
+#include "avro/refcount.h"
+#include "avro/errors.h"
+#include "avro/io.h"
+#include "avro_private.h"
+#include <stdio.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <string.h>
+#include "dump.h"
+
+enum avro_io_type_t {
+ AVRO_FILE_IO,
+ AVRO_MEMORY_IO
+};
+typedef enum avro_io_type_t avro_io_type_t;
+
+struct avro_reader_t_ {
+ avro_io_type_t type;
+ volatile int refcount;
+};
+
+struct avro_writer_t_ {
+ avro_io_type_t type;
+ volatile int refcount;
+};
+
+struct _avro_reader_file_t {
+ struct avro_reader_t_ reader;
+ FILE *fp;
+ int should_close;
+ char *cur;
+ char *end;
+ char buffer[4096];
+};
+
+struct _avro_writer_file_t {
+ struct avro_writer_t_ writer;
+ FILE *fp;
+ int should_close;
+};
+
+struct _avro_reader_memory_t {
+ struct avro_reader_t_ reader;
+ const char *buf;
+ int64_t len;
+ int64_t read;
+};
+
+struct _avro_writer_memory_t {
+ struct avro_writer_t_ writer;
+ const char *buf;
+ int64_t len;
+ int64_t written;
+};
+
+#define avro_io_typeof(obj) ((obj)->type)
+#define is_memory_io(obj) (obj && avro_io_typeof(obj) == AVRO_MEMORY_IO)
+#define is_file_io(obj) (obj && avro_io_typeof(obj) == AVRO_FILE_IO)
+
+#define avro_reader_to_memory(reader_) container_of(reader_, struct _avro_reader_memory_t, reader)
+#define avro_reader_to_file(reader_) container_of(reader_, struct _avro_reader_file_t, reader)
+#define avro_writer_to_memory(writer_) container_of(writer_, struct _avro_writer_memory_t, writer)
+#define avro_writer_to_file(writer_) container_of(writer_, struct _avro_writer_file_t, writer)
+
+static void reader_init(avro_reader_t reader, avro_io_type_t type)
+{
+ reader->type = type;
+ avro_refcount_set(&reader->refcount, 1);
+}
+
+static void writer_init(avro_writer_t writer, avro_io_type_t type)
+{
+ writer->type = type;
+ avro_refcount_set(&writer->refcount, 1);
+}
+
+avro_reader_t avro_reader_file_fp(FILE * fp, int should_close)
+{
+ struct _avro_reader_file_t *file_reader =
+ (struct _avro_reader_file_t *) avro_new(struct _avro_reader_file_t);
+ if (!file_reader) {
+ avro_set_error("Cannot allocate new file reader");
+ return NULL;
+ }
+ memset(file_reader, 0, sizeof(struct _avro_reader_file_t));
+ file_reader->fp = fp;
+ file_reader->should_close = should_close;
+ reader_init(&file_reader->reader, AVRO_FILE_IO);
+ return &file_reader->reader;
+}
+
+avro_reader_t avro_reader_file(FILE * fp)
+{
+ return avro_reader_file_fp(fp, 1);
+}
+
+avro_writer_t avro_writer_file_fp(FILE * fp, int should_close)
+{
+ struct _avro_writer_file_t *file_writer =
+ (struct _avro_writer_file_t *) avro_new(struct _avro_writer_file_t);
+ if (!file_writer) {
+ avro_set_error("Cannot allocate new file writer");
+ return NULL;
+ }
+ file_writer->fp = fp;
+ file_writer->should_close = should_close;
+ writer_init(&file_writer->writer, AVRO_FILE_IO);
+ return &file_writer->writer;
+}
+
+avro_writer_t avro_writer_file(FILE * fp)
+{
+ return avro_writer_file_fp(fp, 1);
+}
+
+avro_reader_t avro_reader_memory(const char *buf, int64_t len)
+{
+ struct _avro_reader_memory_t *mem_reader =
+ (struct _avro_reader_memory_t *) avro_new(struct _avro_reader_memory_t);
+ if (!mem_reader) {
+ avro_set_error("Cannot allocate new memory reader");
+ return NULL;
+ }
+ mem_reader->buf = buf;
+ mem_reader->len = len;
+ mem_reader->read = 0;
+ reader_init(&mem_reader->reader, AVRO_MEMORY_IO);
+ return &mem_reader->reader;
+}
+
+void
+avro_reader_memory_set_source(avro_reader_t reader, const char *buf, int64_t len)
+{
+ if (is_memory_io(reader)) {
+ struct _avro_reader_memory_t *mem_reader = avro_reader_to_memory(reader);
+ mem_reader->buf = buf;
+ mem_reader->len = len;
+ mem_reader->read = 0;
+ }
+}
+
+avro_writer_t avro_writer_memory(const char *buf, int64_t len)
+{
+ struct _avro_writer_memory_t *mem_writer =
+ (struct _avro_writer_memory_t *) avro_new(struct _avro_writer_memory_t);
+ if (!mem_writer) {
+ avro_set_error("Cannot allocate new memory writer");
+ return NULL;
+ }
+ mem_writer->buf = buf;
+ mem_writer->len = len;
+ mem_writer->written = 0;
+ writer_init(&mem_writer->writer, AVRO_MEMORY_IO);
+ return &mem_writer->writer;
+}
+
+void
+avro_writer_memory_set_dest(avro_writer_t writer, const char *buf, int64_t len)
+{
+ if (is_memory_io(writer)) {
+ struct _avro_writer_memory_t *mem_writer = avro_writer_to_memory(writer);
+ mem_writer->buf = buf;
+ mem_writer->len = len;
+ mem_writer->written = 0;
+ }
+}
+
+static int
+avro_read_memory(struct _avro_reader_memory_t *reader, void *buf, int64_t len)
+{
+ if (len > 0) {
+ if ((reader->len - reader->read) < len) {
+ avro_prefix_error("Cannot read %" PRIsz " bytes from memory buffer",
+ (size_t) len);
+ return ENOSPC;
+ }
+ memcpy(buf, reader->buf + reader->read, len);
+ reader->read += len;
+ }
+ return 0;
+}
+
+#define bytes_available(reader) (reader->end - reader->cur)
+#define buffer_reset(reader) {reader->cur = reader->end = reader->buffer;}
+
+static int
+avro_read_file(struct _avro_reader_file_t *reader, void *buf, int64_t len)
+{
+ int64_t needed = len;
+ char *p = (char *) buf;
+ int rval;
+
+ if (len == 0) {
+ return 0;
+ }
+
+ if (needed > (int64_t) sizeof(reader->buffer)) {
+ if (bytes_available(reader) > 0) {
+ memcpy(p, reader->cur, bytes_available(reader));
+ p += bytes_available(reader);
+ needed -= bytes_available(reader);
+ buffer_reset(reader);
+ }
+ rval = fread(p, 1, needed, reader->fp);
+ if (rval != needed) {
+ avro_set_error("Cannot read %" PRIsz " bytes from file",
+ (size_t) needed);
+ return EILSEQ;
+ }
+ return 0;
+ } else if (needed <= bytes_available(reader)) {
+ memcpy(p, reader->cur, needed);
+ reader->cur += needed;
+ return 0;
+ } else {
+ memcpy(p, reader->cur, bytes_available(reader));
+ p += bytes_available(reader);
+ needed -= bytes_available(reader);
+
+ rval =
+ fread(reader->buffer, 1, sizeof(reader->buffer),
+ reader->fp);
+ if (rval == 0) {
+ avro_set_error("Cannot read %" PRIsz " bytes from file",
+ (size_t) needed);
+ return EILSEQ;
+ }
+ reader->cur = reader->buffer;
+ reader->end = reader->cur + rval;
+
+ if (bytes_available(reader) < needed) {
+ avro_set_error("Cannot read %" PRIsz " bytes from file",
+ (size_t) needed);
+ return EILSEQ;
+ }
+ memcpy(p, reader->cur, needed);
+ reader->cur += needed;
+ return 0;
+ }
+ avro_set_error("Cannot read %" PRIsz " bytes from file",
+ (size_t) needed);
+ return EILSEQ;
+}
+
+int avro_read(avro_reader_t reader, void *buf, int64_t len)
+{
+ if (buf && len >= 0) {
+ if (is_memory_io(reader)) {
+ return avro_read_memory(avro_reader_to_memory(reader),
+ buf, len);
+ } else if (is_file_io(reader)) {
+ return avro_read_file(avro_reader_to_file(reader), buf,
+ len);
+ }
+ }
+ return EINVAL;
+}
+
+static int avro_skip_memory(struct _avro_reader_memory_t *reader, int64_t len)
+{
+ if (len > 0) {
+ if ((reader->len - reader->read) < len) {
+ avro_set_error("Cannot skip %" PRIsz " bytes in memory buffer",
+ (size_t) len);
+ return ENOSPC;
+ }
+ reader->read += len;
+ }
+ return 0;
+}
+
+static int avro_skip_file(struct _avro_reader_file_t *reader, int64_t len)
+{
+ int rval;
+ int64_t needed = len;
+
+ if (len == 0) {
+ return 0;
+ }
+ if (needed <= bytes_available(reader)) {
+ reader->cur += needed;
+ } else {
+ needed -= bytes_available(reader);
+ buffer_reset(reader);
+ rval = fseek(reader->fp, needed, SEEK_CUR);
+ if (rval < 0) {
+ avro_set_error("Cannot skip %" PRIsz " bytes in file",
+ (size_t) len);
+ return rval;
+ }
+ }
+ return 0;
+}
+
+int avro_skip(avro_reader_t reader, int64_t len)
+{
+ if (len >= 0) {
+ if (is_memory_io(reader)) {
+ return avro_skip_memory(avro_reader_to_memory(reader),
+ len);
+ } else if (is_file_io(reader)) {
+ return avro_skip_file(avro_reader_to_file(reader), len);
+ }
+ }
+ return 0;
+}
+
+static int
+avro_write_memory(struct _avro_writer_memory_t *writer, void *buf, int64_t len)
+{
+ if (len) {
+ if ((writer->len - writer->written) < len) {
+ avro_set_error("Cannot write %" PRIsz " bytes in memory buffer",
+ (size_t) len);
+ return ENOSPC;
+ }
+ memcpy((void *)(writer->buf + writer->written), buf, len);
+ writer->written += len;
+ }
+ return 0;
+}
+
+static int
+avro_write_file(struct _avro_writer_file_t *writer, void *buf, int64_t len)
+{
+ int rval;
+ if (len > 0) {
+ rval = fwrite(buf, len, 1, writer->fp);
+ if (rval == 0) {
+ return EIO;
+ }
+ }
+ return 0;
+}
+
+int avro_write(avro_writer_t writer, void *buf, int64_t len)
+{
+ if (buf && len >= 0) {
+ if (is_memory_io(writer)) {
+ return avro_write_memory(avro_writer_to_memory(writer),
+ buf, len);
+ } else if (is_file_io(writer)) {
+ return avro_write_file(avro_writer_to_file(writer), buf,
+ len);
+ }
+ }
+ return EINVAL;
+}
+
+void
+avro_reader_reset(avro_reader_t reader)
+{
+ if (is_memory_io(reader)) {
+ avro_reader_to_memory(reader)->read = 0;
+ }
+}
+
+void avro_writer_reset(avro_writer_t writer)
+{
+ if (is_memory_io(writer)) {
+ avro_writer_to_memory(writer)->written = 0;
+ }
+}
+
+int64_t avro_writer_tell(avro_writer_t writer)
+{
+ if (is_memory_io(writer)) {
+ return avro_writer_to_memory(writer)->written;
+ }
+ return EINVAL;
+}
+
+void avro_writer_flush(avro_writer_t writer)
+{
+ if (is_file_io(writer)) {
+ fflush(avro_writer_to_file(writer)->fp);
+ }
+}
+
+void avro_writer_dump(avro_writer_t writer, FILE * fp)
+{
+ if (is_memory_io(writer)) {
+ dump(fp, (char *)avro_writer_to_memory(writer)->buf,
+ avro_writer_to_memory(writer)->written);
+ }
+}
+
+void avro_reader_dump(avro_reader_t reader, FILE * fp)
+{
+ if (is_memory_io(reader)) {
+ dump(fp, (char *)avro_reader_to_memory(reader)->buf,
+ avro_reader_to_memory(reader)->read);
+ }
+}
+
+void avro_reader_free(avro_reader_t reader)
+{
+ if (is_memory_io(reader)) {
+ avro_freet(struct _avro_reader_memory_t, reader);
+ } else if (is_file_io(reader)) {
+ if (avro_reader_to_file(reader)->should_close) {
+ fclose(avro_reader_to_file(reader)->fp);
+ }
+ avro_freet(struct _avro_reader_file_t, reader);
+ }
+}
+
+void avro_writer_free(avro_writer_t writer)
+{
+ if (is_memory_io(writer)) {
+ avro_freet(struct _avro_writer_memory_t, writer);
+ } else if (is_file_io(writer)) {
+ if (avro_writer_to_file(writer)->should_close) {
+ fclose(avro_writer_to_file(writer)->fp);
+ }
+ avro_freet(struct _avro_writer_file_t, writer);
+ }
+}
+
+int avro_reader_is_eof(avro_reader_t reader)
+{
+ if (is_file_io(reader)) {
+ struct _avro_reader_file_t *file = avro_reader_to_file(reader);
+ if (feof(file->fp)) {
+ return file->cur == file->end;
+ }
+ }
+ return 0;
+}