|
|
@@ -12,31 +12,17 @@
|
|
|
|
|
|
#include "paho_mqtt.h"
|
|
|
|
|
|
-/**
|
|
|
- * MQTT URI farmat:
|
|
|
- * domain mode
|
|
|
- * tcp://iot.eclipse.org:1883
|
|
|
- *
|
|
|
- * ipv4 mode
|
|
|
- * tcp://192.168.10.1:1883
|
|
|
- * ssl://192.168.10.1:1884
|
|
|
- *
|
|
|
- * ipv6 mode
|
|
|
- * tcp://[fe80::20c:29ff:fe9a:a07e]:1883
|
|
|
- * ssl://[fe80::20c:29ff:fe9a:a07e]:1884
|
|
|
- */
|
|
|
-#define MQTT_URI "tcp://iot.eclipse.org:1883"
|
|
|
-#define MQTT_USERNAME "admin"
|
|
|
-#define MQTT_PASSWORD "admin"
|
|
|
-#define MQTT_SUBTOPIC "/mqtt/test"
|
|
|
-#define MQTT_PUBTOPIC "/mqtt/test"
|
|
|
+#define MQTT_URI PKG_USING_IOTSHARP_DEVICE_SERVER
|
|
|
+#define MQTT_USERNAME PKG_USING_IOTSHARP_DEVICE_NAME
|
|
|
+#define MQTT_PASSWORD PKG_USING_IOTSHARP_DEVICE_SECRET
|
|
|
+
|
|
|
#define MQTT_WILLMSG "Goodbye!"
|
|
|
|
|
|
/* define MQTT client context */
|
|
|
static MQTTClient client;
|
|
|
static int is_started = 0;
|
|
|
|
|
|
-static void mqtt_sub_callback(MQTTClient *c, MessageData *msg_data)
|
|
|
+ void mqtt_sub_callback(MQTTClient *c, MessageData *msg_data)
|
|
|
{
|
|
|
*((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0';
|
|
|
LOG_D("mqtt sub callback: %.*s %.*s",
|
|
|
@@ -46,7 +32,8 @@ static void mqtt_sub_callback(MQTTClient *c, MessageData *msg_data)
|
|
|
(char *)msg_data->message->payload);
|
|
|
}
|
|
|
|
|
|
-static void mqtt_sub_default_callback(MQTTClient *c, MessageData *msg_data)
|
|
|
+
|
|
|
+ void mqtt_sub_default_callback(MQTTClient *c, MessageData *msg_data)
|
|
|
{
|
|
|
*((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0';
|
|
|
LOG_D("mqtt sub default callback: %.*s %.*s",
|
|
|
@@ -59,29 +46,47 @@ static void mqtt_sub_default_callback(MQTTClient *c, MessageData *msg_data)
|
|
|
static void mqtt_connect_callback(MQTTClient *c)
|
|
|
{
|
|
|
LOG_D("inter mqtt_connect_callback!");
|
|
|
+
|
|
|
+}
|
|
|
+static void mqtt_new_sub_callback(MQTTClient *client, MessageData *msg_data)
|
|
|
+{
|
|
|
+ *((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0';
|
|
|
+ LOG_D("mqtt new subscribe callback: %.*s %.*s",
|
|
|
+ msg_data->topicName->lenstring.len,
|
|
|
+ msg_data->topicName->lenstring.data,
|
|
|
+ msg_data->message->payloadlen,
|
|
|
+ (char *)msg_data->message->payload);
|
|
|
}
|
|
|
|
|
|
static void mqtt_online_callback(MQTTClient *c)
|
|
|
{
|
|
|
LOG_D("inter mqtt_online_callback!");
|
|
|
+ char _rpc_topic[200] = { 0 };
|
|
|
+ char _attup_topic[200] = { 0 };
|
|
|
+ sprintf(_rpc_topic, "devices/"PKG_USING_IOTSHARP_DEVICE_NAME"/rpc/request/+/+");
|
|
|
+ sprintf(_attup_topic, "devices/"PKG_USING_IOTSHARP_DEVICE_NAME"/attributes/update/");
|
|
|
+ paho_mqtt_subscribe(&client, QOS1, _rpc_topic, mqtt_new_sub_callback);
|
|
|
+ paho_mqtt_subscribe(&client, QOS1, _rpc_topic, mqtt_new_sub_callback);
|
|
|
+
|
|
|
}
|
|
|
|
|
|
static void mqtt_offline_callback(MQTTClient *c)
|
|
|
{
|
|
|
LOG_D("inter mqtt_offline_callback!");
|
|
|
+ char _rpc_topic[200] = { 0 };
|
|
|
+ char _attup_topic[200] = { 0 };
|
|
|
+ sprintf(_rpc_topic, "devices/"PKG_USING_IOTSHARP_DEVICE_NAME"/rpc/request/+/+");
|
|
|
+ sprintf(_attup_topic, "devices/"PKG_USING_IOTSHARP_DEVICE_NAME"/attributes/update/");
|
|
|
+ paho_mqtt_unsubscribe(&client, _rpc_topic);
|
|
|
+ paho_mqtt_unsubscribe(&client, _rpc_topic);
|
|
|
}
|
|
|
|
|
|
-static int mqtt_start(int argc, char **argv)
|
|
|
+int iotsharp_start(void)
|
|
|
{
|
|
|
/* init condata param by using MQTTPacket_connectData_initializer */
|
|
|
MQTTPacket_connectData condata = MQTTPacket_connectData_initializer;
|
|
|
static char cid[20] = { 0 };
|
|
|
|
|
|
- if (argc != 1)
|
|
|
- {
|
|
|
- rt_kprintf("mqtt_start --start a mqtt worker thread.\n");
|
|
|
- return -1;
|
|
|
- }
|
|
|
|
|
|
if (is_started)
|
|
|
{
|
|
|
@@ -94,7 +99,7 @@ static int mqtt_start(int argc, char **argv)
|
|
|
client.uri = MQTT_URI;
|
|
|
|
|
|
/* generate the random client ID */
|
|
|
- rt_snprintf(cid, sizeof(cid), "rtthread%d", rt_tick_get());
|
|
|
+ rt_snprintf(cid, sizeof(cid), "iotsharp%d", rt_tick_get());
|
|
|
/* config connect param */
|
|
|
memcpy(&client.condata, &condata, sizeof(condata));
|
|
|
client.condata.clientID.cstring = cid;
|
|
|
@@ -107,7 +112,7 @@ static int mqtt_start(int argc, char **argv)
|
|
|
client.condata.willFlag = 1;
|
|
|
client.condata.will.qos = 1;
|
|
|
client.condata.will.retained = 0;
|
|
|
- client.condata.will.topicName.cstring = MQTT_PUBTOPIC;
|
|
|
+ client.condata.will.topicName.cstring = "/device/me/disconnect";
|
|
|
client.condata.will.message.cstring = MQTT_WILLMSG;
|
|
|
|
|
|
/* malloc buffer. */
|
|
|
@@ -126,9 +131,9 @@ static int mqtt_start(int argc, char **argv)
|
|
|
client.offline_callback = mqtt_offline_callback;
|
|
|
|
|
|
/* set subscribe table and event callback */
|
|
|
- client.messageHandlers[0].topicFilter = rt_strdup(MQTT_SUBTOPIC);
|
|
|
- client.messageHandlers[0].callback = mqtt_sub_callback;
|
|
|
- client.messageHandlers[0].qos = QOS1;
|
|
|
+ // client.messageHandlers[0].topicFilter = rt_strdup(MQTT_SUBTOPIC);
|
|
|
+ //client.messageHandlers[0].callback = mqtt_sub_callback;
|
|
|
+ // client.messageHandlers[0].qos = QOS1;
|
|
|
|
|
|
/* set default subscribe event callback */
|
|
|
client.defaultMessageHandler = mqtt_sub_default_callback;
|
|
|
@@ -141,92 +146,62 @@ static int mqtt_start(int argc, char **argv)
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
-static int mqtt_stop(int argc, char **argv)
|
|
|
+ int iotsahrp_stop(void)
|
|
|
{
|
|
|
- if (argc != 1)
|
|
|
- {
|
|
|
- rt_kprintf("mqtt_stop --stop mqtt worker thread and free mqtt client object.\n");
|
|
|
- }
|
|
|
-
|
|
|
is_started = 0;
|
|
|
|
|
|
return paho_mqtt_stop(&client);
|
|
|
}
|
|
|
|
|
|
-static int mqtt_publish(int argc, char **argv)
|
|
|
+
|
|
|
+ static int mqtt_publish_for_gateway(char* subdevicename, int datatype, char *playload)
|
|
|
{
|
|
|
+ char _telemetry_topic[200] = { 0 };
|
|
|
+ char attributes_topic[200] = { 0 };
|
|
|
+ sprintf(_telemetry_topic, "devices/%s/telemetry",subdevicename);
|
|
|
+ sprintf(attributes_topic, "devices/%s/attributes",subdevicename);
|
|
|
if (is_started == 0)
|
|
|
{
|
|
|
LOG_E("mqtt client is not connected.");
|
|
|
return -1;
|
|
|
}
|
|
|
|
|
|
- if (argc == 2)
|
|
|
+ if (datatype == 1)
|
|
|
{
|
|
|
- paho_mqtt_publish(&client, QOS1, MQTT_PUBTOPIC, argv[1]);
|
|
|
+ paho_mqtt_publish(&client, QOS1, _telemetry_topic, playload);
|
|
|
}
|
|
|
- else if (argc == 3)
|
|
|
+ else if (datatype== 2)
|
|
|
{
|
|
|
- paho_mqtt_publish(&client, QOS1, argv[1], argv[2]);
|
|
|
+ paho_mqtt_publish(&client, QOS1, attributes_topic, playload);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- rt_kprintf("mqtt_publish <topic> [message] --mqtt publish message to specified topic.\n");
|
|
|
+ rt_kprintf("publish message to specified datatype.\n");
|
|
|
return -1;
|
|
|
}
|
|
|
-
|
|
|
return 0;
|
|
|
}
|
|
|
-
|
|
|
-static void mqtt_new_sub_callback(MQTTClient *client, MessageData *msg_data)
|
|
|
+ int iotsharp_upload_telemetry_for_gateway(char* _devname,char* playload)
|
|
|
+ {
|
|
|
+ return mqtt_publish_for_gateway(_devname,1,playload);
|
|
|
+ }
|
|
|
+ int iotsharp_upload_telemetry_to_device(char* playload)
|
|
|
+ {
|
|
|
+ return mqtt_publish_for_gateway("me",1,playload);
|
|
|
+ }
|
|
|
+int iotsharp_upload_attribute_for_gateway(char* _devname,char* playload)
|
|
|
{
|
|
|
- *((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0';
|
|
|
- LOG_D("mqtt new subscribe callback: %.*s %.*s",
|
|
|
- msg_data->topicName->lenstring.len,
|
|
|
- msg_data->topicName->lenstring.data,
|
|
|
- msg_data->message->payloadlen,
|
|
|
- (char *)msg_data->message->payload);
|
|
|
+ return mqtt_publish_for_gateway(_devname,2,playload);
|
|
|
}
|
|
|
-
|
|
|
-static int mqtt_subscribe(int argc, char **argv)
|
|
|
+int iotsharp_upload_attribute_to_device(char* playload)
|
|
|
{
|
|
|
- if (argc != 2)
|
|
|
- {
|
|
|
- rt_kprintf("mqtt_subscribe [topic] --send an mqtt subscribe packet and wait for suback before returning.\n");
|
|
|
- return -1;
|
|
|
- }
|
|
|
-
|
|
|
- if (is_started == 0)
|
|
|
- {
|
|
|
- LOG_E("mqtt client is not connected.");
|
|
|
- return -1;
|
|
|
- }
|
|
|
-
|
|
|
- return paho_mqtt_subscribe(&client, QOS1, argv[1], mqtt_new_sub_callback);
|
|
|
+ return mqtt_publish_for_gateway("me",2,playload);
|
|
|
}
|
|
|
|
|
|
-static int mqtt_unsubscribe(int argc, char **argv)
|
|
|
-{
|
|
|
- if (argc != 2)
|
|
|
- {
|
|
|
- rt_kprintf("mqtt_unsubscribe [topic] --send an mqtt unsubscribe packet and wait for suback before returning.\n");
|
|
|
- return -1;
|
|
|
- }
|
|
|
-
|
|
|
- if (is_started == 0)
|
|
|
- {
|
|
|
- LOG_E("mqtt client is not connected.");
|
|
|
- return -1;
|
|
|
- }
|
|
|
|
|
|
- return paho_mqtt_unsubscribe(&client, argv[1]);
|
|
|
-}
|
|
|
|
|
|
#ifdef FINSH_USING_MSH
|
|
|
-MSH_CMD_EXPORT(mqtt_start, startup mqtt client);
|
|
|
-MSH_CMD_EXPORT(mqtt_stop, stop mqtt client);
|
|
|
-MSH_CMD_EXPORT(mqtt_publish, mqtt publish message to specified topic);
|
|
|
-MSH_CMD_EXPORT(mqtt_subscribe, mqtt subscribe topic);
|
|
|
-MSH_CMD_EXPORT(mqtt_unsubscribe, mqtt unsubscribe topic);
|
|
|
+
|
|
|
+
|
|
|
#endif /* FINSH_USING_MSH */
|
|
|
|