parsebyparts.cpp 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. // Example of parsing JSON to document by parts.
  2. // Using C++11 threads
  3. // Temporarily disable for clang (older version) due to incompatibility with libstdc++
  4. #include <rtthread.h>
  5. #if (__cplusplus >= 201103L || (defined(_MSC_VER) && _MSC_VER >= 1700)) && !defined(__clang__)
  6. #include "rapidjson/document.h"
  7. #include "rapidjson/error/en.h"
  8. #include "rapidjson/writer.h"
  9. #include "rapidjson/ostreamwrapper.h"
  10. #include <condition_variable>
  11. #include <iostream>
  12. #include <mutex>
  13. #include <thread>
  14. using namespace rapidjson;
  15. template<unsigned parseFlags = kParseDefaultFlags>
  16. class AsyncDocumentParser {
  17. public:
  18. AsyncDocumentParser(Document& d)
  19. : stream_(*this)
  20. , d_(d)
  21. , parseThread_()
  22. , mutex_()
  23. , notEmpty_()
  24. , finish_()
  25. , completed_()
  26. {
  27. // Create and execute thread after all member variables are initialized.
  28. parseThread_ = std::thread(&AsyncDocumentParser::Parse, this);
  29. }
  30. ~AsyncDocumentParser() {
  31. if (!parseThread_.joinable())
  32. return;
  33. {
  34. std::unique_lock<std::mutex> lock(mutex_);
  35. // Wait until the buffer is read up (or parsing is completed)
  36. while (!stream_.Empty() && !completed_)
  37. finish_.wait(lock);
  38. // Automatically append '\0' as the terminator in the stream.
  39. static const char terminator[] = "";
  40. stream_.src_ = terminator;
  41. stream_.end_ = terminator + 1;
  42. notEmpty_.notify_one(); // unblock the AsyncStringStream
  43. }
  44. parseThread_.join();
  45. }
  46. void ParsePart(const char* buffer, size_t length) {
  47. std::unique_lock<std::mutex> lock(mutex_);
  48. // Wait until the buffer is read up (or parsing is completed)
  49. while (!stream_.Empty() && !completed_)
  50. finish_.wait(lock);
  51. // Stop further parsing if the parsing process is completed.
  52. if (completed_)
  53. return;
  54. // Set the buffer to stream and unblock the AsyncStringStream
  55. stream_.src_ = buffer;
  56. stream_.end_ = buffer + length;
  57. notEmpty_.notify_one();
  58. }
  59. private:
  60. void Parse() {
  61. d_.ParseStream<parseFlags>(stream_);
  62. // The stream may not be fully read, notify finish anyway to unblock ParsePart()
  63. std::unique_lock<std::mutex> lock(mutex_);
  64. completed_ = true; // Parsing process is completed
  65. finish_.notify_one(); // Unblock ParsePart() or destructor if they are waiting.
  66. }
  67. struct AsyncStringStream {
  68. typedef char Ch;
  69. AsyncStringStream(AsyncDocumentParser& parser) : parser_(parser), src_(), end_(), count_() {}
  70. char Peek() const {
  71. std::unique_lock<std::mutex> lock(parser_.mutex_);
  72. // If nothing in stream, block to wait.
  73. while (Empty())
  74. parser_.notEmpty_.wait(lock);
  75. return *src_;
  76. }
  77. char Take() {
  78. std::unique_lock<std::mutex> lock(parser_.mutex_);
  79. // If nothing in stream, block to wait.
  80. while (Empty())
  81. parser_.notEmpty_.wait(lock);
  82. count_++;
  83. char c = *src_++;
  84. // If all stream is read up, notify that the stream is finish.
  85. if (Empty())
  86. parser_.finish_.notify_one();
  87. return c;
  88. }
  89. size_t Tell() const { return count_; }
  90. // Not implemented
  91. char* PutBegin() { return 0; }
  92. void Put(char) {}
  93. void Flush() {}
  94. size_t PutEnd(char*) { return 0; }
  95. bool Empty() const { return src_ == end_; }
  96. AsyncDocumentParser& parser_;
  97. const char* src_; //!< Current read position.
  98. const char* end_; //!< End of buffer
  99. size_t count_; //!< Number of characters taken so far.
  100. };
  101. AsyncStringStream stream_;
  102. Document& d_;
  103. std::thread parseThread_;
  104. std::mutex mutex_;
  105. std::condition_variable notEmpty_;
  106. std::condition_variable finish_;
  107. bool completed_;
  108. };
  109. int parse_by_parts() {
  110. Document d;
  111. {
  112. AsyncDocumentParser<> parser(d);
  113. const char json1[] = " { \"hello\" : \"world\", \"t\" : tr";
  114. //const char json1[] = " { \"hello\" : \"world\", \"t\" : trX"; // For test parsing error
  115. const char json2[] = "ue, \"f\" : false, \"n\": null, \"i\":123, \"pi\": 3.14";
  116. const char json3[] = "16, \"a\":[1, 2, 3, 4] } ";
  117. parser.ParsePart(json1, sizeof(json1) - 1);
  118. parser.ParsePart(json2, sizeof(json2) - 1);
  119. parser.ParsePart(json3, sizeof(json3) - 1);
  120. }
  121. if (d.HasParseError()) {
  122. std::cout << "Error at offset " << d.GetErrorOffset() << ": " << GetParseError_En(d.GetParseError()) << std::endl;
  123. return EXIT_FAILURE;
  124. }
  125. // Stringify the JSON to cout
  126. OStreamWrapper os(std::cout);
  127. Writer<OStreamWrapper> writer(os);
  128. d.Accept(writer);
  129. std::cout << std::endl;
  130. return EXIT_SUCCESS;
  131. }
  132. MSH_CMD_EXPORT(parse_by_parts, fast json parse by parts example);
  133. #else // Not supporting C++11
  134. #include <iostream>
  135. int parse_by_parts() {
  136. std::cout << "This example requires C++11 compiler" << std::endl;
  137. }
  138. MSH_CMD_EXPORT(parse_by_parts, rapid json parse by parts example);
  139. #endif