example.c 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. /*
  2. * Copyright (c) 2006-2021, RT-Thread Development Team
  3. *
  4. * SPDX-License-Identifier: Apache-2.0
  5. *
  6. * Change Logs:
  7. * Date Author Notes
  8. * 2025-05-26 RTT the first version
  9. */
  10. #include <rtthread.h>
  11. #include <core_mqtt.h>
  12. #include <sys/socket.h>
  13. #include <netdb.h>
  14. #include <arpa/inet.h>
  15. #include <errno.h>
  16. #include <unistd.h>
  17. #include <string.h>
  18. #include "port.h"
  19. MQTTStatus_t MQTTStatus;
  20. static NetworkContext_t networkContext;
  21. static MQTTContext_t mqttContext;
  22. static TransportInterface_t transport;
  23. static MQTTFixedBuffer_t networkBuffer;
  24. static uint8_t buffer[2048];
  25. static volatile struct sockaddr_in server_addr;
  26. void mqtt_event_callback(MQTTContext_t * pContext, MQTTPacketInfo_t * pPacketInfo, MQTTDeserializedInfo_t * pDeserializedInfo) {
  27. if (pPacketInfo->type == MQTT_PACKET_TYPE_PUBLISH) {
  28. rt_kprintf("Received message on topic %.*s: %.*s\n",
  29. pDeserializedInfo->pPublishInfo->topicNameLength,
  30. pDeserializedInfo->pPublishInfo->pTopicName,
  31. pDeserializedInfo->pPublishInfo->payloadLength,
  32. pDeserializedInfo->pPublishInfo->pPayload);
  33. }
  34. }
  35. static void mqtt_task(void * parameter) {
  36. const char * broker_host = "broker.emqx.io";
  37. struct hostent * host = gethostbyname(broker_host);
  38. if (host == NULL) {
  39. rt_kprintf("Failed to resolve %s\n", broker_host);
  40. return;
  41. }
  42. char * broker_ip = inet_ntoa(*(struct in_addr *)host->h_addr_list[0]);
  43. rt_kprintf("Resolved to IP: %s\n", broker_ip);
  44. networkContext.socket = socket(AF_INET, SOCK_STREAM, 0);
  45. if (networkContext.socket < 0) {
  46. rt_kprintf("Socket creation failed: %d\n", errno);
  47. return;
  48. }
  49. server_addr.sin_family = AF_INET;
  50. server_addr.sin_port = htons(1883); // EMQX 的 TCP 端口
  51. inet_aton(broker_ip, &server_addr.sin_addr);
  52. if (connect(networkContext.socket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
  53. rt_kprintf("Socket connect failed: %d\n", errno);
  54. close(networkContext.socket);
  55. return;
  56. }
  57. // 配置传输接口和缓冲区
  58. networkBuffer.pBuffer = buffer;
  59. networkBuffer.size = sizeof(buffer);
  60. transport.pNetworkContext = &networkContext;
  61. transport.send = rtthread_send;
  62. transport.recv = rtthread_recv;
  63. // 初始化 MQTT
  64. RT_ASSERT(MQTT_Init(&mqttContext, &transport, getCurrentTime, mqtt_event_callback, &networkBuffer) == MQTTSuccess);
  65. // 连接到 MQTT 代理
  66. MQTTConnectInfo_t connectInfo = {
  67. .pClientIdentifier = "rtthread-client",
  68. .clientIdentifierLength = strlen("rtthread-client"),
  69. .keepAliveSeconds = 60,
  70. .pUserName = NULL, // EMQX 公共服务器不需要认证
  71. .pPassword = NULL,
  72. .userNameLength = 0,
  73. .passwordLength = 0
  74. };
  75. bool sessionPresent;
  76. if (MQTT_Connect(&mqttContext, &connectInfo, NULL, 10000, &sessionPresent) != MQTTSuccess) {
  77. rt_kprintf("MQTT connect failed\n");
  78. close(networkContext.socket);
  79. return;
  80. }
  81. // 订阅主题
  82. MQTTSubscribeInfo_t subscribeInfo = {
  83. .pTopicFilter = "rtthread/topic/sub",
  84. .topicFilterLength = strlen("rtthread/topic/sub"),
  85. .qos = MQTTQoS0
  86. };
  87. if (MQTT_Subscribe(&mqttContext, &subscribeInfo, 1, MQTT_GetPacketId(&mqttContext)) != MQTTSuccess) {
  88. rt_kprintf("MQTT subscribe failed\n");
  89. close(networkContext.socket);
  90. return;
  91. }
  92. // 发布测试消息
  93. MQTTPublishInfo_t publishInfo = {
  94. .qos = MQTTQoS0,
  95. .pTopicName = "rtthread/topic/pub",
  96. .topicNameLength = strlen("rtthread/topic/pub"),
  97. .pPayload = "Hello from RT-Thread!",
  98. .payloadLength = strlen("Hello from RT-Thread!")
  99. };
  100. MQTTStatus = MQTT_Publish(&mqttContext, &publishInfo, MQTT_GetPacketId(&mqttContext));
  101. if (MQTTStatus != MQTTSuccess)
  102. {
  103. rt_kprintf("MQTT publish failed %d\n", MQTTStatus);
  104. }
  105. int retry_delay = 1000;
  106. while (1) {
  107. MQTTStatus_t status = MQTT_ProcessLoop(&mqttContext);
  108. if (status != MQTTSuccess) {
  109. rt_kprintf("MQTT process loop failed: %d\n", status);
  110. close(networkContext.socket);
  111. networkContext.socket = socket(AF_INET, SOCK_STREAM, 0);
  112. if (networkContext.socket < 0) {
  113. rt_kprintf("Socket creation failed: %d\n", errno);
  114. rt_thread_mdelay(retry_delay);
  115. retry_delay = min(retry_delay * 2, 8000); // 使用 min 宏
  116. continue;
  117. }
  118. if (connect(networkContext.socket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
  119. rt_kprintf("Socket reconnect failed: %d\n", errno);
  120. close(networkContext.socket);
  121. rt_thread_mdelay(retry_delay);
  122. retry_delay = min(retry_delay * 2, 8000); // 使用 min 宏
  123. continue;
  124. }
  125. if (MQTT_Connect(&mqttContext, &connectInfo, NULL, 90000, &sessionPresent) == MQTTSuccess) {
  126. if (MQTT_Subscribe(&mqttContext, &subscribeInfo, 1, MQTT_GetPacketId(&mqttContext)) != MQTTSuccess) {
  127. rt_kprintf("MQTT resubscribe failed\n");
  128. }
  129. }
  130. }
  131. rt_thread_mdelay(100);
  132. }
  133. }
  134. int mqtt_init(void)
  135. {
  136. rt_thread_t mqtt_thread = rt_thread_create("MQTT",
  137. mqtt_task,
  138. RT_NULL,
  139. 4096,
  140. 10,
  141. 20);
  142. if (mqtt_thread != RT_NULL)
  143. {
  144. rt_thread_startup(mqtt_thread);
  145. }
  146. else
  147. {
  148. rt_kprintf("Failed to create MQTT thread\n");
  149. }
  150. return 0;
  151. }
  152. MSH_CMD_EXPORT(mqtt_init, mqtt_init);