diff options
Diffstat (limited to 'src/jaegertracing/opentelemetry-cpp/ext')
54 files changed, 8044 insertions, 0 deletions
diff --git a/src/jaegertracing/opentelemetry-cpp/ext/BUILD b/src/jaegertracing/opentelemetry-cpp/ext/BUILD new file mode 100644 index 000000000..cc62431b5 --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/BUILD @@ -0,0 +1,7 @@ +package(default_visibility = ["//visibility:public"]) + +cc_library( + name = "headers", + hdrs = glob(["include/**/*.h"]), + strip_include_prefix = "include", +) diff --git a/src/jaegertracing/opentelemetry-cpp/ext/CMakeLists.txt b/src/jaegertracing/opentelemetry-cpp/ext/CMakeLists.txt new file mode 100644 index 000000000..034328eaa --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/CMakeLists.txt @@ -0,0 +1,27 @@ +add_library(opentelemetry_ext INTERFACE) +target_include_directories( + opentelemetry_ext + INTERFACE "$<BUILD_INTERFACE:${CMAKE_CURRENT_LIST_DIR}/include>" + "$<INSTALL_INTERFACE:include>") + +set_target_properties(opentelemetry_ext PROPERTIES EXPORT_NAME "ext") +target_link_libraries(opentelemetry_ext INTERFACE opentelemetry_api) + +install( + TARGETS opentelemetry_ext + EXPORT "${PROJECT_NAME}-target" + RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} + LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} + ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}) + +install( + DIRECTORY include/opentelemetry/ext + DESTINATION include/opentelemetry/ + FILES_MATCHING + PATTERN "*.h") + +add_subdirectory(src) + +if(BUILD_TESTING) + add_subdirectory(test) +endif() diff --git a/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h new file mode 100644 index 000000000..9f2f05f3f --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h @@ -0,0 +1,316 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include "http_operation_curl.h" +#include "opentelemetry/ext/http/client/http_client.h" +#include "opentelemetry/ext/http/common/url_parser.h" +#include "opentelemetry/version.h" + +#include <map> +#include <string> +#include <vector> + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace ext +{ +namespace http +{ +namespace client +{ +namespace curl +{ + +const opentelemetry::ext::http::client::StatusCode Http_Ok = 200; + +class Request : public opentelemetry::ext::http::client::Request +{ +public: + Request() : method_(opentelemetry::ext::http::client::Method::Get), uri_("/") {} + + void SetMethod(opentelemetry::ext::http::client::Method method) noexcept override + { + method_ = method; + } + + void SetBody(opentelemetry::ext::http::client::Body &body) noexcept override + { + body_ = std::move(body); + } + + void AddHeader(nostd::string_view name, nostd::string_view value) noexcept override + { + headers_.insert(std::pair<std::string, std::string>(static_cast<std::string>(name), + static_cast<std::string>(value))); + } + + void ReplaceHeader(nostd::string_view name, nostd::string_view value) noexcept override + { + // erase matching headers + auto range = headers_.equal_range(static_cast<std::string>(name)); + headers_.erase(range.first, range.second); + AddHeader(name, value); + } + + virtual void SetUri(nostd::string_view uri) noexcept override + { + uri_ = static_cast<std::string>(uri); + } + + void SetTimeoutMs(std::chrono::milliseconds timeout_ms) noexcept override + { + timeout_ms_ = timeout_ms; + } + +public: + opentelemetry::ext::http::client::Method method_; + opentelemetry::ext::http::client::Body body_; + opentelemetry::ext::http::client::Headers headers_; + std::string uri_; + std::chrono::milliseconds timeout_ms_{5000}; // ms +}; + +class Response : public opentelemetry::ext::http::client::Response +{ +public: + Response() : status_code_(Http_Ok) {} + + virtual const opentelemetry::ext::http::client::Body &GetBody() const noexcept override + { + return body_; + } + + virtual bool ForEachHeader( + nostd::function_ref<bool(nostd::string_view name, nostd::string_view value)> callable) + const noexcept override + { + for (const auto &header : headers_) + { + if (!callable(header.first, header.second)) + { + return false; + } + } + return true; + } + + virtual bool ForEachHeader( + const nostd::string_view &name, + nostd::function_ref<bool(nostd::string_view name, nostd::string_view value)> callable) + const noexcept override + { + auto range = headers_.equal_range(static_cast<std::string>(name)); + for (auto it = range.first; it != range.second; ++it) + { + if (!callable(it->first, it->second)) + { + return false; + } + } + return true; + } + + virtual opentelemetry::ext::http::client::StatusCode GetStatusCode() const noexcept override + { + return status_code_; + } + +public: + Headers headers_; + opentelemetry::ext::http::client::Body body_; + opentelemetry::ext::http::client::StatusCode status_code_; +}; + +class HttpClient; + +class Session : public opentelemetry::ext::http::client::Session +{ +public: + Session(HttpClient &http_client, + std::string scheme = "http", + const std::string &host = "", + uint16_t port = 80) + : http_client_(http_client), is_session_active_(false) + { + host_ = scheme + "://" + host + ":" + std::to_string(port) + "/"; + } + + std::shared_ptr<opentelemetry::ext::http::client::Request> CreateRequest() noexcept override + { + http_request_.reset(new Request()); + return http_request_; + } + + virtual void SendRequest( + opentelemetry::ext::http::client::EventHandler &callback) noexcept override + { + is_session_active_ = true; + std::string url = host_ + std::string(http_request_->uri_); + auto callback_ptr = &callback; + curl_operation_.reset(new HttpOperation( + http_request_->method_, url, callback_ptr, RequestMode::Async, http_request_->headers_, + http_request_->body_, false, http_request_->timeout_ms_)); + curl_operation_->SendAsync([this, callback_ptr](HttpOperation &operation) { + if (operation.WasAborted()) + { + // Manually cancelled + callback_ptr->OnEvent(opentelemetry::ext::http::client::SessionState::Cancelled, ""); + } + + if (operation.GetResponseCode() >= CURL_LAST) + { + // we have a http response + auto response = std::unique_ptr<Response>(new Response()); + response->headers_ = operation.GetResponseHeaders(); + response->body_ = operation.GetResponseBody(); + response->status_code_ = operation.GetResponseCode(); + callback_ptr->OnResponse(*response); + } + is_session_active_ = false; + }); + } + + virtual bool CancelSession() noexcept override; + + virtual bool FinishSession() noexcept override; + + virtual bool IsSessionActive() noexcept override { return is_session_active_; } + + void SetId(uint64_t session_id) { session_id_ = session_id; } + + /** + * Returns the base URI. + * @return the base URI as a string consisting of scheme, host and port. + */ + const std::string &GetBaseUri() const { return host_; } + +#ifdef ENABLE_TEST + std::shared_ptr<Request> GetRequest() { return http_request_; } +#endif +private: + std::shared_ptr<Request> http_request_; + std::string host_; + std::unique_ptr<HttpOperation> curl_operation_; + uint64_t session_id_; + HttpClient &http_client_; + bool is_session_active_; +}; + +class HttpClientSync : public opentelemetry::ext::http::client::HttpClientSync +{ +public: + HttpClientSync() { curl_global_init(CURL_GLOBAL_ALL); } + + opentelemetry::ext::http::client::Result Get( + const nostd::string_view &url, + const opentelemetry::ext::http::client::Headers &headers) noexcept override + { + opentelemetry::ext::http::client::Body body; + HttpOperation curl_operation(opentelemetry::ext::http::client::Method::Get, url.data(), nullptr, + RequestMode::Sync, headers, body); + curl_operation.SendSync(); + auto session_state = curl_operation.GetSessionState(); + if (curl_operation.WasAborted()) + { + session_state = opentelemetry::ext::http::client::SessionState::Cancelled; + } + auto response = std::unique_ptr<Response>(new Response()); + if (curl_operation.GetResponseCode() >= CURL_LAST) + { + // we have a http response + + response->headers_ = curl_operation.GetResponseHeaders(); + response->body_ = curl_operation.GetResponseBody(); + response->status_code_ = curl_operation.GetResponseCode(); + } + return opentelemetry::ext::http::client::Result(std::move(response), session_state); + } + + opentelemetry::ext::http::client::Result Post( + const nostd::string_view &url, + const Body &body, + const opentelemetry::ext::http::client::Headers &headers) noexcept override + { + HttpOperation curl_operation(opentelemetry::ext::http::client::Method::Post, url.data(), + nullptr, RequestMode::Sync, headers, body); + curl_operation.SendSync(); + auto session_state = curl_operation.GetSessionState(); + if (curl_operation.WasAborted()) + { + session_state = opentelemetry::ext::http::client::SessionState::Cancelled; + } + auto response = std::unique_ptr<Response>(new Response()); + if (curl_operation.GetResponseCode() >= CURL_LAST) + { + // we have a http response + + response->headers_ = curl_operation.GetResponseHeaders(); + response->body_ = curl_operation.GetResponseBody(); + response->status_code_ = curl_operation.GetResponseCode(); + } + + return opentelemetry::ext::http::client::Result(std::move(response), session_state); + } + + ~HttpClientSync() { curl_global_cleanup(); } +}; + +class HttpClient : public opentelemetry::ext::http::client::HttpClient +{ +public: + // The call (curl_global_init) is not thread safe. Ensure this is called only once. + HttpClient() : next_session_id_{0} { curl_global_init(CURL_GLOBAL_ALL); } + + std::shared_ptr<opentelemetry::ext::http::client::Session> CreateSession( + nostd::string_view url) noexcept override + { + auto parsedUrl = common::UrlParser(std::string(url)); + if (!parsedUrl.success_) + { + return std::make_shared<Session>(*this); + } + auto session = + std::make_shared<Session>(*this, parsedUrl.scheme_, parsedUrl.host_, parsedUrl.port_); + auto session_id = ++next_session_id_; + session->SetId(session_id); + sessions_.insert({session_id, session}); + return session; + } + + bool CancelAllSessions() noexcept override + { + for (auto &session : sessions_) + { + session.second->CancelSession(); + } + return true; + } + + bool FinishAllSessions() noexcept override + { + for (auto &session : sessions_) + { + session.second->FinishSession(); + } + return true; + } + + void CleanupSession(uint64_t session_id) + { + // TBD = Need to be thread safe + sessions_.erase(session_id); + } + + ~HttpClient() { curl_global_cleanup(); } + +private: + std::atomic<uint64_t> next_session_id_; + std::map<uint64_t, std::shared_ptr<Session>> sessions_; +}; + +} // namespace curl +} // namespace client +} // namespace http +} // namespace ext +OPENTELEMETRY_END_NAMESPACE diff --git a/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/http/client/curl/http_operation_curl.h b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/http/client/curl/http_operation_curl.h new file mode 100644 index 000000000..679251cfe --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/http/client/curl/http_operation_curl.h @@ -0,0 +1,568 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include "http_client_curl.h" +#include "opentelemetry/ext/http/client/http_client.h" +#include "opentelemetry/version.h" + +#include <future> +#include <map> +#include <regex> +#include <sstream> +#include <string> +#include <vector> +#ifdef _WIN32 +# include <io.h> +# include <winsock2.h> +#else +# include <poll.h> +# include <unistd.h> +#endif +#include <curl/curl.h> + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace ext +{ +namespace http +{ +namespace client +{ +namespace curl +{ +const std::chrono::milliseconds default_http_conn_timeout(5000); // ms +const std::string http_status_regexp = "HTTP\\/\\d\\.\\d (\\d+)\\ .*"; +const std::string http_header_regexp = "(.*)\\: (.*)\\n*"; + +enum class RequestMode +{ + Sync, + Async +}; + +class HttpOperation +{ +public: + void DispatchEvent(opentelemetry::ext::http::client::SessionState type, std::string reason = "") + { + if (request_mode_ == RequestMode::Async && callback_ != nullptr) + { + callback_->OnEvent(type, reason); + } + else + { + session_state_ = type; + } + } + + std::atomic<bool> is_aborted_; // Set to 'true' when async callback is aborted + std::atomic<bool> is_finished_; // Set to 'true' when async callback is finished. + + /** + * Create local CURL instance for url and body + * @param method // HTTP Method + * @param url // HTTP URL + * @param callback + * @param request_mode // sync or async + * @param request Request Headers + * @param body Reques Body + * @param raw_response whether to parse the response + * @param httpConnTimeout HTTP connection timeout in seconds + */ + HttpOperation(opentelemetry::ext::http::client::Method method, + std::string url, + opentelemetry::ext::http::client::EventHandler *callback, + RequestMode request_mode = RequestMode::Async, + // Default empty headers and empty request body + const opentelemetry::ext::http::client::Headers &request_headers = + opentelemetry::ext::http::client::Headers(), + const opentelemetry::ext::http::client::Body &request_body = + opentelemetry::ext::http::client::Body(), + // Default connectivity and response size options + bool is_raw_response = false, + std::chrono::milliseconds http_conn_timeout = default_http_conn_timeout) + : is_aborted_(false), + is_finished_(false), + // Optional connection params + is_raw_response_(is_raw_response), + http_conn_timeout_(http_conn_timeout), + request_mode_(request_mode), + curl_(nullptr), + // Result + res_(CURLE_OK), + callback_(callback), + method_(method), + url_(url), + // Local vars + request_headers_(request_headers), + request_body_(request_body), + sockfd_(0), + nread_(0) + { + /* get a curl handle */ + curl_ = curl_easy_init(); + if (!curl_) + { + res_ = CURLE_FAILED_INIT; + DispatchEvent(opentelemetry::ext::http::client::SessionState::CreateFailed); + return; + } + + curl_easy_setopt(curl_, CURLOPT_VERBOSE, 0); + + // Specify target URL + curl_easy_setopt(curl_, CURLOPT_URL, url_.c_str()); + + // TODO: support ssl cert verification for https request + curl_easy_setopt(curl_, CURLOPT_SSL_VERIFYPEER, 0); // 1L + curl_easy_setopt(curl_, CURLOPT_SSL_VERIFYHOST, 0); // 2L + + // Specify our custom headers + for (auto &kv : this->request_headers_) + { + std::string header = std::string(kv.first); + header += ": "; + header += std::string(kv.second); + headers_chunk_ = curl_slist_append(headers_chunk_, header.c_str()); + } + + if (headers_chunk_ != nullptr) + { + curl_easy_setopt(curl_, CURLOPT_HTTPHEADER, headers_chunk_); + } + + DispatchEvent(opentelemetry::ext::http::client::SessionState::Created); + } + + /** + * Destroy CURL instance + */ + virtual ~HttpOperation() + { + // Given the request has not been aborted we should wait for completion here + // This guarantees the lifetime of this request. + if (result_.valid()) + { + result_.wait(); + } + // TBD - Need to be uncomment. This will callback instance is deleted. + // DispatchEvent(opentelemetry::ext::http::client::SessionState::Destroy); + res_ = CURLE_OK; + curl_easy_cleanup(curl_); + curl_slist_free_all(headers_chunk_); + ReleaseResponse(); + } + + /** + * Finish CURL instance + */ + virtual void Finish() + { + if (result_.valid() && !is_finished_) + { + result_.wait(); + is_finished_ = true; + } + } + + /** + * Send request synchronously + */ + long Send() + { + ReleaseResponse(); + // Request buffer + const void *request = (request_body_.empty()) ? NULL : &request_body_[0]; + const size_t req_size = request_body_.size(); + if (!curl_) + { + res_ = CURLE_FAILED_INIT; + DispatchEvent(opentelemetry::ext::http::client::SessionState::SendFailed); + return res_; + } + + // TODO: control local port to use + // curl_easy_setopt(curl, CURLOPT_LOCALPORT, dcf_port); + + // Perform initial connect, handling the timeout if needed + curl_easy_setopt(curl_, CURLOPT_CONNECT_ONLY, 1L); + curl_easy_setopt(curl_, CURLOPT_TIMEOUT_MS, http_conn_timeout_.count()); + DispatchEvent(opentelemetry::ext::http::client::SessionState::Connecting); + res_ = curl_easy_perform(curl_); + if (CURLE_OK != res_) + { + DispatchEvent(opentelemetry::ext::http::client::SessionState::ConnectFailed, + curl_easy_strerror(res_)); // couldn't connect - stage 1 + return res_; + } + + /* Extract the socket from the curl handle - we'll need it for waiting. + * Note that this API takes a pointer to a 'long' while we use + * curl_socket_t for sockets otherwise. + */ + long sockextr = 0; + res_ = curl_easy_getinfo(curl_, CURLINFO_LASTSOCKET, &sockextr); + + if (CURLE_OK != res_) + { + DispatchEvent(opentelemetry::ext::http::client::SessionState::ConnectFailed, + curl_easy_strerror(res_)); // couldn't connect - stage 2 + return res_; + } + + /* wait for the socket to become ready for sending */ + sockfd_ = sockextr; + if (!WaitOnSocket(sockfd_, 0, static_cast<long>(http_conn_timeout_.count())) || is_aborted_) + { + res_ = CURLE_OPERATION_TIMEDOUT; + DispatchEvent( + opentelemetry::ext::http::client::SessionState::ConnectFailed, + " Is aborted: " + std::to_string(is_aborted_.load())); // couldn't connect - stage 3 + return res_; + } + + DispatchEvent(opentelemetry::ext::http::client::SessionState::Connected); + // once connection is there - switch back to easy perform for HTTP post + curl_easy_setopt(curl_, CURLOPT_CONNECT_ONLY, 0); + + // send all data to our callback function + if (is_raw_response_) + { + curl_easy_setopt(curl_, CURLOPT_HEADER, true); + curl_easy_setopt(curl_, CURLOPT_WRITEFUNCTION, (void *)&WriteMemoryCallback); + curl_easy_setopt(curl_, CURLOPT_WRITEDATA, (void *)&raw_response_); + } + else + { + curl_easy_setopt(curl_, CURLOPT_WRITEFUNCTION, (void *)&WriteVectorCallback); + curl_easy_setopt(curl_, CURLOPT_HEADERDATA, (void *)&resp_headers_); + curl_easy_setopt(curl_, CURLOPT_WRITEDATA, (void *)&resp_body_); + } + + // TODO: only two methods supported for now - POST and GET + if (method_ == opentelemetry::ext::http::client::Method::Post) + { + // POST + curl_easy_setopt(curl_, CURLOPT_POST, true); + curl_easy_setopt(curl_, CURLOPT_POSTFIELDS, (const char *)request); + curl_easy_setopt(curl_, CURLOPT_POSTFIELDSIZE, req_size); + } + else if (method_ == opentelemetry::ext::http::client::Method::Get) + { + // GET + } + else + { + res_ = CURLE_UNSUPPORTED_PROTOCOL; + return res_; + } + + // abort if slower than 4kb/sec during 30 seconds + curl_easy_setopt(curl_, CURLOPT_LOW_SPEED_TIME, 30L); + curl_easy_setopt(curl_, CURLOPT_LOW_SPEED_LIMIT, 4096); + DispatchEvent(opentelemetry::ext::http::client::SessionState::Sending); + + res_ = curl_easy_perform(curl_); + if (CURLE_OK != res_) + { + DispatchEvent(opentelemetry::ext::http::client::SessionState::SendFailed, + curl_easy_strerror(res_)); + return res_; + } + + /* Code snippet to parse raw HTTP response. This might come in handy + * if we ever consider to handle the raw upload instead of curl_easy_perform + ... + std::string resp((const char *)response); + std::regex http_status_regex(HTTP_STATUS_REGEXP); + std::smatch match; + if(std::regex_search(resp, match, http_status_regex)) + http_code = std::stol(match[1]); + ... + */ + + /* libcurl is nice enough to parse the http response code itself: */ + curl_easy_getinfo(curl_, CURLINFO_RESPONSE_CODE, &res_); + // We got some response from server. Dump the contents. + DispatchEvent(opentelemetry::ext::http::client::SessionState::Response); + + // This function returns: + // - on success: HTTP status code. + // - on failure: CURL error code. + // The two sets of enums (CURLE, HTTP codes) - do not intersect, so we collapse them in one set. + return res_; + } + + std::future<long> &SendAsync(std::function<void(HttpOperation &)> callback = nullptr) + { + result_ = std::async(std::launch::async, [this, callback] { + long result = Send(); + if (callback != nullptr) + { + callback(*this); + } + return result; + }); + return result_; + } + + void SendSync() { Send(); } + + /** + * Get HTTP response code. This function returns CURL error code if HTTP response code is invalid. + */ + uint16_t GetResponseCode() { return res_; } + + /** + * Get last session state. + */ + opentelemetry::ext::http::client::SessionState GetSessionState() { return session_state_; } + + /** + * Get whether or not response was programmatically aborted + */ + bool WasAborted() { return is_aborted_.load(); } + + /** + * Return a copy of resposne headers + * + * @return + */ + Headers GetResponseHeaders() + { + Headers result; + if (resp_headers_.size() == 0) + return result; + + std::stringstream ss; + std::string headers((const char *)&resp_headers_[0], resp_headers_.size()); + ss.str(headers); + + std::string header; + while (std::getline(ss, header, '\n')) + { + // TODO - Regex below crashes with out-of-memory on CI docker container, so + // switching to string comparison. Need to debug and revert back. + + /*std::smatch match; + std::regex http_headers_regex(http_header_regexp); + if (std::regex_search(header, match, http_headers_regex)) + result.insert(std::pair<nostd::string_view, nostd::string_view>( + static_cast<nostd::string_view>(match[1]), static_cast<nostd::string_view>(match[2]))); + */ + size_t pos = header.find(": "); + if (pos != std::string::npos) + result.insert( + std::pair<std::string, std::string>(header.substr(0, pos), header.substr(pos + 2))); + } + return result; + } + + /** + * Return a copy of response body + * + * @return + */ + std::vector<uint8_t> GetResponseBody() { return resp_body_; } + + /** + * Return a raw copy of response headers+body + * + * @return + */ + std::vector<uint8_t> GetRawResponse() { return raw_response_; } + + /** + * Release memory allocated for response + */ + void ReleaseResponse() + { + resp_headers_.clear(); + resp_body_.clear(); + raw_response_.clear(); + } + + /** + * Abort request in connecting or reading state. + */ + void Abort() + { + is_aborted_ = true; + if (curl_ != nullptr) + { + // Simply close the socket - connection reset by peer + if (sockfd_) + { +#if defined(_WIN32) + ::closesocket(sockfd_); +#else + ::close(sockfd_); +#endif + sockfd_ = 0; + } + } + } + + CURL *GetHandle() { return curl_; } + +protected: + const bool is_raw_response_; // Do not split response headers from response body + const std::chrono::milliseconds http_conn_timeout_; // Timeout for connect. Default: 5000ms + RequestMode request_mode_; + + CURL *curl_; // Local curl instance + CURLcode res_; // Curl result OR HTTP status code if successful + + opentelemetry::ext::http::client::EventHandler *callback_; + + // Request values + opentelemetry::ext::http::client::Method method_; + std::string url_; + const Headers &request_headers_; + const opentelemetry::ext::http::client::Body &request_body_; + struct curl_slist *headers_chunk_ = nullptr; + opentelemetry::ext::http::client::SessionState session_state_; + + // Processed response headers and body + std::vector<uint8_t> resp_headers_; + std::vector<uint8_t> resp_body_; + std::vector<uint8_t> raw_response_; + + // Socket parameters + curl_socket_t sockfd_; + + curl_off_t nread_; + size_t sendlen_ = 0; // # bytes sent by client + size_t acklen_ = 0; // # bytes ack by server + + std::future<long> result_; + + /** + * Helper routine to wait for data on socket + * + * @param sockfd + * @param for_recv + * @param timeout_ms + * @return true if expected events occur, false if timeout or error happen + */ + static bool WaitOnSocket(curl_socket_t sockfd, int for_recv, long timeout_ms) + { + bool res = false; + +#if defined(_WIN32) + + if (sockfd > FD_SETSIZE) + return false; + + struct timeval tv; + fd_set infd, outfd, errfd; + + tv.tv_sec = timeout_ms / 1000; + tv.tv_usec = (timeout_ms % 1000) * 1000; + + FD_ZERO(&infd); + FD_ZERO(&outfd); + FD_ZERO(&errfd); + + FD_SET(sockfd, &errfd); /* always check for error */ + + if (for_recv) + { + FD_SET(sockfd, &infd); + } + else + { + FD_SET(sockfd, &outfd); + } + + /* select() returns the number of signalled sockets or -1 */ + if (select((int)sockfd + 1, &infd, &outfd, &errfd, &tv) > 0) + { + if (for_recv) + { + res = (0 != FD_ISSET(sockfd, &infd)); + } + else + { + res = (0 != FD_ISSET(sockfd, &outfd)); + } + } + +#else + + struct pollfd fds[1]; + ::memset(fds, 0, sizeof(fds)); + + fds[0].fd = sockfd; + if (for_recv) + { + fds[0].events = POLLIN; + } + else + { + fds[0].events = POLLOUT; + } + + if (poll(fds, 1, timeout_ms) > 0) + { + if (for_recv) + { + res = (0 != (fds[0].revents & POLLIN)); + } + else + { + res = (0 != (fds[0].revents & POLLOUT)); + } + } + +#endif + + return res; + } + + /** + * Old-school memory allocator + * + * @param contents + * @param size + * @param nmemb + * @param userp + * @return + */ + static size_t WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp) + { + std::vector<char> *buf = static_cast<std::vector<char> *>(userp); + buf->insert(buf->end(), static_cast<char *>(contents), + static_cast<char *>(contents) + (size * nmemb)); + return size * nmemb; + } + + /** + * C++ STL std::vector allocator + * + * @param ptr + * @param size + * @param nmemb + * @param data + * @return + */ + static size_t WriteVectorCallback(void *ptr, + size_t size, + size_t nmemb, + std::vector<uint8_t> *data) + { + if (data != nullptr) + { + const unsigned char *begin = (unsigned char *)(ptr); + const unsigned char *end = begin + size * nmemb; + data->insert(data->end(), begin, end); + } + return size * nmemb; + } +}; +} // namespace curl +} // namespace client +} // namespace http +} // namespace ext +OPENTELEMETRY_END_NAMESPACE diff --git a/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/http/client/http_client.h b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/http/client/http_client.h new file mode 100644 index 000000000..308335e49 --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/http/client/http_client.h @@ -0,0 +1,253 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include <chrono> +#include <map> +#include <string> +#include <vector> +#include "opentelemetry/nostd/function_ref.h" +#include "opentelemetry/nostd/string_view.h" +#include "opentelemetry/version.h" + +/* + Usage Example + +Sync Request: + + HttpClient httpClient; + auto result = httpClient.Get(url); // GET request + if (result){ + auto response = result.GetResponse(); + } else { + std::cout << result.GetSessionState(); + } + +Async Request: + + struct SimpleReponseHandler: public ResponseHandler { + void OnResponse(Response& res) noexcept override + { + if (res.IsSuccess()) { + res.GetNextHeader([](nostd::string_view name, std::string value) -> bool { + std::cout << "Header Name:" << name << " Header Value:"<< value ; + return true; + }); + .. process response body + } + } + + void OnError(nostd::string_view err) noexcept override + { + std::cerr << " Error:" << err; + } + }; + + HttpClient httpClient; // implementer can provide singleton implementation for it + auto session = httpClient.createSession("localhost" + 8000); + auto request = session->CreateRequest(); + request->AddHeader(..); + SimpleResponseHandler res_handler; + session->SendRequest(res_handler); + session->FinishSession() // optionally in the end + ...shutdown + httpClient.FinishAllSessions() + +*/ + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace ext +{ +namespace http +{ +namespace client +{ + +enum class Method +{ + Get, + Post, + Put, + Options, + Head, + Patch, + Delete +}; + +enum class SessionState +{ + CreateFailed, // session create failed + Created, // session created + Destroyed, // session destroyed + Connecting, // connecting to peer + ConnectFailed, // connection failed + Connected, // connected + Sending, // sending request + SendFailed, // request send failed + Response, // response received + SSLHandshakeFailed, // SSL handshake failed + TimedOut, // request time out + NetworkError, // network error + ReadError, // error reading response + WriteError, // error writing request + Cancelled // (manually) cancelled +}; + +using Byte = uint8_t; +using StatusCode = uint16_t; +using Body = std::vector<Byte>; +using SSLCertificate = std::vector<Byte>; + +struct cmp_ic +{ + bool operator()(const std::string &s1, const std::string &s2) const + { + return std::lexicographical_compare( + s1.begin(), s1.end(), s2.begin(), s2.end(), + [](char c1, char c2) { return ::tolower(c1) < ::tolower(c2); }); + } +}; +using Headers = std::multimap<std::string, std::string, cmp_ic>; + +class Request +{ +public: + virtual void SetMethod(Method method) noexcept = 0; + + virtual void SetUri(nostd::string_view uri) noexcept = 0; + + virtual void SetBody(Body &body) noexcept = 0; + + virtual void AddHeader(nostd::string_view name, nostd::string_view value) noexcept = 0; + + virtual void ReplaceHeader(nostd::string_view name, nostd::string_view value) noexcept = 0; + + virtual void SetTimeoutMs(std::chrono::milliseconds timeout_ms) noexcept = 0; + + virtual ~Request() = default; +}; + +class Response +{ +public: + virtual const Body &GetBody() const noexcept = 0; + + virtual bool ForEachHeader( + nostd::function_ref<bool(nostd::string_view name, nostd::string_view value)> callable) + const noexcept = 0; + + virtual bool ForEachHeader( + const nostd::string_view &key, + nostd::function_ref<bool(nostd::string_view name, nostd::string_view value)> callable) + const noexcept = 0; + + virtual StatusCode GetStatusCode() const noexcept = 0; + + virtual ~Response() = default; +}; + +class NoopResponse : public Response +{ +public: + const Body &GetBody() const noexcept override + { + static Body body; + return body; + } + bool ForEachHeader(nostd::function_ref<bool(nostd::string_view name, nostd::string_view value)> + callable) const noexcept override + { + return true; + } + + bool ForEachHeader(const nostd::string_view &key, + nostd::function_ref<bool(nostd::string_view name, nostd::string_view value)> + callable) const noexcept override + { + return true; + } + + StatusCode GetStatusCode() const noexcept override { return 0; } +}; + +class Result +{ + +public: + Result(std::unique_ptr<Response> res, SessionState session_state) + : response_(std::move(res)), session_state_(session_state) + {} + operator bool() const { return session_state_ == SessionState::Response; } + Response &GetResponse() + { + if (response_ == nullptr) + { + // let's not return nullptr + response_.reset(new NoopResponse()); + } + return *response_; + } + SessionState GetSessionState() { return session_state_; } + +private: + std::unique_ptr<Response> response_; + SessionState session_state_; +}; + +class EventHandler +{ +public: + virtual void OnResponse(Response &) noexcept = 0; + + virtual void OnEvent(SessionState, nostd::string_view) noexcept = 0; + + virtual void OnConnecting(const SSLCertificate &) noexcept {} + + virtual ~EventHandler() = default; +}; + +class Session +{ +public: + virtual std::shared_ptr<Request> CreateRequest() noexcept = 0; + + virtual void SendRequest(EventHandler &) noexcept = 0; + + virtual bool IsSessionActive() noexcept = 0; + + virtual bool CancelSession() noexcept = 0; + + virtual bool FinishSession() noexcept = 0; + + virtual ~Session() = default; +}; + +class HttpClient +{ +public: + virtual std::shared_ptr<Session> CreateSession(nostd::string_view url) noexcept = 0; + + virtual bool CancelAllSessions() noexcept = 0; + + virtual bool FinishAllSessions() noexcept = 0; + + virtual ~HttpClient() = default; +}; + +class HttpClientSync +{ +public: + virtual Result Get(const nostd::string_view &url, const Headers & = {{}}) noexcept = 0; + + virtual Result Post(const nostd::string_view &url, + const Body &body, + const Headers & = {{"content-type", "application/json"}}) noexcept = 0; + + virtual ~HttpClientSync() = default; +}; + +} // namespace client +} // namespace http +} // namespace ext +OPENTELEMETRY_END_NAMESPACE diff --git a/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/http/client/http_client_factory.h b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/http/client/http_client_factory.h new file mode 100644 index 000000000..f03c1a0b6 --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/http/client/http_client_factory.h @@ -0,0 +1,26 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once +#include "opentelemetry/ext/http/client/http_client.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace ext +{ +namespace http +{ +namespace client +{ +class HttpClientFactory +{ +public: + static std::shared_ptr<HttpClientSync> CreateSync(); + + static std::shared_ptr<HttpClient> Create(); + + static std::shared_ptr<HttpClient> CreateNoSend(); +}; +} // namespace client +} // namespace http +} // namespace ext +OPENTELEMETRY_END_NAMESPACE diff --git a/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/http/client/nosend/http_client_nosend.h b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/http/client/nosend/http_client_nosend.h new file mode 100644 index 000000000..02433d75c --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/http/client/nosend/http_client_nosend.h @@ -0,0 +1,184 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#ifdef ENABLE_TEST +# include "opentelemetry/ext/http/client/http_client.h" +# include "opentelemetry/ext/http/common/url_parser.h" +# include "opentelemetry/version.h" + +# include <map> +# include <string> +# include <vector> + +# include <gtest/gtest.h> +# include "gmock/gmock.h" + +using namespace testing; +OPENTELEMETRY_BEGIN_NAMESPACE +namespace ext +{ +namespace http +{ +namespace client +{ +namespace nosend +{ + +const opentelemetry::ext::http::client::StatusCode Http_Ok = 200; + +class Request : public opentelemetry::ext::http::client::Request +{ +public: + Request() : method_(opentelemetry::ext::http::client::Method::Get), uri_("/") {} + + void SetMethod(opentelemetry::ext::http::client::Method method) noexcept override + { + method_ = method; + } + + void SetBody(opentelemetry::ext::http::client::Body &body) noexcept override + { + body_ = std::move(body); + } + + void AddHeader(nostd::string_view name, nostd::string_view value) noexcept override + { + headers_.insert(std::pair<std::string, std::string>(static_cast<std::string>(name), + static_cast<std::string>(value))); + } + + void ReplaceHeader(nostd::string_view name, nostd::string_view value) noexcept override; + + virtual void SetUri(nostd::string_view uri) noexcept override + { + uri_ = static_cast<std::string>(uri); + } + + void SetTimeoutMs(std::chrono::milliseconds timeout_ms) noexcept override + { + timeout_ms_ = timeout_ms; + } + +public: + opentelemetry::ext::http::client::Method method_; + opentelemetry::ext::http::client::Body body_; + opentelemetry::ext::http::client::Headers headers_; + std::string uri_; + std::chrono::milliseconds timeout_ms_{5000}; // ms +}; + +class Response : public opentelemetry::ext::http::client::Response +{ +public: + Response() : status_code_(Http_Ok) {} + + virtual const opentelemetry::ext::http::client::Body &GetBody() const noexcept override + { + return body_; + } + + virtual bool ForEachHeader( + nostd::function_ref<bool(nostd::string_view name, nostd::string_view value)> callable) + const noexcept override; + + virtual bool ForEachHeader( + const nostd::string_view &name, + nostd::function_ref<bool(nostd::string_view name, nostd::string_view value)> callable) + const noexcept override; + + virtual opentelemetry::ext::http::client::StatusCode GetStatusCode() const noexcept override + { + return status_code_; + } + +public: + Headers headers_; + opentelemetry::ext::http::client::Body body_; + opentelemetry::ext::http::client::StatusCode status_code_; +}; + +class HttpClient; + +class Session : public opentelemetry::ext::http::client::Session +{ +public: + Session(HttpClient &http_client, + std::string scheme = "http", + const std::string &host = "", + uint16_t port = 80) + : http_client_(http_client), is_session_active_(false) + { + host_ = scheme + "://" + host + ":" + std::to_string(port) + "/"; + } + + std::shared_ptr<opentelemetry::ext::http::client::Request> CreateRequest() noexcept override + { + http_request_.reset(new Request()); + return http_request_; + } + + MOCK_METHOD(void, + SendRequest, + (opentelemetry::ext::http::client::EventHandler &), + (noexcept, override)); + + virtual bool CancelSession() noexcept override; + + virtual bool FinishSession() noexcept override; + + virtual bool IsSessionActive() noexcept override { return is_session_active_; } + + void SetId(uint64_t session_id) { session_id_ = session_id; } + + /** + * Returns the base URI. + * @return the base URI as a string consisting of scheme, host and port. + */ + const std::string &GetBaseUri() const { return host_; } + + std::shared_ptr<Request> GetRequest() { return http_request_; } + +private: + std::shared_ptr<Request> http_request_; + std::string host_; + uint64_t session_id_; + HttpClient &http_client_; + bool is_session_active_; +}; + +class HttpClient : public opentelemetry::ext::http::client::HttpClient +{ +public: + HttpClient() { session_ = std::shared_ptr<Session>{new Session(*this)}; } + + std::shared_ptr<opentelemetry::ext::http::client::Session> CreateSession( + nostd::string_view) noexcept override + { + return session_; + } + + bool CancelAllSessions() noexcept override + { + session_->CancelSession(); + return true; + } + + bool FinishAllSessions() noexcept override + { + session_->FinishSession(); + return true; + } + + void CleanupSession(uint64_t session_id) {} + + std::shared_ptr<Session> session_; +}; + +} // namespace nosend +} // namespace client +} // namespace http +} // namespace ext +OPENTELEMETRY_END_NAMESPACE +#endif diff --git a/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/http/common/url_parser.h b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/http/common/url_parser.h new file mode 100644 index 000000000..9ca45eda0 --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/http/common/url_parser.h @@ -0,0 +1,132 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include <string> +#include <vector> +#include "opentelemetry/nostd/string_view.h" +#include "opentelemetry/version.h" +OPENTELEMETRY_BEGIN_NAMESPACE +namespace ext +{ +namespace http +{ +namespace common +{ +// http://user:password@host:port/path1/path2?key1=val2&key2=val2 +// http://host:port/path1/path2?key1=val1&key2=val2 +// host:port/path1 +// host:port ( path defaults to "/") +// host:port? + +class UrlParser +{ +public: + std::string url_; + std::string host_; + std::string scheme_; + std::string path_; + uint16_t port_; + std::string query_; + bool success_; + + UrlParser(std::string url) : url_(url), success_(true) + { + if (url_.length() == 0) + { + return; + } + size_t cpos = 0; + // scheme + size_t pos = url_.find("://", cpos); + if (pos == std::string::npos) + { + // scheme missing, use default as http + scheme_ = "http"; + } + else + { + scheme_ = std::string(url_.begin() + cpos, url_.begin() + pos); + cpos = pos + 3; + } + + // credentials + pos = url_.find_first_of("@", cpos); + if (pos != std::string::npos) + { + // TODO - handle credentials + cpos = pos + 1; + } + pos = url_.find_first_of(":", cpos); + bool is_port = false; + if (pos == std::string::npos) + { + // port missing. Used default 80 / 443 + if (scheme_ == "http") + port_ = 80; + if (scheme_ == "https") + port_ = 443; + } + else + { + // port present + is_port = true; + host_ = std::string(url_.begin() + cpos, url_.begin() + pos); + cpos = pos + 1; + } + pos = url_.find_first_of("/?", cpos); + if (pos == std::string::npos) + { + path_ = "/"; // use default path + if (is_port) + { + port_ = static_cast<uint16_t>( + std::stoi(std::string(url_.begin() + cpos, url_.begin() + url_.length()))); + } + else + { + host_ = std::string(url_.begin() + cpos, url_.begin() + url_.length()); + } + return; + } + if (is_port) + { + port_ = + static_cast<uint16_t>(std::stoi(std::string(url_.begin() + cpos, url_.begin() + pos))); + } + else + { + host_ = std::string(url_.begin() + cpos, url_.begin() + pos); + } + cpos = pos; + + if (url_[cpos] == '/') + { + pos = url_.find('?', cpos); + if (pos == std::string::npos) + { + path_ = std::string(url_.begin() + cpos, url_.begin() + url_.length()); + query_ = ""; + } + else + { + path_ = std::string(url_.begin() + cpos, url_.begin() + pos); + cpos = pos + 1; + query_ = std::string(url_.begin() + cpos, url_.begin() + url_.length()); + } + return; + } + path_ = "/"; + if (url_[cpos] == '?') + { + query_ = std::string(url_.begin() + cpos, url_.begin() + url_.length()); + } + } +}; + +} // namespace common + +} // namespace http +} // namespace ext +OPENTELEMETRY_END_NAMESPACE
\ No newline at end of file diff --git a/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/http/server/file_http_server.h b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/http/server/file_http_server.h new file mode 100644 index 000000000..cb28d01fa --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/http/server/file_http_server.h @@ -0,0 +1,148 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include <fstream> +#include <iostream> +#include <string> +#include <unordered_map> +#include <vector> + +#include "opentelemetry/ext/http/server/http_server.h" + +namespace HTTP_SERVER_NS +{ + +class FileHttpServer : public HTTP_SERVER_NS::HttpServer +{ +protected: + /** + * Construct the server by initializing the endpoint for serving static files, + * which show up on the web if the user is on the given host:port. Static + * files can be seen relative to the folder where the executable was ran. + */ + FileHttpServer(const std::string &host = "127.0.0.1", int port = 3333) : HttpServer() + { + std::ostringstream os; + os << host << ":" << port; + setServerName(os.str()); + addListeningPort(port); + }; + + /** + * Set the HTTP server to serve static files from the root of host:port. + * Derived HTTP servers should initialize the file endpoint AFTER they + * initialize their own, otherwise everything will be served like a file + * @param server should be an instance of this object + */ + void InitializeFileEndpoint(FileHttpServer &server) { server[root_endpt_] = ServeFile; } + +private: + /** + * Return whether a file is found whose location is searched for relative to + * where the executable was triggered. If the file is valid, fill result with + * the file data/information required to display it on a webpage + * @param name of the file to look for, + * @param resulting file information, necessary for displaying them on a + * webpage + * @returns whether a file was found and result filled with display + * information + */ + bool FileGetSuccess(const std::string &filename, std::vector<char> &result) + { +#ifdef _WIN32 + std::replace(filename.begin(), filename.end(), '/', '\\'); +#endif + std::streampos size; + std::ifstream file(filename, std::ios::in | std::ios::binary | std::ios::ate); + if (file.is_open()) + { + size = file.tellg(); + if (size) + { + result.resize(size); + file.seekg(0, std::ios::beg); + file.read(result.data(), size); + } + file.close(); + return true; + } + return false; + }; + + /** + * Returns the extension of a file + * @param name of the file + * @returns file extension type under HTTP protocol + */ + std::string GetMimeContentType(const std::string &filename) + { + std::string file_ext = filename.substr(filename.find_last_of(".") + 1); + auto file_type = mime_types_.find(file_ext); + return (file_type != mime_types_.end()) ? file_type->second : HTTP_SERVER_NS::CONTENT_TYPE_TEXT; + }; + + /** + * Returns the standardized name of a file by removing backslashes, and + * assuming index.html is the wanted file if a directory is given + * @param name of the file + */ + std::string GetFileName(std::string name) + { + if (name.back() == '/') + { + auto temp = name.substr(0, name.size() - 1); + name = temp; + } + // If filename appears to be a directory, serve the hypothetical index.html + // file there + if (name.find(".") == std::string::npos) + name += "/index.html"; + + return name; + } + + /** + * Sets the response object with the correct file data based on the requested + * file address, or return 404 error if a file isn't found + * @param req is the HTTP request, which we use to figure out the response to + * send + * @param resp is the HTTP response we want to send to the frontend, including + * file data + */ + HTTP_SERVER_NS::HttpRequestCallback ServeFile{ + [&](HTTP_SERVER_NS::HttpRequest const &req, HTTP_SERVER_NS::HttpResponse &resp) { + LOG_INFO("File: %s\n", req.uri.c_str()); + auto f = GetFileName(req.uri); + auto filename = f.c_str() + 1; + + std::vector<char> content; + if (FileGetSuccess(filename, content)) + { + resp.headers[HTTP_SERVER_NS::CONTENT_TYPE] = GetMimeContentType(filename); + resp.body = std::string(content.data(), content.size()); + resp.code = 200; + resp.message = HTTP_SERVER_NS::HttpServer::getDefaultResponseMessage(resp.code); + return resp.code; + } + // Two additional 'special' return codes possible here: + // 0 - proceed to next handler + // -1 - immediately terminate and close connection + resp.headers[HTTP_SERVER_NS::CONTENT_TYPE] = HTTP_SERVER_NS::CONTENT_TYPE_TEXT; + resp.code = 404; + resp.message = HTTP_SERVER_NS::HttpServer::getDefaultResponseMessage(resp.code); + resp.body = resp.message; + return 404; + }}; + + // Maps file extensions to their HTTP-compatible mime file type + const std::unordered_map<std::string, std::string> mime_types_ = { + {"css", "text/css"}, {"png", "image/png"}, {"js", "text/javascript"}, + {"htm", "text/html"}, {"html", "text/html"}, {"json", "application/json"}, + {"txt", "text/plain"}, {"jpg", "image/jpeg"}, {"jpeg", "image/jpeg"}, + }; + const std::string root_endpt_ = "/"; +}; + +} // namespace HTTP_SERVER_NS diff --git a/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/http/server/http_server.h b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/http/server/http_server.h new file mode 100644 index 000000000..dec1fd914 --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/http/server/http_server.h @@ -0,0 +1,879 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include <functional> +#include <list> +#include <map> + +#include "socket_tools.h" + +#ifdef HAVE_HTTP_DEBUG +# ifdef LOG_TRACE +# undef LOG_TRACE +# define LOG_TRACE(x, ...) printf(x "\n", __VA_ARGS__) +# endif +#endif + +#ifndef HTTP_SERVER_NS +# define HTTP_SERVER_NS testing +#endif + +namespace HTTP_SERVER_NS +{ + +constexpr const char *CONTENT_TYPE = "Content-Type"; +constexpr const char *CONTENT_TYPE_TEXT = "text/plain"; +constexpr const char *CONTENT_TYPE_BIN = "application/octet-stream"; + +struct HttpRequest +{ + std::string client; + std::string method; + std::string uri; + std::string protocol; + std::map<std::string, std::string> headers; + std::string content; +}; + +struct HttpResponse +{ + int code; + std::string message; + std::map<std::string, std::string> headers; + std::string body; +}; + +using CallbackFunction = std::function<int(HttpRequest const &request, HttpResponse &response)>; + +class HttpRequestCallback +{ +protected: + CallbackFunction callback = nullptr; + +public: + HttpRequestCallback(){}; + + HttpRequestCallback &operator=(HttpRequestCallback other) + { + callback = other.callback; + return *this; + }; + + HttpRequestCallback(CallbackFunction func) : callback(func){}; + + HttpRequestCallback &operator=(CallbackFunction func) + { + callback = func; + return (*this); + } + + virtual int onHttpRequest(HttpRequest const &request, HttpResponse &response) + { + if (callback != nullptr) + { + return callback(request, response); + } + return 0; + }; +}; + +// Simple HTTP server +// Goals: +// - Support enough of HTTP to be used as a mock +// - Be flexible to allow creating various test scenarios +// Out of scope: +// - Performance +// - Full support of RFC 7230-7237 +class HttpServer : private SocketTools::Reactor::SocketCallback +{ +protected: + struct Connection + { + SocketTools::Socket socket; + std::string receiveBuffer; + std::string sendBuffer; + enum + { + Idle, + ReceivingHeaders, + Sending100Continue, + ReceivingBody, + Processing, + SendingHeaders, + SendingBody, + Closing + } state; + size_t contentLength; + bool keepalive; + HttpRequest request; + HttpResponse response; + }; + + std::string m_serverHost; + bool allowKeepalive{true}; + SocketTools::Reactor m_reactor; + std::list<SocketTools::Socket> m_listeningSockets; + + class HttpRequestHandler : public std::pair<std::string, HttpRequestCallback *> + { + public: + HttpRequestHandler(std::string key, HttpRequestCallback *value) + { + first = key; + second = value; + }; + + HttpRequestHandler() : std::pair<std::string, HttpRequestCallback *>() + { + first = ""; + second = nullptr; + }; + + HttpRequestHandler &operator=(std::pair<std::string, HttpRequestCallback *> other) + { + first = other.first; + second = other.second; + return (*this); + }; + + HttpRequestHandler &operator=(HttpRequestCallback &cb) + { + second = &cb; + return (*this); + }; + + HttpRequestHandler &operator=(HttpRequestCallback *cb) + { + second = cb; + return (*this); + }; + }; + + std::list<HttpRequestHandler> m_handlers; + + std::map<SocketTools::Socket, Connection> m_connections; + size_t m_maxRequestHeadersSize, m_maxRequestContentSize; + +public: + void setKeepalive(bool keepAlive) { allowKeepalive = keepAlive; } + + HttpServer() + : m_serverHost("unnamed"), + allowKeepalive(true), + m_reactor(*this), + m_maxRequestHeadersSize(8192), + m_maxRequestContentSize(2 * 1024 * 1024){}; + + HttpServer(std::string serverHost, int port = 30000) : HttpServer() + { + std::ostringstream os; + os << serverHost << ":" << port; + setServerName(os.str()); + addListeningPort(port); + }; + + ~HttpServer() + { + for (auto &sock : m_listeningSockets) + { + sock.close(); + } + } + + void setRequestLimits(size_t maxRequestHeadersSize, size_t maxRequestContentSize) + { + m_maxRequestHeadersSize = maxRequestHeadersSize; + m_maxRequestContentSize = maxRequestContentSize; + } + + void setServerName(std::string const &name) { m_serverHost = name; } + + int addListeningPort(int port) + { + SocketTools::Socket socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + socket.setNonBlocking(); + socket.setReuseAddr(); + + SocketTools::SocketAddr addr(0, port); + socket.bind(addr); + socket.getsockname(addr); + + socket.listen(10); + m_listeningSockets.push_back(socket); + m_reactor.addSocket(socket, SocketTools::Reactor::Acceptable); + LOG_INFO("HttpServer: Listening on %s", addr.toString().c_str()); + + return addr.port(); + } + + HttpRequestHandler &addHandler(const std::string &root, HttpRequestCallback &handler) + { + // No thread-safety here! + m_handlers.push_back({root, &handler}); + LOG_INFO("HttpServer: Added handler for %s", root.c_str()); + return m_handlers.back(); + } + + HttpRequestHandler &operator[](const std::string &root) + { + // No thread-safety here! + m_handlers.push_back({root, nullptr}); + LOG_INFO("HttpServer: Added handler for %s", root.c_str()); + return m_handlers.back(); + } + + HttpServer &operator+=(std::pair<const std::string &, HttpRequestCallback &> other) + { + LOG_INFO("HttpServer: Added handler for %s", other.first.c_str()); + m_handlers.push_back(HttpRequestHandler(other.first, &other.second)); + return (*this); + }; + + void start() { m_reactor.start(); } + + void stop() { m_reactor.stop(); } + +protected: + virtual void onSocketAcceptable(SocketTools::Socket socket) override + { + LOG_TRACE("HttpServer: accepting socket fd=0x%llx", socket.m_sock); + assert(std::find(m_listeningSockets.begin(), m_listeningSockets.end(), socket) != + m_listeningSockets.end()); + + SocketTools::Socket csocket; + SocketTools::SocketAddr caddr; + if (socket.accept(csocket, caddr)) + { + csocket.setNonBlocking(); + Connection &conn = m_connections[csocket]; + conn.socket = csocket; + conn.state = Connection::Idle; + conn.request.client = caddr.toString(); + m_reactor.addSocket(csocket, SocketTools::Reactor::Readable | SocketTools::Reactor::Closed); + LOG_TRACE("HttpServer: [%s] accepted", conn.request.client.c_str()); + } + } + + virtual void onSocketReadable(SocketTools::Socket socket) override + { + LOG_TRACE("HttpServer: reading socket fd=0x%llx", socket.m_sock); + // No thread-safety here! + assert(std::find(m_listeningSockets.begin(), m_listeningSockets.end(), socket) == + m_listeningSockets.end()); + + // No thread-safety here! + auto connIt = m_connections.find(socket); + if (connIt == m_connections.end()) + { + return; + } + Connection &conn = connIt->second; + + char buffer[2048] = {0}; + int received = socket.recv(buffer, sizeof(buffer)); + LOG_TRACE("HttpServer: [%s] received %d", conn.request.client.c_str(), received); + if (received <= 0) + { + handleConnectionClosed(conn); + return; + } + conn.receiveBuffer.append(buffer, buffer + received); + + handleConnection(conn); + } + + virtual void onSocketWritable(SocketTools::Socket socket) override + { + LOG_TRACE("HttpServer: writing socket fd=0x%llx", socket.m_sock); + + // No thread-safety here! + assert(std::find(m_listeningSockets.begin(), m_listeningSockets.end(), socket) == + m_listeningSockets.end()); + + // No thread-safety here! + auto connIt = m_connections.find(socket); + if (connIt == m_connections.end()) + { + return; + } + Connection &conn = connIt->second; + + if (!sendMore(conn)) + { + handleConnection(conn); + } + } + + virtual void onSocketClosed(SocketTools::Socket socket) override + { + LOG_TRACE("HttpServer: closing socket fd=0x%llx", socket.m_sock); + assert(std::find(m_listeningSockets.begin(), m_listeningSockets.end(), socket) == + m_listeningSockets.end()); + + auto connIt = m_connections.find(socket); + if (connIt == m_connections.end()) + { + return; + } + Connection &conn = connIt->second; + + handleConnectionClosed(conn); + } + + bool sendMore(Connection &conn) + { + if (conn.sendBuffer.empty()) + { + return false; + } + + int sent = conn.socket.send(conn.sendBuffer.data(), static_cast<int>(conn.sendBuffer.size())); + LOG_TRACE("HttpServer: [%s] sent %d", conn.request.client.c_str(), sent); + if (sent < 0 && conn.socket.error() != SocketTools::Socket::ErrorWouldBlock) + { + return true; + } + conn.sendBuffer.erase(0, sent); + + if (!conn.sendBuffer.empty()) + { + m_reactor.addSocket(conn.socket, + SocketTools::Reactor::Writable | SocketTools::Reactor::Closed); + return true; + } + + return false; + } + +protected: + void handleConnectionClosed(Connection &conn) + { + LOG_TRACE("HttpServer: [%s] closed", conn.request.client.c_str()); + if (conn.state != Connection::Idle && conn.state != Connection::Closing) + { + LOG_WARN("HttpServer: [%s] connection closed unexpectedly", conn.request.client.c_str()); + } + m_reactor.removeSocket(conn.socket); + auto connIt = m_connections.find(conn.socket); + conn.socket.close(); + m_connections.erase(connIt); + } + + void handleConnection(Connection &conn) + { + for (;;) + { + if (conn.state == Connection::Idle) + { + conn.response.code = 0; + conn.state = Connection::ReceivingHeaders; + LOG_TRACE("HttpServer: [%s] receiving headers", conn.request.client.c_str()); + } + + if (conn.state == Connection::ReceivingHeaders) + { + bool lfOnly = false; + size_t ofs = conn.receiveBuffer.find("\r\n\r\n"); + if (ofs == std::string::npos) + { + lfOnly = true; + ofs = conn.receiveBuffer.find("\n\n"); + } + size_t headersLen = (ofs != std::string::npos) ? ofs : conn.receiveBuffer.length(); + if (headersLen > m_maxRequestHeadersSize) + { + LOG_WARN("HttpServer: [%s] headers too long - %u", conn.request.client.c_str(), + static_cast<unsigned>(headersLen)); + conn.response.code = 431; // Request Header Fields Too Large + conn.keepalive = false; + conn.state = Connection::Processing; + continue; + } + if (ofs == std::string::npos) + { + return; + } + + if (!parseHeaders(conn)) + { + LOG_WARN("HttpServer: [%s] invalid headers", conn.request.client.c_str()); + conn.response.code = 400; // Bad Request + conn.keepalive = false; + conn.state = Connection::Processing; + continue; + } + LOG_INFO("HttpServer: [%s] %s %s %s", conn.request.client.c_str(), + conn.request.method.c_str(), conn.request.uri.c_str(), + conn.request.protocol.c_str()); + conn.receiveBuffer.erase(0, ofs + (lfOnly ? 2 : 4)); + + conn.keepalive = (conn.request.protocol == "HTTP/1.1"); + auto const connection = conn.request.headers.find("Connection"); + if (connection != conn.request.headers.end()) + { + if (equalsLowercased(connection->second, "keep-alive")) + { + conn.keepalive = true; + } + else if (equalsLowercased(connection->second, "close")) + { + conn.keepalive = false; + } + } + + auto const contentLength = conn.request.headers.find("Content-Length"); + if (contentLength != conn.request.headers.end()) + { + conn.contentLength = atoi(contentLength->second.c_str()); + } + else + { + conn.contentLength = 0; + } + if (conn.contentLength > m_maxRequestContentSize) + { + LOG_WARN("HttpServer: [%s] content too long - %u", conn.request.client.c_str(), + static_cast<unsigned>(conn.contentLength)); + conn.response.code = 413; // Payload Too Large + conn.keepalive = false; + conn.state = Connection::Processing; + continue; + } + + auto const expect = conn.request.headers.find("Expect"); + if (expect != conn.request.headers.end() && conn.request.protocol == "HTTP/1.1") + { + if (!equalsLowercased(expect->second, "100-continue")) + { + LOG_WARN("HttpServer: [%s] unknown expectation - %s", conn.request.client.c_str(), + expect->second.c_str()); + conn.response.code = 417; // Expectation Failed + conn.keepalive = false; + conn.state = Connection::Processing; + continue; + } + conn.sendBuffer = "HTTP/1.1 100 Continue\r\n\r\n"; + conn.state = Connection::Sending100Continue; + LOG_TRACE("HttpServer: [%s] sending \"100 Continue\"", conn.request.client.c_str()); + continue; + } + + conn.state = Connection::ReceivingBody; + LOG_TRACE("HttpServer: [%s] receiving body", conn.request.client.c_str()); + } + + if (conn.state == Connection::Sending100Continue) + { + if (sendMore(conn)) + { + return; + } + + conn.state = Connection::ReceivingBody; + LOG_TRACE("HttpServer: [%s] receiving body", conn.request.client.c_str()); + } + + if (conn.state == Connection::ReceivingBody) + { + if (conn.receiveBuffer.length() < conn.contentLength) + { + return; + } + + if (conn.receiveBuffer.length() == conn.contentLength) + { + conn.request.content = std::move(conn.receiveBuffer); + conn.receiveBuffer.clear(); + } + else + { + conn.request.content.assign(conn.receiveBuffer, 0, conn.contentLength); + conn.receiveBuffer.erase(0, conn.contentLength); + } + + conn.state = Connection::Processing; + LOG_TRACE("HttpServer: [%s] processing request", conn.request.client.c_str()); + } + + if (conn.state == Connection::Processing) + { + processRequest(conn); + + std::ostringstream os; + os << conn.request.protocol << ' ' << conn.response.code << ' ' << conn.response.message + << "\r\n"; + for (auto const &header : conn.response.headers) + { + os << header.first << ": " << header.second << "\r\n"; + } + os << "\r\n"; + + conn.sendBuffer = os.str(); + conn.state = Connection::SendingHeaders; + LOG_TRACE("HttpServer: [%s] sending headers", conn.request.client.c_str()); + } + + if (conn.state == Connection::SendingHeaders) + { + if (sendMore(conn)) + { + return; + } + + conn.sendBuffer = std::move(conn.response.body); + conn.state = Connection::SendingBody; + LOG_TRACE("HttpServer: [%s] sending body", conn.request.client.c_str()); + } + + if (conn.state == Connection::SendingBody) + { + if (sendMore(conn)) + { + return; + } + + conn.keepalive &= allowKeepalive; + + if (conn.keepalive) + { + m_reactor.addSocket(conn.socket, + SocketTools::Reactor::Readable | SocketTools::Reactor::Closed); + conn.state = Connection::Idle; + LOG_TRACE("HttpServer: [%s] idle (keep-alive)", conn.request.client.c_str()); + if (conn.receiveBuffer.empty()) + { + return; + } + } + else + { + conn.socket.shutdown(SocketTools::Socket::ShutdownSend); + m_reactor.addSocket(conn.socket, SocketTools::Reactor::Closed); + conn.state = Connection::Closing; + LOG_TRACE("HttpServer: [%s] closing", conn.request.client.c_str()); + } + } + + if (conn.state == Connection::Closing) + { + return; + } + } + } + + bool parseHeaders(Connection &conn) + { + // Method + char const *begin = conn.receiveBuffer.c_str(); + char const *ptr = begin; + while (*ptr && *ptr != ' ' && *ptr != '\r' && *ptr != '\n') + { + ptr++; + } + if (*ptr != ' ') + { + return false; + } + conn.request.method.assign(begin, ptr); + while (*ptr == ' ') + { + ptr++; + } + + // URI + begin = ptr; + while (*ptr && *ptr != ' ' && *ptr != '\r' && *ptr != '\n') + { + ptr++; + } + if (*ptr != ' ') + { + return false; + } + conn.request.uri.assign(begin, ptr); + while (*ptr == ' ') + { + ptr++; + } + + // Protocol + begin = ptr; + while (*ptr && *ptr != ' ' && *ptr != '\r' && *ptr != '\n') + { + ptr++; + } + if (*ptr != '\r' && *ptr != '\n') + { + return false; + } + conn.request.protocol.assign(begin, ptr); + if (*ptr == '\r') + { + ptr++; + } + if (*ptr != '\n') + { + return false; + } + ptr++; + + // Headers + conn.request.headers.clear(); + while (*ptr != '\r' && *ptr != '\n') + { + // Name + begin = ptr; + while (*ptr && *ptr != ':' && *ptr != ' ' && *ptr != '\r' && *ptr != '\n') + { + ptr++; + } + if (*ptr != ':') + { + return false; + } + std::string name = normalizeHeaderName(begin, ptr); + ptr++; + while (*ptr == ' ') + { + ptr++; + } + + // Value + begin = ptr; + while (*ptr && *ptr != '\r' && *ptr != '\n') + { + ptr++; + } + conn.request.headers[name] = std::string(begin, ptr); + if (*ptr == '\r') + { + ptr++; + } + if (*ptr != '\n') + { + return false; + } + ptr++; + } + + if (*ptr == '\r') + { + ptr++; + } + if (*ptr != '\n') + { + return false; + } + ptr++; + + return true; + } + + static bool equalsLowercased(std::string const &str, char const *mask) + { + char const *ptr = str.c_str(); + while (*ptr && *mask && ::tolower(*ptr) == *mask) + { + ptr++; + mask++; + } + return !*ptr && !*mask; + } + + static std::string normalizeHeaderName(char const *begin, char const *end) + { + std::string result(begin, end); + bool first = true; + for (char &ch : result) + { + if (first) + { + ch = static_cast<char>(::toupper(ch)); + first = false; + } + else if (ch == '-') + { + first = true; + } + else + { + ch = static_cast<char>(::tolower(ch)); + } + } + return result; + } + + void processRequest(Connection &conn) + { + conn.response.message.clear(); + conn.response.headers.clear(); + conn.response.body.clear(); + + if (conn.response.code == 0) + { + conn.response.code = 404; // Not Found + for (auto &handler : m_handlers) + { + if (conn.request.uri.length() >= handler.first.length() && + strncmp(conn.request.uri.c_str(), handler.first.c_str(), handler.first.length()) == 0) + { + LOG_TRACE("HttpServer: [%s] using handler for %s", conn.request.client.c_str(), + handler.first.c_str()); + // auto callback = handler.second; // Bazel gets mad at this unused + // var, uncomment when using + int result = handler.second->onHttpRequest(conn.request, conn.response); + if (result != 0) + { + conn.response.code = result; + break; + } + } + } + + if (conn.response.code == -1) + { + LOG_TRACE("HttpServer: [%s] closing by request", conn.request.client.c_str()); + handleConnectionClosed(conn); + } + } + + if (conn.response.message.empty()) + { + conn.response.message = getDefaultResponseMessage(conn.response.code); + } + + conn.response.headers["Host"] = m_serverHost; + conn.response.headers["Connection"] = (conn.keepalive ? "keep-alive" : "close"); + conn.response.headers["Date"] = formatTimestamp(time(nullptr)); + conn.response.headers["Content-Length"] = std::to_string(conn.response.body.size()); + } + + static std::string formatTimestamp(time_t time) + { + tm tm; +#ifdef _WIN32 + gmtime_s(&tm, &time); +#else + gmtime_r(&time, &tm); +#endif + char buf[32]; + strftime(buf, sizeof(buf), "%a, %d %b %Y %H:%M:%S GMT", &tm); + return buf; + } + +public: + static char const *getDefaultResponseMessage(int code) + { + switch (code) + { + // *INDENT-OFF* + case 100: + return "Continue"; + case 101: + return "Switching Protocols"; + case 200: + return "OK"; + case 201: + return "Created"; + case 202: + return "Accepted"; + case 203: + return "Non-Authoritative Information"; + case 204: + return "No Content"; + case 205: + return "Reset Content"; + case 206: + return "Partial Content"; + case 300: + return "Multiple Choices"; + case 301: + return "Moved Permanently"; + case 302: + return "Found"; + case 303: + return "See Other"; + case 304: + return "Not Modified"; + case 305: + return "Use Proxy"; + case 306: + return "Switch Proxy"; + case 307: + return "Temporary Redirect"; + case 308: + return "Permanent Redirect"; + case 400: + return "Bad Request"; + case 401: + return "Unauthorized"; + case 402: + return "Payment Required"; + case 403: + return "Forbidden"; + case 404: + return "Not Found"; + case 405: + return "Method Not Allowed"; + case 406: + return "Not Acceptable"; + case 407: + return "Proxy Authentication Required"; + case 408: + return "Request Timeout"; + case 409: + return "Conflict"; + case 410: + return "Gone"; + case 411: + return "Length Required"; + case 412: + return "Precondition Failed"; + case 413: + return "Payload Too Large"; + case 414: + return "URI Too Long"; + case 415: + return "Unsupported Media Type"; + case 416: + return "Range Not Satisfiable"; + case 417: + return "Expectation Failed"; + case 421: + return "Misdirected Request"; + case 426: + return "Upgrade Required"; + case 428: + return "Precondition Required"; + case 429: + return "Too Many Requests"; + case 431: + return "Request Header Fields Too Large"; + case 500: + return "Internal Server Error"; + case 501: + return "Not Implemented"; + case 502: + return "Bad Gateway"; + case 503: + return "Service Unavailable"; + case 504: + return "Gateway Timeout"; + case 505: + return "HTTP Version Not Supported"; + case 506: + return "Variant Also Negotiates"; + case 510: + return "Not Extended"; + case 511: + return "Network Authentication Required"; + default: + return "???"; + // *INDENT-ON* + } + } +}; + +} // namespace HTTP_SERVER_NS diff --git a/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/http/server/socket_tools.h b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/http/server/socket_tools.h new file mode 100644 index 000000000..b54d3e55a --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/http/server/socket_tools.h @@ -0,0 +1,849 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 +#pragma once + +#include <algorithm> +#include <atomic> +#include <cassert> +#include <cstddef> +#include <cstring> +#include <iostream> +#include <map> +#include <sstream> +#include <string> +#include <thread> +#include <vector> + +#ifdef _WIN32 + +//# include <Windows.h> + +# include <winsock2.h> + +// TODO: consider NOMINMAX +# undef min +# undef max +# pragma comment(lib, "ws2_32.lib") + +#else + +# include <unistd.h> + +# ifdef __linux__ +# include <sys/epoll.h> +# endif + +# if __APPLE__ +# include "TargetConditionals.h" +// Use kqueue on mac +# include <sys/event.h> +# include <sys/time.h> +# include <sys/types.h> +# endif + +// Common POSIX headers for Linux and Mac OS X +# include <arpa/inet.h> +# include <fcntl.h> +# include <netdb.h> +# include <netinet/in.h> +# include <netinet/tcp.h> +# include <sys/socket.h> + +#endif + +#ifndef _Out_cap_ +# define _Out_cap_(size) +#endif + +#if defined(HAVE_CONSOLE_LOG) && !defined(LOG_DEBUG) +// Log to console if there's no standard log facility defined +# include <cstdio> +# ifndef LOG_DEBUG +# define LOG_DEBUG(fmt_, ...) printf(" " fmt_ "\n", ##__VA_ARGS__) +# define LOG_TRACE(fmt_, ...) printf(" " fmt_ "\n", ##__VA_ARGS__) +# define LOG_INFO(fmt_, ...) printf(" " fmt_ "\n", ##__VA_ARGS__) +# define LOG_WARN(fmt_, ...) printf(" " fmt_ "\n", ##__VA_ARGS__) +# define LOG_ERROR(fmt_, ...) printf(" " fmt_ "\n", ##__VA_ARGS__) +# endif +#endif + +#ifndef LOG_DEBUG +// Don't log anything if there's no standard log facility defined +# define LOG_DEBUG(fmt_, ...) +# define LOG_TRACE(fmt_, ...) +# define LOG_INFO(fmt_, ...) +# define LOG_WARN(fmt_, ...) +# define LOG_ERROR(fmt_, ...) +#endif + +namespace common +{ + +/// <summary> +/// A simple thread, derived class overloads onThread() method. +/// </summary> +struct Thread +{ + std::thread m_thread; + + std::atomic<bool> m_terminate{false}; + + /// <summary> + /// Thread Constructor + /// </summary> + /// <returns>Thread</returns> + Thread() {} + + /// <summary> + /// Start Thread + /// </summary> + void startThread() + { + m_terminate = false; + m_thread = std::thread([&]() { this->onThread(); }); + } + + /// <summary> + /// Join Thread + /// </summary> + void joinThread() + { + m_terminate = true; + if (m_thread.joinable()) + { + m_thread.join(); + } + } + + /// <summary> + /// Indicates if this thread should terminate + /// </summary> + /// <returns></returns> + bool shouldTerminate() const { return m_terminate; } + + /// <summary> + /// Must be implemented by children + /// </summary> + virtual void onThread() = 0; + + /// <summary> + /// Thread destructor + /// </summary> + /// <returns></returns> + virtual ~Thread() noexcept {} +}; + +}; // namespace common +namespace SocketTools +{ + +#ifdef _WIN32 +// WinSocks need extra (de)initialization, solved by a global object here, +// whose constructor/destructor will be called before and after main(). +struct WsaInitializer +{ + WsaInitializer() + { + WSADATA wsaData; + WSAStartup(MAKEWORD(2, 2), &wsaData); + } + + ~WsaInitializer() { WSACleanup(); } +}; + +static WsaInitializer g_wsaInitializer; + +#endif + +/// <summary> +/// Encapsulation of sockaddr(_in) +/// </summary> +struct SocketAddr +{ + static u_long const Loopback = 0x7F000001; + + sockaddr m_data; + + /// <summary> + /// SocketAddr constructor + /// </summary> + /// <returns>SocketAddr</returns> + SocketAddr() { memset(&m_data, 0, sizeof(m_data)); } + + SocketAddr(u_long addr, int port) + { + sockaddr_in &inet4 = reinterpret_cast<sockaddr_in &>(m_data); + inet4.sin_family = AF_INET; + inet4.sin_port = htons(static_cast<unsigned short>(port)); + inet4.sin_addr.s_addr = htonl(addr); + } + + SocketAddr(char const *addr) + { +#ifdef _WIN32 + INT addrlen = sizeof(m_data); + WCHAR buf[200]; + for (int i = 0; i < sizeof(buf) && addr[i]; i++) + { + buf[i] = addr[i]; + } + buf[199] = L'\0'; + ::WSAStringToAddressW(buf, AF_INET, nullptr, &m_data, &addrlen); +#else + sockaddr_in &inet4 = reinterpret_cast<sockaddr_in &>(m_data); + inet4.sin_family = AF_INET; + char const *colon = strchr(addr, ':'); + if (colon) + { + inet4.sin_port = htons(atoi(colon + 1)); + char buf[16]; + memcpy(buf, addr, (std::min<ptrdiff_t>)(15, colon - addr)); + buf[15] = '\0'; + ::inet_pton(AF_INET, buf, &inet4.sin_addr); + } + else + { + inet4.sin_port = 0; + ::inet_pton(AF_INET, addr, &inet4.sin_addr); + } +#endif + } + + SocketAddr(SocketAddr const &other) = default; + + SocketAddr &operator=(SocketAddr const &other) = default; + + operator sockaddr *() { return &m_data; } + + operator const sockaddr *() const { return &m_data; } + + int port() const + { + switch (m_data.sa_family) + { + case AF_INET: { + sockaddr_in const &inet4 = reinterpret_cast<sockaddr_in const &>(m_data); + return ntohs(inet4.sin_port); + } + + default: + return -1; + } + } + + std::string toString() const + { + std::ostringstream os; + + switch (m_data.sa_family) + { + case AF_INET: { + sockaddr_in const &inet4 = reinterpret_cast<sockaddr_in const &>(m_data); + u_long addr = ntohl(inet4.sin_addr.s_addr); + os << (addr >> 24) << '.' << ((addr >> 16) & 255) << '.' << ((addr >> 8) & 255) << '.' + << (addr & 255); + os << ':' << ntohs(inet4.sin_port); + break; + } + + default: + os << "[?AF?" << m_data.sa_family << ']'; + } + return os.str(); + } +}; + +/// <summary> +/// Encapsulation of a socket (non-exclusive ownership) +/// </summary> +struct Socket +{ +#ifdef _WIN32 + typedef SOCKET Type; + static Type const Invalid = INVALID_SOCKET; +#else + typedef int Type; + static Type const Invalid = -1; +#endif + + Type m_sock; + + Socket(Type sock = Invalid) : m_sock(sock) {} + + Socket(int af, int type, int proto) { m_sock = ::socket(af, type, proto); } + + ~Socket() {} + + operator Socket::Type() const { return m_sock; } + + bool operator==(Socket const &other) const { return (m_sock == other.m_sock); } + + bool operator!=(Socket const &other) const { return (m_sock != other.m_sock); } + + bool operator<(Socket const &other) const { return (m_sock < other.m_sock); } + + bool invalid() const { return (m_sock == Invalid); } + + void setNonBlocking() + { + assert(m_sock != Invalid); +#ifdef _WIN32 + u_long value = 1; + ::ioctlsocket(m_sock, FIONBIO, &value); +#else + int flags = ::fcntl(m_sock, F_GETFL, 0); + ::fcntl(m_sock, F_SETFL, flags | O_NONBLOCK); +#endif + } + + bool setReuseAddr() + { + assert(m_sock != Invalid); +#ifdef _WIN32 + BOOL value = TRUE; +#else + int value = 1; +#endif + return (::setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char *>(&value), + sizeof(value)) == 0); + } + + bool setNoDelay() + { + assert(m_sock != Invalid); +#ifdef _WIN32 + BOOL value = TRUE; +#else + int value = 1; +#endif + return (::setsockopt(m_sock, IPPROTO_TCP, TCP_NODELAY, reinterpret_cast<char *>(&value), + sizeof(value)) == 0); + } + + bool connect(SocketAddr const &addr) + { + assert(m_sock != Invalid); + return (::connect(m_sock, addr, sizeof(addr)) == 0); + } + + void close() + { + assert(m_sock != Invalid); +#ifdef _WIN32 + ::closesocket(m_sock); +#else + ::close(m_sock); +#endif + m_sock = Invalid; + } + + int recv(_Out_cap_(size) void *buffer, unsigned size) + { + assert(m_sock != Invalid); + int flags = 0; + return static_cast<int>(::recv(m_sock, reinterpret_cast<char *>(buffer), size, flags)); + } + + int send(void const *buffer, unsigned size) + { + assert(m_sock != Invalid); + return static_cast<int>(::send(m_sock, reinterpret_cast<char const *>(buffer), size, 0)); + } + + bool bind(SocketAddr const &addr) + { + assert(m_sock != Invalid); + return (::bind(m_sock, addr, sizeof(addr)) == 0); + } + + bool getsockname(SocketAddr &addr) const + { + assert(m_sock != Invalid); +#ifdef _WIN32 + int addrlen = sizeof(addr); +#else + socklen_t addrlen = sizeof(addr); +#endif + return (::getsockname(m_sock, addr, &addrlen) == 0); + } + + bool listen(int backlog) + { + assert(m_sock != Invalid); + return (::listen(m_sock, backlog) == 0); + } + + bool accept(Socket &csock, SocketAddr &caddr) + { + assert(m_sock != Invalid); +#ifdef _WIN32 + int addrlen = sizeof(caddr); +#else + socklen_t addrlen = sizeof(caddr); +#endif + csock = ::accept(m_sock, caddr, &addrlen); + return !csock.invalid(); + } + + bool shutdown(int how) + { + assert(m_sock != Invalid); + return (::shutdown(m_sock, how) == 0); + } + + int error() const + { +#ifdef _WIN32 + return ::WSAGetLastError(); +#else + return errno; +#endif + } + + enum + { +#ifdef _WIN32 + ErrorWouldBlock = WSAEWOULDBLOCK +#else + ErrorWouldBlock = EWOULDBLOCK +#endif + }; + + enum + { +#ifdef _WIN32 + ShutdownReceive = SD_RECEIVE, + ShutdownSend = SD_SEND, + ShutdownBoth = SD_BOTH +#else + ShutdownReceive = SHUT_RD, + ShutdownSend = SHUT_WR, + ShutdownBoth = SHUT_RDWR +#endif + }; +}; + +/// <summary> +/// Socket Data +/// </summary> +struct SocketData +{ + Socket socket; + int flags; + + SocketData() : socket(), flags(0) {} + + bool operator==(Socket s) { return (socket == s); } +}; + +/// <summary> +/// Socket Reactor +/// </summary> +struct Reactor : protected common::Thread +{ + /// <summary> + /// Socket State callback + /// </summary> + class SocketCallback + { + public: + virtual void onSocketReadable(Socket sock) = 0; + virtual void onSocketWritable(Socket sock) = 0; + virtual void onSocketAcceptable(Socket sock) = 0; + virtual void onSocketClosed(Socket sock) = 0; + }; + + /// <summary> + /// Socket State + /// </summary> + enum State + { + Readable = 1, + Writable = 2, + Acceptable = 4, + Closed = 8 + }; + + SocketCallback &m_callback; + + std::vector<SocketData> m_sockets; + +#ifdef _WIN32 + /* use WinSock events on Windows */ + std::vector<WSAEVENT> m_events{}; +#endif + +#ifdef __linux__ + /* use epoll on Linux */ + int m_epollFd; +#endif + +#ifdef TARGET_OS_MAC + /* use kqueue on Mac */ +# define KQUEUE_SIZE 32 + int kq{0}; + struct kevent m_events[KQUEUE_SIZE]; +#endif + +public: + Reactor(SocketCallback &callback) : m_callback(callback) + { +#ifdef __linux__ +# ifdef ANDROID + m_epollFd = ::epoll_create(0); +# else + m_epollFd = ::epoll_create1(0); +# endif +#endif + +#ifdef TARGET_OS_MAC + bzero(&m_events[0], sizeof(m_events)); + kq = kqueue(); +#endif + } + + ~Reactor() + { +#ifdef __linux__ + ::close(m_epollFd); +#endif +#ifdef TARGET_OS_MAC + ::close(kq); +#endif + } + + /// <summary> + /// Add Socket + /// </summary> + /// <param name="socket"></param> + /// <param name="flags"></param> + void addSocket(const Socket &socket, int flags) + { + if (flags == 0) + { + removeSocket(socket); + } + else + { + auto it = std::find(m_sockets.begin(), m_sockets.end(), socket); + if (it == m_sockets.end()) + { + LOG_TRACE("Reactor: Adding socket 0x%x with flags 0x%x", static_cast<int>(socket), flags); +#ifdef _WIN32 + m_events.push_back(::WSACreateEvent()); +#endif +#ifdef __linux__ + epoll_event event = {}; + event.data.fd = socket; + event.events = 0; + ::epoll_ctl(m_epollFd, EPOLL_CTL_ADD, socket, &event); +#endif +#ifdef TARGET_OS_MAC + struct kevent event; + bzero(&event, sizeof(event)); + event.ident = socket.m_sock; + EV_SET(&event, event.ident, EVFILT_READ, EV_ADD, 0, 0, NULL); + kevent(kq, &event, 1, NULL, 0, NULL); + EV_SET(&event, event.ident, EVFILT_WRITE, EV_ADD, 0, 0, NULL); + kevent(kq, &event, 1, NULL, 0, NULL); +#endif + m_sockets.push_back(SocketData()); + m_sockets.back().socket = socket; + m_sockets.back().flags = 0; + it = m_sockets.end() - 1; + } + else + { + LOG_TRACE("Reactor: Updating socket 0x%x with flags 0x%x", static_cast<int>(socket), flags); + } + + if (it->flags != flags) + { + it->flags = flags; +#ifdef _WIN32 + long lNetworkEvents = 0; + if (it->flags & Readable) + { + lNetworkEvents |= FD_READ; + } + if (it->flags & Writable) + { + lNetworkEvents |= FD_WRITE; + } + if (it->flags & Acceptable) + { + lNetworkEvents |= FD_ACCEPT; + } + if (it->flags & Closed) + { + lNetworkEvents |= FD_CLOSE; + } + auto eventIt = m_events.begin() + std::distance(m_sockets.begin(), it); + ::WSAEventSelect(socket, *eventIt, lNetworkEvents); +#endif +#ifdef __linux__ + int events = 0; + if (it->flags & Readable) + { + events |= EPOLLIN; + } + if (it->flags & Writable) + { + events |= EPOLLOUT; + } + if (it->flags & Acceptable) + { + events |= EPOLLIN; + } + // if (it->flags & Closed) - always handled (EPOLLERR | EPOLLHUP) + epoll_event event = {}; + event.data.fd = socket; + event.events = events; + ::epoll_ctl(m_epollFd, EPOLL_CTL_MOD, socket, &event); +#endif +#ifdef TARGET_OS_MAC + // TODO: [MG] - Mac OS X socket doesn't currently support updating flags +#endif + } + } + } + + /// <summary> + /// Remove Socket + /// </summary> + /// <param name="socket"></param> + void removeSocket(const Socket &socket) + { + LOG_TRACE("Reactor: Removing socket 0x%x", static_cast<int>(socket)); + auto it = std::find(m_sockets.begin(), m_sockets.end(), socket); + if (it != m_sockets.end()) + { +#ifdef _WIN32 + auto eventIt = m_events.begin() + std::distance(m_sockets.begin(), it); + ::WSAEventSelect(it->socket, *eventIt, 0); + ::WSACloseEvent(*eventIt); + m_events.erase(eventIt); +#endif +#ifdef __linux__ + ::epoll_ctl(m_epollFd, EPOLL_CTL_DEL, socket, nullptr); +#endif +#ifdef TARGET_OS_MAC + struct kevent event; + bzero(&event, sizeof(event)); + event.ident = socket; + EV_SET(&event, socket, EVFILT_READ, EV_DELETE, 0, 0, NULL); + if (-1 == kevent(kq, &event, 1, NULL, 0, NULL)) + { + //// Already removed? + LOG_ERROR("cannot delete fd=0x%x from kqueue!", event.ident); + } + EV_SET(&event, socket, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); + if (-1 == kevent(kq, &event, 1, NULL, 0, NULL)) + { + //// Already removed? + LOG_ERROR("cannot delete fd=0x%x from kqueue!", event.ident); + } +#endif + m_sockets.erase(it); + } + } + + /// <summary> + /// Start server + /// </summary> + void start() + { + LOG_INFO("Reactor: Starting..."); + startThread(); + } + + /// <summary> + /// Stop server + /// </summary> + void stop() + { + LOG_INFO("Reactor: Stopping..."); + joinThread(); +#ifdef _WIN32 + for (auto &hEvent : m_events) + { + ::WSACloseEvent(hEvent); + } +#else /* Linux and Mac */ + for (auto &sd : m_sockets) + { +# ifdef __linux__ + ::epoll_ctl(m_epollFd, EPOLL_CTL_DEL, sd.socket, nullptr); +# endif +# ifdef TARGET_OS_MAC + struct kevent event; + bzero(&event, sizeof(event)); + event.ident = sd.socket; + EV_SET(&event, sd.socket, EVFILT_READ, EV_DELETE, 0, 0, NULL); + if (-1 == kevent(kq, &event, 1, NULL, 0, NULL)) + { + LOG_ERROR("cannot delete fd=0x%x from kqueue!", event.ident); + } + EV_SET(&event, sd.socket, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); + if (-1 == kevent(kq, &event, 1, NULL, 0, NULL)) + { + LOG_ERROR("cannot delete fd=0x%x from kqueue!", event.ident); + } +# endif + } +#endif + m_sockets.clear(); + } + + /// <summary> + /// Thread Loop for async events processing + /// </summary> + virtual void onThread() override + { + LOG_INFO("Reactor: Thread started"); + while (!shouldTerminate()) + { +#ifdef _WIN32 + DWORD dwResult = ::WSAWaitForMultipleEvents(static_cast<DWORD>(m_events.size()), + m_events.data(), FALSE, 500, FALSE); + if (dwResult == WSA_WAIT_TIMEOUT) + { + continue; + } + + assert(dwResult <= WSA_WAIT_EVENT_0 + m_events.size()); + int index = dwResult - WSA_WAIT_EVENT_0; + Socket socket = m_sockets[index].socket; + int flags = m_sockets[index].flags; + + WSANETWORKEVENTS ne; + ::WSAEnumNetworkEvents(socket, m_events[index], &ne); + LOG_TRACE( + "Reactor: Handling socket 0x%x (index %d) with active flags 0x%x " + "(armed 0x%x)", + static_cast<int>(socket), index, ne.lNetworkEvents, flags); + + if ((flags & Readable) && (ne.lNetworkEvents & FD_READ)) + { + m_callback.onSocketReadable(socket); + } + if ((flags & Writable) && (ne.lNetworkEvents & FD_WRITE)) + { + m_callback.onSocketWritable(socket); + } + if ((flags & Acceptable) && (ne.lNetworkEvents & FD_ACCEPT)) + { + m_callback.onSocketAcceptable(socket); + } + if ((flags & Closed) && (ne.lNetworkEvents & FD_CLOSE)) + { + m_callback.onSocketClosed(socket); + } +#endif + +#ifdef __linux__ + epoll_event events[4]; + int result = ::epoll_wait(m_epollFd, events, sizeof(events) / sizeof(events[0]), 500); + if (result == 0 || (result == -1 && errno == EINTR)) + { + continue; + } + + assert(result >= 1 && static_cast<size_t>(result) <= sizeof(events) / sizeof(events[0])); + for (int i = 0; i < result; i++) + { + auto it = std::find(m_sockets.begin(), m_sockets.end(), events[i].data.fd); + assert(it != m_sockets.end()); + Socket socket = it->socket; + int flags = it->flags; + + LOG_TRACE("Reactor: Handling socket 0x%x active flags 0x%x (armed 0x%x)", + static_cast<int>(socket), events[i].events, flags); + + if ((flags & Readable) && (events[i].events & EPOLLIN)) + { + m_callback.onSocketReadable(socket); + } + if ((flags & Writable) && (events[i].events & EPOLLOUT)) + { + m_callback.onSocketWritable(socket); + } + if ((flags & Acceptable) && (events[i].events & EPOLLIN)) + { + m_callback.onSocketAcceptable(socket); + } + if ((flags & Closed) && (events[i].events & (EPOLLHUP | EPOLLERR))) + { + m_callback.onSocketClosed(socket); + } + } +#endif + +#if defined(TARGET_OS_MAC) + unsigned waitms = 500; // never block for more than 500ms + struct timespec timeout; + timeout.tv_sec = waitms / 1000; + timeout.tv_nsec = (waitms % 1000) * 1000 * 1000; + + int nev = kevent(kq, NULL, 0, m_events, KQUEUE_SIZE, &timeout); + for (int i = 0; i < nev; i++) + { + struct kevent &event = m_events[i]; + int fd = (int)event.ident; + auto it = std::find(m_sockets.begin(), m_sockets.end(), fd); + assert(it != m_sockets.end()); + Socket socket = it->socket; + int flags = it->flags; + + LOG_TRACE("Handling socket 0x%x active flags 0x%x (armed 0x%x)", static_cast<int>(socket), + event.flags, event.fflags); + + if (event.filter == EVFILT_READ) + { + if (flags & Acceptable) + { + m_callback.onSocketAcceptable(socket); + } + if (flags & Readable) + { + m_callback.onSocketReadable(socket); + } + continue; + } + + if (event.filter == EVFILT_WRITE) + { + if (flags & Writable) + { + m_callback.onSocketWritable(socket); + } + continue; + } + + if ((event.flags & EV_EOF) || (event.flags & EV_ERROR)) + { + LOG_TRACE("event.filter=%s", "EVFILT_WRITE"); + m_callback.onSocketClosed(socket); + it->flags = Closed; + struct kevent kevt; + EV_SET(&kevt, event.ident, EVFILT_READ, EV_DELETE, 0, 0, NULL); + if (-1 == kevent(kq, &kevt, 1, NULL, 0, NULL)) + { + LOG_ERROR("cannot delete fd=0x%x from kqueue!", event.ident); + } + EV_SET(&kevt, event.ident, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); + if (-1 == kevent(kq, &kevt, 1, NULL, 0, NULL)) + { + LOG_ERROR("cannot delete fd=0x%x from kqueue!", event.ident); + } + continue; + } + LOG_ERROR("Reactor: unhandled kevent!"); + } +#endif + } + LOG_TRACE("Reactor: Thread done"); + } +}; + +} // namespace SocketTools diff --git a/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/latency_boundaries.h b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/latency_boundaries.h new file mode 100644 index 000000000..cc03b169b --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/latency_boundaries.h @@ -0,0 +1,60 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include <array> +#include <chrono> + +#include "opentelemetry/version.h" + +using std::chrono::microseconds; +using std::chrono::milliseconds; +using std::chrono::nanoseconds; +using std::chrono::seconds; + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace ext +{ +namespace zpages +{ +/** + * kLatencyBoundaries is a constant array that contains the 9 latency + * boundaries. Each value in the array represents the lower limit(inclusive) of + * the boundary(in nano seconds) and the upper limit(exclusive) of the boundary + * is the lower limit of the next one. The upper limit of the last boundary is + * INF. + */ +const std::array<nanoseconds, 9> kLatencyBoundaries = { + nanoseconds(0), + nanoseconds(microseconds(10)), + nanoseconds(microseconds(100)), + nanoseconds(milliseconds(1)), + nanoseconds(milliseconds(10)), + nanoseconds(milliseconds(100)), + nanoseconds(seconds(1)), + nanoseconds(seconds(10)), + nanoseconds(seconds(100)), +}; + +/** + * LatencyBoundary enum is used to index into the kLatencyBoundaries container. + * Using this enum lets you access the latency boundary at each index without + * using magic numbers + */ +enum LatencyBoundary +{ + k0MicroTo10Micro, + k10MicroTo100Micro, + k100MicroTo1Milli, + k1MilliTo10Milli, + k10MilliTo100Milli, + k100MilliTo1Second, + k1SecondTo10Second, + k10SecondTo100Second, + k100SecondToMax +}; + +} // namespace zpages +} // namespace ext +OPENTELEMETRY_END_NAMESPACE diff --git a/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/static/tracez_index.h b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/static/tracez_index.h new file mode 100644 index 000000000..c4c5b4933 --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/static/tracez_index.h @@ -0,0 +1,48 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +const char tracez_index[] = + "" + "<!doctype html>" + "<html>" + " <head>" + " <title>zPages TraceZ</title>" + " <script src='/tracez/script.js'></script>" + " <link href='/tracez/style.css' rel='stylesheet'>" + " </head>" + " <body>" + " <h1>zPages TraceZ</h1>" + " <span id='top-right'>Data last fetched: <span id='lastUpdateTime'></span><br>" + " <button onclick='refreshData()'>Fetch new data</button></span>" + " <br><br>" + " <div class='table-wrap'>" + " <table id='headers'>" + " <colgroup>" + " <col class='md'>" + " <col class='sm'>" + " <col class='sm'>" + " <col class='lg'>" + " </colgroup>" + " <tr>" + " <th>Span Name</th>" + " <th>Error Samples</th>" + " <th>Running</th>" + " <th>Latency Samples</th>" + " </tr>" + " </table>" + " <table id='overview_table'>" + " </table>" + " <div class='right'>Row count: <span id='overview_table_count'>0</span></div>" + " </div>" + " <br>" + " <br>" + " <span id='name_type_detail_table_header'></span>" + " <div class='table-wrap'>" + " <table id='name_type_detail_table'>" + " </table>" + " <div class='right'>Row count: <span id='name_type_detail_table_count'>0</span></div>" + " </div>" + " </body>" + "</html>"; diff --git a/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/static/tracez_script.h b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/static/tracez_script.h new file mode 100644 index 000000000..b21ceea8e --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/static/tracez_script.h @@ -0,0 +1,293 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +const char tracez_script[] = + "" + "window.onload = () => refreshData();" + "" + "const latencies = [" + " '>0s', '>10µs', '>100µs'," + " '>1ms', '>10ms', '>100ms'," + " '>1s', '>10s', '>100s'," + "];" + "" + "const statusCodeDescriptions = {" + " 'OK': 'The operation completed successfully.'," + " 'CANCELLED': 'The operation was cancelled (typically by the caller).'," + " 'UNKNOWN': `Unknown error. An example of where this error may be returned is if a Status " + "value received" + " from another address space belongs to an error-space that is not known in this " + "address space." + " Also errors raised by APIs that do not return enough error information may be " + "converted to" + " this error.`," + " 'INVALID_ARGUMENT': `Client specified an invalid argument. Note that this differs from " + "FAILED_PRECONDITION." + " INVALID_ARGUMENT indicates arguments that are problematic regardless of the state " + "of the" + " system (e.g., a malformed file name).`," + " 'DEADLINE_EXCEEDED': `Deadline expired before operation could complete. For operations that " + "change the state of the" + " system, this error may be returned even if the operation has completed " + "successfully. For" + " example, a successful response from a server could have been delayed long enough " + "for the" + " deadline to expire.`," + " 'NOT_FOUND' : 'Some requested entity (e.g., file or directory) was not found.'," + " 'ALREADY_EXISTS': 'Some entity that we attempted to create (e.g., file or directory) " + "already exists.'," + " 'PERMISSION_DENIED': `The caller does not have permission to execute the specified " + "operation. PERMISSION_DENIED" + " must not be used for rejections caused by exhausting some resource (use " + "RESOURCE_EXHAUSTED" + " instead for those errors). PERMISSION_DENIED must not be used if the caller cannot " + "be" + " identified (use UNAUTHENTICATED instead for those errors).`," + " 'RESOURCE_EXHAUSTED': `Some resource has been exhausted, perhaps a per-user quota, or " + "perhaps the entire file system" + " is out of space.`," + " 'FAILED_PRECONDITION': `Operation was rejected because the system is not in a state " + "required for the operation's" + " execution. For example, directory to be deleted may be non-empty, an rmdir " + "operation is" + " applied to a non-directory, etc.`," + " 'ABORTED': `The operation was aborted, typically due to a concurrency issue like sequencer " + "check" + " failures, transaction aborts, etc`," + " 'OUT_OF_RANGE': `Operation was attempted past the valid range. E.g., seeking or reading " + "past end of file.`," + " 'UNIMPLEMENTED': 'Operation is not implemented or not supported/enabled in this service.'," + " 'INTERNAL': `Internal errors. Means some invariants expected by underlying system has been " + "broken. If you" + " see one of these errors, something is very broken.`," + " 'UNAVAILABLE': `The service is currently unavailable. This is a most likely a transient " + "condition and may be" + " corrected by retrying with a backoff.`," + " 'DATA_LOSS': 'Unrecoverable data loss or corruption.'," + " 'UNAUTHENTICATED': 'The request does not have valid authentication credentials for the " + "operation.'," + "};" + "" + "const units = {'duration': 'ns'};" + "" + "" + "const details = {'status': statusCodeDescriptions};" + "" + "/* Latency info is returned as an array, so they need to be parsed accordingly */" + "const getLatencyCell = (span, i, h) => `<td${span[h][i] === 0 ? '' : ` class='click'" + " onclick=\"overwriteDetailedTable(${i}, '${span['name']}')\"`}>${span[h][i]}</td>`;" + "" + "/* Pretty print a cell with a map */" + "const getKeyValueCell = (span, h) => `<td><pr><code>" + " ${JSON.stringify(span[h], null, 2)}" + " </code></pre></td>`;" + "" + "/* Standard categories when checking span details */" + "const idCols = ['spanid', 'parentid', 'traceid'];" + "const detailCols = ['attributes']; /* Columns error, running, and latency spans all share */" + "const dateCols = ['start']; /* Categories to change to date */" + "const numCols = ['duration']; /* Categories to change to num */" + "const clickCols = ['error', 'running']; /* Non-latency clickable cols */" + "const arrayCols = { " + " 'latency': getLatencyCell," + " 'events': getKeyValueCell," + " 'attributes': getKeyValueCell" + "};" + "" + "const base_endpt = '/tracez/get/'; /* For making GET requests */" + "" + "/* Maps table types to their approporiate formatting */" + "const tableFormatting = {" + " 'all': {" + " 'url': base_endpt + 'aggregations'," + " 'html_id': 'overview_table'," + " 'sizing': [" + " {'sz': 'md', 'repeats': 1}," + " {'sz': 'sm', 'repeats': 11}," + " ]," + " 'headings': ['name', ...clickCols, 'latency']," + " 'cell_headings': ['name', ...clickCols, ...latencies]," + " }," + " 'error': {" + " 'url': base_endpt + 'error/'," + " 'html_id': 'name_type_detail_table'," + " 'sizing': [" + " {'sz': 'sm', 'repeats': 5}," + " {'sz': 'sm-md', 'repeats': 1}," + " ]," + " 'headings': [...idCols, ...dateCols, 'status', ...detailCols]," + " 'has_subheading': true," + " }," + " 'running': {" + " 'url': base_endpt + 'running/'," + " 'html_id': 'name_type_detail_table'," + " 'sizing': [" + " {'sz': 'sm', 'repeats': 4}," + " {'sz': 'sm-md', 'repeats': 1}," + " ]," + " 'headings': [...idCols, ...dateCols, ...detailCols]," + " 'has_subheading': true," + " 'status': 'pending'," + " }," + " 'latency': {" + " 'url': base_endpt + 'latency/'," + " 'html_id': 'name_type_detail_table'," + " 'sizing': [" + " {'sz': 'sm', 'repeats': 5}," + " {'sz': 'sm-md', 'repeats': 1}," + " ]," + " 'headings': [...idCols, ...dateCols, ...numCols, ...detailCols]," + " 'has_subheading': true," + " 'status': 'ok'" + " }" + "};" + "const getFormat = group => tableFormatting[group];" + "" + "/* Getters using formatting config variable */" + "const getURL = group => getFormat(group)['url'];" + "const getHeadings = group => getFormat(group)['headings'];" + "const getCellHeadings = group => 'cell_headings' in getFormat(group)" + " ? getFormat(group)['cell_headings'] : getHeadings(group); " + "const getSizing = group => getFormat(group)['sizing'];" + "const getStatus = group => isLatency(group) ? 'ok' : getFormat(group)['status'];" + "const getHTML = group => getFormat(group)['html_id'];" + "" + "const isDate = col => new Set(dateCols).has(col);" + "const isLatency = group => !(new Set(clickCols).has(group)); /* non latency clickable cols, " + "change to include latency? */" + "const isArrayCol = group => (new Set(Object.keys(arrayCols)).has(group));" + "const hasCallback = col => new Set(clickCols).has(col); /* Non-latency cb columns */" + "const hideHeader = h => new Set([...clickCols, 'name']).has(h); /* Headers to not show render " + "twice */" + "const hasSubheading = group => isLatency(group) || 'has_subheading' in getFormat(group); " + "const hasStatus = group => isLatency(group) || 'status' in getFormat(group);" + "" + "const toTitlecase = word => word.charAt(0).toUpperCase() + word.slice(1);" + "const updateLastRefreshStr = () => document.getElementById('lastUpdateTime').innerHTML = new " + "Date().toLocaleString();" + "" + "const getStatusHTML = group => !hasStatus(group) ? ''" + " : `All of these spans have status code ${getStatus(group)}`;" + "" + "/* Returns an HTML string that handlles width formatting" + " for a table group */" + "const tableSizing = group => '<colgroup>'" + " + getSizing(group).map(sz =>" + " (`<col class='${sz['sz']}'></col>`).repeat(sz['repeats']))" + " .join('')" + " + '</colgroup>';" + "" + "/* Returns an HTML string for a table group's headings," + " hiding headings where needed */" + "const tableHeadings = group => '<tr>'" + " + getCellHeadings(group).map(h => `<th>${(hideHeader(h) ? '' : h)}</th>`).join('')" + " + '</tr>';" + "" + "/* Returns an HTML string, which represents the formatting for" + " the entire header for a table group. This doesn't change, and" + " includes the width formatting and the actual table headers */" + "const tableHeader = group => tableSizing(group) + tableHeadings(group);" + "" + "/* Return formatting for an array-based value based on its header */" + "const getArrayCells = (h, span) => span[h].length" + " ? (span[h].map((_, i) => arrayCols[h](span, i, h))).join('')" + " : (Object.keys(span[h]).length ? arrayCols[h](span, h) : `<td>${emptyContent()}</td>`);" + "" + "const emptyContent = () => `<span class='empty'>(not set)</span>`;" + "" + "const dateStr = nanosec => {" + " const mainDate = new Date(nanosec / 1000000).toLocaleString();" + " let lostPrecision = String(nanosec % 1000000);" + " while (lostPrecision.length < 6) lostPrecision = 0 + lostPrecision;" + " const endingLocation = mainDate.indexOf('M') - 2;" + " return `${mainDate.substr(0, " + "endingLocation)}:${lostPrecision}${mainDate.substr(endingLocation)}`;" + "};" + "" + "const detailCell = (h, span) => {" + " const detailKey = Object.keys(details[h])[span[h]];" + " const detailVal = details[h][detailKey];" + " return `<span class='has-tooltip'>" + " ${detailKey}" + " <span class='tooltip'>${detailVal}</span>" + " </span>`;" + "};" + "" + "/* Format cells as needed */" + "const getCellContent = (h, span) => {" + " if (h in details) return detailCell(h, span);" + " else if (h in units) return `${span[h]} ${units[h]}`;" + " else if (span[h] === '') return emptyContent();" + " else if (!isDate(h)) return span[h];" + " return dateStr(span[h]);" + "};" + "" + "/* Create cell based on what header we want to render */" + "const getCell = (h, span) => (isArrayCol(h)) ? getArrayCells(h, span)" + " : `<td ${hasCallback(h) && span[h] !== 0 ? (`class='click'" + " onclick=\"overwriteDetailedTable('${h}', '${span['name']}')\"`)" + " : ''}>` + `${getCellContent(h, span)}</td>`;" + "" + "/* Returns an HTML string with for a span's aggregated data" + " while columns are ordered according to its table group */" + "const tableRow = (group, span) => '<tr>'" + " + getHeadings(group).map(h => getCell(h, span)).join('')" + " + '</tr>';" + "" + "/* Returns an HTML string from all the data given as" + " table rows, with each row being a group of spans by name */" + "const tableRows = (group, data) => data.map(span => tableRow(group, span)).join('');" + "" + "/* Overwrite a table on the DOM based on the group given by adding" + " its headers and fetching data for its url */" + "function overwriteTable(group, url_end = '') {" + " fetch(getURL(group) + url_end).then(res => res.json())" + " .then(data => {" + " document.getElementById(getHTML(group))" + " .innerHTML = tableHeader(group)" + " + tableRows(group, data);" + " document.getElementById(getHTML(group) + '_count')" + " .innerHTML = data.length;" + " })" + " .catch(err => console.log(err));" + "};" + "" + "/* Adds a title subheading where needed */" + "function updateSubheading(group, name) {" + " if (hasSubheading(group)) {" + " document.getElementById(getHTML(isLatency(group) ? 'latency' : group) + '_header')" + " .innerHTML = `<h2><span class='subhead-name'>${name}</span>" + " ${(isLatency(group) ? `${latencies[group]} Bucket` : toTitlecase(group))}" + " Spans</h2><i>Showing span details for up to 5 most recent spans. " + " ${getStatusHTML(group)}</i><br><br>`;" + " }" + "};" + "" + "/* Overwrites a table on the DOM based on the group and also" + " changes the subheader, since this a looking at sampled spans */" + "function overwriteDetailedTable(group, name) {" + " if (isLatency(group)) overwriteTable('latency', group + '/' + name);" + " else overwriteTable(group, name);" + " updateSubheading(group, name);" + "};" + "" + "/* Append to a table on the DOM based on the group given */" + "function addToTable(group, url_end = '') {" + " fetch(getURL(group) + url_end).then(res => res.json())" + " .then(data => {" + " const rowsStr = tableRows(group, data);" + " if (!rowsStr) console.log(`No rows added for ${group} table`);" + " document.getElementById(getHTML(group))" + " .getElementsByTagName('tbody')[0]" + " .innerHTML += rowsStr;" + " })" + " .catch(err => console.log(err));" + "};" + "" + "const refreshData = () => {" + " updateLastRefreshStr();" + " overwriteTable('all');" + "};" + ""; diff --git a/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/static/tracez_style.h b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/static/tracez_style.h new file mode 100644 index 000000000..16b83f897 --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/static/tracez_style.h @@ -0,0 +1,165 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +const char tracez_style[] = + "" + "body {" + " color: #252525;" + " text-align: center;" + " font-family: monospace, sans-serif;" + " word-break: break-all;" + " font-size: .9em" + "}" + "" + "code {" + " font-size: 12px;" + "}" + "" + "h1 {" + " margin: 20px 0 0;" + "}" + "" + "table {" + " font-family: monospace, sans-serif;" + " border-collapse: collapse;" + " font-size: 1.05em;" + " width: 100%;" + "}" + "" + ".table-wrap {" + " width: 100%;" + " min-width: 700px;" + " max-width: 2000px;" + " margin: auto;" + "}" + "" + "td, th {" + " word-break: break-word;" + " border: 1px solid #f5f5f5;" + " padding: 6px;" + " text-align: center;" + "}" + "" + "#overview_table th, #overview_table tr {" + " border-top: none;" + "}" + "" + "#headers th, #headers tr {" + " border-bottom: none;" + "}" + "" + "#top-right {" + " text-align: right;" + " position: absolute;" + " top: 10px;" + " right: 10px;" + " text-shadow: .5px .5px .25px #fff;" + "}" + "" + "#top-right button {" + " color: #f6a81c;" + " border: 2px solid #f6a81c;" + " padding: 10px;" + " margin: 10px;" + " text-transform: uppercase;" + " letter-spacing: 1px;" + " background-color: white;" + " border-radius: 10px;" + " font-weight: bold;" + "}" + "" + ".right {" + " text-align: right;" + " padding: 10px;" + "}" + "" + ":hover {" + " transition-duration: .15s;" + "}" + "" + "#top-right button:hover {" + " border-color: #4b5fab;" + " color: #4b5fab;" + " cursor: pointer;" + "}" + "" + "tr:nth-child(even) {" + " background-color: #eee;" + "}" + "" + ".click {" + " text-decoration: underline dotted #4b5fab;" + "}" + "" + "tr:hover, td:hover, .click:hover {" + " color: white;" + " background-color: #4b5fab;" + "}" + "" + "tr:hover {" + " background-color: #4b5fabcb;" + "}" + "" + "th {" + " background-color: white;" + " color: #252525;" + "}" + "" + ".click:hover {" + " cursor: pointer;" + " color: #f6a81ccc;" + "}" + "" + ".empty {" + " color: #999;" + "}" + "" + ".sm {" + " width: 7%;" + "}" + "" + ".sm-md {" + " width: 13%;" + "}" + "" + ".md {" + " width: 23%;" + "}" + "" + ".lg {" + " width: 63%;" + "}" + "" + "img {" + " width: 50%;" + " max-width: 500px;" + "}" + "" + ".subhead-name {" + " color: #4b5fab;" + "}" + "" + ".has-tooltip {" + " text-decoration: underline dotted #f6a81c;" + "}" + "" + ".has-tooltip:hover .tooltip {" + " display: block;" + "}" + "" + ".tooltip {" + " display: none;" + " position: absolute;" + "}" + "" + ".tooltip, .tooltip:hover {" + " background: #ffffffd9;" + " padding: 10px;" + " z-index: 1000;" + " color: #252525 !important;" + " border-radius: 10px;" + " margin: 3px 20px 0 0;" + "}" + ""; diff --git a/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/threadsafe_span_data.h b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/threadsafe_span_data.h new file mode 100644 index 000000000..3b3c9a7ec --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/threadsafe_span_data.h @@ -0,0 +1,252 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include <chrono> +#include <mutex> +#include <unordered_map> +#include <vector> + +#include "opentelemetry/common/timestamp.h" +#include "opentelemetry/nostd/string_view.h" +#include "opentelemetry/sdk/trace/recordable.h" +#include "opentelemetry/sdk/trace/span_data.h" +#include "opentelemetry/trace/canonical_code.h" +#include "opentelemetry/trace/span.h" +#include "opentelemetry/trace/span_id.h" +#include "opentelemetry/trace/trace_id.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace ext +{ +namespace zpages +{ + +/** + * This class is a threadsafe version of span data used for zpages in OT + */ +class ThreadsafeSpanData final : public opentelemetry::sdk::trace::Recordable +{ +public: + /** + * Get the trace id for this span + * @return the trace id for this span + */ + opentelemetry::trace::TraceId GetTraceId() const noexcept + { + std::lock_guard<std::mutex> lock(mutex_); + return span_context_.trace_id(); + } + + /** + * Get the span id for this span + * @return the span id for this span + */ + opentelemetry::trace::SpanId GetSpanId() const noexcept + { + std::lock_guard<std::mutex> lock(mutex_); + return span_context_.span_id(); + } + + /** + * Get the span context for this span + * @return the span context for this span + */ + const opentelemetry::trace::SpanContext &GetSpanContext() const noexcept + { + std::lock_guard<std::mutex> lock(mutex_); + return span_context_; + } + + /** + * Get the parent span id for this span + * @return the span id for this span's parent + */ + opentelemetry::trace::SpanId GetParentSpanId() const noexcept + { + std::lock_guard<std::mutex> lock(mutex_); + return parent_span_id_; + } + + /** + * Get the name for this span + * @return the name for this span + */ + opentelemetry::nostd::string_view GetName() const noexcept + { + std::lock_guard<std::mutex> lock(mutex_); + return name_; + } + + /** + * Get the status for this span + * @return the status for this span + */ + opentelemetry::trace::StatusCode GetStatus() const noexcept + { + std::lock_guard<std::mutex> lock(mutex_); + return status_code_; + } + + /** + * Get the status description for this span + * @return the description of the the status of this span + */ + opentelemetry::nostd::string_view GetDescription() const noexcept + { + std::lock_guard<std::mutex> lock(mutex_); + return status_desc_; + } + + /** + * Get the start time for this span + * @return the start time for this span + */ + opentelemetry::common::SystemTimestamp GetStartTime() const noexcept + { + std::lock_guard<std::mutex> lock(mutex_); + return start_time_; + } + + /** + * Get the duration for this span + * @return the duration for this span + */ + std::chrono::nanoseconds GetDuration() const noexcept + { + std::lock_guard<std::mutex> lock(mutex_); + return duration_; + } + + /** + * Get the attributes for this span + * @return the attributes for this span + */ + std::unordered_map<std::string, opentelemetry::sdk::common::OwnedAttributeValue> GetAttributes() + const noexcept + { + std::lock_guard<std::mutex> lock(mutex_); + return attributes_; + } + + void SetIdentity(const opentelemetry::trace::SpanContext &span_context, + opentelemetry::trace::SpanId parent_span_id) noexcept override + { + std::lock_guard<std::mutex> lock(mutex_); + span_context_ = span_context; + parent_span_id_ = parent_span_id; + } + + void SetAttribute(nostd::string_view key, const common::AttributeValue &value) noexcept override + { + std::lock_guard<std::mutex> lock(mutex_); + attributes_[std::string(key)] = nostd::visit(converter_, value); + } + + void SetStatus(opentelemetry::trace::StatusCode code, + nostd::string_view description) noexcept override + { + std::lock_guard<std::mutex> lock(mutex_); + status_code_ = code; + status_desc_ = std::string(description); + } + + void SetName(nostd::string_view name) noexcept override + { + std::lock_guard<std::mutex> lock(mutex_); + name_ = std::string(name); + } + + void SetSpanKind(opentelemetry::trace::SpanKind span_kind) noexcept override + { + span_kind_ = span_kind; + } + + void SetResource(const opentelemetry::sdk::resource::Resource & /*resource*/) noexcept override + { + // Not Implemented + } + + void SetStartTime(opentelemetry::common::SystemTimestamp start_time) noexcept override + { + std::lock_guard<std::mutex> lock(mutex_); + start_time_ = start_time; + } + + void SetDuration(std::chrono::nanoseconds duration) noexcept override + { + std::lock_guard<std::mutex> lock(mutex_); + duration_ = duration; + } + + void SetInstrumentationLibrary( + const opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary + &instrumentation_library) noexcept override + { + std::lock_guard<std::mutex> lock(mutex_); + instrumentation_library_ = &instrumentation_library; + } + + void AddLink(const opentelemetry::trace::SpanContext &span_context, + const opentelemetry::common::KeyValueIterable &attributes = + opentelemetry::common::KeyValueIterableView<std::map<std::string, int>>( + {})) noexcept override + { + std::lock_guard<std::mutex> lock(mutex_); + (void)span_context; + (void)attributes; + } + + void AddEvent( + nostd::string_view name, + common::SystemTimestamp timestamp = common::SystemTimestamp(std::chrono::system_clock::now()), + const opentelemetry::common::KeyValueIterable &attributes = + opentelemetry::common::KeyValueIterableView<std::map<std::string, int>>( + {})) noexcept override + { + std::lock_guard<std::mutex> lock(mutex_); + events_.push_back( + opentelemetry::sdk::trace::SpanDataEvent(std::string(name), timestamp, attributes)); + } + + ThreadsafeSpanData() {} + ThreadsafeSpanData(const ThreadsafeSpanData &threadsafe_span_data) + : ThreadsafeSpanData(threadsafe_span_data, + std::lock_guard<std::mutex>(threadsafe_span_data.mutex_)) + {} + +private: + ThreadsafeSpanData(const ThreadsafeSpanData &threadsafe_span_data, + const std::lock_guard<std::mutex> &) + : span_context_(threadsafe_span_data.span_context_), + parent_span_id_(threadsafe_span_data.parent_span_id_), + start_time_(threadsafe_span_data.start_time_), + duration_(threadsafe_span_data.duration_), + name_(threadsafe_span_data.name_), + status_code_(threadsafe_span_data.status_code_), + status_desc_(threadsafe_span_data.status_desc_), + attributes_(threadsafe_span_data.attributes_), + events_(threadsafe_span_data.events_), + converter_(threadsafe_span_data.converter_), + instrumentation_library_(threadsafe_span_data.instrumentation_library_) + {} + + mutable std::mutex mutex_; + opentelemetry::trace::SpanContext span_context_{false, false}; + opentelemetry::trace::SpanId parent_span_id_; + common::SystemTimestamp start_time_; + std::chrono::nanoseconds duration_{0}; + std::string name_; + opentelemetry::trace::SpanKind span_kind_; + opentelemetry::trace::StatusCode status_code_{opentelemetry::trace::StatusCode::kUnset}; + std::string status_desc_; + std::unordered_map<std::string, opentelemetry::sdk::common::OwnedAttributeValue> attributes_; + std::vector<opentelemetry::sdk::trace::SpanDataEvent> events_; + opentelemetry::sdk::common::AttributeConverter converter_; + const opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary + *instrumentation_library_; +}; +} // namespace zpages +} // namespace ext +OPENTELEMETRY_END_NAMESPACE diff --git a/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/tracez_data.h b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/tracez_data.h new file mode 100644 index 000000000..4594fbe05 --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/tracez_data.h @@ -0,0 +1,86 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include <array> +#include <iostream> +#include <list> +#include <string> + +#include "opentelemetry/ext/zpages/threadsafe_span_data.h" +#include "opentelemetry/nostd/span.h" +#include "opentelemetry/nostd/string_view.h" +#include "opentelemetry/sdk/trace/span_data.h" +#include "opentelemetry/trace/canonical_code.h" +#include "opentelemetry/trace/span_id.h" +#include "opentelemetry/trace/trace_id.h" +#include "opentelemetry/version.h" + +using opentelemetry::ext::zpages::ThreadsafeSpanData; +using opentelemetry::trace::CanonicalCode; +using opentelemetry::trace::SpanId; +using opentelemetry::trace::TraceId; + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace ext +{ +namespace zpages +{ + +/** + * kMaxNumberOfSampleSpans is the maximum number of running, completed or error + * sample spans stored at any given time for a given span name. + * This limit is introduced to reduce memory usage by trimming sample spans + * stored. + */ +const int kMaxNumberOfSampleSpans = 5; + +/** + * TracezData is the data to be displayed for tracez zpages that is stored for + * each span name. + */ +struct TracezData +{ + /** + * TODO: At this time the maximum count is unknown but a larger data type + * might have to be used in the future to store these counts to avoid overflow + */ + unsigned int running_span_count; + unsigned int error_span_count; + + /** + * completed_span_count_per_latency_bucket is an array that stores the count + * of spans for each of the 9 latency buckets. + */ + std::array<unsigned int, kLatencyBoundaries.size()> completed_span_count_per_latency_bucket; + + /** + * sample_latency_spans is an array of lists, each index of the array + * corresponds to a latency boundary(of which there are 9). + * The list in each index stores the sample spans for that latency boundary. + */ + std::array<std::list<ThreadsafeSpanData>, kLatencyBoundaries.size()> sample_latency_spans; + + /** + * sample_error_spans is a list that stores the error samples for a span name. + */ + std::list<ThreadsafeSpanData> sample_error_spans; + + /** + * sample_running_spans is a list that stores the running span samples for a + * span name. + */ + std::list<ThreadsafeSpanData> sample_running_spans; + + TracezData() + { + running_span_count = 0; + error_span_count = 0; + completed_span_count_per_latency_bucket.fill(0); + } +}; + +} // namespace zpages +} // namespace ext +OPENTELEMETRY_END_NAMESPACE diff --git a/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/tracez_data_aggregator.h b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/tracez_data_aggregator.h new file mode 100644 index 000000000..28236b60f --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/tracez_data_aggregator.h @@ -0,0 +1,171 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include <array> +#include <atomic> +#include <condition_variable> +#include <list> +#include <map> +#include <mutex> +#include <string> +#include <thread> +#include <unordered_set> + +#include "opentelemetry/ext/zpages/latency_boundaries.h" +#include "opentelemetry/ext/zpages/tracez_data.h" +#include "opentelemetry/ext/zpages/tracez_shared_data.h" +#include "opentelemetry/nostd/span.h" +#include "opentelemetry/nostd/string_view.h" +#include "opentelemetry/sdk/trace/span_data.h" +#include "opentelemetry/trace/canonical_code.h" + +using opentelemetry::trace::CanonicalCode; + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace ext +{ +namespace zpages +{ +/** + * TracezDataAggregator object is responsible for collecting raw data and + * converting it to useful information that can be made available to + * display on the tracez zpage. + * + * When this object is created it starts a thread that calls a function + * periodically to update the aggregated data with new spans. + * + * The only exposed function is a getter that returns a copy of the aggregated + * data when requested. This function is ensured to be called in sequence to the + * aggregate spans function which is called periodically. + * + * TODO: Consider a singleton pattern for this class, not sure if multiple + * instances of this class should exist. + */ +class TracezDataAggregator +{ +public: + /** + * Constructor creates a thread that calls a function to aggregate span data + * at regular intervals. + * @param shared_data is the shared set of spans to expose. + * @param update_interval the time duration for updating the aggregated data. + */ + TracezDataAggregator(std::shared_ptr<TracezSharedData> shared_data, + milliseconds update_interval = milliseconds(10)); + + /** Ends the thread set up in the constructor and destroys the object **/ + ~TracezDataAggregator(); + + /** + * GetAggregatedTracezData returns a copy of the updated data. + * @returns a map with the span name as key and the tracez span data as value. + */ + std::map<std::string, TracezData> GetAggregatedTracezData(); + +private: + /** + * AggregateSpans is the function that is called to update the aggregated data + * with newly completed and running span data + */ + void AggregateSpans(); + + /** + * AggregateCompletedSpans is the function that is called to update the + * aggregation with the data of newly completed spans. + * @param completed_spans are the newly completed spans. + */ + void AggregateCompletedSpans(std::vector<std::unique_ptr<ThreadsafeSpanData>> &completed_spans); + + /** + * AggregateRunningSpans aggregates the data for all running spans received + * from the span processor. Running spans are not cleared by the span + * processor and multiple calls to this function may contain running spans for + * which data has already been collected in a previous call. Additionally, + * span names can change while span is running and there seems to be + * no trivial to way to know if it is a new or old running span so at every + * call to this function the available running span data is reset and + * recalculated. At this time there is no unique way to identify a span + * object once this is done, there might be some better ways to do this. + * TODO : SpanProcessor is never notified when a span name is changed while it + * is running and that is propogated to the data aggregator. The running span + * name if changed while it is running will not be updated in the data + * aggregator till the span is completed. + * @param running_spans is the running spans to be aggregated. + */ + void AggregateRunningSpans(std::unordered_set<ThreadsafeSpanData *> &running_spans); + + /** + * AggregateStatusOKSpans is the function called to update the data of spans + * with status code OK. + * @param ok_span is the span who's data is to be aggregated + */ + void AggregateStatusOKSpan(std::unique_ptr<ThreadsafeSpanData> &ok_span); + + /** + * AggregateStatusErrorSpans is the function that is called to update the + * data of error spans + * @param error_span is the error span who's data is to be aggregated + */ + void AggregateStatusErrorSpan(std::unique_ptr<ThreadsafeSpanData> &error_span); + + /** + * ClearRunningSpanData is a function that is used to clear all running span + * at the beginning of a call to AggregateSpan data. + * Running span data has to be cleared before aggregation because running + * span data is recalculated at every call to AggregateSpans. + */ + void ClearRunningSpanData(); + + /** + * FindLatencyBoundary finds the latency boundary to which the duration of + * the given span_data belongs to + * @ param span_data is the ThreadsafeSpanData whose duration for which the latency + * boundary is to be found + * @ returns LatencyBoundary is the latency boundary that the duration belongs + * to + */ + LatencyBoundary FindLatencyBoundary(std::unique_ptr<ThreadsafeSpanData> &ok_span); + + /** + * InsertIntoSampleSpanList is a helper function that is called to insert + * a given span into a sample span list. A function is used for insertion + * because list size is to be limited at a set maximum. + * @param sample_spans the sample span list into which span is to be inserted + * @param span_data the span_data to be inserted into list + */ + void InsertIntoSampleSpanList(std::list<ThreadsafeSpanData> &sample_spans, + ThreadsafeSpanData &span_data); + + /** Instance of shared spans used to collect raw data **/ + std::shared_ptr<TracezSharedData> tracez_shared_data_; + + /** + * Tree map with key being the name of the span and value being a unique ptr + * that stores the tracez span data for the given span name + * A tree map is preferred to a hash map because the the data is to be ordered + * in alphabetical order of span name. + * TODO : A possible memory concern if there are too many unique + * span names, one solution could be to implement a LRU cache that trims the + * DS based on frequency of usage of a span name. + */ + std::map<std::string, TracezData> aggregated_tracez_data_; + std::mutex mtx_; + + /** A boolean that is set to true in the constructor and false in the + * destructor to start and end execution of aggregate spans **/ + std::atomic<bool> execute_; + + /** Thread that executes aggregate spans at regurlar intervals during this + object's lifetime**/ + std::thread aggregate_spans_thread_; + + /** Condition variable that notifies the thread when object is about to be + destroyed **/ + std::condition_variable cv_; +}; + +} // namespace zpages +} // namespace ext +OPENTELEMETRY_END_NAMESPACE diff --git a/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/tracez_http_server.h b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/tracez_http_server.h new file mode 100644 index 000000000..eaa3fac8d --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/tracez_http_server.h @@ -0,0 +1,179 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include <algorithm> +#include <fstream> +#include <iostream> +#include <string> +#include <unordered_map> +#include <vector> + +#include "nlohmann/json.hpp" +#include "opentelemetry/ext/zpages/static/tracez_index.h" +#include "opentelemetry/ext/zpages/static/tracez_script.h" +#include "opentelemetry/ext/zpages/static/tracez_style.h" +#include "opentelemetry/ext/zpages/tracez_data_aggregator.h" +#include "opentelemetry/ext/zpages/zpages_http_server.h" + +#define HAVE_HTTP_DEBUG +#define HAVE_CONSOLE_LOG + +using json = nlohmann::json; + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace ext +{ +namespace zpages +{ + +class TracezHttpServer : public opentelemetry::ext::zpages::zPagesHttpServer +{ +public: + /** + * Construct the server by initializing the endpoint for querying TraceZ aggregation data and + * files, along with taking ownership of the aggregator whose data is used to send data to the + * frontend + * @param aggregator is the TraceZ Data Aggregator, which calculates aggregation info + * @param host is the host where the TraceZ webpages will be displayed, default being localhost + * @param port is the port where the TraceZ webpages will be displayed, default being 30000 + */ + TracezHttpServer(std::unique_ptr<opentelemetry::ext::zpages::TracezDataAggregator> &&aggregator, + const std::string &host = "localhost", + int port = 30000) + : opentelemetry::ext::zpages::zPagesHttpServer("/tracez", host, port), + data_aggregator_(std::move(aggregator)) + { + InitializeTracezEndpoint(*this); + }; + +private: + /** + * Set the HTTP server to use the "Serve" callback to send the appropriate data when queried + * @param server, which should be an instance of this object + */ + void InitializeTracezEndpoint(TracezHttpServer &server) { server[endpoint_] = Serve; } + + /** + * Updates the stored aggregation data (aggregations_) using the data aggregator + */ + void UpdateAggregations(); + + /** + * First updates the stored aggregations, then translates that data from a C++ map to + * a JSON object + * @returns JSON object of collected spans bucket counts by name + */ + json GetAggregations(); + + /** + * Using the stored aggregations, finds the span group with the right name and returns + * its running span data as a JSON, only grabbing the fields needed for the frontend + * @param name of the span group whose running data we want + * @returns JSON representing running span data with the passed in name + */ + json GetRunningSpansJSON(const std::string &name); + + /** + * Using the stored aggregations, finds the span group with the right name and returns + * its error span data as a JSON, only grabbing the fields needed for the frontend + * @param name of the span group whose running data we want + * @returns JSON representing eoor span data with the passed in name + */ + json GetErrorSpansJSON(const std::string &name); + + /** + * Using the stored aggregations, finds the span group with the right name and bucket index + * returning its latency span data as a JSON, only grabbing the fields needed for the frontend + * @param name of the span group whose latency data we want + * @param index of which latency bucket to grab from + * @returns JSON representing bucket span data with the passed in name and latency range + */ + json GetLatencySpansJSON(const std::string &name, int latency_range_index); + + /** + * Returns attributes, which have varied types, from a span data to convert into JSON + * @param sample current span data, whose attributes we want to extract + * @returns JSON representing attributes for a given threadsafe span data + */ + json GetAttributesJSON(const opentelemetry::ext::zpages::ThreadsafeSpanData &sample); + + /** + * Sets the response object with the TraceZ aggregation data based on the request endpoint + * @param req is the HTTP request, which we use to figure out the response to send + * @param resp is the HTTP response we want to send to the frontend, either webpage or TraceZ + * aggregation data + */ + HTTP_SERVER_NS::HttpRequestCallback Serve{ + [&](HTTP_SERVER_NS::HttpRequest const &req, HTTP_SERVER_NS::HttpResponse &resp) { + std::string query = GetQuery(req.uri); // tracez + + if (StartsWith(query, "get")) + { + resp.headers[HTTP_SERVER_NS::CONTENT_TYPE] = "application/json"; + query = GetAfterSlash(query); + if (StartsWith(query, "latency")) + { + auto queried_latency_name = GetAfterSlash(query); + auto queried_latency_index = std::stoi(GetBeforeSlash(queried_latency_name)); + auto queried_name = GetAfterSlash(queried_latency_name); + ReplaceHtmlChars(queried_name); + resp.body = GetLatencySpansJSON(queried_name, queried_latency_index).dump(); + } + else + { + auto queried_name = GetAfterSlash(query); + ReplaceHtmlChars(queried_name); + if (StartsWith(query, "aggregations")) + { + resp.body = GetAggregations().dump(); + } + else if (StartsWith(query, "running")) + { + resp.body = GetRunningSpansJSON(queried_name).dump(); + } + else if (StartsWith(query, "error")) + { + resp.body = GetErrorSpansJSON(queried_name).dump(); + } + else + { + resp.body = json::array().dump(); + } + } + } + else + { + if (StartsWith(query, "script.js")) + { + resp.headers[HTTP_SERVER_NS::CONTENT_TYPE] = "text/javascript"; + resp.body = tracez_script; + } + else if (StartsWith(query, "style.css")) + { + resp.headers[HTTP_SERVER_NS::CONTENT_TYPE] = "text/css"; + resp.body = tracez_style; + } + else if (query.empty() || query == "/tracez" || StartsWith(query, "index.html")) + { + resp.headers[HTTP_SERVER_NS::CONTENT_TYPE] = "text/html"; + resp.body = tracez_index; + } + else + { + resp.headers[HTTP_SERVER_NS::CONTENT_TYPE] = "text/plain"; + resp.body = "Invalid query: " + query; + } + } + + return 200; + }}; + + std::map<std::string, opentelemetry::ext::zpages::TracezData> aggregated_data_; + std::unique_ptr<opentelemetry::ext::zpages::TracezDataAggregator> data_aggregator_; +}; + +} // namespace zpages +} // namespace ext +OPENTELEMETRY_END_NAMESPACE diff --git a/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/tracez_processor.h b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/tracez_processor.h new file mode 100644 index 000000000..5e032c456 --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/tracez_processor.h @@ -0,0 +1,92 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include <chrono> +#include <memory> +#include <mutex> +#include <unordered_set> +#include <utility> +#include <vector> + +#include "opentelemetry/ext/zpages/threadsafe_span_data.h" +#include "opentelemetry/ext/zpages/tracez_shared_data.h" +#include "opentelemetry/sdk/trace/processor.h" +#include "opentelemetry/sdk/trace/recordable.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace ext +{ +namespace zpages +{ +/* + * The span processor passes and stores running and completed recordables (casted as span_data) + * to be used by the TraceZ Data Aggregator. + */ +class TracezSpanProcessor : public opentelemetry::sdk::trace::SpanProcessor +{ +public: + /* + * Initialize a span processor. + */ + explicit TracezSpanProcessor(std::shared_ptr<TracezSharedData> shared_data) noexcept + : shared_data_(shared_data) + {} + + /* + * Create a span recordable, which is span_data + * @return a newly initialized recordable + */ + std::unique_ptr<opentelemetry::sdk::trace::Recordable> MakeRecordable() noexcept override + { + return std::unique_ptr<opentelemetry::sdk::trace::Recordable>(new ThreadsafeSpanData); + } + + /* + * OnStart is called when a span starts; the recordable is cast to span_data and added to + * running_spans. + * @param span a recordable for a span that was just started + */ + void OnStart(opentelemetry::sdk::trace::Recordable &span, + const opentelemetry::trace::SpanContext &parent_context) noexcept override; + + /* + * OnEnd is called when a span ends; that span_data is moved from running_spans to + * completed_spans + * @param span a recordable for a span that was ended + */ + void OnEnd(std::unique_ptr<opentelemetry::sdk::trace::Recordable> &&span) noexcept override; + + /* + * For now, does nothing. In the future, it + * may send all ended spans that have not yet been sent to the aggregator. + * @param timeout an optional timeout. Currently, timeout does nothing. + * @return return the status of the operation. + */ + bool ForceFlush( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override + { + return true; + } + + /* + * Shut down the processor and do any cleanup required, which is none. + * After the call to Shutdown, subsequent calls to OnStart, OnEnd, ForceFlush + * or Shutdown will return immediately without doing anything. + * @param timeout an optional timeout, the default timeout of 0 means that no + * timeout is applied. Currently, timeout does nothing. + * @return return the status of the operation. + */ + bool Shutdown( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override + { + return true; + } + +private: + std::shared_ptr<TracezSharedData> shared_data_; +}; +} // namespace zpages +} // namespace ext +OPENTELEMETRY_END_NAMESPACE diff --git a/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/tracez_shared_data.h b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/tracez_shared_data.h new file mode 100644 index 000000000..571661550 --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/tracez_shared_data.h @@ -0,0 +1,67 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include <chrono> +#include <memory> +#include <mutex> +#include <unordered_set> +#include <utility> +#include <vector> + +#include "opentelemetry/ext/zpages/threadsafe_span_data.h" +#include "opentelemetry/sdk/trace/processor.h" +#include "opentelemetry/sdk/trace/recordable.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace ext +{ +namespace zpages +{ +/* + * The span processor passes and stores running and completed recordables (casted as span_data) + * to be used by the TraceZ Data Aggregator. + */ +class TracezSharedData +{ +public: + struct CollectedSpans + { + std::unordered_set<ThreadsafeSpanData *> running; + std::vector<std::unique_ptr<ThreadsafeSpanData>> completed; + }; + + /* + * Initialize a shared data storage. + */ + explicit TracezSharedData() noexcept {} + + /* + * Called when a span has been started. + */ + void OnStart(ThreadsafeSpanData *span) noexcept; + + /* + * Called when a span has ended. + */ + void OnEnd(std::unique_ptr<ThreadsafeSpanData> &&span) noexcept; + + /* + * Returns a snapshot of all spans stored. This snapshot has a copy of the + * stored running_spans and gives ownership of completed spans to the caller. + * Stored completed_spans are cleared from the processor. Currently, + * copy-on-write is utilized where possible to minimize contention, but locks + * may be added in the future. + * @return snapshot of all currently running spans and newly completed spans + * (spans never sent while complete) at the time that the function is called + */ + CollectedSpans GetSpanSnapshot() noexcept; + +private: + mutable std::mutex mtx_; + CollectedSpans spans_; +}; +} // namespace zpages +} // namespace ext +OPENTELEMETRY_END_NAMESPACE diff --git a/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/zpages.h b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/zpages.h new file mode 100644 index 000000000..f0242d92d --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/zpages.h @@ -0,0 +1,96 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include <chrono> +#include <memory> + +#include "opentelemetry/ext/zpages/tracez_data_aggregator.h" +#include "opentelemetry/ext/zpages/tracez_http_server.h" +#include "opentelemetry/ext/zpages/tracez_processor.h" +#include "opentelemetry/ext/zpages/tracez_shared_data.h" + +#include "opentelemetry/nostd/shared_ptr.h" +#include "opentelemetry/sdk/trace/tracer_provider.h" +#include "opentelemetry/trace/provider.h" + +using opentelemetry::ext::zpages::TracezDataAggregator; +using opentelemetry::ext::zpages::TracezHttpServer; +using opentelemetry::ext::zpages::TracezSharedData; +using opentelemetry::ext::zpages::TracezSpanProcessor; +using std::chrono::microseconds; + +/** + * Wrapper for zPages that initializes all the components required for zPages, + * and starts the HTTP server in the constructor and ends it in the destructor. + * The constructor and destructor for this object is private to prevent + * creation other than by calling the static function Initialize(). This follows the + * meyers singleton pattern and only a single instance of the class is allowed. + */ +class ZPages +{ +public: + /** + * This function is called if the user wishes to include zPages in their + * application. It creates a static instance of this class and replaces the + * global TracerProvider with one that delegates spans to tracez. + */ + static void Initialize() { Instance().ReplaceGlobalProvider(); } + + /** + * Returns the singletone instnace of ZPages, useful for attaching z-pages span processors to + * non-global providers. + * + * Note: This will instantiate the Tracez instance and webserver if it hasn't already been + * instantiated. + */ + static ZPages &Instance() + { + static ZPages instance; + return instance; + } + + /** Replaces the global tracer provider with an instance that exports to tracez. */ + void ReplaceGlobalProvider() + { + // GCC 4.8 can't infer the type coercion. + std::unique_ptr<opentelemetry::sdk::trace::SpanProcessor> processor( + MakeSpanProcessor().release()); + auto tracez_provider_ = opentelemetry::nostd::shared_ptr<opentelemetry::trace::TracerProvider>( + new opentelemetry::sdk::trace::TracerProvider(std::move(processor))); + opentelemetry::trace::Provider::SetTracerProvider(tracez_provider_); + } + + /** Retruns a new span processor that will output to z-pages. */ + std::unique_ptr<TracezSpanProcessor> MakeSpanProcessor() + { + return std::unique_ptr<TracezSpanProcessor>(new TracezSpanProcessor(tracez_shared_)); + } + +private: + /** + * Constructor is responsible for initializing the tracer, tracez processor, + * tracez data aggregator and the tracez server. The server is also started in + * constructor. + */ + ZPages() + { + // Construct shared data nd start tracez webserver. + tracez_shared_ = std::make_shared<TracezSharedData>(); + auto tracez_aggregator = + std::unique_ptr<TracezDataAggregator>(new TracezDataAggregator(tracez_shared_)); + tracez_server_ = + std::unique_ptr<TracezHttpServer>(new TracezHttpServer(std::move(tracez_aggregator))); + tracez_server_->start(); + } + + ~ZPages() + { + // shut down the server when the object goes out of scope(at the end of the + // program) + tracez_server_->stop(); + } + std::shared_ptr<TracezSharedData> tracez_shared_; + std::unique_ptr<TracezHttpServer> tracez_server_; +}; diff --git a/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/zpages_http_server.h b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/zpages_http_server.h new file mode 100644 index 000000000..abf6c1a5b --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/zpages/zpages_http_server.h @@ -0,0 +1,120 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include <algorithm> +#include <fstream> +#include <iostream> +#include <string> +#include <unordered_map> +#include <vector> + +#include "opentelemetry/ext/http/server/http_server.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace ext +{ +namespace zpages +{ + +class zPagesHttpServer : public HTTP_SERVER_NS::HttpServer +{ +protected: + /** + * Construct the server by initializing the endpoint for serving static files, which show up on + * the web if the user is on the given host:port. Static files can be seen relative to the folder + * where the executable was ran. + * @param host is the host where the TraceZ webpages will be displayed + * @param port is the port where the TraceZ webpages will be displayed + * @param endpoint is where this specific zPage will server files + */ + zPagesHttpServer(const std::string &endpoint, + const std::string &host = "127.0.0.1", + int port = 52620) + : HttpServer(), endpoint_(endpoint) + { + std::ostringstream os; + os << host << ":" << port; + setServerName(os.str()); + addListeningPort(port); + }; + + /** + * Helper function that returns query information by isolating it from the base endpoint + * @param uri is the full query + */ + std::string GetQuery(const std::string &uri) + { + if (endpoint_.length() + 1 > uri.length()) + return uri; + return uri.substr(endpoint_.length() + 1); + } + + /** + * Helper that returns whether a str starts with pre + * @param str is the string we're checking + * @param pre is the prefix we're checking against + */ + bool StartsWith(const std::string &str, const std::string &pre) { return str.rfind(pre, 0) == 0; } + + /** + * Helper that returns the remaining string after the leftmost backslash + * @param str is the string we're extracting from + */ + std::string GetAfterSlash(const std::string &str) + { + std::size_t backslash = str.find("/"); + if (backslash == std::string::npos || backslash == str.length()) + return ""; + return str.substr(backslash + 1); + } + + /** + * Helper that returns the remaining string after the leftmost backslash + * @param str is the string we're extracting from + */ + std::string GetBeforeSlash(const std::string &str) + { + std::size_t backslash = str.find("/"); + if (backslash == std::string::npos || backslash == str.length()) + return str; + return str.substr(0, backslash); + } + + /** + * Helper that replaces all occurrences a string within a string + * @param str string to modify + * @param search substring to remove from str + * @param replacement string to replace search with whenever search is found + */ + void ReplaceAll(std::string &str, const std::string &search, const std::string &replacement) + { + size_t idx = str.find(search, 0); + while (idx != std::string::npos) + { + str.replace(idx, search.length(), replacement); + idx = str.find(search, idx); + } + } + + /** + * Helper that replaces all special HTML/address base encoded characters + * into what they're originally supposed to be + * @param str string to conduct replacements for + */ + void ReplaceHtmlChars(std::string &str) + { + for (const auto &replace_pair : replace_map_) + { + ReplaceAll(str, replace_pair.first, replace_pair.second); + } + } + + const std::string endpoint_; + const std::unordered_map<std::string, std::string> replace_map_ = {{"%20", " "}}; +}; + +} // namespace zpages +} // namespace ext +OPENTELEMETRY_END_NAMESPACE diff --git a/src/jaegertracing/opentelemetry-cpp/ext/src/CMakeLists.txt b/src/jaegertracing/opentelemetry-cpp/ext/src/CMakeLists.txt new file mode 100644 index 000000000..a976882ff --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/src/CMakeLists.txt @@ -0,0 +1,8 @@ +if(WITH_ZPAGES) + add_subdirectory(zpages) +endif() + +add_subdirectory(http/client/curl) +if(BUILD_TESTING) + add_subdirectory(http/client/nosend) +endif() diff --git a/src/jaegertracing/opentelemetry-cpp/ext/src/http/client/curl/BUILD b/src/jaegertracing/opentelemetry-cpp/ext/src/http/client/curl/BUILD new file mode 100644 index 000000000..33ab814b9 --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/src/http/client/curl/BUILD @@ -0,0 +1,29 @@ +package(default_visibility = ["//visibility:public"]) + +cc_library( + name = "http_client_curl", + srcs = [ + "http_client_curl.cc", + "http_client_factory_curl.cc", + ], + copts = [ + "-DWITH_CURL", + ], + include_prefix = "src/http/client/curl", + linkopts = select({ + "//bazel:windows": [ + "-DEFAULTLIB:advapi32.lib", + "-DEFAULTLIB:crypt32.lib", + "-DEFAULTLIB:Normaliz.lib", + "-DEFAULTLIB:Ws2_32.lib", + ], + "//conditions:default": [], + }), + deps = [ + "//api", + "//ext:headers", + "//sdk:headers", + "//sdk/src/common:random", + "@curl", + ], +) diff --git a/src/jaegertracing/opentelemetry-cpp/ext/src/http/client/curl/CMakeLists.txt b/src/jaegertracing/opentelemetry-cpp/ext/src/http/client/curl/CMakeLists.txt new file mode 100644 index 000000000..78a81cfe3 --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/src/http/client/curl/CMakeLists.txt @@ -0,0 +1,25 @@ +find_package(CURL) +if(CURL_FOUND) + add_library(opentelemetry_http_client_curl http_client_factory_curl.cc + http_client_curl.cc) + + set_target_properties(opentelemetry_http_client_curl + PROPERTIES EXPORT_NAME http_client_curl) + + if(TARGET CURL::libcurl) + target_link_libraries(opentelemetry_http_client_curl + PUBLIC opentelemetry_ext CURL::libcurl) + else() + target_include_directories(opentelemetry_http_client_curl + INTERFACE "${CURL_INCLUDE_DIRS}") + target_link_libraries(opentelemetry_http_client_curl + PUBLIC opentelemetry_ext ${CURL_LIBRARIES}) + endif() + + install( + TARGETS opentelemetry_http_client_curl + EXPORT "${PROJECT_NAME}-target" + RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} + LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} + ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}) +endif() diff --git a/src/jaegertracing/opentelemetry-cpp/ext/src/http/client/curl/http_client_curl.cc b/src/jaegertracing/opentelemetry-cpp/ext/src/http/client/curl/http_client_curl.cc new file mode 100644 index 000000000..74ad86ea4 --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/src/http/client/curl/http_client_curl.cc @@ -0,0 +1,18 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#include "opentelemetry/ext/http/client/curl/http_client_curl.h" + +bool opentelemetry::ext::http::client::curl::Session::CancelSession() noexcept +{ + curl_operation_->Abort(); + http_client_.CleanupSession(session_id_); + return true; +} + +bool opentelemetry::ext::http::client::curl::Session::FinishSession() noexcept +{ + curl_operation_->Finish(); + http_client_.CleanupSession(session_id_); + return true; +} diff --git a/src/jaegertracing/opentelemetry-cpp/ext/src/http/client/curl/http_client_factory_curl.cc b/src/jaegertracing/opentelemetry-cpp/ext/src/http/client/curl/http_client_factory_curl.cc new file mode 100644 index 000000000..f6266c293 --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/src/http/client/curl/http_client_factory_curl.cc @@ -0,0 +1,18 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#include "opentelemetry/ext/http/client/curl/http_client_curl.h" +#include "opentelemetry/ext/http/client/http_client.h" +#include "opentelemetry/ext/http/client/http_client_factory.h" + +namespace http_client = opentelemetry::ext::http::client; + +std::shared_ptr<http_client::HttpClient> http_client::HttpClientFactory::Create() +{ + return std::make_shared<http_client::curl::HttpClient>(); +} + +std::shared_ptr<http_client::HttpClientSync> http_client::HttpClientFactory::CreateSync() +{ + return std::make_shared<http_client::curl::HttpClientSync>(); +} diff --git a/src/jaegertracing/opentelemetry-cpp/ext/src/http/client/nosend/BUILD b/src/jaegertracing/opentelemetry-cpp/ext/src/http/client/nosend/BUILD new file mode 100644 index 000000000..b27106a16 --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/src/http/client/nosend/BUILD @@ -0,0 +1,19 @@ +package(default_visibility = ["//visibility:public"]) + +cc_library( + name = "http_client_nosend", + srcs = [ + "http_client_factory_nosend.cc", + "http_client_nosend.cc", + ], + include_prefix = "src/http/client/nosend", + tags = [ + "test", + ], + deps = [ + "//api", + "//ext:headers", + "//sdk:headers", + "@com_google_googletest//:gtest_main", + ], +) diff --git a/src/jaegertracing/opentelemetry-cpp/ext/src/http/client/nosend/CMakeLists.txt b/src/jaegertracing/opentelemetry-cpp/ext/src/http/client/nosend/CMakeLists.txt new file mode 100644 index 000000000..497daeb34 --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/src/http/client/nosend/CMakeLists.txt @@ -0,0 +1,36 @@ +if(${BUILD_TESTING}) + add_library(http_client_nosend http_client_factory_nosend.cc + http_client_nosend.cc) + + set_target_properties(http_client_nosend PROPERTIES EXPORT_NAME + http_client_nosend) + + if(MSVC) + # Explicitly specify that we consume GTest from shared library. The rest of + # code logic below determines whether we link Release or Debug flavor of the + # library. These flavors have different prefix on Windows, gmock and gmockd + # respectively. + add_definitions(-DGTEST_LINKED_AS_SHARED_LIBRARY=1) + if(GMOCK_LIB) + # unset GMOCK_LIB to force find_library to redo the lookup, as the cached + # entry could cause linking to incorrect flavor of gmock and leading to + # runtime error. + unset(GMOCK_LIB CACHE) + endif() + endif() + if(MSVC AND CMAKE_BUILD_TYPE STREQUAL "Debug") + find_library(GMOCK_LIB gmockd PATH_SUFFIXES lib) + else() + find_library(GMOCK_LIB gmock PATH_SUFFIXES lib) + endif() + + target_link_libraries(http_client_nosend ${GTEST_BOTH_LIBRARIES} ${GMOCK_LIB} + opentelemetry_ext) + + install( + TARGETS http_client_nosend + EXPORT "${PROJECT_NAME}-target" + RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} + LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} + ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}) +endif() diff --git a/src/jaegertracing/opentelemetry-cpp/ext/src/http/client/nosend/http_client_factory_nosend.cc b/src/jaegertracing/opentelemetry-cpp/ext/src/http/client/nosend/http_client_factory_nosend.cc new file mode 100644 index 000000000..841dd2d8e --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/src/http/client/nosend/http_client_factory_nosend.cc @@ -0,0 +1,13 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#include "opentelemetry/ext/http/client/http_client.h" +#include "opentelemetry/ext/http/client/http_client_factory.h" +#include "opentelemetry/ext/http/client/nosend/http_client_nosend.h" + +namespace http_client = opentelemetry::ext::http::client; + +std::shared_ptr<http_client::HttpClient> http_client::HttpClientFactory::CreateNoSend() +{ + return std::make_shared<http_client::nosend::HttpClient>(); +} diff --git a/src/jaegertracing/opentelemetry-cpp/ext/src/http/client/nosend/http_client_nosend.cc b/src/jaegertracing/opentelemetry-cpp/ext/src/http/client/nosend/http_client_nosend.cc new file mode 100644 index 000000000..c2b1c6acf --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/src/http/client/nosend/http_client_nosend.cc @@ -0,0 +1,71 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#ifdef ENABLE_TEST +# include "opentelemetry/ext/http/client/nosend/http_client_nosend.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace ext +{ +namespace http +{ +namespace client +{ +namespace nosend +{ +void Request::ReplaceHeader(nostd::string_view name, nostd::string_view value) noexcept +{ + // erase matching headers + auto range = headers_.equal_range(static_cast<std::string>(name)); + headers_.erase(range.first, range.second); + AddHeader(name, value); +} + +bool Response::ForEachHeader( + nostd::function_ref<bool(nostd::string_view name, nostd::string_view value)> callable) + const noexcept +{ + for (const auto &header : headers_) + { + if (!callable(header.first, header.second)) + { + return false; + } + } + return true; +} + +bool Response::ForEachHeader( + const nostd::string_view &name, + nostd::function_ref<bool(nostd::string_view name, nostd::string_view value)> callable) + const noexcept +{ + auto range = headers_.equal_range(static_cast<std::string>(name)); + for (auto it = range.first; it != range.second; ++it) + { + if (!callable(it->first, it->second)) + { + return false; + } + } + return true; +} + +bool Session::CancelSession() noexcept +{ + http_client_.CleanupSession(session_id_); + return true; +} + +bool Session::FinishSession() noexcept +{ + http_client_.CleanupSession(session_id_); + return true; +} + +} // namespace nosend +} // namespace client +} // namespace http +} // namespace ext +OPENTELEMETRY_END_NAMESPACE +#endif diff --git a/src/jaegertracing/opentelemetry-cpp/ext/src/zpages/BUILD b/src/jaegertracing/opentelemetry-cpp/ext/src/zpages/BUILD new file mode 100644 index 000000000..ab9f3507b --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/src/zpages/BUILD @@ -0,0 +1,28 @@ +# Copyright 2020, OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +package(default_visibility = ["//visibility:public"]) + +cc_library( + name = "zpages", + srcs = glob(["**/*.cc"]), + hdrs = glob(["**/*.h"]), + include_prefix = "ext/zpages", + deps = [ + "//api", + "//ext:headers", + "//sdk:headers", + "@github_nlohmann_json//:json", + ], +) diff --git a/src/jaegertracing/opentelemetry-cpp/ext/src/zpages/CMakeLists.txt b/src/jaegertracing/opentelemetry-cpp/ext/src/zpages/CMakeLists.txt new file mode 100644 index 000000000..e3fd480df --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/src/zpages/CMakeLists.txt @@ -0,0 +1,21 @@ +add_library( + opentelemetry_zpages + tracez_processor.cc + tracez_shared_data.cc + tracez_data_aggregator.cc + ../../include/opentelemetry/ext/zpages/tracez_shared_data.h + ../../include/opentelemetry/ext/zpages/tracez_processor.h + ../../include/opentelemetry/ext/zpages/tracez_data_aggregator.h + ../../include/opentelemetry/ext/zpages/tracez_http_server.h) + +set_target_properties(opentelemetry_zpages PROPERTIES EXPORT_NAME zpages) + +target_link_libraries(opentelemetry_zpages PUBLIC opentelemetry_ext + opentelemetry_trace) + +install( + TARGETS opentelemetry_zpages + EXPORT "${PROJECT_NAME}-target" + RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} + LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} + ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}) diff --git a/src/jaegertracing/opentelemetry-cpp/ext/src/zpages/README.md b/src/jaegertracing/opentelemetry-cpp/ext/src/zpages/README.md new file mode 100644 index 000000000..f182fd43f --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/src/zpages/README.md @@ -0,0 +1,53 @@ +# zPages + +## Overview + +zPages are a quick and light way to view tracing and metrics information on +standard OpenTelemetry C++ instrumented applications. It requires no external +dependencies or backend setup. See more information in the OTel zPages +experimental +[spec](https://github.com/open-telemetry/opentelemetry-specification/blob/5b86d4b6c42e6d1e47d9155ac1e2e27f0f0b7769/experimental/trace/zpages.md). +OTel C++ currently only offers Tracez; future zPages to potentially add include +TraceConfigz, RPCz, and Statsz. Events and links need to be added to Tracez. + +## Usage + +> TODO: Add CMake instructions + +1: Add the following 2 lines of code + +* `#include opentelemetry/ext/zpages/zpages.h // include zPages` +* `zpages::Initialize; // start up zPages in your app, before any tracing/span + code` + +2: Build and run your application normally + +For example, you can do this for the zPages example while at the root +`opentelemetry-cpp` directory with: + +```sh +bazel build //examples/zpages:zpages_example +bazel-bin/examples/zpages/zpages_example +``` + +If you look at the [zPages example's source +code](https://github.com/open-telemetry/opentelemetry-cpp/blob/main/examples/zpages/zpages_example.cc), +it demonstrates adding zPages, manual application instrumentation (which sends +data to zPages for viewing), and simulated use cases for zPages. + +3: View zPages at `http://localhost:3000/tracez` + +## More Information + +* OTel zPages experimental + [spec](https://github.com/open-telemetry/opentelemetry-specification/blob/5b86d4b6c42e6d1e47d9155ac1e2e27f0f0b7769/experimental/trace/zpages.md) +* [zPages General Direction Spec + (OTEP)](https://github.com/open-telemetry/oteps/blob/main/text/0110-z-pages.md) +* OTel C++ Design Docs + * [Tracez Span + Processor](https://docs.google.com/document/d/1kO4iZARYyr-EGBlY2VNM3ELU3iw6ZrC58Omup_YT-fU/edit#) + * [Tracez Data + Aggregator](https://docs.google.com/document/d/1ziKFgvhXFfRXZjOlAHQRR-TzcNcTXzg1p2I9oPCEIoU/edit?ts=5ef0d177#heading=h.5irk4csrpu0y) + * [Tracez Http + Server](https://docs.google.com/document/d/1U1V8QZ5LtGl4Mich-aJ6KZGLHrMIE8pWyspmzvnIefI/edit#) + * includes reference pictures of the zPages/Tracez UI diff --git a/src/jaegertracing/opentelemetry-cpp/ext/src/zpages/tracez_data_aggregator.cc b/src/jaegertracing/opentelemetry-cpp/ext/src/zpages/tracez_data_aggregator.cc new file mode 100644 index 000000000..993fc182c --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/src/zpages/tracez_data_aggregator.cc @@ -0,0 +1,189 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#include "opentelemetry/ext/zpages/tracez_data_aggregator.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace ext +{ +namespace zpages +{ + +TracezDataAggregator::TracezDataAggregator(std::shared_ptr<TracezSharedData> shared_data, + milliseconds update_interval) +{ + tracez_shared_data_ = shared_data; + + // Start a thread that calls AggregateSpans periodically or till notified. + execute_.store(true, std::memory_order_release); + aggregate_spans_thread_ = std::thread([this, update_interval]() { + while (execute_.load(std::memory_order_acquire)) + { + std::unique_lock<std::mutex> lock(mtx_); + AggregateSpans(); + cv_.wait_for(lock, update_interval); + } + }); +} + +TracezDataAggregator::~TracezDataAggregator() +{ + // Notify and join the thread so object can be destroyed without wait for wake + if (execute_.load(std::memory_order_acquire)) + { + execute_.store(false, std::memory_order_release); + cv_.notify_one(); + aggregate_spans_thread_.join(); + } +} + +std::map<std::string, TracezData> TracezDataAggregator::GetAggregatedTracezData() +{ + std::unique_lock<std::mutex> lock(mtx_); + return aggregated_tracez_data_; +} + +LatencyBoundary TracezDataAggregator::FindLatencyBoundary( + std::unique_ptr<ThreadsafeSpanData> &span_data) +{ + const auto &span_data_duration = span_data->GetDuration(); + for (unsigned int boundary = 0; boundary < kLatencyBoundaries.size() - 1; boundary++) + { + if (span_data_duration < kLatencyBoundaries[boundary + 1]) + return (LatencyBoundary)boundary; + } + return LatencyBoundary::k100SecondToMax; +} + +void TracezDataAggregator::InsertIntoSampleSpanList(std::list<ThreadsafeSpanData> &sample_spans, + ThreadsafeSpanData &span_data) +{ + /** + * Check to see if the sample span list size exceeds the set limit, if it does + * free up memory and remove the earliest inserted sample before appending + */ + if (sample_spans.size() == kMaxNumberOfSampleSpans) + { + sample_spans.pop_front(); + } + sample_spans.push_back(ThreadsafeSpanData(span_data)); +} + +void TracezDataAggregator::ClearRunningSpanData() +{ + auto it = aggregated_tracez_data_.begin(); + while (it != aggregated_tracez_data_.end()) + { + it->second.running_span_count = 0; + it->second.sample_running_spans.clear(); + + // Check if any data exists in the struct, if not delete entry + bool is_completed_span_count_zero = true; + for (const auto &completed_span_count : it->second.completed_span_count_per_latency_bucket) + { + if (completed_span_count > 0) + is_completed_span_count_zero = false; + } + + if (it->second.error_span_count == 0 && is_completed_span_count_zero) + { + it = aggregated_tracez_data_.erase(it); + } + else + { + ++it; + } + } +} + +void TracezDataAggregator::AggregateStatusOKSpan(std::unique_ptr<ThreadsafeSpanData> &ok_span) +{ + // Find and update boundary of aggregated data that span belongs + auto boundary_name = FindLatencyBoundary(ok_span); + + // Get the data for name in aggrgation and update count and sample spans + auto &tracez_data = aggregated_tracez_data_.at(ok_span->GetName().data()); + InsertIntoSampleSpanList(tracez_data.sample_latency_spans[boundary_name], *ok_span.get()); + tracez_data.completed_span_count_per_latency_bucket[boundary_name]++; +} + +void TracezDataAggregator::AggregateStatusErrorSpan(std::unique_ptr<ThreadsafeSpanData> &error_span) +{ + // Get data for name in aggregation and update count and sample spans + auto &tracez_data = aggregated_tracez_data_.at(error_span->GetName().data()); + InsertIntoSampleSpanList(tracez_data.sample_error_spans, *error_span.get()); + tracez_data.error_span_count++; +} + +void TracezDataAggregator::AggregateCompletedSpans( + std::vector<std::unique_ptr<ThreadsafeSpanData>> &completed_spans) +{ + for (auto &completed_span : completed_spans) + { + std::string span_name = completed_span->GetName().data(); + + if (aggregated_tracez_data_.find(span_name) == aggregated_tracez_data_.end()) + { + aggregated_tracez_data_[span_name] = TracezData(); + } + + if (completed_span->GetStatus() == trace::StatusCode::kOk || + completed_span->GetStatus() == trace::StatusCode::kUnset) + AggregateStatusOKSpan(completed_span); + else + AggregateStatusErrorSpan(completed_span); + } +} + +void TracezDataAggregator::AggregateRunningSpans( + std::unordered_set<ThreadsafeSpanData *> &running_spans) +{ + for (auto &running_span : running_spans) + { + std::string span_name = running_span->GetName().data(); + + if (aggregated_tracez_data_.find(span_name) == aggregated_tracez_data_.end()) + { + aggregated_tracez_data_[span_name] = TracezData(); + } + + auto &tracez_data = aggregated_tracez_data_[span_name]; + InsertIntoSampleSpanList(aggregated_tracez_data_[span_name].sample_running_spans, + *running_span); + tracez_data.running_span_count++; + } +} + +void TracezDataAggregator::AggregateSpans() +{ + auto span_snapshot = tracez_shared_data_->GetSpanSnapshot(); + /** + * TODO: At this time in the project, there is no way of uniquely identifying + * a span(their id's are not being set yet). + * If in the future this is added then clearing of running spans will not bee + * required. + * For now this step of clearing and recalculating running span data is + * required because it is unknown which spans have moved from running to + * completed since the previous call. Additionally, the span name can change + * for spans while they are running. + * + * A better approach for identifying moved spans would have been to map + * span id to span name, find these span names in the aggregated data and then + * delete only this information for running span data as opposed to clearing + * all running span data. However this cannot be done at this time because, + * unique identifiers to span data have not been added yet. + * + * A few things to note: + * i) Duplicate running spans may be received from the span processor in one + * multiple successive calls to this function. + * ii) Only the newly completed spans are received by this function. + * Completed spans will not be seen more than once + **/ + ClearRunningSpanData(); + AggregateCompletedSpans(span_snapshot.completed); + AggregateRunningSpans(span_snapshot.running); +} + +} // namespace zpages +} // namespace ext +OPENTELEMETRY_END_NAMESPACE diff --git a/src/jaegertracing/opentelemetry-cpp/ext/src/zpages/tracez_http_server.cc b/src/jaegertracing/opentelemetry-cpp/ext/src/zpages/tracez_http_server.cc new file mode 100644 index 000000000..a5d007278 --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/src/zpages/tracez_http_server.cc @@ -0,0 +1,163 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#include "opentelemetry/ext/zpages/tracez_http_server.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace ext +{ +namespace zpages +{ +namespace nostd = opentelemetry::nostd; + +json TracezHttpServer::GetAggregations() +{ + aggregated_data_ = data_aggregator_->GetAggregatedTracezData(); + auto counts_json = json::array(); + + for (const auto &aggregation_group : aggregated_data_) + { + const auto &buckets = aggregation_group.second; + const auto &complete_ok_counts = buckets.completed_span_count_per_latency_bucket; + + auto latency_counts = json::array(); + for (unsigned int boundary = 0; boundary < kLatencyBoundaries.size(); boundary++) + { + latency_counts.push_back(complete_ok_counts[boundary]); + } + + counts_json.push_back({{"name", aggregation_group.first}, + {"error", buckets.error_span_count}, + {"running", buckets.running_span_count}, + {"latency", latency_counts}}); + } + return counts_json; +} + +json TracezHttpServer::GetRunningSpansJSON(const std::string &name) +{ + auto running_json = json::array(); + + auto grouping = aggregated_data_.find(name); + + if (grouping != aggregated_data_.end()) + { + const auto &running_samples = grouping->second.sample_running_spans; + for (const auto &sample : running_samples) + { + running_json.push_back({ + {"spanid", std::string(reinterpret_cast<const char *>(sample.GetSpanId().Id().data()))}, + {"parentid", + std::string(reinterpret_cast<const char *>(sample.GetParentSpanId().Id().data()))}, + {"traceid", std::string(reinterpret_cast<const char *>(sample.GetTraceId().Id().data()))}, + {"start", sample.GetStartTime().time_since_epoch().count()}, + {"attributes", GetAttributesJSON(sample)}, + }); + } + } + return running_json; +} + +json TracezHttpServer::GetErrorSpansJSON(const std::string &name) +{ + auto error_json = json::array(); + + auto grouping = aggregated_data_.find(name); + + if (grouping != aggregated_data_.end()) + { + const auto &error_samples = grouping->second.sample_error_spans; + for (const auto &sample : error_samples) + { + error_json.push_back({ + {"spanid", std::string(reinterpret_cast<const char *>(sample.GetSpanId().Id().data()))}, + {"parentid", + std::string(reinterpret_cast<const char *>(sample.GetParentSpanId().Id().data()))}, + {"traceid", std::string(reinterpret_cast<const char *>(sample.GetTraceId().Id().data()))}, + {"start", sample.GetStartTime().time_since_epoch().count()}, + {"status", (unsigned short)sample.GetStatus()}, + {"attributes", GetAttributesJSON(sample)}, + }); + } + } + return error_json; +} + +json TracezHttpServer::GetLatencySpansJSON(const std::string &name, int latency_range_index) +{ + auto latency_json = json::array(); + + auto grouping = aggregated_data_.find(name); + + if (grouping != aggregated_data_.end()) + { + const auto &latency_samples = grouping->second.sample_latency_spans[latency_range_index]; + for (const auto &sample : latency_samples) + { + latency_json.push_back({ + {"spanid", std::string(reinterpret_cast<const char *>(sample.GetSpanId().Id().data()))}, + {"parentid", + std::string(reinterpret_cast<const char *>(sample.GetParentSpanId().Id().data()))}, + {"traceid", std::string(reinterpret_cast<const char *>(sample.GetTraceId().Id().data()))}, + {"start", sample.GetStartTime().time_since_epoch().count()}, + {"duration", sample.GetDuration().count()}, + {"attributes", GetAttributesJSON(sample)}, + }); + } + } + return latency_json; +} + +json TracezHttpServer::GetAttributesJSON( + const opentelemetry::ext::zpages::ThreadsafeSpanData &sample) +{ + auto attributes_json = json::object(); + for (const auto &sample_attribute : sample.GetAttributes()) + { + auto &key = sample_attribute.first; + auto &val = sample_attribute.second; // OwnedAttributeValue + + /* Convert variant types to into their nonvariant form. This is done this way because + the frontend and JSON doesn't care about type, and variant's get function only allows + const integers or literals */ + + switch (val.index()) + { + case 0: + attributes_json[key] = nostd::get<0>(val); + break; + case 1: + attributes_json[key] = nostd::get<1>(val); + break; + case 2: + attributes_json[key] = nostd::get<2>(val); + break; + case 3: + attributes_json[key] = nostd::get<3>(val); + break; + case 4: + attributes_json[key] = nostd::get<4>(val); + break; + case 5: + attributes_json[key] = nostd::get<5>(val); + break; + case 6: + attributes_json[key] = nostd::get<6>(val); + break; + case 7: + attributes_json[key] = nostd::get<7>(val); + break; + case 8: + attributes_json[key] = nostd::get<8>(val); + break; + case 9: + attributes_json[key] = nostd::get<9>(val); + break; + } + } + return attributes_json; +} + +} // namespace zpages +} // namespace ext +OPENTELEMETRY_END_NAMESPACE diff --git a/src/jaegertracing/opentelemetry-cpp/ext/src/zpages/tracez_processor.cc b/src/jaegertracing/opentelemetry-cpp/ext/src/zpages/tracez_processor.cc new file mode 100644 index 000000000..1e9115eb9 --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/src/zpages/tracez_processor.cc @@ -0,0 +1,27 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#include "opentelemetry/ext/zpages/tracez_processor.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace ext +{ +namespace zpages +{ +namespace trace_sdk = opentelemetry::sdk::trace; + +void TracezSpanProcessor::OnStart(trace_sdk::Recordable &span, + const opentelemetry::trace::SpanContext &parent_context) noexcept +{ + shared_data_->OnStart(static_cast<ThreadsafeSpanData *>(&span)); +} + +void TracezSpanProcessor::OnEnd(std::unique_ptr<trace_sdk::Recordable> &&span) noexcept +{ + shared_data_->OnEnd( + std::unique_ptr<ThreadsafeSpanData>(static_cast<ThreadsafeSpanData *>(span.release()))); +} + +} // namespace zpages +} // namespace ext +OPENTELEMETRY_END_NAMESPACE diff --git a/src/jaegertracing/opentelemetry-cpp/ext/src/zpages/tracez_shared_data.cc b/src/jaegertracing/opentelemetry-cpp/ext/src/zpages/tracez_shared_data.cc new file mode 100644 index 000000000..4f4e4f5e9 --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/src/zpages/tracez_shared_data.cc @@ -0,0 +1,41 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#include "opentelemetry/ext/zpages/tracez_shared_data.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace ext +{ +namespace zpages +{ + +void TracezSharedData::OnStart(ThreadsafeSpanData *span) noexcept +{ + std::lock_guard<std::mutex> lock(mtx_); + spans_.running.insert(span); +} + +void TracezSharedData::OnEnd(std::unique_ptr<ThreadsafeSpanData> &&span) noexcept +{ + std::lock_guard<std::mutex> lock(mtx_); + auto span_it = spans_.running.find(span.get()); + if (span_it != spans_.running.end()) + { + spans_.running.erase(span_it); + spans_.completed.push_back(std::unique_ptr<ThreadsafeSpanData>(span.release())); + } +} + +TracezSharedData::CollectedSpans TracezSharedData::GetSpanSnapshot() noexcept +{ + CollectedSpans snapshot; + std::lock_guard<std::mutex> lock(mtx_); + snapshot.running = spans_.running; + snapshot.completed = std::move(spans_.completed); + spans_.completed.clear(); + return snapshot; +} + +} // namespace zpages +} // namespace ext +OPENTELEMETRY_END_NAMESPACE diff --git a/src/jaegertracing/opentelemetry-cpp/ext/test/CMakeLists.txt b/src/jaegertracing/opentelemetry-cpp/ext/test/CMakeLists.txt new file mode 100644 index 000000000..13315b6cf --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/test/CMakeLists.txt @@ -0,0 +1,7 @@ +if(WITH_ZPAGES) + add_subdirectory(zpages) +endif() +add_subdirectory(http) +if(BUILD_W3CTRACECONTEXT_TEST) + add_subdirectory(w3c_tracecontext_test) +endif() diff --git a/src/jaegertracing/opentelemetry-cpp/ext/test/http/BUILD b/src/jaegertracing/opentelemetry-cpp/ext/test/http/BUILD new file mode 100644 index 000000000..d1818ca0e --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/test/http/BUILD @@ -0,0 +1,14 @@ +cc_test( + name = "curl_http_test", + srcs = [ + "curl_http_test.cc", + ], + tags = ["test"], + deps = [ + "//ext:headers", + "//ext/src/http/client/curl:http_client_curl", + "//sdk/src/trace", + "@com_google_googletest//:gtest_main", + "@curl", + ], +) diff --git a/src/jaegertracing/opentelemetry-cpp/ext/test/http/CMakeLists.txt b/src/jaegertracing/opentelemetry-cpp/ext/test/http/CMakeLists.txt new file mode 100644 index 000000000..341648085 --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/test/http/CMakeLists.txt @@ -0,0 +1,29 @@ +find_package(CURL) +if(CURL_FOUND) + set(FILENAME curl_http_test) + add_compile_definitions(WITH_CURL) + add_executable(${FILENAME} ${FILENAME}.cc) + target_link_libraries(${FILENAME} ${GTEST_BOTH_LIBRARIES} + ${CMAKE_THREAD_LIBS_INIT}) + + if(TARGET CURL::libcurl) + target_link_libraries(${FILENAME} opentelemetry_http_client_curl + CURL::libcurl) + else() + include_directories(${CURL_INCLUDE_DIRS}) + target_link_libraries(${FILENAME} ${CURL_LIBRARIES} + opentelemetry_http_client_curl) + endif() + gtest_add_tests( + TARGET ${FILENAME} + TEST_PREFIX ext.http.curl. + TEST_LIST ${FILENAME}) +endif() +set(URL_PARSER_FILENAME url_parser_test) +add_executable(${URL_PARSER_FILENAME} ${URL_PARSER_FILENAME}.cc) +target_link_libraries(${URL_PARSER_FILENAME} ${GTEST_BOTH_LIBRARIES} + ${CMAKE_THREAD_LIBS_INIT}) +gtest_add_tests( + TARGET ${URL_PARSER_FILENAME} + TEST_PREFIX ext.http.urlparser. + TEST_LIST ${URL_PARSER_FILENAME}) diff --git a/src/jaegertracing/opentelemetry-cpp/ext/test/http/curl_http_test.cc b/src/jaegertracing/opentelemetry-cpp/ext/test/http/curl_http_test.cc new file mode 100644 index 000000000..f8d248bae --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/test/http/curl_http_test.cc @@ -0,0 +1,325 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#include "opentelemetry/ext//http/client/curl/http_client_curl.h" +#include "opentelemetry/ext/http/client/http_client_factory.h" +#include "opentelemetry/ext/http/server/http_server.h" + +#include <assert.h> +#include <atomic> +#include <chrono> +#include <condition_variable> +#include <fstream> +#include <memory> +#include <thread> +#include <vector> + +#define HTTP_PORT 19000 + +#include <gtest/gtest.h> + +namespace curl = opentelemetry::ext::http::client::curl; +namespace http_client = opentelemetry::ext::http::client; +namespace nostd = opentelemetry::nostd; + +class CustomEventHandler : public http_client::EventHandler +{ +public: + virtual void OnResponse(http_client::Response &response) noexcept override{}; + virtual void OnEvent(http_client::SessionState state, nostd::string_view reason) noexcept override + {} + virtual void OnConnecting(const http_client::SSLCertificate &) noexcept {} + virtual ~CustomEventHandler() = default; + bool is_called_ = false; +}; + +class GetEventHandler : public CustomEventHandler +{ + void OnResponse(http_client::Response &response) noexcept override + { + ASSERT_EQ(200, response.GetStatusCode()); + ASSERT_EQ(response.GetBody().size(), 0); + is_called_ = true; + }; +}; + +class PostEventHandler : public CustomEventHandler +{ + void OnResponse(http_client::Response &response) noexcept override + { + ASSERT_EQ(200, response.GetStatusCode()); + std::string body(response.GetBody().begin(), response.GetBody().end()); + ASSERT_EQ(body, "{'k1':'v1', 'k2':'v2', 'k3':'v3'}"); + is_called_ = true; + } +}; + +class BasicCurlHttpTests : public ::testing::Test, public HTTP_SERVER_NS::HttpRequestCallback +{ +protected: + HTTP_SERVER_NS::HttpServer server_; + std::string server_address_; + std::atomic<bool> is_setup_; + std::atomic<bool> is_running_; + std::vector<HTTP_SERVER_NS::HttpRequest> received_requests_; + std::mutex mtx_requests; + std::condition_variable cv_got_events; + std::mutex cv_m; + +public: + BasicCurlHttpTests() : is_setup_(false), is_running_(false){}; + + virtual void SetUp() override + { + if (is_setup_.exchange(true)) + { + return; + } + int port = server_.addListeningPort(HTTP_PORT); + std::ostringstream os; + os << "localhost:" << port; + server_address_ = "http://" + os.str() + "/simple/"; + server_.setServerName(os.str()); + server_.setKeepalive(false); + server_.addHandler("/simple/", *this); + server_.addHandler("/get/", *this); + server_.addHandler("/post/", *this); + server_.start(); + is_running_ = true; + } + + virtual void TearDown() override + { + if (!is_setup_.exchange(false)) + return; + server_.stop(); + is_running_ = false; + } + + virtual int onHttpRequest(HTTP_SERVER_NS::HttpRequest const &request, + HTTP_SERVER_NS::HttpResponse &response) override + { + int response_status = 404; + if (request.uri == "/get/") + { + + std::unique_lock<std::mutex> lk(mtx_requests); + received_requests_.push_back(request); + response.headers["Content-Type"] = "text/plain"; + response_status = 200; + } + if (request.uri == "/post/") + { + std::unique_lock<std::mutex> lk(mtx_requests); + received_requests_.push_back(request); + response.headers["Content-Type"] = "application/json"; + response.body = "{'k1':'v1', 'k2':'v2', 'k3':'v3'}"; + response_status = 200; + } + + cv_got_events.notify_one(); + + return response_status; + } + + bool waitForRequests(unsigned timeOutSec, unsigned expected_count = 1) + { + std::unique_lock<std::mutex> lk(mtx_requests); + if (cv_got_events.wait_for(lk, std::chrono::milliseconds(1000 * timeOutSec), + [&] { return received_requests_.size() >= expected_count; })) + { + return true; + } + return false; + } +}; + +TEST_F(BasicCurlHttpTests, DoNothing) {} + +TEST_F(BasicCurlHttpTests, HttpRequest) +{ + curl::Request req; + const char *b = "test-data"; + http_client::Body body = {b, b + strlen(b)}; + http_client::Body body1 = body; + req.SetBody(body); + ASSERT_EQ(req.body_, body1); + req.AddHeader("name1", "value1"); + req.AddHeader("name2", "value2"); + ASSERT_TRUE(req.headers_.find("name1")->second == "value1"); + ASSERT_TRUE(req.headers_.find("name2")->second == "value2"); + + req.ReplaceHeader("name1", "value3"); + ASSERT_EQ(req.headers_.find("name1")->second, "value3"); + + req.SetTimeoutMs(std::chrono::duration<int>(2000)); + ASSERT_EQ(req.timeout_ms_, std::chrono::duration<int>(2000)); +} + +TEST_F(BasicCurlHttpTests, HttpResponse) +{ + curl::Response res; + http_client::Headers m1 = { + {"name1", "value1_1"}, {"name1", "value1_2"}, {"name2", "value3"}, {"name3", "value3"}}; + res.headers_ = m1; + + const char *b = "test-data"; + http_client::Body body = {b, b + strlen(b)}; + int count = 0; + res.ForEachHeader("name1", [&count](nostd::string_view name, nostd::string_view value) { + if (name != "name1") + return false; + if (value != "value1_1" && value != "value1_2") + return false; + count++; + return true; + }); + ASSERT_EQ(count, 2); + count = 0; + res.ForEachHeader([&count](nostd::string_view name, nostd::string_view value) { + if (name != "name1" && name != "name2" && name != "name3") + return false; + if (value != "value1_1" && value != "value1_2" && value != "value2" && value != "value3") + return false; + count++; + return true; + }); + ASSERT_EQ(count, 4); +} + +TEST_F(BasicCurlHttpTests, SendGetRequest) +{ + received_requests_.clear(); + auto session_manager = http_client::HttpClientFactory::Create(); + EXPECT_TRUE(session_manager != nullptr); + + auto session = session_manager->CreateSession("http://127.0.0.1:19000"); + auto request = session->CreateRequest(); + request->SetUri("get/"); + GetEventHandler *handler = new GetEventHandler(); + session->SendRequest(*handler); + ASSERT_TRUE(waitForRequests(30, 1)); + session->FinishSession(); + ASSERT_TRUE(handler->is_called_); + delete handler; +} + +TEST_F(BasicCurlHttpTests, SendPostRequest) +{ + received_requests_.clear(); + auto session_manager = http_client::HttpClientFactory::Create(); + EXPECT_TRUE(session_manager != nullptr); + + auto session = session_manager->CreateSession("http://127.0.0.1:19000"); + auto request = session->CreateRequest(); + request->SetUri("post/"); + request->SetMethod(http_client::Method::Post); + + const char *b = "test-data"; + http_client::Body body = {b, b + strlen(b)}; + request->SetBody(body); + request->AddHeader("Content-Type", "text/plain"); + PostEventHandler *handler = new PostEventHandler(); + session->SendRequest(*handler); + ASSERT_TRUE(waitForRequests(30, 1)); + session->FinishSession(); + ASSERT_TRUE(handler->is_called_); + + session_manager->CancelAllSessions(); + session_manager->FinishAllSessions(); + + delete handler; +} + +TEST_F(BasicCurlHttpTests, RequestTimeout) +{ + received_requests_.clear(); + auto session_manager = http_client::HttpClientFactory::Create(); + EXPECT_TRUE(session_manager != nullptr); + + auto session = session_manager->CreateSession("222.222.222.200:19000"); // Non Existing address + auto request = session->CreateRequest(); + request->SetUri("get/"); + GetEventHandler *handler = new GetEventHandler(); + session->SendRequest(*handler); + session->FinishSession(); + ASSERT_FALSE(handler->is_called_); + delete handler; +} + +TEST_F(BasicCurlHttpTests, CurlHttpOperations) +{ + GetEventHandler *handler = new GetEventHandler(); + + const char *b = "test-data"; + http_client::Body body = {b, b + strlen(b)}; + + http_client::Headers headers = { + {"name1", "value1_1"}, {"name1", "value1_2"}, {"name2", "value3"}, {"name3", "value3"}}; + + curl::HttpOperation http_operations1(http_client::Method::Head, "/get", handler, + curl::RequestMode::Async, headers, body, true); + http_operations1.Send(); + + curl::HttpOperation http_operations2(http_client::Method::Get, "/get", handler, + curl::RequestMode::Async, headers, body, true); + http_operations2.Send(); + + curl::HttpOperation http_operations3(http_client::Method::Get, "/get", handler, + curl::RequestMode::Async, headers, body, false); + http_operations3.Send(); + delete handler; +} + +TEST_F(BasicCurlHttpTests, SendGetRequestSync) +{ + received_requests_.clear(); + curl::HttpClientSync http_client; + + http_client::Headers m1 = {}; + auto result = http_client.Get("http://127.0.0.1:19000/get/", m1); + EXPECT_EQ(result, true); + EXPECT_EQ(result.GetSessionState(), http_client::SessionState::Response); +} + +TEST_F(BasicCurlHttpTests, SendGetRequestSyncTimeout) +{ + received_requests_.clear(); + curl::HttpClientSync http_client; + + http_client::Headers m1 = {}; + auto result = http_client.Get("http://222.222.222.200:19000/get/", m1); + EXPECT_EQ(result, false); + + // When network is under proxy, it may connect success but closed by peer when send data + EXPECT_TRUE(result.GetSessionState() == http_client::SessionState::ConnectFailed || + result.GetSessionState() == http_client::SessionState::SendFailed); +} + +TEST_F(BasicCurlHttpTests, SendPostRequestSync) +{ + received_requests_.clear(); + curl::HttpClientSync http_client; + + http_client::Headers m1 = {}; + http_client::Body body = {}; + auto result = http_client.Post("http://127.0.0.1:19000/post/", body, m1); + EXPECT_EQ(result, true); + EXPECT_EQ(result.GetSessionState(), http_client::SessionState::Response); +} + +TEST_F(BasicCurlHttpTests, GetBaseUri) +{ + curl::HttpClient session_manager; + + auto session = session_manager.CreateSession("127.0.0.1:80"); + ASSERT_EQ(std::static_pointer_cast<curl::Session>(session)->GetBaseUri(), "http://127.0.0.1:80/"); + + session = session_manager.CreateSession("https://127.0.0.1:443"); + ASSERT_EQ(std::static_pointer_cast<curl::Session>(session)->GetBaseUri(), + "https://127.0.0.1:443/"); + + session = session_manager.CreateSession("http://127.0.0.1:31339"); + ASSERT_EQ(std::static_pointer_cast<curl::Session>(session)->GetBaseUri(), + "http://127.0.0.1:31339/"); +} diff --git a/src/jaegertracing/opentelemetry-cpp/ext/test/http/url_parser_test.cc b/src/jaegertracing/opentelemetry-cpp/ext/test/http/url_parser_test.cc new file mode 100644 index 000000000..b39a5bf90 --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/test/http/url_parser_test.cc @@ -0,0 +1,129 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#include "opentelemetry/ext/http/common/url_parser.h" + +#include <gtest/gtest.h> + +namespace http_common = opentelemetry::ext::http::common; + +inline const char *const BoolToString(bool b) +{ + return b ? "true" : "false"; +} + +TEST(UrlParserTests, BasicTests) +{ + std::map<std::string, std::map<std::string, std::string>> urls_map{ + {"www.abc.com", + {{"host", "www.abc.com"}, + {"port", "80"}, + {"scheme", "http"}, + {"path", "/"}, + {"query", ""}, + {"success", "true"}}}, + {"http://www.abc.com", + {{"host", "www.abc.com"}, + {"port", "80"}, + {"scheme", "http"}, + {"path", "/"}, + {"query", ""}, + {"success", "true"}}}, + {"https://www.abc.com", + {{"host", "www.abc.com"}, + {"port", "443"}, + {"scheme", "https"}, + {"path", "/"}, + {"query", ""}, + {"success", "true"}}}, + {"https://www.abc.com:4431", + {{"host", "www.abc.com"}, + {"port", "4431"}, + {"scheme", "https"}, + {"path", "/"}, + {"query", ""}, + {"success", "true"}}}, + {"https://www.abc.com:4431", + {{"host", "www.abc.com"}, + {"port", "4431"}, + {"scheme", "https"}, + {"path", "/"}, + {"query", ""}, + {"success", "true"}}}, + {"https://www.abc.com:4431/path1", + {{"host", "www.abc.com"}, + {"port", "4431"}, + {"scheme", "https"}, + {"path", "/path1"}, + {"query", ""}, + {"success", "true"}}}, + {"https://www.abc.com:4431/path1/path2", + {{"host", "www.abc.com"}, + {"port", "4431"}, + {"scheme", "https"}, + {"path", "/path1/path2"}, + {"query", ""}, + {"success", "true"}}}, + {"https://www.abc.com/path1/path2", + {{"host", "www.abc.com"}, + {"port", "443"}, + {"scheme", "https"}, + {"path", "/path1/path2"}, + {"query", ""}, + {"success", "true"}}}, + {"http://www.abc.com/path1/path2?q1=a1&q2=a2", + {{"host", "www.abc.com"}, + {"port", "80"}, + {"scheme", "http"}, + {"path", "/path1/path2"}, + {"query", "q1=a1&q2=a2"}, + {"success", "true"}}}, + {"http://www.abc.com:8080/path1/path2?q1=a1&q2=a2", + {{"host", "www.abc.com"}, + {"port", "8080"}, + {"scheme", "http"}, + {"path", "/path1/path2"}, + {"query", "q1=a1&q2=a2"}, + {"success", "true"}}}, + {"www.abc.com:8080/path1/path2?q1=a1&q2=a2", + {{"host", "www.abc.com"}, + {"port", "8080"}, + {"scheme", "http"}, + {"path", "/path1/path2"}, + {"query", "q1=a1&q2=a2"}, + {"success", "true"}}}, + {"http://user:password@www.abc.com:8080/path1/path2?q1=a1&q2=a2", + {{"host", "www.abc.com"}, + {"port", "8080"}, + {"scheme", "http"}, + {"path", "/path1/path2"}, + {"query", "q1=a1&q2=a2"}, + {"success", "true"}}}, + {"user:password@www.abc.com:8080/path1/path2?q1=a1&q2=a2", + {{"host", "www.abc.com"}, + {"port", "8080"}, + {"scheme", "http"}, + {"path", "/path1/path2"}, + {"query", "q1=a1&q2=a2"}, + {"success", "true"}}}, + {"https://user@www.abc.com/path1/path2?q1=a1&q2=a2", + {{"host", "www.abc.com"}, + {"port", "443"}, + {"scheme", "https"}, + {"path", "/path1/path2"}, + {"query", "q1=a1&q2=a2"}, + {"success", "true"}}}, + + }; + for (auto &url_map : urls_map) + { + http_common::UrlParser url(url_map.first); + auto url_properties = url_map.second; + ASSERT_EQ(BoolToString(url.success_), url_properties["success"]); + ASSERT_EQ(url.host_, url_properties["host"]); + ASSERT_EQ(std::to_string(url.port_), url_properties["port"]); + ASSERT_EQ(url.scheme_, url_properties["scheme"]); + ASSERT_EQ(url.path_, url_properties["path"]); + ASSERT_EQ(url.query_, url_properties["query"]); + } +} diff --git a/src/jaegertracing/opentelemetry-cpp/ext/test/w3c_tracecontext_test/BUILD b/src/jaegertracing/opentelemetry-cpp/ext/test/w3c_tracecontext_test/BUILD new file mode 100644 index 000000000..e52c943fe --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/test/w3c_tracecontext_test/BUILD @@ -0,0 +1,23 @@ +cc_binary( + name = "w3c_tracecontext_test", + srcs = [ + "main.cc", + ], + linkopts = select({ + "//bazel:windows": [ + "-DEFAULTLIB:advapi32.lib", + "-DEFAULTLIB:crypt32.lib", + "-DEFAULTLIB:Normaliz.lib", + ], + "//conditions:default": [], + }), + deps = [ + "//api", + "//exporters/ostream:ostream_span_exporter", + "//ext:headers", + "//ext/src/http/client/curl:http_client_curl", + "//sdk/src/trace", + "@curl", + "@github_nlohmann_json//:json", + ], +) diff --git a/src/jaegertracing/opentelemetry-cpp/ext/test/w3c_tracecontext_test/CMakeLists.txt b/src/jaegertracing/opentelemetry-cpp/ext/test/w3c_tracecontext_test/CMakeLists.txt new file mode 100644 index 000000000..ea74a8eeb --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/test/w3c_tracecontext_test/CMakeLists.txt @@ -0,0 +1,17 @@ +include_directories(${CMAKE_SOURCE_DIR}/exporters/ostream/include) + +find_package(CURL) +if(NOT CURL_FOUND) + message(WARNING "Skipping example_w3c_tracecontext_test: CURL not found") +else() + add_executable(w3c_tracecontext_test main.cc) + target_link_libraries( + w3c_tracecontext_test + PRIVATE ${CMAKE_THREAD_LIBS_INIT} opentelemetry_trace + opentelemetry_http_client_curl opentelemetry_exporter_ostream_span + ${CURL_LIBRARIES} nlohmann_json::nlohmann_json) + if(nlohmann_json_clone) + add_dependencies(w3c_tracecontext_test nlohmann_json::nlohmann_json) + endif() + +endif() diff --git a/src/jaegertracing/opentelemetry-cpp/ext/test/w3c_tracecontext_test/Dockerfile b/src/jaegertracing/opentelemetry-cpp/ext/test/w3c_tracecontext_test/Dockerfile new file mode 100644 index 000000000..1deac123c --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/test/w3c_tracecontext_test/Dockerfile @@ -0,0 +1,8 @@ +FROM python + +RUN pip install aiohttp +RUN git clone https://github.com/w3c/trace-context + +WORKDIR ./trace-context/test + +ENTRYPOINT [ "python", "test.py" ] diff --git a/src/jaegertracing/opentelemetry-cpp/ext/test/w3c_tracecontext_test/README.md b/src/jaegertracing/opentelemetry-cpp/ext/test/w3c_tracecontext_test/README.md new file mode 100644 index 000000000..8eda092f8 --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/test/w3c_tracecontext_test/README.md @@ -0,0 +1,50 @@ +# Test service endpoint for W3C validation + +This test application is intended to be used as a test service for the [W3C +Distributed Tracing Validation +Service](https://github.com/w3c/trace-context/tree/master/test). It is +implemented according to [this +instructions](https://github.com/w3c/trace-context/tree/master/test#implement-test-service). + +## Usage + +1: Build and start the test service endpoint: + +```sh +./w3c_tracecontext_test + +Listening to http://localhost:30000/test +``` + +A custom port number for the test service to listen to can be specified: + +```sh +./w3c_tracecontext_test 31339 + +Listening to http://localhost:31339/test +``` + +The test service will print the full URI that the validation service can connect +to. + +2: In a different terminal, set up and start the validation service according to +the +[instructions](https://github.com/w3c/trace-context/tree/master/test#run-test-cases), +giving the address of the test service endpoint as argument: + +```sh +python test.py http://localhost:31339/test +``` + +One can also use the `Dockerfile` provided in this folder to conveniently run +the validation service: + +```sh +docker build --tag w3c_driver . +docker run --network host w3c_driver http://localhost:31339/test +``` + +3: The validation service will run the test suite and print detailed test +results. + +4: Stop the test service by pressing enter. diff --git a/src/jaegertracing/opentelemetry-cpp/ext/test/w3c_tracecontext_test/main.cc b/src/jaegertracing/opentelemetry-cpp/ext/test/w3c_tracecontext_test/main.cc new file mode 100644 index 000000000..79aa4c916 --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/test/w3c_tracecontext_test/main.cc @@ -0,0 +1,199 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#include "opentelemetry/context/runtime_context.h" +#include "opentelemetry/exporters/ostream/span_exporter.h" +#include "opentelemetry/ext/http/client/curl/http_client_curl.h" +#include "opentelemetry/ext/http/server/http_server.h" +#include "opentelemetry/sdk/trace/simple_processor.h" +#include "opentelemetry/sdk/trace/tracer_provider.h" +#include "opentelemetry/trace/propagation/http_trace_context.h" +#include "opentelemetry/trace/provider.h" +#include "opentelemetry/trace/scope.h" + +#include <algorithm> +#include "nlohmann/json.hpp" + +namespace trace_api = opentelemetry::trace; +namespace http_client = opentelemetry::ext::http::client; +namespace curl = opentelemetry::ext::http::client::curl; +namespace context = opentelemetry::context; +namespace nostd = opentelemetry::nostd; +namespace trace_sdk = opentelemetry::sdk::trace; + +namespace +{ +static trace_api::propagation::HttpTraceContext propagator_format; + +class TextMapCarrierTest : public context::propagation::TextMapCarrier +{ +public: + TextMapCarrierTest(std::map<std::string, std::string> &headers) : headers_(headers) {} + virtual nostd::string_view Get(nostd::string_view key) const noexcept override + { + auto it = headers_.find(std::string(key)); + if (it != headers_.end()) + { + return nostd::string_view(it->second); + } + return ""; + } + virtual void Set(nostd::string_view key, nostd::string_view value) noexcept override + { + headers_[std::string(key)] = std::string(value); + } + + std::map<std::string, std::string> &headers_; +}; + +void initTracer() +{ + auto exporter = std::unique_ptr<trace_sdk::SpanExporter>( + new opentelemetry::exporter::trace::OStreamSpanExporter); + auto processor = std::unique_ptr<trace_sdk::SpanProcessor>( + new trace_sdk::SimpleSpanProcessor(std::move(exporter))); + std::vector<std::unique_ptr<trace_sdk::SpanProcessor>> processors; + processors.push_back(std::move(processor)); + auto context = std::make_shared<trace_sdk::TracerContext>(std::move(processors)); + auto provider = + nostd::shared_ptr<trace_api::TracerProvider>(new trace_sdk::TracerProvider(context)); + // Set the global trace provider + trace_api::Provider::SetTracerProvider(provider); +} + +nostd::shared_ptr<trace_api::Tracer> get_tracer() +{ + auto provider = trace_api::Provider::GetTracerProvider(); + return provider->GetTracer("w3c_tracecontext_test"); +} + +struct Uri +{ + std::string host; + uint16_t port; + std::string path; + + Uri(std::string uri) + { + size_t host_end = uri.substr(7, std::string::npos).find(":"); + size_t port_end = uri.substr(host_end + 1, std::string::npos).find("/"); + + host = uri.substr(0, host_end + 7); + port = std::stoi(uri.substr(7 + host_end + 1, port_end)); + path = uri.substr(host_end + port_end + 2, std::string::npos); + } +}; + +// A noop event handler for making HTTP requests. We don't care about response bodies and error +// messages. +class NoopEventHandler : public http_client::EventHandler +{ +public: + void OnEvent(http_client::SessionState state, nostd::string_view reason) noexcept override {} + + void OnConnecting(const http_client::SSLCertificate &) noexcept override {} + + void OnResponse(http_client::Response &response) noexcept override {} +}; +} // namespace + +// Sends an HTTP POST request to the given url, with the given body. +void send_request(curl::HttpClient &client, const std::string &url, const std::string &body) +{ + static std::unique_ptr<http_client::EventHandler> handler(new NoopEventHandler()); + + auto request_span = get_tracer()->StartSpan(__func__); + trace_api::Scope scope(request_span); + + Uri uri{url}; + + auto session = client.CreateSession(url); + auto request = session->CreateRequest(); + + request->SetMethod(http_client::Method::Post); + request->SetUri(uri.path); + http_client::Body b = {body.c_str(), body.c_str() + body.size()}; + request->SetBody(b); + request->AddHeader("Content-Type", "application/json"); + request->AddHeader("Content-Length", std::to_string(body.size())); + + std::map<std::string, std::string> headers; + TextMapCarrierTest carrier(headers); + propagator_format.Inject(carrier, context::RuntimeContext::GetCurrent()); + + for (auto const &hdr : headers) + { + request->AddHeader(hdr.first, hdr.second); + } + + session->SendRequest(*handler); + session->FinishSession(); +} + +// This application receives requests from the W3C test service. Each request has a JSON body which +// consists of an array of objects, each containing an URL to which to post to, and arguments which +// need to be used as body when posting to the given URL. +int main(int argc, char *argv[]) +{ + initTracer(); + + constexpr char default_host[] = "localhost"; + constexpr uint16_t default_port = 30000; + uint16_t port; + + // The port the validation service listens to can be specified via the command line. + if (argc > 1) + { + port = atoi(argv[1]); + } + else + { + port = default_port; + } + + auto root_span = get_tracer()->StartSpan(__func__); + trace_api::Scope scope(root_span); + + testing::HttpServer server(default_host, port); + curl::HttpClient client; + + testing::HttpRequestCallback test_cb{ + [&](testing::HttpRequest const &req, testing::HttpResponse &resp) { + auto body = nlohmann::json::parse(req.content); + + std::cout << "Received request with body :\n" << req.content << "\n"; + + for (auto &part : body) + { + const TextMapCarrierTest carrier((std::map<std::string, std::string> &)req.headers); + auto current_ctx = context::RuntimeContext::GetCurrent(); + auto ctx = propagator_format.Extract(carrier, current_ctx); + auto token = context::RuntimeContext::Attach(ctx); + + auto url = part["url"].get<std::string>(); + auto arguments = part["arguments"].dump(); + + std::cout << " Sending request to " << url << "\n"; + + send_request(client, url, arguments); + } + + std::cout << "\n"; + + resp.code = 200; + return 0; + }}; + + server["/test"] = test_cb; + + // Start server + server.start(); + + std::cout << "Listening at http://" << default_host << ":" << port << "/test\n"; + + // Wait for console input + std::cin.get(); + + // Stop server + server.stop(); +} diff --git a/src/jaegertracing/opentelemetry-cpp/ext/test/zpages/BUILD b/src/jaegertracing/opentelemetry-cpp/ext/test/zpages/BUILD new file mode 100644 index 000000000..d75f464af --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/test/zpages/BUILD @@ -0,0 +1,38 @@ +cc_test( + name = "threadsafe_span_data_tests", + srcs = [ + "threadsafe_span_data_test.cc", + ], + tags = ["test"], + deps = [ + "//ext/src/zpages", + "//sdk/src/trace", + "@com_google_googletest//:gtest_main", + ], +) + +cc_test( + name = "tracez_data_aggregator_tests", + srcs = [ + "tracez_data_aggregator_test.cc", + ], + tags = ["test"], + deps = [ + "//ext/src/zpages", + "//sdk/src/trace", + "@com_google_googletest//:gtest_main", + ], +) + +cc_test( + name = "tracez_processor_tests", + srcs = [ + "tracez_processor_test.cc", + ], + tags = ["test"], + deps = [ + "//ext/src/zpages", + "//sdk/src/trace", + "@com_google_googletest//:gtest_main", + ], +) diff --git a/src/jaegertracing/opentelemetry-cpp/ext/test/zpages/CMakeLists.txt b/src/jaegertracing/opentelemetry-cpp/ext/test/zpages/CMakeLists.txt new file mode 100644 index 000000000..c7d0d837c --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/test/zpages/CMakeLists.txt @@ -0,0 +1,11 @@ +foreach(testname tracez_processor_test tracez_data_aggregator_test + threadsafe_span_data_test) + add_executable(${testname} "${testname}.cc") + target_link_libraries(${testname} ${GTEST_BOTH_LIBRARIES} + ${CMAKE_THREAD_LIBS_INIT} opentelemetry_zpages) + + gtest_add_tests( + TARGET ${testname} + TEST_PREFIX ext. + TEST_LIST ${testname}) +endforeach() diff --git a/src/jaegertracing/opentelemetry-cpp/ext/test/zpages/threadsafe_span_data_test.cc b/src/jaegertracing/opentelemetry-cpp/ext/test/zpages/threadsafe_span_data_test.cc new file mode 100644 index 000000000..cee29672e --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/test/zpages/threadsafe_span_data_test.cc @@ -0,0 +1,73 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#include "opentelemetry/ext/zpages/threadsafe_span_data.h" +#include "opentelemetry/nostd/variant.h" +#include "opentelemetry/trace/span_id.h" +#include "opentelemetry/trace/trace_id.h" + +#include <gtest/gtest.h> +#include <thread> + +using opentelemetry::ext::zpages::ThreadsafeSpanData; +using opentelemetry::sdk::common::AttributeConverter; +using opentelemetry::sdk::common::OwnedAttributeValue; + +namespace trace_api = opentelemetry::trace; + +TEST(ThreadsafeSpanData, DefaultValues) +{ + trace_api::SpanContext empty_span_context{false, false}; + trace_api::SpanId zero_span_id; + ThreadsafeSpanData data; + + ASSERT_EQ(data.GetTraceId(), empty_span_context.trace_id()); + ASSERT_EQ(data.GetSpanId(), empty_span_context.span_id()); + ASSERT_EQ(data.GetSpanContext(), empty_span_context); + ASSERT_EQ(data.GetParentSpanId(), zero_span_id); + ASSERT_EQ(data.GetName(), ""); + ASSERT_EQ(data.GetStatus(), trace_api::StatusCode::kUnset); + ASSERT_EQ(data.GetDescription(), ""); + ASSERT_EQ(data.GetStartTime().time_since_epoch(), std::chrono::nanoseconds(0)); + ASSERT_EQ(data.GetDuration(), std::chrono::nanoseconds(0)); + ASSERT_EQ(data.GetAttributes().size(), 0); +} + +TEST(ThreadsafeSpanData, Set) +{ + constexpr uint8_t trace_id_buf[] = {1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8}; + constexpr uint8_t span_id_buf[] = {1, 2, 3, 4, 5, 6, 7, 8}; + constexpr uint8_t parent_span_id_buf[] = {8, 7, 6, 5, 4, 3, 2, 1}; + trace_api::TraceId trace_id{trace_id_buf}; + trace_api::SpanId span_id{span_id_buf}; + trace_api::SpanId parent_span_id{parent_span_id_buf}; + const auto trace_state = trace_api::TraceState::GetDefault()->Set("key1", "value"); + const trace_api::SpanContext span_context{ + trace_id, span_id, trace_api::TraceFlags{trace_api::TraceFlags::kIsSampled}, true, + trace_state}; + opentelemetry::common::SystemTimestamp now(std::chrono::system_clock::now()); + + ThreadsafeSpanData data; + data.SetIdentity(span_context, parent_span_id); + data.SetName("span name"); + data.SetSpanKind(trace_api::SpanKind::kServer); + data.SetStatus(trace_api::StatusCode::kOk, "description"); + data.SetStartTime(now); + data.SetDuration(std::chrono::nanoseconds(1000000)); + data.SetAttribute("attr1", (int64_t)314159); + data.AddEvent("event1", now); + + ASSERT_EQ(data.GetTraceId(), trace_id); + ASSERT_EQ(data.GetSpanId(), span_id); + ASSERT_EQ(data.GetSpanContext(), span_context); + std::string trace_state_key1_value; + ASSERT_EQ(data.GetSpanContext().trace_state()->Get("key1", trace_state_key1_value), true); + ASSERT_EQ(trace_state_key1_value, "value"); + ASSERT_EQ(data.GetParentSpanId(), parent_span_id); + ASSERT_EQ(data.GetName(), "span name"); + ASSERT_EQ(data.GetStatus(), trace_api::StatusCode::kOk); + ASSERT_EQ(data.GetDescription(), "description"); + ASSERT_EQ(data.GetStartTime().time_since_epoch(), now.time_since_epoch()); + ASSERT_EQ(data.GetDuration(), std::chrono::nanoseconds(1000000)); + ASSERT_EQ(opentelemetry::nostd::get<int64_t>(data.GetAttributes().at("attr1")), 314159); +} diff --git a/src/jaegertracing/opentelemetry-cpp/ext/test/zpages/tracez_data_aggregator_test.cc b/src/jaegertracing/opentelemetry-cpp/ext/test/zpages/tracez_data_aggregator_test.cc new file mode 100644 index 000000000..5139c9e91 --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/test/zpages/tracez_data_aggregator_test.cc @@ -0,0 +1,698 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#include "opentelemetry/ext/zpages/tracez_data_aggregator.h" + +#include <gtest/gtest.h> + +#include "opentelemetry/ext/zpages/tracez_processor.h" +#include "opentelemetry/sdk/resource/resource.h" +#include "opentelemetry/sdk/trace/recordable.h" +#include "opentelemetry/sdk/trace/tracer.h" + +using namespace opentelemetry::sdk::trace; +using namespace opentelemetry::ext::zpages; +namespace nostd = opentelemetry::nostd; +namespace common = opentelemetry::common; +using opentelemetry::common::SteadyTimestamp; +using opentelemetry::trace::Span; + +const std::string span_name1 = "span 1"; +const std::string span_name2 = "span 2"; +const std::string span_name3 = "span 3"; + +/** + * TODO: Due to the absence of way to simulate the passing of time in the + * testing framework, synthetic delays had to be added in the tests to get the + * object in question to perform correctly. Later on if something like this is + * added the tests should be modified accordingly so that there is no external + * dependency. + * Additionally later on it would be better check for the span id(when set) + * rather than span name. + */ + +/** Test fixture for setting up the data aggregator and tracer for each test **/ +class TracezDataAggregatorTest : public ::testing::Test +{ +protected: + void SetUp() override + { + std::shared_ptr<TracezSharedData> shared_data(new TracezSharedData()); + auto resource = opentelemetry::sdk::resource::Resource::Create({}); + std::unique_ptr<SpanProcessor> processor(new TracezSpanProcessor(shared_data)); + std::vector<std::unique_ptr<SpanProcessor>> processors; + processors.push_back(std::move(processor)); + + auto context = std::make_shared<TracerContext>(std::move(processors), resource); + tracer = std::shared_ptr<opentelemetry::trace::Tracer>(new Tracer(context)); + tracez_data_aggregator = std::unique_ptr<TracezDataAggregator>( + new TracezDataAggregator(shared_data, milliseconds(10))); + } + + std::unique_ptr<TracezDataAggregator> tracez_data_aggregator; + std::shared_ptr<opentelemetry::trace::Tracer> tracer; +}; + +/** + * Helper function to check if the counts of running, error and latency spans + * match what is expected + */ +void VerifySpanCountsInTracezData( + const std::string &span_name, + const TracezData &aggregated_data, + size_t running_span_count, + size_t error_span_count, + std::array<unsigned int, kLatencyBoundaries.size()> completed_span_count_per_latency_bucket) +{ + // Asserts are needed to check the size of the container because they may need + // to be checked and if size checks fail it must be stopped + EXPECT_EQ(aggregated_data.running_span_count, running_span_count) + << " Count of running spans incorrect for " << span_name << "\n"; + + EXPECT_EQ(aggregated_data.sample_running_spans.size(), + std::min<size_t>(running_span_count, kMaxNumberOfSampleSpans)) + << " Size of sample running spans incorrect for " << span_name << "\n"; + + EXPECT_EQ(aggregated_data.error_span_count, error_span_count) + << " Count of error spans incorrect for " << span_name << "\n"; + + EXPECT_EQ(aggregated_data.sample_error_spans.size(), + std::min<size_t>(error_span_count, kMaxNumberOfSampleSpans)) + << " Count of running spans incorrect for " << span_name << "\n"; + + for (unsigned int boundary = 0; boundary < kLatencyBoundaries.size(); boundary++) + { + EXPECT_EQ(aggregated_data.completed_span_count_per_latency_bucket[boundary], + completed_span_count_per_latency_bucket[boundary]) + << " Count of completed spans in latency boundary " << boundary << " incorrect for " + << span_name << "\n"; + EXPECT_EQ(aggregated_data.sample_latency_spans[boundary].size(), + std::min<size_t>(completed_span_count_per_latency_bucket[boundary], + kMaxNumberOfSampleSpans)) + << " Count of sample completed spans in latency boundary " << boundary << " incorrect for " + << span_name << "\n"; + } +} + +/**************************** No Span Test ************************************/ + +/** Test to check if data aggregator works as expected when there are no spans + * **/ +TEST_F(TracezDataAggregatorTest, NoSpans) +{ + auto data = tracez_data_aggregator->GetAggregatedTracezData(); + ASSERT_EQ(data.size(), 0); +} + +/*********************** Single span tests ************************************/ + +/** Test to check if data aggregator works as expected when there are + * is exactly a single running span **/ +TEST_F(TracezDataAggregatorTest, SingleRunningSpan) +{ + // Start the span get the data + auto span_first = tracer->StartSpan(span_name1); + std::this_thread::sleep_for(milliseconds(500)); + auto data = tracez_data_aggregator->GetAggregatedTracezData(); + + // Check to see if span name exists + ASSERT_EQ(data.size(), 1); + ASSERT_TRUE(data.find(span_name1) != data.end()); + auto &aggregated_data = data.at(span_name1); + + // Verify span counts then content of spans + VerifySpanCountsInTracezData(span_name1, aggregated_data, 1, 0, {0, 0, 0, 0, 0, 0, 0, 0, 0}); + + ASSERT_EQ(aggregated_data.sample_running_spans.size(), 1); + ASSERT_EQ(aggregated_data.sample_running_spans.front().GetName().data(), span_name1); + span_first->End(); +} + +/** Test to check if data aggregator works as expected when there is exactly one + * completed span **/ +TEST_F(TracezDataAggregatorTest, SingleCompletedSpan) +{ + // Start and end the span at a specified times + opentelemetry::trace::StartSpanOptions start; + start.start_steady_time = SteadyTimestamp(nanoseconds(10)); + opentelemetry::trace::EndSpanOptions end; + end.end_steady_time = SteadyTimestamp(nanoseconds(40)); + tracer->StartSpan(span_name1, start)->End(end); + + // Get the data and make sure span name exists in the data + std::this_thread::sleep_for(milliseconds(500)); + auto data = tracez_data_aggregator->GetAggregatedTracezData(); + ASSERT_EQ(data.size(), 1); + ASSERT_TRUE(data.find(span_name1) != data.end()); + + auto &aggregated_data = data.at(span_name1); + // Make sure counts of spans are in order + VerifySpanCountsInTracezData(span_name1, aggregated_data, 0, 0, {1, 0, 0, 0, 0, 0, 0, 0, 0}); + + // Check if the span is correctly updated in the first boundary + ASSERT_EQ(aggregated_data.sample_latency_spans[LatencyBoundary::k0MicroTo10Micro].size(), 1); + ASSERT_EQ(aggregated_data.sample_latency_spans[LatencyBoundary::k0MicroTo10Micro] + .front() + .GetDuration() + .count(), + 30); +} + +/** Test to check if data aggregator works as expected when there is exactly + * one error span **/ +TEST_F(TracezDataAggregatorTest, SingleErrorSpan) +{ + // Start and end a single error span + auto span = tracer->StartSpan(span_name1); + span->SetStatus(opentelemetry::trace::StatusCode::kError, "span cancelled"); + span->End(); + std::this_thread::sleep_for(milliseconds(500)); + auto data = tracez_data_aggregator->GetAggregatedTracezData(); + + // Check to see if span name can be found in aggregation + ASSERT_EQ(data.size(), 1); + ASSERT_TRUE(data.find(span_name1) != data.end()); + + auto &aggregated_data = data.at(span_name1); + // Make sure counts of spans are in order + VerifySpanCountsInTracezData(span_name1, aggregated_data, 0, 1, {0, 0, 0, 0, 0, 0, 0, 0, 0}); + + // Check the value of the error span introduced + ASSERT_EQ(aggregated_data.sample_error_spans.size(), 1); + ASSERT_EQ(aggregated_data.sample_error_spans.front().GetName().data(), span_name1); +} + +/************************* Multiple span tests ********************************/ + +/** Test to check if multiple running spans behaves as expected**/ +TEST_F(TracezDataAggregatorTest, MultipleRunningSpans) +{ + // A container that maps a span name to the number of spans to start with that + // span name + std::unordered_map<std::string, int> running_span_name_to_count({ + {span_name1, 1}, + {span_name2, 2}, + {span_name3, 3}, + }); + + // Start and store spans based on the above map + std::vector<nostd::shared_ptr<Span>> running_span_container; + for (auto span_name : running_span_name_to_count) + { + for (int count = 0; count < span_name.second; count++) + running_span_container.push_back(tracer->StartSpan(span_name.first)); + } + + // give time for aggregation and then get data + std::this_thread::sleep_for(milliseconds(500)); + auto data = tracez_data_aggregator->GetAggregatedTracezData(); + ASSERT_EQ(data.size(), running_span_name_to_count.size()); + + // Check to see if the running span counts were updated correctly + for (auto &span_name : running_span_name_to_count) + { + ASSERT_TRUE(data.find(span_name.first) != data.end()); + + // Make sure counts of spans are in order + VerifySpanCountsInTracezData(span_name.first, data.at(span_name.first), span_name.second, 0, + {0, 0, 0, 0, 0, 0, 0, 0, 0}); + + ASSERT_EQ(data.at(span_name.first).sample_running_spans.size(), span_name.second); + for (auto &span_sample : data.at(span_name.first).sample_running_spans) + { + ASSERT_EQ(span_sample.GetName().data(), span_name.first); + } + } + + for (auto i = running_span_container.begin(); i != running_span_container.end(); i++) + (*i)->End(); +} + +/** Test to check if multiple completed spans updates the aggregated data + * correctly **/ +TEST_F(TracezDataAggregatorTest, MultipleCompletedSpan) +{ + // Start spans with span name and the corresponding durations in one of the 9 + // latency buckets + const std::unordered_map<std::string, std::vector<std::vector<nanoseconds>>> + span_name_to_duration( + {{span_name1, {{nanoseconds(10), nanoseconds(4600)}, {}, {}, {}, {}, {}, {}, {}, {}}}, + {span_name2, + {{}, + {nanoseconds(38888), nanoseconds(98768)}, + {nanoseconds(983251)}, + {}, + {}, + {}, + {}, + {}, + {}}}, + {span_name3, + {{}, + {}, + {}, + {nanoseconds(1234567), nanoseconds(1234567)}, + {}, + {}, + {}, + {}, + {nanoseconds(9999999999999)}}}}); + opentelemetry::trace::StartSpanOptions start; + opentelemetry::trace::EndSpanOptions end; + for (auto &span : span_name_to_duration) + { + for (auto &buckets : span.second) + { + for (auto &duration : buckets) + { + long long int end_time = duration.count() + 1; + start.start_steady_time = SteadyTimestamp(nanoseconds(1)); + end.end_steady_time = SteadyTimestamp(nanoseconds(end_time)); + tracer->StartSpan(span.first, start)->End(end); + } + } + } + + // Give time for aggregation and get data + std::this_thread::sleep_for(milliseconds(500)); + auto data = tracez_data_aggregator->GetAggregatedTracezData(); + + ASSERT_EQ(data.size(), span_name_to_duration.size()); + + for (auto &span : span_name_to_duration) + { + ASSERT_TRUE(data.find(span.first) != data.end()); + auto &aggregated_data = data.at(span.first); + + // Make sure counts of spans are in order + VerifySpanCountsInTracezData( + span.first, aggregated_data, 0, 0, + {(unsigned int)span.second[0].size(), (unsigned int)span.second[1].size(), + (unsigned int)span.second[2].size(), (unsigned int)span.second[3].size(), + (unsigned int)span.second[4].size(), (unsigned int)span.second[5].size(), + (unsigned int)span.second[6].size(), (unsigned int)span.second[7].size(), + (unsigned int)span.second[8].size()}); + + for (unsigned int boundary = 0; boundary < kLatencyBoundaries.size(); boundary++) + { + ASSERT_EQ(aggregated_data.sample_latency_spans[boundary].size(), + span.second[boundary].size()); + auto latency_sample = aggregated_data.sample_latency_spans[boundary].begin(); + for (unsigned int idx = 0; idx < span.second[boundary].size(); idx++) + { + ASSERT_EQ(span.second[boundary][idx].count(), latency_sample->GetDuration().count()); + latency_sample = std::next(latency_sample); + } + } + } +} + +/** + * This test checks to see if the aggregated data is updated correctly + * when there are multiple error spans. + * It checks both the count of error spans and the error samples + */ +TEST_F(TracezDataAggregatorTest, MultipleErrorSpans) +{ + // Container to store the span names --> error messges for the span name + std::unordered_map<std::string, std::vector<std::string>> span_name_to_error( + {{span_name1, {"span 1 error"}}, + {span_name2, {"span 2 error 1", "span 2 error 2"}}, + {span_name3, + {"span 3 error 1", "span 3 error 2", "span 3 error 3", "span 3 error 4", + "span 3 error 5"}}}); + + // Start spans with the error messages based on the map + for (auto &span_error : span_name_to_error) + { + for (auto error_desc : span_error.second) + { + auto span = tracer->StartSpan(span_error.first); + span->SetStatus(opentelemetry::trace::StatusCode::kError, error_desc); + span->End(); + } + } + + // Give some time and then get data + std::this_thread::sleep_for(milliseconds(500)); + auto data = tracez_data_aggregator->GetAggregatedTracezData(); + ASSERT_EQ(data.size(), span_name_to_error.size()); + + // Check if error spans were updated correctly for the different span names + for (auto &span_error : span_name_to_error) + { + // First try to find the span name in aggregation, then check the count of + // the error spans and then check values + ASSERT_TRUE(data.find(span_error.first) != data.end()); + + auto &aggregated_data = data.at(span_error.first); + + // Make sure counts of spans are in order + VerifySpanCountsInTracezData(span_error.first, aggregated_data, 0, span_error.second.size(), + {0, 0, 0, 0, 0, 0, 0, 0, 0}); + ASSERT_EQ(aggregated_data.error_span_count, span_error.second.size()); + + auto error_sample = aggregated_data.sample_error_spans.begin(); + for (unsigned int idx = 0; idx < span_error.second.size(); idx++) + { + ASSERT_EQ(span_error.second[idx], error_sample->GetDescription()); + error_sample = std::next(error_sample); + } + } +} + +/************************ Sample spans tests **********************************/ + +/** + * This test checks to see that the maximum number of running samples(5) for a + * bucket is not exceeded. If there are more spans than this for a single bucket + * it removes the earliest span that was received + */ +TEST_F(TracezDataAggregatorTest, RunningSampleSpansOverCapacity) +{ + int running_span_count = 6; + // Start and store spans based on the above map + std::vector<nostd::shared_ptr<Span>> running_span_container; + for (int count = 0; count < running_span_count; count++) + running_span_container.push_back(tracer->StartSpan(span_name1)); + + std::this_thread::sleep_for(milliseconds(500)); + // Fetch data and check if span name is spresent + auto data = tracez_data_aggregator->GetAggregatedTracezData(); + ASSERT_EQ(data.size(), 1); + ASSERT_TRUE(data.find(span_name1) != data.end()); + + // Check if error spans are updated according to spans started + auto &aggregated_data = data.at(span_name1); + VerifySpanCountsInTracezData(span_name1, aggregated_data, 6, 0, {0, 0, 0, 0, 0, 0, 0, 0, 0}); + + ASSERT_EQ(aggregated_data.sample_running_spans.size(), kMaxNumberOfSampleSpans); + + for (auto i = running_span_container.begin(); i != running_span_container.end(); i++) + { + (*i)->End(); + } +} + +/** + * This test checks to see that the maximum number of error samples(5) for a + * bucket is not exceeded. If there are more spans than this for a single bucket + * it removes the earliest span that was received + */ +TEST_F(TracezDataAggregatorTest, ErrorSampleSpansOverCapacity) +{ + // Create error spans with the descriptions in the vector + std::vector<std::string> span_error_descriptions = {"error span 1", "error span 2", + "error span 3", "error span 4", + "error span 5", "error span 6"}; + for (auto span_error_description : span_error_descriptions) + { + auto span = tracer->StartSpan(span_name1); + span->SetStatus(opentelemetry::trace::StatusCode::kError, span_error_description); + span->End(); + } + + std::this_thread::sleep_for(milliseconds(500)); + + // Fetch data and check if span name is spresent + auto data = tracez_data_aggregator->GetAggregatedTracezData(); + ASSERT_EQ(data.size(), 1); + ASSERT_TRUE(data.find(span_name1) != data.end()); + + std::this_thread::sleep_for(milliseconds(500)); + + // Check if error spans are updated according to spans started + auto &aggregated_data = data.at(span_name1); + VerifySpanCountsInTracezData(span_name1, aggregated_data, 0, span_error_descriptions.size(), + {0, 0, 0, 0, 0, 0, 0, 0, 0}); + + // Check if the latest 5 error spans exist out of the total 6 that were + // introduced + auto error_sample = aggregated_data.sample_error_spans.begin(); + for (unsigned int idx = 1; idx < span_error_descriptions.size(); idx++) + { + ASSERT_EQ(error_sample->GetDescription(), span_error_descriptions[idx]); + error_sample = std::next(error_sample); + } +} + +/** + * This test checks to see that the maximum number of latency samples(5) for a + * bucket is not exceeded. If there are more spans than this for a single bucket + * it removes the earliest span that was received + */ +TEST_F(TracezDataAggregatorTest, CompletedSampleSpansOverCapacity) +{ + opentelemetry::trace::StartSpanOptions start; + opentelemetry::trace::EndSpanOptions end; + + // Start and end 6 spans with the same name that fall into the first latency + // bucket + std::vector<std::pair<nanoseconds, nanoseconds>> timestamps = { + make_pair(nanoseconds(10), nanoseconds(100)), + make_pair(nanoseconds(1), nanoseconds(10000)), + make_pair(nanoseconds(1000), nanoseconds(3000)), + make_pair(nanoseconds(12), nanoseconds(12)), + make_pair(nanoseconds(10), nanoseconds(5000)), + make_pair(nanoseconds(10), nanoseconds(60))}; + for (auto timestamp : timestamps) + { + start.start_steady_time = SteadyTimestamp(timestamp.first); + end.end_steady_time = SteadyTimestamp(timestamp.second); + tracer->StartSpan(span_name1, start)->End(end); + } + + // Give some time and get data + std::this_thread::sleep_for(milliseconds(500)); + auto data = tracez_data_aggregator->GetAggregatedTracezData(); + + ASSERT_EQ(data.size(), 1); + ASSERT_TRUE(data.find(span_name1) != data.end()); + + std::this_thread::sleep_for(milliseconds(500)); + auto &aggregated_data = data.at(span_name1); + VerifySpanCountsInTracezData(span_name1, aggregated_data, 0, 0, + {(unsigned int)timestamps.size(), 0, 0, 0, 0, 0, 0, 0, 0}); + + // Check the count of completed spans in the buckets and the samples stored + auto latency_sample = + aggregated_data.sample_latency_spans[LatencyBoundary::k0MicroTo10Micro].begin(); + + // idx starts from 1 and not 0 because there are 6 completed spans in the same + // bucket the and the first one is removed + for (unsigned int idx = 1; idx < timestamps.size(); idx++) + { + ASSERT_EQ(latency_sample->GetDuration().count(), + timestamps[idx].second.count() - timestamps[idx].first.count()); + latency_sample = std::next(latency_sample); + } +} + +/************************* Miscellaneous tests ********************************/ + +/** Test to see if the span names are in alphabetical order **/ +TEST_F(TracezDataAggregatorTest, SpanNameInAlphabeticalOrder) +{ + std::vector<std::string> span_names = {span_name1, span_name2, span_name3}; + + auto span_first = tracer->StartSpan(span_name2); + tracer->StartSpan(span_name1)->End(); + auto span_third = tracer->StartSpan(span_name3); + span_third->SetStatus(opentelemetry::trace::StatusCode::kError, "span cancelled"); + span_third->End(); + std::this_thread::sleep_for(milliseconds(500)); + // Get data and check if span name exists in aggregation + auto data = tracez_data_aggregator->GetAggregatedTracezData(); + ASSERT_EQ(data.size(), span_names.size()); + + int span_names_idx = 0; + for (auto &spans : data) + { + ASSERT_EQ(spans.first, span_names[span_names_idx]); + span_names_idx++; + } + span_first->End(); +} + +/** This test checks to see that there is no double counting of running spans + * when get aggregated data is called twice**/ +TEST_F(TracezDataAggregatorTest, AdditionToRunningSpans) +{ + // Start a span and check the data + auto span_first = tracer->StartSpan(span_name1); + std::this_thread::sleep_for(milliseconds(500)); + auto data = tracez_data_aggregator->GetAggregatedTracezData(); + ASSERT_EQ(data.size(), 1); + ASSERT_TRUE(data.find(span_name1) != data.end()); + VerifySpanCountsInTracezData(span_name1, data.at(span_name1), 1, 0, {0, 0, 0, 0, 0, 0, 0, 0, 0}); + + // Start another span and check to see if there is no double counting of spans + auto span_second = tracer->StartSpan(span_name1); + + // Give some time and get updated data + std::this_thread::sleep_for(milliseconds(500)); + data = tracez_data_aggregator->GetAggregatedTracezData(); + ASSERT_EQ(data.size(), 1); + ASSERT_TRUE(data.find(span_name1) != data.end()); + auto &aggregated_data = data.at(span_name1); + VerifySpanCountsInTracezData(span_name1, aggregated_data, 2, 0, {0, 0, 0, 0, 0, 0, 0, 0, 0}); + + ASSERT_EQ(aggregated_data.sample_running_spans.size(), 2); + for (auto &sample_span : aggregated_data.sample_running_spans) + { + ASSERT_EQ(sample_span.GetName().data(), span_name1); + } + span_first->End(); + span_second->End(); +} + +/** This test checks to see that once a running span is completed it the + * aggregated data is updated correctly **/ +TEST_F(TracezDataAggregatorTest, RemovalOfRunningSpanWhenCompleted) +{ + opentelemetry::trace::StartSpanOptions start; + start.start_steady_time = SteadyTimestamp(nanoseconds(10)); + opentelemetry::trace::EndSpanOptions end; + end.end_steady_time = SteadyTimestamp(nanoseconds(40)); + + // Start a span and make sure data is updated + auto span_first = tracer->StartSpan(span_name1, start); + std::this_thread::sleep_for(milliseconds(500)); + auto data = tracez_data_aggregator->GetAggregatedTracezData(); + ASSERT_EQ(data.size(), 1); + ASSERT_TRUE(data.find(span_name1) != data.end()); + VerifySpanCountsInTracezData(span_name1, data.at(span_name1), 1, 0, {0, 0, 0, 0, 0, 0, 0, 0, 0}); + ASSERT_EQ(data.at(span_name1).sample_running_spans.front().GetName().data(), span_name1); + // End the span and make sure running span is removed and completed span is + // updated, there should be only one completed span + span_first->End(end); + std::this_thread::sleep_for(milliseconds(500)); + + // Make sure sample span still exists before next aggregation + ASSERT_TRUE(data.find(span_name1) != data.end()); + ASSERT_EQ(data.at(span_name1).sample_running_spans.front().GetName().data(), span_name1); + + data = tracez_data_aggregator->GetAggregatedTracezData(); + + ASSERT_EQ(data.size(), 1); + ASSERT_TRUE(data.find(span_name1) != data.end()); + + // Check if completed span fields are correctly updated + auto &aggregated_data = data.at(span_name1); + VerifySpanCountsInTracezData(span_name1, aggregated_data, 0, 0, {1, 0, 0, 0, 0, 0, 0, 0, 0}); + ASSERT_EQ(aggregated_data.sample_latency_spans[LatencyBoundary::k0MicroTo10Micro] + .front() + .GetDuration() + .count(), + 30); +} + +TEST_F(TracezDataAggregatorTest, RunningSpanChangesNameBeforeCompletion) +{ + opentelemetry::trace::StartSpanOptions start; + start.start_steady_time = SteadyTimestamp(nanoseconds(10)); + opentelemetry::trace::EndSpanOptions end; + end.end_steady_time = SteadyTimestamp(nanoseconds(40)); + + // Start a span and make sure data is updated + auto span_first = tracer->StartSpan(span_name1, start); + std::this_thread::sleep_for(milliseconds(500)); + auto data = tracez_data_aggregator->GetAggregatedTracezData(); + ASSERT_EQ(data.size(), 1); + ASSERT_TRUE(data.find(span_name1) != data.end()); + VerifySpanCountsInTracezData(span_name1, data.at(span_name1), 1, 0, {0, 0, 0, 0, 0, 0, 0, 0, 0}); + ASSERT_EQ(data.at(span_name1).sample_running_spans.front().GetName().data(), span_name1); + + // End the span and make sure running span is removed and completed span is + // updated, there should be only one completed span + span_first->UpdateName(span_name2); + span_first->End(end); + + // Check if sample span is present before fetching updated data + std::this_thread::sleep_for(milliseconds(500)); + ASSERT_TRUE(data.find(span_name1) != data.end()); + ASSERT_EQ(data.at(span_name1).sample_running_spans.front().GetName(), span_name1); + + data = tracez_data_aggregator->GetAggregatedTracezData(); + + ASSERT_EQ(data.size(), 1); + ASSERT_TRUE(data.find(span_name2) != data.end()); + + // Check if completed span fields are correctly updated + auto &aggregated_data = data.at(span_name2); + VerifySpanCountsInTracezData(span_name2, aggregated_data, 0, 0, {1, 0, 0, 0, 0, 0, 0, 0, 0}); + ASSERT_EQ(aggregated_data.sample_latency_spans[LatencyBoundary::k0MicroTo10Micro] + .front() + .GetDuration() + .count(), + 30); +} + +/** Test to check if the span latencies with duration at the edge of boundaries + * fall in the correct bucket **/ +TEST_F(TracezDataAggregatorTest, EdgeSpanLatenciesFallInCorrectBoundaries) +{ + opentelemetry::trace::StartSpanOptions start; + opentelemetry::trace::EndSpanOptions end; + + // Start and end 6 spans with the same name that fall into the first latency + // bucket + std::vector<nanoseconds> durations = { + nanoseconds(0), nanoseconds(10000), nanoseconds(100000), + nanoseconds(1000000), nanoseconds(10000000), nanoseconds(100000000), + nanoseconds(1000000000), nanoseconds(10000000000), nanoseconds(100000000000)}; + for (auto duration : durations) + { + start.start_steady_time = SteadyTimestamp(nanoseconds(1)); + end.end_steady_time = SteadyTimestamp(nanoseconds(duration.count() + 1)); + tracer->StartSpan(span_name1, start)->End(end); + } + + std::this_thread::sleep_for(milliseconds(500)); + auto data = tracez_data_aggregator->GetAggregatedTracezData(); + ASSERT_EQ(data.size(), 1); + ASSERT_TRUE(data.find(span_name1) != data.end()); + + std::this_thread::sleep_for(milliseconds(500)); + auto &aggregated_data = data.at(span_name1); + VerifySpanCountsInTracezData(span_name1, aggregated_data, 0, 0, {1, 1, 1, 1, 1, 1, 1, 1, 1}); + + // Check if the latency boundary is updated correctly + for (unsigned int boundary = 0; boundary < kLatencyBoundaries.size(); boundary++) + { + ASSERT_EQ(aggregated_data.sample_latency_spans[boundary].front().GetDuration().count(), + durations[boundary].count()); + } +} + +/** This test makes sure that the data is consistent when there are multiple + * calls to the data aggegator with no change in data **/ +TEST_F(TracezDataAggregatorTest, NoChangeInBetweenCallsToAggregator) +{ + opentelemetry::trace::StartSpanOptions start; + start.start_steady_time = SteadyTimestamp(nanoseconds(1)); + + opentelemetry::trace::EndSpanOptions end; + end.end_steady_time = SteadyTimestamp(nanoseconds(1)); + + tracer->StartSpan(span_name1, start)->End(end); + auto running_span = tracer->StartSpan(span_name2); + auto span = tracer->StartSpan(span_name3); + span->SetStatus(opentelemetry::trace::StatusCode::kError, "span cancelled"); + span->End(); + std::this_thread::sleep_for(milliseconds(500)); + auto data = tracez_data_aggregator->GetAggregatedTracezData(); + std::this_thread::sleep_for(milliseconds(500)); + // Get data and check if span name exists in aggregation + data = tracez_data_aggregator->GetAggregatedTracezData(); + ASSERT_TRUE(data.find(span_name1) != data.end()); + VerifySpanCountsInTracezData(span_name1, data.at(span_name1), 0, 0, {1, 0, 0, 0, 0, 0, 0, 0, 0}); + + ASSERT_TRUE(data.find(span_name2) != data.end()); + VerifySpanCountsInTracezData(span_name2, data.at(span_name2), 1, 0, {0, 0, 0, 0, 0, 0, 0, 0, 0}); + + ASSERT_TRUE(data.find(span_name3) != data.end()); + VerifySpanCountsInTracezData(span_name3, data.at(span_name3), 0, 1, {0, 0, 0, 0, 0, 0, 0, 0, 0}); + + running_span->End(); +} diff --git a/src/jaegertracing/opentelemetry-cpp/ext/test/zpages/tracez_processor_test.cc b/src/jaegertracing/opentelemetry-cpp/ext/test/zpages/tracez_processor_test.cc new file mode 100644 index 000000000..f96385acd --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/ext/test/zpages/tracez_processor_test.cc @@ -0,0 +1,646 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#include "opentelemetry/ext/zpages/tracez_processor.h" + +#include <gtest/gtest.h> + +#include <thread> + +#include "opentelemetry/ext/zpages/threadsafe_span_data.h" +#include "opentelemetry/nostd/span.h" +#include "opentelemetry/sdk/resource/resource.h" +#include "opentelemetry/sdk/trace/tracer.h" + +using namespace opentelemetry::sdk::trace; +using namespace opentelemetry::ext::zpages; + +//////////////////////////////////// TEST HELPER FUNCTIONS ////////////////////////////// + +/* + * Helper function uses the current processor to update spans contained in completed_spans + * and running_spans. completed_spans contains all spans (cumulative), unless marked otherwise + */ +void UpdateSpans(std::shared_ptr<TracezSharedData> &data, + std::vector<std::unique_ptr<ThreadsafeSpanData>> &completed, + std::unordered_set<ThreadsafeSpanData *> &running, + bool store_only_new_completed = false) +{ + auto spans = data->GetSpanSnapshot(); + running = spans.running; + if (store_only_new_completed) + { + completed.clear(); + completed = std::move(spans.completed); + } + else + { + std::move(spans.completed.begin(), spans.completed.end(), + std::inserter(completed, completed.end())); + } + spans.completed.clear(); +} + +/* + * Returns true if all the span names in the name vector within the given range appears in + * at least the same frequency as they do in running_spans. + * + * If no start value is given, start at index 0 + * If no end value is given, end at name vector end + * If 1-1 correspondance marked, return true if completed has all names in same frequency, + * no more or less + */ +bool ContainsNames(const std::vector<std::string> &names, + std::unordered_set<ThreadsafeSpanData *> &running, + size_t name_start = 0, + size_t name_end = 0, + bool one_to_one_correspondence = false) +{ + if (name_end == 0) + name_end = names.size(); + + size_t num_names = name_end - name_start; + + if (num_names > running.size() || // More names than spans, can't have all names + (one_to_one_correspondence && num_names != running.size())) + { + return false; + } + std::vector<bool> is_contained(num_names, false); + + // Mark all names that are contained only once + // in the order they appear + for (auto &span : running) + { + for (unsigned int i = 0; i < num_names; i++) + { + if (span->GetName() == names[name_start + i] && !is_contained[i]) + { + is_contained[i] = true; + break; + } + } + } + + for (auto &&b : is_contained) + if (!b) + return false; + + return true; +} + +/* + * Returns true if all the span names in the nam vector within the given range appears in + * at least the same frequency as they do in completed_spans + * + * If no start value is given, start at index 0 + * If no end value is given, end at name vector end + * If 1-1 correspondance marked, return true if completed has all names in same frequency, + * no more or less + */ +bool ContainsNames(const std::vector<std::string> &names, + std::vector<std::unique_ptr<ThreadsafeSpanData>> &completed, + size_t name_start = 0, + size_t name_end = 0, + bool one_to_one_correspondence = false) +{ + + if (name_end == 0) + name_end = names.size(); + + size_t num_names = name_end - name_start; + + if (num_names > completed.size() || (one_to_one_correspondence && num_names != completed.size())) + { + return false; + } + std::vector<bool> is_contained(num_names, false); + + for (auto &span : completed) + { + for (unsigned int i = 0; i < num_names; i++) + { + if (span->GetName() == names[name_start + i] && !is_contained[i]) + { + is_contained[i] = true; + break; + } + } + } + + for (auto &&b : is_contained) + if (!b) + return false; + + return true; +} + +/* + * Helper function calls GetSpanSnapshot() i times and does nothing with it + * otherwise. Used for testing thread safety + */ +void GetManySnapshots(std::shared_ptr<TracezSharedData> &data, int i) +{ + for (; i > 0; i--) + data->GetSpanSnapshot(); +} + +/* + * Helper function that creates i spans, which are added into the passed + * in vector. Used for testing thread safety + */ +void StartManySpans( + std::vector<opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span>> &spans, + std::shared_ptr<opentelemetry::trace::Tracer> tracer, + int i) +{ + for (; i > 0; i--) + spans.push_back(tracer->StartSpan("span")); +} + +/* + * Helper function that ends all spans in the passed in span vector. Used + * for testing thread safety + */ +void EndAllSpans(std::vector<opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span>> &spans) +{ + for (auto &span : spans) + span->End(); +} + +//////////////////////////////// TEST FIXTURE ////////////////////////////////////// + +/* + * Reduce code duplication by having single area with shared setup code + */ +class TracezProcessor : public ::testing::Test +{ +protected: + void SetUp() override + { + shared_data = std::shared_ptr<TracezSharedData>(new TracezSharedData()); + processor = std::shared_ptr<TracezSpanProcessor>(new TracezSpanProcessor(shared_data)); + std::unique_ptr<SpanProcessor> processor2(new TracezSpanProcessor(shared_data)); + std::vector<std::unique_ptr<SpanProcessor>> processors; + processors.push_back(std::move(processor2)); + auto resource = opentelemetry::sdk::resource::Resource::Create({}); + + // Note: we make a *different* processor for the tracercontext. THis is because + // all the tests use shared data, and we want to make sure this works correctly. + auto context = std::make_shared<TracerContext>(std::move(processors), resource); + + tracer = std::shared_ptr<opentelemetry::trace::Tracer>(new Tracer(context)); + auto spans = shared_data->GetSpanSnapshot(); + running = spans.running; + completed = std::move(spans.completed); + + span_names = {"s0", "s2", "s1", "s1", "s"}; + } + + std::shared_ptr<TracezSharedData> shared_data; + std::shared_ptr<TracezSpanProcessor> processor; + std::shared_ptr<opentelemetry::trace::Tracer> tracer; + + std::vector<std::string> span_names; + std::vector<opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span>> span_vars; + + std::unordered_set<ThreadsafeSpanData *> running; + std::vector<std::unique_ptr<ThreadsafeSpanData>> completed; +}; + +///////////////////////////////////////// TESTS /////////////////////////////////// + +/* + * Test if both span containers are empty when no spans exist or are added. + * Ensures no rogue spans appear in the containers somehow. + */ +TEST_F(TracezProcessor, NoSpans) +{ + auto recordable = processor->MakeRecordable(); + + EXPECT_EQ(running.size(), 0); + EXPECT_EQ(completed.size(), 0); +} + +/* + * Test if a single span moves from running to completed at expected times. + * All completed spans are stored. Ensures basic functionality and that accumulation + * can happen + */ +TEST_F(TracezProcessor, OneSpanCumulative) +{ + auto span = tracer->StartSpan(span_names[0]); + UpdateSpans(shared_data, completed, running); + + EXPECT_TRUE(ContainsNames(span_names, running, 0, 1, true)); + EXPECT_EQ(running.size(), 1); + EXPECT_EQ(completed.size(), 0); + + span->End(); + UpdateSpans(shared_data, completed, running); + + EXPECT_TRUE(ContainsNames(span_names, completed, 0, 1, true)); + EXPECT_EQ(running.size(), 0); + EXPECT_EQ(completed.size(), 1); +} + +/* + * Test if multiple spans move from running to completed at expected times. Check if + * all are in a container, either running/completed during checks. Ensures basic functionality + * and that accumulation can happen for many spans + * All completed spans are stored. + */ +TEST_F(TracezProcessor, MultipleSpansCumulative) +{ + EXPECT_EQ(running.size(), 0); + EXPECT_EQ(completed.size(), 0); + + // Start and store spans using span_names + for (const auto &name : span_names) + span_vars.push_back(tracer->StartSpan(name)); + UpdateSpans(shared_data, completed, running); + + EXPECT_TRUE(ContainsNames(span_names, running)); // s0 s2 s1 s1 s + EXPECT_EQ(running.size(), span_names.size()); + EXPECT_EQ(completed.size(), 0); + + // End all spans + for (auto &span : span_vars) + span->End(); + UpdateSpans(shared_data, completed, running); + + EXPECT_TRUE(ContainsNames(span_names, completed)); // s0 s2 s1 s1 s + EXPECT_EQ(running.size(), 0); + EXPECT_EQ(completed.size(), span_names.size()); +} + +/* + * Test if multiple spans move from running to completed at expected times, + * running/completed spans are split. Middle spans end first. Ensures basic functionality + * and that accumulation can happen for many spans even spans that start and end non- + * sequentially. All completed spans are stored. + */ +TEST_F(TracezProcessor, MultipleSpansMiddleSplitCumulative) +{ + for (const auto &name : span_names) + span_vars.push_back(tracer->StartSpan(name)); + UpdateSpans(shared_data, completed, running); + + EXPECT_TRUE(ContainsNames(span_names, running)); // s0 s2 s1 s1 s + EXPECT_EQ(running.size(), span_names.size()); + EXPECT_EQ(completed.size(), 0); + + // End 4th span + span_vars[3]->End(); + UpdateSpans(shared_data, completed, running); + + EXPECT_TRUE(ContainsNames(span_names, running, 0, 3)); // s0 s2 s1 + EXPECT_TRUE(ContainsNames(span_names, running, 4)); // + s + EXPECT_TRUE(ContainsNames(span_names, completed, 3, 4)); // s1 + EXPECT_EQ(running.size(), 4); + EXPECT_EQ(completed.size(), 1); + + // End 2nd span + span_vars[1]->End(); + UpdateSpans(shared_data, completed, running); + + EXPECT_TRUE(ContainsNames(span_names, running, 0, 1)); // s0 + EXPECT_TRUE(ContainsNames(span_names, running, 2, 3)); // + s1 + EXPECT_TRUE(ContainsNames(span_names, running, 4)); // + s + EXPECT_TRUE(ContainsNames(span_names, completed, 1, 2)); // s2 + EXPECT_TRUE(ContainsNames(span_names, completed, 3, 4)); // s1 + EXPECT_EQ(running.size(), 3); + EXPECT_EQ(completed.size(), 2); + + // End 3rd span (last middle span) + span_vars[2]->End(); + UpdateSpans(shared_data, completed, running); + + EXPECT_TRUE(ContainsNames(span_names, running, 0, 1)); // s0 + EXPECT_TRUE(ContainsNames(span_names, running, 4)); // + s + EXPECT_TRUE(ContainsNames(span_names, completed, 1, 4)); // s2 s1 s1 + EXPECT_EQ(running.size(), 2); + EXPECT_EQ(completed.size(), 3); + + // End remaining Spans + span_vars[0]->End(); + span_vars[4]->End(); + UpdateSpans(shared_data, completed, running); + + EXPECT_TRUE(ContainsNames(span_names, completed)); // s0 s2 s1 s1 s + EXPECT_EQ(running.size(), 0); + EXPECT_EQ(completed.size(), 5); +} + +/* + * Test if multiple spans move from running to completed at expected times, + * running/completed spans are split. Ensures basic functionality and that + * accumulation can happen for many spans even spans that start and end non- + * sequentially. All completed spans are stored. + */ +TEST_F(TracezProcessor, MultipleSpansOuterSplitCumulative) +{ + for (const auto &name : span_names) + span_vars.push_back(tracer->StartSpan(name)); + + // End last span + span_vars[4]->End(); + UpdateSpans(shared_data, completed, running); + + EXPECT_TRUE(ContainsNames(span_names, running, 0, 4)); // s0 s2 s1 s1 + EXPECT_TRUE(ContainsNames(span_names, completed, 4)); // s + EXPECT_EQ(running.size(), 4); + EXPECT_EQ(completed.size(), 1); + + // End first span + span_vars[0]->End(); + UpdateSpans(shared_data, completed, running); + + EXPECT_TRUE(ContainsNames(span_names, running, 1, 4)); // s2 s1 s1 + EXPECT_TRUE(ContainsNames(span_names, completed, 0, 1)); // s0 + EXPECT_TRUE(ContainsNames(span_names, completed, 4)); // s + EXPECT_EQ(running.size(), 3); + EXPECT_EQ(completed.size(), 2); + + // End remaining Spans + for (int i = 1; i < 4; i++) + span_vars[i]->End(); + UpdateSpans(shared_data, completed, running); + + EXPECT_TRUE(ContainsNames(span_names, completed)); // s0 s2 s1 s1 s + EXPECT_EQ(running.size(), 0); + EXPECT_EQ(completed.size(), 5); +} + +/* + * Test if a single span moves from running to completed at expected times. + * Ensure correct behavior even when spans are discarded. Only new completed + * spans are stored. + */ +TEST_F(TracezProcessor, OneSpanNewOnly) +{ + auto span = tracer->StartSpan(span_names[0]); + UpdateSpans(shared_data, completed, running, true); + + EXPECT_TRUE(ContainsNames(span_names, running, 0, 1, true)); + EXPECT_EQ(running.size(), 1); + EXPECT_EQ(completed.size(), 0); + + span->End(); + UpdateSpans(shared_data, completed, running, true); + + EXPECT_TRUE(ContainsNames(span_names, completed, 0, 1, true)); + EXPECT_EQ(running.size(), 0); + EXPECT_EQ(completed.size(), 1); + + UpdateSpans(shared_data, completed, running, true); + + EXPECT_EQ(running.size(), 0); + EXPECT_EQ(completed.size(), 0); +} + +/* + * Test if multiple spans move from running to completed at expected times, + * running/completed spans are split. Middle spans end first. Ensure correct + * behavior even when multiple spans are discarded, even when span starting and + * ending is non-sequential. Only new completed spans are stored. + */ +TEST_F(TracezProcessor, MultipleSpansMiddleSplitNewOnly) +{ + for (const auto &name : span_names) + span_vars.push_back(tracer->StartSpan(name)); + UpdateSpans(shared_data, completed, running); + + EXPECT_TRUE(ContainsNames(span_names, running, 0, 5, true)); // s0 s2 s1 s1 s + EXPECT_EQ(running.size(), span_names.size()); + EXPECT_EQ(completed.size(), 0); + + // End 4th span + span_vars[3]->End(); + UpdateSpans(shared_data, completed, running, true); + + EXPECT_TRUE(ContainsNames(span_names, running, 0, 3)); // s0 s2 s1 + EXPECT_TRUE(ContainsNames(span_names, running, 4)); // + s + EXPECT_TRUE(ContainsNames(span_names, completed, 3, 4, true)); // s1 + EXPECT_EQ(running.size(), 4); + EXPECT_EQ(completed.size(), 1); + + // End 2nd and 3rd span + span_vars[1]->End(); + span_vars[2]->End(); + UpdateSpans(shared_data, completed, running, true); + + EXPECT_TRUE(ContainsNames(span_names, running, 0, 1)); // s0 + EXPECT_TRUE(ContainsNames(span_names, running, 4)); // + s + EXPECT_TRUE(ContainsNames(span_names, completed, 1, 3, true)); // s2 s1 + EXPECT_EQ(running.size(), 2); + EXPECT_EQ(completed.size(), 2); + + // End remaining Spans + span_vars[0]->End(); + span_vars[4]->End(); + UpdateSpans(shared_data, completed, running, true); + + EXPECT_TRUE(ContainsNames(span_names, completed, 0, 1)); // s0 + EXPECT_TRUE(ContainsNames(span_names, completed, 4)); // s + EXPECT_EQ(running.size(), 0); + EXPECT_EQ(completed.size(), 2); + + UpdateSpans(shared_data, completed, running, true); + + EXPECT_EQ(running.size(), 0); + EXPECT_EQ(completed.size(), 0); +} + +/* + * Test if multiple spans move from running to completed at expected times, + * running/completed spans are split. Ensure correct behavior even when + * multiple spans are discarded, even when span starting and ending is + * non-sequential. Only new completed spans are stored. + */ +TEST_F(TracezProcessor, MultipleSpansOuterSplitNewOnly) +{ + for (const auto &name : span_names) + span_vars.push_back(tracer->StartSpan(name)); + + // End last span + span_vars[4]->End(); + UpdateSpans(shared_data, completed, running, true); + + EXPECT_TRUE(ContainsNames(span_names, running, 0, 4, true)); // s0 s2 s1 s1 + EXPECT_TRUE(ContainsNames(span_names, completed, 4, 5, true)); // s + EXPECT_EQ(running.size(), 4); + EXPECT_EQ(completed.size(), 1); + + // End first span + span_vars[0]->End(); + UpdateSpans(shared_data, completed, running, true); + + EXPECT_TRUE(ContainsNames(span_names, running, 1, 4, true)); // s2 s1 s1 + EXPECT_TRUE(ContainsNames(span_names, completed, 0, 1, true)); // s0 + EXPECT_EQ(running.size(), 3); + EXPECT_EQ(completed.size(), 1); + + // End remaining middle spans + for (int i = 1; i < 4; i++) + span_vars[i]->End(); + UpdateSpans(shared_data, completed, running, true); + + EXPECT_TRUE(ContainsNames(span_names, completed, 1, 4, true)); // s2 s1 s1 + EXPECT_EQ(running.size(), 0); + EXPECT_EQ(completed.size(), 3); + + UpdateSpans(shared_data, completed, running, true); + + EXPECT_EQ(running.size(), 0); + EXPECT_EQ(completed.size(), 0); +} + +/* + * Test for ForceFlush and Shutdown code coverage, which do nothing. + */ +TEST_F(TracezProcessor, FlushShutdown) +{ + auto pre_running_sz = running.size(); + auto pre_completed_sz = completed.size(); + + EXPECT_TRUE(processor->ForceFlush()); + EXPECT_TRUE(processor->Shutdown()); + + UpdateSpans(shared_data, completed, running); + + EXPECT_EQ(pre_running_sz, running.size()); + EXPECT_EQ(pre_completed_sz, completed.size()); +} + +/* + * Test for thread safety when many spans start at the same time. + */ +TEST_F(TracezProcessor, RunningThreadSafety) +{ + std::vector<opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span>> spans1; + std::vector<opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span>> spans2; + + std::thread start1(StartManySpans, std::ref(spans1), tracer, 500); + std::thread start2(StartManySpans, std::ref(spans2), tracer, 500); + + start1.join(); + start2.join(); + + EndAllSpans(spans1); + EndAllSpans(spans2); +} + +/* + * Test for thread safety when many spans end at the same time + */ +TEST_F(TracezProcessor, CompletedThreadSafety) +{ + std::vector<opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span>> spans1; + std::vector<opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span>> spans2; + + StartManySpans(spans1, tracer, 500); + StartManySpans(spans2, tracer, 500); + + std::thread end1(EndAllSpans, std::ref(spans1)); + std::thread end2(EndAllSpans, std::ref(spans2)); + + end1.join(); + end2.join(); +} + +/* + * Test for thread safety when many snapshots are grabbed at the same time. + */ +TEST_F(TracezProcessor, SnapshotThreadSafety) +{ + std::vector<opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span>> spans; + + std::thread snap1(GetManySnapshots, std::ref(shared_data), 500); + std::thread snap2(GetManySnapshots, std::ref(shared_data), 500); + + snap1.join(); + snap2.join(); + + StartManySpans(spans, tracer, 500); + + std::thread snap3(GetManySnapshots, std::ref(shared_data), 500); + std::thread snap4(GetManySnapshots, std::ref(shared_data), 500); + + snap3.join(); + snap4.join(); + + EndAllSpans(spans); +} + +/* + * Test for thread safety when many spans start while others are ending. + */ +TEST_F(TracezProcessor, RunningCompletedThreadSafety) +{ + std::vector<opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span>> spans1; + std::vector<opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span>> spans2; + + StartManySpans(spans1, tracer, 500); + + std::thread start(StartManySpans, std::ref(spans2), tracer, 500); + std::thread end(EndAllSpans, std::ref(spans1)); + + start.join(); + end.join(); + + EndAllSpans(spans2); +} + +/* + * Test for thread safety when many span start while snapshots are being grabbed. + */ +TEST_F(TracezProcessor, RunningSnapshotThreadSafety) +{ + std::vector<opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span>> spans; + + std::thread start(StartManySpans, std::ref(spans), tracer, 500); + std::thread snapshots(GetManySnapshots, std::ref(shared_data), 500); + + start.join(); + snapshots.join(); + + EndAllSpans(spans); +} + +/* + * Test for thread safety when many spans end while snapshots are being grabbed. + */ +TEST_F(TracezProcessor, SnapshotCompletedThreadSafety) +{ + std::vector<opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span>> spans; + + StartManySpans(spans, tracer, 500); + + std::thread snapshots(GetManySnapshots, std::ref(shared_data), 500); + std::thread end(EndAllSpans, std::ref(spans)); + + snapshots.join(); + end.join(); +} + +/* + * Test for thread safety when many spans start and end while snapshots are being grabbed. + */ +TEST_F(TracezProcessor, RunningSnapshotCompletedThreadSafety) +{ + std::vector<opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span>> spans1; + std::vector<opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span>> spans2; + + StartManySpans(spans1, tracer, 500); + + std::thread start(StartManySpans, std::ref(spans2), tracer, 500); + std::thread snapshots(GetManySnapshots, std::ref(shared_data), 500); + std::thread end(EndAllSpans, std::ref(spans1)); + + start.join(); + snapshots.join(); + end.join(); + + EndAllSpans(spans2); +} |