Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_buf.c')
-rw-r--r--  fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_buf.c  530
1 file changed, 530 insertions(+), 0 deletions(-)
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_buf.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_buf.c
new file mode 100644
index 000000000..5a0e131e8
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_buf.c
@@ -0,0 +1,530 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2015, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "rdkafka_int.h"
+#include "rdkafka_buf.h"
+#include "rdkafka_broker.h"
+#include "rdkafka_interceptor.h"
+
+void rd_kafka_buf_destroy_final(rd_kafka_buf_t *rkbuf) {
+
+ switch (rkbuf->rkbuf_reqhdr.ApiKey) {
+ case RD_KAFKAP_Metadata:
+ if (rkbuf->rkbuf_u.Metadata.topics)
+ rd_list_destroy(rkbuf->rkbuf_u.Metadata.topics);
+ if (rkbuf->rkbuf_u.Metadata.reason)
+ rd_free(rkbuf->rkbuf_u.Metadata.reason);
+ if (rkbuf->rkbuf_u.Metadata.rko)
+ rd_kafka_op_reply(rkbuf->rkbuf_u.Metadata.rko,
+ RD_KAFKA_RESP_ERR__DESTROY);
+ if (rkbuf->rkbuf_u.Metadata.decr) {
+ /* Decrease metadata cache's full_.._sent state. */
+ mtx_lock(rkbuf->rkbuf_u.Metadata.decr_lock);
+ rd_kafka_assert(NULL,
+ (*rkbuf->rkbuf_u.Metadata.decr) > 0);
+ (*rkbuf->rkbuf_u.Metadata.decr)--;
+ mtx_unlock(rkbuf->rkbuf_u.Metadata.decr_lock);
+ }
+ break;
+
+ case RD_KAFKAP_Produce:
+ rd_kafka_msgbatch_destroy(&rkbuf->rkbuf_batch);
+ break;
+ }
+
+ if (rkbuf->rkbuf_response)
+ rd_kafka_buf_destroy(rkbuf->rkbuf_response);
+
+ if (rkbuf->rkbuf_make_opaque && rkbuf->rkbuf_free_make_opaque_cb)
+ rkbuf->rkbuf_free_make_opaque_cb(rkbuf->rkbuf_make_opaque);
+
+ rd_kafka_replyq_destroy(&rkbuf->rkbuf_replyq);
+ rd_kafka_replyq_destroy(&rkbuf->rkbuf_orig_replyq);
+
+ rd_buf_destroy(&rkbuf->rkbuf_buf);
+
+ if (rkbuf->rkbuf_rktp_vers)
+ rd_list_destroy(rkbuf->rkbuf_rktp_vers);
+
+ if (rkbuf->rkbuf_rkb)
+ rd_kafka_broker_destroy(rkbuf->rkbuf_rkb);
+
+ rd_refcnt_destroy(&rkbuf->rkbuf_refcnt);
+
+ rd_free(rkbuf);
+}
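+
+/* Usage sketch (editorial, assuming the refcounting helpers declared in
+ * rdkafka_buf.h): rd_kafka_buf_destroy_final() is only reached once the
+ * buffer's refcount drops to zero; callers always go through the
+ * rd_kafka_buf_keep()/rd_kafka_buf_destroy() pair:
+ *
+ *   rd_kafka_buf_t *rkbuf = rd_kafka_buf_new0(1, 64, 0);
+ *   rd_kafka_buf_keep(rkbuf);     (second reference, e.g. held by an op)
+ *   rd_kafka_buf_destroy(rkbuf);  (refcount 2 -> 1: payload stays alive)
+ *   rd_kafka_buf_destroy(rkbuf);  (refcount 1 -> 0: destroy_final() runs)
+ */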
+
+
+
+/**
+ * @brief Pushes \p buf of size \p len as a new segment on the buffer.
+ *
+ * \p buf will NOT be freed by the buffer.
+ */
+void rd_kafka_buf_push0(rd_kafka_buf_t *rkbuf,
+ const void *buf,
+ size_t len,
+ int allow_crc_calc,
+ void (*free_cb)(void *)) {
+ rd_buf_push(&rkbuf->rkbuf_buf, buf, len, free_cb);
+
+ if (allow_crc_calc && (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC))
+ rkbuf->rkbuf_crc = rd_crc32_update(rkbuf->rkbuf_crc, buf, len);
+}
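+
+/* Usage sketch (editorial): pushing a caller-owned static region as a
+ * read-only segment. The trailing NULL free_cb means the memory is not
+ * freed when the buffer is destroyed, and allow_crc_calc=1 lets the push
+ * feed the CRC if RD_KAFKA_OP_F_CRC is set on the buffer:
+ *
+ *   static const char hdr[4] = {0, 0, 0, 1};
+ *   rd_kafka_buf_push0(rkbuf, hdr, sizeof(hdr), 1, NULL);
+ */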
+
+
+
+/**
+ * @brief Create a new buffer with \p segcnt initial segments and \p size
+ *        bytes of initial backing memory.
+ *        The underlying buffer will grow as needed.
+ */
+rd_kafka_buf_t *rd_kafka_buf_new0(int segcnt, size_t size, int flags) {
+ rd_kafka_buf_t *rkbuf;
+
+ rkbuf = rd_calloc(1, sizeof(*rkbuf));
+
+ rkbuf->rkbuf_flags = flags;
+
+ rd_buf_init(&rkbuf->rkbuf_buf, segcnt, size);
+ rd_refcnt_init(&rkbuf->rkbuf_refcnt, 1);
+
+ return rkbuf;
+}
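+
+/* Usage sketch (editorial): a bare, non-request buffer; the backing
+ * memory grows beyond the initial 256 bytes as writes demand:
+ *
+ *   rd_kafka_buf_t *rkbuf = rd_kafka_buf_new0(2, 256, 0);
+ *   rd_kafka_buf_write_i32(rkbuf, 1234);
+ *   rd_kafka_buf_destroy(rkbuf);
+ */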
+
+
+/**
+ * @brief Create new request buffer with the request-header written (will
+ * need to be updated with Length, etc., later)
+ */
+rd_kafka_buf_t *rd_kafka_buf_new_request0(rd_kafka_broker_t *rkb,
+ int16_t ApiKey,
+ int segcnt,
+ size_t size,
+ rd_bool_t is_flexver) {
+ rd_kafka_buf_t *rkbuf;
+
+ /* Make room for common protocol request headers */
+ size += RD_KAFKAP_REQHDR_SIZE +
+ RD_KAFKAP_STR_SIZE(rkb->rkb_rk->rk_client_id) +
+ /* Flexible version adds a tag list to the headers
+ * and to the end of the payload, both of which we send
+ * as empty (1 byte each). */
+ (is_flexver ? 1 + 1 : 0);
+ segcnt += 1; /* headers */
+
+ rkbuf = rd_kafka_buf_new0(segcnt, size, 0);
+
+ rkbuf->rkbuf_rkb = rkb;
+ rd_kafka_broker_keep(rkb);
+
+ rkbuf->rkbuf_rel_timeout = rkb->rkb_rk->rk_conf.socket_timeout_ms;
+ rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_DEFAULT_RETRIES;
+
+ rkbuf->rkbuf_reqhdr.ApiKey = ApiKey;
+
+ /* Write request header, will be updated later. */
+ /* Length: updated later */
+ rd_kafka_buf_write_i32(rkbuf, 0);
+ /* ApiKey */
+ rd_kafka_buf_write_i16(rkbuf, rkbuf->rkbuf_reqhdr.ApiKey);
+ /* ApiVersion: updated later */
+ rd_kafka_buf_write_i16(rkbuf, 0);
+ /* CorrId: updated later */
+ rd_kafka_buf_write_i32(rkbuf, 0);
+
+ /* ClientId */
+ rd_kafka_buf_write_kstr(rkbuf, rkb->rkb_rk->rk_client_id);
+
+ if (is_flexver) {
+ /* Must set flexver after writing the client id since
+ * it is still a standard non-compact string. */
+ rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_FLEXVER;
+
+ /* Empty request header tags */
+ rd_kafka_buf_write_i8(rkbuf, 0);
+ }
+
+ return rkbuf;
+}
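+
+/* Usage sketch (editorial): building a flexible-version request directly
+ * through this function (callers typically go through the convenience
+ * wrappers in rdkafka_buf.h). The Length, ApiVersion and CorrId
+ * placeholders written above are patched in at transmission time:
+ *
+ *   rd_kafka_buf_t *rkbuf = rd_kafka_buf_new_request0(
+ *       rkb, RD_KAFKAP_Metadata, 1, 256, rd_true);
+ *   (write the request body, then enqueue on the broker)
+ */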
+
+
+
+/**
+ * @brief Create new read-only rkbuf shadowing a memory region.
+ *
+ * @remark \p free_cb (possibly NULL) will be used to free \p ptr when
+ * the buffer refcount reaches 0.
+ * @remark the buffer may only be read from, not written to.
+ *
+ * @warning If the caller has log_decode_errors > 0 then it must set up
+ * \c rkbuf->rkbuf_rkb to a refcnt-increased broker object.
+ */
+rd_kafka_buf_t *
+rd_kafka_buf_new_shadow(const void *ptr, size_t size, void (*free_cb)(void *)) {
+ rd_kafka_buf_t *rkbuf;
+
+ rkbuf = rd_calloc(1, sizeof(*rkbuf));
+
+ rkbuf->rkbuf_reqhdr.ApiKey = RD_KAFKAP_None;
+
+ rd_buf_init(&rkbuf->rkbuf_buf, 1, 0);
+ rd_buf_push(&rkbuf->rkbuf_buf, ptr, size, free_cb);
+
+ rkbuf->rkbuf_totlen = size;
+
+ /* Initialize reader slice */
+ rd_slice_init_full(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf);
+
+ rd_refcnt_init(&rkbuf->rkbuf_refcnt, 1);
+
+ return rkbuf;
+}
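+
+/* Usage sketch (editorial): wrapping a received, heap-allocated response
+ * frame so it can be parsed through \c rkbuf_reader without copying.
+ * Here \c recvbuf and \c len are assumed to come from the transport
+ * layer; passing rd_free as the free_cb releases the frame together with
+ * the buffer:
+ *
+ *   rd_kafka_buf_t *rkbuf = rd_kafka_buf_new_shadow(recvbuf, len, rd_free);
+ *   (read fields via the rd_kafka_buf_read_*() helpers ...)
+ *   rd_kafka_buf_destroy(rkbuf);
+ */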
+
+
+
+void rd_kafka_bufq_enq(rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf) {
+ TAILQ_INSERT_TAIL(&rkbufq->rkbq_bufs, rkbuf, rkbuf_link);
+ rd_atomic32_add(&rkbufq->rkbq_cnt, 1);
+ if (rkbuf->rkbuf_reqhdr.ApiKey == RD_KAFKAP_Produce)
+ rd_atomic32_add(&rkbufq->rkbq_msg_cnt,
+ rd_kafka_msgq_len(&rkbuf->rkbuf_batch.msgq));
+}
+
+void rd_kafka_bufq_deq(rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf) {
+ TAILQ_REMOVE(&rkbufq->rkbq_bufs, rkbuf, rkbuf_link);
+ rd_kafka_assert(NULL, rd_atomic32_get(&rkbufq->rkbq_cnt) > 0);
+ rd_atomic32_sub(&rkbufq->rkbq_cnt, 1);
+ if (rkbuf->rkbuf_reqhdr.ApiKey == RD_KAFKAP_Produce)
+ rd_atomic32_sub(&rkbufq->rkbq_msg_cnt,
+ rd_kafka_msgq_len(&rkbuf->rkbuf_batch.msgq));
+}
+
+void rd_kafka_bufq_init(rd_kafka_bufq_t *rkbufq) {
+ TAILQ_INIT(&rkbufq->rkbq_bufs);
+ rd_atomic32_init(&rkbufq->rkbq_cnt, 0);
+ rd_atomic32_init(&rkbufq->rkbq_msg_cnt, 0);
+}
+
+/**
+ * Concat all buffers from 'src' to tail of 'dst'
+ */
+void rd_kafka_bufq_concat(rd_kafka_bufq_t *dst, rd_kafka_bufq_t *src) {
+ TAILQ_CONCAT(&dst->rkbq_bufs, &src->rkbq_bufs, rkbuf_link);
+ (void)rd_atomic32_add(&dst->rkbq_cnt, rd_atomic32_get(&src->rkbq_cnt));
+ (void)rd_atomic32_add(&dst->rkbq_msg_cnt,
+ rd_atomic32_get(&src->rkbq_msg_cnt));
+ rd_kafka_bufq_init(src);
+}
+
+/**
+ * Purge the wait-response queue.
+ * NOTE: 'rkbufq' must be a temporary queue and not one of rkb_waitresps
+ * or rkb_outbufs since buffers may be re-enqueued on those queues.
+ * 'rkbufq' needs to be bufq_init():ed before reuse after this call.
+ */
+void rd_kafka_bufq_purge(rd_kafka_broker_t *rkb,
+ rd_kafka_bufq_t *rkbufq,
+ rd_kafka_resp_err_t err) {
+ rd_kafka_buf_t *rkbuf, *tmp;
+
+ rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
+
+ rd_rkb_dbg(rkb, QUEUE, "BUFQ", "Purging bufq with %i buffers",
+ rd_atomic32_get(&rkbufq->rkbq_cnt));
+
+ TAILQ_FOREACH_SAFE(rkbuf, &rkbufq->rkbq_bufs, rkbuf_link, tmp) {
+ rd_kafka_buf_callback(rkb->rkb_rk, rkb, err, NULL, rkbuf);
+ }
+}
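+
+/* Usage sketch (editorial): the temporary-queue pattern required by the
+ * NOTE above, roughly as done on broker failure: move the wait-response
+ * buffers to a local queue, then fail them all with a transport error:
+ *
+ *   rd_kafka_bufq_t tmpq;
+ *   rd_kafka_bufq_init(&tmpq);
+ *   rd_kafka_bufq_concat(&tmpq, &rkb->rkb_waitresps);
+ *   rd_kafka_bufq_purge(rkb, &tmpq, RD_KAFKA_RESP_ERR__TRANSPORT);
+ */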
+
+
+/**
+ * @brief Update bufq for connection reset:
+ *
+ * - Purge connection-setup API requests from the queue.
+ * - Reset any partially sent buffer's offset. (issue #756)
+ *
+ * Request types purged:
+ * ApiVersion
+ * SaslHandshake
+ */
+void rd_kafka_bufq_connection_reset(rd_kafka_broker_t *rkb,
+ rd_kafka_bufq_t *rkbufq) {
+ rd_kafka_buf_t *rkbuf, *tmp;
+ rd_ts_t now = rd_clock();
+
+ rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
+
+ rd_rkb_dbg(rkb, QUEUE, "BUFQ",
+ "Updating %d buffers on connection reset",
+ rd_atomic32_get(&rkbufq->rkbq_cnt));
+
+ TAILQ_FOREACH_SAFE(rkbuf, &rkbufq->rkbq_bufs, rkbuf_link, tmp) {
+ switch (rkbuf->rkbuf_reqhdr.ApiKey) {
+ case RD_KAFKAP_ApiVersion:
+ case RD_KAFKAP_SaslHandshake:
+ rd_kafka_bufq_deq(rkbufq, rkbuf);
+ rd_kafka_buf_callback(rkb->rkb_rk, rkb,
+ RD_KAFKA_RESP_ERR__DESTROY, NULL,
+ rkbuf);
+ break;
+ default:
+ /* Reset buffer send position and corrid */
+ rd_slice_seek(&rkbuf->rkbuf_reader, 0);
+ rkbuf->rkbuf_corrid = 0;
+ /* Reset timeout */
+ rd_kafka_buf_calc_timeout(rkb->rkb_rk, rkbuf, now);
+ break;
+ }
+ }
+}
+
+
+void rd_kafka_bufq_dump(rd_kafka_broker_t *rkb,
+ const char *fac,
+ rd_kafka_bufq_t *rkbq) {
+ rd_kafka_buf_t *rkbuf;
+ int cnt = rd_kafka_bufq_cnt(rkbq);
+ rd_ts_t now;
+
+ if (!cnt)
+ return;
+
+ now = rd_clock();
+
+ rd_rkb_dbg(rkb, BROKER, fac, "bufq with %d buffer(s):", cnt);
+
+ TAILQ_FOREACH(rkbuf, &rkbq->rkbq_bufs, rkbuf_link) {
+ rd_rkb_dbg(rkb, BROKER, fac,
+ " Buffer %s (%" PRIusz " bytes, corrid %" PRId32
+ ", "
+ "connid %d, prio %d, retry %d in %lldms, "
+ "timeout in %lldms)",
+ rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr.ApiKey),
+ rkbuf->rkbuf_totlen, rkbuf->rkbuf_corrid,
+ rkbuf->rkbuf_connid, rkbuf->rkbuf_prio,
+ rkbuf->rkbuf_retries,
+ rkbuf->rkbuf_ts_retry
+ ? (rkbuf->rkbuf_ts_retry - now) / 1000LL
+ : 0,
+ rkbuf->rkbuf_ts_timeout
+ ? (rkbuf->rkbuf_ts_timeout - now) / 1000LL
+ : 0);
+ }
+}
+
+
+
+/**
+ * @brief Calculate the effective timeout for a request attempt
+ */
+void rd_kafka_buf_calc_timeout(const rd_kafka_t *rk,
+ rd_kafka_buf_t *rkbuf,
+ rd_ts_t now) {
+ if (likely(rkbuf->rkbuf_rel_timeout)) {
+ /* Default:
+                 * Relative timeout, set request timeout to
+                 * now + relative timeout. */
+ rkbuf->rkbuf_ts_timeout = now + rkbuf->rkbuf_rel_timeout * 1000;
+ } else if (!rkbuf->rkbuf_force_timeout) {
+ /* Use absolute timeout, limited by socket.timeout.ms */
+ rd_ts_t sock_timeout =
+ now + rk->rk_conf.socket_timeout_ms * 1000;
+
+ rkbuf->rkbuf_ts_timeout =
+ RD_MIN(sock_timeout, rkbuf->rkbuf_abs_timeout);
+ } else {
+                /* Use absolute timeout without limit. */
+ rkbuf->rkbuf_ts_timeout = rkbuf->rkbuf_abs_timeout;
+ }
+}
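+
+/* Worked example (editorial): timestamps (rd_ts_t) are in microseconds
+ * while the configured timeouts are in milliseconds, hence the * 1000
+ * scaling. With the default socket.timeout.ms=60000 and a relative
+ * timeout, a request issued at now=T expires at
+ *
+ *   rkbuf_ts_timeout = T + 60000 * 1000   (i.e. T + 60 seconds)
+ */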
+
+/**
+ * Retry failed request, if permitted.
+ * @remark \p rkb may be NULL
+ * @remark the retry count is only increased for actually transmitted
+ *         buffers; if the failure occurs while the buffer lingers in the
+ *         output queue (rkb_outbufs) the retry counter is not increased.
+ * Returns 1 if the request was scheduled for retry, else 0.
+ */
+int rd_kafka_buf_retry(rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf) {
+ int incr_retry = rd_kafka_buf_was_sent(rkbuf) ? 1 : 0;
+
+ /* Don't allow retries of dummy/empty buffers */
+ rd_assert(rd_buf_len(&rkbuf->rkbuf_buf) > 0);
+
+ if (unlikely(!rkb || rkb->rkb_source == RD_KAFKA_INTERNAL ||
+ rd_kafka_terminating(rkb->rkb_rk) ||
+ rkbuf->rkbuf_retries + incr_retry >
+ rkbuf->rkbuf_max_retries))
+ return 0;
+
+ /* Absolute timeout, check for expiry. */
+ if (rkbuf->rkbuf_abs_timeout && rkbuf->rkbuf_abs_timeout < rd_clock())
+ return 0; /* Expired */
+
+ /* Try again */
+ rkbuf->rkbuf_ts_sent = 0;
+ rkbuf->rkbuf_ts_timeout = 0; /* Will be updated in calc_timeout() */
+ rkbuf->rkbuf_retries += incr_retry;
+ rd_kafka_buf_keep(rkbuf);
+ rd_kafka_broker_buf_retry(rkb, rkbuf);
+ return 1;
+}
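+
+/* Usage sketch (editorial): response handlers delegate the retry
+ * decision here; a typical error path looks like:
+ *
+ *   if (err && rd_kafka_buf_retry(rkb, request))
+ *           return;  (request was re-enqueued for retry)
+ *   (otherwise fall through to permanent error handling)
+ */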
+
+
+/**
+ * @brief Handle RD_KAFKA_OP_RECV_BUF.
+ */
+void rd_kafka_buf_handle_op(rd_kafka_op_t *rko, rd_kafka_resp_err_t err) {
+ rd_kafka_buf_t *request, *response;
+ rd_kafka_t *rk;
+
+ request = rko->rko_u.xbuf.rkbuf;
+ rko->rko_u.xbuf.rkbuf = NULL;
+
+ /* NULL on op_destroy() */
+ if (request->rkbuf_replyq.q) {
+ int32_t version = request->rkbuf_replyq.version;
+                /* Current queue usage is done, but retain the original
+                 * replyq for future retries, stealing the current
+                 * reference. */
+ request->rkbuf_orig_replyq = request->rkbuf_replyq;
+ rd_kafka_replyq_clear(&request->rkbuf_replyq);
+                /* The callback might still need to do a version check,
+                 * so retain the version across the clear() call, which
+                 * resets it. */
+ request->rkbuf_replyq.version = version;
+ }
+
+ if (!request->rkbuf_cb) {
+ rd_kafka_buf_destroy(request);
+ return;
+ }
+
+ /* Let buf_callback() do destroy()s */
+ response = request->rkbuf_response; /* May be NULL */
+ request->rkbuf_response = NULL;
+
+ if (!(rk = rko->rko_rk)) {
+ rd_assert(request->rkbuf_rkb != NULL);
+ rk = request->rkbuf_rkb->rkb_rk;
+ }
+
+ rd_kafka_buf_callback(rk, request->rkbuf_rkb, err, response, request);
+}
+
+
+
+/**
+ * Call request.rkbuf_cb(), but:
+ * - if the rkbuf has a rkbuf_replyq the buffer is enqueued on that queue
+ * with op type RD_KAFKA_OP_RECV_BUF.
+ * - else call rkbuf_cb().
+ *
+ * \p response may be NULL.
+ *
+ * Will decrease refcount for both response and request, eventually.
+ *
+ * The decision to retry, and the call to buf_retry(), is delegated
+ * to the buffer's response callback.
+ */
+void rd_kafka_buf_callback(rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *response,
+ rd_kafka_buf_t *request) {
+
+ rd_kafka_interceptors_on_response_received(
+ rk, -1, rkb ? rd_kafka_broker_name(rkb) : "",
+ rkb ? rd_kafka_broker_id(rkb) : -1, request->rkbuf_reqhdr.ApiKey,
+ request->rkbuf_reqhdr.ApiVersion, request->rkbuf_reshdr.CorrId,
+ response ? response->rkbuf_totlen : 0,
+ response ? response->rkbuf_ts_sent : -1, err);
+
+ if (err != RD_KAFKA_RESP_ERR__DESTROY && request->rkbuf_replyq.q) {
+ rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_RECV_BUF);
+
+ rd_kafka_assert(NULL, !request->rkbuf_response);
+ request->rkbuf_response = response;
+
+ /* Increment refcnt since rko_rkbuf will be decref:ed
+                 * if replyq_enq() fails and we don't want the rkbuf gone in
+                 * that case. */
+ rd_kafka_buf_keep(request);
+ rko->rko_u.xbuf.rkbuf = request;
+
+ rko->rko_err = err;
+
+ /* Copy original replyq for future retries, with its own
+ * queue reference. */
+ rd_kafka_replyq_copy(&request->rkbuf_orig_replyq,
+ &request->rkbuf_replyq);
+
+ rd_kafka_replyq_enq(&request->rkbuf_replyq, rko, 0);
+
+ rd_kafka_buf_destroy(request); /* from keep above */
+ return;
+ }
+
+ if (request->rkbuf_cb)
+ request->rkbuf_cb(rk, rkb, err, response, request,
+ request->rkbuf_opaque);
+
+ rd_kafka_buf_destroy(request);
+ if (response)
+ rd_kafka_buf_destroy(response);
+}
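+
+/* Usage sketch (editorial): the \c rkbuf_cb invoked above has the shape
+ * implied by the call at the end of rd_kafka_buf_callback(); the handler
+ * name and body here are illustrative:
+ *
+ *   static void my_resp_cb(rd_kafka_t *rk, rd_kafka_broker_t *rkb,
+ *                          rd_kafka_resp_err_t err,
+ *                          rd_kafka_buf_t *response,
+ *                          rd_kafka_buf_t *request, void *opaque) {
+ *           if (err && rd_kafka_buf_retry(rkb, request))
+ *                   return;
+ *           (parse response, if any, and complete the operation)
+ *   }
+ */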
+
+
+
+/**
+ * @brief Set the maker callback, which will be called just prior to sending
+ * to construct the buffer contents.
+ *
+ * Use this when the usable ApiVersion must be known but the broker may
+ * currently be down.
+ *
+ * See rd_kafka_make_req_cb_t documentation for more info.
+ */
+void rd_kafka_buf_set_maker(rd_kafka_buf_t *rkbuf,
+ rd_kafka_make_req_cb_t *make_cb,
+ void *make_opaque,
+ void (*free_make_opaque_cb)(void *make_opaque)) {
+ rd_assert(!rkbuf->rkbuf_make_req_cb &&
+ !(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_NEED_MAKE));
+
+ rkbuf->rkbuf_make_req_cb = make_cb;
+ rkbuf->rkbuf_make_opaque = make_opaque;
+ rkbuf->rkbuf_free_make_opaque_cb = free_make_opaque_cb;
+
+ rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_NEED_MAKE;
+}
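+
+/* Usage sketch (editorial): deferring body construction until the usable
+ * ApiVersion is known. \c my_make_cb and \c my_opaque are illustrative
+ * names; the maker callback is invoked just prior to transmission:
+ *
+ *   rd_kafka_buf_set_maker(rkbuf, my_make_cb, my_opaque, rd_free);
+ */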