parsebyparts.cpp 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. // Example of parsing JSON to document by parts.
  2. // Using C++11 threads
  3. #if __cplusplus >= 201103L || (defined(_MSC_VER) && _MSC_VER >= 1700)
  4. #include "rapidjson/document.h"
  5. #include "rapidjson/error/en.h"
  6. #include "rapidjson/writer.h"
  7. #include "rapidjson/ostreamwrapper.h"
  8. #include <condition_variable>
  9. #include <iostream>
  10. #include <mutex>
  11. #include <thread>
  12. using namespace rapidjson;
  13. template<unsigned parseFlags = kParseDefaultFlags>
  14. class AsyncDocumentParser {
  15. public:
  16. AsyncDocumentParser(Document& d)
  17. : stream_(*this)
  18. , d_(d)
  19. , parseThread_(&AsyncDocumentParser::Parse, this)
  20. , mutex_()
  21. , notEmpty_()
  22. , finish_()
  23. , completed_()
  24. {}
  25. ~AsyncDocumentParser() {
  26. if (!parseThread_.joinable())
  27. return;
  28. {
  29. std::unique_lock<std::mutex> lock(mutex_);
  30. // Wait until the buffer is read up (or parsing is completed)
  31. while (!stream_.Empty() && !completed_)
  32. finish_.wait(lock);
  33. // Automatically append '\0' as the terminator in the stream.
  34. static const char terminator[] = "";
  35. stream_.src_ = terminator;
  36. stream_.end_ = terminator + 1;
  37. notEmpty_.notify_one(); // unblock the AsyncStringStream
  38. }
  39. parseThread_.join();
  40. }
  41. void ParsePart(const char* buffer, size_t length) {
  42. std::unique_lock<std::mutex> lock(mutex_);
  43. // Wait until the buffer is read up (or parsing is completed)
  44. while (!stream_.Empty() && !completed_)
  45. finish_.wait(lock);
  46. // Stop further parsing if the parsing process is completed.
  47. if (completed_)
  48. return;
  49. // Set the buffer to stream and unblock the AsyncStringStream
  50. stream_.src_ = buffer;
  51. stream_.end_ = buffer + length;
  52. notEmpty_.notify_one();
  53. }
  54. private:
  55. void Parse() {
  56. d_.ParseStream<parseFlags>(stream_);
  57. // The stream may not be fully read, notify finish anyway to unblock ParsePart()
  58. std::unique_lock<std::mutex> lock(mutex_);
  59. completed_ = true; // Parsing process is completed
  60. finish_.notify_one(); // Unblock ParsePart() or destructor if they are waiting.
  61. }
  62. struct AsyncStringStream {
  63. typedef char Ch;
  64. AsyncStringStream(AsyncDocumentParser& parser) : parser_(parser), src_(), end_(), count_() {}
  65. char Peek() const {
  66. std::unique_lock<std::mutex> lock(parser_.mutex_);
  67. // If nothing in stream, block to wait.
  68. while (Empty())
  69. parser_.notEmpty_.wait(lock);
  70. return *src_;
  71. }
  72. char Take() {
  73. std::unique_lock<std::mutex> lock(parser_.mutex_);
  74. // If nothing in stream, block to wait.
  75. while (Empty())
  76. parser_.notEmpty_.wait(lock);
  77. count_++;
  78. char c = *src_++;
  79. // If all stream is read up, notify that the stream is finish.
  80. if (Empty())
  81. parser_.finish_.notify_one();
  82. return c;
  83. }
  84. size_t Tell() const { return count_; }
  85. // Not implemented
  86. char* PutBegin() { return 0; }
  87. void Put(char) {}
  88. void Flush() {}
  89. size_t PutEnd(char*) { return 0; }
  90. bool Empty() const { return src_ == end_; }
  91. AsyncDocumentParser& parser_;
  92. const char* src_; //!< Current read position.
  93. const char* end_; //!< End of buffer
  94. size_t count_; //!< Number of characters taken so far.
  95. };
  96. AsyncStringStream stream_;
  97. Document& d_;
  98. std::thread parseThread_;
  99. std::mutex mutex_;
  100. std::condition_variable notEmpty_;
  101. std::condition_variable finish_;
  102. bool completed_;
  103. };
  104. int main() {
  105. Document d;
  106. {
  107. AsyncDocumentParser<> parser(d);
  108. const char json1[] = " { \"hello\" : \"world\", \"t\" : tr";
  109. //const char json1[] = " { \"hello\" : \"world\", \"t\" : trX"; // Fot test parsing error
  110. const char json2[] = "ue, \"f\" : false, \"n\": null, \"i\":123, \"pi\": 3.14";
  111. const char json3[] = "16, \"a\":[1, 2, 3, 4] } ";
  112. parser.ParsePart(json1, sizeof(json1) - 1);
  113. parser.ParsePart(json2, sizeof(json2) - 1);
  114. parser.ParsePart(json3, sizeof(json3) - 1);
  115. }
  116. if (d.HasParseError()) {
  117. std::cout << "Error at offset " << d.GetErrorOffset() << ": " << GetParseError_En(d.GetParseError()) << std::endl;
  118. return EXIT_FAILURE;
  119. }
  120. // Stringify the JSON to cout
  121. OStreamWrapper os(std::cout);
  122. Writer<OStreamWrapper> writer(os);
  123. d.Accept(writer);
  124. std::cout << std::endl;
  125. return EXIT_SUCCESS;
  126. }
  127. #else // Not supporting C++11
  128. #include <iostream>
  129. int main() {
  130. std::cout << "This example requires C++11 compiler" << std::endl;
  131. }
  132. #endif