summaryrefslogtreecommitdiffstats
path: root/src/jaegertracing/jaeger-client-cpp/crossdock
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/jaegertracing/jaeger-client-cpp/crossdock
parentInitial commit. (diff)
downloadceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz
ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/jaegertracing/jaeger-client-cpp/crossdock')
-rw-r--r--src/jaegertracing/jaeger-client-cpp/crossdock/Dockerfile10
-rw-r--r--src/jaegertracing/jaeger-client-cpp/crossdock/Server.cpp808
-rw-r--r--src/jaegertracing/jaeger-client-cpp/crossdock/Server.h87
-rw-r--r--src/jaegertracing/jaeger-client-cpp/crossdock/docker-compose.yml68
4 files changed, 973 insertions, 0 deletions
diff --git a/src/jaegertracing/jaeger-client-cpp/crossdock/Dockerfile b/src/jaegertracing/jaeger-client-cpp/crossdock/Dockerfile
new file mode 100644
index 000000000..bfca32db6
--- /dev/null
+++ b/src/jaegertracing/jaeger-client-cpp/crossdock/Dockerfile
@@ -0,0 +1,10 @@
+FROM gcc:7.2
+
+ADD ./build/crossdock /
+
+ENV AGENT_HOST_PORT=jaeger-agent:5775
+ENV SAMPLING_SERVER_URL=http://test_driver:5778/sampling
+
+EXPOSE 8080-8082
+
+CMD ["/crossdock"]
diff --git a/src/jaegertracing/jaeger-client-cpp/crossdock/Server.cpp b/src/jaegertracing/jaeger-client-cpp/crossdock/Server.cpp
new file mode 100644
index 000000000..5ba85b743
--- /dev/null
+++ b/src/jaegertracing/jaeger-client-cpp/crossdock/Server.cpp
@@ -0,0 +1,808 @@
+/*
+ * Copyright (c) 2017-2018 Uber Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Server.h"
+
+#include <atomic>
+#include <cstdlib>
+#include <future>
+#include <sstream>
+#include <thread>
+
+#include <nlohmann/json.hpp>
+
+#include "jaegertracing/Tracer.h"
+#include "jaegertracing/net/IPAddress.h"
+#include "jaegertracing/net/Socket.h"
+#include "jaegertracing/net/http/Request.h"
+#include "jaegertracing/net/http/Response.h"
+
+namespace jaegertracing {
+namespace crossdock {
+namespace thrift {
+
+#define JSON_FROM_FIELD(var, field) \
+ { \
+ json[#field] = var.field; \
+ }
+
+#define FIELD_FROM_JSON(var, field) \
+ { \
+ var.__set_##field(json.at(#field)); \
+ }
+
+void to_json(nlohmann::json& json, const Transport::type& transport)
+{
+ json = _Transport_VALUES_TO_NAMES.at(static_cast<int>(transport));
+}
+
+void from_json(const nlohmann::json& json, Transport::type& transport)
+{
+ const auto str = json.get<std::string>();
+ if (str == "HTTP") {
+ transport = Transport::HTTP;
+ return;
+ }
+ if (str == "TCHANNEL") {
+ transport = Transport::TCHANNEL;
+ return;
+ }
+ if (str == "DUMMY") {
+ transport = Transport::DUMMY;
+ return;
+ }
+ std::ostringstream oss;
+ oss << "Invalid transport value " << str;
+ throw std::invalid_argument(oss.str());
+}
+
+void to_json(nlohmann::json& json, const Downstream& downstream)
+{
+ JSON_FROM_FIELD(downstream, serviceName);
+ JSON_FROM_FIELD(downstream, serverRole);
+ JSON_FROM_FIELD(downstream, host);
+ JSON_FROM_FIELD(downstream, port);
+ JSON_FROM_FIELD(downstream, transport);
+ if (downstream.downstream) {
+ json["downstream"] = *downstream.downstream;
+ }
+}
+
+void from_json(const nlohmann::json& json, Downstream& downstream)
+{
+ FIELD_FROM_JSON(downstream, serviceName);
+ FIELD_FROM_JSON(downstream, serverRole);
+ FIELD_FROM_JSON(downstream, host);
+ FIELD_FROM_JSON(downstream, port);
+ downstream.__set_transport(json.at("transport").get<Transport::type>());
+ auto itr = json.find("downstream");
+ if (itr != std::end(json) && !itr->is_null()) {
+ downstream.__set_downstream(
+ std::make_shared<Downstream>(itr->get<Downstream>()));
+ }
+}
+
+void to_json(nlohmann::json& json, const StartTraceRequest& request)
+{
+ JSON_FROM_FIELD(request, serverRole);
+ JSON_FROM_FIELD(request, sampled);
+ JSON_FROM_FIELD(request, baggage);
+ JSON_FROM_FIELD(request, downstream);
+}
+
+void from_json(const nlohmann::json& json, StartTraceRequest& request)
+{
+ FIELD_FROM_JSON(request, serverRole);
+ FIELD_FROM_JSON(request, sampled);
+ FIELD_FROM_JSON(request, baggage);
+ FIELD_FROM_JSON(request, downstream);
+}
+
+void to_json(nlohmann::json& json, const JoinTraceRequest& request)
+{
+ JSON_FROM_FIELD(request, serverRole);
+ if (request.__isset.downstream) {
+ json["downstream"] = request.downstream;
+ }
+}
+
+void from_json(const nlohmann::json& json, JoinTraceRequest& request)
+{
+ FIELD_FROM_JSON(request, serverRole);
+ auto itr = json.find("downstream");
+ if (itr != std::end(json) && !itr->is_null()) {
+ request.__set_downstream(itr->get<Downstream>());
+ }
+}
+
+void to_json(nlohmann::json& json, const ObservedSpan& observedSpan)
+{
+ JSON_FROM_FIELD(observedSpan, traceId);
+ JSON_FROM_FIELD(observedSpan, sampled);
+ JSON_FROM_FIELD(observedSpan, baggage);
+}
+
+void from_json(const nlohmann::json& json, ObservedSpan& observedSpan)
+{
+ FIELD_FROM_JSON(observedSpan, traceId);
+ FIELD_FROM_JSON(observedSpan, sampled);
+ FIELD_FROM_JSON(observedSpan, baggage);
+}
+
+void to_json(nlohmann::json& json, const TraceResponse& response)
+{
+ if (response.__isset.span) {
+ JSON_FROM_FIELD(response, span);
+ }
+ if (response.downstream) {
+ json["downstream"] = *response.downstream;
+ }
+ JSON_FROM_FIELD(response, notImplementedError);
+}
+
+void from_json(const nlohmann::json& json, TraceResponse& response)
+{
+ auto itr = json.find("span");
+ if (itr != std::end(json) && !itr->is_null()) {
+ response.__set_span(itr->get<ObservedSpan>());
+ }
+ itr = json.find("downstream");
+ if (itr != std::end(json) && !itr->is_null()) {
+ response.__set_downstream(
+ std::make_shared<TraceResponse>(itr->get<TraceResponse>()));
+ }
+ FIELD_FROM_JSON(response, notImplementedError);
+}
+
+#undef FIELD_FROM_JSON
+#undef JSON_FROM_FIELD
+
+} // namespace thrift
+
+namespace {
+
+constexpr auto kBaggageKey = "crossdock-baggage-key";
+constexpr auto kDefaultTracerServiceName = "crossdock-cpp";
+
+std::string escape(const std::string& str)
+{
+ std::string result;
+ result.reserve(str.size());
+ for (auto&& ch : str) {
+ switch (ch) {
+ case '\n': {
+ result += "\\n";
+ } break;
+ case '\r': {
+ result += "\\r";
+ } break;
+ default: {
+ result += ch;
+ } break;
+ }
+ }
+ return result;
+}
+
+std::string bufferedRead(net::Socket& socket)
+{
+ constexpr auto kBufferSize = 256;
+ std::array<char, kBufferSize> buffer;
+ std::string data;
+ auto numRead = ::read(socket.handle(), &buffer[0], buffer.size());
+ data.append(&buffer[0], numRead);
+ while (numRead == kBufferSize) {
+ numRead = ::read(socket.handle(), &buffer[0], buffer.size());
+ data.append(&buffer[0], numRead);
+ }
+ return data;
+}
+
+class RequestReader : public opentracing::HTTPHeadersReader {
+ public:
+ explicit RequestReader(const net::http::Request& request)
+ : _request(request)
+ {
+ }
+
+ opentracing::expected<void> ForeachKey(
+ std::function<opentracing::expected<void>(opentracing::string_view,
+ opentracing::string_view)> f)
+ const override
+ {
+ for (auto&& header : _request.headers()) {
+ const auto result = f(header.key(), header.value());
+ if (!result) {
+ return result;
+ }
+ }
+ return opentracing::make_expected();
+ }
+
+ private:
+ const net::http::Request& _request;
+};
+
+class RequestWriter : public opentracing::HTTPHeadersWriter {
+ public:
+ explicit RequestWriter(std::ostream& requestStream)
+ : _requestStream(requestStream)
+ {
+ }
+
+ opentracing::expected<void>
+ Set(opentracing::string_view key,
+ opentracing::string_view value) const override
+ {
+ _requestStream << key << ": " << value << "\r\n";
+ return opentracing::make_expected();
+ }
+
+ private:
+ std::ostream& _requestStream;
+};
+
+thrift::ObservedSpan observeSpan(const opentracing::SpanContext& ctx)
+{
+ const auto& sc = static_cast<const SpanContext&>(ctx);
+ thrift::ObservedSpan observedSpan;
+ std::ostringstream oss;
+ oss << sc.traceID();
+ observedSpan.__set_traceId(oss.str());
+ observedSpan.__set_sampled(sc.isSampled());
+ auto itr = sc.baggage().find(kBaggageKey);
+ if (itr != std::end(sc.baggage())) {
+ observedSpan.__set_baggage(itr->second);
+ }
+ return observedSpan;
+}
+
+thrift::TraceResponse callDownstreamHTTP(const opentracing::SpanContext& ctx,
+ const thrift::Downstream& target,
+ opentracing::Tracer& tracer,
+ logging::Logger& logger)
+{
+ thrift::JoinTraceRequest request;
+ request.__set_serverRole(target.serverRole);
+ if (target.downstream) {
+ request.__set_downstream(*target.downstream);
+ }
+
+ const auto requestJSON = nlohmann::json(request).dump();
+ net::Socket socket;
+ socket.open(AF_INET, SOCK_STREAM);
+ const auto authority = target.host + ':' + target.port;
+ socket.connect("http://" + authority);
+ std::ostringstream oss;
+ oss << "POST /join_trace HTTP/1.1\r\n"
+ "Host: "
+ << authority << "\r\n";
+ RequestWriter writer(oss);
+ tracer.Inject(ctx, writer);
+ oss << "Connection: close\r\n"
+ "Content-Type: application/json\r\n"
+ "Content-Length: "
+ << requestJSON.size() << "\r\n\r\n"
+ << requestJSON;
+ const auto message = oss.str();
+ logger.info("Sending request downstream: " + escape(message));
+ const auto numWritten =
+ ::write(socket.handle(), &message[0], message.size());
+ (void)numWritten;
+
+ const auto responseStr = bufferedRead(socket);
+ logger.info("Received downstream response: " + escape(responseStr));
+ std::istringstream iss(responseStr);
+ auto response = net::http::Response::parse(iss);
+ return nlohmann::json::parse(response.body());
+}
+
+thrift::TraceResponse callDownstream(const opentracing::SpanContext& ctx,
+ const std::string& /* role */,
+ const thrift::Downstream& downstream,
+ opentracing::Tracer& tracer,
+ logging::Logger& logger)
+{
+ thrift::TraceResponse response;
+
+ switch (downstream.transport) {
+ case thrift::Transport::HTTP: {
+ response = callDownstreamHTTP(ctx, downstream, tracer, logger);
+ } break;
+ case thrift::Transport::TCHANNEL: {
+ response.__set_notImplementedError(
+ "TCHANNEL transport not implemented");
+ } break;
+ case thrift::Transport::DUMMY: {
+ response.__set_notImplementedError("DUMMY transport not implemented");
+ } break;
+ default: {
+ throw std::invalid_argument("Unrecognized protocol " +
+ std::to_string(downstream.transport));
+ } break;
+ }
+
+ return response;
+}
+
+thrift::TraceResponse prepareResponse(const opentracing::SpanContext& ctx,
+ const std::string& role,
+ const thrift::Downstream* downstream,
+ opentracing::Tracer& tracer,
+ logging::Logger& logger)
+{
+ const auto observedSpan = observeSpan(ctx);
+ thrift::TraceResponse response;
+ response.__set_span(observedSpan);
+ if (downstream) {
+ response.__set_downstream(std::make_shared<thrift::TraceResponse>(
+ callDownstream(ctx, role, *downstream, tracer, logger)));
+ }
+ return response;
+}
+
+struct GenerateTracesRequest {
+ using StrMap = std::unordered_map<std::string, std::string>;
+
+ std::string _type;
+ std::string _operation;
+ StrMap _tags;
+ int _count;
+};
+
+void from_json(const nlohmann::json& json, GenerateTracesRequest& request)
+{
+ request._type = json.at("type");
+ request._operation = json.at("operation");
+ request._tags = json.at("tags").get<GenerateTracesRequest::StrMap>();
+ request._count = json.at("count");
+}
+
+} // anonymous namespace
+
+using Handler = std::function<std::string(const net::http::Request&)>;
+
+class Server::SocketListener {
+ public:
+ SocketListener(const net::IPAddress& ip,
+ const std::shared_ptr<logging::Logger>& logger,
+ Handler handler)
+ : _ip(ip)
+ , _logger(logger)
+ , _handler(handler)
+ , _running(false)
+ {
+ assert(_logger);
+ }
+
+ ~SocketListener() { stop(); }
+
+ void start()
+ {
+ std::promise<void> started;
+ _thread = std::thread([this, &started]() { start(_ip, started); });
+ started.get_future().get();
+ }
+
+ void stop() noexcept
+ {
+ if (_running) {
+ _running = false;
+ _thread.join();
+ _socket.close();
+ }
+ }
+
+ private:
+ void start(const net::IPAddress& ip, std::promise<void>& started)
+ {
+ _socket.open(AF_INET, SOCK_STREAM);
+ const auto enable = 1;
+ ::setsockopt(_socket.handle(),
+ SOL_SOCKET,
+ SO_REUSEADDR,
+ &enable,
+ sizeof(enable));
+ _socket.bind(ip);
+ _socket.listen();
+ _running = true;
+ started.set_value();
+
+ using TaskList = std::deque<std::future<void>>;
+ TaskList tasks;
+
+ while (_running) {
+ auto client = _socket.accept();
+ auto future = std::async(
+ std::launch::async,
+ [this](net::Socket&& socket) {
+ net::Socket client(std::move(socket));
+ auto requestStr = bufferedRead(client);
+ _logger->info("Received request: " + escape(requestStr));
+
+ try {
+ std::istringstream iss(requestStr);
+ const auto request = net::http::Request::parse(iss);
+ const auto responseStr = _handler(request);
+ const auto numWritten = ::write(client.handle(),
+ &responseStr[0],
+ responseStr.size());
+ if (numWritten !=
+ static_cast<int>(responseStr.size())) {
+ std::ostringstream oss;
+ oss << "Unable to write entire response"
+ ", numWritten="
+ << numWritten
+ << ", responseSize=" << responseStr.size();
+ _logger->error(oss.str());
+ }
+ } catch (...) {
+ utils::ErrorUtil::logError(*_logger, "Server error");
+ constexpr auto message =
+ "HTTP/1.1 500 Internal Server Error\r\n\r\n";
+ constexpr auto messageSize = sizeof(message) - 1;
+ const auto numWritten =
+ ::write(client.handle(), message, messageSize);
+ (void)numWritten;
+ }
+
+ client.close();
+ },
+ std::move(client));
+ tasks.emplace_back(std::move(future));
+ }
+
+ std::for_each(std::begin(tasks),
+ std::end(tasks),
+ [](TaskList::value_type& future) { future.get(); });
+ }
+
+ net::IPAddress _ip;
+ net::Socket _socket;
+ std::shared_ptr<logging::Logger> _logger;
+ Handler _handler;
+ std::atomic<bool> _running;
+ std::thread _thread;
+};
+
+class Server::EndToEndHandler {
+ public:
+ using TracerPtr = std::shared_ptr<opentracing::Tracer>;
+
+ EndToEndHandler(const std::string& agentHostPort,
+ const std::string& collectorEndpoint,
+ const std::string& samplingServerURL)
+ : _agentHostPort(agentHostPort)
+ , _collectorEndpoint(collectorEndpoint)
+ , _samplingServerURL(samplingServerURL)
+ {
+ }
+
+ TracerPtr findOrMakeTracer(std::string samplerType)
+ {
+ if (samplerType.empty()) {
+ samplerType = kSamplerTypeRemote;
+ }
+
+ std::lock_guard<std::mutex> lock(_mutex);
+ auto itr = _tracers.find(samplerType);
+ if (itr != std::end(_tracers)) {
+ return itr->second;
+ }
+ return init(samplerType);
+ }
+
+ private:
+ Config makeEndToEndConfig(const std::string& samplerType) const
+ {
+ return Config(false,
+ samplers::Config(samplerType,
+ 1.0,
+ _samplingServerURL,
+ samplers::Config::kDefaultMaxOperations,
+ std::chrono::seconds(5)),
+ reporters::Config(reporters::Config::kDefaultQueueSize,
+ std::chrono::seconds(1),
+ false,
+ _agentHostPort,
+ _collectorEndpoint));
+ }
+
+ TracerPtr init(const std::string& samplerType)
+ {
+ const auto config = makeEndToEndConfig(samplerType);
+ auto tracer = Tracer::make(kDefaultTracerServiceName, config);
+ _tracers[config.sampler().type()] = tracer;
+ return tracer;
+ }
+
+ std::string _agentHostPort;
+ std::string _collectorEndpoint;
+ std::string _samplingServerURL;
+ std::unordered_map<std::string, TracerPtr> _tracers;
+ std::mutex _mutex;
+};
+
+Server::Server(const net::IPAddress& clientIP,
+ const net::IPAddress& serverIP,
+ const std::string& agentHostPort,
+ const std::string& collectorEndpoint,
+ const std::string& samplingServerURL)
+ : _logger(logging::consoleLogger())
+ , _tracer(Tracer::make(kDefaultTracerServiceName, Config(), _logger))
+ , _clientListener(
+ new SocketListener(clientIP,
+ _logger,
+ [this](const net::http::Request& request) {
+ return handleRequest(request);
+ }))
+ , _serverListener(
+ new SocketListener(serverIP,
+ _logger,
+ [this](const net::http::Request& request) {
+ return handleRequest(request);
+ }))
+ , _handler(new EndToEndHandler(agentHostPort, collectorEndpoint, samplingServerURL))
+{
+}
+
+Server::~Server() = default;
+
+void Server::serve()
+{
+ _clientListener->start();
+ _serverListener->start();
+}
+
+template <typename RequestType>
+std::string Server::handleJSON(
+ const net::http::Request& request,
+ std::function<thrift::TraceResponse(
+ const RequestType&, const opentracing::SpanContext&)> handler)
+{
+ RequestReader reader(request);
+ auto result = _tracer->Extract(reader);
+ if (!result) {
+ std::ostringstream oss;
+ oss << "Cannot read request body: opentracing error code "
+ << result.error().value();
+ const auto message = oss.str();
+ oss.str("");
+ oss.clear();
+ oss << "HTTP/1.1 400 Bad Request\r\n"
+ "Content-Length: "
+ << message.size() << "\r\n\r\n"
+ << message;
+ }
+
+ std::unique_ptr<opentracing::SpanContext> ctx(result->release());
+ opentracing::StartSpanOptions options;
+ options.start_system_timestamp = std::chrono::system_clock::now();
+ options.start_steady_timestamp = std::chrono::steady_clock::now();
+ if (ctx) {
+ options.references.emplace_back(std::make_pair(
+ opentracing::SpanReferenceType::ChildOfRef, ctx.get()));
+ }
+ auto span = _tracer->StartSpanWithOptions("post", options);
+
+ RequestType thriftRequest;
+ try {
+ thriftRequest = nlohmann::json::parse(request.body());
+ } catch (const std::exception& ex) {
+ std::ostringstream oss;
+ oss << "Cannot parse request JSON: " << ex.what()
+ << ", json: " << request.body();
+ const auto message = oss.str();
+ oss.str("");
+ oss.clear();
+ oss << "HTTP/1.1 500 Internal Server Error\r\n"
+ "Content-Length: "
+ << message.size() << "\r\n\r\n"
+ << message;
+ return oss.str();
+ } catch (...) {
+ std::ostringstream oss;
+ oss << "Cannot parse request JSON, json: " << request.body();
+ const auto message = oss.str();
+ oss.str("");
+ oss.clear();
+ oss << "HTTP/1.1 500 Internal Server Error\r\n"
+ "Content-Length: "
+ << message.size() << "\r\n\r\n"
+ << message;
+ return oss.str();
+ }
+
+ const auto thriftResponse = handler(thriftRequest, span->context());
+ try {
+ const auto message = nlohmann::json(thriftResponse).dump();
+ std::ostringstream oss;
+ oss << "HTTP/1.1 200 OK\r\n"
+ "Content-Type: application/json\r\n"
+ "Content-Length: "
+ << message.size() << "\r\n\r\n"
+ << message;
+ return oss.str();
+ } catch (const std::exception& ex) {
+ std::ostringstream oss;
+ oss << "Cannot marshal response to JSON: " << ex.what();
+ const auto message = oss.str();
+ oss.str("");
+ oss.clear();
+ oss << "HTTP/1.1 500 Internal Server Error\r\n"
+ "Content-Length: "
+ << message.size() << "\r\n\r\n"
+ << message;
+ return oss.str();
+ } catch (...) {
+ std::ostringstream oss;
+ oss << "Cannot marshal response to JSON";
+ const auto message = oss.str();
+ oss.str("");
+ oss.clear();
+ oss << "HTTP/1.1 500 Internal Server Error\r\n"
+ "Content-Length: "
+ << message.size() << "\r\n\r\n"
+ << message;
+ return oss.str();
+ }
+}
+
+std::string Server::handleRequest(const net::http::Request& request)
+{
+ if (request.target() == "/") {
+ return "HTTP/1.1 200 OK\r\n\r\n";
+ }
+ if (request.target() == "/start_trace") {
+ return handleJSON<thrift::StartTraceRequest>(
+ request,
+ [this](const thrift::StartTraceRequest& request,
+ const opentracing::SpanContext& /* ctx */) {
+ return startTrace(request);
+ });
+ }
+ if (request.target() == "/join_trace") {
+ return handleJSON<thrift::JoinTraceRequest>(
+ request,
+ [this](const thrift::JoinTraceRequest& request,
+ const opentracing::SpanContext& ctx) {
+ return joinTrace(request, ctx);
+ });
+ }
+ if (request.target() == "/create_traces") {
+ return generateTraces(request);
+ }
+ return "HTTP/1.1 404 Not Found\r\n\r\n";
+}
+
+thrift::TraceResponse
+Server::startTrace(const crossdock::thrift::StartTraceRequest& request)
+{
+ auto span = _tracer->StartSpan(request.serverRole);
+ if (request.sampled) {
+ span->SetTag("sampling.priority", 1);
+ }
+ span->SetBaggageItem(kBaggageKey, request.baggage);
+
+ return prepareResponse(span->context(),
+ request.serverRole,
+ &request.downstream,
+ *_tracer,
+ *_logger);
+}
+
+thrift::TraceResponse
+Server::joinTrace(const crossdock::thrift::JoinTraceRequest& request,
+ const opentracing::SpanContext& ctx)
+{
+ return prepareResponse(ctx,
+ request.serverRole,
+ request.__isset.downstream ? &request.downstream
+ : nullptr,
+ *_tracer,
+ *_logger);
+}
+
+std::string Server::generateTraces(const net::http::Request& requestHTTP)
+{
+ GenerateTracesRequest request;
+ try {
+ request = nlohmann::json::parse(requestHTTP.body());
+ } catch (const std::exception& ex) {
+ std::ostringstream oss;
+ oss << "JSON payload is invalid: " << ex.what();
+ const auto message = oss.str();
+ oss.str("");
+ oss.clear();
+ oss << "HTTP/1.1 400 Bad Request\r\n"
+ "Content-Length: "
+ << message.size() << "\r\n\r\n"
+ << message;
+ return oss.str();
+ } catch (...) {
+ const std::string message("JSON payload is invalid");
+ std::ostringstream oss;
+ oss << "HTTP/1.1 400 Bad Request\r\n"
+ "Content-Length: "
+ << message.size() << "\r\n\r\n"
+ << message;
+ return oss.str();
+ }
+
+ auto tracer = _handler->findOrMakeTracer(request._type);
+ if (!tracer) {
+ const std::string message("Tracer is not initialized");
+ std::ostringstream oss;
+ oss << "HTTP/1.1 500 Internal Server Error\r\n"
+ "Content-Length: "
+ << message.size() << "\r\n"
+ << message;
+ return oss.str();
+ }
+
+ for (auto i = 0; i < request._count; ++i) {
+ auto span = tracer->StartSpan(request._operation);
+ for (auto&& pair : request._tags) {
+ span->SetTag(pair.first, pair.second);
+ }
+ span->Finish();
+ }
+
+ return "HTTP/1.1 200 OK\r\n\r\n";
+}
+
+} // namespace crossdock
+} // namespace jaegertracing
+
+int main()
+{
+ const auto rawSenderType = std::getenv("SENDER");
+ const std::string senderType(rawSenderType ? rawSenderType : "");
+
+ if (senderType.empty()) {
+ std::cerr << "env SENDER is not specified!\n";
+ return 1;
+ }
+
+ const auto rawAgentHostPort = std::getenv("AGENT_HOST_PORT");
+ const std::string agentHostPort(rawAgentHostPort ? rawAgentHostPort : "");
+
+ if (agentHostPort.empty() && senderType == "udp") {
+ std::cerr << "env AGENT_HOST_PORT is not specified!\n";
+ return 1;
+ }
+
+ const std::string collectorEndpoint(senderType == "http" ? "http://jaeger-collector:14268/api/traces" : "");
+
+ const auto rawSamplingServerURL = std::getenv("SAMPLING_SERVER_URL");
+ const std::string samplingServerURL(
+ rawSamplingServerURL ? rawSamplingServerURL : "");
+ if (samplingServerURL.empty()) {
+ std::cerr << "env SAMPLING_SERVER_URL is not specified!\n";
+ return 1;
+ }
+
+ jaegertracing::crossdock::Server server(
+ jaegertracing::net::IPAddress::v4("0.0.0.0:8080"),
+ jaegertracing::net::IPAddress::v4("0.0.0.0:8081"),
+ agentHostPort,
+ collectorEndpoint,
+ samplingServerURL);
+ server.serve();
+
+ std::this_thread::sleep_for(std::chrono::hours(1));
+ return 0;
+}
diff --git a/src/jaegertracing/jaeger-client-cpp/crossdock/Server.h b/src/jaegertracing/jaeger-client-cpp/crossdock/Server.h
new file mode 100644
index 000000000..e02d32294
--- /dev/null
+++ b/src/jaegertracing/jaeger-client-cpp/crossdock/Server.h
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2017 Uber Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef JAEGERTRACING_CROSSDOCK_SERVER_H
+#define JAEGERTRACING_CROSSDOCK_SERVER_H
+
+#include <memory>
+
+#include <opentracing/tracer.h>
+
+#include "jaegertracing/thrift-gen/tracetest_types.h"
+
+namespace jaegertracing {
+namespace logging {
+
+class Logger;
+
+} // namespace logging
+
+namespace net {
+
+class IPAddress;
+
+namespace http {
+
+class Request;
+
+} // namespace http
+} // namespace net
+
+namespace crossdock {
+
+class Server {
+ public:
+ Server(const net::IPAddress& clientIP,
+ const net::IPAddress& serverIP,
+ const std::string& agentHostPort,
+ const std::string& collectorEndpoint,
+ const std::string& samplingServerURL);
+
+ ~Server();
+
+ void serve();
+
+ private:
+ template <typename RequestType>
+ std::string handleJSON(
+ const net::http::Request& request,
+ std::function<thrift::TraceResponse(
+ const RequestType&, const opentracing::SpanContext&)> handler);
+
+ std::string handleRequest(const net::http::Request& request);
+
+ thrift::TraceResponse startTrace(const thrift::StartTraceRequest& request);
+
+ thrift::TraceResponse joinTrace(const thrift::JoinTraceRequest& request,
+ const opentracing::SpanContext& ctx);
+
+ std::string generateTraces(const net::http::Request& request);
+
+ class SocketListener;
+ class EndToEndHandler;
+
+ std::shared_ptr<logging::Logger> _logger;
+ std::shared_ptr<opentracing::Tracer> _tracer;
+ std::unique_ptr<SocketListener> _clientListener;
+ std::unique_ptr<SocketListener> _serverListener;
+ std::unique_ptr<EndToEndHandler> _handler;
+};
+
+} // namespace crossdock
+} // namespace jaegertracing
+
+#endif // JAEGERTRACING_CROSSDOCK_SERVER_H
diff --git a/src/jaegertracing/jaeger-client-cpp/crossdock/docker-compose.yml b/src/jaegertracing/jaeger-client-cpp/crossdock/docker-compose.yml
new file mode 100644
index 000000000..550f0799f
--- /dev/null
+++ b/src/jaegertracing/jaeger-client-cpp/crossdock/docker-compose.yml
@@ -0,0 +1,68 @@
+version: '2'
+
+services:
+ crossdock:
+ image: crossdock/crossdock
+ links:
+ - test_driver
+ - go
+ - cpp-udp
+ - cpp-http
+ environment:
+ - WAIT_FOR=test_driver,go,cpp-udp,cpp-http
+ - WAIT_FOR_TIMEOUT=60s
+
+ - CALL_TIMEOUT=60s
+
+ - AXIS_CLIENT=go
+ - AXIS_S1NAME=go,cpp-udp
+ - AXIS_SAMPLED=true,false
+ - AXIS_S2NAME=go,cpp-udp
+ - AXIS_S2TRANSPORT=http
+ - AXIS_S3NAME=go,cpp-udp
+ - AXIS_S3TRANSPORT=http
+
+ - BEHAVIOR_TRACE=client,s1name,sampled,s2name,s2transport,s3name,s3transport
+
+ - AXIS_TESTDRIVER=test_driver
+ - AXIS_SERVICES=cpp-udp,cpp-http
+
+ - BEHAVIOR_ENDTOEND=testdriver,services
+
+ - REPORT=compact
+
+ go:
+ image: jaegertracing/xdock-go
+ ports:
+ - "8080-8082"
+
+ cpp-udp:
+ depends_on:
+ - test_driver
+ build:
+ context: $PWD
+ dockerfile: crossdock/Dockerfile
+ ports:
+ - "8080-8082"
+ environment:
+ - SENDER=udp
+
+ cpp-http:
+ depends_on:
+ - test_driver
+ build:
+ context: $PWD
+ dockerfile: crossdock/Dockerfile
+ ports:
+ - "8080-8082"
+ environment:
+ - SENDER=http
+
+ test_driver:
+ image: jaegertracing/test-driver
+ depends_on:
+ - jaeger-query
+ - jaeger-collector
+ - jaeger-agent
+ ports:
+ - "8080"