/*
 * Copyright (c) 2018-2024 OARC, Inc.
 * All rights reserved.
 *
 * This file is part of dnsjit.
 *
 * dnsjit is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * dnsjit is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with dnsjit.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "config.h"

#include "core/channel.h"
#include "core/assert.h"

#include <sched.h>

/* Module-level log object, used by the ml*() macros below. */
static core_log_t _log = LOG_T_INIT("core.channel");

/* Template copied into every channel by core_channel_init(). */
static core_channel_t _defaults = {
    LOG_T_INIT_OBJ("core.channel"),
    0, { 0 }, 0, 0,
    0, 0
};

/* Return a pointer to this module's log object. */
core_log_t* core_channel_log()
{
    return &_log;
}

/*
 * Return true when `num` is a power of two.
 *
 * Zero is not a power of two; the bit test handles it without looping
 * (a divide-by-two loop never terminates for zero).
 */
static inline bool _is_pow2(size_t num)
{
    return num != 0 && (num & (num - 1)) == 0;
}

/*
 * Initialize `self` as an empty channel with a ring of `capacity` slots.
 *
 * `capacity` must be a power of two, at least 4, and must fit in the
 * `unsigned int` that ck_ring_init() takes; anything else is reported
 * through mlfatal("invalid capacity").
 * The ring buffer is heap allocated and is released by
 * core_channel_destroy().
 */
void core_channel_init(core_channel_t* self, size_t capacity)
{
    mlassert_self();
    if (capacity < 4
        || !_is_pow2(capacity)
        || (size_t)(unsigned int)capacity != capacity) {
        mlfatal("invalid capacity");
    }

    *self          = _defaults;
    self->capacity = capacity;

    /* calloc() rejects a count * size product that would overflow. */
    lfatal_oom(self->ring_buf = calloc(capacity, sizeof(ck_ring_buffer_t)));
    ck_ring_init(&self->ring, (unsigned int)capacity);
}

/*
 * Release the ring buffer owned by `self`.
 *
 * The pointer is cleared afterwards so that a later put/get trips the
 * "ring_buf is nil" assertion instead of touching freed memory, and so
 * that a repeated destroy is harmless.
 */
void core_channel_destroy(core_channel_t* self)
{
    mlassert_self();
    free(self->ring_buf);
    self->ring_buf = 0;
}

/*
 * Enqueue `obj`, yielding the CPU and retrying for as long as the ring
 * refuses the entry.
 */
void core_channel_put(core_channel_t* self, const void* obj)
{
    mlassert_self();
    lassert(self->ring_buf, "ring_buf is nil");

    while (!ck_ring_enqueue_spsc(&self->ring, self->ring_buf, (void*)obj)) {
        sched_yield();
    }
}

/*
 * Try to enqueue `obj` once without waiting.
 *
 * Returns 0 when the object was enqueued and -1 when the ring refused it.
 */
int core_channel_try_put(core_channel_t* self, const void* obj)
{
    mlassert_self();
    lassert(self->ring_buf, "ring_buf is nil");

    if (!ck_ring_enqueue_spsc(&self->ring, self->ring_buf, (void*)obj)) {
        return -1;
    }
    return 0;
}

/*
 * Dequeue one object into `*obj`, yielding while the ring is empty.
 *
 * Returns true when an object was dequeued, false when the channel has
 * been closed and nothing is left in the ring.
 *
 * Once `closed` is observed the ring is polled one final time: objects
 * enqueued between the failed dequeue and the close must not be dropped.
 */
static bool _dequeue_wait(core_channel_t* self, void** obj)
{
    while (!ck_ring_dequeue_spsc(&self->ring, self->ring_buf, obj)) {
        sched_yield();

        if (ck_pr_load_int(&self->closed)) {
            /* Order the read of `closed` before the final read of the ring. */
            ck_pr_fence_load();
            return ck_ring_dequeue_spsc(&self->ring, self->ring_buf, obj);
        }
    }
    return true;
}

/*
 * Dequeue the next object, waiting until one is available.
 *
 * Returns the object, or 0 once the channel is closed and drained.
 */
void* core_channel_get(core_channel_t* self)
{
    void* obj = 0;
    mlassert_self();
    lassert(self->ring_buf, "ring_buf is nil");

    if (!_dequeue_wait(self, &obj)) {
        linfo("channel closed");
        return 0;
    }
    return obj;
}

/*
 * Try to dequeue one object without waiting.
 *
 * Returns the object, or 0 when nothing could be dequeued.
 */
void* core_channel_try_get(core_channel_t* self)
{
    void* obj = 0;
    mlassert_self();
    lassert(self->ring_buf, "ring_buf is nil");

    if (!ck_ring_dequeue_spsc(&self->ring, self->ring_buf, &obj)) {
        return 0;
    }
    return obj;
}

/* Return the number of entries ck_ring_size() reports for the ring. */
int core_channel_size(core_channel_t* self)
{
    mlassert_self();
    return ck_ring_size(&self->ring);
}

/* Return true when no further entry can be enqueued. */
bool core_channel_full(core_channel_t* self)
{
    mlassert_self();

    /* ck_ring can only hold capacity minus one entries at a time */
    return ck_ring_size(&self->ring) >= (self->capacity - 1);
}

/*
 * Mark the channel as closed; waiting getters return once the ring is
 * empty.
 *
 * The store fence keeps the `closed` flag from becoming visible ahead of
 * ring stores issued earlier by the calling thread.
 */
void core_channel_close(core_channel_t* self)
{
    mlassert_self();
    ck_pr_fence_store();
    ck_pr_store_int(&self->closed, 1);
}

/* Return core_channel_put() as a receiver callback. */
core_receiver_t core_channel_receiver()
{
    return (core_receiver_t)core_channel_put;
}

/*
 * Dequeue objects and hand each one to the configured receiver
 * (`self->recv` with `self->ctx`) until the channel is closed and
 * drained.
 *
 * A missing receiver is reported through lfatal("no receiver set").
 */
void core_channel_run(core_channel_t* self)
{
    void* obj = 0;
    mlassert_self();
    lassert(self->ring_buf, "ring_buf is nil");
    if (!self->recv) {
        lfatal("no receiver set");
    }

    for (;;) {
        if (!_dequeue_wait(self, &obj)) {
            linfo("channel closed");
            return;
        }
        self->recv(self->ctx, obj);
    }
}