diff options
Diffstat (limited to 'src/s3select/rapidjson/example/parsebyparts')
-rw-r--r-- | src/s3select/rapidjson/example/parsebyparts/parsebyparts.cpp | 176 |
1 files changed, 176 insertions, 0 deletions
diff --git a/src/s3select/rapidjson/example/parsebyparts/parsebyparts.cpp b/src/s3select/rapidjson/example/parsebyparts/parsebyparts.cpp new file mode 100644 index 000000000..ff735394e --- /dev/null +++ b/src/s3select/rapidjson/example/parsebyparts/parsebyparts.cpp @@ -0,0 +1,176 @@ +// Example of parsing JSON to document by parts. + +// Using C++11 threads +// Temporarily disable for clang (older version) due to incompatibility with libstdc++ +#if (__cplusplus >= 201103L || (defined(_MSC_VER) && _MSC_VER >= 1700)) && !defined(__clang__) + +#include "rapidjson/document.h" +#include "rapidjson/error/en.h" +#include "rapidjson/writer.h" +#include "rapidjson/ostreamwrapper.h" +#include <condition_variable> +#include <iostream> +#include <mutex> +#include <thread> + +using namespace rapidjson; + +template<unsigned parseFlags = kParseDefaultFlags> +class AsyncDocumentParser { +public: + AsyncDocumentParser(Document& d) + : stream_(*this) + , d_(d) + , parseThread_() + , mutex_() + , notEmpty_() + , finish_() + , completed_() + { + // Create and execute thread after all member variables are initialized. + parseThread_ = std::thread(&AsyncDocumentParser::Parse, this); + } + + ~AsyncDocumentParser() { + if (!parseThread_.joinable()) + return; + + { + std::unique_lock<std::mutex> lock(mutex_); + + // Wait until the buffer is read up (or parsing is completed) + while (!stream_.Empty() && !completed_) + finish_.wait(lock); + + // Automatically append '\0' as the terminator in the stream. + static const char terminator[] = ""; + stream_.src_ = terminator; + stream_.end_ = terminator + 1; + notEmpty_.notify_one(); // unblock the AsyncStringStream + } + + parseThread_.join(); + } + + void ParsePart(const char* buffer, size_t length) { + std::unique_lock<std::mutex> lock(mutex_); + + // Wait until the buffer is read up (or parsing is completed) + while (!stream_.Empty() && !completed_) + finish_.wait(lock); + + // Stop further parsing if the parsing process is completed. + if (completed_) + return; + + // Set the buffer to stream and unblock the AsyncStringStream + stream_.src_ = buffer; + stream_.end_ = buffer + length; + notEmpty_.notify_one(); + } + +private: + void Parse() { + d_.ParseStream<parseFlags>(stream_); + + // The stream may not be fully read, notify finish anyway to unblock ParsePart() + std::unique_lock<std::mutex> lock(mutex_); + completed_ = true; // Parsing process is completed + finish_.notify_one(); // Unblock ParsePart() or destructor if they are waiting. + } + + struct AsyncStringStream { + typedef char Ch; + + AsyncStringStream(AsyncDocumentParser& parser) : parser_(parser), src_(), end_(), count_() {} + + char Peek() const { + std::unique_lock<std::mutex> lock(parser_.mutex_); + + // If nothing in stream, block to wait. + while (Empty()) + parser_.notEmpty_.wait(lock); + + return *src_; + } + + char Take() { + std::unique_lock<std::mutex> lock(parser_.mutex_); + + // If nothing in stream, block to wait. + while (Empty()) + parser_.notEmpty_.wait(lock); + + count_++; + char c = *src_++; + + // If all stream is read up, notify that the stream is finish. + if (Empty()) + parser_.finish_.notify_one(); + + return c; + } + + size_t Tell() const { return count_; } + + // Not implemented + char* PutBegin() { return 0; } + void Put(char) {} + void Flush() {} + size_t PutEnd(char*) { return 0; } + + bool Empty() const { return src_ == end_; } + + AsyncDocumentParser& parser_; + const char* src_; //!< Current read position. + const char* end_; //!< End of buffer + size_t count_; //!< Number of characters taken so far. + }; + + AsyncStringStream stream_; + Document& d_; + std::thread parseThread_; + std::mutex mutex_; + std::condition_variable notEmpty_; + std::condition_variable finish_; + bool completed_; +}; + +int main() { + Document d; + + { + AsyncDocumentParser<> parser(d); + + const char json1[] = " { \"hello\" : \"world\", \"t\" : tr"; + //const char json1[] = " { \"hello\" : \"world\", \"t\" : trX"; // For test parsing error + const char json2[] = "ue, \"f\" : false, \"n\": null, \"i\":123, \"pi\": 3.14"; + const char json3[] = "16, \"a\":[1, 2, 3, 4] } "; + + parser.ParsePart(json1, sizeof(json1) - 1); + parser.ParsePart(json2, sizeof(json2) - 1); + parser.ParsePart(json3, sizeof(json3) - 1); + } + + if (d.HasParseError()) { + std::cout << "Error at offset " << d.GetErrorOffset() << ": " << GetParseError_En(d.GetParseError()) << std::endl; + return EXIT_FAILURE; + } + + // Stringify the JSON to cout + OStreamWrapper os(std::cout); + Writer<OStreamWrapper> writer(os); + d.Accept(writer); + std::cout << std::endl; + + return EXIT_SUCCESS; +} + +#else // Not supporting C++11 + +#include <iostream> +int main() { + std::cout << "This example requires C++11 compiler" << std::endl; +} + +#endif |