mqtt_rrpc-example.c 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. /*
  2. * Copyright (c) 2014-2016 Alibaba Group. All rights reserved.
  3. * License-Identifier: Apache-2.0
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License"); you may
  6. * not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  13. * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. #include <stdio.h>
  19. #include <stdlib.h>
  20. #include <string.h>
  21. #include "iot_import.h"
  22. #include "iot_export.h"
  /*
   * Device identity used by this example. main() hands these three values to
   * HAL_SetProductKey / HAL_SetDeviceName / HAL_SetDeviceSecret.
   * NOTE(review): these look like sample credentials -- confirm before reuse.
   */
  #define PRODUCT_KEY             "lLeATwv18gi"
  #define DEVICE_NAME             "test1"
  #define DEVICE_SECRET           "HQ4AB77MRpAxzgRQAnJdjewGaiEBoJZR"

  /*
   * These are the pre-defined topics.
   * Both are prefixes: the RRPC message id is appended after the trailing '/'.
   */
  #define TOPIC_RRPC_REQ          "/sys/" PRODUCT_KEY "/" DEVICE_NAME "/rrpc/request/"
  #define TOPIC_RRPC_RSP          "/sys/" PRODUCT_KEY "/" DEVICE_NAME "/rrpc/response/"

  /*
   * Fixtures for test_mqtt_rrpc_msg_arrive(). They are composed from the macros
   * above so that changing PRODUCT_KEY, DEVICE_NAME or TEST_PAYLOAD cannot leave
   * the test topic out of sync with the topics the handler actually parses.
   * TEST_TOPIC_PAYLOAD is the topic immediately followed by the payload in one
   * contiguous string, i.e. the topic part is NOT NUL-terminated on its own.
   */
  #define TEST_MSG_ID             "890192612580343808"
  #define TEST_TOPIC              TOPIC_RRPC_REQ TEST_MSG_ID
  #define TEST_PAYLOAD            "hello world"
  #define TEST_TOPIC_PAYLOAD      TEST_TOPIC TEST_PAYLOAD
  32. char g_product_key[PRODUCT_KEY_LEN + 1];
  33. char g_product_secret[PRODUCT_SECRET_LEN + 1];
  34. char g_device_name[DEVICE_NAME_LEN + 1];
  35. char g_device_secret[DEVICE_SECRET_LEN + 1];
  36. #define RRPC_MQTT_MSGLEN (1024)
  37. #define MSG_ID_LEN_MAX (64)
  38. #define TOPIC_LEN_MAX (1024)
  39. static int running_unittest = 0;
  40. void event_handle(void *pcontext, void *pclient, iotx_mqtt_event_msg_pt msg)
  41. {
  42. uintptr_t packet_id = (uintptr_t)msg->msg;
  43. iotx_mqtt_topic_info_pt topic_info = (iotx_mqtt_topic_info_pt)msg->msg;
  44. switch (msg->event_type) {
  45. case IOTX_MQTT_EVENT_UNDEF:
  46. HAL_Printf("undefined event occur.\n");
  47. break;
  48. case IOTX_MQTT_EVENT_DISCONNECT:
  49. HAL_Printf("MQTT disconnect.\n");
  50. break;
  51. case IOTX_MQTT_EVENT_RECONNECT:
  52. HAL_Printf("MQTT reconnect.\n");
  53. break;
  54. case IOTX_MQTT_EVENT_SUBCRIBE_SUCCESS:
  55. HAL_Printf("subscribe success, packet-id=%u\n", (unsigned int)packet_id);
  56. break;
  57. case IOTX_MQTT_EVENT_SUBCRIBE_TIMEOUT:
  58. HAL_Printf("subscribe wait ack timeout, packet-id=%u\n", (unsigned int)packet_id);
  59. break;
  60. case IOTX_MQTT_EVENT_SUBCRIBE_NACK:
  61. HAL_Printf("subscribe nack, packet-id=%u\n", (unsigned int)packet_id);
  62. break;
  63. case IOTX_MQTT_EVENT_UNSUBCRIBE_SUCCESS:
  64. HAL_Printf("unsubscribe success, packet-id=%u\n", (unsigned int)packet_id);
  65. break;
  66. case IOTX_MQTT_EVENT_UNSUBCRIBE_TIMEOUT:
  67. HAL_Printf("unsubscribe timeout, packet-id=%u\n", (unsigned int)packet_id);
  68. break;
  69. case IOTX_MQTT_EVENT_UNSUBCRIBE_NACK:
  70. HAL_Printf("unsubscribe nack, packet-id=%u\n", (unsigned int)packet_id);
  71. break;
  72. case IOTX_MQTT_EVENT_PUBLISH_SUCCESS:
  73. HAL_Printf("publish success, packet-id=%u\n", (unsigned int)packet_id);
  74. break;
  75. case IOTX_MQTT_EVENT_PUBLISH_TIMEOUT:
  76. HAL_Printf("publish timeout, packet-id=%u\n", (unsigned int)packet_id);
  77. break;
  78. case IOTX_MQTT_EVENT_PUBLISH_NACK:
  79. HAL_Printf("publish nack, packet-id=%u\n", (unsigned int)packet_id);
  80. break;
  81. case IOTX_MQTT_EVENT_PUBLISH_RECVEIVED:
  82. HAL_Printf("topic message arrived but without any related handle: topic=%.*s, topic_msg=%.*s\n",
  83. topic_info->topic_len,
  84. topic_info->ptopic,
  85. topic_info->payload_len,
  86. topic_info->payload);
  87. break;
  88. default:
  89. HAL_Printf("Should NOT arrive here.\n");
  90. break;
  91. }
  92. }
  93. void mqtt_rrpc_msg_arrive(void *pcontext, void *pclient, iotx_mqtt_event_msg_pt msg)
  94. {
  95. iotx_mqtt_topic_info_pt ptopic_info = (iotx_mqtt_topic_info_pt) msg->msg;
  96. iotx_mqtt_topic_info_t topic_msg;
  97. char msg_pub[RRPC_MQTT_MSGLEN] = {0};
  98. char topic[TOPIC_LEN_MAX] = {0};
  99. char msg_id[MSG_ID_LEN_MAX] = {0};
  100. /* print topic name and topic message */
  101. HAL_Printf("----\n");
  102. HAL_Printf("Topic: '%.*s' (Length: %d)\n",
  103. ptopic_info->topic_len,
  104. ptopic_info->ptopic,
  105. ptopic_info->topic_len);
  106. HAL_Printf("Payload: '%.*s' (Length: %d)\n",
  107. ptopic_info->payload_len,
  108. ptopic_info->payload,
  109. ptopic_info->payload_len);
  110. HAL_Printf("----\n");
  111. if (snprintf(msg_id,
  112. ptopic_info->topic_len - strlen(TOPIC_RRPC_REQ) + 1,
  113. "%s",
  114. ptopic_info->ptopic + strlen(TOPIC_RRPC_REQ))
  115. > sizeof(msg_id)) {
  116. HAL_Printf("snprintf error!\n");
  117. return;
  118. }
  119. HAL_Printf("response msg_id = %s\n", msg_id);
  120. if (snprintf(topic, sizeof(topic), "%s%s", TOPIC_RRPC_RSP, msg_id) > sizeof(topic)) {
  121. HAL_Printf("snprintf error!\n");
  122. return;
  123. }
  124. HAL_Printf("response topic = %s\n", topic);
  125. sprintf(msg_pub, "rrpc client has received message!\n");
  126. topic_msg.qos = IOTX_MQTT_QOS0;
  127. topic_msg.retain = 0;
  128. topic_msg.dup = 0;
  129. topic_msg.payload = (void *)msg_pub;
  130. topic_msg.payload_len = strlen(msg_pub);
  131. if (IOT_MQTT_Publish(pclient, topic, &topic_msg) < 0) {
  132. HAL_Printf("error occur when publish!\n");
  133. }
  134. }
  135. int mqtt_rrpc_client(void)
  136. {
  137. int rc = 0;
  138. void *pclient;
  139. iotx_conn_info_pt pconn_info;
  140. iotx_mqtt_param_t mqtt_params;
  141. char *msg_buf = NULL, *msg_readbuf = NULL;
  142. if (NULL == (msg_buf = (char *)HAL_Malloc(RRPC_MQTT_MSGLEN))) {
  143. HAL_Printf("not enough memory!\n");
  144. rc = -1;
  145. goto do_exit;
  146. }
  147. if (NULL == (msg_readbuf = (char *)HAL_Malloc(RRPC_MQTT_MSGLEN))) {
  148. HAL_Printf("not enough memory!\n");
  149. rc = -1;
  150. goto do_exit;
  151. }
  152. /**< get device info */
  153. HAL_GetProductKey(g_product_key);
  154. HAL_GetDeviceName(g_device_name);
  155. HAL_GetDeviceSecret(g_device_secret);
  156. /**< end*/
  157. /* Device AUTH */
  158. if (0 != IOT_SetupConnInfo(g_product_key, g_device_name, g_device_secret, (void **)&pconn_info)) {
  159. HAL_Printf("AUTH request failed!\n");
  160. rc = -1;
  161. goto do_exit;
  162. }
  163. /* Initialize MQTT parameter */
  164. memset(&mqtt_params, 0x0, sizeof(mqtt_params));
  165. mqtt_params.port = pconn_info->port;
  166. mqtt_params.host = pconn_info->host_name;
  167. mqtt_params.client_id = pconn_info->client_id;
  168. mqtt_params.username = pconn_info->username;
  169. mqtt_params.password = pconn_info->password;
  170. mqtt_params.pub_key = pconn_info->pub_key;
  171. mqtt_params.request_timeout_ms = 2000;
  172. mqtt_params.clean_session = 0;
  173. mqtt_params.keepalive_interval_ms = 60000;
  174. mqtt_params.pread_buf = msg_readbuf;
  175. mqtt_params.read_buf_size = RRPC_MQTT_MSGLEN;
  176. mqtt_params.pwrite_buf = msg_buf;
  177. mqtt_params.write_buf_size = RRPC_MQTT_MSGLEN;
  178. mqtt_params.handle_event.h_fp = event_handle;
  179. mqtt_params.handle_event.pcontext = NULL;
  180. /* Construct a MQTT client with specify parameter */
  181. pclient = IOT_MQTT_Construct(&mqtt_params);
  182. if (NULL == pclient) {
  183. HAL_Printf("MQTT construct failed\n");
  184. rc = -1;
  185. goto do_exit;
  186. }
  187. /* Subscribe the specific topic */
  188. rc = IOT_MQTT_Subscribe(pclient, TOPIC_RRPC_REQ "+", IOTX_MQTT_QOS0, mqtt_rrpc_msg_arrive, NULL);
  189. if (rc < 0) {
  190. IOT_MQTT_Destroy(&pclient);
  191. HAL_Printf("IOT_MQTT_Subscribe failed, rc = %d\n", rc);
  192. rc = -1;
  193. goto do_exit;
  194. }
  195. HAL_SleepMs(1000);
  196. do {
  197. /* handle the MQTT packet received from TCP or SSL connection */
  198. IOT_MQTT_Yield(pclient, 200);
  199. HAL_SleepMs(1000);
  200. HAL_Printf("Waiting RRPC from Cloud ...\n");
  201. if (running_unittest) {
  202. HAL_Printf("Break waiting since in unittest mode\n");
  203. break;
  204. }
  205. } while (1);
  206. IOT_MQTT_Unsubscribe(pclient, TOPIC_RRPC_REQ"+");
  207. HAL_SleepMs(200);
  208. IOT_MQTT_Destroy(&pclient);
  209. do_exit:
  210. if (NULL != msg_buf) {
  211. HAL_Free(msg_buf);
  212. }
  213. if (NULL != msg_readbuf) {
  214. HAL_Free(msg_readbuf);
  215. }
  216. return rc;
  217. }
  218. void test_mqtt_rrpc_msg_arrive(void)
  219. {
  220. iotx_mqtt_topic_info_t topic_info;
  221. iotx_mqtt_event_msg_t msg;
  222. topic_info.packet_id = 0;
  223. topic_info.qos = 0;
  224. topic_info.dup = 0;
  225. topic_info.retain = 0;
  226. topic_info.topic_len = strlen(TEST_TOPIC);
  227. topic_info.payload_len = strlen(TEST_PAYLOAD);
  228. topic_info.ptopic = TEST_TOPIC_PAYLOAD;
  229. topic_info.payload = TEST_TOPIC_PAYLOAD + strlen(TEST_TOPIC);
  230. msg.event_type = 0;
  231. msg.msg = &topic_info;
  232. mqtt_rrpc_msg_arrive(NULL, NULL, &msg);
  233. }
  234. int main(int argc, char *argv[])
  235. {
  236. if (argc == 2 && !strcmp(argv[1], "unittest")) {
  237. HAL_Printf("***********unittest start*****************\n");
  238. test_mqtt_rrpc_msg_arrive();
  239. HAL_Printf("***********unittest end*****************\n");
  240. running_unittest = 1;
  241. }
  242. /**< set device info*/
  243. HAL_SetProductKey(PRODUCT_KEY);
  244. HAL_SetDeviceName(DEVICE_NAME);
  245. HAL_SetDeviceSecret(DEVICE_SECRET);
  246. /**< end*/
  247. mqtt_rrpc_client();
  248. HAL_Printf("out of sample!\n");
  249. return 0;
  250. }