Quellcode durchsuchen

[add] MQTT connect OneNET cloud code.

Signed-off-by: chenyong <1521761801@qq.com>
chenyong vor 7 Jahren
Ursprung
Commit
129e396234
6 geänderte Dateien mit 638 neuen und 0 gelöschten Zeilen
  1. 16 0
      SConscript
  2. 131 0
      inc/onenet.h
  3. 18 0
      ports/onenet_port.c
  4. 51 0
      samples/onenet_sample.c
  5. 150 0
      src/onenet_mqtt.c
  6. 272 0
      src/onenet_send_data.c

+ 16 - 0
SConscript

@@ -0,0 +1,16 @@
+from building import *
+
+cwd = GetCurrentDir()
+src = Glob('src/onenet_send_data.c')
+
+if GetDepend(['PKG_USING_ONENET_SAMPLE']):
+    src += Glob('samples/*.c')
+
+if GetDepend(['ONENET_USING_MQTT']):
+    src += Glob('src/onenet_mqtt.c')
+
+path = [cwd + '/inc']
+
+group = DefineGroup('onenet', src, depend = ['PKG_USING_ONENET'], CPPPATH = path)
+
+Return('group')

+ 131 - 0
inc/onenet.h

@@ -0,0 +1,131 @@
+/*
+ * File      : onenet.h
+ * COPYRIGHT (C) 2012-2018, Shanghai Real-Thread Technology Co., Ltd
+ *
+ * Change Logs:
+ * Date           Author       Notes
+ * 2018-04-23    chenyong     the first version
+ */
+#ifndef _ONENET_H_
+#define _ONENET_H_
+
+#include <rtthread.h>
+
+#define ONENET_DEBUG                   1
+
+#define ONENET_SW_VERSION              "0.1.0"
+
+#ifndef ONENET_MALLOC
+#define ONENET_MALLOC                  rt_malloc
+#endif
+
+#ifndef ONENET_CALLOC
+#define ONENET_CALLOC                  rt_calloc
+#endif
+
+#ifndef ONENET_FREE
+#define ONENET_FREE                    rt_free
+#endif
+
+#if ONENET_DEBUG
+#ifdef assert
+#undef assert
+#endif
+#define assert(EXPR)                                                           \
+if (!(EXPR))                                                                   \
+{                                                                              \
+    rt_kprintf("(%s) has assert failed at %s.\n", #EXPR, __FUNCTION__);        \
+    while (1);                                                                 \
+}
+
+/* error level log */
+#ifdef  log_e
+#undef  log_e
+#endif
+#define log_e(...)                     rt_kprintf("\033[31;22m[E/ONENET] (%s:%d) ", __FUNCTION__, __LINE__);rt_kprintf(__VA_ARGS__);rt_kprintf("\033[0m\n")
+
+/* info level log */
+#ifdef  log_i
+#undef  log_i
+#endif
+#define log_i(...)                     rt_kprintf("\033[36;22m[I/ONENET] ");                                rt_kprintf(__VA_ARGS__);rt_kprintf("\033[0m\n")
+
+/* debug level log */
+#ifdef  log_d
+#undef  log_d
+#endif
+#define log_d(...)                     rt_kprintf("[D/ONENET] (%s:%d) ", __FUNCTION__, __LINE__);           rt_kprintf(__VA_ARGS__);rt_kprintf("\n")
+
+#else
+
+#ifdef assert
+#undef assert
+#endif
+#define assert(EXPR)                   ((void)0);
+
+/* error level log */
+#ifdef  log_e
+#undef  log_e
+#endif
+#define log_e(...)
+
+/* info level log */
+#ifdef  log_i
+#undef  log_i
+#endif
+#define log_i(...)
+
+/* debug level log */
+#ifdef  log_d
+#undef  log_d
+#endif
+#define log_d(...)
+#endif /* ONENET_DEBUG */
+
+#ifndef ONENET_MQTT_SUBTOPIC
+#define ONENET_MQTT_SUBTOPIC           "/topic_test"
+#endif
+
+#if !defined(ONENET_INFO_DEVID) || !defined(ONENET_INFO_APIKEY) || !defined(ONENET_INFO_PROID) || !defined(ONENET_INFO_AUTH)
+#define ONENET_INFO_DEVID		       "29573339"
+#define ONENET_INFO_APIKEY		       "a2gVVf1hggZfuATkNogulHK1V=s="
+#define ONENET_INFO_PROID		       "131494"
+#define ONENET_INFO_AUTH			   "EF4016D6658466CA3E3606"
+#endif 
+
+#define ONENET_SERVER_URL			   "tcp://183.230.40.39:6002"
+#define ONENET_INFO_DEVID_LEN          16
+#define ONENET_INFO_APIKEY_LEN         32
+#define ONENET_INFO_PROID_LEN          16
+#define ONENET_INFO_AUTH_LEN           64
+#define ONENET_INFO_URL_LEN            32
+
+struct rt_onenet_info
+{
+    char device_id[ONENET_INFO_DEVID_LEN];
+    char api_key[ONENET_INFO_APIKEY_LEN];
+
+    char pro_id[ONENET_INFO_PROID_LEN];
+    char auth_info[ONENET_INFO_AUTH_LEN];
+
+    char server_uri[ONENET_INFO_URL_LEN];
+
+};
+typedef struct rt_onenet_info *rt_onenet_info_t;
+
+/* OneNET MQTT initialize. */
+int onenet_init(void);
+
+/* Publish MQTT data to subscribe topic. */
+int onenet_mqtt_publish(const char *topic, const char *msg_str);
+
+/* Device send data to OneNET cloud. */
+rt_err_t onenet_send_digit(const char *name, int digit);
+rt_err_t onenet_send_string(const char *name, char *str);
+
+/* ========================== User port function ============================ */
+
+/* Get MQTT data from OneNET cloud and process data. */
+int onenet_port_data_process(char *recv_data, rt_size_t size);
+
+#endif /* _ONENET_H_ */

+ 18 - 0
ports/onenet_port.c

@@ -0,0 +1,18 @@
+/*
+ * File      : onenet_port.c
+ * COPYRIGHT (C) 2012-2018, Shanghai Real-Thread Technology Co., Ltd
+ *
+ * Change Logs:
+ * Date           Author       Notes
+ * 2018-04-20    chenyong     the first version
+ */
+#include <stdlib.h>
+
+#include <onenet.h>
+
+int onenet_port_data_process(char *recv_data, rt_size_t size)
+{
+	log_d("Recv data : %.*s", size, recv_data);
+	
+	return 0;
+}

+ 51 - 0
samples/onenet_sample.c

@@ -0,0 +1,51 @@
+/*
+ * File      : onenet_sample.c
+ * COPYRIGHT (C) 2012-2018, Shanghai Real-Thread Technology Co., Ltd
+ *
+ * Change Logs:
+ * Date           Author       Notes
+ * 2018-04-23    chenyong     the first version
+ */
+#include <stdlib.h>
+
+#include <onenet.h>
+
+static void onenet_send_data_entry(void *parameter)
+{
+    int value = 0;
+
+    while (1)
+    {
+        value = rand() % 100;
+
+        if (onenet_send_digit("temperature", value) < 0)
+        {
+            break;
+        }
+
+        rt_thread_delay(5 * 1000);
+    }
+}
+
+int onenet_send_data_cycle(void)
+{
+    rt_thread_t tid;
+
+    tid = rt_thread_create("onenet_send",
+            onenet_send_data_entry,
+            RT_NULL,
+            2 * 1024,
+            RT_THREAD_PRIORITY_MAX / 3 - 1,
+            5);
+    if (tid)
+    {
+        rt_thread_startup(tid);
+    }
+
+    return 0;
+}
+
+#ifdef FINSH_USING_MSH
+#include <finsh.h>
+MSH_CMD_EXPORT(onenet_send_data_cycle, send data to OneNET cloud cycle);
+#endif

+ 150 - 0
src/onenet_mqtt.c

@@ -0,0 +1,150 @@
+/*
+ * File      : onenet_mqtt.c
+ * COPYRIGHT (C) 2012-2018, Shanghai Real-Thread Technology Co., Ltd
+ *
+ * Change Logs:
+ * Date           Author       Notes
+ * 2018-04-20    chenyong     the first version
+ */
+#include <stdlib.h>
+#include <string.h>
+
+#include <paho_mqtt.h>
+
+#include <onenet.h>
+
+static rt_bool_t init_ok = RT_FALSE;
+static MQTTClient mq_client;
+struct rt_onenet_info onenet_info;
+
+static void mqtt_callback(MQTTClient *c, MessageData *msg_data)
+{
+	onenet_port_data_process((char *)msg_data->message->payload, msg_data->message->payloadlen);
+}
+
+static void mqtt_connect_callback(MQTTClient *c)
+{
+    log_d("Enter mqtt_connect_callback!");
+}
+
+static void mqtt_online_callback(MQTTClient *c)
+{
+    log_d("Enter mqtt_online_callback!");
+}
+
+static void mqtt_offline_callback(MQTTClient *c)
+{
+    log_d("Enter mqtt_offline_callback!");
+}
+
+static rt_err_t onenet_mqtt_init(void)
+{
+    MQTTPacket_connectData condata = MQTTPacket_connectData_initializer;
+
+    mq_client.uri = onenet_info.server_uri;
+    memcpy(&(mq_client.condata), &condata, sizeof(condata));
+    mq_client.condata.clientID.cstring = onenet_info.device_id;
+    mq_client.condata.keepAliveInterval = 30;
+    mq_client.condata.cleansession = 1;
+    mq_client.condata.username.cstring = onenet_info.pro_id;
+    mq_client.condata.password.cstring = onenet_info.auth_info;
+
+    mq_client.buf_size = mq_client.readbuf_size = 1024 * 2;
+    mq_client.buf = ONENET_CALLOC(1, mq_client.buf_size);
+    mq_client.readbuf = ONENET_CALLOC(1, mq_client.readbuf_size);
+    if (!(mq_client.buf && mq_client.readbuf))
+    {
+        log_e("No memory for MQTT client buffer!");
+        return -RT_ENOMEM;
+    }
+
+    /* registered callback */
+    mq_client.connect_callback = mqtt_connect_callback;
+    mq_client.online_callback = mqtt_online_callback;
+    mq_client.offline_callback = mqtt_offline_callback;
+
+    /* set subscribe table. */
+    mq_client.messageHandlers[0].topicFilter = ONENET_MQTT_SUBTOPIC;
+    mq_client.messageHandlers[0].callback = mqtt_callback;
+
+    mq_client.defaultMessageHandler = mqtt_callback;
+
+    paho_mqtt_start(&mq_client);
+    log_i("OneNET MQTT is startup!");
+    
+    return RT_EOK;
+}
+
+static void onenet_get_info(void)
+{
+	strncpy(onenet_info.device_id, ONENET_INFO_DEVID, strlen(ONENET_INFO_DEVID));
+    strncpy(onenet_info.api_key, ONENET_INFO_APIKEY, strlen(ONENET_INFO_APIKEY));
+    strncpy(onenet_info.pro_id, ONENET_INFO_PROID, strlen(ONENET_INFO_PROID));
+    strncpy(onenet_info.auth_info, ONENET_INFO_AUTH, strlen(ONENET_INFO_AUTH));
+    strncpy(onenet_info.server_uri, ONENET_SERVER_URL, strlen(ONENET_SERVER_URL));  
+} 
+
+int onenet_init(void)
+{
+    int result = 0;
+    
+    if (init_ok)
+    {
+        return 0;
+    }
+    
+    onenet_get_info();
+    
+    if(onenet_mqtt_init() < 0)
+    {
+        result = -1;
+    }
+
+    if(!result)
+    {
+        init_ok = RT_TRUE;
+        log_i("OneNET cloud(V%s) initialize success.", ONENET_SW_VERSION);
+    }
+    else
+    {
+        log_e("OneNET cloud(V%s) initialize failed(%d).", ONENET_SW_VERSION, result);
+    }
+
+    return result;
+}
+
+int onenet_mqtt_publish(const char *topic, const char *msg_str)
+{
+    MQTTMessage message;
+    message.qos = 1;
+    message.retained = 0;
+    message.payload = (void*) msg_str;
+    message.payloadlen = strlen(message.payload);
+
+    if (MQTTPublish(&mq_client, topic, &message) < 0)
+    {
+        return -1;
+    }
+
+    return 0;
+}
+
+int onenet_publish(int argc, char **argv)
+{
+    if (argc != 3)
+    {
+        log_e("onenet_publish [topic] [message]    -OneNET mqtt pulish message to this topic.\n");
+        return 0;
+    }
+
+    onenet_mqtt_publish(argv[1], argv[2]);
+
+    return 0;
+}
+
+#ifdef FINSH_USING_MSH
+#include <finsh.h>
+MSH_CMD_EXPORT(onenet_init, OneNET cloud mqtt initializate);
+MSH_CMD_EXPORT(onenet_publish, OneNET cloud send data to subscribe topic);
+#endif
+

+ 272 - 0
src/onenet_send_data.c

@@ -0,0 +1,272 @@
+/*
+ * File      : rt_onenet_send_data.c
+ * COPYRIGHT (C) 2012-2018, Shanghai Real-Thread Technology Co., Ltd
+ *
+ * Change Logs:
+ * Date           Author       Notes
+ * 2018-04-23   chenyong     the first version
+ */
+#include <stdlib.h>
+#include <string.h>
+ 
+#include <cJSON.h>
+#include <webclient.h>
+ 
+#include <onenet.h>
+
+#define ONENET_SEND_DATA_LEN           1024
+#define ONENET_HEAD_DATA_LEN           256
+#define ONENET_CON_URI_LEN             256
+ 
+ static rt_err_t onenet_send_data(char *send_buffer)
+{
+    struct webclient_session* session = NULL;
+    char *header = RT_NULL, *header_ptr;
+    char *buffer = send_buffer;
+    char *URI = RT_NULL;
+    rt_err_t result = RT_EOK;
+
+    session = (struct webclient_session *) ONENET_CALLOC(1, sizeof(struct webclient_session));
+    if (!session)
+    {
+        log_e("OneNet Send data failed! No memory for session structure!");
+        result = -RT_ENOMEM;
+        goto __exit;
+    }
+
+    URI = ONENET_CALLOC(1, ONENET_CON_URI_LEN);
+    if (URI == NULL)
+    {
+        log_e("OneNet Send data failed! No memory for URI buffer!");
+        result = -RT_ENOMEM;
+        goto __exit;
+    }
+
+#ifdef ONENET_USING_MQTT
+    extern struct rt_onenet_info onenet_info;
+
+    rt_snprintf(URI, ONENET_CON_URI_LEN, "http://api.heclouds.com/devices/%s/datapoints?type=3", onenet_info.device_id);
+#else
+    rt_snprintf(URI, ONENET_CON_URI_LEN, "http://api.heclouds.com/devices/%s/datapoints?type=3", ONENET_INFO_DEVID);
+#endif
+
+    /* connect OneNET cloud */
+    result = webclient_connect(session, URI);
+    if (result < 0)
+    {
+        log_e("OneNet Send data failed! Webclient connect URI(%s) failed!", URI);
+        goto __exit;
+    }
+
+    header = (char*) ONENET_CALLOC(1, ONENET_HEAD_DATA_LEN);
+    if (header == NULL)
+    {
+        log_e("OneNet Send data failed! No memory for header buffer!");
+        result = -RT_ENOMEM;
+        goto __exit;
+    }
+    header_ptr = header;
+
+    /* build header for upload */
+#ifdef ONENET_USING_MQTT
+    header_ptr += rt_snprintf(header_ptr,
+            WEBCLIENT_HEADER_BUFSZ - (header_ptr - header),
+            "api-key: %s\r\n", onenet_info.api_key);
+#else
+    header_ptr += rt_snprintf(header_ptr,
+            WEBCLIENT_HEADER_BUFSZ - (header_ptr - header),
+            "api-key: %s\r\n", ONENET_INFO_APIKEY);
+#endif
+    header_ptr += rt_snprintf(header_ptr,
+            WEBCLIENT_HEADER_BUFSZ - (header_ptr - header),
+            "Content-Length: %d\r\n", strlen(buffer));
+    header_ptr += rt_snprintf(header_ptr,
+            WEBCLIENT_HEADER_BUFSZ - (header_ptr - header),
+            "Content-Type: application/octet-stream\r\n");
+
+    /* send header data */
+    result = webclient_send_header(session, WEBCLIENT_POST, header, header_ptr - header);
+    if (result < 0)
+    {
+        log_e("OneNet Send data failed! Send header buffer failed return %d!", result);
+        goto __exit;
+    }
+
+    /* send body data */
+    webclient_write(session, (unsigned char *) buffer, strlen(buffer));
+    log_d("buffer : %.*s", strlen(buffer), buffer);
+
+    if (webclient_handle_response(session))
+    {
+        if (session->response != 200)
+        {
+            log_e("OneNet Send data failed! Handle response(%d) error!", session->response);
+            result = -RT_ERROR;
+            goto __exit;
+        }
+    }
+
+__exit:
+    if (session)
+    {
+        webclient_close(session);
+    }
+    if (URI)
+    {
+        ONENET_FREE(URI);
+    }
+    if (header)
+    {
+        ONENET_FREE(header);
+    }
+    return result;
+}
+
+
+static rt_err_t onenet_get_string_data(const char *name, char *str, char *out_buff)
+{
+    rt_err_t result = RT_EOK;
+    cJSON *root = RT_NULL;
+    char * msg_str = RT_NULL;
+
+    root = cJSON_CreateObject();
+    if (!root)
+    {
+        log_e("MQTT online push failed! cJSON create object error return NULL!");
+        return -RT_ENOMEM;
+    }
+
+    cJSON_AddStringToObject(root, name, str);
+
+    /* render a cJSON structure to buffer */
+    msg_str = cJSON_PrintUnformatted(root);
+    if (!msg_str)
+    {
+        log_e("Device online push failed! cJSON print unformatted error return NULL!");
+        result = -RT_ENOMEM;
+        goto __exit;
+    }
+
+    strncpy(out_buff, msg_str, strlen(msg_str));
+
+__exit:
+    if (root)
+    {
+        cJSON_Delete(root);
+    }
+    if (msg_str)
+    {
+        ONENET_FREE(msg_str);
+    }
+
+    return result;
+}
+
+static rt_err_t onenet_get_digit_data(const char *name, int digit, char *out_buff)
+{
+    rt_err_t result = RT_EOK;
+    cJSON *root = RT_NULL;
+    char * msg_str = RT_NULL;
+
+    root = cJSON_CreateObject();
+    if (!root)
+    {
+        log_e("MQTT online push failed! cJSON create object error return NULL!");
+        return -RT_ENOMEM;
+    }
+
+    cJSON_AddNumberToObject(root, name, digit);
+
+    /* render a cJSON structure to buffer */
+    msg_str = cJSON_PrintUnformatted(root);
+    if (!msg_str)
+    {
+        log_e("Device online push failed! cJSON print unformatted error return NULL!");
+        result = -RT_ENOMEM;
+        goto __exit;
+    }
+
+    strncpy(out_buff, msg_str, strlen(msg_str));
+
+__exit:
+    if (root)
+    {
+        cJSON_Delete(root);
+    }
+    if (msg_str)
+    {
+        ONENET_FREE(msg_str);
+    }
+
+    return result;
+}
+
+rt_err_t onenet_send_digit(const char *name, int digit)
+{
+    char *send_buffer = RT_NULL;
+    rt_err_t result = RT_EOK;
+
+    send_buffer = ONENET_CALLOC(1, ONENET_SEND_DATA_LEN);
+    if (!send_buffer)
+    {
+        log_e("RT ONENET send digit failed! No memory for send buffer!");
+        return -RT_ENOMEM;
+    }
+
+    /* get JSON format data */
+    result = onenet_get_digit_data(name, digit, send_buffer);
+    if (result < 0)
+    {
+        goto __exit;
+    }
+
+    /* send data to cloud by HTTP */
+    result = onenet_send_data(send_buffer);
+    if (result < 0)
+    {
+        goto __exit;
+    }
+
+__exit:
+    if (send_buffer)
+    {
+        ONENET_FREE(send_buffer);
+    }
+
+    return result;
+}
+
+rt_err_t onenet_send_string(const char *name, char *str)
+{
+    char *send_buffer = RT_NULL;
+    rt_err_t result = RT_EOK;
+
+    send_buffer = RT_ONENET_CALLOC(1, ONENET_SEND_DATA_LEN);
+    if (!send_buffer)
+    {
+        log_e("RT ONENET send digit failed! No memory for send buffer!");
+        return -RT_ENOMEM;
+    }
+
+    /* get JSON format data */
+    result = onenet_get_string_data(name, str, send_buffer);
+    if (result < 0)
+    {
+        goto __exit;
+    }
+
+    /* send data to cloud by HTTP */
+    result = onenet_send_data(send_buffer);
+    if (result < 0)
+    {
+        goto __exit;
+    }
+
+__exit:
+    if (send_buffer)
+    {
+        ONENET_FREE(send_buffer);
+    }
+
+    return result;
+}