RyanMqttSubTest.c 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. #include "RyanMqttTest.h"
  2. static RyanMqttQos_e exportQos = RyanMqttQos0;
  3. static char *subscribeArr[] = {
  4. "testlinux/pub1",
  5. "testlinux/pub2",
  6. "testlinux/pub3",
  7. "testlinux/pub4",
  8. "testlinux/pub5",
  9. "testlinux/pub6",
  10. "testlinux/pub7",
  11. "testlinux/pub8",
  12. "testlinux/pub9",
  13. "testlinux/pub10",
  14. };
  15. static RyanMqttBool_e topicIsSubscribeArr(char *topic)
  16. {
  17. RyanMqttBool_e isFindflag = RyanMqttFalse;
  18. for (int32_t j = 0; j < getArraySize(subscribeArr); j++)
  19. {
  20. if (0 == strcmp(topic, subscribeArr[j]))
  21. {
  22. isFindflag = RyanMqttTrue;
  23. break;
  24. }
  25. }
  26. return isFindflag;
  27. }
  28. static void RyanMqttSubEventHandle(void *pclient, RyanMqttEventId_e event, const void *eventData)
  29. {
  30. switch (event)
  31. {
  32. case RyanMqttEventSubscribed:
  33. {
  34. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  35. rlog_i("mqtt订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  36. if (exportQos != msgHandler->qos)
  37. rlog_e("mqtt 订阅主题降级 topic: %s, exportQos: %d, qos: %d", msgHandler->topic, exportQos, msgHandler->qos);
  38. break;
  39. }
  40. case RyanMqttEventSubscribedFaile:
  41. {
  42. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  43. rlog_i("mqtt订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  44. break;
  45. }
  46. case RyanMqttEventUnSubscribed:
  47. {
  48. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  49. rlog_i("mqtt取消订阅成功回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  50. if (exportQos != msgHandler->qos || RyanMqttFalse == topicIsSubscribeArr(msgHandler->topic))
  51. {
  52. rlog_e("mqtt 取消订阅主题信息不对 topic: %s, exportQos: %d, qos: %d", msgHandler->topic, exportQos, msgHandler->qos);
  53. while (1)
  54. {
  55. delay(100);
  56. }
  57. }
  58. break;
  59. }
  60. case RyanMqttEventUnSubscribedFaile:
  61. {
  62. RyanMqttMsgHandler_t *msgHandler = (RyanMqttMsgHandler_t *)eventData;
  63. rlog_w("mqtt取消订阅失败回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
  64. break;
  65. }
  66. default:
  67. mqttEventBaseHandle(pclient, event, eventData);
  68. break;
  69. }
  70. }
  71. static RyanMqttError_e RyanMqttSubscribeTest(RyanMqttQos_e qos)
  72. {
  73. RyanMqttError_e result = RyanMqttSuccessError;
  74. RyanMqttClient_t *client;
  75. RyanMqttMsgHandler_t msgHandles[20] = {0};
  76. int32_t subscribeNum = 0;
  77. exportQos = qos;
  78. RyanMqttInitSync(&client, RyanMqttTrue, RyanMqttSubEventHandle);
  79. // 订阅5个主题
  80. for (uint32_t i = 0; i < getArraySize(subscribeArr); i++)
  81. {
  82. RyanMqttSubscribe(client, subscribeArr[i], qos);
  83. }
  84. delay(100);
  85. for (int32_t i = 0; i < 600; i++)
  86. {
  87. result = RyanMqttGetSubscribe(client, msgHandles, getArraySize(msgHandles), &subscribeNum);
  88. if (result == RyanMqttNoRescourceError)
  89. rlog_w("订阅主题数超过缓冲区%d个,已截断,请修改msgHandles缓冲区", getArraySize(msgHandles));
  90. if (subscribeNum == getArraySize(subscribeArr))
  91. break;
  92. rlog_i("mqtt客户端已订阅的主题数: %d, 应该订阅主题数: %d", subscribeNum, getArraySize(subscribeArr));
  93. for (int32_t i = 0; i < subscribeNum; i++)
  94. rlog_i("已经订阅主题: %d, topic: %s, QOS: %d", i, msgHandles[i].topic, msgHandles[i].qos);
  95. if (i > 500)
  96. {
  97. result = RyanMqttFailedError;
  98. goto __exit;
  99. }
  100. delay(100);
  101. }
  102. // 检查订阅主题是否正确
  103. for (int32_t i = 0; i < subscribeNum; i++)
  104. {
  105. RyanMqttBool_e isFindflag = topicIsSubscribeArr(msgHandles[i].topic);
  106. if (RyanMqttTrue != isFindflag)
  107. {
  108. rlog_e("主题不匹配或者qos不对, topic: %s, qos: %d", msgHandles[i].topic, msgHandles[i].qos);
  109. result = RyanMqttFailedError;
  110. goto __exit;
  111. }
  112. }
  113. // 取消所有订阅消息
  114. for (int32_t i = 0; i < getArraySize(subscribeArr); i++)
  115. RyanMqttUnSubscribe(client, subscribeArr[i]);
  116. delay(100);
  117. for (int32_t i = 0; i < 600; i++)
  118. {
  119. result = RyanMqttGetSubscribe(client, msgHandles, getArraySize(msgHandles), &subscribeNum);
  120. if (result == RyanMqttNoRescourceError)
  121. rlog_w("订阅主题数超过缓冲区%d个,已截断,请修改msgHandles缓冲区", getArraySize(msgHandles));
  122. if (0 == subscribeNum)
  123. break;
  124. if (i > 500)
  125. {
  126. result = RyanMqttFailedError;
  127. goto __exit;
  128. }
  129. delay(100);
  130. }
  131. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == checkAckList(client), RyanMqttFailedError, rlog_e, { goto __exit; });
  132. __exit:
  133. rlog_i("mqtt 订阅测试,销毁mqtt客户端");
  134. RyanMqttDestorySync(client);
  135. return result;
  136. }
  137. RyanMqttError_e RyanMqttSubTest()
  138. {
  139. RyanMqttError_e result = RyanMqttSuccessError;
  140. result = RyanMqttSubscribeTest(RyanMqttQos0);
  141. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, rlog_e, { goto __exit; });
  142. checkMemory;
  143. result = RyanMqttSubscribeTest(RyanMqttQos1);
  144. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, rlog_e, { goto __exit; });
  145. checkMemory;
  146. result = RyanMqttSubscribeTest(RyanMqttQos2);
  147. RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, rlog_e, { goto __exit; });
  148. checkMemory;
  149. return RyanMqttSuccessError;
  150. __exit:
  151. return RyanMqttFailedError;
  152. }