| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- #include "RyanMqttTest.h"
- static int32_t pubTestPublishedEventCount = 0;
- static int32_t pubTestDataEventCount = 0;
- static char *pubStr = NULL;
- static int32_t pubStrLen = 0;
- static void RyanMqttPublishEventHandle(void *pclient, RyanMqttEventId_e event, const void *eventData)
- {
- switch (event)
- {
- case RyanMqttEventPublished:
- {
- RyanMqttMsgHandler_t *msgHandler = ((RyanMqttAckHandler_t *)eventData)->msgHandler;
- rlog_w("qos1 / qos2发送成功事件回调 topic: %s, qos: %d", msgHandler->topic, msgHandler->qos);
- pubTestPublishedEventCount++;
- break;
- }
- case RyanMqttEventData:
- {
- RyanMqttMsgData_t *msgData = (RyanMqttMsgData_t *)eventData;
- // rlog_i("接收到mqtt消息事件回调 topic: %.*s, packetId: %d, payload len: %d, qos: %d",
- // msgData->topicLen, msgData->topic, msgData->packetId, msgData->payloadLen, msgData->qos);
- if (0 == strncmp(msgData->payload, pubStr, pubStrLen))
- pubTestDataEventCount++;
- else
- rlog_e("pub测试收到数据不一致 %.*s", msgData->payloadLen, msgData->payload);
- break;
- }
- default:
- mqttEventBaseHandle(pclient, event, eventData);
- break;
- }
- }
- static RyanMqttError_e RyanMqttPublishTest(RyanMqttQos_e qos, int32_t count, uint32_t delayms)
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- RyanMqttClient_t *client;
- time_t timeStampNow = 0;
- RyanMqttInitSync(&client, RyanMqttTrue, RyanMqttPublishEventHandle);
- RyanMqttSubscribe(client, "testlinux/pub", qos);
- delay(100);
- time(&timeStampNow);
- pubStr = (char *)malloc(2048);
- memset(pubStr, 0, 2048);
- srand(timeStampNow);
- // NOLINTNEXTLINE(concurrency-mt-unsafe)
- for (int32_t i = 0; i < rand() % 250 + 1 + 100; i++)
- {
- // NOLINTNEXTLINE(concurrency-mt-unsafe)
- snprintf(pubStr + 4 * i, 2048 - 4 * i, "%04d", rand());
- }
- pubStrLen = (int32_t)strlen(pubStr);
- pubTestPublishedEventCount = 0;
- pubTestDataEventCount = 0;
- for (int32_t i = 0; i < count; i++)
- {
- RyanMqttError_e pubResult = RyanMqttPublish(client, "testlinux/pub", pubStr, pubStrLen, qos, RyanMqttFalse);
- if (RyanMqttSuccessError != pubResult)
- {
- rlog_e("QOS发布错误 Qos: %d, result: %d", qos, pubResult);
- result = RyanMqttFailedError;
- goto __exit;
- }
- if (delayms)
- delay(delayms);
- }
- for (int32_t i = 0;; i++)
- {
- if (RyanMqttQos0 == qos)
- {
- if (count == pubTestDataEventCount)
- break;
- }
- else if (pubTestPublishedEventCount == count && pubTestPublishedEventCount == pubTestDataEventCount)
- break;
- if (i > 300)
- {
- rlog_e("QOS测试失败 Qos: %d, PublishedEventCount: %d, dataEventCount: %d", qos, pubTestPublishedEventCount, pubTestDataEventCount);
- result = RyanMqttFailedError;
- goto __exit;
- }
- delay(100);
- }
- RyanMqttUnSubscribe(client, "testlinux/pub");
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == checkAckList(client), RyanMqttFailedError, rlog_e, { goto __exit; });
- __exit:
- free(pubStr);
- pubStr = NULL;
- rlog_i("mqtt 发布测试,销毁mqtt客户端");
- RyanMqttDestorySync(client);
- return result;
- }
- RyanMqttError_e RyanMqttPubTest()
- {
- RyanMqttError_e result = RyanMqttSuccessError;
- // 发布 & 订阅 qos 测试
- result = RyanMqttPublishTest(RyanMqttQos0, 1000, 0);
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, rlog_e, { goto __exit; });
- checkMemory;
- result = RyanMqttPublishTest(RyanMqttQos1, 1000, 2);
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, rlog_e, { goto __exit; });
- checkMemory;
- result = RyanMqttPublishTest(RyanMqttQos2, 1000, 5);
- RyanMqttCheckCodeNoReturn(RyanMqttSuccessError == result, RyanMqttFailedError, rlog_e, { goto __exit; });
- checkMemory;
- return RyanMqttSuccessError;
- __exit:
- return RyanMqttFailedError;
- }
|