core_mqtt.c 135 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762777277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987298829892990299129922993299429952996299729982999300030013002300330043005300630073008300930103011301230133014301530163017301830193020302130223023302430253026302730283029303030313032303330343035303630373038303930403041304230433044304530463047304830493050305130523053305430553056305730583059306030613062306330643065306630673068306930703071307230733074307530763077307830793080308130823083308430853086308730883089309030913092309330943095309630973098309931003101310231033104310531063107310831093110311131123113311431153116311731183119312031213122312331243125312631273128312931303131313231333134313531363137313831393140314131423143314431453146314731483149315031513152315331543155315631573158315931603161316231633164316531663167316831693170317131723173317431753176317731783179318031813182318331843185318631873188318931903191319231933194319531963197319831993200320132023203320432053206320732083209321032113212321332143215321632173218321932203221322232233224322532263227322832293230323132323233323432353236323732383239324032413242324332443245324632473248324932503251325232533254325532563257325832593260326132623263326432653266326732683269327032713272327332743275327632773278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770
  1. /*
  2. * coreMQTT <DEVELOPMENT BRANCH>
  3. * Copyright (C) 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  4. *
  5. * SPDX-License-Identifier: MIT
  6. *
  7. * Permission is hereby granted, free of charge, to any person obtaining a copy of
  8. * this software and associated documentation files (the "Software"), to deal in
  9. * the Software without restriction, including without limitation the rights to
  10. * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  11. * the Software, and to permit persons to whom the Software is furnished to do so,
  12. * subject to the following conditions:
  13. *
  14. * The above copyright notice and this permission notice shall be included in all
  15. * copies or substantial portions of the Software.
  16. *
  17. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  18. * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  19. * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  20. * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  21. * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  22. * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  23. */
  24. /**
  25. * @file core_mqtt.c
  26. * @brief Implements the user-facing functions in core_mqtt.h.
  27. */
  28. #include <string.h>
  29. #include <assert.h>
  30. #include "core_mqtt.h"
  31. #include "core_mqtt_state.h"
  32. /* Include config defaults header to get default values of configs. */
  33. #include "core_mqtt_config_defaults.h"
  34. #ifndef MQTT_PRE_SEND_HOOK
  35. /**
  36. * @brief Hook called before a 'send' operation is executed.
  37. */
  38. #define MQTT_PRE_SEND_HOOK( pContext )
  39. #endif /* !MQTT_PRE_SEND_HOOK */
  40. #ifndef MQTT_POST_SEND_HOOK
  41. /**
  42. * @brief Hook called after the 'send' operation is complete.
  43. */
  44. #define MQTT_POST_SEND_HOOK( pContext )
  45. #endif /* !MQTT_POST_SEND_HOOK */
  46. #ifndef MQTT_PRE_STATE_UPDATE_HOOK
  47. /**
  48. * @brief Hook called just before an update to the MQTT state is made.
  49. */
  50. #define MQTT_PRE_STATE_UPDATE_HOOK( pContext )
  51. #endif /* !MQTT_PRE_STATE_UPDATE_HOOK */
  52. #ifndef MQTT_POST_STATE_UPDATE_HOOK
  53. /**
  54. * @brief Hook called just after an update to the MQTT state has
  55. * been made.
  56. */
  57. #define MQTT_POST_STATE_UPDATE_HOOK( pContext )
  58. #endif /* !MQTT_POST_STATE_UPDATE_HOOK */
  59. /**
  60. * @brief Bytes required to encode any string length in an MQTT packet header.
  61. * Length is always encoded in two bytes according to the MQTT specification.
  62. */
  63. #define CORE_MQTT_SERIALIZED_LENGTH_FIELD_BYTES ( 2U )
  64. /**
  65. * @brief Number of vectors required to encode one topic filter in a subscribe
  66. * request. Three vectors are required as there are three fields in the
  67. * subscribe request namely:
  68. * 1. Topic filter length; 2. Topic filter; and 3. QoS in this order.
  69. */
  70. #define CORE_MQTT_SUBSCRIBE_PER_TOPIC_VECTOR_LENGTH ( 3U )
  71. /**
  72. * @brief Number of vectors required to encode one topic filter in an
  73. * unsubscribe request. Two vectors are required as there are two fields in the
  74. * unsubscribe request namely:
  75. * 1. Topic filter length; and 2. Topic filter in this order.
  76. */
  77. #define CORE_MQTT_UNSUBSCRIBE_PER_TOPIC_VECTOR_LENGTH ( 2U )
  78. struct MQTTVec
  79. {
  80. TransportOutVector_t * pVector; /**< Pointer to transport vector. USER SHOULD NOT ACCESS THIS DIRECTLY - IT IS AN INTERNAL DETAIL AND CAN CHANGE. */
  81. size_t vectorLen; /**< Length of the transport vector. USER SHOULD NOT ACCESS THIS DIRECTLY - IT IS AN INTERNAL DETAIL AND CAN CHANGE. */
  82. };
  83. /*-----------------------------------------------------------*/
  84. /**
  85. * @brief Sends provided buffer to network using transport send.
  86. *
  87. * @brief param[in] pContext Initialized MQTT context.
  88. * @brief param[in] pBufferToSend Buffer to be sent to network.
  89. * @brief param[in] bytesToSend Number of bytes to be sent.
  90. *
  91. * @note This operation may call the transport send function
  92. * repeatedly to send bytes over the network until either:
  93. * 1. The requested number of bytes @a bytesToSend have been sent.
  94. * OR
  95. * 2. MQTT_SEND_TIMEOUT_MS milliseconds have gone by since entering this
  96. * function.
  97. * OR
  98. * 3. There is an error in sending data over the network.
  99. *
  100. * @return Total number of bytes sent, or negative value on network error.
  101. */
  102. static int32_t sendBuffer( MQTTContext_t * pContext,
  103. const uint8_t * pBufferToSend,
  104. size_t bytesToSend );
  105. /**
  106. * @brief Sends MQTT connect without copying the users data into any buffer.
  107. *
  108. * @brief param[in] pContext Initialized MQTT context.
  109. * @brief param[in] pConnectInfo MQTT CONNECT packet information.
  110. * @brief param[in] pWillInfo Last Will and Testament. Pass NULL if Last Will and
  111. * Testament is not used.
  112. * @brief param[in] remainingLength the length of the connect packet.
  113. *
  114. * @note This operation may call the transport send function
  115. * repeatedly to send bytes over the network until either:
  116. * 1. The requested number of bytes @a remainingLength have been sent.
  117. * OR
  118. * 2. MQTT_SEND_TIMEOUT_MS milliseconds have gone by since entering this
  119. * function.
  120. * OR
  121. * 3. There is an error in sending data over the network.
  122. *
  123. * @return #MQTTSendFailed or #MQTTSuccess.
  124. */
  125. static MQTTStatus_t sendConnectWithoutCopy( MQTTContext_t * pContext,
  126. const MQTTConnectInfo_t * pConnectInfo,
  127. const MQTTPublishInfo_t * pWillInfo,
  128. size_t remainingLength );
  129. /**
  130. * @brief Sends the vector array passed through the parameters over the network.
  131. *
  132. * @note The preference is given to 'writev' function if it is present in the
  133. * transport interface. Otherwise, a send call is made repeatedly to achieve the
  134. * result.
  135. *
  136. * @param[in] pContext Initialized MQTT context.
  137. * @param[in] pIoVec The vector array to be sent.
  138. * @param[in] ioVecCount The number of elements in the array.
  139. *
  140. * @note This operation may call the transport send or writev functions
  141. * repeatedly to send bytes over the network until either:
  142. * 1. The requested number of bytes have been sent.
  143. * OR
  144. * 2. MQTT_SEND_TIMEOUT_MS milliseconds have gone by since entering this
  145. * function.
  146. * OR
  147. * 3. There is an error in sending data over the network.
  148. *
  149. * @return The total number of bytes sent or the error code as received from the
  150. * transport interface.
  151. */
  152. static int32_t sendMessageVector( MQTTContext_t * pContext,
  153. TransportOutVector_t * pIoVec,
  154. size_t ioVecCount );
  155. /**
  156. * @brief Add a string and its length after serializing it in a manner outlined by
  157. * the MQTT specification.
  158. *
  159. * @param[in] serializedLength Array of two bytes to which the vector will point.
  160. * The array must remain in scope until the message has been sent.
  161. * @param[in] string The string to be serialized.
  162. * @param[in] length The length of the string to be serialized.
  163. * @param[in] iterator The iterator pointing to the first element in the
  164. * transport interface IO array.
  165. * @param[out] updatedLength This parameter will be added to with the number of
  166. * bytes added to the vector.
  167. *
  168. * @return The number of vectors added.
  169. */
  170. static size_t addEncodedStringToVector( uint8_t serializedLength[ CORE_MQTT_SERIALIZED_LENGTH_FIELD_BYTES ],
  171. const char * const string,
  172. uint16_t length,
  173. TransportOutVector_t * iterator,
  174. size_t * updatedLength );
  175. /**
  176. * @brief Send MQTT SUBSCRIBE message without copying the user data into a buffer and
  177. * directly sending it.
  178. *
  179. * @param[in] pContext Initialized MQTT context.
  180. * @param[in] pSubscriptionList List of MQTT subscription info.
  181. * @param[in] subscriptionCount The count of elements in the list.
  182. * @param[in] packetId The packet ID of the subscribe packet
  183. * @param[in] remainingLength The remaining length of the subscribe packet.
  184. *
  185. * @return #MQTTSuccess or #MQTTSendFailed.
  186. */
  187. static MQTTStatus_t sendSubscribeWithoutCopy( MQTTContext_t * pContext,
  188. const MQTTSubscribeInfo_t * pSubscriptionList,
  189. size_t subscriptionCount,
  190. uint16_t packetId,
  191. size_t remainingLength );
  192. /**
  193. * @brief Send MQTT UNSUBSCRIBE message without copying the user data into a buffer and
  194. * directly sending it.
  195. *
  196. * @param[in] pContext Initialized MQTT context.
  197. * @param[in] pSubscriptionList MQTT subscription info.
  198. * @param[in] subscriptionCount The count of elements in the list.
  199. * @param[in] packetId The packet ID of the unsubscribe packet.
  200. * @param[in] remainingLength The remaining length of the unsubscribe packet.
  201. *
  202. * @return #MQTTSuccess or #MQTTSendFailed.
  203. */
  204. static MQTTStatus_t sendUnsubscribeWithoutCopy( MQTTContext_t * pContext,
  205. const MQTTSubscribeInfo_t * pSubscriptionList,
  206. size_t subscriptionCount,
  207. uint16_t packetId,
  208. size_t remainingLength );
  209. /**
  210. * @brief Calculate the interval between two millisecond timestamps, including
  211. * when the later value has overflowed.
  212. *
  213. * @note In C, the operands are promoted to signed integers in subtraction.
  214. * Using this function avoids the need to cast the result of subtractions back
  215. * to uint32_t.
  216. *
  217. * @param[in] later The later time stamp, in milliseconds.
  218. * @param[in] start The earlier time stamp, in milliseconds.
  219. *
  220. * @return later - start.
  221. */
  222. static uint32_t calculateElapsedTime( uint32_t later,
  223. uint32_t start );
  224. /**
  225. * @brief Convert a byte indicating a publish ack type to an #MQTTPubAckType_t.
  226. *
  227. * @param[in] packetType First byte of fixed header.
  228. *
  229. * @return Type of ack.
  230. */
  231. static MQTTPubAckType_t getAckFromPacketType( uint8_t packetType );
  232. /**
  233. * @brief Receive bytes into the network buffer.
  234. *
  235. * @param[in] pContext Initialized MQTT Context.
  236. * @param[in] bytesToRecv Number of bytes to receive.
  237. *
  238. * @note This operation calls the transport receive function
  239. * repeatedly to read bytes from the network until either:
  240. * 1. The requested number of bytes @a bytesToRecv are read.
  241. * OR
  242. * 2. No data is received from the network for MQTT_RECV_POLLING_TIMEOUT_MS duration.
  243. *
  244. * OR
  245. * 3. There is an error in reading from the network.
  246. *
  247. *
  248. * @return Number of bytes received, or negative number on network error.
  249. */
  250. static int32_t recvExact( MQTTContext_t * pContext,
  251. size_t bytesToRecv );
  252. /**
  253. * @brief Discard a packet from the transport interface.
  254. *
  255. * @param[in] pContext MQTT Connection context.
  256. * @param[in] remainingLength Remaining length of the packet to dump.
  257. * @param[in] timeoutMs Time remaining to discard the packet.
  258. *
  259. * @return #MQTTRecvFailed or #MQTTNoDataAvailable.
  260. */
  261. static MQTTStatus_t discardPacket( MQTTContext_t * pContext,
  262. size_t remainingLength,
  263. uint32_t timeoutMs );
  264. /**
  265. * @brief Discard a packet from the MQTT buffer and the transport interface.
  266. *
  267. * @param[in] pContext MQTT Connection context.
  268. * @param[in] pPacketInfo Information struct of the packet to be discarded.
  269. *
  270. * @return #MQTTRecvFailed or #MQTTNoDataAvailable.
  271. */
  272. static MQTTStatus_t discardStoredPacket( MQTTContext_t * pContext,
  273. const MQTTPacketInfo_t * pPacketInfo );
  274. /**
  275. * @brief Receive a packet from the transport interface.
  276. *
  277. * @param[in] pContext MQTT Connection context.
  278. * @param[in] incomingPacket packet struct with remaining length.
  279. * @param[in] remainingTimeMs Time remaining to receive the packet.
  280. *
  281. * @return #MQTTSuccess or #MQTTRecvFailed.
  282. */
  283. static MQTTStatus_t receivePacket( MQTTContext_t * pContext,
  284. MQTTPacketInfo_t incomingPacket,
  285. uint32_t remainingTimeMs );
  286. /**
  287. * @brief Get the correct ack type to send.
  288. *
  289. * @param[in] state Current state of publish.
  290. *
  291. * @return Packet Type byte of PUBACK, PUBREC, PUBREL, or PUBCOMP if one of
  292. * those should be sent, else 0.
  293. */
  294. static uint8_t getAckTypeToSend( MQTTPublishState_t state );
  295. /**
  296. * @brief Send acks for received QoS 1/2 publishes.
  297. *
  298. * @param[in] pContext MQTT Connection context.
  299. * @param[in] packetId packet ID of original PUBLISH.
  300. * @param[in] publishState Current publish state in record.
  301. *
  302. * @return #MQTTSuccess, #MQTTIllegalState or #MQTTSendFailed.
  303. */
  304. static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext,
  305. uint16_t packetId,
  306. MQTTPublishState_t publishState );
  307. /**
  308. * @brief Send a keep alive PINGREQ if the keep alive interval has elapsed.
  309. *
  310. * @param[in] pContext Initialized MQTT Context.
  311. *
  312. * @return #MQTTKeepAliveTimeout if a PINGRESP is not received in time,
  313. * #MQTTSendFailed if the PINGREQ cannot be sent, or #MQTTSuccess.
  314. */
  315. static MQTTStatus_t handleKeepAlive( MQTTContext_t * pContext );
  316. /**
  317. * @brief Handle received MQTT PUBLISH packet.
  318. *
  319. * @param[in] pContext MQTT Connection context.
  320. * @param[in] pIncomingPacket Incoming packet.
  321. *
  322. * @return MQTTSuccess, MQTTIllegalState or deserialization error.
  323. */
  324. static MQTTStatus_t handleIncomingPublish( MQTTContext_t * pContext,
  325. MQTTPacketInfo_t * pIncomingPacket );
  326. /**
  327. * @brief Handle received MQTT publish acks.
  328. *
  329. * @param[in] pContext MQTT Connection context.
  330. * @param[in] pIncomingPacket Incoming packet.
  331. *
  332. * @return MQTTSuccess, MQTTIllegalState, or deserialization error.
  333. */
  334. static MQTTStatus_t handlePublishAcks( MQTTContext_t * pContext,
  335. MQTTPacketInfo_t * pIncomingPacket );
  336. /**
  337. * @brief Handle received MQTT ack.
  338. *
  339. * @param[in] pContext MQTT Connection context.
  340. * @param[in] pIncomingPacket Incoming packet.
  341. * @param[in] manageKeepAlive Flag indicating if PINGRESPs should not be given
  342. * to the application
  343. *
  344. * @return MQTTSuccess, MQTTIllegalState, or deserialization error.
  345. */
  346. static MQTTStatus_t handleIncomingAck( MQTTContext_t * pContext,
  347. MQTTPacketInfo_t * pIncomingPacket,
  348. bool manageKeepAlive );
  349. /**
  350. * @brief Run a single iteration of the receive loop.
  351. *
  352. * @param[in] pContext MQTT Connection context.
  353. * @param[in] manageKeepAlive Flag indicating if keep alive should be handled.
  354. *
  355. * @return #MQTTRecvFailed if a network error occurs during reception;
  356. * #MQTTSendFailed if a network error occurs while sending an ACK or PINGREQ;
  357. * #MQTTBadResponse if an invalid packet is received;
  358. * #MQTTKeepAliveTimeout if the server has not sent a PINGRESP before
  359. * #MQTT_PINGRESP_TIMEOUT_MS milliseconds;
  360. * #MQTTIllegalState if an incoming QoS 1/2 publish or ack causes an
  361. * invalid transition for the internal state machine;
  362. * #MQTTSuccess on success.
  363. */
  364. static MQTTStatus_t receiveSingleIteration( MQTTContext_t * pContext,
  365. bool manageKeepAlive );
  366. /**
  367. * @brief Validates parameters of #MQTT_Subscribe or #MQTT_Unsubscribe.
  368. *
  369. * @param[in] pContext Initialized MQTT context.
  370. * @param[in] pSubscriptionList List of MQTT subscription info.
  371. * @param[in] subscriptionCount The number of elements in pSubscriptionList.
  372. * @param[in] packetId Packet identifier.
  373. *
  374. * @return #MQTTBadParameter if invalid parameters are passed;
  375. * #MQTTSuccess otherwise.
  376. */
  377. static MQTTStatus_t validateSubscribeUnsubscribeParams( const MQTTContext_t * pContext,
  378. const MQTTSubscribeInfo_t * pSubscriptionList,
  379. size_t subscriptionCount,
  380. uint16_t packetId );
  381. /**
  382. * @brief Receives a CONNACK MQTT packet.
  383. *
  384. * @param[in] pContext Initialized MQTT context.
  385. * @param[in] timeoutMs Timeout for waiting for CONNACK packet.
  386. * @param[in] cleanSession Clean session flag set by application.
  387. * @param[out] pIncomingPacket List of MQTT subscription info.
  388. * @param[out] pSessionPresent Whether a previous session was present.
  389. * Only relevant if not establishing a clean session.
  390. *
  391. * @return #MQTTBadResponse if a bad response is received;
  392. * #MQTTNoDataAvailable if no data available for transport recv;
  393. * ##MQTTRecvFailed if transport recv failed;
  394. * #MQTTSuccess otherwise.
  395. */
  396. static MQTTStatus_t receiveConnack( MQTTContext_t * pContext,
  397. uint32_t timeoutMs,
  398. bool cleanSession,
  399. MQTTPacketInfo_t * pIncomingPacket,
  400. bool * pSessionPresent );
  401. /**
  402. * @brief Resends pending acks for a re-established MQTT session
  403. *
  404. * @param[in] pContext Initialized MQTT context.
  405. *
  406. * @return #MQTTSendFailed if transport send during resend failed;
  407. * #MQTTSuccess otherwise.
  408. */
  409. static MQTTStatus_t handleUncleanSessionResumption( MQTTContext_t * pContext );
  410. /**
  411. * @brief Clears existing state records for a clean session.
  412. *
  413. * @param[in] pContext Initialized MQTT context.
  414. *
  415. * @return #MQTTSuccess always otherwise.
  416. */
  417. static MQTTStatus_t handleCleanSession( MQTTContext_t * pContext );
  418. /**
  419. * @brief Send the publish packet without copying the topic string and payload in
  420. * the buffer.
  421. *
  422. * @brief param[in] pContext Initialized MQTT context.
  423. * @brief param[in] pPublishInfo MQTT PUBLISH packet parameters.
  424. * @brief param[in] pMqttHeader the serialized MQTT header with the header byte;
  425. * the encoded length of the packet; and the encoded length of the topic string.
  426. * @brief param[in] headerSize Size of the serialized PUBLISH header.
  427. * @brief param[in] packetId Packet Id of the publish packet.
  428. *
  429. * @return #MQTTSendFailed if transport send during resend failed;
  430. * #MQTTSuccess otherwise.
  431. */
  432. static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext,
  433. const MQTTPublishInfo_t * pPublishInfo,
  434. uint8_t * pMqttHeader,
  435. size_t headerSize,
  436. uint16_t packetId );
  437. /**
  438. * @brief Function to validate #MQTT_Publish parameters.
  439. *
  440. * @brief param[in] pContext Initialized MQTT context.
  441. * @brief param[in] pPublishInfo MQTT PUBLISH packet parameters.
  442. * @brief param[in] packetId Packet Id for the MQTT PUBLISH packet.
  443. *
  444. * @return #MQTTBadParameter if invalid parameters are passed;
  445. * #MQTTSuccess otherwise.
  446. */
  447. static MQTTStatus_t validatePublishParams( const MQTTContext_t * pContext,
  448. const MQTTPublishInfo_t * pPublishInfo,
  449. uint16_t packetId );
  450. /**
  451. * @brief Performs matching for special cases when a topic filter ends
  452. * with a wildcard character.
  453. *
  454. * When the topic name has been consumed but there are remaining characters to
  455. * to match in topic filter, this function handles the following 2 cases:
  456. * - When the topic filter ends with "/+" or "/#" characters, but the topic
  457. * name only ends with '/'.
  458. * - When the topic filter ends with "/#" characters, but the topic name
  459. * ends at the parent level.
  460. *
  461. * @note This function ASSUMES that the topic name been consumed in linear
  462. * matching with the topic filer, but the topic filter has remaining characters
  463. * to be matched.
  464. *
  465. * @param[in] pTopicFilter The topic filter containing the wildcard.
  466. * @param[in] topicFilterLength Length of the topic filter being examined.
  467. * @param[in] filterIndex Index of the topic filter being examined.
  468. *
  469. * @return Returns whether the topic filter and the topic name match.
  470. */
  471. static bool matchEndWildcardsSpecialCases( const char * pTopicFilter,
  472. uint16_t topicFilterLength,
  473. uint16_t filterIndex );
  474. /**
  475. * @brief Attempt to match topic name with a topic filter starting with a wildcard.
  476. *
  477. * If the topic filter starts with a '+' (single-level) wildcard, the function
  478. * advances the @a pNameIndex by a level in the topic name.
  479. * If the topic filter starts with a '#' (multi-level) wildcard, the function
  480. * concludes that both the topic name and topic filter match.
  481. *
  482. * @param[in] pTopicName The topic name to match.
  483. * @param[in] topicNameLength Length of the topic name.
  484. * @param[in] pTopicFilter The topic filter to match.
  485. * @param[in] topicFilterLength Length of the topic filter.
  486. * @param[in,out] pNameIndex Current index in the topic name being examined. It is
  487. * advanced by one level for `+` wildcards.
  488. * @param[in, out] pFilterIndex Current index in the topic filter being examined.
  489. * It is advanced to position of '/' level separator for '+' wildcard.
  490. * @param[out] pMatch Whether the topic filter and topic name match.
  491. *
  492. * @return `true` if the caller of this function should exit; `false` if the
  493. * caller should continue parsing the topics.
  494. */
  495. static bool matchWildcards( const char * pTopicName,
  496. uint16_t topicNameLength,
  497. const char * pTopicFilter,
  498. uint16_t topicFilterLength,
  499. uint16_t * pNameIndex,
  500. uint16_t * pFilterIndex,
  501. bool * pMatch );
  502. /**
  503. * @brief Match a topic name and topic filter allowing the use of wildcards.
  504. *
  505. * @param[in] pTopicName The topic name to check.
  506. * @param[in] topicNameLength Length of the topic name.
  507. * @param[in] pTopicFilter The topic filter to check.
  508. * @param[in] topicFilterLength Length of topic filter.
  509. *
  510. * @return `true` if the topic name and topic filter match; `false` otherwise.
  511. */
  512. static bool matchTopicFilter( const char * pTopicName,
  513. uint16_t topicNameLength,
  514. const char * pTopicFilter,
  515. uint16_t topicFilterLength );
  516. /*-----------------------------------------------------------*/
  517. static bool matchEndWildcardsSpecialCases( const char * pTopicFilter,
  518. uint16_t topicFilterLength,
  519. uint16_t filterIndex )
  520. {
  521. bool matchFound = false;
  522. assert( pTopicFilter != NULL );
  523. assert( topicFilterLength != 0U );
  524. /* Check if the topic filter has 2 remaining characters and it ends in
  525. * "/#". This check handles the case to match filter "sport/#" with topic
  526. * "sport". The reason is that the '#' wildcard represents the parent and
  527. * any number of child levels in the topic name.*/
  528. if( ( topicFilterLength >= 3U ) &&
  529. ( filterIndex == ( topicFilterLength - 3U ) ) &&
  530. ( pTopicFilter[ filterIndex + 1U ] == '/' ) &&
  531. ( pTopicFilter[ filterIndex + 2U ] == '#' ) )
  532. {
  533. matchFound = true;
  534. }
  535. /* Check if the next character is "#" or "+" and the topic filter ends in
  536. * "/#" or "/+". This check handles the cases to match:
  537. *
  538. * - Topic filter "sport/+" with topic "sport/".
  539. * - Topic filter "sport/#" with topic "sport/".
  540. */
  541. if( ( filterIndex == ( topicFilterLength - 2U ) ) &&
  542. ( pTopicFilter[ filterIndex ] == '/' ) )
  543. {
  544. /* Check that the last character is a wildcard. */
  545. matchFound = ( pTopicFilter[ filterIndex + 1U ] == '+' ) ||
  546. ( pTopicFilter[ filterIndex + 1U ] == '#' );
  547. }
  548. return matchFound;
  549. }
  550. /*-----------------------------------------------------------*/
  551. static bool matchWildcards( const char * pTopicName,
  552. uint16_t topicNameLength,
  553. const char * pTopicFilter,
  554. uint16_t topicFilterLength,
  555. uint16_t * pNameIndex,
  556. uint16_t * pFilterIndex,
  557. bool * pMatch )
  558. {
  559. bool shouldStopMatching = false;
  560. bool locationIsValidForWildcard;
  561. uint16_t nameIndex;
  562. assert( pTopicName != NULL );
  563. assert( topicNameLength != 0U );
  564. assert( pTopicFilter != NULL );
  565. assert( topicFilterLength != 0U );
  566. assert( pNameIndex != NULL );
  567. assert( pFilterIndex != NULL );
  568. assert( pMatch != NULL );
  569. nameIndex = *pNameIndex;
  570. /* Wild card in a topic filter is only valid either at the starting position
  571. * or when it is preceded by a '/'.*/
  572. locationIsValidForWildcard = ( *pFilterIndex == 0u ) ||
  573. ( pTopicFilter[ *pFilterIndex - 1U ] == '/' );
  574. if( ( pTopicFilter[ *pFilterIndex ] == '+' ) && ( locationIsValidForWildcard == true ) )
  575. {
  576. bool nextLevelExistsInTopicName = false;
  577. bool nextLevelExistsinTopicFilter = false;
  578. /* Move topic name index to the end of the current level. The end of the
  579. * current level is identified by the last character before the next level
  580. * separator '/'. */
  581. while( nameIndex < topicNameLength )
  582. {
  583. /* Exit the loop if we hit the level separator. */
  584. if( pTopicName[ nameIndex ] == '/' )
  585. {
  586. nextLevelExistsInTopicName = true;
  587. break;
  588. }
  589. nameIndex += 1;
  590. }
  591. /* Determine if the topic filter contains a child level after the current level
  592. * represented by the '+' wildcard. */
  593. if( ( *pFilterIndex < ( topicFilterLength - 1U ) ) &&
  594. ( pTopicFilter[ *pFilterIndex + 1U ] == '/' ) )
  595. {
  596. nextLevelExistsinTopicFilter = true;
  597. }
  598. /* If the topic name contains a child level but the topic filter ends at
  599. * the current level, then there does not exist a match. */
  600. if( ( nextLevelExistsInTopicName == true ) &&
  601. ( nextLevelExistsinTopicFilter == false ) )
  602. {
  603. *pMatch = false;
  604. shouldStopMatching = true;
  605. }
  606. /* If the topic name and topic filter have child levels, then advance the
  607. * filter index to the level separator in the topic filter, so that match
  608. * can be performed in the next level.
  609. * Note: The name index already points to the level separator in the topic
  610. * name. */
  611. else if( nextLevelExistsInTopicName == true )
  612. {
  613. ( *pFilterIndex )++;
  614. }
  615. else
  616. {
  617. /* If we have reached here, the the loop terminated on the
  618. * ( nameIndex < topicNameLength) condition, which means that have
  619. * reached past the end of the topic name, and thus, we decrement the
  620. * index to the last character in the topic name.*/
  621. /* coverity[integer_overflow] */
  622. nameIndex -= 1;
  623. }
  624. }
  625. /* '#' matches everything remaining in the topic name. It must be the
  626. * last character in a topic filter. */
  627. else if( ( pTopicFilter[ *pFilterIndex ] == '#' ) &&
  628. ( *pFilterIndex == ( topicFilterLength - 1U ) ) &&
  629. ( locationIsValidForWildcard == true ) )
  630. {
  631. /* Subsequent characters don't need to be checked for the
  632. * multi-level wildcard. */
  633. *pMatch = true;
  634. shouldStopMatching = true;
  635. }
  636. else
  637. {
  638. /* Any character mismatch other than '+' or '#' means the topic
  639. * name does not match the topic filter. */
  640. *pMatch = false;
  641. shouldStopMatching = true;
  642. }
  643. *pNameIndex = nameIndex;
  644. return shouldStopMatching;
  645. }
  646. /*-----------------------------------------------------------*/
  647. static bool matchTopicFilter( const char * pTopicName,
  648. uint16_t topicNameLength,
  649. const char * pTopicFilter,
  650. uint16_t topicFilterLength )
  651. {
  652. bool matchFound = false, shouldStopMatching = false;
  653. uint16_t nameIndex = 0, filterIndex = 0;
  654. assert( pTopicName != NULL );
  655. assert( topicNameLength != 0 );
  656. assert( pTopicFilter != NULL );
  657. assert( topicFilterLength != 0 );
  658. while( ( nameIndex < topicNameLength ) && ( filterIndex < topicFilterLength ) )
  659. {
  660. /* Check if the character in the topic name matches the corresponding
  661. * character in the topic filter string. */
  662. if( pTopicName[ nameIndex ] == pTopicFilter[ filterIndex ] )
  663. {
  664. /* If the topic name has been consumed but the topic filter has not
  665. * been consumed, match for special cases when the topic filter ends
  666. * with wildcard character. */
  667. if( nameIndex == ( topicNameLength - 1U ) )
  668. {
  669. matchFound = matchEndWildcardsSpecialCases( pTopicFilter,
  670. topicFilterLength,
  671. filterIndex );
  672. }
  673. }
  674. else
  675. {
  676. /* Check for matching wildcards. */
  677. shouldStopMatching = matchWildcards( pTopicName,
  678. topicNameLength,
  679. pTopicFilter,
  680. topicFilterLength,
  681. &nameIndex,
  682. &filterIndex,
  683. &matchFound );
  684. }
  685. if( ( matchFound == true ) || ( shouldStopMatching == true ) )
  686. {
  687. break;
  688. }
  689. /* Increment indexes. */
  690. nameIndex++;
  691. filterIndex++;
  692. }
  693. if( matchFound == false )
  694. {
  695. /* If the end of both strings has been reached, they match. This represents the
  696. * case when the topic filter contains the '+' wildcard at a non-starting position.
  697. * For example, when matching either of "sport/+/player" OR "sport/hockey/+" topic
  698. * filters with "sport/hockey/player" topic name. */
  699. matchFound = ( nameIndex == topicNameLength ) &&
  700. ( filterIndex == topicFilterLength );
  701. }
  702. return matchFound;
  703. }
  704. /*-----------------------------------------------------------*/
  705. static int32_t sendMessageVector( MQTTContext_t * pContext,
  706. TransportOutVector_t * pIoVec,
  707. size_t ioVecCount )
  708. {
  709. int32_t sendResult;
  710. uint32_t startTime;
  711. TransportOutVector_t * pIoVectIterator;
  712. size_t vectorsToBeSent = ioVecCount;
  713. size_t bytesToSend = 0U;
  714. int32_t bytesSentOrError = 0;
  715. assert( pContext != NULL );
  716. assert( pIoVec != NULL );
  717. assert( pContext->getTime != NULL );
  718. /* Send must always be defined */
  719. assert( pContext->transportInterface.send != NULL );
  720. /* Count the total number of bytes to be sent as outlined in the vector. */
  721. for( pIoVectIterator = pIoVec; pIoVectIterator <= &( pIoVec[ ioVecCount - 1U ] ); pIoVectIterator++ )
  722. {
  723. bytesToSend += pIoVectIterator->iov_len;
  724. }
  725. /* Reset the iterator to point to the first entry in the array. */
  726. pIoVectIterator = pIoVec;
  727. /* Note the start time. */
  728. startTime = pContext->getTime();
  729. while( ( bytesSentOrError < ( int32_t ) bytesToSend ) && ( bytesSentOrError >= 0 ) )
  730. {
  731. if( pContext->transportInterface.writev != NULL )
  732. {
  733. sendResult = pContext->transportInterface.writev( pContext->transportInterface.pNetworkContext,
  734. pIoVectIterator,
  735. vectorsToBeSent );
  736. }
  737. else
  738. {
  739. sendResult = pContext->transportInterface.send( pContext->transportInterface.pNetworkContext,
  740. pIoVectIterator->iov_base,
  741. pIoVectIterator->iov_len );
  742. }
  743. if( sendResult > 0 )
  744. {
  745. /* It is a bug in the application's transport send implementation if
  746. * more bytes than expected are sent. */
  747. assert( sendResult <= ( ( int32_t ) bytesToSend - bytesSentOrError ) );
  748. bytesSentOrError += sendResult;
  749. /* Set last transmission time. */
  750. pContext->lastPacketTxTime = pContext->getTime();
  751. LogDebug( ( "sendMessageVector: Bytes Sent=%ld, Bytes Remaining=%lu",
  752. ( long int ) sendResult,
  753. ( unsigned long ) ( bytesToSend - ( size_t ) bytesSentOrError ) ) );
  754. }
  755. else if( sendResult < 0 )
  756. {
  757. bytesSentOrError = sendResult;
  758. LogError( ( "sendMessageVector: Unable to send packet: Network Error." ) );
  759. if( pContext->connectStatus == MQTTConnected )
  760. {
  761. pContext->connectStatus = MQTTDisconnectPending;
  762. }
  763. }
  764. else
  765. {
  766. /* MISRA Empty body */
  767. }
  768. /* Check for timeout. */
  769. if( calculateElapsedTime( pContext->getTime(), startTime ) > MQTT_SEND_TIMEOUT_MS )
  770. {
  771. LogError( ( "sendMessageVector: Unable to send packet: Timed out." ) );
  772. break;
  773. }
  774. /* Update the send pointer to the correct vector and offset. */
  775. while( ( pIoVectIterator <= &( pIoVec[ ioVecCount - 1U ] ) ) &&
  776. ( sendResult >= ( int32_t ) pIoVectIterator->iov_len ) )
  777. {
  778. sendResult -= ( int32_t ) pIoVectIterator->iov_len;
  779. pIoVectIterator++;
  780. /* Update the number of vector which are yet to be sent. */
  781. vectorsToBeSent--;
  782. }
  783. /* Some of the bytes from this vector were sent as well, update the length
  784. * and the pointer to data in this vector. */
  785. if( ( sendResult > 0 ) &&
  786. ( pIoVectIterator <= &( pIoVec[ ioVecCount - 1U ] ) ) )
  787. {
  788. pIoVectIterator->iov_base = ( const void * ) &( ( ( const uint8_t * ) pIoVectIterator->iov_base )[ sendResult ] );
  789. pIoVectIterator->iov_len -= ( size_t ) sendResult;
  790. }
  791. }
  792. return bytesSentOrError;
  793. }
  794. static int32_t sendBuffer( MQTTContext_t * pContext,
  795. const uint8_t * pBufferToSend,
  796. size_t bytesToSend )
  797. {
  798. int32_t sendResult;
  799. uint32_t startTime;
  800. int32_t bytesSentOrError = 0;
  801. const uint8_t * pIndex = pBufferToSend;
  802. assert( pContext != NULL );
  803. assert( pContext->getTime != NULL );
  804. assert( pContext->transportInterface.send != NULL );
  805. assert( pIndex != NULL );
  806. /* Set the timeout. */
  807. startTime = pContext->getTime();
  808. while( ( bytesSentOrError < ( int32_t ) bytesToSend ) && ( bytesSentOrError >= 0 ) )
  809. {
  810. sendResult = pContext->transportInterface.send( pContext->transportInterface.pNetworkContext,
  811. pIndex,
  812. bytesToSend - ( size_t ) bytesSentOrError );
  813. if( sendResult > 0 )
  814. {
  815. /* It is a bug in the application's transport send implementation if
  816. * more bytes than expected are sent. */
  817. assert( sendResult <= ( ( int32_t ) bytesToSend - bytesSentOrError ) );
  818. bytesSentOrError += sendResult;
  819. pIndex = &pIndex[ sendResult ];
  820. /* Set last transmission time. */
  821. pContext->lastPacketTxTime = pContext->getTime();
  822. LogDebug( ( "sendBuffer: Bytes Sent=%ld, Bytes Remaining=%lu",
  823. ( long int ) sendResult,
  824. ( unsigned long ) ( bytesToSend - ( size_t ) bytesSentOrError ) ) );
  825. }
  826. else if( sendResult < 0 )
  827. {
  828. bytesSentOrError = sendResult;
  829. LogError( ( "sendBuffer: Unable to send packet: Network Error." ) );
  830. if( pContext->connectStatus == MQTTConnected )
  831. {
  832. pContext->connectStatus = MQTTDisconnectPending;
  833. }
  834. }
  835. else
  836. {
  837. /* MISRA Empty body */
  838. }
  839. /* Check for timeout. */
  840. if( calculateElapsedTime( pContext->getTime(), startTime ) >= ( MQTT_SEND_TIMEOUT_MS ) )
  841. {
  842. LogError( ( "sendBuffer: Unable to send packet: Timed out." ) );
  843. break;
  844. }
  845. }
  846. return bytesSentOrError;
  847. }
  848. /*-----------------------------------------------------------*/
  849. static uint32_t calculateElapsedTime( uint32_t later,
  850. uint32_t start )
  851. {
  852. return later - start;
  853. }
  854. /*-----------------------------------------------------------*/
  855. static MQTTPubAckType_t getAckFromPacketType( uint8_t packetType )
  856. {
  857. MQTTPubAckType_t ackType = MQTTPuback;
  858. switch( packetType )
  859. {
  860. case MQTT_PACKET_TYPE_PUBACK:
  861. ackType = MQTTPuback;
  862. break;
  863. case MQTT_PACKET_TYPE_PUBREC:
  864. ackType = MQTTPubrec;
  865. break;
  866. case MQTT_PACKET_TYPE_PUBREL:
  867. ackType = MQTTPubrel;
  868. break;
  869. default:
  870. /* This function is only called after checking the type is one of
  871. * the above four values, so packet type must be PUBCOMP here. */
  872. assert( packetType == MQTT_PACKET_TYPE_PUBCOMP );
  873. ackType = MQTTPubcomp;
  874. break;
  875. }
  876. return ackType;
  877. }
  878. /*-----------------------------------------------------------*/
  879. static int32_t recvExact( MQTTContext_t * pContext,
  880. size_t bytesToRecv )
  881. {
  882. uint8_t * pIndex = NULL;
  883. size_t bytesRemaining = bytesToRecv;
  884. int32_t totalBytesRecvd = 0, bytesRecvd;
  885. uint32_t lastDataRecvTimeMs = 0U, timeSinceLastRecvMs = 0U;
  886. TransportRecv_t recvFunc = NULL;
  887. MQTTGetCurrentTimeFunc_t getTimeStampMs = NULL;
  888. bool receiveError = false;
  889. assert( pContext != NULL );
  890. assert( bytesToRecv <= pContext->networkBuffer.size );
  891. assert( pContext->getTime != NULL );
  892. assert( pContext->transportInterface.recv != NULL );
  893. assert( pContext->networkBuffer.pBuffer != NULL );
  894. pIndex = pContext->networkBuffer.pBuffer;
  895. recvFunc = pContext->transportInterface.recv;
  896. getTimeStampMs = pContext->getTime;
  897. /* Part of the MQTT packet has been read before calling this function. */
  898. lastDataRecvTimeMs = getTimeStampMs();
  899. while( ( bytesRemaining > 0U ) && ( receiveError == false ) )
  900. {
  901. bytesRecvd = recvFunc( pContext->transportInterface.pNetworkContext,
  902. pIndex,
  903. bytesRemaining );
  904. if( bytesRecvd < 0 )
  905. {
  906. LogError( ( "Network error while receiving packet: ReturnCode=%ld.",
  907. ( long int ) bytesRecvd ) );
  908. totalBytesRecvd = bytesRecvd;
  909. receiveError = true;
  910. MQTT_PRE_STATE_UPDATE_HOOK( pContext );
  911. if( pContext->connectStatus == MQTTConnected )
  912. {
  913. pContext->connectStatus = MQTTDisconnectPending;
  914. }
  915. MQTT_POST_STATE_UPDATE_HOOK( pContext );
  916. }
  917. else if( bytesRecvd > 0 )
  918. {
  919. /* Reset the starting time as we have received some data from the network. */
  920. lastDataRecvTimeMs = getTimeStampMs();
  921. /* It is a bug in the application's transport receive implementation
  922. * if more bytes than expected are received. To avoid a possible
  923. * overflow in converting bytesRemaining from unsigned to signed,
  924. * this assert must exist after the check for bytesRecvd being
  925. * negative. */
  926. assert( ( size_t ) bytesRecvd <= bytesRemaining );
  927. bytesRemaining -= ( size_t ) bytesRecvd;
  928. totalBytesRecvd += ( int32_t ) bytesRecvd;
  929. /* Increment the index. */
  930. pIndex = &pIndex[ bytesRecvd ];
  931. LogDebug( ( "BytesReceived=%ld, BytesRemaining=%lu, TotalBytesReceived=%ld.",
  932. ( long int ) bytesRecvd,
  933. ( unsigned long ) bytesRemaining,
  934. ( long int ) totalBytesRecvd ) );
  935. }
  936. else
  937. {
  938. /* No bytes were read from the network. */
  939. timeSinceLastRecvMs = calculateElapsedTime( getTimeStampMs(), lastDataRecvTimeMs );
  940. /* Check for timeout if we have been waiting to receive any byte on the network. */
  941. if( timeSinceLastRecvMs >= MQTT_RECV_POLLING_TIMEOUT_MS )
  942. {
  943. LogError( ( "Unable to receive packet: Timed out in transport recv." ) );
  944. receiveError = true;
  945. }
  946. }
  947. }
  948. return totalBytesRecvd;
  949. }
  950. /*-----------------------------------------------------------*/
  951. static MQTTStatus_t discardPacket( MQTTContext_t * pContext,
  952. size_t remainingLength,
  953. uint32_t timeoutMs )
  954. {
  955. MQTTStatus_t status = MQTTRecvFailed;
  956. int32_t bytesReceived = 0;
  957. size_t bytesToReceive = 0U;
  958. uint32_t totalBytesReceived = 0U;
  959. uint32_t entryTimeMs = 0U;
  960. uint32_t elapsedTimeMs = 0U;
  961. MQTTGetCurrentTimeFunc_t getTimeStampMs = NULL;
  962. bool receiveError = false;
  963. assert( pContext != NULL );
  964. assert( pContext->getTime != NULL );
  965. bytesToReceive = pContext->networkBuffer.size;
  966. getTimeStampMs = pContext->getTime;
  967. entryTimeMs = getTimeStampMs();
  968. while( ( totalBytesReceived < remainingLength ) && ( receiveError == false ) )
  969. {
  970. if( ( remainingLength - totalBytesReceived ) < bytesToReceive )
  971. {
  972. bytesToReceive = remainingLength - totalBytesReceived;
  973. }
  974. bytesReceived = recvExact( pContext, bytesToReceive );
  975. if( bytesReceived != ( int32_t ) bytesToReceive )
  976. {
  977. LogError( ( "Receive error while discarding packet."
  978. "ReceivedBytes=%ld, ExpectedBytes=%lu.",
  979. ( long int ) bytesReceived,
  980. ( unsigned long ) bytesToReceive ) );
  981. receiveError = true;
  982. }
  983. else
  984. {
  985. totalBytesReceived += ( uint32_t ) bytesReceived;
  986. elapsedTimeMs = calculateElapsedTime( getTimeStampMs(), entryTimeMs );
  987. /* Check for timeout. */
  988. if( elapsedTimeMs >= timeoutMs )
  989. {
  990. LogError( ( "Time expired while discarding packet." ) );
  991. receiveError = true;
  992. }
  993. }
  994. }
  995. if( totalBytesReceived == remainingLength )
  996. {
  997. LogError( ( "Dumped packet. DumpedBytes=%lu.",
  998. ( unsigned long ) totalBytesReceived ) );
  999. /* Packet dumped, so no data is available. */
  1000. status = MQTTNoDataAvailable;
  1001. }
  1002. return status;
  1003. }
  1004. /*-----------------------------------------------------------*/
  1005. static MQTTStatus_t discardStoredPacket( MQTTContext_t * pContext,
  1006. const MQTTPacketInfo_t * pPacketInfo )
  1007. {
  1008. MQTTStatus_t status = MQTTRecvFailed;
  1009. int32_t bytesReceived = 0;
  1010. size_t bytesToReceive = 0U;
  1011. uint32_t totalBytesReceived = 0U;
  1012. bool receiveError = false;
  1013. size_t mqttPacketSize = 0;
  1014. size_t remainingLength;
  1015. assert( pContext != NULL );
  1016. assert( pPacketInfo != NULL );
  1017. mqttPacketSize = pPacketInfo->remainingLength + pPacketInfo->headerLength;
  1018. /* Assert that the packet being discarded is bigger than the
  1019. * receive buffer. */
  1020. assert( mqttPacketSize > pContext->networkBuffer.size );
  1021. /* Discard these many bytes at a time. */
  1022. bytesToReceive = pContext->networkBuffer.size;
  1023. /* Number of bytes depicted by 'index' have already been received. */
  1024. remainingLength = mqttPacketSize - pContext->index;
  1025. while( ( totalBytesReceived < remainingLength ) && ( receiveError == false ) )
  1026. {
  1027. if( ( remainingLength - totalBytesReceived ) < bytesToReceive )
  1028. {
  1029. bytesToReceive = remainingLength - totalBytesReceived;
  1030. }
  1031. bytesReceived = recvExact( pContext, bytesToReceive );
  1032. if( bytesReceived != ( int32_t ) bytesToReceive )
  1033. {
  1034. LogError( ( "Receive error while discarding packet."
  1035. "ReceivedBytes=%ld, ExpectedBytes=%lu.",
  1036. ( long int ) bytesReceived,
  1037. ( unsigned long ) bytesToReceive ) );
  1038. receiveError = true;
  1039. }
  1040. else
  1041. {
  1042. totalBytesReceived += ( uint32_t ) bytesReceived;
  1043. }
  1044. }
  1045. if( totalBytesReceived == remainingLength )
  1046. {
  1047. LogError( ( "Dumped packet. DumpedBytes=%lu.",
  1048. ( unsigned long ) totalBytesReceived ) );
  1049. /* Packet dumped, so no data is available. */
  1050. status = MQTTNoDataAvailable;
  1051. }
  1052. /* Clear the buffer */
  1053. ( void ) memset( pContext->networkBuffer.pBuffer,
  1054. 0,
  1055. pContext->networkBuffer.size );
  1056. /* Reset the index. */
  1057. pContext->index = 0;
  1058. return status;
  1059. }
  1060. /*-----------------------------------------------------------*/
  1061. static MQTTStatus_t receivePacket( MQTTContext_t * pContext,
  1062. MQTTPacketInfo_t incomingPacket,
  1063. uint32_t remainingTimeMs )
  1064. {
  1065. MQTTStatus_t status = MQTTSuccess;
  1066. int32_t bytesReceived = 0;
  1067. size_t bytesToReceive = 0U;
  1068. assert( pContext != NULL );
  1069. assert( pContext->networkBuffer.pBuffer != NULL );
  1070. if( incomingPacket.remainingLength > pContext->networkBuffer.size )
  1071. {
  1072. LogError( ( "Incoming packet will be dumped: "
  1073. "Packet length exceeds network buffer size."
  1074. "PacketSize=%lu, NetworkBufferSize=%lu.",
  1075. ( unsigned long ) incomingPacket.remainingLength,
  1076. ( unsigned long ) pContext->networkBuffer.size ) );
  1077. status = discardPacket( pContext,
  1078. incomingPacket.remainingLength,
  1079. remainingTimeMs );
  1080. }
  1081. else
  1082. {
  1083. bytesToReceive = incomingPacket.remainingLength;
  1084. bytesReceived = recvExact( pContext, bytesToReceive );
  1085. if( bytesReceived == ( int32_t ) bytesToReceive )
  1086. {
  1087. /* Receive successful, bytesReceived == bytesToReceive. */
  1088. LogDebug( ( "Packet received. ReceivedBytes=%ld.",
  1089. ( long int ) bytesReceived ) );
  1090. }
  1091. else
  1092. {
  1093. LogError( ( "Packet reception failed. ReceivedBytes=%ld, "
  1094. "ExpectedBytes=%lu.",
  1095. ( long int ) bytesReceived,
  1096. ( unsigned long ) bytesToReceive ) );
  1097. status = MQTTRecvFailed;
  1098. }
  1099. }
  1100. return status;
  1101. }
  1102. /*-----------------------------------------------------------*/
  1103. static uint8_t getAckTypeToSend( MQTTPublishState_t state )
  1104. {
  1105. uint8_t packetTypeByte = 0U;
  1106. switch( state )
  1107. {
  1108. case MQTTPubAckSend:
  1109. packetTypeByte = MQTT_PACKET_TYPE_PUBACK;
  1110. break;
  1111. case MQTTPubRecSend:
  1112. packetTypeByte = MQTT_PACKET_TYPE_PUBREC;
  1113. break;
  1114. case MQTTPubRelSend:
  1115. packetTypeByte = MQTT_PACKET_TYPE_PUBREL;
  1116. break;
  1117. case MQTTPubCompSend:
  1118. packetTypeByte = MQTT_PACKET_TYPE_PUBCOMP;
  1119. break;
  1120. default:
  1121. /* Take no action for states that do not require sending an ack. */
  1122. break;
  1123. }
  1124. return packetTypeByte;
  1125. }
  1126. /*-----------------------------------------------------------*/
  1127. static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext,
  1128. uint16_t packetId,
  1129. MQTTPublishState_t publishState )
  1130. {
  1131. MQTTStatus_t status = MQTTSuccess;
  1132. MQTTPublishState_t newState = MQTTStateNull;
  1133. int32_t sendResult = 0;
  1134. uint8_t packetTypeByte = 0U;
  1135. MQTTPubAckType_t packetType;
  1136. MQTTFixedBuffer_t localBuffer;
  1137. MQTTConnectionStatus_t connectStatus;
  1138. uint8_t pubAckPacket[ MQTT_PUBLISH_ACK_PACKET_SIZE ];
  1139. localBuffer.pBuffer = pubAckPacket;
  1140. localBuffer.size = MQTT_PUBLISH_ACK_PACKET_SIZE;
  1141. assert( pContext != NULL );
  1142. packetTypeByte = getAckTypeToSend( publishState );
  1143. if( packetTypeByte != 0U )
  1144. {
  1145. packetType = getAckFromPacketType( packetTypeByte );
  1146. status = MQTT_SerializeAck( &localBuffer,
  1147. packetTypeByte,
  1148. packetId );
  1149. if( status == MQTTSuccess )
  1150. {
  1151. MQTT_PRE_STATE_UPDATE_HOOK( pContext );
  1152. connectStatus = pContext->connectStatus;
  1153. if( connectStatus != MQTTConnected )
  1154. {
  1155. status = ( connectStatus == MQTTNotConnected ) ? MQTTStatusNotConnected : MQTTStatusDisconnectPending;
  1156. }
  1157. if( status == MQTTSuccess )
  1158. {
  1159. /* Here, we are not using the vector approach for efficiency. There is just one buffer
  1160. * to be sent which can be achieved with a normal send call. */
  1161. sendResult = sendBuffer( pContext,
  1162. localBuffer.pBuffer,
  1163. MQTT_PUBLISH_ACK_PACKET_SIZE );
  1164. if( sendResult < ( int32_t ) MQTT_PUBLISH_ACK_PACKET_SIZE )
  1165. {
  1166. status = MQTTSendFailed;
  1167. }
  1168. }
  1169. MQTT_POST_STATE_UPDATE_HOOK( pContext );
  1170. }
  1171. if( status == MQTTSuccess )
  1172. {
  1173. pContext->controlPacketSent = true;
  1174. MQTT_PRE_STATE_UPDATE_HOOK( pContext );
  1175. status = MQTT_UpdateStateAck( pContext,
  1176. packetId,
  1177. packetType,
  1178. MQTT_SEND,
  1179. &newState );
  1180. MQTT_POST_STATE_UPDATE_HOOK( pContext );
  1181. if( status != MQTTSuccess )
  1182. {
  1183. LogError( ( "Failed to update state of publish %hu.",
  1184. ( unsigned short ) packetId ) );
  1185. }
  1186. }
  1187. else
  1188. {
  1189. LogError( ( "Failed to send ACK packet: PacketType=%02x, SentBytes=%ld, "
  1190. "PacketSize=%lu.",
  1191. ( unsigned int ) packetTypeByte, ( long int ) sendResult,
  1192. MQTT_PUBLISH_ACK_PACKET_SIZE ) );
  1193. }
  1194. }
  1195. return status;
  1196. }
  1197. /*-----------------------------------------------------------*/
  1198. static MQTTStatus_t handleKeepAlive( MQTTContext_t * pContext )
  1199. {
  1200. MQTTStatus_t status = MQTTSuccess;
  1201. uint32_t now = 0U;
  1202. uint32_t packetTxTimeoutMs = 0U;
  1203. uint32_t lastPacketTxTime = 0U;
  1204. assert( pContext != NULL );
  1205. assert( pContext->getTime != NULL );
  1206. now = pContext->getTime();
  1207. packetTxTimeoutMs = 1000U * ( uint32_t ) pContext->keepAliveIntervalSec;
  1208. if( PACKET_TX_TIMEOUT_MS < packetTxTimeoutMs )
  1209. {
  1210. packetTxTimeoutMs = PACKET_TX_TIMEOUT_MS;
  1211. }
  1212. /* If keep alive interval is 0, it is disabled. */
  1213. if( pContext->waitingForPingResp == true )
  1214. {
  1215. /* Has time expired? */
  1216. if( calculateElapsedTime( now, pContext->pingReqSendTimeMs ) >
  1217. MQTT_PINGRESP_TIMEOUT_MS )
  1218. {
  1219. status = MQTTKeepAliveTimeout;
  1220. }
  1221. }
  1222. else
  1223. {
  1224. MQTT_PRE_STATE_UPDATE_HOOK( pContext );
  1225. lastPacketTxTime = pContext->lastPacketTxTime;
  1226. MQTT_POST_STATE_UPDATE_HOOK( pContext );
  1227. if( ( packetTxTimeoutMs != 0U ) && ( calculateElapsedTime( now, lastPacketTxTime ) >= packetTxTimeoutMs ) )
  1228. {
  1229. status = MQTT_Ping( pContext );
  1230. }
  1231. else
  1232. {
  1233. const uint32_t timeElapsed = calculateElapsedTime( now, pContext->lastPacketRxTime );
  1234. if( ( timeElapsed != 0U ) && ( timeElapsed >= PACKET_RX_TIMEOUT_MS ) )
  1235. {
  1236. status = MQTT_Ping( pContext );
  1237. }
  1238. }
  1239. }
  1240. return status;
  1241. }
  1242. /*-----------------------------------------------------------*/
  1243. static MQTTStatus_t handleIncomingPublish( MQTTContext_t * pContext,
  1244. MQTTPacketInfo_t * pIncomingPacket )
  1245. {
  1246. MQTTStatus_t status;
  1247. MQTTPublishState_t publishRecordState = MQTTStateNull;
  1248. uint16_t packetIdentifier = 0U;
  1249. MQTTPublishInfo_t publishInfo;
  1250. MQTTDeserializedInfo_t deserializedInfo;
  1251. bool duplicatePublish = false;
  1252. assert( pContext != NULL );
  1253. assert( pIncomingPacket != NULL );
  1254. assert( pContext->appCallback != NULL );
  1255. status = MQTT_DeserializePublish( pIncomingPacket, &packetIdentifier, &publishInfo );
  1256. LogInfo( ( "De-serialized incoming PUBLISH packet: DeserializerResult=%s.",
  1257. MQTT_Status_strerror( status ) ) );
  1258. if( ( status == MQTTSuccess ) &&
  1259. ( pContext->incomingPublishRecords == NULL ) &&
  1260. ( publishInfo.qos > MQTTQoS0 ) )
  1261. {
  1262. LogError( ( "Incoming publish has QoS > MQTTQoS0 but incoming "
  1263. "publish records have not been initialized. Dropping the "
  1264. "incoming publish. Please call MQTT_InitStatefulQoS to enable "
  1265. "use of QoS1 and QoS2 publishes." ) );
  1266. status = MQTTRecvFailed;
  1267. }
  1268. if( status == MQTTSuccess )
  1269. {
  1270. MQTT_PRE_STATE_UPDATE_HOOK( pContext );
  1271. status = MQTT_UpdateStatePublish( pContext,
  1272. packetIdentifier,
  1273. MQTT_RECEIVE,
  1274. publishInfo.qos,
  1275. &publishRecordState );
  1276. MQTT_POST_STATE_UPDATE_HOOK( pContext );
  1277. if( status == MQTTSuccess )
  1278. {
  1279. LogInfo( ( "State record updated. New state=%s.",
  1280. MQTT_State_strerror( publishRecordState ) ) );
  1281. }
  1282. /* Different cases in which an incoming publish with duplicate flag is
  1283. * handled are as listed below.
  1284. * 1. No collision - This is the first instance of the incoming publish
  1285. * packet received or an earlier received packet state is lost. This
  1286. * will be handled as a new incoming publish for both QoS1 and QoS2
  1287. * publishes.
  1288. * 2. Collision - The incoming packet was received before and a state
  1289. * record is present in the state engine. For QoS1 and QoS2 publishes
  1290. * this case can happen at 2 different cases and handling is
  1291. * different.
  1292. * a. QoS1 - If a PUBACK is not successfully sent for the incoming
  1293. * publish due to a connection issue, it can result in broker
  1294. * sending out a duplicate publish with dup flag set, when a
  1295. * session is reestablished. It can result in a collision in
  1296. * state engine. This will be handled by processing the incoming
  1297. * publish as a new publish ignoring the
  1298. * #MQTTStateCollision status from the state engine. The publish
  1299. * data is not passed to the application.
  1300. * b. QoS2 - If a PUBREC is not successfully sent for the incoming
  1301. * publish or the PUBREC sent is not successfully received by the
  1302. * broker due to a connection issue, it can result in broker
  1303. * sending out a duplicate publish with dup flag set, when a
  1304. * session is reestablished. It can result in a collision in
  1305. * state engine. This will be handled by ignoring the
  1306. * #MQTTStateCollision status from the state engine. The publish
  1307. * data is not passed to the application. */
  1308. else if( status == MQTTStateCollision )
  1309. {
  1310. status = MQTTSuccess;
  1311. duplicatePublish = true;
  1312. /* Calculate the state for the ack packet that needs to be sent out
  1313. * for the duplicate incoming publish. */
  1314. publishRecordState = MQTT_CalculateStatePublish( MQTT_RECEIVE,
  1315. publishInfo.qos );
  1316. LogDebug( ( "Incoming publish packet with packet id %hu already exists.",
  1317. ( unsigned short ) packetIdentifier ) );
  1318. if( publishInfo.dup == false )
  1319. {
  1320. LogError( ( "DUP flag is 0 for duplicate packet (MQTT-3.3.1.-1)." ) );
  1321. }
  1322. }
  1323. else
  1324. {
  1325. LogError( ( "Error in updating publish state for incoming publish with packet id %hu."
  1326. " Error is %s",
  1327. ( unsigned short ) packetIdentifier,
  1328. MQTT_Status_strerror( status ) ) );
  1329. }
  1330. }
  1331. if( status == MQTTSuccess )
  1332. {
  1333. /* Set fields of deserialized struct. */
  1334. deserializedInfo.packetIdentifier = packetIdentifier;
  1335. deserializedInfo.pPublishInfo = &publishInfo;
  1336. deserializedInfo.deserializationResult = status;
  1337. /* Invoke application callback to hand the buffer over to application
  1338. * before sending acks.
  1339. * Application callback will be invoked for all publishes, except for
  1340. * duplicate incoming publishes. */
  1341. if( duplicatePublish == false )
  1342. {
  1343. pContext->appCallback( pContext,
  1344. pIncomingPacket,
  1345. &deserializedInfo );
  1346. }
  1347. /* Send PUBACK or PUBREC if necessary. */
  1348. status = sendPublishAcks( pContext,
  1349. packetIdentifier,
  1350. publishRecordState );
  1351. }
  1352. return status;
  1353. }
  1354. /*-----------------------------------------------------------*/
  1355. static MQTTStatus_t handlePublishAcks( MQTTContext_t * pContext,
  1356. MQTTPacketInfo_t * pIncomingPacket )
  1357. {
  1358. MQTTStatus_t status;
  1359. MQTTPublishState_t publishRecordState = MQTTStateNull;
  1360. uint16_t packetIdentifier;
  1361. MQTTPubAckType_t ackType;
  1362. MQTTEventCallback_t appCallback;
  1363. MQTTDeserializedInfo_t deserializedInfo;
  1364. assert( pContext != NULL );
  1365. assert( pIncomingPacket != NULL );
  1366. assert( pContext->appCallback != NULL );
  1367. appCallback = pContext->appCallback;
  1368. ackType = getAckFromPacketType( pIncomingPacket->type );
  1369. status = MQTT_DeserializeAck( pIncomingPacket, &packetIdentifier, NULL );
  1370. LogInfo( ( "Ack packet deserialized with result: %s.",
  1371. MQTT_Status_strerror( status ) ) );
  1372. if( status == MQTTSuccess )
  1373. {
  1374. MQTT_PRE_STATE_UPDATE_HOOK( pContext );
  1375. status = MQTT_UpdateStateAck( pContext,
  1376. packetIdentifier,
  1377. ackType,
  1378. MQTT_RECEIVE,
  1379. &publishRecordState );
  1380. MQTT_POST_STATE_UPDATE_HOOK( pContext );
  1381. if( status == MQTTSuccess )
  1382. {
  1383. LogInfo( ( "State record updated. New state=%s.",
  1384. MQTT_State_strerror( publishRecordState ) ) );
  1385. }
  1386. else
  1387. {
  1388. LogError( ( "Updating the state engine for packet id %hu"
  1389. " failed with error %s.",
  1390. ( unsigned short ) packetIdentifier,
  1391. MQTT_Status_strerror( status ) ) );
  1392. }
  1393. }
  1394. if( ( ackType == MQTTPuback ) || ( ackType == MQTTPubrec ) )
  1395. {
  1396. if( ( status == MQTTSuccess ) &&
  1397. ( pContext->clearFunction != NULL ) )
  1398. {
  1399. pContext->clearFunction( pContext, packetIdentifier );
  1400. }
  1401. }
  1402. if( status == MQTTSuccess )
  1403. {
  1404. /* Set fields of deserialized struct. */
  1405. deserializedInfo.packetIdentifier = packetIdentifier;
  1406. deserializedInfo.deserializationResult = status;
  1407. deserializedInfo.pPublishInfo = NULL;
  1408. /* Invoke application callback to hand the buffer over to application
  1409. * before sending acks. */
  1410. appCallback( pContext, pIncomingPacket, &deserializedInfo );
  1411. /* Send PUBREL or PUBCOMP if necessary. */
  1412. status = sendPublishAcks( pContext,
  1413. packetIdentifier,
  1414. publishRecordState );
  1415. }
  1416. return status;
  1417. }
  1418. /*-----------------------------------------------------------*/
  1419. static MQTTStatus_t handleIncomingAck( MQTTContext_t * pContext,
  1420. MQTTPacketInfo_t * pIncomingPacket,
  1421. bool manageKeepAlive )
  1422. {
  1423. MQTTStatus_t status = MQTTBadResponse;
  1424. uint16_t packetIdentifier = MQTT_PACKET_ID_INVALID;
  1425. MQTTDeserializedInfo_t deserializedInfo;
  1426. /* We should always invoke the app callback unless we receive a PINGRESP
  1427. * and are managing keep alive, or if we receive an unknown packet. We
  1428. * initialize this to false since the callback must be invoked before
  1429. * sending any PUBREL or PUBCOMP. However, for other cases, we invoke it
  1430. * at the end to reduce the complexity of this function. */
  1431. bool invokeAppCallback = false;
  1432. MQTTEventCallback_t appCallback = NULL;
  1433. assert( pContext != NULL );
  1434. assert( pIncomingPacket != NULL );
  1435. assert( pContext->appCallback != NULL );
  1436. appCallback = pContext->appCallback;
  1437. LogDebug( ( "Received packet of type %02x.",
  1438. ( unsigned int ) pIncomingPacket->type ) );
  1439. switch( pIncomingPacket->type )
  1440. {
  1441. case MQTT_PACKET_TYPE_PUBACK:
  1442. case MQTT_PACKET_TYPE_PUBREC:
  1443. case MQTT_PACKET_TYPE_PUBREL:
  1444. case MQTT_PACKET_TYPE_PUBCOMP:
  1445. /* Handle all the publish acks. The app callback is invoked here. */
  1446. status = handlePublishAcks( pContext, pIncomingPacket );
  1447. break;
  1448. case MQTT_PACKET_TYPE_PINGRESP:
  1449. status = MQTT_DeserializeAck( pIncomingPacket, &packetIdentifier, NULL );
  1450. invokeAppCallback = ( status == MQTTSuccess ) && !manageKeepAlive;
  1451. if( ( status == MQTTSuccess ) && ( manageKeepAlive == true ) )
  1452. {
  1453. pContext->waitingForPingResp = false;
  1454. }
  1455. break;
  1456. case MQTT_PACKET_TYPE_SUBACK:
  1457. case MQTT_PACKET_TYPE_UNSUBACK:
  1458. /* Deserialize and give these to the app provided callback. */
  1459. status = MQTT_DeserializeAck( pIncomingPacket, &packetIdentifier, NULL );
  1460. invokeAppCallback = ( status == MQTTSuccess ) || ( status == MQTTServerRefused );
  1461. break;
  1462. default:
  1463. /* Bad response from the server. */
  1464. LogError( ( "Unexpected packet type from server: PacketType=%02x.",
  1465. ( unsigned int ) pIncomingPacket->type ) );
  1466. status = MQTTBadResponse;
  1467. break;
  1468. }
  1469. if( invokeAppCallback == true )
  1470. {
  1471. /* Set fields of deserialized struct. */
  1472. deserializedInfo.packetIdentifier = packetIdentifier;
  1473. deserializedInfo.deserializationResult = status;
  1474. deserializedInfo.pPublishInfo = NULL;
  1475. appCallback( pContext, pIncomingPacket, &deserializedInfo );
  1476. /* In case a SUBACK indicated refusal, reset the status to continue the loop. */
  1477. status = MQTTSuccess;
  1478. }
  1479. return status;
  1480. }
  1481. /*-----------------------------------------------------------*/
  1482. static MQTTStatus_t receiveSingleIteration( MQTTContext_t * pContext,
  1483. bool manageKeepAlive )
  1484. {
  1485. MQTTStatus_t status = MQTTSuccess;
  1486. MQTTPacketInfo_t incomingPacket = { 0 };
  1487. int32_t recvBytes;
  1488. size_t totalMQTTPacketLength = 0;
  1489. assert( pContext != NULL );
  1490. assert( pContext->networkBuffer.pBuffer != NULL );
  1491. /* Read as many bytes as possible into the network buffer. */
  1492. recvBytes = pContext->transportInterface.recv( pContext->transportInterface.pNetworkContext,
  1493. &( pContext->networkBuffer.pBuffer[ pContext->index ] ),
  1494. pContext->networkBuffer.size - pContext->index );
  1495. if( recvBytes < 0 )
  1496. {
  1497. /* The receive function has failed. Bubble up the error up to the user. */
  1498. status = MQTTRecvFailed;
  1499. MQTT_PRE_STATE_UPDATE_HOOK( pContext );
  1500. if( pContext->connectStatus == MQTTConnected )
  1501. {
  1502. pContext->connectStatus = MQTTDisconnectPending;
  1503. }
  1504. MQTT_POST_STATE_UPDATE_HOOK( pContext );
  1505. }
  1506. else if( ( recvBytes == 0 ) && ( pContext->index == 0U ) )
  1507. {
  1508. /* No more bytes available since the last read and neither is anything in
  1509. * the buffer. */
  1510. status = MQTTNoDataAvailable;
  1511. }
  1512. /* Either something was received, or there is still data to be processed in the
  1513. * buffer, or both. */
  1514. else
  1515. {
  1516. /* Update the number of bytes in the MQTT fixed buffer. */
  1517. pContext->index += ( size_t ) recvBytes;
  1518. status = MQTT_ProcessIncomingPacketTypeAndLength( pContext->networkBuffer.pBuffer,
  1519. &( pContext->index ),
  1520. &incomingPacket );
  1521. totalMQTTPacketLength = incomingPacket.remainingLength + incomingPacket.headerLength;
  1522. }
  1523. /* No data was received, check for keep alive timeout. */
  1524. if( recvBytes == 0 )
  1525. {
  1526. if( manageKeepAlive == true )
  1527. {
  1528. /* Keep the copy of the status to be reset later. */
  1529. MQTTStatus_t statusCopy = status;
  1530. /* Assign status so an error can be bubbled up to application,
  1531. * but reset it on success. */
  1532. status = handleKeepAlive( pContext );
  1533. if( status == MQTTSuccess )
  1534. {
  1535. /* Reset the status. */
  1536. status = statusCopy;
  1537. }
  1538. else
  1539. {
  1540. LogError( ( "Handling of keep alive failed. Status=%s",
  1541. MQTT_Status_strerror( status ) ) );
  1542. }
  1543. }
  1544. }
  1545. /* Check whether there is data available before processing the packet further. */
  1546. if( ( status == MQTTNeedMoreBytes ) || ( status == MQTTNoDataAvailable ) )
  1547. {
  1548. /* Do nothing as there is nothing to be processed right now. The proper
  1549. * error code will be bubbled up to the user. */
  1550. }
  1551. /* Any other error code. */
  1552. else if( status != MQTTSuccess )
  1553. {
  1554. LogError( ( "Call to receiveSingleIteration failed. Status=%s",
  1555. MQTT_Status_strerror( status ) ) );
  1556. }
  1557. /* If the MQTT Packet size is bigger than the buffer itself. */
  1558. else if( totalMQTTPacketLength > pContext->networkBuffer.size )
  1559. {
  1560. /* Discard the packet from the receive buffer and drain the pending
  1561. * data from the socket buffer. */
  1562. status = discardStoredPacket( pContext,
  1563. &incomingPacket );
  1564. }
  1565. /* If the total packet is of more length than the bytes we have available. */
  1566. else if( totalMQTTPacketLength > pContext->index )
  1567. {
  1568. status = MQTTNeedMoreBytes;
  1569. }
  1570. else
  1571. {
  1572. /* MISRA else. */
  1573. }
  1574. /* Handle received packet. If incomplete data was read then this will not execute. */
  1575. if( status == MQTTSuccess )
  1576. {
  1577. incomingPacket.pRemainingData = &pContext->networkBuffer.pBuffer[ incomingPacket.headerLength ];
  1578. /* PUBLISH packets allow flags in the lower four bits. For other
  1579. * packet types, they are reserved. */
  1580. if( ( incomingPacket.type & 0xF0U ) == MQTT_PACKET_TYPE_PUBLISH )
  1581. {
  1582. status = handleIncomingPublish( pContext, &incomingPacket );
  1583. }
  1584. else
  1585. {
  1586. status = handleIncomingAck( pContext, &incomingPacket, manageKeepAlive );
  1587. }
  1588. /* Update the index to reflect the remaining bytes in the buffer. */
  1589. pContext->index -= totalMQTTPacketLength;
  1590. /* Move the remaining bytes to the front of the buffer. */
  1591. ( void ) memmove( pContext->networkBuffer.pBuffer,
  1592. &( pContext->networkBuffer.pBuffer[ totalMQTTPacketLength ] ),
  1593. pContext->index );
  1594. if( status == MQTTSuccess )
  1595. {
  1596. pContext->lastPacketRxTime = pContext->getTime();
  1597. }
  1598. }
  1599. if( status == MQTTNoDataAvailable )
  1600. {
  1601. /* No data available is not an error. Reset to MQTTSuccess so the
  1602. * return code will indicate success. */
  1603. status = MQTTSuccess;
  1604. }
  1605. return status;
  1606. }
  1607. /*-----------------------------------------------------------*/
  1608. static MQTTStatus_t validateSubscribeUnsubscribeParams( const MQTTContext_t * pContext,
  1609. const MQTTSubscribeInfo_t * pSubscriptionList,
  1610. size_t subscriptionCount,
  1611. uint16_t packetId )
  1612. {
  1613. MQTTStatus_t status = MQTTSuccess;
  1614. size_t iterator;
  1615. /* Validate all the parameters. */
  1616. if( ( pContext == NULL ) || ( pSubscriptionList == NULL ) )
  1617. {
  1618. LogError( ( "Argument cannot be NULL: pContext=%p, "
  1619. "pSubscriptionList=%p.",
  1620. ( void * ) pContext,
  1621. ( void * ) pSubscriptionList ) );
  1622. status = MQTTBadParameter;
  1623. }
  1624. else if( subscriptionCount == 0UL )
  1625. {
  1626. LogError( ( "Subscription count is 0." ) );
  1627. status = MQTTBadParameter;
  1628. }
  1629. else if( packetId == 0U )
  1630. {
  1631. LogError( ( "Packet Id for subscription packet is 0." ) );
  1632. status = MQTTBadParameter;
  1633. }
  1634. else
  1635. {
  1636. if( pContext->incomingPublishRecords == NULL )
  1637. {
  1638. for( iterator = 0; iterator < subscriptionCount; iterator++ )
  1639. {
  1640. if( pSubscriptionList[ iterator ].qos > MQTTQoS0 )
  1641. {
  1642. LogError( ( "The incoming publish record list is not "
  1643. "initialised for QoS1/QoS2 records. Please call "
  1644. " MQTT_InitStatefulQoS to enable use of QoS1 and "
  1645. " QoS2 packets." ) );
  1646. status = MQTTBadParameter;
  1647. break;
  1648. }
  1649. }
  1650. }
  1651. }
  1652. return status;
  1653. }
  1654. /*-----------------------------------------------------------*/
  1655. static size_t addEncodedStringToVector( uint8_t serializedLength[ CORE_MQTT_SERIALIZED_LENGTH_FIELD_BYTES ],
  1656. const char * const string,
  1657. uint16_t length,
  1658. TransportOutVector_t * iterator,
  1659. size_t * updatedLength )
  1660. {
  1661. size_t packetLength = 0U;
  1662. TransportOutVector_t * pLocalIterator = iterator;
  1663. size_t vectorsAdded = 0U;
  1664. /* When length is non-zero, the string must be non-NULL. */
  1665. assert( ( length != 0U ) ? ( string != NULL ) : true );
  1666. serializedLength[ 0 ] = ( ( uint8_t ) ( ( length ) >> 8 ) );
  1667. serializedLength[ 1 ] = ( ( uint8_t ) ( ( length ) & 0x00ffU ) );
  1668. /* Add the serialized length of the string first. */
  1669. pLocalIterator[ 0 ].iov_base = serializedLength;
  1670. pLocalIterator[ 0 ].iov_len = CORE_MQTT_SERIALIZED_LENGTH_FIELD_BYTES;
  1671. vectorsAdded++;
  1672. packetLength = CORE_MQTT_SERIALIZED_LENGTH_FIELD_BYTES;
  1673. /* Sometimes the string can be NULL that is, of 0 length. In that case,
  1674. * only the length field should be encoded in the vector. */
  1675. if( ( string != NULL ) && ( length != 0U ) )
  1676. {
  1677. /* Then add the pointer to the string itself. */
  1678. pLocalIterator[ 1 ].iov_base = string;
  1679. pLocalIterator[ 1 ].iov_len = length;
  1680. vectorsAdded++;
  1681. packetLength += length;
  1682. }
  1683. ( *updatedLength ) = ( *updatedLength ) + packetLength;
  1684. return vectorsAdded;
  1685. }
  1686. /*-----------------------------------------------------------*/
  1687. static MQTTStatus_t sendSubscribeWithoutCopy( MQTTContext_t * pContext,
  1688. const MQTTSubscribeInfo_t * pSubscriptionList,
  1689. size_t subscriptionCount,
  1690. uint16_t packetId,
  1691. size_t remainingLength )
  1692. {
  1693. MQTTStatus_t status = MQTTSuccess;
  1694. uint8_t * pIndex;
  1695. TransportOutVector_t pIoVector[ MQTT_SUB_UNSUB_MAX_VECTORS ];
  1696. TransportOutVector_t * pIterator;
  1697. uint8_t serializedTopicFieldLength[ MQTT_SUB_UNSUB_MAX_VECTORS ][ CORE_MQTT_SERIALIZED_LENGTH_FIELD_BYTES ];
  1698. size_t totalPacketLength = 0U;
  1699. size_t ioVectorLength = 0U;
  1700. size_t subscriptionsSent = 0U;
  1701. size_t vectorsAdded;
  1702. size_t topicFieldLengthIndex;
  1703. /* Maximum number of bytes required by the 'fixed' part of the SUBSCRIBE
  1704. * packet header according to the MQTT specification.
  1705. * MQTT Control Byte 0 + 1 = 1
  1706. * Remaining length (max) + 4 = 5
  1707. * Packet ID + 2 = 7 */
  1708. uint8_t subscribeheader[ 7U ];
  1709. /* The vector array should be at least three element long as the topic
  1710. * string needs these many vector elements to be stored. */
  1711. assert( MQTT_SUB_UNSUB_MAX_VECTORS >= CORE_MQTT_SUBSCRIBE_PER_TOPIC_VECTOR_LENGTH );
  1712. pIndex = subscribeheader;
  1713. pIterator = pIoVector;
  1714. pIndex = MQTT_SerializeSubscribeHeader( remainingLength,
  1715. pIndex,
  1716. packetId );
  1717. /* The header is to be sent first. */
  1718. pIterator->iov_base = subscribeheader;
  1719. /* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-182 */
  1720. /* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-108 */
  1721. /* coverity[misra_c_2012_rule_18_2_violation] */
  1722. /* coverity[misra_c_2012_rule_10_8_violation] */
  1723. pIterator->iov_len = ( size_t ) ( pIndex - subscribeheader );
  1724. totalPacketLength += pIterator->iov_len;
  1725. pIterator++;
  1726. ioVectorLength++;
  1727. while( ( status == MQTTSuccess ) && ( subscriptionsSent < subscriptionCount ) )
  1728. {
  1729. /* Reset the index for next iteration. */
  1730. topicFieldLengthIndex = 0;
  1731. /* Check whether the subscription topic (with QoS) will fit in the
  1732. * given vector. */
  1733. while( ( ioVectorLength <= ( MQTT_SUB_UNSUB_MAX_VECTORS - CORE_MQTT_SUBSCRIBE_PER_TOPIC_VECTOR_LENGTH ) ) &&
  1734. ( subscriptionsSent < subscriptionCount ) )
  1735. {
  1736. /* The topic filter and the filter length gets sent next. */
  1737. vectorsAdded = addEncodedStringToVector( serializedTopicFieldLength[ topicFieldLengthIndex ],
  1738. pSubscriptionList[ subscriptionsSent ].pTopicFilter,
  1739. pSubscriptionList[ subscriptionsSent ].topicFilterLength,
  1740. pIterator,
  1741. &totalPacketLength );
  1742. /* Update the pointer after the above operation. */
  1743. pIterator = &pIterator[ vectorsAdded ];
  1744. /* Lastly, the QoS gets sent. */
  1745. pIterator->iov_base = &( pSubscriptionList[ subscriptionsSent ].qos );
  1746. pIterator->iov_len = 1U;
  1747. totalPacketLength += pIterator->iov_len;
  1748. /* Increment the pointer. */
  1749. pIterator++;
  1750. /* Two slots get used by the topic string length and topic string.
  1751. * One slot gets used by the quality of service. */
  1752. ioVectorLength += vectorsAdded + 1U;
  1753. subscriptionsSent++;
  1754. /* The index needs to be updated for next iteration. */
  1755. topicFieldLengthIndex++;
  1756. }
  1757. if( sendMessageVector( pContext,
  1758. pIoVector,
  1759. ioVectorLength ) != ( int32_t ) totalPacketLength )
  1760. {
  1761. status = MQTTSendFailed;
  1762. }
  1763. /* Update the iterator for the next potential loop iteration. */
  1764. pIterator = pIoVector;
  1765. /* Reset the vector length for the next potential loop iteration. */
  1766. ioVectorLength = 0U;
  1767. /* Reset the packet length for the next potential loop iteration. */
  1768. totalPacketLength = 0U;
  1769. }
  1770. return status;
  1771. }
  1772. /*-----------------------------------------------------------*/
  1773. static MQTTStatus_t sendUnsubscribeWithoutCopy( MQTTContext_t * pContext,
  1774. const MQTTSubscribeInfo_t * pSubscriptionList,
  1775. size_t subscriptionCount,
  1776. uint16_t packetId,
  1777. size_t remainingLength )
  1778. {
  1779. MQTTStatus_t status = MQTTSuccess;
  1780. uint8_t * pIndex;
  1781. TransportOutVector_t pIoVector[ MQTT_SUB_UNSUB_MAX_VECTORS ];
  1782. TransportOutVector_t * pIterator;
  1783. uint8_t serializedTopicFieldLength[ MQTT_SUB_UNSUB_MAX_VECTORS ][ CORE_MQTT_SERIALIZED_LENGTH_FIELD_BYTES ];
  1784. size_t totalPacketLength = 0U;
  1785. size_t unsubscriptionsSent = 0U;
  1786. size_t ioVectorLength = 0U;
  1787. size_t vectorsAdded;
  1788. size_t topicFieldLengthIndex;
  1789. /* Maximum number of bytes required by the 'fixed' part of the UNSUBSCRIBE
  1790. * packet header according to the MQTT specification.
  1791. * MQTT Control Byte 0 + 1 = 1
  1792. * Remaining length (max) + 4 = 5
  1793. * Packet ID + 2 = 7 */
  1794. uint8_t unsubscribeheader[ 7U ];
  1795. /* The vector array should be at least three element long as the topic
  1796. * string needs these many vector elements to be stored. */
  1797. assert( MQTT_SUB_UNSUB_MAX_VECTORS >= CORE_MQTT_UNSUBSCRIBE_PER_TOPIC_VECTOR_LENGTH );
  1798. pIndex = unsubscribeheader;
  1799. pIterator = pIoVector;
  1800. pIndex = MQTT_SerializeUnsubscribeHeader( remainingLength,
  1801. pIndex,
  1802. packetId );
  1803. /* The header is to be sent first. */
  1804. pIterator->iov_base = unsubscribeheader;
  1805. /* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-182 */
  1806. /* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-108 */
  1807. /* coverity[misra_c_2012_rule_18_2_violation] */
  1808. /* coverity[misra_c_2012_rule_10_8_violation] */
  1809. pIterator->iov_len = ( size_t ) ( pIndex - unsubscribeheader );
  1810. totalPacketLength += pIterator->iov_len;
  1811. pIterator++;
  1812. ioVectorLength++;
  1813. while( ( status == MQTTSuccess ) && ( unsubscriptionsSent < subscriptionCount ) )
  1814. {
  1815. /* Reset the index for next iteration. */
  1816. topicFieldLengthIndex = 0;
  1817. /* Check whether the subscription topic will fit in the given vector. */
  1818. while( ( ioVectorLength <= ( MQTT_SUB_UNSUB_MAX_VECTORS - CORE_MQTT_UNSUBSCRIBE_PER_TOPIC_VECTOR_LENGTH ) ) &&
  1819. ( unsubscriptionsSent < subscriptionCount ) )
  1820. {
  1821. /* The topic filter gets sent next. */
  1822. vectorsAdded = addEncodedStringToVector( serializedTopicFieldLength[ topicFieldLengthIndex ],
  1823. pSubscriptionList[ unsubscriptionsSent ].pTopicFilter,
  1824. pSubscriptionList[ unsubscriptionsSent ].topicFilterLength,
  1825. pIterator,
  1826. &totalPacketLength );
  1827. /* Update the iterator to point to the next empty location. */
  1828. pIterator = &pIterator[ vectorsAdded ];
  1829. /* Update the total count based on how many vectors were added. */
  1830. ioVectorLength += vectorsAdded;
  1831. unsubscriptionsSent++;
  1832. /* Update the index for next iteration. */
  1833. topicFieldLengthIndex++;
  1834. }
  1835. if( sendMessageVector( pContext, pIoVector, ioVectorLength ) != ( int32_t ) totalPacketLength )
  1836. {
  1837. status = MQTTSendFailed;
  1838. }
  1839. /* Update the iterator for the next potential loop iteration. */
  1840. pIterator = pIoVector;
  1841. /* Reset the vector length for the next potential loop iteration. */
  1842. ioVectorLength = 0U;
  1843. /* Reset the packet length for the next potential loop iteration. */
  1844. totalPacketLength = 0U;
  1845. }
  1846. return status;
  1847. }
  1848. /*-----------------------------------------------------------*/
  1849. static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext,
  1850. const MQTTPublishInfo_t * pPublishInfo,
  1851. uint8_t * pMqttHeader,
  1852. size_t headerSize,
  1853. uint16_t packetId )
  1854. {
  1855. MQTTStatus_t status = MQTTSuccess;
  1856. size_t ioVectorLength;
  1857. size_t totalMessageLength;
  1858. bool dupFlagChanged = false;
  1859. /* Bytes required to encode the packet ID in an MQTT header according to
  1860. * the MQTT specification. */
  1861. uint8_t serializedPacketID[ 2U ];
  1862. /* Maximum number of vectors required to encode and send a publish
  1863. * packet. The breakdown is shown below.
  1864. * Fixed header (including topic string length) 0 + 1 = 1
  1865. * Topic string + 1 = 2
  1866. * Packet ID (only when QoS > QoS0) + 1 = 3
  1867. * Payload + 1 = 4 */
  1868. TransportOutVector_t pIoVector[ 4U ];
  1869. /* The header is sent first. */
  1870. pIoVector[ 0U ].iov_base = pMqttHeader;
  1871. pIoVector[ 0U ].iov_len = headerSize;
  1872. totalMessageLength = headerSize;
  1873. /* Then the topic name has to be sent. */
  1874. pIoVector[ 1U ].iov_base = pPublishInfo->pTopicName;
  1875. pIoVector[ 1U ].iov_len = pPublishInfo->topicNameLength;
  1876. totalMessageLength += pPublishInfo->topicNameLength;
  1877. /* The next field's index should be 2 as the first two fields
  1878. * have been filled in. */
  1879. ioVectorLength = 2U;
  1880. if( pPublishInfo->qos > MQTTQoS0 )
  1881. {
  1882. /* Encode the packet ID. */
  1883. serializedPacketID[ 0 ] = ( ( uint8_t ) ( ( packetId ) >> 8 ) );
  1884. serializedPacketID[ 1 ] = ( ( uint8_t ) ( ( packetId ) & 0x00ffU ) );
  1885. pIoVector[ ioVectorLength ].iov_base = serializedPacketID;
  1886. pIoVector[ ioVectorLength ].iov_len = sizeof( serializedPacketID );
  1887. ioVectorLength++;
  1888. totalMessageLength += sizeof( serializedPacketID );
  1889. }
  1890. /* Publish packets are allowed to contain no payload. */
  1891. if( pPublishInfo->payloadLength > 0U )
  1892. {
  1893. pIoVector[ ioVectorLength ].iov_base = pPublishInfo->pPayload;
  1894. pIoVector[ ioVectorLength ].iov_len = pPublishInfo->payloadLength;
  1895. ioVectorLength++;
  1896. totalMessageLength += pPublishInfo->payloadLength;
  1897. }
  1898. /* store a copy of the publish for retransmission purposes */
  1899. if( ( pPublishInfo->qos > MQTTQoS0 ) &&
  1900. ( pContext->storeFunction != NULL ) )
  1901. {
  1902. /* If not already set, set the dup flag before storing a copy of the publish
  1903. * this is because on retrieving back this copy we will get it in the form of an
  1904. * array of TransportOutVector_t that holds the data in a const pointer which cannot be
  1905. * changed after retrieving. */
  1906. if( pPublishInfo->dup != true )
  1907. {
  1908. status = MQTT_UpdateDuplicatePublishFlag( pMqttHeader, true );
  1909. dupFlagChanged = ( status == MQTTSuccess );
  1910. }
  1911. if( status == MQTTSuccess )
  1912. {
  1913. MQTTVec_t mqttVec;
  1914. mqttVec.pVector = pIoVector;
  1915. mqttVec.vectorLen = ioVectorLength;
  1916. if( pContext->storeFunction( pContext, packetId, &mqttVec ) != true )
  1917. {
  1918. status = MQTTPublishStoreFailed;
  1919. }
  1920. }
  1921. /* change the value of the dup flag to its original, if it was changed */
  1922. if( ( status == MQTTSuccess ) && ( dupFlagChanged == true ) )
  1923. {
  1924. status = MQTT_UpdateDuplicatePublishFlag( pMqttHeader, false );
  1925. }
  1926. }
  1927. if( ( status == MQTTSuccess ) &&
  1928. ( sendMessageVector( pContext, pIoVector, ioVectorLength ) != ( int32_t ) totalMessageLength ) )
  1929. {
  1930. status = MQTTSendFailed;
  1931. }
  1932. return status;
  1933. }
  1934. /*-----------------------------------------------------------*/
  1935. static MQTTStatus_t sendConnectWithoutCopy( MQTTContext_t * pContext,
  1936. const MQTTConnectInfo_t * pConnectInfo,
  1937. const MQTTPublishInfo_t * pWillInfo,
  1938. size_t remainingLength )
  1939. {
  1940. MQTTStatus_t status = MQTTSuccess;
  1941. TransportOutVector_t * iterator;
  1942. size_t ioVectorLength = 0U;
  1943. size_t totalMessageLength = 0U;
  1944. int32_t bytesSentOrError;
  1945. uint8_t * pIndex;
  1946. uint8_t serializedClientIDLength[ 2 ];
  1947. uint8_t serializedTopicLength[ 2 ];
  1948. uint8_t serializedPayloadLength[ 2 ];
  1949. uint8_t serializedUsernameLength[ 2 ];
  1950. uint8_t serializedPasswordLength[ 2 ];
  1951. size_t vectorsAdded;
  1952. /* Maximum number of bytes required by the 'fixed' part of the CONNECT
  1953. * packet header according to the MQTT specification.
  1954. * MQTT Control Byte 0 + 1 = 1
  1955. * Remaining length (max) + 4 = 5
  1956. * Protocol Name Length + 2 = 7
  1957. * Protocol Name (MQTT) + 4 = 11
  1958. * Protocol level + 1 = 12
  1959. * Connect flags + 1 = 13
  1960. * Keep alive + 2 = 15 */
  1961. uint8_t connectPacketHeader[ 15U ];
  1962. /* The maximum vectors required to encode and send a connect packet. The
  1963. * breakdown is shown below.
  1964. * Fixed header 0 + 1 = 1
  1965. * Client ID + 2 = 3
  1966. * Will topic + 2 = 5
  1967. * Will payload + 2 = 7
  1968. * Username + 2 = 9
  1969. * Password + 2 = 11 */
  1970. TransportOutVector_t pIoVector[ 11U ];
  1971. iterator = pIoVector;
  1972. pIndex = connectPacketHeader;
  1973. /* Validate arguments. */
  1974. if( ( pWillInfo != NULL ) && ( pWillInfo->pTopicName == NULL ) )
  1975. {
  1976. LogError( ( "pWillInfo->pTopicName cannot be NULL if Will is present." ) );
  1977. status = MQTTBadParameter;
  1978. }
  1979. else
  1980. {
  1981. pIndex = MQTT_SerializeConnectFixedHeader( pIndex,
  1982. pConnectInfo,
  1983. pWillInfo,
  1984. remainingLength );
  1985. assert( ( ( size_t ) ( pIndex - connectPacketHeader ) ) <= sizeof( connectPacketHeader ) );
  1986. /* The header gets sent first. */
  1987. iterator->iov_base = connectPacketHeader;
  1988. /* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-182 */
  1989. /* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-108 */
  1990. /* coverity[misra_c_2012_rule_18_2_violation] */
  1991. /* coverity[misra_c_2012_rule_10_8_violation] */
  1992. iterator->iov_len = ( size_t ) ( pIndex - connectPacketHeader );
  1993. totalMessageLength += iterator->iov_len;
  1994. iterator++;
  1995. ioVectorLength++;
  1996. /* Serialize the client ID. */
  1997. vectorsAdded = addEncodedStringToVector( serializedClientIDLength,
  1998. pConnectInfo->pClientIdentifier,
  1999. pConnectInfo->clientIdentifierLength,
  2000. iterator,
  2001. &totalMessageLength );
  2002. /* Update the iterator to point to the next empty slot. */
  2003. iterator = &iterator[ vectorsAdded ];
  2004. ioVectorLength += vectorsAdded;
  2005. if( pWillInfo != NULL )
  2006. {
  2007. /* Serialize the topic. */
  2008. vectorsAdded = addEncodedStringToVector( serializedTopicLength,
  2009. pWillInfo->pTopicName,
  2010. pWillInfo->topicNameLength,
  2011. iterator,
  2012. &totalMessageLength );
  2013. /* Update the iterator to point to the next empty slot. */
  2014. iterator = &iterator[ vectorsAdded ];
  2015. ioVectorLength += vectorsAdded;
  2016. /* Serialize the payload. Payload of last will and testament can be NULL. */
  2017. vectorsAdded = addEncodedStringToVector( serializedPayloadLength,
  2018. pWillInfo->pPayload,
  2019. ( uint16_t ) pWillInfo->payloadLength,
  2020. iterator,
  2021. &totalMessageLength );
  2022. /* Update the iterator to point to the next empty slot. */
  2023. iterator = &iterator[ vectorsAdded ];
  2024. ioVectorLength += vectorsAdded;
  2025. }
  2026. /* Encode the user name if provided. */
  2027. if( pConnectInfo->pUserName != NULL )
  2028. {
  2029. /* Serialize the user name string. */
  2030. vectorsAdded = addEncodedStringToVector( serializedUsernameLength,
  2031. pConnectInfo->pUserName,
  2032. pConnectInfo->userNameLength,
  2033. iterator,
  2034. &totalMessageLength );
  2035. /* Update the iterator to point to the next empty slot. */
  2036. iterator = &iterator[ vectorsAdded ];
  2037. ioVectorLength += vectorsAdded;
  2038. }
  2039. /* Encode the password if provided. */
  2040. if( pConnectInfo->pPassword != NULL )
  2041. {
  2042. /* Serialize the user name string. */
  2043. vectorsAdded = addEncodedStringToVector( serializedPasswordLength,
  2044. pConnectInfo->pPassword,
  2045. pConnectInfo->passwordLength,
  2046. iterator,
  2047. &totalMessageLength );
  2048. /* Update the iterator to point to the next empty slot. */
  2049. ioVectorLength += vectorsAdded;
  2050. }
  2051. bytesSentOrError = sendMessageVector( pContext, pIoVector, ioVectorLength );
  2052. if( bytesSentOrError != ( int32_t ) totalMessageLength )
  2053. {
  2054. status = MQTTSendFailed;
  2055. }
  2056. }
  2057. return status;
  2058. }
  2059. /*-----------------------------------------------------------*/
  2060. static MQTTStatus_t receiveConnack( MQTTContext_t * pContext,
  2061. uint32_t timeoutMs,
  2062. bool cleanSession,
  2063. MQTTPacketInfo_t * pIncomingPacket,
  2064. bool * pSessionPresent )
  2065. {
  2066. MQTTStatus_t status = MQTTSuccess;
  2067. MQTTGetCurrentTimeFunc_t getTimeStamp = NULL;
  2068. uint32_t entryTimeMs = 0U, remainingTimeMs = 0U, timeTakenMs = 0U;
  2069. bool breakFromLoop = false;
  2070. uint16_t loopCount = 0U;
  2071. assert( pContext != NULL );
  2072. assert( pIncomingPacket != NULL );
  2073. assert( pContext->getTime != NULL );
  2074. getTimeStamp = pContext->getTime;
  2075. /* Get the entry time for the function. */
  2076. entryTimeMs = getTimeStamp();
  2077. do
  2078. {
  2079. /* Transport read for incoming CONNACK packet type and length.
  2080. * MQTT_GetIncomingPacketTypeAndLength is a blocking call and it is
  2081. * returned after a transport receive timeout, an error, or a successful
  2082. * receive of packet type and length. */
  2083. status = MQTT_GetIncomingPacketTypeAndLength( pContext->transportInterface.recv,
  2084. pContext->transportInterface.pNetworkContext,
  2085. pIncomingPacket );
  2086. /* The loop times out based on 2 conditions.
  2087. * 1. If timeoutMs is greater than 0:
  2088. * Loop times out based on the timeout calculated by getTime()
  2089. * function.
  2090. * 2. If timeoutMs is 0:
  2091. * Loop times out based on the maximum number of retries config
  2092. * MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT. This config will control
  2093. * maximum the number of retry attempts to read the CONNACK packet.
  2094. * A value of 0 for the config will try once to read CONNACK. */
  2095. if( timeoutMs > 0U )
  2096. {
  2097. breakFromLoop = calculateElapsedTime( getTimeStamp(), entryTimeMs ) >= timeoutMs;
  2098. }
  2099. else
  2100. {
  2101. breakFromLoop = loopCount >= MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT;
  2102. loopCount++;
  2103. }
  2104. /* Loop until there is data to read or if we have exceeded the timeout/retries. */
  2105. } while( ( status == MQTTNoDataAvailable ) && ( breakFromLoop == false ) );
  2106. if( status == MQTTSuccess )
  2107. {
  2108. /* Time taken in this function so far. */
  2109. timeTakenMs = calculateElapsedTime( getTimeStamp(), entryTimeMs );
  2110. if( timeTakenMs < timeoutMs )
  2111. {
  2112. /* Calculate remaining time for receiving the remainder of
  2113. * the packet. */
  2114. remainingTimeMs = timeoutMs - timeTakenMs;
  2115. }
  2116. /* Reading the remainder of the packet by transport recv.
  2117. * Attempt to read once even if the timeout has expired.
  2118. * Invoking receivePacket with remainingTime as 0 would attempt to
  2119. * recv from network once. If using retries, the remainder of the
  2120. * CONNACK packet is tried to be read only once. Reading once would be
  2121. * good as the packet type and remaining length was already read. Hence,
  2122. * the probability of the remaining 2 bytes available to read is very high. */
  2123. if( pIncomingPacket->type == MQTT_PACKET_TYPE_CONNACK )
  2124. {
  2125. status = receivePacket( pContext,
  2126. *pIncomingPacket,
  2127. remainingTimeMs );
  2128. }
  2129. else
  2130. {
  2131. LogError( ( "Incorrect packet type %X received while expecting"
  2132. " CONNACK(%X).",
  2133. ( unsigned int ) pIncomingPacket->type,
  2134. MQTT_PACKET_TYPE_CONNACK ) );
  2135. status = MQTTBadResponse;
  2136. }
  2137. }
  2138. if( status == MQTTSuccess )
  2139. {
  2140. /* Update the packet info pointer to the buffer read. */
  2141. pIncomingPacket->pRemainingData = pContext->networkBuffer.pBuffer;
  2142. /* Deserialize CONNACK. */
  2143. status = MQTT_DeserializeAck( pIncomingPacket, NULL, pSessionPresent );
  2144. }
  2145. /* If a clean session is requested, a session present should not be set by
  2146. * broker. */
  2147. if( status == MQTTSuccess )
  2148. {
  2149. if( ( cleanSession == true ) && ( *pSessionPresent == true ) )
  2150. {
  2151. LogError( ( "Unexpected session present flag in CONNACK response from broker."
  2152. " CONNECT request with clean session was made with broker." ) );
  2153. status = MQTTBadResponse;
  2154. }
  2155. }
  2156. if( status == MQTTSuccess )
  2157. {
  2158. LogDebug( ( "Received MQTT CONNACK successfully from broker." ) );
  2159. }
  2160. else
  2161. {
  2162. LogError( ( "CONNACK recv failed with status = %s.",
  2163. MQTT_Status_strerror( status ) ) );
  2164. }
  2165. return status;
  2166. }
  2167. /*-----------------------------------------------------------*/
  2168. static MQTTStatus_t handleUncleanSessionResumption( MQTTContext_t * pContext )
  2169. {
  2170. MQTTStatus_t status = MQTTSuccess;
  2171. MQTTStateCursor_t cursor = MQTT_STATE_CURSOR_INITIALIZER;
  2172. uint16_t packetId = MQTT_PACKET_ID_INVALID;
  2173. MQTTPublishState_t state = MQTTStateNull;
  2174. size_t totalMessageLength = 0;
  2175. uint8_t * pMqttPacket = NULL;
  2176. assert( pContext != NULL );
  2177. /* Get the next packet ID for which a PUBREL need to be resent. */
  2178. packetId = MQTT_PubrelToResend( pContext, &cursor, &state );
  2179. /* Resend all the PUBREL acks after session is reestablished. */
  2180. while( ( packetId != MQTT_PACKET_ID_INVALID ) &&
  2181. ( status == MQTTSuccess ) )
  2182. {
  2183. status = sendPublishAcks( pContext, packetId, state );
  2184. packetId = MQTT_PubrelToResend( pContext, &cursor, &state );
  2185. }
  2186. if( ( status == MQTTSuccess ) &&
  2187. ( pContext->retrieveFunction != NULL ) )
  2188. {
  2189. cursor = MQTT_STATE_CURSOR_INITIALIZER;
  2190. /* Resend all the PUBLISH for which PUBACK/PUBREC is not received
  2191. * after session is reestablished. */
  2192. do
  2193. {
  2194. packetId = MQTT_PublishToResend( pContext, &cursor );
  2195. if( packetId != MQTT_PACKET_ID_INVALID )
  2196. {
  2197. if( pContext->retrieveFunction( pContext, packetId, &pMqttPacket, &totalMessageLength ) != true )
  2198. {
  2199. status = MQTTPublishRetrieveFailed;
  2200. break;
  2201. }
  2202. MQTT_PRE_STATE_UPDATE_HOOK( pContext );
  2203. if( sendBuffer( pContext, pMqttPacket, totalMessageLength ) != ( int32_t ) totalMessageLength )
  2204. {
  2205. status = MQTTSendFailed;
  2206. }
  2207. MQTT_POST_STATE_UPDATE_HOOK( pContext );
  2208. }
  2209. } while( ( packetId != MQTT_PACKET_ID_INVALID ) &&
  2210. ( status == MQTTSuccess ) );
  2211. }
  2212. return status;
  2213. }
  2214. static MQTTStatus_t handleCleanSession( MQTTContext_t * pContext )
  2215. {
  2216. MQTTStatus_t status = MQTTSuccess;
  2217. MQTTStateCursor_t cursor = MQTT_STATE_CURSOR_INITIALIZER;
  2218. uint16_t packetId = MQTT_PACKET_ID_INVALID;
  2219. assert( pContext != NULL );
  2220. /* Reset the index and clear the buffer when a new session is established. */
  2221. pContext->index = 0;
  2222. ( void ) memset( pContext->networkBuffer.pBuffer, 0, pContext->networkBuffer.size );
  2223. if( pContext->clearFunction != NULL )
  2224. {
  2225. cursor = MQTT_STATE_CURSOR_INITIALIZER;
  2226. /* Resend all the PUBLISH for which PUBACK/PUBREC is not received
  2227. * after session is reestablished. */
  2228. do
  2229. {
  2230. packetId = MQTT_PublishToResend( pContext, &cursor );
  2231. if( packetId != MQTT_PACKET_ID_INVALID )
  2232. {
  2233. pContext->clearFunction( pContext, packetId );
  2234. }
  2235. } while( packetId != MQTT_PACKET_ID_INVALID );
  2236. }
  2237. if( pContext->outgoingPublishRecordMaxCount > 0U )
  2238. {
  2239. /* Clear any existing records if a new session is established. */
  2240. ( void ) memset( pContext->outgoingPublishRecords,
  2241. 0x00,
  2242. pContext->outgoingPublishRecordMaxCount * sizeof( *pContext->outgoingPublishRecords ) );
  2243. }
  2244. if( pContext->incomingPublishRecordMaxCount > 0U )
  2245. {
  2246. ( void ) memset( pContext->incomingPublishRecords,
  2247. 0x00,
  2248. pContext->incomingPublishRecordMaxCount * sizeof( *pContext->incomingPublishRecords ) );
  2249. }
  2250. return status;
  2251. }
  2252. static MQTTStatus_t validatePublishParams( const MQTTContext_t * pContext,
  2253. const MQTTPublishInfo_t * pPublishInfo,
  2254. uint16_t packetId )
  2255. {
  2256. MQTTStatus_t status = MQTTSuccess;
  2257. /* Validate arguments. */
  2258. if( ( pContext == NULL ) || ( pPublishInfo == NULL ) )
  2259. {
  2260. LogError( ( "Argument cannot be NULL: pContext=%p, "
  2261. "pPublishInfo=%p.",
  2262. ( void * ) pContext,
  2263. ( void * ) pPublishInfo ) );
  2264. status = MQTTBadParameter;
  2265. }
  2266. else if( ( pPublishInfo->qos != MQTTQoS0 ) && ( packetId == 0U ) )
  2267. {
  2268. LogError( ( "Packet Id is 0 for PUBLISH with QoS=%u.",
  2269. ( unsigned int ) pPublishInfo->qos ) );
  2270. status = MQTTBadParameter;
  2271. }
  2272. else if( ( pPublishInfo->payloadLength > 0U ) && ( pPublishInfo->pPayload == NULL ) )
  2273. {
  2274. LogError( ( "A nonzero payload length requires a non-NULL payload: "
  2275. "payloadLength=%lu, pPayload=%p.",
  2276. ( unsigned long ) pPublishInfo->payloadLength,
  2277. pPublishInfo->pPayload ) );
  2278. status = MQTTBadParameter;
  2279. }
  2280. else if( ( pContext->outgoingPublishRecords == NULL ) && ( pPublishInfo->qos > MQTTQoS0 ) )
  2281. {
  2282. LogError( ( "Trying to publish a QoS > MQTTQoS0 packet when outgoing publishes "
  2283. "for QoS1/QoS2 have not been enabled. Please, call MQTT_InitStatefulQoS "
  2284. "to initialize and enable the use of QoS1/QoS2 publishes." ) );
  2285. status = MQTTBadParameter;
  2286. }
  2287. else
  2288. {
  2289. /* MISRA else */
  2290. }
  2291. return status;
  2292. }
  2293. /*-----------------------------------------------------------*/
  2294. MQTTStatus_t MQTT_Init( MQTTContext_t * pContext,
  2295. const TransportInterface_t * pTransportInterface,
  2296. MQTTGetCurrentTimeFunc_t getTimeFunction,
  2297. MQTTEventCallback_t userCallback,
  2298. const MQTTFixedBuffer_t * pNetworkBuffer )
  2299. {
  2300. MQTTStatus_t status = MQTTSuccess;
  2301. /* Validate arguments. */
  2302. if( ( pContext == NULL ) || ( pTransportInterface == NULL ) ||
  2303. ( pNetworkBuffer == NULL ) )
  2304. {
  2305. LogError( ( "Argument cannot be NULL: pContext=%p, "
  2306. "pTransportInterface=%p, "
  2307. "pNetworkBuffer=%p",
  2308. ( void * ) pContext,
  2309. ( void * ) pTransportInterface,
  2310. ( void * ) pNetworkBuffer ) );
  2311. status = MQTTBadParameter;
  2312. }
  2313. else if( getTimeFunction == NULL )
  2314. {
  2315. LogError( ( "Invalid parameter: getTimeFunction is NULL" ) );
  2316. status = MQTTBadParameter;
  2317. }
  2318. else if( userCallback == NULL )
  2319. {
  2320. LogError( ( "Invalid parameter: userCallback is NULL" ) );
  2321. status = MQTTBadParameter;
  2322. }
  2323. else if( pTransportInterface->recv == NULL )
  2324. {
  2325. LogError( ( "Invalid parameter: pTransportInterface->recv is NULL" ) );
  2326. status = MQTTBadParameter;
  2327. }
  2328. else if( pTransportInterface->send == NULL )
  2329. {
  2330. LogError( ( "Invalid parameter: pTransportInterface->send is NULL" ) );
  2331. status = MQTTBadParameter;
  2332. }
  2333. else
  2334. {
  2335. ( void ) memset( pContext, 0x00, sizeof( MQTTContext_t ) );
  2336. pContext->connectStatus = MQTTNotConnected;
  2337. pContext->transportInterface = *pTransportInterface;
  2338. pContext->getTime = getTimeFunction;
  2339. pContext->appCallback = userCallback;
  2340. pContext->networkBuffer = *pNetworkBuffer;
  2341. /* Zero is not a valid packet ID per MQTT spec. Start from 1. */
  2342. pContext->nextPacketId = 1;
  2343. }
  2344. return status;
  2345. }
  2346. /*-----------------------------------------------------------*/
  2347. MQTTStatus_t MQTT_InitStatefulQoS( MQTTContext_t * pContext,
  2348. MQTTPubAckInfo_t * pOutgoingPublishRecords,
  2349. size_t outgoingPublishCount,
  2350. MQTTPubAckInfo_t * pIncomingPublishRecords,
  2351. size_t incomingPublishCount )
  2352. {
  2353. MQTTStatus_t status = MQTTSuccess;
  2354. if( pContext == NULL )
  2355. {
  2356. LogError( ( "Argument cannot be NULL: pContext=%p\n",
  2357. ( void * ) pContext ) );
  2358. status = MQTTBadParameter;
  2359. }
  2360. /* Check whether the arguments make sense. Not equal here behaves
  2361. * like an exclusive-or operator for boolean values. */
  2362. else if( ( outgoingPublishCount == 0U ) !=
  2363. ( pOutgoingPublishRecords == NULL ) )
  2364. {
  2365. LogError( ( "Arguments do not match: pOutgoingPublishRecords=%p, "
  2366. "outgoingPublishCount=%lu",
  2367. ( void * ) pOutgoingPublishRecords,
  2368. ( unsigned long ) outgoingPublishCount ) );
  2369. status = MQTTBadParameter;
  2370. }
  2371. /* Check whether the arguments make sense. Not equal here behaves
  2372. * like an exclusive-or operator for boolean values. */
  2373. else if( ( incomingPublishCount == 0U ) !=
  2374. ( pIncomingPublishRecords == NULL ) )
  2375. {
  2376. LogError( ( "Arguments do not match: pIncomingPublishRecords=%p, "
  2377. "incomingPublishCount=%lu",
  2378. ( void * ) pIncomingPublishRecords,
  2379. ( unsigned long ) incomingPublishCount ) );
  2380. status = MQTTBadParameter;
  2381. }
  2382. else if( pContext->appCallback == NULL )
  2383. {
  2384. LogError( ( "MQTT_InitStatefulQoS must be called only after MQTT_Init has"
  2385. " been called successfully.\n" ) );
  2386. status = MQTTBadParameter;
  2387. }
  2388. else
  2389. {
  2390. pContext->incomingPublishRecordMaxCount = incomingPublishCount;
  2391. pContext->incomingPublishRecords = pIncomingPublishRecords;
  2392. pContext->outgoingPublishRecordMaxCount = outgoingPublishCount;
  2393. pContext->outgoingPublishRecords = pOutgoingPublishRecords;
  2394. }
  2395. return status;
  2396. }
  2397. /*-----------------------------------------------------------*/
  2398. MQTTStatus_t MQTT_InitRetransmits( MQTTContext_t * pContext,
  2399. MQTTStorePacketForRetransmit storeFunction,
  2400. MQTTRetrievePacketForRetransmit retrieveFunction,
  2401. MQTTClearPacketForRetransmit clearFunction )
  2402. {
  2403. MQTTStatus_t status = MQTTSuccess;
  2404. if( pContext == NULL )
  2405. {
  2406. LogError( ( "Argument cannot be NULL: pContext=%p\n",
  2407. ( void * ) pContext ) );
  2408. status = MQTTBadParameter;
  2409. }
  2410. else if( storeFunction == NULL )
  2411. {
  2412. LogError( ( "Invalid parameter: storeFunction is NULL" ) );
  2413. status = MQTTBadParameter;
  2414. }
  2415. else if( retrieveFunction == NULL )
  2416. {
  2417. LogError( ( "Invalid parameter: retrieveFunction is NULL" ) );
  2418. status = MQTTBadParameter;
  2419. }
  2420. else if( clearFunction == NULL )
  2421. {
  2422. LogError( ( "Invalid parameter: clearFunction is NULL" ) );
  2423. status = MQTTBadParameter;
  2424. }
  2425. else
  2426. {
  2427. pContext->storeFunction = storeFunction;
  2428. pContext->retrieveFunction = retrieveFunction;
  2429. pContext->clearFunction = clearFunction;
  2430. }
  2431. return status;
  2432. }
  2433. /*-----------------------------------------------------------*/
  2434. MQTTStatus_t MQTT_CancelCallback( const MQTTContext_t * pContext,
  2435. uint16_t packetId )
  2436. {
  2437. MQTTStatus_t status = MQTTSuccess;
  2438. if( pContext == NULL )
  2439. {
  2440. LogWarn( ( "pContext is NULL\n" ) );
  2441. status = MQTTBadParameter;
  2442. }
  2443. else if( pContext->outgoingPublishRecords == NULL )
  2444. {
  2445. LogError( ( "QoS1/QoS2 is not initialized for use. Please, "
  2446. "call MQTT_InitStatefulQoS to enable QoS1 and QoS2 "
  2447. "publishes.\n" ) );
  2448. status = MQTTBadParameter;
  2449. }
  2450. else
  2451. {
  2452. MQTT_PRE_STATE_UPDATE_HOOK( pContext );
  2453. status = MQTT_RemoveStateRecord( pContext,
  2454. packetId );
  2455. MQTT_POST_STATE_UPDATE_HOOK( pContext );
  2456. }
  2457. return status;
  2458. }
  2459. /*-----------------------------------------------------------*/
  2460. MQTTStatus_t MQTT_CheckConnectStatus( const MQTTContext_t * pContext )
  2461. {
  2462. MQTTConnectionStatus_t connectStatus;
  2463. MQTTStatus_t status = MQTTSuccess;
  2464. if( pContext == NULL )
  2465. {
  2466. LogError( ( "Argument cannot be NULL: pContext=%p",
  2467. ( void * ) pContext ) );
  2468. status = MQTTBadParameter;
  2469. }
  2470. if( status == MQTTSuccess )
  2471. {
  2472. MQTT_PRE_STATE_UPDATE_HOOK( pContext );
  2473. connectStatus = pContext->connectStatus;
  2474. MQTT_POST_STATE_UPDATE_HOOK( pContext );
  2475. switch( connectStatus )
  2476. {
  2477. case MQTTConnected:
  2478. status = MQTTStatusConnected;
  2479. break;
  2480. case MQTTDisconnectPending:
  2481. status = MQTTStatusDisconnectPending;
  2482. break;
  2483. default:
  2484. status = MQTTStatusNotConnected;
  2485. break;
  2486. }
  2487. }
  2488. return status;
  2489. }
  2490. /*-----------------------------------------------------------*/
  2491. MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext,
  2492. const MQTTConnectInfo_t * pConnectInfo,
  2493. const MQTTPublishInfo_t * pWillInfo,
  2494. uint32_t timeoutMs,
  2495. bool * pSessionPresent )
  2496. {
  2497. size_t remainingLength = 0UL, packetSize = 0UL;
  2498. MQTTStatus_t status = MQTTSuccess;
  2499. MQTTPacketInfo_t incomingPacket = { 0 };
  2500. MQTTConnectionStatus_t connectStatus;
  2501. incomingPacket.type = ( uint8_t ) 0;
  2502. if( ( pContext == NULL ) || ( pConnectInfo == NULL ) || ( pSessionPresent == NULL ) )
  2503. {
  2504. LogError( ( "Argument cannot be NULL: pContext=%p, "
  2505. "pConnectInfo=%p, pSessionPresent=%p.",
  2506. ( void * ) pContext,
  2507. ( void * ) pConnectInfo,
  2508. ( void * ) pSessionPresent ) );
  2509. status = MQTTBadParameter;
  2510. }
  2511. if( status == MQTTSuccess )
  2512. {
  2513. /* Get MQTT connect packet size and remaining length. */
  2514. status = MQTT_GetConnectPacketSize( pConnectInfo,
  2515. pWillInfo,
  2516. &remainingLength,
  2517. &packetSize );
  2518. /* coverity[sensitive_data_leak] */
  2519. LogDebug( ( "CONNECT packet size is %lu and remaining length is %lu.",
  2520. ( unsigned long ) packetSize,
  2521. ( unsigned long ) remainingLength ) );
  2522. }
  2523. if( status == MQTTSuccess )
  2524. {
  2525. MQTT_PRE_STATE_UPDATE_HOOK( pContext );
  2526. connectStatus = pContext->connectStatus;
  2527. if( connectStatus != MQTTNotConnected )
  2528. {
  2529. status = ( connectStatus == MQTTConnected ) ? MQTTStatusConnected : MQTTStatusDisconnectPending;
  2530. }
  2531. if( status == MQTTSuccess )
  2532. {
  2533. status = sendConnectWithoutCopy( pContext,
  2534. pConnectInfo,
  2535. pWillInfo,
  2536. remainingLength );
  2537. }
  2538. /* Read CONNACK from transport layer. */
  2539. if( status == MQTTSuccess )
  2540. {
  2541. status = receiveConnack( pContext,
  2542. timeoutMs,
  2543. pConnectInfo->cleanSession,
  2544. &incomingPacket,
  2545. pSessionPresent );
  2546. }
  2547. if( ( status == MQTTSuccess ) && ( *pSessionPresent != true ) )
  2548. {
  2549. status = handleCleanSession( pContext );
  2550. }
  2551. if( status == MQTTSuccess )
  2552. {
  2553. pContext->connectStatus = MQTTConnected;
  2554. /* Initialize keep-alive fields after a successful connection. */
  2555. pContext->keepAliveIntervalSec = pConnectInfo->keepAliveSeconds;
  2556. pContext->waitingForPingResp = false;
  2557. pContext->pingReqSendTimeMs = 0U;
  2558. }
  2559. MQTT_POST_STATE_UPDATE_HOOK( pContext );
  2560. }
  2561. if( ( status == MQTTSuccess ) && ( *pSessionPresent == true ) )
  2562. {
  2563. /* Resend PUBRELs and PUBLISHES when reestablishing a session */
  2564. status = handleUncleanSessionResumption( pContext );
  2565. }
  2566. if( status == MQTTSuccess )
  2567. {
  2568. LogInfo( ( "MQTT connection established with the broker." ) );
  2569. }
  2570. else if( ( status == MQTTStatusConnected ) || ( status == MQTTStatusDisconnectPending ) )
  2571. {
  2572. LogInfo( ( "MQTT Connection is either already established or a disconnect is pending, return status = %s.",
  2573. MQTT_Status_strerror( status ) ) );
  2574. }
  2575. else if( pContext == NULL )
  2576. {
  2577. LogError( ( "MQTT connection failed with status = %s.",
  2578. MQTT_Status_strerror( status ) ) );
  2579. }
  2580. else
  2581. {
  2582. LogError( ( "MQTT connection failed with status = %s.",
  2583. MQTT_Status_strerror( status ) ) );
  2584. MQTT_PRE_STATE_UPDATE_HOOK( pContext );
  2585. if( pContext->connectStatus == MQTTConnected )
  2586. {
  2587. /* This will only be executed if after the connack is received
  2588. * the retransmits fail for some reason on an unclean session
  2589. * connection. In this case we need to retry the re-transmits
  2590. * which can only be done using the connect API and that can only
  2591. * be done once we are disconnected, hence we ask the user to
  2592. * call disconnect here */
  2593. pContext->connectStatus = MQTTDisconnectPending;
  2594. }
  2595. MQTT_POST_STATE_UPDATE_HOOK( pContext );
  2596. }
  2597. return status;
  2598. }
  2599. /*-----------------------------------------------------------*/
  2600. MQTTStatus_t MQTT_Subscribe( MQTTContext_t * pContext,
  2601. const MQTTSubscribeInfo_t * pSubscriptionList,
  2602. size_t subscriptionCount,
  2603. uint16_t packetId )
  2604. {
  2605. MQTTConnectionStatus_t connectStatus;
  2606. size_t remainingLength = 0UL, packetSize = 0UL;
  2607. /* Validate arguments. */
  2608. MQTTStatus_t status = validateSubscribeUnsubscribeParams( pContext,
  2609. pSubscriptionList,
  2610. subscriptionCount,
  2611. packetId );
  2612. if( status == MQTTSuccess )
  2613. {
  2614. /* Get the remaining length and packet size.*/
  2615. status = MQTT_GetSubscribePacketSize( pSubscriptionList,
  2616. subscriptionCount,
  2617. &remainingLength,
  2618. &packetSize );
  2619. LogDebug( ( "SUBSCRIBE packet size is %lu and remaining length is %lu.",
  2620. ( unsigned long ) packetSize,
  2621. ( unsigned long ) remainingLength ) );
  2622. }
  2623. if( status == MQTTSuccess )
  2624. {
  2625. MQTT_PRE_STATE_UPDATE_HOOK( pContext );
  2626. connectStatus = pContext->connectStatus;
  2627. if( connectStatus != MQTTConnected )
  2628. {
  2629. status = ( connectStatus == MQTTNotConnected ) ? MQTTStatusNotConnected : MQTTStatusDisconnectPending;
  2630. }
  2631. if( status == MQTTSuccess )
  2632. {
  2633. /* Send MQTT SUBSCRIBE packet. */
  2634. status = sendSubscribeWithoutCopy( pContext,
  2635. pSubscriptionList,
  2636. subscriptionCount,
  2637. packetId,
  2638. remainingLength );
  2639. }
  2640. MQTT_POST_STATE_UPDATE_HOOK( pContext );
  2641. }
  2642. return status;
  2643. }
  2644. /*-----------------------------------------------------------*/
  2645. MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,
  2646. const MQTTPublishInfo_t * pPublishInfo,
  2647. uint16_t packetId )
  2648. {
  2649. size_t headerSize = 0UL;
  2650. size_t remainingLength = 0UL;
  2651. size_t packetSize = 0UL;
  2652. MQTTPublishState_t publishStatus = MQTTStateNull;
  2653. MQTTConnectionStatus_t connectStatus;
  2654. /* Maximum number of bytes required by the 'fixed' part of the PUBLISH
  2655. * packet header according to the MQTT specifications.
  2656. * Header byte 0 + 1 = 1
  2657. * Length (max) + 4 = 5
  2658. * Topic string length + 2 = 7
  2659. *
  2660. * Note that since publish is one of the most common operations in MQTT
  2661. * connection, we have moved the topic string length to the 'fixed' part of
  2662. * the header so efficiency. Otherwise, we would need an extra vector and
  2663. * an extra call to 'send' (in case writev is not defined) to send the
  2664. * topic length. */
  2665. uint8_t mqttHeader[ 7U ];
  2666. /* Validate arguments. */
  2667. MQTTStatus_t status = validatePublishParams( pContext, pPublishInfo, packetId );
  2668. if( status == MQTTSuccess )
  2669. {
  2670. /* Get the remaining length and packet size.*/
  2671. status = MQTT_GetPublishPacketSize( pPublishInfo,
  2672. &remainingLength,
  2673. &packetSize );
  2674. }
  2675. if( status == MQTTSuccess )
  2676. {
  2677. status = MQTT_SerializePublishHeaderWithoutTopic( pPublishInfo,
  2678. remainingLength,
  2679. mqttHeader,
  2680. &headerSize );
  2681. }
  2682. if( status == MQTTSuccess )
  2683. {
  2684. /* Take the mutex as multiple send calls are required for sending this
  2685. * packet. */
  2686. MQTT_PRE_STATE_UPDATE_HOOK( pContext );
  2687. connectStatus = pContext->connectStatus;
  2688. if( connectStatus != MQTTConnected )
  2689. {
  2690. status = ( connectStatus == MQTTNotConnected ) ? MQTTStatusNotConnected : MQTTStatusDisconnectPending;
  2691. }
  2692. if( ( status == MQTTSuccess ) && ( pPublishInfo->qos > MQTTQoS0 ) )
  2693. {
  2694. /* Set the flag so that the corresponding hook can be called later. */
  2695. status = MQTT_ReserveState( pContext,
  2696. packetId,
  2697. pPublishInfo->qos );
  2698. /* State already exists for a duplicate packet.
  2699. * If a state doesn't exist, it will be handled as a new publish in
  2700. * state engine. */
  2701. if( ( status == MQTTStateCollision ) && ( pPublishInfo->dup == true ) )
  2702. {
  2703. status = MQTTSuccess;
  2704. }
  2705. }
  2706. if( status == MQTTSuccess )
  2707. {
  2708. status = sendPublishWithoutCopy( pContext,
  2709. pPublishInfo,
  2710. mqttHeader,
  2711. headerSize,
  2712. packetId );
  2713. }
  2714. if( ( status == MQTTSuccess ) &&
  2715. ( pPublishInfo->qos > MQTTQoS0 ) )
  2716. {
  2717. /* Update state machine after PUBLISH is sent.
  2718. * Only to be done for QoS1 or QoS2. */
  2719. status = MQTT_UpdateStatePublish( pContext,
  2720. packetId,
  2721. MQTT_SEND,
  2722. pPublishInfo->qos,
  2723. &publishStatus );
  2724. if( status != MQTTSuccess )
  2725. {
  2726. LogError( ( "Update state for publish failed with status %s."
  2727. " However PUBLISH packet was sent to the broker."
  2728. " Any further handling of ACKs for the packet Id"
  2729. " will fail.",
  2730. MQTT_Status_strerror( status ) ) );
  2731. }
  2732. }
  2733. /* mutex should be released and not before updating the state
  2734. * because we need to make sure that the state is updated
  2735. * after sending the publish packet, before the receive
  2736. * loop receives ack for this and would want to update its state
  2737. */
  2738. MQTT_POST_STATE_UPDATE_HOOK( pContext );
  2739. }
  2740. if( status != MQTTSuccess )
  2741. {
  2742. LogError( ( "MQTT PUBLISH failed with status %s.",
  2743. MQTT_Status_strerror( status ) ) );
  2744. }
  2745. return status;
  2746. }
  2747. /*-----------------------------------------------------------*/
  2748. MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext )
  2749. {
  2750. int32_t sendResult = 0;
  2751. MQTTStatus_t status = MQTTSuccess;
  2752. size_t packetSize = 0U;
  2753. /* MQTT ping packets are of fixed length. */
  2754. uint8_t pingreqPacket[ 2U ];
  2755. MQTTFixedBuffer_t localBuffer;
  2756. MQTTConnectionStatus_t connectStatus;
  2757. localBuffer.pBuffer = pingreqPacket;
  2758. localBuffer.size = sizeof( pingreqPacket );
  2759. if( pContext == NULL )
  2760. {
  2761. LogError( ( "pContext is NULL." ) );
  2762. status = MQTTBadParameter;
  2763. }
  2764. if( status == MQTTSuccess )
  2765. {
  2766. /* Get MQTT PINGREQ packet size. */
  2767. status = MQTT_GetPingreqPacketSize( &packetSize );
  2768. if( status == MQTTSuccess )
  2769. {
  2770. assert( packetSize == localBuffer.size );
  2771. LogDebug( ( "MQTT PINGREQ packet size is %lu.",
  2772. ( unsigned long ) packetSize ) );
  2773. }
  2774. else
  2775. {
  2776. LogError( ( "Failed to get the PINGREQ packet size." ) );
  2777. }
  2778. }
  2779. if( status == MQTTSuccess )
  2780. {
  2781. /* Serialize MQTT PINGREQ. */
  2782. status = MQTT_SerializePingreq( &localBuffer );
  2783. }
  2784. if( status == MQTTSuccess )
  2785. {
  2786. /* Take the mutex as the send call should not be interrupted in
  2787. * between. */
  2788. MQTT_PRE_STATE_UPDATE_HOOK( pContext );
  2789. connectStatus = pContext->connectStatus;
  2790. if( connectStatus != MQTTConnected )
  2791. {
  2792. status = ( connectStatus == MQTTNotConnected ) ? MQTTStatusNotConnected : MQTTStatusDisconnectPending;
  2793. }
  2794. if( status == MQTTSuccess )
  2795. {
  2796. /* Send the serialized PINGREQ packet to transport layer.
  2797. * Here, we do not use the vectored IO approach for efficiency as the
  2798. * Ping packet does not have numerous fields which need to be copied
  2799. * from the user provided buffers. Thus it can be sent directly. */
  2800. sendResult = sendBuffer( pContext,
  2801. localBuffer.pBuffer,
  2802. packetSize );
  2803. /* It is an error to not send the entire PINGREQ packet. */
  2804. if( sendResult < ( int32_t ) packetSize )
  2805. {
  2806. LogError( ( "Transport send failed for PINGREQ packet." ) );
  2807. status = MQTTSendFailed;
  2808. }
  2809. else
  2810. {
  2811. pContext->pingReqSendTimeMs = pContext->lastPacketTxTime;
  2812. pContext->waitingForPingResp = true;
  2813. LogDebug( ( "Sent %ld bytes of PINGREQ packet.",
  2814. ( long int ) sendResult ) );
  2815. }
  2816. }
  2817. MQTT_POST_STATE_UPDATE_HOOK( pContext );
  2818. }
  2819. return status;
  2820. }
  2821. /*-----------------------------------------------------------*/
  2822. MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext,
  2823. const MQTTSubscribeInfo_t * pSubscriptionList,
  2824. size_t subscriptionCount,
  2825. uint16_t packetId )
  2826. {
  2827. MQTTConnectionStatus_t connectStatus;
  2828. size_t remainingLength = 0UL, packetSize = 0UL;
  2829. /* Validate arguments. */
  2830. MQTTStatus_t status = validateSubscribeUnsubscribeParams( pContext,
  2831. pSubscriptionList,
  2832. subscriptionCount,
  2833. packetId );
  2834. if( status == MQTTSuccess )
  2835. {
  2836. /* Get the remaining length and packet size.*/
  2837. status = MQTT_GetUnsubscribePacketSize( pSubscriptionList,
  2838. subscriptionCount,
  2839. &remainingLength,
  2840. &packetSize );
  2841. LogDebug( ( "UNSUBSCRIBE packet size is %lu and remaining length is %lu.",
  2842. ( unsigned long ) packetSize,
  2843. ( unsigned long ) remainingLength ) );
  2844. }
  2845. if( status == MQTTSuccess )
  2846. {
  2847. /* Take the mutex because the below call should not be interrupted. */
  2848. MQTT_PRE_STATE_UPDATE_HOOK( pContext );
  2849. connectStatus = pContext->connectStatus;
  2850. if( connectStatus != MQTTConnected )
  2851. {
  2852. status = ( connectStatus == MQTTNotConnected ) ? MQTTStatusNotConnected : MQTTStatusDisconnectPending;
  2853. }
  2854. if( status == MQTTSuccess )
  2855. {
  2856. status = sendUnsubscribeWithoutCopy( pContext,
  2857. pSubscriptionList,
  2858. subscriptionCount,
  2859. packetId,
  2860. remainingLength );
  2861. }
  2862. MQTT_POST_STATE_UPDATE_HOOK( pContext );
  2863. }
  2864. return status;
  2865. }
  2866. /*-----------------------------------------------------------*/
  2867. MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext )
  2868. {
  2869. size_t packetSize = 0U;
  2870. int32_t sendResult = 0;
  2871. MQTTStatus_t status = MQTTSuccess;
  2872. MQTTFixedBuffer_t localBuffer;
  2873. uint8_t disconnectPacket[ 2U ];
  2874. MQTTConnectionStatus_t connectStatus;
  2875. localBuffer.pBuffer = disconnectPacket;
  2876. localBuffer.size = 2U;
  2877. /* Validate arguments. */
  2878. if( pContext == NULL )
  2879. {
  2880. LogError( ( "pContext cannot be NULL." ) );
  2881. status = MQTTBadParameter;
  2882. }
  2883. if( status == MQTTSuccess )
  2884. {
  2885. /* Get MQTT DISCONNECT packet size. */
  2886. status = MQTT_GetDisconnectPacketSize( &packetSize );
  2887. LogDebug( ( "MQTT DISCONNECT packet size is %lu.",
  2888. ( unsigned long ) packetSize ) );
  2889. }
  2890. if( status == MQTTSuccess )
  2891. {
  2892. /* Serialize MQTT DISCONNECT packet. */
  2893. status = MQTT_SerializeDisconnect( &localBuffer );
  2894. }
  2895. if( status == MQTTSuccess )
  2896. {
  2897. /* Take the mutex because the below call should not be interrupted. */
  2898. MQTT_PRE_STATE_UPDATE_HOOK( pContext );
  2899. connectStatus = pContext->connectStatus;
  2900. if( connectStatus == MQTTNotConnected )
  2901. {
  2902. status = MQTTStatusNotConnected;
  2903. }
  2904. if( status == MQTTSuccess )
  2905. {
  2906. LogInfo( ( "Disconnected from the broker." ) );
  2907. pContext->connectStatus = MQTTNotConnected;
  2908. /* Reset the index and clean the buffer on a successful disconnect. */
  2909. pContext->index = 0;
  2910. ( void ) memset( pContext->networkBuffer.pBuffer, 0, pContext->networkBuffer.size );
  2911. LogError( ( "MQTT Connection Disconnected Successfully" ) );
  2912. /* Here we do not use vectors as the disconnect packet has fixed fields
  2913. * which do not reside in user provided buffers. Thus, it can be sent
  2914. * using a simple send call. */
  2915. sendResult = sendBuffer( pContext,
  2916. localBuffer.pBuffer,
  2917. packetSize );
  2918. if( sendResult < ( int32_t ) packetSize )
  2919. {
  2920. LogError( ( "Transport send failed for DISCONNECT packet." ) );
  2921. status = MQTTSendFailed;
  2922. }
  2923. else
  2924. {
  2925. LogDebug( ( "Sent %ld bytes of DISCONNECT packet.",
  2926. ( long int ) sendResult ) );
  2927. }
  2928. }
  2929. MQTT_POST_STATE_UPDATE_HOOK( pContext );
  2930. }
  2931. return status;
  2932. }
  2933. /*-----------------------------------------------------------*/
  2934. MQTTStatus_t MQTT_ProcessLoop( MQTTContext_t * pContext )
  2935. {
  2936. MQTTStatus_t status = MQTTBadParameter;
  2937. if( pContext == NULL )
  2938. {
  2939. LogError( ( "Invalid input parameter: MQTT Context cannot be NULL." ) );
  2940. }
  2941. else if( pContext->getTime == NULL )
  2942. {
  2943. LogError( ( "Invalid input parameter: MQTT Context must have valid getTime." ) );
  2944. }
  2945. else if( pContext->networkBuffer.pBuffer == NULL )
  2946. {
  2947. LogError( ( "Invalid input parameter: The MQTT context's networkBuffer must not be NULL." ) );
  2948. }
  2949. else
  2950. {
  2951. pContext->controlPacketSent = false;
  2952. status = receiveSingleIteration( pContext, true );
  2953. }
  2954. return status;
  2955. }
  2956. /*-----------------------------------------------------------*/
  2957. MQTTStatus_t MQTT_ReceiveLoop( MQTTContext_t * pContext )
  2958. {
  2959. MQTTStatus_t status = MQTTBadParameter;
  2960. if( pContext == NULL )
  2961. {
  2962. LogError( ( "Invalid input parameter: MQTT Context cannot be NULL." ) );
  2963. }
  2964. else if( pContext->getTime == NULL )
  2965. {
  2966. LogError( ( "Invalid input parameter: MQTT Context must have a valid getTime function." ) );
  2967. }
  2968. else if( pContext->networkBuffer.pBuffer == NULL )
  2969. {
  2970. LogError( ( "Invalid input parameter: MQTT context's networkBuffer must not be NULL." ) );
  2971. }
  2972. else
  2973. {
  2974. status = receiveSingleIteration( pContext, false );
  2975. }
  2976. return status;
  2977. }
  2978. /*-----------------------------------------------------------*/
  2979. uint16_t MQTT_GetPacketId( MQTTContext_t * pContext )
  2980. {
  2981. uint16_t packetId = 0U;
  2982. if( pContext != NULL )
  2983. {
  2984. MQTT_PRE_STATE_UPDATE_HOOK( pContext );
  2985. packetId = pContext->nextPacketId;
  2986. /* A packet ID of zero is not a valid packet ID. When the max ID
  2987. * is reached the next one should start at 1. */
  2988. if( pContext->nextPacketId == ( uint16_t ) UINT16_MAX )
  2989. {
  2990. pContext->nextPacketId = 1;
  2991. }
  2992. else
  2993. {
  2994. pContext->nextPacketId++;
  2995. }
  2996. MQTT_POST_STATE_UPDATE_HOOK( pContext );
  2997. }
  2998. return packetId;
  2999. }
  3000. /*-----------------------------------------------------------*/
  3001. MQTTStatus_t MQTT_MatchTopic( const char * pTopicName,
  3002. const uint16_t topicNameLength,
  3003. const char * pTopicFilter,
  3004. const uint16_t topicFilterLength,
  3005. bool * pIsMatch )
  3006. {
  3007. MQTTStatus_t status = MQTTSuccess;
  3008. bool topicFilterStartsWithWildcard = false;
  3009. bool matchStatus = false;
  3010. if( ( pTopicName == NULL ) || ( topicNameLength == 0u ) )
  3011. {
  3012. LogError( ( "Invalid paramater: Topic name should be non-NULL and its "
  3013. "length should be > 0: TopicName=%p, TopicNameLength=%hu",
  3014. ( void * ) pTopicName,
  3015. ( unsigned short ) topicNameLength ) );
  3016. status = MQTTBadParameter;
  3017. }
  3018. else if( ( pTopicFilter == NULL ) || ( topicFilterLength == 0u ) )
  3019. {
  3020. LogError( ( "Invalid paramater: Topic filter should be non-NULL and "
  3021. "its length should be > 0: TopicName=%p, TopicFilterLength=%hu",
  3022. ( void * ) pTopicFilter,
  3023. ( unsigned short ) topicFilterLength ) );
  3024. status = MQTTBadParameter;
  3025. }
  3026. else if( pIsMatch == NULL )
  3027. {
  3028. LogError( ( "Invalid paramater: Output parameter, pIsMatch, is NULL" ) );
  3029. status = MQTTBadParameter;
  3030. }
  3031. else
  3032. {
  3033. /* Check for an exact match if the incoming topic name and the registered
  3034. * topic filter length match. */
  3035. if( topicNameLength == topicFilterLength )
  3036. {
  3037. matchStatus = strncmp( pTopicName, pTopicFilter, topicNameLength ) == 0;
  3038. }
  3039. if( matchStatus == false )
  3040. {
  3041. /* If an exact match was not found, match against wildcard characters in
  3042. * topic filter.*/
  3043. /* Determine if topic filter starts with a wildcard. */
  3044. topicFilterStartsWithWildcard = ( pTopicFilter[ 0 ] == '+' ) ||
  3045. ( pTopicFilter[ 0 ] == '#' );
  3046. /* Note: According to the MQTT 3.1.1 specification, incoming PUBLISH topic names
  3047. * starting with "$" character cannot be matched against topic filter starting with
  3048. * a wildcard, i.e. for example, "$SYS/sport" cannot be matched with "#" or
  3049. * "+/sport" topic filters. */
  3050. if( !( ( pTopicName[ 0 ] == '$' ) && ( topicFilterStartsWithWildcard == true ) ) )
  3051. {
  3052. matchStatus = matchTopicFilter( pTopicName, topicNameLength, pTopicFilter, topicFilterLength );
  3053. }
  3054. }
  3055. /* Update the output parameter with the match result. */
  3056. *pIsMatch = matchStatus;
  3057. }
  3058. return status;
  3059. }
  3060. /*-----------------------------------------------------------*/
  3061. MQTTStatus_t MQTT_GetSubAckStatusCodes( const MQTTPacketInfo_t * pSubackPacket,
  3062. uint8_t ** pPayloadStart,
  3063. size_t * pPayloadSize )
  3064. {
  3065. MQTTStatus_t status = MQTTSuccess;
  3066. if( pSubackPacket == NULL )
  3067. {
  3068. LogError( ( "Invalid parameter: pSubackPacket is NULL." ) );
  3069. status = MQTTBadParameter;
  3070. }
  3071. else if( pPayloadStart == NULL )
  3072. {
  3073. LogError( ( "Invalid parameter: pPayloadStart is NULL." ) );
  3074. status = MQTTBadParameter;
  3075. }
  3076. else if( pPayloadSize == NULL )
  3077. {
  3078. LogError( ( "Invalid parameter: pPayloadSize is NULL." ) );
  3079. status = MQTTBadParameter;
  3080. }
  3081. else if( pSubackPacket->type != MQTT_PACKET_TYPE_SUBACK )
  3082. {
  3083. LogError( ( "Invalid parameter: Input packet is not a SUBACK packet: "
  3084. "ExpectedType=%02x, InputType=%02x",
  3085. ( int ) MQTT_PACKET_TYPE_SUBACK,
  3086. ( int ) pSubackPacket->type ) );
  3087. status = MQTTBadParameter;
  3088. }
  3089. else if( pSubackPacket->pRemainingData == NULL )
  3090. {
  3091. LogError( ( "Invalid parameter: pSubackPacket->pRemainingData is NULL" ) );
  3092. status = MQTTBadParameter;
  3093. }
  3094. /* A SUBACK must have a remaining length of at least 3 to accommodate the
  3095. * packet identifier and at least 1 return code. */
  3096. else if( pSubackPacket->remainingLength < 3U )
  3097. {
  3098. LogError( ( "Invalid parameter: Packet remaining length is invalid: "
  3099. "Should be greater than 2 for SUBACK packet: InputRemainingLength=%lu",
  3100. ( unsigned long ) pSubackPacket->remainingLength ) );
  3101. status = MQTTBadParameter;
  3102. }
  3103. else
  3104. {
  3105. /* According to the MQTT 3.1.1 protocol specification, the "Remaining Length" field is a
  3106. * length of the variable header (2 bytes) plus the length of the payload.
  3107. * Therefore, we add 2 positions for the starting address of the payload, and
  3108. * subtract 2 bytes from the remaining length for the length of the payload.*/
  3109. *pPayloadStart = &pSubackPacket->pRemainingData[ sizeof( uint16_t ) ];
  3110. *pPayloadSize = pSubackPacket->remainingLength - sizeof( uint16_t );
  3111. }
  3112. return status;
  3113. }
  3114. /*-----------------------------------------------------------*/
  3115. const char * MQTT_Status_strerror( MQTTStatus_t status )
  3116. {
  3117. const char * str = NULL;
  3118. switch( status )
  3119. {
  3120. case MQTTSuccess:
  3121. str = "MQTTSuccess";
  3122. break;
  3123. case MQTTBadParameter:
  3124. str = "MQTTBadParameter";
  3125. break;
  3126. case MQTTNoMemory:
  3127. str = "MQTTNoMemory";
  3128. break;
  3129. case MQTTSendFailed:
  3130. str = "MQTTSendFailed";
  3131. break;
  3132. case MQTTRecvFailed:
  3133. str = "MQTTRecvFailed";
  3134. break;
  3135. case MQTTBadResponse:
  3136. str = "MQTTBadResponse";
  3137. break;
  3138. case MQTTServerRefused:
  3139. str = "MQTTServerRefused";
  3140. break;
  3141. case MQTTNoDataAvailable:
  3142. str = "MQTTNoDataAvailable";
  3143. break;
  3144. case MQTTIllegalState:
  3145. str = "MQTTIllegalState";
  3146. break;
  3147. case MQTTStateCollision:
  3148. str = "MQTTStateCollision";
  3149. break;
  3150. case MQTTKeepAliveTimeout:
  3151. str = "MQTTKeepAliveTimeout";
  3152. break;
  3153. case MQTTNeedMoreBytes:
  3154. str = "MQTTNeedMoreBytes";
  3155. break;
  3156. case MQTTStatusConnected:
  3157. str = "MQTTStatusConnected";
  3158. break;
  3159. case MQTTStatusNotConnected:
  3160. str = "MQTTStatusNotConnected";
  3161. break;
  3162. case MQTTStatusDisconnectPending:
  3163. str = "MQTTStatusDisconnectPending";
  3164. break;
  3165. case MQTTPublishStoreFailed:
  3166. str = "MQTTPublishStoreFailed";
  3167. break;
  3168. case MQTTPublishRetrieveFailed:
  3169. str = "MQTTPublishRetrieveFailed";
  3170. break;
  3171. default:
  3172. str = "Invalid MQTT Status code";
  3173. break;
  3174. }
  3175. return str;
  3176. }
  3177. /*-----------------------------------------------------------*/
  3178. size_t MQTT_GetBytesInMQTTVec( const MQTTVec_t * pVec )
  3179. {
  3180. size_t memoryRequired = 0;
  3181. size_t i;
  3182. const TransportOutVector_t * pTransportVec = pVec->pVector;
  3183. size_t vecLen = pVec->vectorLen;
  3184. for( i = 0; i < vecLen; i++ )
  3185. {
  3186. memoryRequired += pTransportVec[ i ].iov_len;
  3187. }
  3188. return memoryRequired;
  3189. }
  3190. /*-----------------------------------------------------------*/
  3191. void MQTT_SerializeMQTTVec( uint8_t * pAllocatedMem,
  3192. const MQTTVec_t * pVec )
  3193. {
  3194. const TransportOutVector_t * pTransportVec = pVec->pVector;
  3195. const size_t vecLen = pVec->vectorLen;
  3196. size_t index = 0;
  3197. size_t i = 0;
  3198. for( i = 0; i < vecLen; i++ )
  3199. {
  3200. ( void ) memcpy( ( void * ) &pAllocatedMem[ index ], ( const void * ) pTransportVec[ i ].iov_base, pTransportVec[ i ].iov_len );
  3201. index += pTransportVec[ i ].iov_len;
  3202. }
  3203. }
  3204. /*-----------------------------------------------------------*/