summaryrefslogtreecommitdiffstats
path: root/fluent-bit/src/flb_time.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/src/flb_time.c')
-rw-r--r--fluent-bit/src/flb_time.c444
1 files changed, 444 insertions, 0 deletions
diff --git a/fluent-bit/src/flb_time.c b/fluent-bit/src/flb_time.c
new file mode 100644
index 000000000..624b70d02
--- /dev/null
+++ b/fluent-bit/src/flb_time.c
@@ -0,0 +1,444 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "cmetrics/lib/mpack/src/mpack/mpack.h"
+#include <msgpack.h>
+#include <mpack/mpack.h>
+#include <fluent-bit/flb_compat.h>
+#include <fluent-bit/flb_macros.h>
+#include <fluent-bit/flb_log.h>
+#include <fluent-bit/flb_time.h>
+#include <stdint.h>
+#ifdef FLB_HAVE_CLOCK_GET_TIME
+# include <mach/clock.h>
+# include <mach/mach.h>
+#endif
+
+#include <string.h>
+#include <inttypes.h>
+#include <time.h>
+
+#define ONESEC_IN_NSEC 1000000000
+
+static int is_valid_format(int fmt)
+{
+ return (FLB_TIME_ETFMT_INT <= fmt) && (fmt < FLB_TIME_ETFMT_OTHER) ?
+ FLB_TRUE : FLB_FALSE;
+}
+
+static int _flb_time_get(struct flb_time *tm)
+{
+ if (tm == NULL) {
+ return -1;
+ }
+#if defined FLB_TIME_FORCE_FMT_INT
+ tm->tm.tv_sec = time(NULL);
+ tm->tm.tv_nsec = 0;
+ return 0;
+#elif defined FLB_HAVE_TIMESPEC_GET
+ /* C11 supported! */
+ return timespec_get(&tm->tm, TIME_UTC);
+#elif defined FLB_CLOCK_GET_TIME
+ clock_serv_t cclock;
+ mach_timespec_t mts;
+ host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock);
+ clock_get_time(cclock, &mts);
+ tm->tv_sec = mts.tv_sec;
+ tm->tv_nsec = mts.tv_nsec;
+ return mach_port_deallocate(mach_task_self(), cclock);
+#else /* __STDC_VERSION__ */
+ return clock_gettime(CLOCK_REALTIME, &tm->tm);
+#endif
+}
+
+int flb_time_get(struct flb_time *tm)
+{
+ return _flb_time_get(tm);
+}
+
+/* A portable function to sleep N msec */
+int flb_time_msleep(uint32_t ms)
+{
+#ifdef _MSC_VER
+ Sleep((DWORD) ms);
+ return 0;
+#else
+ struct timespec ts;
+ ts.tv_sec = ms / 1000;
+ ts.tv_nsec = (ms % 1000) * 1000000;
+ return nanosleep(&ts, NULL);
+#endif
+}
+
+double flb_time_to_double(struct flb_time *tm)
+{
+ return (double)(tm->tm.tv_sec) + ((double)tm->tm.tv_nsec/(double)ONESEC_IN_NSEC);
+}
+
+uint64_t flb_time_to_nanosec(struct flb_time *tm)
+{
+ return (((uint64_t)tm->tm.tv_sec * 1000000000L) + tm->tm.tv_nsec);
+}
+
+uint64_t flb_time_to_millisec(struct flb_time *tm)
+{
+ return (((uint64_t)tm->tm.tv_sec * 1000L) + tm->tm.tv_nsec / 1000000L);
+}
+
+int flb_time_add(struct flb_time *base, struct flb_time *duration, struct flb_time *result)
+{
+ if (base == NULL || duration == NULL|| result == NULL) {
+ return -1;
+ }
+ result->tm.tv_sec = base->tm.tv_sec + duration->tm.tv_sec;
+ result->tm.tv_nsec = base->tm.tv_nsec + duration->tm.tv_nsec;
+
+ if (result->tm.tv_nsec > ONESEC_IN_NSEC) {
+ result->tm.tv_nsec -= ONESEC_IN_NSEC;
+ result->tm.tv_sec++;
+ } else if (result->tm.tv_nsec < 0) {
+ result->tm.tv_nsec += ONESEC_IN_NSEC;
+ result->tm.tv_sec--;
+ }
+
+ return 0;
+}
+
+int flb_time_diff(struct flb_time *time1,
+ struct flb_time *time0,struct flb_time *result)
+{
+ if (time1 == NULL || time0 == NULL || result == NULL) {
+ return -1;
+ }
+
+ if (time1->tm.tv_sec >= time0->tm.tv_sec) {
+ result->tm.tv_sec = time1->tm.tv_sec - time0->tm.tv_sec;
+ if (time1->tm.tv_nsec >= time0->tm.tv_nsec) {
+ result->tm.tv_nsec = time1->tm.tv_nsec - time0->tm.tv_nsec;
+ }
+ else if(result->tm.tv_sec == 0){
+ /* underflow */
+ return -2;
+ }
+ else{
+ result->tm.tv_nsec = ONESEC_IN_NSEC
+ + time1->tm.tv_nsec - time0->tm.tv_nsec;
+ result->tm.tv_sec--;
+ }
+ }
+ else {
+ /* underflow */
+ return -3;
+ }
+ return 0;
+}
+
+int flb_time_append_to_mpack(mpack_writer_t *writer, struct flb_time *tm, int fmt)
+{
+ int ret = 0;
+ struct flb_time l_time;
+ char ext_data[8];
+ uint32_t tmp;
+
+ if (!is_valid_format(fmt)) {
+#ifdef FLB_TIME_FORCE_FMT_INT
+ fmt = FLB_TIME_ETFMT_INT;
+#else
+ fmt = FLB_TIME_ETFMT_V1_FIXEXT;
+#endif
+ }
+
+ if (tm == NULL) {
+ if (fmt == FLB_TIME_ETFMT_INT) {
+ l_time.tm.tv_sec = time(NULL);
+ }
+ else {
+ _flb_time_get(&l_time);
+ }
+ tm = &l_time;
+ }
+
+ switch(fmt) {
+ case FLB_TIME_ETFMT_INT:
+ mpack_write_uint(writer, tm->tm.tv_sec);
+ break;
+
+ case FLB_TIME_ETFMT_V0:
+ case FLB_TIME_ETFMT_V1_EXT:
+ /* We can't set with msgpack-c !! */
+ /* see pack_template.h and msgpack_pack_inline_func(_ext) */
+ case FLB_TIME_ETFMT_V1_FIXEXT:
+ tmp = htonl((uint32_t)tm->tm.tv_sec); /* second from epoch */
+ memcpy(&ext_data, &tmp, 4);
+ tmp = htonl((uint32_t)tm->tm.tv_nsec);/* nanosecond */
+ memcpy(&ext_data[4], &tmp, 4);
+
+ /* https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#eventtime-ext-format */
+ mpack_write_ext(writer, 0 /*ext type=0 */, ext_data, sizeof(ext_data));
+ break;
+
+ default:
+ ret = -1;
+ }
+
+ return ret;
+}
+
+int flb_time_append_to_msgpack(struct flb_time *tm, msgpack_packer *pk, int fmt)
+{
+ int ret = 0;
+ struct flb_time l_time;
+ char ext_data[8];
+ uint32_t tmp;
+
+ if (!is_valid_format(fmt)) {
+#ifdef FLB_TIME_FORCE_FMT_INT
+ fmt = FLB_TIME_ETFMT_INT;
+#else
+ fmt = FLB_TIME_ETFMT_V1_FIXEXT;
+#endif
+ }
+
+ if (tm == NULL) {
+ if (fmt == FLB_TIME_ETFMT_INT) {
+ l_time.tm.tv_sec = time(NULL);
+ }
+ else {
+ _flb_time_get(&l_time);
+ }
+ tm = &l_time;
+ }
+
+ switch(fmt) {
+ case FLB_TIME_ETFMT_INT:
+ msgpack_pack_uint64(pk, tm->tm.tv_sec);
+ break;
+
+ case FLB_TIME_ETFMT_V0:
+ case FLB_TIME_ETFMT_V1_EXT:
+ /* We can't set with msgpack-c !! */
+ /* see pack_template.h and msgpack_pack_inline_func(_ext) */
+ case FLB_TIME_ETFMT_V1_FIXEXT:
+ tmp = htonl((uint32_t)tm->tm.tv_sec); /* second from epoch */
+ memcpy(&ext_data, &tmp, 4);
+ tmp = htonl((uint32_t)tm->tm.tv_nsec);/* nanosecond */
+ memcpy(&ext_data[4], &tmp, 4);
+
+ msgpack_pack_ext(pk, 8/*fixext8*/, 0);
+ msgpack_pack_ext_body(pk, ext_data, sizeof(ext_data));
+
+ break;
+
+ default:
+ ret = -1;
+ }
+
+ return ret;
+}
+
+static inline int is_eventtime(msgpack_object *obj)
+{
+ if (obj->via.ext.type != 0 || obj->via.ext.size != 8) {
+ return FLB_FALSE;
+ }
+ return FLB_TRUE;
+}
+
+int flb_time_msgpack_to_time(struct flb_time *time, msgpack_object *obj)
+{
+ uint32_t tmp;
+
+ switch(obj->type) {
+ case MSGPACK_OBJECT_POSITIVE_INTEGER:
+ time->tm.tv_sec = obj->via.u64;
+ time->tm.tv_nsec = 0;
+ break;
+ case MSGPACK_OBJECT_FLOAT:
+ time->tm.tv_sec = obj->via.f64;
+ time->tm.tv_nsec = ((obj->via.f64 - time->tm.tv_sec) * ONESEC_IN_NSEC);
+ break;
+ case MSGPACK_OBJECT_EXT:
+ if (is_eventtime(obj) != FLB_TRUE) {
+ flb_warn("[time] unknown ext type. type=%d size=%d",
+ obj->via.ext.type, obj->via.ext.size);
+ return -1;
+ }
+ memcpy(&tmp, &obj->via.ext.ptr[0], 4);
+ time->tm.tv_sec = (uint32_t) ntohl(tmp);
+ memcpy(&tmp, &obj->via.ext.ptr[4], 4);
+ time->tm.tv_nsec = (uint32_t) ntohl(tmp);
+ break;
+ default:
+ flb_warn("unknown time format %x", obj->type);
+ return -1;
+ }
+
+ return 0;
+}
+
+int flb_time_pop_from_mpack(struct flb_time *time, mpack_reader_t *reader)
+{
+ mpack_tag_t tag;
+ double d;
+ float f;
+ int64_t i;
+ uint32_t tmp;
+ char extbuf[8];
+ size_t ext_len;
+ int header_detected;
+
+ if (time == NULL) {
+ return -1;
+ }
+
+ header_detected = FLB_FALSE;
+
+ /* consume the record array */
+ tag = mpack_read_tag(reader);
+
+ if (mpack_reader_error(reader) != mpack_ok ||
+ mpack_tag_type(&tag) != mpack_type_array ||
+ mpack_tag_array_count(&tag) == 0) {
+ return -1;
+ }
+
+ /* consume the header array or the timestamp
+ * depending on the chunk encoding
+ */
+ tag = mpack_read_tag(reader);
+
+ if (mpack_reader_error(reader) != mpack_ok) {
+ return -1;
+ }
+
+ if (mpack_tag_type(&tag) == mpack_type_array) {
+ if(mpack_tag_array_count(&tag) != 2) {
+ return -1;
+ }
+
+ /* consume the timestamp element */
+ tag = mpack_read_tag(reader);
+
+ if (mpack_reader_error(reader) != mpack_ok) {
+ return -1;
+ }
+
+ header_detected = FLB_TRUE;
+ }
+
+ switch (mpack_tag_type(&tag)) {
+ case mpack_type_int:
+ i = mpack_tag_int_value(&tag);
+ if (i < 0) {
+ flb_warn("expecting positive integer, got %" PRId64, i);
+ return -1;
+ }
+ time->tm.tv_sec = i;
+ time->tm.tv_nsec = 0;
+ break;
+ case mpack_type_uint:
+ time->tm.tv_sec = mpack_tag_uint_value(&tag);
+ time->tm.tv_nsec = 0;
+ break;
+ case mpack_type_float:
+ f = mpack_tag_float_value(&tag);
+ time->tm.tv_sec = f;
+ time->tm.tv_nsec = ((f - time->tm.tv_sec) * ONESEC_IN_NSEC);
+ case mpack_type_double:
+ d = mpack_tag_double_value(&tag);
+ time->tm.tv_sec = d;
+ time->tm.tv_nsec = ((d - time->tm.tv_sec) * ONESEC_IN_NSEC);
+ break;
+ case mpack_type_ext:
+ ext_len = mpack_tag_ext_length(&tag);
+ if (ext_len != 8) {
+ flb_warn("expecting ext_len is 8, got %ld", ext_len);
+ return -1;
+ }
+ mpack_read_bytes(reader, extbuf, ext_len);
+ memcpy(&tmp, extbuf, 4);
+ time->tm.tv_sec = (uint32_t) ntohl(tmp);
+ memcpy(&tmp, extbuf + 4, 4);
+ time->tm.tv_nsec = (uint32_t) ntohl(tmp);
+ break;
+ default:
+ flb_warn("unknown time format %d", tag.type);
+ return -1;
+ }
+
+ /* discard the metadata map if present */
+
+ if (header_detected) {
+ mpack_discard(reader);
+ }
+
+ return 0;
+}
+
+int flb_time_pop_from_msgpack(struct flb_time *time, msgpack_unpacked *upk,
+ msgpack_object **map)
+{
+ int ret;
+ msgpack_object obj;
+
+ if (time == NULL || upk == NULL) {
+ return -1;
+ }
+
+ if (upk->data.type != MSGPACK_OBJECT_ARRAY) {
+ return -1;
+ }
+
+ obj = upk->data.via.array.ptr[0];
+
+ if (obj.type == MSGPACK_OBJECT_ARRAY) {
+ if (obj.via.array.size != 2) {
+ return -1;
+ }
+
+ obj = obj.via.array.ptr[0];
+ }
+
+ *map = &upk->data.via.array.ptr[1];
+
+ ret = flb_time_msgpack_to_time(time, &obj);
+ return ret;
+}
+
+long flb_time_tz_offset_to_second()
+{
+ time_t t = time(NULL);
+ struct tm local = *localtime(&t);
+ struct tm utc = *gmtime(&t);
+
+ long diff = ((local.tm_hour - utc.tm_hour) \
+ * 60 + (local.tm_min - utc.tm_min)) \
+ * 60L + (local.tm_sec - utc.tm_sec);
+
+ int delta_day = local.tm_mday - utc.tm_mday;
+
+ if ((delta_day == 1) || (delta_day < -1)) {
+ diff += 24L * 60 * 60;
+ }
+ else if ((delta_day == -1) || (delta_day > 1)) {
+ diff -= 24L * 60 * 60;
+ }
+
+ return diff;
+}