summaryrefslogtreecommitdiffstats
path: root/src/jaegertracing/thrift/lib/cpp/src/thrift/async/TConcurrentClientSyncInfo.cpp
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/jaegertracing/thrift/lib/cpp/src/thrift/async/TConcurrentClientSyncInfo.cpp243
1 files changed, 243 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/lib/cpp/src/thrift/async/TConcurrentClientSyncInfo.cpp b/src/jaegertracing/thrift/lib/cpp/src/thrift/async/TConcurrentClientSyncInfo.cpp
new file mode 100644
index 000000000..0dac52458
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/cpp/src/thrift/async/TConcurrentClientSyncInfo.cpp
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <limits>
+#include <memory>
+#include <thrift/TApplicationException.h>
+#include <thrift/async/TConcurrentClientSyncInfo.h>
+#include <thrift/transport/TTransportException.h>
+
+namespace apache { namespace thrift { namespace async {
+
+using namespace ::apache::thrift::concurrency;
+
+TConcurrentClientSyncInfo::TConcurrentClientSyncInfo() :
+ stop_(false),
+ seqidMutex_(),
+ // test rollover all the time
+ nextseqid_((std::numeric_limits<int32_t>::max)()-10),
+ seqidToMonitorMap_(),
+ freeMonitors_(),
+ writeMutex_(),
+ readMutex_(),
+ recvPending_(false),
+ wakeupSomeone_(false),
+ seqidPending_(0),
+ fnamePending_(),
+ mtypePending_(::apache::thrift::protocol::T_CALL)
+{
+ freeMonitors_.reserve(MONITOR_CACHE_SIZE);
+}
+
+bool TConcurrentClientSyncInfo::getPending(
+ std::string &fname,
+ ::apache::thrift::protocol::TMessageType &mtype,
+ int32_t &rseqid)
+{
+ if(stop_)
+ throwDeadConnection_();
+ wakeupSomeone_ = false;
+ if(recvPending_)
+ {
+ recvPending_ = false;
+ rseqid = seqidPending_;
+ fname = fnamePending_;
+ mtype = mtypePending_;
+ return true;
+ }
+ return false;
+}
+
+void TConcurrentClientSyncInfo::updatePending(
+ const std::string &fname,
+ ::apache::thrift::protocol::TMessageType mtype,
+ int32_t rseqid)
+{
+ recvPending_ = true;
+ seqidPending_ = rseqid;
+ fnamePending_ = fname;
+ mtypePending_ = mtype;
+ MonitorPtr monitor;
+ {
+ Guard seqidGuard(seqidMutex_);
+ auto i = seqidToMonitorMap_.find(rseqid);
+ if(i == seqidToMonitorMap_.end())
+ throwBadSeqId_();
+ monitor = i->second;
+ }
+ monitor->notify();
+}
+
+void TConcurrentClientSyncInfo::waitForWork(int32_t seqid)
+{
+ MonitorPtr m;
+ {
+ Guard seqidGuard(seqidMutex_);
+ m = seqidToMonitorMap_[seqid];
+ }
+ while(true)
+ {
+ // be very careful about setting state in this loop that affects waking up. You may exit
+ // this function, attempt to grab some work, and someone else could have beaten you (or not
+ // left) the read mutex, and that will put you right back in this loop, with the mangled
+ // state you left behind.
+ if(stop_)
+ throwDeadConnection_();
+ if(wakeupSomeone_)
+ return;
+ if(recvPending_ && seqidPending_ == seqid)
+ return;
+ m->waitForever();
+ }
+}
+
+void TConcurrentClientSyncInfo::throwBadSeqId_()
+{
+ throw apache::thrift::TApplicationException(
+ TApplicationException::BAD_SEQUENCE_ID,
+ "server sent a bad seqid");
+}
+
+void TConcurrentClientSyncInfo::throwDeadConnection_()
+{
+ throw apache::thrift::transport::TTransportException(
+ apache::thrift::transport::TTransportException::NOT_OPEN,
+ "this client died on another thread, and is now in an unusable state");
+}
+
+void TConcurrentClientSyncInfo::wakeupAnyone_(const Guard &)
+{
+ wakeupSomeone_ = true;
+ if(!seqidToMonitorMap_.empty())
+ {
+ // The monitor map maps integers to monitors. Larger integers are more recent
+ // messages. Since this is ordered, it means that the last element is the most recent.
+ // We are trying to guess which thread will have its message complete next, so we are picking
+ // the most recent. The oldest message is likely to be some polling, long lived message.
+ // If we guess right, the thread we wake up will handle the message that comes in.
+ // If we guess wrong, the thread we wake up will hand off the work to the correct thread,
+ // costing us an extra context switch.
+ seqidToMonitorMap_.rbegin()->second->notify();
+ }
+}
+
+void TConcurrentClientSyncInfo::markBad_(const Guard &)
+{
+ wakeupSomeone_ = true;
+ stop_ = true;
+ for(auto & i : seqidToMonitorMap_)
+ i.second->notify();
+}
+
+TConcurrentClientSyncInfo::MonitorPtr
+TConcurrentClientSyncInfo::newMonitor_(const Guard &)
+{
+ if(freeMonitors_.empty())
+ return std::make_shared<Monitor>(&readMutex_);
+ MonitorPtr retval;
+ //swapping to avoid an atomic operation
+ retval.swap(freeMonitors_.back());
+ freeMonitors_.pop_back();
+ return retval;
+}
+
+void TConcurrentClientSyncInfo::deleteMonitor_(
+ const Guard &,
+ TConcurrentClientSyncInfo::MonitorPtr &m) /*noexcept*/
+{
+ if(freeMonitors_.size() > MONITOR_CACHE_SIZE)
+ {
+ m.reset();
+ return;
+ }
+ //freeMonitors_ was reserved up to MONITOR_CACHE_SIZE in the ctor,
+ //so this shouldn't throw
+ freeMonitors_.push_back(TConcurrentClientSyncInfo::MonitorPtr());
+ //swapping to avoid an atomic operation
+ m.swap(freeMonitors_.back());
+}
+
+int32_t TConcurrentClientSyncInfo::generateSeqId()
+{
+ Guard seqidGuard(seqidMutex_);
+ if(stop_)
+ throwDeadConnection_();
+
+ if(!seqidToMonitorMap_.empty())
+ if(nextseqid_ == seqidToMonitorMap_.begin()->first)
+ throw apache::thrift::TApplicationException(
+ TApplicationException::BAD_SEQUENCE_ID,
+ "about to repeat a seqid");
+ int32_t newSeqId = nextseqid_++;
+ seqidToMonitorMap_[newSeqId] = newMonitor_(seqidGuard);
+ return newSeqId;
+}
+
+TConcurrentRecvSentry::TConcurrentRecvSentry(TConcurrentClientSyncInfo *sync, int32_t seqid) :
+ sync_(*sync),
+ seqid_(seqid),
+ committed_(false)
+{
+ sync_.getReadMutex().lock();
+}
+
+TConcurrentRecvSentry::~TConcurrentRecvSentry()
+{
+ {
+ Guard seqidGuard(sync_.seqidMutex_);
+ sync_.deleteMonitor_(seqidGuard, sync_.seqidToMonitorMap_[seqid_]);
+
+ sync_.seqidToMonitorMap_.erase(seqid_);
+ if(committed_)
+ sync_.wakeupAnyone_(seqidGuard);
+ else
+ sync_.markBad_(seqidGuard);
+ }
+ sync_.getReadMutex().unlock();
+}
+
+void TConcurrentRecvSentry::commit()
+{
+ committed_ = true;
+}
+
+TConcurrentSendSentry::TConcurrentSendSentry(TConcurrentClientSyncInfo *sync) :
+ sync_(*sync),
+ committed_(false)
+{
+ sync_.getWriteMutex().lock();
+}
+
+TConcurrentSendSentry::~TConcurrentSendSentry()
+{
+ if(!committed_)
+ {
+ Guard seqidGuard(sync_.seqidMutex_);
+ sync_.markBad_(seqidGuard);
+ }
+ sync_.getWriteMutex().unlock();
+}
+
+void TConcurrentSendSentry::commit()
+{
+ committed_ = true;
+}
+
+
+}}} // apache::thrift::async