summaryrefslogtreecommitdiffstats
path: root/src/s3select/rapidjson/example/parsebyparts/parsebyparts.cpp
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/s3select/rapidjson/example/parsebyparts/parsebyparts.cpp
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/s3select/rapidjson/example/parsebyparts/parsebyparts.cpp')
-rw-r--r--src/s3select/rapidjson/example/parsebyparts/parsebyparts.cpp176
1 files changed, 176 insertions, 0 deletions
diff --git a/src/s3select/rapidjson/example/parsebyparts/parsebyparts.cpp b/src/s3select/rapidjson/example/parsebyparts/parsebyparts.cpp
new file mode 100644
index 000000000..ff735394e
--- /dev/null
+++ b/src/s3select/rapidjson/example/parsebyparts/parsebyparts.cpp
@@ -0,0 +1,176 @@
+// Example of parsing JSON to document by parts.
+
+// Using C++11 threads
+// Temporarily disable for clang (older version) due to incompatibility with libstdc++
+#if (__cplusplus >= 201103L || (defined(_MSC_VER) && _MSC_VER >= 1700)) && !defined(__clang__)
+
+#include "rapidjson/document.h"
+#include "rapidjson/error/en.h"
+#include "rapidjson/writer.h"
+#include "rapidjson/ostreamwrapper.h"
+#include <condition_variable>
+#include <iostream>
+#include <mutex>
+#include <thread>
+
+using namespace rapidjson;
+
+template<unsigned parseFlags = kParseDefaultFlags>
+class AsyncDocumentParser {
+public:
+ AsyncDocumentParser(Document& d)
+ : stream_(*this)
+ , d_(d)
+ , parseThread_()
+ , mutex_()
+ , notEmpty_()
+ , finish_()
+ , completed_()
+ {
+ // Create and execute thread after all member variables are initialized.
+ parseThread_ = std::thread(&AsyncDocumentParser::Parse, this);
+ }
+
+ ~AsyncDocumentParser() {
+ if (!parseThread_.joinable())
+ return;
+
+ {
+ std::unique_lock<std::mutex> lock(mutex_);
+
+ // Wait until the buffer is read up (or parsing is completed)
+ while (!stream_.Empty() && !completed_)
+ finish_.wait(lock);
+
+ // Automatically append '\0' as the terminator in the stream.
+ static const char terminator[] = "";
+ stream_.src_ = terminator;
+ stream_.end_ = terminator + 1;
+ notEmpty_.notify_one(); // unblock the AsyncStringStream
+ }
+
+ parseThread_.join();
+ }
+
+ void ParsePart(const char* buffer, size_t length) {
+ std::unique_lock<std::mutex> lock(mutex_);
+
+ // Wait until the buffer is read up (or parsing is completed)
+ while (!stream_.Empty() && !completed_)
+ finish_.wait(lock);
+
+ // Stop further parsing if the parsing process is completed.
+ if (completed_)
+ return;
+
+ // Set the buffer to stream and unblock the AsyncStringStream
+ stream_.src_ = buffer;
+ stream_.end_ = buffer + length;
+ notEmpty_.notify_one();
+ }
+
+private:
+ void Parse() {
+ d_.ParseStream<parseFlags>(stream_);
+
+ // The stream may not be fully read, notify finish anyway to unblock ParsePart()
+ std::unique_lock<std::mutex> lock(mutex_);
+ completed_ = true; // Parsing process is completed
+ finish_.notify_one(); // Unblock ParsePart() or destructor if they are waiting.
+ }
+
+ struct AsyncStringStream {
+ typedef char Ch;
+
+ AsyncStringStream(AsyncDocumentParser& parser) : parser_(parser), src_(), end_(), count_() {}
+
+ char Peek() const {
+ std::unique_lock<std::mutex> lock(parser_.mutex_);
+
+ // If nothing in stream, block to wait.
+ while (Empty())
+ parser_.notEmpty_.wait(lock);
+
+ return *src_;
+ }
+
+ char Take() {
+ std::unique_lock<std::mutex> lock(parser_.mutex_);
+
+ // If nothing in stream, block to wait.
+ while (Empty())
+ parser_.notEmpty_.wait(lock);
+
+ count_++;
+ char c = *src_++;
+
+ // If all stream is read up, notify that the stream is finish.
+ if (Empty())
+ parser_.finish_.notify_one();
+
+ return c;
+ }
+
+ size_t Tell() const { return count_; }
+
+ // Not implemented
+ char* PutBegin() { return 0; }
+ void Put(char) {}
+ void Flush() {}
+ size_t PutEnd(char*) { return 0; }
+
+ bool Empty() const { return src_ == end_; }
+
+ AsyncDocumentParser& parser_;
+ const char* src_; //!< Current read position.
+ const char* end_; //!< End of buffer
+ size_t count_; //!< Number of characters taken so far.
+ };
+
+ AsyncStringStream stream_;
+ Document& d_;
+ std::thread parseThread_;
+ std::mutex mutex_;
+ std::condition_variable notEmpty_;
+ std::condition_variable finish_;
+ bool completed_;
+};
+
+int main() {
+ Document d;
+
+ {
+ AsyncDocumentParser<> parser(d);
+
+ const char json1[] = " { \"hello\" : \"world\", \"t\" : tr";
+ //const char json1[] = " { \"hello\" : \"world\", \"t\" : trX"; // For test parsing error
+ const char json2[] = "ue, \"f\" : false, \"n\": null, \"i\":123, \"pi\": 3.14";
+ const char json3[] = "16, \"a\":[1, 2, 3, 4] } ";
+
+ parser.ParsePart(json1, sizeof(json1) - 1);
+ parser.ParsePart(json2, sizeof(json2) - 1);
+ parser.ParsePart(json3, sizeof(json3) - 1);
+ }
+
+ if (d.HasParseError()) {
+ std::cout << "Error at offset " << d.GetErrorOffset() << ": " << GetParseError_En(d.GetParseError()) << std::endl;
+ return EXIT_FAILURE;
+ }
+
+ // Stringify the JSON to cout
+ OStreamWrapper os(std::cout);
+ Writer<OStreamWrapper> writer(os);
+ d.Accept(writer);
+ std::cout << std::endl;
+
+ return EXIT_SUCCESS;
+}
+
+#else // Not supporting C++11
+
+#include <iostream>
+int main() {
+ std::cout << "This example requires C++11 compiler" << std::endl;
+}
+
+#endif