From 17d6a993fc17d533460c5f40f3908c708e057c18 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Thu, 23 May 2024 18:45:17 +0200 Subject: Merging upstream version 18.2.3. Signed-off-by: Daniel Baumann --- src/rgw/rgw_kafka.cc | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) (limited to 'src/rgw/rgw_kafka.cc') diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc index 642787a38..5b9d2f27c 100644 --- a/src/rgw/rgw_kafka.cc +++ b/src/rgw/rgw_kafka.cc @@ -100,8 +100,9 @@ struct connection_t { // fire all remaining callbacks (if not fired by rd_kafka_flush) std::for_each(callbacks.begin(), callbacks.end(), [this](auto& cb_tag) { cb_tag.cb(status); - ldout(cct, 20) << "Kafka destroy: invoking callback with tag=" << cb_tag.tag << - " for: " << broker << dendl; + ldout(cct, 20) << "Kafka destroy: invoking callback with tag=" + << cb_tag.tag << " for: " << broker + << " with status: " << status << dendl; }); callbacks.clear(); delivery_tag = 1; @@ -418,7 +419,9 @@ private: if (tag) { auto const q_len = conn->callbacks.size(); if (q_len < max_inflight) { - ldout(conn->cct, 20) << "Kafka publish (with callback, tag=" << *tag << "): OK. Queue has: " << q_len << " callbacks" << dendl; + ldout(conn->cct, 20) + << "Kafka publish (with callback, tag=" << *tag + << "): OK. Queue has: " << q_len + 1 << " callbacks" << dendl; conn->callbacks.emplace_back(*tag, message->cb); } else { // immediately invoke callback with error - this is not a connection error @@ -462,7 +465,7 @@ private: if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) { ldout(conn->cct, 20) << "kafka run: deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl; std::lock_guard lock(connections_lock); - conn->destroy(STATUS_CONNECTION_IDLE); + conn->status = STATUS_CONNECTION_IDLE; conn_it = connections.erase(conn_it); --connection_count; \ continue; -- cgit v1.2.3