| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
27722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762
77727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190319131923193319431953196319731983199320032013202320332043205320632073208320932103211321232133214321532163217321832193220322132223223322432253226322732283229323032313232323332343235323632373238323932403241324232433244324532463247324832493250325132523253325432553256325732583259326032613262326332643265326632673268326932703271327232733274327532763
2773278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770 |
- /*
- * coreMQTT <DEVELOPMENT BRANCH>
- * Copyright (C) 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * SPDX-License-Identifier: MIT
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy of
- * this software and associated documentation files (the "Software"), to deal in
- * the Software without restriction, including without limitation the rights to
- * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
- * the Software, and to permit persons to whom the Software is furnished to do so,
- * subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
- * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
- * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
- * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
- * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
- */
- /**
- * @file core_mqtt.c
- * @brief Implements the user-facing functions in core_mqtt.h.
- */
- #include <string.h>
- #include <assert.h>
- #include "core_mqtt.h"
- #include "core_mqtt_state.h"
- /* Include config defaults header to get default values of configs. */
- #include "core_mqtt_config_defaults.h"
- #ifndef MQTT_PRE_SEND_HOOK
- /**
- * @brief Hook called before a 'send' operation is executed.
- *
- * This default definition expands to nothing and ignores its argument. It is
- * only used when MQTT_PRE_SEND_HOOK has not already been defined.
- */
- #define MQTT_PRE_SEND_HOOK( pContext )
- #endif /* !MQTT_PRE_SEND_HOOK */
- #ifndef MQTT_POST_SEND_HOOK
- /**
- * @brief Hook called after the 'send' operation is complete.
- *
- * This default definition expands to nothing and ignores its argument. It is
- * only used when MQTT_POST_SEND_HOOK has not already been defined.
- */
- #define MQTT_POST_SEND_HOOK( pContext )
- #endif /* !MQTT_POST_SEND_HOOK */
- #ifndef MQTT_PRE_STATE_UPDATE_HOOK
- /**
- * @brief Hook called just before an update to the MQTT state is made.
- *
- * This default definition expands to nothing and ignores its argument. It is
- * only used when MQTT_PRE_STATE_UPDATE_HOOK has not already been defined.
- */
- #define MQTT_PRE_STATE_UPDATE_HOOK( pContext )
- #endif /* !MQTT_PRE_STATE_UPDATE_HOOK */
- #ifndef MQTT_POST_STATE_UPDATE_HOOK
- /**
- * @brief Hook called just after an update to the MQTT state has
- * been made.
- *
- * This default definition expands to nothing and ignores its argument. It is
- * only used when MQTT_POST_STATE_UPDATE_HOOK has not already been defined.
- */
- #define MQTT_POST_STATE_UPDATE_HOOK( pContext )
- #endif /* !MQTT_POST_STATE_UPDATE_HOOK */
- /**
- * @brief Number of bytes required to encode any string length in an MQTT
- * packet header. The length is always encoded in two bytes according to the
- * MQTT specification.
- */
- #define CORE_MQTT_SERIALIZED_LENGTH_FIELD_BYTES ( 2U )
- /**
- * @brief Number of vectors required to encode one topic filter in a subscribe
- * request. Three vectors are required because each topic filter entry in the
- * subscribe request has three fields, in this order:
- * 1. Topic filter length; 2. Topic filter; and 3. QoS.
- */
- #define CORE_MQTT_SUBSCRIBE_PER_TOPIC_VECTOR_LENGTH ( 3U )
- /**
- * @brief Number of vectors required to encode one topic filter in an
- * unsubscribe request. Two vectors are required because each topic filter
- * entry in the unsubscribe request has two fields, in this order:
- * 1. Topic filter length; and 2. Topic filter.
- */
- #define CORE_MQTT_UNSUBSCRIBE_PER_TOPIC_VECTOR_LENGTH ( 2U )
- /**
- * @brief Pairs a pointer to a transport output vector with its length.
- *
- * Both members are internal details, as noted on each field below.
- */
- struct MQTTVec
- {
- TransportOutVector_t * pVector; /**< Pointer to transport vector. USER SHOULD NOT ACCESS THIS DIRECTLY - IT IS AN INTERNAL DETAIL AND CAN CHANGE. */
- size_t vectorLen; /**< Length of the transport vector. USER SHOULD NOT ACCESS THIS DIRECTLY - IT IS AN INTERNAL DETAIL AND CAN CHANGE. */
- };
- /*-----------------------------------------------------------*/
- /**
- * @brief Sends provided buffer to network using transport send.
- *
- * @param[in] pContext Initialized MQTT context.
- * @param[in] pBufferToSend Buffer to be sent to network.
- * @param[in] bytesToSend Number of bytes to be sent.
- *
- * @note This operation may call the transport send function
- * repeatedly to send bytes over the network until either:
- * 1. The requested number of bytes @a bytesToSend have been sent.
- * OR
- * 2. MQTT_SEND_TIMEOUT_MS milliseconds have gone by since entering this
- * function.
- * OR
- * 3. There is an error in sending data over the network.
- *
- * @return Total number of bytes sent, or negative value on network error.
- */
- static int32_t sendBuffer( MQTTContext_t * pContext,
- const uint8_t * pBufferToSend,
- size_t bytesToSend );
- /**
- * @brief Sends MQTT connect without copying the user's data into any buffer.
- *
- * @param[in] pContext Initialized MQTT context.
- * @param[in] pConnectInfo MQTT CONNECT packet information.
- * @param[in] pWillInfo Last Will and Testament. Pass NULL if Last Will and
- * Testament is not used.
- * @param[in] remainingLength The length of the connect packet.
- *
- * @note This operation may call the transport send function
- * repeatedly to send bytes over the network until either:
- * 1. The requested number of bytes @a remainingLength have been sent.
- * OR
- * 2. MQTT_SEND_TIMEOUT_MS milliseconds have gone by since entering this
- * function.
- * OR
- * 3. There is an error in sending data over the network.
- *
- * @return #MQTTSendFailed or #MQTTSuccess.
- */
- static MQTTStatus_t sendConnectWithoutCopy( MQTTContext_t * pContext,
- const MQTTConnectInfo_t * pConnectInfo,
- const MQTTPublishInfo_t * pWillInfo,
- size_t remainingLength );
- /**
- * @brief Sends the vector array passed through the parameters over the network.
- *
- * @note Preference is given to the 'writev' function if it is present in the
- * transport interface. Otherwise, a send call is made repeatedly to achieve the
- * same result.
- *
- * @param[in] pContext Initialized MQTT context.
- * @param[in] pIoVec The vector array to be sent.
- * @param[in] ioVecCount The number of elements in the array.
- *
- * @note This operation may call the transport send or writev functions
- * repeatedly to send bytes over the network until either:
- * 1. The requested number of bytes have been sent.
- * OR
- * 2. MQTT_SEND_TIMEOUT_MS milliseconds have gone by since entering this
- * function.
- * OR
- * 3. There is an error in sending data over the network.
- *
- * @return The total number of bytes sent or the error code as received from the
- * transport interface.
- */
- static int32_t sendMessageVector( MQTTContext_t * pContext,
- TransportOutVector_t * pIoVec,
- size_t ioVecCount );
- /**
- * @brief Add a string and its length to the vector after serializing the
- * length in the manner outlined by the MQTT specification.
- *
- * @param[in] serializedLength Array of two bytes to which the vector will point.
- * The array must remain in scope until the message has been sent.
- * @param[in] string The string to be serialized.
- * @param[in] length The length of the string to be serialized.
- * @param[in] iterator The iterator pointing to the first element in the
- * transport interface IO array.
- * @param[in,out] updatedLength Incremented by the number of bytes added to
- * the vector.
- *
- * @return The number of vectors added.
- */
- static size_t addEncodedStringToVector( uint8_t serializedLength[ CORE_MQTT_SERIALIZED_LENGTH_FIELD_BYTES ],
- const char * const string,
- uint16_t length,
- TransportOutVector_t * iterator,
- size_t * updatedLength );
- /**
- * @brief Send MQTT SUBSCRIBE message directly, without copying the user data
- * into a buffer.
- *
- * @param[in] pContext Initialized MQTT context.
- * @param[in] pSubscriptionList List of MQTT subscription info.
- * @param[in] subscriptionCount The count of elements in the list.
- * @param[in] packetId The packet ID of the subscribe packet.
- * @param[in] remainingLength The remaining length of the subscribe packet.
- *
- * @return #MQTTSuccess or #MQTTSendFailed.
- */
- static MQTTStatus_t sendSubscribeWithoutCopy( MQTTContext_t * pContext,
- const MQTTSubscribeInfo_t * pSubscriptionList,
- size_t subscriptionCount,
- uint16_t packetId,
- size_t remainingLength );
- /**
- * @brief Send MQTT UNSUBSCRIBE message directly, without copying the user data
- * into a buffer.
- *
- * @param[in] pContext Initialized MQTT context.
- * @param[in] pSubscriptionList List of MQTT subscription info.
- * @param[in] subscriptionCount The count of elements in the list.
- * @param[in] packetId The packet ID of the unsubscribe packet.
- * @param[in] remainingLength The remaining length of the unsubscribe packet.
- *
- * @return #MQTTSuccess or #MQTTSendFailed.
- */
- static MQTTStatus_t sendUnsubscribeWithoutCopy( MQTTContext_t * pContext,
- const MQTTSubscribeInfo_t * pSubscriptionList,
- size_t subscriptionCount,
- uint16_t packetId,
- size_t remainingLength );
- /**
- * @brief Calculate the interval between two millisecond timestamps, including
- * when the later value has overflowed.
- *
- * @note In C, integer promotion can convert the operands of a subtraction to
- * signed int (this applies to uint32_t only where int is wider than 32 bits).
- * Using this function avoids the need to cast the result of subtractions back
- * to uint32_t.
- *
- * @param[in] later The later timestamp, in milliseconds.
- * @param[in] start The earlier timestamp, in milliseconds.
- *
- * @return later - start.
- */
- static uint32_t calculateElapsedTime( uint32_t later,
- uint32_t start );
- /**
- * @brief Convert a byte indicating a publish ack type to an #MQTTPubAckType_t.
- *
- * @param[in] packetType First byte of the fixed header.
- *
- * @return Type of ack.
- */
- static MQTTPubAckType_t getAckFromPacketType( uint8_t packetType );
- /**
- * @brief Receive bytes into the network buffer.
- *
- * @param[in] pContext Initialized MQTT Context.
- * @param[in] bytesToRecv Number of bytes to receive.
- *
- * @note This operation calls the transport receive function
- * repeatedly to read bytes from the network until either:
- * 1. The requested number of bytes @a bytesToRecv are read.
- * OR
- * 2. No data is received from the network for MQTT_RECV_POLLING_TIMEOUT_MS
- * duration.
- * OR
- * 3. There is an error in reading from the network.
- *
- * @return Number of bytes received, or negative number on network error.
- */
- static int32_t recvExact( MQTTContext_t * pContext,
- size_t bytesToRecv );
- /**
- * @brief Discard a packet from the transport interface.
- *
- * @param[in] pContext MQTT Connection context.
- * @param[in] remainingLength Remaining length of the packet to discard.
- * @param[in] timeoutMs Time remaining to discard the packet.
- *
- * @return #MQTTRecvFailed or #MQTTNoDataAvailable.
- */
- static MQTTStatus_t discardPacket( MQTTContext_t * pContext,
- size_t remainingLength,
- uint32_t timeoutMs );
- /**
- * @brief Discard a packet from the MQTT buffer and the transport interface.
- *
- * @param[in] pContext MQTT Connection context.
- * @param[in] pPacketInfo Information struct for the packet to be discarded.
- *
- * @return #MQTTRecvFailed or #MQTTNoDataAvailable.
- */
- static MQTTStatus_t discardStoredPacket( MQTTContext_t * pContext,
- const MQTTPacketInfo_t * pPacketInfo );
- /**
- * @brief Receive a packet from the transport interface.
- *
- * @param[in] pContext MQTT Connection context.
- * @param[in] incomingPacket Packet struct with remaining length.
- * @param[in] remainingTimeMs Time remaining to receive the packet.
- *
- * @return #MQTTSuccess or #MQTTRecvFailed.
- */
- static MQTTStatus_t receivePacket( MQTTContext_t * pContext,
- MQTTPacketInfo_t incomingPacket,
- uint32_t remainingTimeMs );
- /**
- * @brief Get the correct ack type to send.
- *
- * @param[in] state Current state of the publish.
- *
- * @return Packet type byte of PUBACK, PUBREC, PUBREL, or PUBCOMP if one of
- * those should be sent, else 0.
- */
- static uint8_t getAckTypeToSend( MQTTPublishState_t state );
- /**
- * @brief Send acks for received QoS 1/2 publishes.
- *
- * @param[in] pContext MQTT Connection context.
- * @param[in] packetId Packet ID of the original PUBLISH.
- * @param[in] publishState Current publish state in record.
- *
- * @return #MQTTSuccess, #MQTTIllegalState or #MQTTSendFailed.
- */
- static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext,
- uint16_t packetId,
- MQTTPublishState_t publishState );
- /**
- * @brief Send a keep-alive PINGREQ if the keep-alive interval has elapsed.
- *
- * @param[in] pContext Initialized MQTT Context.
- *
- * @return #MQTTKeepAliveTimeout if a PINGRESP is not received in time,
- * #MQTTSendFailed if the PINGREQ cannot be sent, or #MQTTSuccess.
- */
- static MQTTStatus_t handleKeepAlive( MQTTContext_t * pContext );
- /**
- * @brief Handle received MQTT PUBLISH packet.
- *
- * @param[in] pContext MQTT Connection context.
- * @param[in] pIncomingPacket Incoming packet.
- *
- * @return #MQTTSuccess, #MQTTIllegalState or deserialization error.
- */
- static MQTTStatus_t handleIncomingPublish( MQTTContext_t * pContext,
- MQTTPacketInfo_t * pIncomingPacket );
- /**
- * @brief Handle received MQTT publish acks.
- *
- * @param[in] pContext MQTT Connection context.
- * @param[in] pIncomingPacket Incoming packet.
- *
- * @return #MQTTSuccess, #MQTTIllegalState, or deserialization error.
- */
- static MQTTStatus_t handlePublishAcks( MQTTContext_t * pContext,
- MQTTPacketInfo_t * pIncomingPacket );
- /**
- * @brief Handle received MQTT ack.
- *
- * @param[in] pContext MQTT Connection context.
- * @param[in] pIncomingPacket Incoming packet.
- * @param[in] manageKeepAlive Flag indicating if PINGRESPs should not be given
- * to the application.
- *
- * @return #MQTTSuccess, #MQTTIllegalState, or deserialization error.
- */
- static MQTTStatus_t handleIncomingAck( MQTTContext_t * pContext,
- MQTTPacketInfo_t * pIncomingPacket,
- bool manageKeepAlive );
- /**
- * @brief Run a single iteration of the receive loop.
- *
- * @param[in] pContext MQTT Connection context.
- * @param[in] manageKeepAlive Flag indicating if keep alive should be handled.
- *
- * @return #MQTTRecvFailed if a network error occurs during reception;
- * #MQTTSendFailed if a network error occurs while sending an ACK or PINGREQ;
- * #MQTTBadResponse if an invalid packet is received;
- * #MQTTKeepAliveTimeout if the server has not sent a PINGRESP within
- * #MQTT_PINGRESP_TIMEOUT_MS milliseconds;
- * #MQTTIllegalState if an incoming QoS 1/2 publish or ack causes an
- * invalid transition for the internal state machine;
- * #MQTTSuccess on success.
- */
- static MQTTStatus_t receiveSingleIteration( MQTTContext_t * pContext,
- bool manageKeepAlive );
- /**
- * @brief Validates the parameters of #MQTT_Subscribe or #MQTT_Unsubscribe.
- *
- * @param[in] pContext Initialized MQTT context.
- * @param[in] pSubscriptionList List of MQTT subscription info.
- * @param[in] subscriptionCount The number of elements in pSubscriptionList.
- * @param[in] packetId Packet identifier.
- *
- * @return #MQTTBadParameter if invalid parameters are passed;
- * #MQTTSuccess otherwise.
- */
- static MQTTStatus_t validateSubscribeUnsubscribeParams( const MQTTContext_t * pContext,
- const MQTTSubscribeInfo_t * pSubscriptionList,
- size_t subscriptionCount,
- uint16_t packetId );
- /**
- * @brief Receives a CONNACK MQTT packet.
- *
- * @param[in] pContext Initialized MQTT context.
- * @param[in] timeoutMs Timeout for waiting for CONNACK packet.
- * @param[in] cleanSession Clean session flag set by application.
- * @param[out] pIncomingPacket Packet information struct for the received
- * packet.
- * @param[out] pSessionPresent Whether a previous session was present.
- * Only relevant if not establishing a clean session.
- *
- * @return #MQTTBadResponse if a bad response is received;
- * #MQTTNoDataAvailable if no data available for transport recv;
- * #MQTTRecvFailed if transport recv failed;
- * #MQTTSuccess otherwise.
- */
- static MQTTStatus_t receiveConnack( MQTTContext_t * pContext,
- uint32_t timeoutMs,
- bool cleanSession,
- MQTTPacketInfo_t * pIncomingPacket,
- bool * pSessionPresent );
- /**
- * @brief Resends pending acks for a re-established MQTT session.
- *
- * @param[in] pContext Initialized MQTT context.
- *
- * @return #MQTTSendFailed if transport send during resend failed;
- * #MQTTSuccess otherwise.
- */
- static MQTTStatus_t handleUncleanSessionResumption( MQTTContext_t * pContext );
- /**
- * @brief Clears existing state records for a clean session.
- *
- * @param[in] pContext Initialized MQTT context.
- *
- * @return #MQTTSuccess always.
- */
- static MQTTStatus_t handleCleanSession( MQTTContext_t * pContext );
- /**
- * @brief Send the publish packet without copying the topic string and payload in
- * the buffer.
- *
- * @param[in] pContext Initialized MQTT context.
- * @param[in] pPublishInfo MQTT PUBLISH packet parameters.
- * @param[in] pMqttHeader The serialized MQTT header with the header byte;
- * the encoded length of the packet; and the encoded length of the topic string.
- * @param[in] headerSize Size of the serialized PUBLISH header.
- * @param[in] packetId Packet Id of the publish packet.
- *
- * @return #MQTTSendFailed if transport send failed;
- * #MQTTSuccess otherwise.
- */
- static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext,
- const MQTTPublishInfo_t * pPublishInfo,
- uint8_t * pMqttHeader,
- size_t headerSize,
- uint16_t packetId );
- /**
- * @brief Function to validate #MQTT_Publish parameters.
- *
- * @param[in] pContext Initialized MQTT context.
- * @param[in] pPublishInfo MQTT PUBLISH packet parameters.
- * @param[in] packetId Packet Id for the MQTT PUBLISH packet.
- *
- * @return #MQTTBadParameter if invalid parameters are passed;
- * #MQTTSuccess otherwise.
- */
- static MQTTStatus_t validatePublishParams( const MQTTContext_t * pContext,
- const MQTTPublishInfo_t * pPublishInfo,
- uint16_t packetId );
- /**
- * @brief Performs matching for special cases when a topic filter ends
- * with a wildcard character.
- *
- * When the topic name has been consumed but there are remaining characters
- * to match in the topic filter, this function handles the following 2 cases:
- * - When the topic filter ends with "/+" or "/#" characters, but the topic
- * name only ends with '/'.
- * - When the topic filter ends with "/#" characters, but the topic name
- * ends at the parent level.
- *
- * @note This function ASSUMES that the topic name has been consumed in linear
- * matching with the topic filter, but the topic filter has remaining characters
- * to be matched.
- *
- * @param[in] pTopicFilter The topic filter containing the wildcard.
- * @param[in] topicFilterLength Length of the topic filter being examined.
- * @param[in] filterIndex Index of the topic filter being examined.
- *
- * @return Returns whether the topic filter and the topic name match.
- */
- static bool matchEndWildcardsSpecialCases( const char * pTopicFilter,
- uint16_t topicFilterLength,
- uint16_t filterIndex );
- /**
- * @brief Attempt to match topic name with a topic filter starting with a wildcard.
- *
- * If the topic filter starts with a '+' (single-level) wildcard, the function
- * advances the @a pNameIndex by a level in the topic name.
- * If the topic filter starts with a '#' (multi-level) wildcard, the function
- * concludes that both the topic name and topic filter match.
- *
- * @param[in] pTopicName The topic name to match.
- * @param[in] topicNameLength Length of the topic name.
- * @param[in] pTopicFilter The topic filter to match.
- * @param[in] topicFilterLength Length of the topic filter.
- * @param[in,out] pNameIndex Current index in the topic name being examined. It is
- * advanced by one level for `+` wildcards.
- * @param[in,out] pFilterIndex Current index in the topic filter being examined.
- * It is advanced to the position of the '/' level separator for the '+' wildcard.
- * @param[out] pMatch Whether the topic filter and topic name match.
- *
- * @return `true` if the caller of this function should exit; `false` if the
- * caller should continue parsing the topics.
- */
- static bool matchWildcards( const char * pTopicName,
- uint16_t topicNameLength,
- const char * pTopicFilter,
- uint16_t topicFilterLength,
- uint16_t * pNameIndex,
- uint16_t * pFilterIndex,
- bool * pMatch );
- /**
- * @brief Match a topic name and topic filter allowing the use of wildcards.
- *
- * @note Both pointers must be non-NULL and both lengths must be non-zero;
- * the implementation asserts these preconditions.
- *
- * @param[in] pTopicName The topic name to check.
- * @param[in] topicNameLength Length of the topic name.
- * @param[in] pTopicFilter The topic filter to check.
- * @param[in] topicFilterLength Length of topic filter.
- *
- * @return `true` if the topic name and topic filter match; `false` otherwise.
- */
- static bool matchTopicFilter( const char * pTopicName,
- uint16_t topicNameLength,
- const char * pTopicFilter,
- uint16_t topicFilterLength );
- /*-----------------------------------------------------------*/
/**
 * @brief Decide the match for the cases where the topic name has ended while
 * the topic filter still has a trailing wildcard left to consume.
 *
 * Two situations yield a match:
 * - Exactly "/#" follows the character at @a filterIndex, e.g. filter
 *   "sport/#" against topic "sport" ('#' also covers the parent level).
 * - @a filterIndex is on a '/' that is followed only by '+' or '#', e.g.
 *   filters "sport/+" and "sport/#" against topic "sport/".
 *
 * @param[in] pTopicFilter The topic filter being examined; must not be NULL.
 * @param[in] topicFilterLength Length of the topic filter; must not be zero.
 * @param[in] filterIndex Index in the topic filter being examined.
 *
 * @return `true` if one of the special cases applies; `false` otherwise.
 */
static bool matchEndWildcardsSpecialCases( const char * pTopicFilter,
                                           uint16_t topicFilterLength,
                                           uint16_t filterIndex )
{
    bool isMatch = false;

    assert( pTopicFilter != NULL );
    assert( topicFilterLength != 0U );

    if( ( topicFilterLength >= 3U ) &&
        ( ( filterIndex + 3U ) == topicFilterLength ) )
    {
        /* Exactly two characters remain after the current one: they must be
         * "/#" for the parent-level match ("sport/#" vs "sport"). */
        if( ( pTopicFilter[ filterIndex + 1U ] == '/' ) &&
            ( pTopicFilter[ filterIndex + 2U ] == '#' ) )
        {
            isMatch = true;
        }
    }
    else if( ( ( filterIndex + 2U ) == topicFilterLength ) &&
             ( pTopicFilter[ filterIndex ] == '/' ) )
    {
        /* The current character is the final level separator; the single
         * character after it must be a wildcard ("sport/+" or "sport/#"
         * vs "sport/"). */
        const char lastFilterChar = pTopicFilter[ filterIndex + 1U ];

        isMatch = ( lastFilterChar == '+' ) || ( lastFilterChar == '#' );
    }
    else
    {
        /* Neither special case applies; there is no match. */
    }

    return isMatch;
}
- /*-----------------------------------------------------------*/
/**
 * @brief Handle a character mismatch between topic name and topic filter by
 * checking whether the filter character is a valid wildcard.
 *
 * - A '+' located at the start of the filter or right after a '/' consumes the
 *   current level of the topic name.
 * - A '#' located at the start of the filter or right after a '/', and which
 *   is the last filter character, matches the rest of the topic name.
 * - Anything else is a plain mismatch.
 *
 * @param[in] pTopicName The topic name to match; must not be NULL.
 * @param[in] topicNameLength Length of the topic name; must not be zero.
 * @param[in] pTopicFilter The topic filter to match; must not be NULL.
 * @param[in] topicFilterLength Length of the topic filter; must not be zero.
 * @param[in,out] pNameIndex Index into the topic name. For '+', it is left on
 * the next '/' of the topic name, or on the last character of the topic name
 * when no further level exists.
 * @param[in,out] pFilterIndex Index into the topic filter. For '+', it is
 * advanced onto the following '/' when the topic name has a further level and
 * the filter has one too.
 * @param[out] pMatch Set only when matching should stop: `true` for a '#'
 * match, `false` for a mismatch.
 *
 * @return `true` if the caller should stop matching; `false` if it should
 * continue with the next characters.
 */
static bool matchWildcards( const char * pTopicName,
                            uint16_t topicNameLength,
                            const char * pTopicFilter,
                            uint16_t topicFilterLength,
                            uint16_t * pNameIndex,
                            uint16_t * pFilterIndex,
                            bool * pMatch )
{
    bool stopMatching = false;
    bool wildcardAllowedHere;
    uint16_t namePos;
    uint16_t filterPos;

    assert( pTopicName != NULL );
    assert( topicNameLength != 0U );
    assert( pTopicFilter != NULL );
    assert( topicFilterLength != 0U );
    assert( pNameIndex != NULL );
    assert( pFilterIndex != NULL );
    assert( pMatch != NULL );

    namePos = *pNameIndex;
    filterPos = *pFilterIndex;

    /* A wildcard is only meaningful as the first filter character or
     * immediately after a level separator. */
    wildcardAllowedHere = ( filterPos == 0u ) ||
                          ( pTopicFilter[ filterPos - 1U ] == '/' );

    if( ( pTopicFilter[ filterPos ] == '+' ) && wildcardAllowedHere )
    {
        bool nameHasChildLevel = false;
        bool filterHasChildLevel;

        /* Skip over the current level of the topic name; stop on the next
         * level separator if there is one. */
        for( ; namePos < topicNameLength; namePos++ )
        {
            if( pTopicName[ namePos ] == '/' )
            {
                nameHasChildLevel = true;
                break;
            }
        }

        /* Does the filter continue with another level after the '+'? */
        filterHasChildLevel = ( filterPos < ( topicFilterLength - 1U ) ) &&
                              ( pTopicFilter[ filterPos + 1U ] == '/' );

        if( nameHasChildLevel )
        {
            if( filterHasChildLevel )
            {
                /* Both continue: step the filter onto its '/' so that it
                 * lines up with the '/' the name index is already on. */
                ( *pFilterIndex )++;
            }
            else
            {
                /* The name has more levels than the filter can cover. */
                *pMatch = false;
                stopMatching = true;
            }
        }
        else
        {
            /* The scan ran one past the end of the topic name; step back
             * onto its last character. */
            /* coverity[integer_overflow] */
            namePos--;
        }
    }
    else if( ( pTopicFilter[ filterPos ] == '#' ) &&
             ( filterPos == ( topicFilterLength - 1U ) ) &&
             wildcardAllowedHere )
    {
        /* A terminating '#' swallows whatever is left of the topic name. */
        *pMatch = true;
        stopMatching = true;
    }
    else
    {
        /* Not a usable wildcard, so the differing characters are a genuine
         * mismatch. */
        *pMatch = false;
        stopMatching = true;
    }

    *pNameIndex = namePos;

    return stopMatching;
}
- /*-----------------------------------------------------------*/
/**
 * @brief Match a topic name against a topic filter that may contain the '+'
 * and '#' wildcards.
 *
 * Both strings are walked in lock step. Equal characters advance both
 * indexes; a difference is handed to matchWildcards(). When the last
 * character of the topic name has been matched literally,
 * matchEndWildcardsSpecialCases() checks for a trailing wildcard in the
 * filter.
 *
 * @param[in] pTopicName The topic name to check; must not be NULL.
 * @param[in] topicNameLength Length of the topic name; must not be zero.
 * @param[in] pTopicFilter The topic filter to check; must not be NULL.
 * @param[in] topicFilterLength Length of the topic filter; must not be zero.
 *
 * @return `true` if the topic name and topic filter match; `false` otherwise.
 */
static bool matchTopicFilter( const char * pTopicName,
                              uint16_t topicNameLength,
                              const char * pTopicFilter,
                              uint16_t topicFilterLength )
{
    bool isMatch = false;
    bool stopMatching = false;
    uint16_t namePos = 0;
    uint16_t filterPos = 0;

    assert( pTopicName != NULL );
    assert( topicNameLength != 0 );
    assert( pTopicFilter != NULL );
    assert( topicFilterLength != 0 );

    while( ( namePos < topicNameLength ) && ( filterPos < topicFilterLength ) )
    {
        if( pTopicName[ namePos ] != pTopicFilter[ filterPos ] )
        {
            /* The characters differ: only a wildcard in the filter can still
             * make this a match. */
            stopMatching = matchWildcards( pTopicName,
                                           topicNameLength,
                                           pTopicFilter,
                                           topicFilterLength,
                                           &namePos,
                                           &filterPos,
                                           &isMatch );
        }
        else if( namePos == ( topicNameLength - 1U ) )
        {
            /* The last character of the topic name matched literally; the
             * filter may still end in a wildcard that also matches. */
            isMatch = matchEndWildcardsSpecialCases( pTopicFilter,
                                                     topicFilterLength,
                                                     filterPos );
        }
        else
        {
            /* Plain character match in the middle of the topic name. */
        }

        if( isMatch || stopMatching )
        {
            break;
        }

        namePos++;
        filterPos++;
    }

    if( !isMatch )
    {
        /* With no explicit match recorded, the strings still match when both
         * were consumed completely, e.g. "sport/+/player" or "sport/hockey/+"
         * against "sport/hockey/player". */
        isMatch = ( namePos == topicNameLength ) &&
                  ( filterPos == topicFilterLength );
    }

    return isMatch;
}
- /*-----------------------------------------------------------*/
- static int32_t sendMessageVector( MQTTContext_t * pContext,
- TransportOutVector_t * pIoVec,
- size_t ioVecCount )
- {
- int32_t sendResult;
- uint32_t startTime;
- TransportOutVector_t * pIoVectIterator;
- size_t vectorsToBeSent = ioVecCount;
- size_t bytesToSend = 0U;
- int32_t bytesSentOrError = 0;
- assert( pContext != NULL );
- assert( pIoVec != NULL );
- assert( pContext->getTime != NULL );
- /* Send must always be defined */
- assert( pContext->transportInterface.send != NULL );
- /* Count the total number of bytes to be sent as outlined in the vector. */
- for( pIoVectIterator = pIoVec; pIoVectIterator <= &( pIoVec[ ioVecCount - 1U ] ); pIoVectIterator++ )
- {
- bytesToSend += pIoVectIterator->iov_len;
- }
- /* Reset the iterator to point to the first entry in the array. */
- pIoVectIterator = pIoVec;
- /* Note the start time. */
- startTime = pContext->getTime();
- while( ( bytesSentOrError < ( int32_t ) bytesToSend ) && ( bytesSentOrError >= 0 ) )
- {
- if( pContext->transportInterface.writev != NULL )
- {
- sendResult = pContext->transportInterface.writev( pContext->transportInterface.pNetworkContext,
- pIoVectIterator,
- vectorsToBeSent );
- }
- else
- {
- sendResult = pContext->transportInterface.send( pContext->transportInterface.pNetworkContext,
- pIoVectIterator->iov_base,
- pIoVectIterator->iov_len );
- }
- if( sendResult > 0 )
- {
- /* It is a bug in the application's transport send implementation if
- * more bytes than expected are sent. */
- assert( sendResult <= ( ( int32_t ) bytesToSend - bytesSentOrError ) );
- bytesSentOrError += sendResult;
- /* Set last transmission time. */
- pContext->lastPacketTxTime = pContext->getTime();
- LogDebug( ( "sendMessageVector: Bytes Sent=%ld, Bytes Remaining=%lu",
- ( long int ) sendResult,
- ( unsigned long ) ( bytesToSend - ( size_t ) bytesSentOrError ) ) );
- }
- else if( sendResult < 0 )
- {
- bytesSentOrError = sendResult;
- LogError( ( "sendMessageVector: Unable to send packet: Network Error." ) );
- if( pContext->connectStatus == MQTTConnected )
- {
- pContext->connectStatus = MQTTDisconnectPending;
- }
- }
- else
- {
- /* MISRA Empty body */
- }
- /* Check for timeout. */
- if( calculateElapsedTime( pContext->getTime(), startTime ) > MQTT_SEND_TIMEOUT_MS )
- {
- LogError( ( "sendMessageVector: Unable to send packet: Timed out." ) );
- break;
- }
- /* Update the send pointer to the correct vector and offset. */
- while( ( pIoVectIterator <= &( pIoVec[ ioVecCount - 1U ] ) ) &&
- ( sendResult >= ( int32_t ) pIoVectIterator->iov_len ) )
- {
- sendResult -= ( int32_t ) pIoVectIterator->iov_len;
- pIoVectIterator++;
- /* Update the number of vector which are yet to be sent. */
- vectorsToBeSent--;
- }
- /* Some of the bytes from this vector were sent as well, update the length
- * and the pointer to data in this vector. */
- if( ( sendResult > 0 ) &&
- ( pIoVectIterator <= &( pIoVec[ ioVecCount - 1U ] ) ) )
- {
- pIoVectIterator->iov_base = ( const void * ) &( ( ( const uint8_t * ) pIoVectIterator->iov_base )[ sendResult ] );
- pIoVectIterator->iov_len -= ( size_t ) sendResult;
- }
- }
- return bytesSentOrError;
- }
- static int32_t sendBuffer( MQTTContext_t * pContext,
- const uint8_t * pBufferToSend,
- size_t bytesToSend )
- {
- int32_t sendResult;
- uint32_t startTime;
- int32_t bytesSentOrError = 0;
- const uint8_t * pIndex = pBufferToSend;
- assert( pContext != NULL );
- assert( pContext->getTime != NULL );
- assert( pContext->transportInterface.send != NULL );
- assert( pIndex != NULL );
- /* Set the timeout. */
- startTime = pContext->getTime();
- while( ( bytesSentOrError < ( int32_t ) bytesToSend ) && ( bytesSentOrError >= 0 ) )
- {
- sendResult = pContext->transportInterface.send( pContext->transportInterface.pNetworkContext,
- pIndex,
- bytesToSend - ( size_t ) bytesSentOrError );
- if( sendResult > 0 )
- {
- /* It is a bug in the application's transport send implementation if
- * more bytes than expected are sent. */
- assert( sendResult <= ( ( int32_t ) bytesToSend - bytesSentOrError ) );
- bytesSentOrError += sendResult;
- pIndex = &pIndex[ sendResult ];
- /* Set last transmission time. */
- pContext->lastPacketTxTime = pContext->getTime();
- LogDebug( ( "sendBuffer: Bytes Sent=%ld, Bytes Remaining=%lu",
- ( long int ) sendResult,
- ( unsigned long ) ( bytesToSend - ( size_t ) bytesSentOrError ) ) );
- }
- else if( sendResult < 0 )
- {
- bytesSentOrError = sendResult;
- LogError( ( "sendBuffer: Unable to send packet: Network Error." ) );
- if( pContext->connectStatus == MQTTConnected )
- {
- pContext->connectStatus = MQTTDisconnectPending;
- }
- }
- else
- {
- /* MISRA Empty body */
- }
- /* Check for timeout. */
- if( calculateElapsedTime( pContext->getTime(), startTime ) >= ( MQTT_SEND_TIMEOUT_MS ) )
- {
- LogError( ( "sendBuffer: Unable to send packet: Timed out." ) );
- break;
- }
- }
- return bytesSentOrError;
- }
- /*-----------------------------------------------------------*/
/**
 * @brief Compute the time elapsed between two 32-bit timestamps.
 *
 * The subtraction is unsigned and therefore wraps modulo 2^32, so the result
 * is still the forward distance from @a start to @a later when the counter
 * has rolled over once in between.
 *
 * @param[in] later The more recent timestamp.
 * @param[in] start The earlier timestamp.
 *
 * @return @a later minus @a start, modulo 2^32.
 */
static uint32_t calculateElapsedTime( uint32_t later,
                                      uint32_t start )
{
    const uint32_t elapsed = later - start;

    return elapsed;
}
- /*-----------------------------------------------------------*/
- static MQTTPubAckType_t getAckFromPacketType( uint8_t packetType )
- {
- MQTTPubAckType_t ackType = MQTTPuback;
- switch( packetType )
- {
- case MQTT_PACKET_TYPE_PUBACK:
- ackType = MQTTPuback;
- break;
- case MQTT_PACKET_TYPE_PUBREC:
- ackType = MQTTPubrec;
- break;
- case MQTT_PACKET_TYPE_PUBREL:
- ackType = MQTTPubrel;
- break;
- default:
- /* This function is only called after checking the type is one of
- * the above four values, so packet type must be PUBCOMP here. */
- assert( packetType == MQTT_PACKET_TYPE_PUBCOMP );
- ackType = MQTTPubcomp;
- break;
- }
- return ackType;
- }
- /*-----------------------------------------------------------*/
- static int32_t recvExact( MQTTContext_t * pContext,
- size_t bytesToRecv )
- {
- uint8_t * pIndex = NULL;
- size_t bytesRemaining = bytesToRecv;
- int32_t totalBytesRecvd = 0, bytesRecvd;
- uint32_t lastDataRecvTimeMs = 0U, timeSinceLastRecvMs = 0U;
- TransportRecv_t recvFunc = NULL;
- MQTTGetCurrentTimeFunc_t getTimeStampMs = NULL;
- bool receiveError = false;
- assert( pContext != NULL );
- assert( bytesToRecv <= pContext->networkBuffer.size );
- assert( pContext->getTime != NULL );
- assert( pContext->transportInterface.recv != NULL );
- assert( pContext->networkBuffer.pBuffer != NULL );
- pIndex = pContext->networkBuffer.pBuffer;
- recvFunc = pContext->transportInterface.recv;
- getTimeStampMs = pContext->getTime;
- /* Part of the MQTT packet has been read before calling this function. */
- lastDataRecvTimeMs = getTimeStampMs();
- while( ( bytesRemaining > 0U ) && ( receiveError == false ) )
- {
- bytesRecvd = recvFunc( pContext->transportInterface.pNetworkContext,
- pIndex,
- bytesRemaining );
- if( bytesRecvd < 0 )
- {
- LogError( ( "Network error while receiving packet: ReturnCode=%ld.",
- ( long int ) bytesRecvd ) );
- totalBytesRecvd = bytesRecvd;
- receiveError = true;
- MQTT_PRE_STATE_UPDATE_HOOK( pContext );
- if( pContext->connectStatus == MQTTConnected )
- {
- pContext->connectStatus = MQTTDisconnectPending;
- }
- MQTT_POST_STATE_UPDATE_HOOK( pContext );
- }
- else if( bytesRecvd > 0 )
- {
- /* Reset the starting time as we have received some data from the network. */
- lastDataRecvTimeMs = getTimeStampMs();
- /* It is a bug in the application's transport receive implementation
- * if more bytes than expected are received. To avoid a possible
- * overflow in converting bytesRemaining from unsigned to signed,
- * this assert must exist after the check for bytesRecvd being
- * negative. */
- assert( ( size_t ) bytesRecvd <= bytesRemaining );
- bytesRemaining -= ( size_t ) bytesRecvd;
- totalBytesRecvd += ( int32_t ) bytesRecvd;
- /* Increment the index. */
- pIndex = &pIndex[ bytesRecvd ];
- LogDebug( ( "BytesReceived=%ld, BytesRemaining=%lu, TotalBytesReceived=%ld.",
- ( long int ) bytesRecvd,
- ( unsigned long ) bytesRemaining,
- ( long int ) totalBytesRecvd ) );
- }
- else
- {
- /* No bytes were read from the network. */
- timeSinceLastRecvMs = calculateElapsedTime( getTimeStampMs(), lastDataRecvTimeMs );
- /* Check for timeout if we have been waiting to receive any byte on the network. */
- if( timeSinceLastRecvMs >= MQTT_RECV_POLLING_TIMEOUT_MS )
- {
- LogError( ( "Unable to receive packet: Timed out in transport recv." ) );
- receiveError = true;
- }
- }
- }
- return totalBytesRecvd;
- }
- /*-----------------------------------------------------------*/
- static MQTTStatus_t discardPacket( MQTTContext_t * pContext,
- size_t remainingLength,
- uint32_t timeoutMs )
- {
- MQTTStatus_t status = MQTTRecvFailed;
- int32_t bytesReceived = 0;
- size_t bytesToReceive = 0U;
- uint32_t totalBytesReceived = 0U;
- uint32_t entryTimeMs = 0U;
- uint32_t elapsedTimeMs = 0U;
- MQTTGetCurrentTimeFunc_t getTimeStampMs = NULL;
- bool receiveError = false;
- assert( pContext != NULL );
- assert( pContext->getTime != NULL );
- bytesToReceive = pContext->networkBuffer.size;
- getTimeStampMs = pContext->getTime;
- entryTimeMs = getTimeStampMs();
- while( ( totalBytesReceived < remainingLength ) && ( receiveError == false ) )
- {
- if( ( remainingLength - totalBytesReceived ) < bytesToReceive )
- {
- bytesToReceive = remainingLength - totalBytesReceived;
- }
- bytesReceived = recvExact( pContext, bytesToReceive );
- if( bytesReceived != ( int32_t ) bytesToReceive )
- {
- LogError( ( "Receive error while discarding packet."
- "ReceivedBytes=%ld, ExpectedBytes=%lu.",
- ( long int ) bytesReceived,
- ( unsigned long ) bytesToReceive ) );
- receiveError = true;
- }
- else
- {
- totalBytesReceived += ( uint32_t ) bytesReceived;
- elapsedTimeMs = calculateElapsedTime( getTimeStampMs(), entryTimeMs );
- /* Check for timeout. */
- if( elapsedTimeMs >= timeoutMs )
- {
- LogError( ( "Time expired while discarding packet." ) );
- receiveError = true;
- }
- }
- }
- if( totalBytesReceived == remainingLength )
- {
- LogError( ( "Dumped packet. DumpedBytes=%lu.",
- ( unsigned long ) totalBytesReceived ) );
- /* Packet dumped, so no data is available. */
- status = MQTTNoDataAvailable;
- }
- return status;
- }
- /*-----------------------------------------------------------*/
- static MQTTStatus_t discardStoredPacket( MQTTContext_t * pContext,
- const MQTTPacketInfo_t * pPacketInfo )
- {
- MQTTStatus_t status = MQTTRecvFailed;
- int32_t bytesReceived = 0;
- size_t bytesToReceive = 0U;
- uint32_t totalBytesReceived = 0U;
- bool receiveError = false;
- size_t mqttPacketSize = 0;
- size_t remainingLength;
- assert( pContext != NULL );
- assert( pPacketInfo != NULL );
- mqttPacketSize = pPacketInfo->remainingLength + pPacketInfo->headerLength;
- /* Assert that the packet being discarded is bigger than the
- * receive buffer. */
- assert( mqttPacketSize > pContext->networkBuffer.size );
- /* Discard these many bytes at a time. */
- bytesToReceive = pContext->networkBuffer.size;
- /* Number of bytes depicted by 'index' have already been received. */
- remainingLength = mqttPacketSize - pContext->index;
- while( ( totalBytesReceived < remainingLength ) && ( receiveError == false ) )
- {
- if( ( remainingLength - totalBytesReceived ) < bytesToReceive )
- {
- bytesToReceive = remainingLength - totalBytesReceived;
- }
- bytesReceived = recvExact( pContext, bytesToReceive );
- if( bytesReceived != ( int32_t ) bytesToReceive )
- {
- LogError( ( "Receive error while discarding packet."
- "ReceivedBytes=%ld, ExpectedBytes=%lu.",
- ( long int ) bytesReceived,
- ( unsigned long ) bytesToReceive ) );
- receiveError = true;
- }
- else
- {
- totalBytesReceived += ( uint32_t ) bytesReceived;
- }
- }
- if( totalBytesReceived == remainingLength )
- {
- LogError( ( "Dumped packet. DumpedBytes=%lu.",
- ( unsigned long ) totalBytesReceived ) );
- /* Packet dumped, so no data is available. */
- status = MQTTNoDataAvailable;
- }
- /* Clear the buffer */
- ( void ) memset( pContext->networkBuffer.pBuffer,
- 0,
- pContext->networkBuffer.size );
- /* Reset the index. */
- pContext->index = 0;
- return status;
- }
- /*-----------------------------------------------------------*/
- static MQTTStatus_t receivePacket( MQTTContext_t * pContext,
- MQTTPacketInfo_t incomingPacket,
- uint32_t remainingTimeMs )
- {
- MQTTStatus_t status = MQTTSuccess;
- int32_t bytesReceived = 0;
- size_t bytesToReceive = 0U;
- assert( pContext != NULL );
- assert( pContext->networkBuffer.pBuffer != NULL );
- if( incomingPacket.remainingLength > pContext->networkBuffer.size )
- {
- LogError( ( "Incoming packet will be dumped: "
- "Packet length exceeds network buffer size."
- "PacketSize=%lu, NetworkBufferSize=%lu.",
- ( unsigned long ) incomingPacket.remainingLength,
- ( unsigned long ) pContext->networkBuffer.size ) );
- status = discardPacket( pContext,
- incomingPacket.remainingLength,
- remainingTimeMs );
- }
- else
- {
- bytesToReceive = incomingPacket.remainingLength;
- bytesReceived = recvExact( pContext, bytesToReceive );
- if( bytesReceived == ( int32_t ) bytesToReceive )
- {
- /* Receive successful, bytesReceived == bytesToReceive. */
- LogDebug( ( "Packet received. ReceivedBytes=%ld.",
- ( long int ) bytesReceived ) );
- }
- else
- {
- LogError( ( "Packet reception failed. ReceivedBytes=%ld, "
- "ExpectedBytes=%lu.",
- ( long int ) bytesReceived,
- ( unsigned long ) bytesToReceive ) );
- status = MQTTRecvFailed;
- }
- }
- return status;
- }
- /*-----------------------------------------------------------*/
- static uint8_t getAckTypeToSend( MQTTPublishState_t state )
- {
- uint8_t packetTypeByte = 0U;
- switch( state )
- {
- case MQTTPubAckSend:
- packetTypeByte = MQTT_PACKET_TYPE_PUBACK;
- break;
- case MQTTPubRecSend:
- packetTypeByte = MQTT_PACKET_TYPE_PUBREC;
- break;
- case MQTTPubRelSend:
- packetTypeByte = MQTT_PACKET_TYPE_PUBREL;
- break;
- case MQTTPubCompSend:
- packetTypeByte = MQTT_PACKET_TYPE_PUBCOMP;
- break;
- default:
- /* Take no action for states that do not require sending an ack. */
- break;
- }
- return packetTypeByte;
- }
- /*-----------------------------------------------------------*/
- static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext,
- uint16_t packetId,
- MQTTPublishState_t publishState )
- {
- MQTTStatus_t status = MQTTSuccess;
- MQTTPublishState_t newState = MQTTStateNull;
- int32_t sendResult = 0;
- uint8_t packetTypeByte = 0U;
- MQTTPubAckType_t packetType;
- MQTTFixedBuffer_t localBuffer;
- MQTTConnectionStatus_t connectStatus;
- uint8_t pubAckPacket[ MQTT_PUBLISH_ACK_PACKET_SIZE ];
- localBuffer.pBuffer = pubAckPacket;
- localBuffer.size = MQTT_PUBLISH_ACK_PACKET_SIZE;
- assert( pContext != NULL );
- packetTypeByte = getAckTypeToSend( publishState );
- if( packetTypeByte != 0U )
- {
- packetType = getAckFromPacketType( packetTypeByte );
- status = MQTT_SerializeAck( &localBuffer,
- packetTypeByte,
- packetId );
- if( status == MQTTSuccess )
- {
- MQTT_PRE_STATE_UPDATE_HOOK( pContext );
- connectStatus = pContext->connectStatus;
- if( connectStatus != MQTTConnected )
- {
- status = ( connectStatus == MQTTNotConnected ) ? MQTTStatusNotConnected : MQTTStatusDisconnectPending;
- }
- if( status == MQTTSuccess )
- {
- /* Here, we are not using the vector approach for efficiency. There is just one buffer
- * to be sent which can be achieved with a normal send call. */
- sendResult = sendBuffer( pContext,
- localBuffer.pBuffer,
- MQTT_PUBLISH_ACK_PACKET_SIZE );
- if( sendResult < ( int32_t ) MQTT_PUBLISH_ACK_PACKET_SIZE )
- {
- status = MQTTSendFailed;
- }
- }
- MQTT_POST_STATE_UPDATE_HOOK( pContext );
- }
- if( status == MQTTSuccess )
- {
- pContext->controlPacketSent = true;
- MQTT_PRE_STATE_UPDATE_HOOK( pContext );
- status = MQTT_UpdateStateAck( pContext,
- packetId,
- packetType,
- MQTT_SEND,
- &newState );
- MQTT_POST_STATE_UPDATE_HOOK( pContext );
- if( status != MQTTSuccess )
- {
- LogError( ( "Failed to update state of publish %hu.",
- ( unsigned short ) packetId ) );
- }
- }
- else
- {
- LogError( ( "Failed to send ACK packet: PacketType=%02x, SentBytes=%ld, "
- "PacketSize=%lu.",
- ( unsigned int ) packetTypeByte, ( long int ) sendResult,
- MQTT_PUBLISH_ACK_PACKET_SIZE ) );
- }
- }
- return status;
- }
- /*-----------------------------------------------------------*/
- static MQTTStatus_t handleKeepAlive( MQTTContext_t * pContext )
- {
- MQTTStatus_t status = MQTTSuccess;
- uint32_t now = 0U;
- uint32_t packetTxTimeoutMs = 0U;
- uint32_t lastPacketTxTime = 0U;
- assert( pContext != NULL );
- assert( pContext->getTime != NULL );
- now = pContext->getTime();
- packetTxTimeoutMs = 1000U * ( uint32_t ) pContext->keepAliveIntervalSec;
- if( PACKET_TX_TIMEOUT_MS < packetTxTimeoutMs )
- {
- packetTxTimeoutMs = PACKET_TX_TIMEOUT_MS;
- }
- /* If keep alive interval is 0, it is disabled. */
- if( pContext->waitingForPingResp == true )
- {
- /* Has time expired? */
- if( calculateElapsedTime( now, pContext->pingReqSendTimeMs ) >
- MQTT_PINGRESP_TIMEOUT_MS )
- {
- status = MQTTKeepAliveTimeout;
- }
- }
- else
- {
- MQTT_PRE_STATE_UPDATE_HOOK( pContext );
- lastPacketTxTime = pContext->lastPacketTxTime;
- MQTT_POST_STATE_UPDATE_HOOK( pContext );
- if( ( packetTxTimeoutMs != 0U ) && ( calculateElapsedTime( now, lastPacketTxTime ) >= packetTxTimeoutMs ) )
- {
- status = MQTT_Ping( pContext );
- }
- else
- {
- const uint32_t timeElapsed = calculateElapsedTime( now, pContext->lastPacketRxTime );
- if( ( timeElapsed != 0U ) && ( timeElapsed >= PACKET_RX_TIMEOUT_MS ) )
- {
- status = MQTT_Ping( pContext );
- }
- }
- }
- return status;
- }
- /*-----------------------------------------------------------*/
- static MQTTStatus_t handleIncomingPublish( MQTTContext_t * pContext,
- MQTTPacketInfo_t * pIncomingPacket )
- {
- MQTTStatus_t status;
- MQTTPublishState_t publishRecordState = MQTTStateNull;
- uint16_t packetIdentifier = 0U;
- MQTTPublishInfo_t publishInfo;
- MQTTDeserializedInfo_t deserializedInfo;
- bool duplicatePublish = false;
- assert( pContext != NULL );
- assert( pIncomingPacket != NULL );
- assert( pContext->appCallback != NULL );
- status = MQTT_DeserializePublish( pIncomingPacket, &packetIdentifier, &publishInfo );
- LogInfo( ( "De-serialized incoming PUBLISH packet: DeserializerResult=%s.",
- MQTT_Status_strerror( status ) ) );
- if( ( status == MQTTSuccess ) &&
- ( pContext->incomingPublishRecords == NULL ) &&
- ( publishInfo.qos > MQTTQoS0 ) )
- {
- LogError( ( "Incoming publish has QoS > MQTTQoS0 but incoming "
- "publish records have not been initialized. Dropping the "
- "incoming publish. Please call MQTT_InitStatefulQoS to enable "
- "use of QoS1 and QoS2 publishes." ) );
- status = MQTTRecvFailed;
- }
- if( status == MQTTSuccess )
- {
- MQTT_PRE_STATE_UPDATE_HOOK( pContext );
- status = MQTT_UpdateStatePublish( pContext,
- packetIdentifier,
- MQTT_RECEIVE,
- publishInfo.qos,
- &publishRecordState );
- MQTT_POST_STATE_UPDATE_HOOK( pContext );
- if( status == MQTTSuccess )
- {
- LogInfo( ( "State record updated. New state=%s.",
- MQTT_State_strerror( publishRecordState ) ) );
- }
- /* Different cases in which an incoming publish with duplicate flag is
- * handled are as listed below.
- * 1. No collision - This is the first instance of the incoming publish
- * packet received or an earlier received packet state is lost. This
- * will be handled as a new incoming publish for both QoS1 and QoS2
- * publishes.
- * 2. Collision - The incoming packet was received before and a state
- * record is present in the state engine. For QoS1 and QoS2 publishes
- * this case can happen at 2 different cases and handling is
- * different.
- * a. QoS1 - If a PUBACK is not successfully sent for the incoming
- * publish due to a connection issue, it can result in broker
- * sending out a duplicate publish with dup flag set, when a
- * session is reestablished. It can result in a collision in
- * state engine. This will be handled by processing the incoming
- * publish as a new publish ignoring the
- * #MQTTStateCollision status from the state engine. The publish
- * data is not passed to the application.
- * b. QoS2 - If a PUBREC is not successfully sent for the incoming
- * publish or the PUBREC sent is not successfully received by the
- * broker due to a connection issue, it can result in broker
- * sending out a duplicate publish with dup flag set, when a
- * session is reestablished. It can result in a collision in
- * state engine. This will be handled by ignoring the
- * #MQTTStateCollision status from the state engine. The publish
- * data is not passed to the application. */
- else if( status == MQTTStateCollision )
- {
- status = MQTTSuccess;
- duplicatePublish = true;
- /* Calculate the state for the ack packet that needs to be sent out
- * for the duplicate incoming publish. */
- publishRecordState = MQTT_CalculateStatePublish( MQTT_RECEIVE,
- publishInfo.qos );
- LogDebug( ( "Incoming publish packet with packet id %hu already exists.",
- ( unsigned short ) packetIdentifier ) );
- if( publishInfo.dup == false )
- {
- LogError( ( "DUP flag is 0 for duplicate packet (MQTT-3.3.1.-1)." ) );
- }
- }
- else
- {
- LogError( ( "Error in updating publish state for incoming publish with packet id %hu."
- " Error is %s",
- ( unsigned short ) packetIdentifier,
- MQTT_Status_strerror( status ) ) );
- }
- }
- if( status == MQTTSuccess )
- {
- /* Set fields of deserialized struct. */
- deserializedInfo.packetIdentifier = packetIdentifier;
- deserializedInfo.pPublishInfo = &publishInfo;
- deserializedInfo.deserializationResult = status;
- /* Invoke application callback to hand the buffer over to application
- * before sending acks.
- * Application callback will be invoked for all publishes, except for
- * duplicate incoming publishes. */
- if( duplicatePublish == false )
- {
- pContext->appCallback( pContext,
- pIncomingPacket,
- &deserializedInfo );
- }
- /* Send PUBACK or PUBREC if necessary. */
- status = sendPublishAcks( pContext,
- packetIdentifier,
- publishRecordState );
- }
- return status;
- }
- /*-----------------------------------------------------------*/
- /**
-  * Handle an incoming publish acknowledgement packet.
-  *
-  * The ack is deserialized, the publish state record is updated, the
-  * application callback is invoked and, finally, sendPublishAcks() is called
-  * with the resulting state.
-  *
-  * @param[in] pContext MQTT context; pContext->appCallback must be non-NULL.
-  * @param[in] pIncomingPacket The received ack packet.
-  *
-  * @return MQTTSuccess, or the first error reported by MQTT_DeserializeAck(),
-  * MQTT_UpdateStateAck() or sendPublishAcks().
-  */
- static MQTTStatus_t handlePublishAcks( MQTTContext_t * pContext,
-                                        MQTTPacketInfo_t * pIncomingPacket )
- {
-     MQTTStatus_t status;
-     MQTTPublishState_t newState = MQTTStateNull;
-     uint16_t ackPacketId;
-     MQTTPubAckType_t ackKind;
-     MQTTEventCallback_t notifyApp;
-     MQTTDeserializedInfo_t ackInfo;
-     assert( pContext != NULL );
-     assert( pIncomingPacket != NULL );
-     assert( pContext->appCallback != NULL );
-     notifyApp = pContext->appCallback;
-     ackKind = getAckFromPacketType( pIncomingPacket->type );
-     status = MQTT_DeserializeAck( pIncomingPacket, &ackPacketId, NULL );
-     LogInfo( ( "Ack packet deserialized with result: %s.",
-                MQTT_Status_strerror( status ) ) );
-     if( status == MQTTSuccess )
-     {
-         /* The state record update is bracketed by the state-update hooks. */
-         MQTT_PRE_STATE_UPDATE_HOOK( pContext );
-         status = MQTT_UpdateStateAck( pContext,
-                                       ackPacketId,
-                                       ackKind,
-                                       MQTT_RECEIVE,
-                                       &newState );
-         MQTT_POST_STATE_UPDATE_HOOK( pContext );
-         if( status != MQTTSuccess )
-         {
-             LogError( ( "Updating the state engine for packet id %hu"
-                         " failed with error %s.",
-                         ( unsigned short ) ackPacketId,
-                         MQTT_Status_strerror( status ) ) );
-         }
-         else
-         {
-             LogInfo( ( "State record updated. New state=%s.",
-                        MQTT_State_strerror( newState ) ) );
-         }
-     }
-     /* For a PUBACK or PUBREC that updated the state successfully, call the
-      * application's clearFunction (when one is registered) for this id. */
-     if( ( status == MQTTSuccess ) &&
-         ( ( ackKind == MQTTPuback ) || ( ackKind == MQTTPubrec ) ) &&
-         ( pContext->clearFunction != NULL ) )
-     {
-         pContext->clearFunction( pContext, ackPacketId );
-     }
-     if( status == MQTTSuccess )
-     {
-         /* Describe the ack for the application; acks carry no publish info. */
-         ackInfo.packetIdentifier = ackPacketId;
-         ackInfo.pPublishInfo = NULL;
-         ackInfo.deserializationResult = status;
-         /* The application sees the packet before any follow-up ack is sent. */
-         notifyApp( pContext, pIncomingPacket, &ackInfo );
-         /* Send PUBREL or PUBCOMP if necessary. */
-         status = sendPublishAcks( pContext, ackPacketId, newState );
-     }
-     return status;
- }
- /*-----------------------------------------------------------*/
- /**
-  * Dispatch a received non-PUBLISH packet according to its type.
-  *
-  * Publish acks are delegated to handlePublishAcks(), which invokes the
-  * application callback itself. SUBACK, UNSUBACK and (when keep alive is not
-  * managed here) PINGRESP are deserialized and handed to the application
-  * callback at the end of this function.
-  *
-  * @param[in] pContext MQTT context; pContext->appCallback must be non-NULL.
-  * @param[in] pIncomingPacket The received packet.
-  * @param[in] manageKeepAlive When true, a valid PINGRESP only clears
-  * pContext->waitingForPingResp and is not reported to the application.
-  *
-  * @return MQTTBadResponse for an unexpected packet type; MQTTSuccess whenever
-  * the application callback was invoked here; otherwise the status of the
-  * deserialization or of handlePublishAcks().
-  */
- static MQTTStatus_t handleIncomingAck( MQTTContext_t * pContext,
-                                        MQTTPacketInfo_t * pIncomingPacket,
-                                        bool manageKeepAlive )
- {
-     MQTTStatus_t status = MQTTBadResponse;
-     uint16_t ackPacketId = MQTT_PACKET_ID_INVALID;
-     MQTTDeserializedInfo_t ackInfo;
-     /* Stays false for publish acks (their handler calls the application
-      * itself), for a PINGRESP consumed by keep-alive management and for
-      * unknown packet types. */
-     bool notifyApp = false;
-     MQTTEventCallback_t appCallback = NULL;
-     assert( pContext != NULL );
-     assert( pIncomingPacket != NULL );
-     assert( pContext->appCallback != NULL );
-     appCallback = pContext->appCallback;
-     LogDebug( ( "Received packet of type %02x.",
-                 ( unsigned int ) pIncomingPacket->type ) );
-     switch( pIncomingPacket->type )
-     {
-         case MQTT_PACKET_TYPE_SUBACK:
-         case MQTT_PACKET_TYPE_UNSUBACK:
-             /* A refused subscription is still reported to the application. */
-             status = MQTT_DeserializeAck( pIncomingPacket, &ackPacketId, NULL );
-             if( ( status == MQTTSuccess ) || ( status == MQTTServerRefused ) )
-             {
-                 notifyApp = true;
-             }
-             break;
-         case MQTT_PACKET_TYPE_PINGRESP:
-             status = MQTT_DeserializeAck( pIncomingPacket, &ackPacketId, NULL );
-             if( status == MQTTSuccess )
-             {
-                 if( manageKeepAlive )
-                 {
-                     pContext->waitingForPingResp = false;
-                 }
-                 else
-                 {
-                     notifyApp = true;
-                 }
-             }
-             break;
-         case MQTT_PACKET_TYPE_PUBACK:
-         case MQTT_PACKET_TYPE_PUBREC:
-         case MQTT_PACKET_TYPE_PUBREL:
-         case MQTT_PACKET_TYPE_PUBCOMP:
-             /* The application callback is invoked inside this handler. */
-             status = handlePublishAcks( pContext, pIncomingPacket );
-             break;
-         default:
-             LogError( ( "Unexpected packet type from server: PacketType=%02x.",
-                         ( unsigned int ) pIncomingPacket->type ) );
-             status = MQTTBadResponse;
-             break;
-     }
-     if( notifyApp )
-     {
-         ackInfo.packetIdentifier = ackPacketId;
-         ackInfo.pPublishInfo = NULL;
-         ackInfo.deserializationResult = status;
-         appCallback( pContext, pIncomingPacket, &ackInfo );
-         /* The application has been told about any refusal; report success so
-          * the caller's loop continues. */
-         status = MQTTSuccess;
-     }
-     return status;
- }
- /*-----------------------------------------------------------*
-  * @brief Run one receive pass: read from the transport into the network
-  * buffer and, if a complete MQTT packet is buffered, dispatch it to
-  * handleIncomingPublish() or handleIncomingAck().
-  *
-  * @param[in] pContext MQTT context; its network buffer, buffer index,
-  * transport interface and getTime function are used.
-  * @param[in] manageKeepAlive When true, handleKeepAlive() is run whenever
-  * the transport read returns zero bytes; the flag is also forwarded to
-  * handleIncomingAck().
-  *
-  * @return MQTTRecvFailed if the transport recv returns a negative value;
-  * MQTTNeedMoreBytes if the buffered data does not yet hold a whole packet;
-  * MQTTSuccess if nothing was available or the packet handler succeeded;
-  * otherwise the error reported by one of the called helpers.
-  *-----------------------------------------------------------*/
- static MQTTStatus_t receiveSingleIteration( MQTTContext_t * pContext,
- bool manageKeepAlive )
- {
- MQTTStatus_t status = MQTTSuccess;
- MQTTPacketInfo_t incomingPacket = { 0 };
- int32_t recvBytes;
- size_t totalMQTTPacketLength = 0;
- assert( pContext != NULL );
- assert( pContext->networkBuffer.pBuffer != NULL );
- /* Read as many bytes as possible into the network buffer. New data is
-  * appended after the pContext->index bytes that are already buffered. */
- recvBytes = pContext->transportInterface.recv( pContext->transportInterface.pNetworkContext,
- &( pContext->networkBuffer.pBuffer[ pContext->index ] ),
- pContext->networkBuffer.size - pContext->index );
- if( recvBytes < 0 )
- {
- /* The receive function has failed. Bubble the error up to the user and
-  * move a connected context to the disconnect-pending state. */
- status = MQTTRecvFailed;
- MQTT_PRE_STATE_UPDATE_HOOK( pContext );
- if( pContext->connectStatus == MQTTConnected )
- {
- pContext->connectStatus = MQTTDisconnectPending;
- }
- MQTT_POST_STATE_UPDATE_HOOK( pContext );
- }
- else if( ( recvBytes == 0 ) && ( pContext->index == 0U ) )
- {
- /* No more bytes available since the last read and neither is anything in
- * the buffer. */
- status = MQTTNoDataAvailable;
- }
- /* Either something was received, or there is still data to be processed in the
- * buffer, or both. */
- else
- {
- /* Update the number of bytes in the MQTT fixed buffer. */
- pContext->index += ( size_t ) recvBytes;
- status = MQTT_ProcessIncomingPacketTypeAndLength( pContext->networkBuffer.pBuffer,
- &( pContext->index ),
- &incomingPacket );
- totalMQTTPacketLength = incomingPacket.remainingLength + incomingPacket.headerLength;
- }
- /* No data was received, check for keep alive timeout. */
- if( recvBytes == 0 )
- {
- if( manageKeepAlive == true )
- {
- /* Keep the copy of the status to be reset later. */
- MQTTStatus_t statusCopy = status;
- /* Assign status so an error can be bubbled up to application,
- * but reset it on success. */
- status = handleKeepAlive( pContext );
- if( status == MQTTSuccess )
- {
- /* Reset the status. */
- status = statusCopy;
- }
- else
- {
- LogError( ( "Handling of keep alive failed. Status=%s",
- MQTT_Status_strerror( status ) ) );
- }
- }
- }
- /* Check whether there is data available before processing the packet further. */
- if( ( status == MQTTNeedMoreBytes ) || ( status == MQTTNoDataAvailable ) )
- {
- /* Do nothing as there is nothing to be processed right now. The proper
- * error code will be bubbled up to the user. */
- }
- /* Any other error code. */
- else if( status != MQTTSuccess )
- {
- LogError( ( "Call to receiveSingleIteration failed. Status=%s",
- MQTT_Status_strerror( status ) ) );
- }
- /* If the MQTT Packet size is bigger than the buffer itself. */
- else if( totalMQTTPacketLength > pContext->networkBuffer.size )
- {
- /* Discard the packet from the receive buffer and drain the pending
- * data from the socket buffer. */
- status = discardStoredPacket( pContext,
- &incomingPacket );
- }
- /* If the total packet is of more length than the bytes we have available. */
- else if( totalMQTTPacketLength > pContext->index )
- {
- status = MQTTNeedMoreBytes;
- }
- else
- {
- /* MISRA else. */
- }
- /* Handle received packet. If incomplete data was read then this will not execute. */
- if( status == MQTTSuccess )
- {
- incomingPacket.pRemainingData = &pContext->networkBuffer.pBuffer[ incomingPacket.headerLength ];
- /* PUBLISH packets allow flags in the lower four bits. For other
- * packet types, they are reserved. */
- if( ( incomingPacket.type & 0xF0U ) == MQTT_PACKET_TYPE_PUBLISH )
- {
- status = handleIncomingPublish( pContext, &incomingPacket );
- }
- else
- {
- status = handleIncomingAck( pContext, &incomingPacket, manageKeepAlive );
- }
- /* Update the index to reflect the remaining bytes in the buffer. The
-  * packet is removed from the buffer whether or not its handler
-  * reported success. */
- pContext->index -= totalMQTTPacketLength;
- /* Move the remaining bytes to the front of the buffer. */
- ( void ) memmove( pContext->networkBuffer.pBuffer,
- &( pContext->networkBuffer.pBuffer[ totalMQTTPacketLength ] ),
- pContext->index );
- if( status == MQTTSuccess ) /* Receive time is recorded only for a successfully handled packet. */
- {
- pContext->lastPacketRxTime = pContext->getTime();
- }
- }
- if( status == MQTTNoDataAvailable )
- {
- /* No data available is not an error. Reset to MQTTSuccess so the
- * return code will indicate success. */
- status = MQTTSuccess;
- }
- return status;
- }
- /*-----------------------------------------------------------*/
- /**
-  * Check the arguments shared by the subscribe and unsubscribe requests.
-  *
-  * @param[in] pContext MQTT context; must not be NULL.
-  * @param[in] pSubscriptionList Array of subscriptions; must not be NULL.
-  * @param[in] subscriptionCount Number of entries in the array; must not be 0.
-  * @param[in] packetId Packet identifier for the request; must not be 0.
-  *
-  * @return MQTTBadParameter if any check fails, including a subscription with
-  * QoS above MQTTQoS0 while pContext->incomingPublishRecords is NULL;
-  * MQTTSuccess otherwise.
-  */
- static MQTTStatus_t validateSubscribeUnsubscribeParams( const MQTTContext_t * pContext,
-                                                         const MQTTSubscribeInfo_t * pSubscriptionList,
-                                                         size_t subscriptionCount,
-                                                         uint16_t packetId )
- {
-     MQTTStatus_t status = MQTTSuccess;
-     if( ( pContext == NULL ) || ( pSubscriptionList == NULL ) )
-     {
-         LogError( ( "Argument cannot be NULL: pContext=%p, "
-                     "pSubscriptionList=%p.",
-                     ( void * ) pContext,
-                     ( void * ) pSubscriptionList ) );
-         status = MQTTBadParameter;
-     }
-     else if( subscriptionCount == 0UL )
-     {
-         LogError( ( "Subscription count is 0." ) );
-         status = MQTTBadParameter;
-     }
-     else if( packetId == 0U )
-     {
-         LogError( ( "Packet Id for subscription packet is 0." ) );
-         status = MQTTBadParameter;
-     }
-     else if( pContext->incomingPublishRecords == NULL )
-     {
-         /* Without incoming publish records only QoS0 entries are accepted;
-          * scanning stops at the first entry that asks for more. */
-         size_t entry = 0U;
-         while( ( status == MQTTSuccess ) && ( entry < subscriptionCount ) )
-         {
-             if( pSubscriptionList[ entry ].qos > MQTTQoS0 )
-             {
-                 LogError( ( "The incoming publish record list is not "
-                             "initialised for QoS1/QoS2 records. Please call "
-                             " MQTT_InitStatefulQoS to enable use of QoS1 and "
-                             " QoS2 packets." ) );
-                 status = MQTTBadParameter;
-             }
-             entry++;
-         }
-     }
-     else
-     {
-         /* MISRA else. */
-     }
-     return status;
- }
- /*-----------------------------------------------------------*/
- /**
-  * Append a length-prefixed string to an array of transport vectors.
-  *
-  * The 16-bit length is written big-endian into serializedLength and
-  * referenced by the first vector; a non-empty string is referenced (not
-  * copied) by the second vector.
-  *
-  * @param[out] serializedLength Scratch bytes that receive the encoded length.
-  * @param[in] string The string to reference; may be NULL when length is 0.
-  * @param[in] length Length of the string in bytes.
-  * @param[out] iterator First vector slot to fill.
-  * @param[in,out] updatedLength Running byte total; increased by the number of
-  * bytes added here.
-  *
-  * @return Number of vector slots filled: 1 for an empty string, else 2.
-  */
- static size_t addEncodedStringToVector( uint8_t serializedLength[ CORE_MQTT_SERIALIZED_LENGTH_FIELD_BYTES ],
-                                         const char * const string,
-                                         uint16_t length,
-                                         TransportOutVector_t * iterator,
-                                         size_t * updatedLength )
- {
-     size_t slotsFilled = 1U;
-     size_t bytesAdded = CORE_MQTT_SERIALIZED_LENGTH_FIELD_BYTES;
-     /* When length is non-zero, the string must be non-NULL. */
-     assert( ( length != 0U ) ? ( string != NULL ) : true );
-     /* Most significant byte first. */
-     serializedLength[ 0 ] = ( ( uint8_t ) ( ( length ) >> 8 ) );
-     serializedLength[ 1 ] = ( ( uint8_t ) ( ( length ) & 0x00ffU ) );
-     /* The length field always goes out, even for an empty string. */
-     iterator[ 0 ].iov_base = serializedLength;
-     iterator[ 0 ].iov_len = CORE_MQTT_SERIALIZED_LENGTH_FIELD_BYTES;
-     if( ( string != NULL ) && ( length != 0U ) )
-     {
-         /* The string bytes follow the length in the next slot. */
-         iterator[ 1 ].iov_base = string;
-         iterator[ 1 ].iov_len = length;
-         slotsFilled = 2U;
-         bytesAdded += length;
-     }
-     *updatedLength += bytesAdded;
-     return slotsFilled;
- }
- /*-----------------------------------------------------------*
-  * @brief Send a SUBSCRIBE packet through sendMessageVector() without
-  * copying the topic filters into an intermediate buffer.
-  *
-  * The fixed header is serialized into a local array; every subscription
-  * contributes its encoded filter length, the filter itself and one QoS byte
-  * as separate vector entries. Entries are sent in batches that fit in
-  * MQTT_SUB_UNSUB_MAX_VECTORS slots; only the first batch carries the header.
-  *
-  * @param[in] pContext MQTT context passed to sendMessageVector().
-  * @param[in] pSubscriptionList Subscriptions to send.
-  * @param[in] subscriptionCount Number of entries in pSubscriptionList.
-  * @param[in] packetId Packet identifier written into the header.
-  * @param[in] remainingLength Remaining length written into the header.
-  *
-  * @return MQTTSendFailed if sendMessageVector() does not report exactly the
-  * number of bytes queued for a batch; MQTTSuccess otherwise.
-  *-----------------------------------------------------------*/
- static MQTTStatus_t sendSubscribeWithoutCopy( MQTTContext_t * pContext,
- const MQTTSubscribeInfo_t * pSubscriptionList,
- size_t subscriptionCount,
- uint16_t packetId,
- size_t remainingLength )
- {
- MQTTStatus_t status = MQTTSuccess;
- uint8_t * pIndex;
- TransportOutVector_t pIoVector[ MQTT_SUB_UNSUB_MAX_VECTORS ];
- TransportOutVector_t * pIterator;
- uint8_t serializedTopicFieldLength[ MQTT_SUB_UNSUB_MAX_VECTORS ][ CORE_MQTT_SERIALIZED_LENGTH_FIELD_BYTES ];
- size_t totalPacketLength = 0U;
- size_t ioVectorLength = 0U;
- size_t subscriptionsSent = 0U;
- size_t vectorsAdded;
- size_t topicFieldLengthIndex;
- /* Maximum number of bytes required by the 'fixed' part of the SUBSCRIBE
- * packet header according to the MQTT specification.
- * MQTT Control Byte 0 + 1 = 1
- * Remaining length (max) + 4 = 5
- * Packet ID + 2 = 7 */
- uint8_t subscribeheader[ 7U ];
- /* The vector array should be at least three element long as the topic
- * string needs these many vector elements to be stored. */
- assert( MQTT_SUB_UNSUB_MAX_VECTORS >= CORE_MQTT_SUBSCRIBE_PER_TOPIC_VECTOR_LENGTH );
- pIndex = subscribeheader;
- pIterator = pIoVector;
- pIndex = MQTT_SerializeSubscribeHeader( remainingLength,
- pIndex,
- packetId );
- /* The header is to be sent first. */
- pIterator->iov_base = subscribeheader;
- /* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-182 */
- /* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-108 */
- /* coverity[misra_c_2012_rule_18_2_violation] */
- /* coverity[misra_c_2012_rule_10_8_violation] */
- pIterator->iov_len = ( size_t ) ( pIndex - subscribeheader );
- totalPacketLength += pIterator->iov_len;
- pIterator++;
- ioVectorLength++;
- while( ( status == MQTTSuccess ) && ( subscriptionsSent < subscriptionCount ) )
- {
- /* Reset the index for next iteration. Each subscription in a batch gets
-  * its own row of serializedTopicFieldLength for the encoded length. */
- topicFieldLengthIndex = 0;
- /* Check whether the subscription topic (with QoS) will fit in the
- * given vector. */
- while( ( ioVectorLength <= ( MQTT_SUB_UNSUB_MAX_VECTORS - CORE_MQTT_SUBSCRIBE_PER_TOPIC_VECTOR_LENGTH ) ) &&
- ( subscriptionsSent < subscriptionCount ) )
- {
- /* The topic filter and the filter length gets sent next. */
- vectorsAdded = addEncodedStringToVector( serializedTopicFieldLength[ topicFieldLengthIndex ],
- pSubscriptionList[ subscriptionsSent ].pTopicFilter,
- pSubscriptionList[ subscriptionsSent ].topicFilterLength,
- pIterator,
- &totalPacketLength );
- /* Update the pointer after the above operation. */
- pIterator = &pIterator[ vectorsAdded ];
- /* Lastly, the QoS gets sent. */
- pIterator->iov_base = &( pSubscriptionList[ subscriptionsSent ].qos );
- pIterator->iov_len = 1U;
- totalPacketLength += pIterator->iov_len;
- /* Increment the pointer. */
- pIterator++;
- /* Two slots get used by the topic string length and topic string.
- * One slot gets used by the quality of service. */
- ioVectorLength += vectorsAdded + 1U;
- subscriptionsSent++;
- /* The index needs to be updated for next iteration. */
- topicFieldLengthIndex++;
- }
- if( sendMessageVector( pContext,
- pIoVector,
- ioVectorLength ) != ( int32_t ) totalPacketLength )
- {
- status = MQTTSendFailed;
- }
- /* Update the iterator for the next potential loop iteration. */
- pIterator = pIoVector;
- /* Reset the vector length for the next potential loop iteration. */
- ioVectorLength = 0U;
- /* Reset the packet length for the next potential loop iteration. */
- totalPacketLength = 0U;
- }
- return status;
- }
- /*-----------------------------------------------------------*
-  * @brief Send an UNSUBSCRIBE packet through sendMessageVector() without
-  * copying the topic filters into an intermediate buffer.
-  *
-  * The fixed header is serialized into a local array; every entry of the
-  * list contributes its encoded filter length and the filter itself as
-  * vector entries. Entries are sent in batches that fit in
-  * MQTT_SUB_UNSUB_MAX_VECTORS slots; only the first batch carries the header.
-  *
-  * @param[in] pContext MQTT context passed to sendMessageVector().
-  * @param[in] pSubscriptionList Topic filters to unsubscribe from.
-  * @param[in] subscriptionCount Number of entries in pSubscriptionList.
-  * @param[in] packetId Packet identifier written into the header.
-  * @param[in] remainingLength Remaining length written into the header.
-  *
-  * @return MQTTSendFailed if sendMessageVector() does not report exactly the
-  * number of bytes queued for a batch; MQTTSuccess otherwise.
-  *-----------------------------------------------------------*/
- static MQTTStatus_t sendUnsubscribeWithoutCopy( MQTTContext_t * pContext,
- const MQTTSubscribeInfo_t * pSubscriptionList,
- size_t subscriptionCount,
- uint16_t packetId,
- size_t remainingLength )
- {
- MQTTStatus_t status = MQTTSuccess;
- uint8_t * pIndex;
- TransportOutVector_t pIoVector[ MQTT_SUB_UNSUB_MAX_VECTORS ];
- TransportOutVector_t * pIterator;
- uint8_t serializedTopicFieldLength[ MQTT_SUB_UNSUB_MAX_VECTORS ][ CORE_MQTT_SERIALIZED_LENGTH_FIELD_BYTES ];
- size_t totalPacketLength = 0U;
- size_t unsubscriptionsSent = 0U;
- size_t ioVectorLength = 0U;
- size_t vectorsAdded;
- size_t topicFieldLengthIndex;
- /* Maximum number of bytes required by the 'fixed' part of the UNSUBSCRIBE
- * packet header according to the MQTT specification.
- * MQTT Control Byte 0 + 1 = 1
- * Remaining length (max) + 4 = 5
- * Packet ID + 2 = 7 */
- uint8_t unsubscribeheader[ 7U ];
- /* The vector array should be at least three element long as the topic
- * string needs these many vector elements to be stored. */
- assert( MQTT_SUB_UNSUB_MAX_VECTORS >= CORE_MQTT_UNSUBSCRIBE_PER_TOPIC_VECTOR_LENGTH );
- pIndex = unsubscribeheader;
- pIterator = pIoVector;
- pIndex = MQTT_SerializeUnsubscribeHeader( remainingLength,
- pIndex,
- packetId );
- /* The header is to be sent first. */
- pIterator->iov_base = unsubscribeheader;
- /* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-182 */
- /* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-108 */
- /* coverity[misra_c_2012_rule_18_2_violation] */
- /* coverity[misra_c_2012_rule_10_8_violation] */
- pIterator->iov_len = ( size_t ) ( pIndex - unsubscribeheader );
- totalPacketLength += pIterator->iov_len;
- pIterator++;
- ioVectorLength++;
- while( ( status == MQTTSuccess ) && ( unsubscriptionsSent < subscriptionCount ) )
- {
- /* Reset the index for next iteration. Each topic filter in a batch gets
-  * its own row of serializedTopicFieldLength for the encoded length. */
- topicFieldLengthIndex = 0;
- /* Check whether the subscription topic will fit in the given vector. */
- while( ( ioVectorLength <= ( MQTT_SUB_UNSUB_MAX_VECTORS - CORE_MQTT_UNSUBSCRIBE_PER_TOPIC_VECTOR_LENGTH ) ) &&
- ( unsubscriptionsSent < subscriptionCount ) )
- {
- /* The topic filter gets sent next. */
- vectorsAdded = addEncodedStringToVector( serializedTopicFieldLength[ topicFieldLengthIndex ],
- pSubscriptionList[ unsubscriptionsSent ].pTopicFilter,
- pSubscriptionList[ unsubscriptionsSent ].topicFilterLength,
- pIterator,
- &totalPacketLength );
- /* Update the iterator to point to the next empty location. */
- pIterator = &pIterator[ vectorsAdded ];
- /* Update the total count based on how many vectors were added. */
- ioVectorLength += vectorsAdded;
- unsubscriptionsSent++;
- /* Update the index for next iteration. */
- topicFieldLengthIndex++;
- }
- if( sendMessageVector( pContext, pIoVector, ioVectorLength ) != ( int32_t ) totalPacketLength )
- {
- status = MQTTSendFailed;
- }
- /* Update the iterator for the next potential loop iteration. */
- pIterator = pIoVector;
- /* Reset the vector length for the next potential loop iteration. */
- ioVectorLength = 0U;
- /* Reset the packet length for the next potential loop iteration. */
- totalPacketLength = 0U;
- }
- return status;
- }
- /*-----------------------------------------------------------*/
- /**
-  * Send a PUBLISH packet through sendMessageVector() without copying the
-  * topic name or payload.
-  *
-  * For QoS above MQTTQoS0, when pContext->storeFunction is set, the packet is
-  * first handed to that function with the DUP flag set in the header; the
-  * flag is put back to its original value before the packet is sent.
-  *
-  * @param[in] pContext MQTT context.
-  * @param[in] pPublishInfo Publish parameters (topic, payload, QoS, dup).
-  * @param[in] pMqttHeader Already serialized header bytes.
-  * @param[in] headerSize Number of bytes in pMqttHeader.
-  * @param[in] packetId Packet identifier; only sent for QoS above MQTTQoS0.
-  *
-  * @return MQTTPublishStoreFailed if storeFunction does not return true;
-  * MQTTSendFailed if the number of bytes sent differs from the packet size;
-  * the status of MQTT_UpdateDuplicatePublishFlag() if it fails; MQTTSuccess
-  * otherwise.
-  */
- static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext,
-                                             const MQTTPublishInfo_t * pPublishInfo,
-                                             uint8_t * pMqttHeader,
-                                             size_t headerSize,
-                                             uint16_t packetId )
- {
-     MQTTStatus_t status = MQTTSuccess;
-     size_t slotCount = 0U;
-     size_t bytesToSend = 0U;
-     bool dupWasToggled = false;
-     /* The packet identifier occupies two bytes on the wire. */
-     uint8_t packetIdBytes[ 2U ];
-     /* At most four slots are needed: header, topic name, packet identifier
-      * (QoS above MQTTQoS0 only) and payload (when present). */
-     TransportOutVector_t slots[ 4U ];
-     /* Slot 0: the serialized header. */
-     slots[ slotCount ].iov_base = pMqttHeader;
-     slots[ slotCount ].iov_len = headerSize;
-     bytesToSend += headerSize;
-     slotCount++;
-     /* Slot 1: the topic name. */
-     slots[ slotCount ].iov_base = pPublishInfo->pTopicName;
-     slots[ slotCount ].iov_len = pPublishInfo->topicNameLength;
-     bytesToSend += pPublishInfo->topicNameLength;
-     slotCount++;
-     if( pPublishInfo->qos > MQTTQoS0 )
-     {
-         /* Packet identifier, most significant byte first. */
-         packetIdBytes[ 0 ] = ( ( uint8_t ) ( ( packetId ) >> 8 ) );
-         packetIdBytes[ 1 ] = ( ( uint8_t ) ( ( packetId ) & 0x00ffU ) );
-         slots[ slotCount ].iov_base = packetIdBytes;
-         slots[ slotCount ].iov_len = sizeof( packetIdBytes );
-         bytesToSend += sizeof( packetIdBytes );
-         slotCount++;
-     }
-     /* A publish may legitimately carry no payload. */
-     if( pPublishInfo->payloadLength > 0U )
-     {
-         slots[ slotCount ].iov_base = pPublishInfo->pPayload;
-         slots[ slotCount ].iov_len = pPublishInfo->payloadLength;
-         bytesToSend += pPublishInfo->payloadLength;
-         slotCount++;
-     }
-     /* Hand a copy to the application for later retransmission. */
-     if( ( pContext->storeFunction != NULL ) &&
-         ( pPublishInfo->qos > MQTTQoS0 ) )
-     {
-         MQTTVec_t packetVectors;
-         /* The stored copy is handed back later through const pointers and
-          * cannot be edited then, so the DUP flag is set before storing. */
-         if( pPublishInfo->dup != true )
-         {
-             status = MQTT_UpdateDuplicatePublishFlag( pMqttHeader, true );
-             dupWasToggled = ( status == MQTTSuccess );
-         }
-         if( status == MQTTSuccess )
-         {
-             packetVectors.pVector = slots;
-             packetVectors.vectorLen = slotCount;
-             if( pContext->storeFunction( pContext, packetId, &packetVectors ) != true )
-             {
-                 status = MQTTPublishStoreFailed;
-             }
-         }
-         /* Undo the DUP change so the first transmission goes out unmarked. */
-         if( ( status == MQTTSuccess ) && dupWasToggled )
-         {
-             status = MQTT_UpdateDuplicatePublishFlag( pMqttHeader, false );
-         }
-     }
-     if( ( status == MQTTSuccess ) &&
-         ( sendMessageVector( pContext, slots, slotCount ) != ( int32_t ) bytesToSend ) )
-     {
-         status = MQTTSendFailed;
-     }
-     return status;
- }
- /*-----------------------------------------------------------*
-  * @brief Send a CONNECT packet through sendMessageVector() without copying
-  * the client identifier, will, user name or password.
-  *
-  * The fixed part of the packet is serialized into a local array; every
-  * string field is added with addEncodedStringToVector().
-  *
-  * @param[in] pContext MQTT context passed to sendMessageVector().
-  * @param[in] pConnectInfo Connection parameters; dereferenced without a
-  * NULL check.
-  * @param[in] pWillInfo Optional last will; when non-NULL its topic name
-  * must be non-NULL.
-  * @param[in] remainingLength Remaining length passed to
-  * MQTT_SerializeConnectFixedHeader().
-  *
-  * @return MQTTBadParameter if a will is given without a topic name;
-  * MQTTSendFailed if sendMessageVector() does not report exactly the packet
-  * size; MQTTSuccess otherwise.
-  *-----------------------------------------------------------*/
- static MQTTStatus_t sendConnectWithoutCopy( MQTTContext_t * pContext,
- const MQTTConnectInfo_t * pConnectInfo,
- const MQTTPublishInfo_t * pWillInfo,
- size_t remainingLength )
- {
- MQTTStatus_t status = MQTTSuccess;
- TransportOutVector_t * iterator;
- size_t ioVectorLength = 0U;
- size_t totalMessageLength = 0U;
- int32_t bytesSentOrError;
- uint8_t * pIndex;
- uint8_t serializedClientIDLength[ 2 ];
- uint8_t serializedTopicLength[ 2 ];
- uint8_t serializedPayloadLength[ 2 ];
- uint8_t serializedUsernameLength[ 2 ];
- uint8_t serializedPasswordLength[ 2 ];
- size_t vectorsAdded;
- /* Maximum number of bytes required by the 'fixed' part of the CONNECT
- * packet header according to the MQTT specification.
- * MQTT Control Byte 0 + 1 = 1
- * Remaining length (max) + 4 = 5
- * Protocol Name Length + 2 = 7
- * Protocol Name (MQTT) + 4 = 11
- * Protocol level + 1 = 12
- * Connect flags + 1 = 13
- * Keep alive + 2 = 15 */
- uint8_t connectPacketHeader[ 15U ];
- /* The maximum vectors required to encode and send a connect packet. The
- * breakdown is shown below.
- * Fixed header 0 + 1 = 1
- * Client ID + 2 = 3
- * Will topic + 2 = 5
- * Will payload + 2 = 7
- * Username + 2 = 9
- * Password + 2 = 11 */
- TransportOutVector_t pIoVector[ 11U ];
- iterator = pIoVector;
- pIndex = connectPacketHeader;
- /* Validate arguments. */
- if( ( pWillInfo != NULL ) && ( pWillInfo->pTopicName == NULL ) )
- {
- LogError( ( "pWillInfo->pTopicName cannot be NULL if Will is present." ) );
- status = MQTTBadParameter;
- }
- else
- {
- pIndex = MQTT_SerializeConnectFixedHeader( pIndex,
- pConnectInfo,
- pWillInfo,
- remainingLength );
- assert( ( ( size_t ) ( pIndex - connectPacketHeader ) ) <= sizeof( connectPacketHeader ) );
- /* The header gets sent first. */
- iterator->iov_base = connectPacketHeader;
- /* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-182 */
- /* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-108 */
- /* coverity[misra_c_2012_rule_18_2_violation] */
- /* coverity[misra_c_2012_rule_10_8_violation] */
- iterator->iov_len = ( size_t ) ( pIndex - connectPacketHeader );
- totalMessageLength += iterator->iov_len;
- iterator++;
- ioVectorLength++;
- /* Serialize the client ID. */
- vectorsAdded = addEncodedStringToVector( serializedClientIDLength,
- pConnectInfo->pClientIdentifier,
- pConnectInfo->clientIdentifierLength,
- iterator,
- &totalMessageLength );
- /* Update the iterator to point to the next empty slot. */
- iterator = &iterator[ vectorsAdded ];
- ioVectorLength += vectorsAdded;
- if( pWillInfo != NULL )
- {
- /* Serialize the topic. */
- vectorsAdded = addEncodedStringToVector( serializedTopicLength,
- pWillInfo->pTopicName,
- pWillInfo->topicNameLength,
- iterator,
- &totalMessageLength );
- /* Update the iterator to point to the next empty slot. */
- iterator = &iterator[ vectorsAdded ];
- ioVectorLength += vectorsAdded;
- /* Serialize the payload. Payload of last will and testament can be NULL. */
- vectorsAdded = addEncodedStringToVector( serializedPayloadLength,
- pWillInfo->pPayload,
- ( uint16_t ) pWillInfo->payloadLength, /* NOTE(review): narrowed to 16 bits here; presumably range-checked by the caller - confirm. */
- iterator,
- &totalMessageLength );
- /* Update the iterator to point to the next empty slot. */
- iterator = &iterator[ vectorsAdded ];
- ioVectorLength += vectorsAdded;
- }
- /* Encode the user name if provided. */
- if( pConnectInfo->pUserName != NULL )
- {
- /* Serialize the user name string. */
- vectorsAdded = addEncodedStringToVector( serializedUsernameLength,
- pConnectInfo->pUserName,
- pConnectInfo->userNameLength,
- iterator,
- &totalMessageLength );
- /* Update the iterator to point to the next empty slot. */
- iterator = &iterator[ vectorsAdded ];
- ioVectorLength += vectorsAdded;
- }
- /* Encode the password if provided. */
- if( pConnectInfo->pPassword != NULL )
- {
- /* Serialize the password string. */
- vectorsAdded = addEncodedStringToVector( serializedPasswordLength,
- pConnectInfo->pPassword,
- pConnectInfo->passwordLength,
- iterator,
- &totalMessageLength );
- /* The password is the last field, so only the vector count is advanced;
-  * the iterator is not needed any more. */
- ioVectorLength += vectorsAdded;
- }
- bytesSentOrError = sendMessageVector( pContext, pIoVector, ioVectorLength );
- if( bytesSentOrError != ( int32_t ) totalMessageLength )
- {
- status = MQTTSendFailed;
- }
- }
- return status;
- }
- /*-----------------------------------------------------------*
-  * @brief Wait for, read and deserialize the CONNACK packet.
-  *
-  * The packet type and length are polled with
-  * MQTT_GetIncomingPacketTypeAndLength() until something other than
-  * MQTTNoDataAvailable is returned or the timeout / retry budget is used up.
-  * The rest of the packet is then read with receivePacket() and decoded with
-  * MQTT_DeserializeAck().
-  *
-  * @param[in] pContext MQTT context; pContext->getTime must be non-NULL.
-  * @param[in] timeoutMs When greater than 0, polling is bounded by elapsed
-  * time; when 0, it is bounded by MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT.
-  * @param[in] cleanSession Whether the CONNECT asked for a clean session.
-  * @param[out] pIncomingPacket Filled with the received packet information;
-  * pRemainingData is pointed at the start of the network buffer.
-  * @param[out] pSessionPresent Passed to MQTT_DeserializeAck() and read back
-  * afterwards to validate it against cleanSession.
-  *
-  * @return MQTTBadResponse if the packet is not a CONNACK or if session
-  * present is set although a clean session was requested; MQTTSuccess when a
-  * valid CONNACK was received; otherwise the status of the failing helper.
-  *-----------------------------------------------------------*/
- static MQTTStatus_t receiveConnack( MQTTContext_t * pContext,
- uint32_t timeoutMs,
- bool cleanSession,
- MQTTPacketInfo_t * pIncomingPacket,
- bool * pSessionPresent )
- {
- MQTTStatus_t status = MQTTSuccess;
- MQTTGetCurrentTimeFunc_t getTimeStamp = NULL;
- uint32_t entryTimeMs = 0U, remainingTimeMs = 0U, timeTakenMs = 0U;
- bool breakFromLoop = false;
- uint16_t loopCount = 0U;
- assert( pContext != NULL );
- assert( pIncomingPacket != NULL );
- assert( pContext->getTime != NULL );
- getTimeStamp = pContext->getTime;
- /* Get the entry time for the function. */
- entryTimeMs = getTimeStamp();
- do
- {
- /* Transport read for incoming CONNACK packet type and length.
- * MQTT_GetIncomingPacketTypeAndLength is a blocking call and it is
- * returned after a transport receive timeout, an error, or a successful
- * receive of packet type and length. */
- status = MQTT_GetIncomingPacketTypeAndLength( pContext->transportInterface.recv,
- pContext->transportInterface.pNetworkContext,
- pIncomingPacket );
- /* The loop times out based on 2 conditions.
- * 1. If timeoutMs is greater than 0:
- * Loop times out based on the timeout calculated by getTime()
- * function.
- * 2. If timeoutMs is 0:
- * Loop times out based on the maximum number of retries config
- * MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT. This config will control
- * maximum the number of retry attempts to read the CONNACK packet.
- * A value of 0 for the config will try once to read CONNACK. */
- if( timeoutMs > 0U )
- {
- breakFromLoop = calculateElapsedTime( getTimeStamp(), entryTimeMs ) >= timeoutMs;
- }
- else
- {
- breakFromLoop = loopCount >= MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT;
- loopCount++;
- }
- /* Loop until there is data to read or if we have exceeded the timeout/retries. */
- } while( ( status == MQTTNoDataAvailable ) && ( breakFromLoop == false ) );
- if( status == MQTTSuccess )
- {
- /* Time taken in this function so far. */
- timeTakenMs = calculateElapsedTime( getTimeStamp(), entryTimeMs );
- if( timeTakenMs < timeoutMs )
- {
- /* Calculate remaining time for receiving the remainder of
- * the packet. Otherwise remainingTimeMs keeps its initial value of 0. */
- remainingTimeMs = timeoutMs - timeTakenMs;
- }
- /* Reading the remainder of the packet by transport recv.
- * Attempt to read once even if the timeout has expired.
- * Invoking receivePacket with remainingTime as 0 would attempt to
- * recv from network once. If using retries, the remainder of the
- * CONNACK packet is tried to be read only once. Reading once would be
- * good as the packet type and remaining length was already read. Hence,
- * the probability of the remaining 2 bytes available to read is very high. */
- if( pIncomingPacket->type == MQTT_PACKET_TYPE_CONNACK )
- {
- status = receivePacket( pContext,
- *pIncomingPacket,
- remainingTimeMs );
- }
- else
- {
- LogError( ( "Incorrect packet type %X received while expecting"
- " CONNACK(%X).",
- ( unsigned int ) pIncomingPacket->type,
- MQTT_PACKET_TYPE_CONNACK ) );
- status = MQTTBadResponse;
- }
- }
- if( status == MQTTSuccess )
- {
- /* Update the packet info pointer to the buffer read. */
- pIncomingPacket->pRemainingData = pContext->networkBuffer.pBuffer;
- /* Deserialize CONNACK. */
- status = MQTT_DeserializeAck( pIncomingPacket, NULL, pSessionPresent );
- }
- /* If a clean session is requested, a session present should not be set by
- * broker. */
- if( status == MQTTSuccess )
- {
- if( ( cleanSession == true ) && ( *pSessionPresent == true ) )
- {
- LogError( ( "Unexpected session present flag in CONNACK response from broker."
- " CONNECT request with clean session was made with broker." ) );
- status = MQTTBadResponse;
- }
- }
- if( status == MQTTSuccess )
- {
- LogDebug( ( "Received MQTT CONNACK successfully from broker." ) );
- }
- else
- {
- LogError( ( "CONNACK recv failed with status = %s.",
- MQTT_Status_strerror( status ) ) );
- }
- return status;
- }
- /*-----------------------------------------------------------*/
- /**
-  * Resend pending acknowledgements and publishes after a session resumes.
-  *
-  * Every packet id returned by MQTT_PubrelToResend() is passed to
-  * sendPublishAcks(). Afterwards, when pContext->retrieveFunction is set,
-  * every packet id returned by MQTT_PublishToResend() is fetched through
-  * that function and sent again with sendBuffer().
-  *
-  * @param[in] pContext MQTT context; must be non-NULL.
-  *
-  * @return MQTTPublishRetrieveFailed if retrieveFunction does not return
-  * true; MQTTSendFailed if a publish is not sent completely; the status of
-  * sendPublishAcks() if it fails; MQTTSuccess otherwise.
-  */
- static MQTTStatus_t handleUncleanSessionResumption( MQTTContext_t * pContext )
- {
-     MQTTStatus_t status = MQTTSuccess;
-     MQTTStateCursor_t cursor = MQTT_STATE_CURSOR_INITIALIZER;
-     uint16_t pendingId = MQTT_PACKET_ID_INVALID;
-     MQTTPublishState_t pendingState = MQTTStateNull;
-     size_t storedLength = 0;
-     uint8_t * pStoredPacket = NULL;
-     assert( pContext != NULL );
-     /* First pass: walk the records that MQTT_PubrelToResend() reports and
-      * send the matching ack for each, stopping at the first failure. */
-     for( pendingId = MQTT_PubrelToResend( pContext, &cursor, &pendingState );
-          ( pendingId != MQTT_PACKET_ID_INVALID ) && ( status == MQTTSuccess );
-          pendingId = MQTT_PubrelToResend( pContext, &cursor, &pendingState ) )
-     {
-         status = sendPublishAcks( pContext, pendingId, pendingState );
-     }
-     if( ( status == MQTTSuccess ) &&
-         ( pContext->retrieveFunction != NULL ) )
-     {
-         /* Second pass: start over and resend the publishes reported by
-          * MQTT_PublishToResend(). */
-         cursor = MQTT_STATE_CURSOR_INITIALIZER;
-         do
-         {
-             pendingId = MQTT_PublishToResend( pContext, &cursor );
-             if( pendingId != MQTT_PACKET_ID_INVALID )
-             {
-                 if( pContext->retrieveFunction( pContext, pendingId, &pStoredPacket, &storedLength ) != true )
-                 {
-                     /* The failure status also ends the loop below. */
-                     status = MQTTPublishRetrieveFailed;
-                 }
-                 else
-                 {
-                     MQTT_PRE_STATE_UPDATE_HOOK( pContext );
-                     if( sendBuffer( pContext, pStoredPacket, storedLength ) != ( int32_t ) storedLength )
-                     {
-                         status = MQTTSendFailed;
-                     }
-                     MQTT_POST_STATE_UPDATE_HOOK( pContext );
-                 }
-             }
-         } while( ( status == MQTTSuccess ) &&
-                  ( pendingId != MQTT_PACKET_ID_INVALID ) );
-     }
-     return status;
- }
- /**
-  * Reset the context state when a new (clean) session is established.
-  *
-  * The receive index and network buffer are cleared, clearFunction (when
-  * set) is called for every packet id reported by MQTT_PublishToResend(),
-  * and the outgoing and incoming publish record arrays are zeroed.
-  *
-  * @param[in] pContext MQTT context; must be non-NULL.
-  *
-  * @return Always MQTTSuccess.
-  */
- static MQTTStatus_t handleCleanSession( MQTTContext_t * pContext )
- {
-     MQTTStateCursor_t cursor = MQTT_STATE_CURSOR_INITIALIZER;
-     uint16_t pendingId = MQTT_PACKET_ID_INVALID;
-     assert( pContext != NULL );
-     /* Drop anything that is still sitting in the receive buffer. */
-     pContext->index = 0;
-     ( void ) memset( pContext->networkBuffer.pBuffer, 0, pContext->networkBuffer.size );
-     if( pContext->clearFunction != NULL )
-     {
-         /* Call clearFunction for every publish that is still awaiting its
-          * PUBACK/PUBREC; nothing is resent here. */
-         cursor = MQTT_STATE_CURSOR_INITIALIZER;
-         for( pendingId = MQTT_PublishToResend( pContext, &cursor );
-              pendingId != MQTT_PACKET_ID_INVALID;
-              pendingId = MQTT_PublishToResend( pContext, &cursor ) )
-         {
-             pContext->clearFunction( pContext, pendingId );
-         }
-     }
-     /* Wipe the outgoing publish records, if any were configured. */
-     if( pContext->outgoingPublishRecordMaxCount > 0U )
-     {
-         ( void ) memset( pContext->outgoingPublishRecords,
-                          0x00,
-                          pContext->outgoingPublishRecordMaxCount * sizeof( *pContext->outgoingPublishRecords ) );
-     }
-     /* Wipe the incoming publish records, if any were configured. */
-     if( pContext->incomingPublishRecordMaxCount > 0U )
-     {
-         ( void ) memset( pContext->incomingPublishRecords,
-                          0x00,
-                          pContext->incomingPublishRecordMaxCount * sizeof( *pContext->incomingPublishRecords ) );
-     }
-     return MQTTSuccess;
- }
- static MQTTStatus_t validatePublishParams( const MQTTContext_t * pContext,
- const MQTTPublishInfo_t * pPublishInfo,
- uint16_t packetId )
- {
- MQTTStatus_t status = MQTTSuccess;
- /* Validate arguments. */
- if( ( pContext == NULL ) || ( pPublishInfo == NULL ) )
- {
- LogError( ( "Argument cannot be NULL: pContext=%p, "
- "pPublishInfo=%p.",
- ( void * ) pContext,
- ( void * ) pPublishInfo ) );
- status = MQTTBadParameter;
- }
- else if( ( pPublishInfo->qos != MQTTQoS0 ) && ( packetId == 0U ) )
- {
- LogError( ( "Packet Id is 0 for PUBLISH with QoS=%u.",
- ( unsigned int ) pPublishInfo->qos ) );
- status = MQTTBadParameter;
- }
- else if( ( pPublishInfo->payloadLength > 0U ) && ( pPublishInfo->pPayload == NULL ) )
- {
- LogError( ( "A nonzero payload length requires a non-NULL payload: "
- "payloadLength=%lu, pPayload=%p.",
- ( unsigned long ) pPublishInfo->payloadLength,
- pPublishInfo->pPayload ) );
- status = MQTTBadParameter;
- }
- else if( ( pContext->outgoingPublishRecords == NULL ) && ( pPublishInfo->qos > MQTTQoS0 ) )
- {
- LogError( ( "Trying to publish a QoS > MQTTQoS0 packet when outgoing publishes "
- "for QoS1/QoS2 have not been enabled. Please, call MQTT_InitStatefulQoS "
- "to initialize and enable the use of QoS1/QoS2 publishes." ) );
- status = MQTTBadParameter;
- }
- else
- {
- /* MISRA else */
- }
- return status;
- }
- /*-----------------------------------------------------------*/
- MQTTStatus_t MQTT_Init( MQTTContext_t * pContext,
- const TransportInterface_t * pTransportInterface,
- MQTTGetCurrentTimeFunc_t getTimeFunction,
- MQTTEventCallback_t userCallback,
- const MQTTFixedBuffer_t * pNetworkBuffer )
- {
- MQTTStatus_t status = MQTTSuccess;
- /* Validate arguments. */
- if( ( pContext == NULL ) || ( pTransportInterface == NULL ) ||
- ( pNetworkBuffer == NULL ) )
- {
- LogError( ( "Argument cannot be NULL: pContext=%p, "
- "pTransportInterface=%p, "
- "pNetworkBuffer=%p",
- ( void * ) pContext,
- ( void * ) pTransportInterface,
- ( void * ) pNetworkBuffer ) );
- status = MQTTBadParameter;
- }
- else if( getTimeFunction == NULL )
- {
- LogError( ( "Invalid parameter: getTimeFunction is NULL" ) );
- status = MQTTBadParameter;
- }
- else if( userCallback == NULL )
- {
- LogError( ( "Invalid parameter: userCallback is NULL" ) );
- status = MQTTBadParameter;
- }
- else if( pTransportInterface->recv == NULL )
- {
- LogError( ( "Invalid parameter: pTransportInterface->recv is NULL" ) );
- status = MQTTBadParameter;
- }
- else if( pTransportInterface->send == NULL )
- {
- LogError( ( "Invalid parameter: pTransportInterface->send is NULL" ) );
- status = MQTTBadParameter;
- }
- else
- {
- ( void ) memset( pContext, 0x00, sizeof( MQTTContext_t ) );
- pContext->connectStatus = MQTTNotConnected;
- pContext->transportInterface = *pTransportInterface;
- pContext->getTime = getTimeFunction;
- pContext->appCallback = userCallback;
- pContext->networkBuffer = *pNetworkBuffer;
- /* Zero is not a valid packet ID per MQTT spec. Start from 1. */
- pContext->nextPacketId = 1;
- }
- return status;
- }
- /*-----------------------------------------------------------*/
- MQTTStatus_t MQTT_InitStatefulQoS( MQTTContext_t * pContext,
- MQTTPubAckInfo_t * pOutgoingPublishRecords,
- size_t outgoingPublishCount,
- MQTTPubAckInfo_t * pIncomingPublishRecords,
- size_t incomingPublishCount )
- {
- MQTTStatus_t status = MQTTSuccess;
- if( pContext == NULL )
- {
- LogError( ( "Argument cannot be NULL: pContext=%p\n",
- ( void * ) pContext ) );
- status = MQTTBadParameter;
- }
- /* Check whether the arguments make sense. Not equal here behaves
- * like an exclusive-or operator for boolean values. */
- else if( ( outgoingPublishCount == 0U ) !=
- ( pOutgoingPublishRecords == NULL ) )
- {
- LogError( ( "Arguments do not match: pOutgoingPublishRecords=%p, "
- "outgoingPublishCount=%lu",
- ( void * ) pOutgoingPublishRecords,
- ( unsigned long ) outgoingPublishCount ) );
- status = MQTTBadParameter;
- }
- /* Check whether the arguments make sense. Not equal here behaves
- * like an exclusive-or operator for boolean values. */
- else if( ( incomingPublishCount == 0U ) !=
- ( pIncomingPublishRecords == NULL ) )
- {
- LogError( ( "Arguments do not match: pIncomingPublishRecords=%p, "
- "incomingPublishCount=%lu",
- ( void * ) pIncomingPublishRecords,
- ( unsigned long ) incomingPublishCount ) );
- status = MQTTBadParameter;
- }
- else if( pContext->appCallback == NULL )
- {
- LogError( ( "MQTT_InitStatefulQoS must be called only after MQTT_Init has"
- " been called successfully.\n" ) );
- status = MQTTBadParameter;
- }
- else
- {
- pContext->incomingPublishRecordMaxCount = incomingPublishCount;
- pContext->incomingPublishRecords = pIncomingPublishRecords;
- pContext->outgoingPublishRecordMaxCount = outgoingPublishCount;
- pContext->outgoingPublishRecords = pOutgoingPublishRecords;
- }
- return status;
- }
- /*-----------------------------------------------------------*/
- MQTTStatus_t MQTT_InitRetransmits( MQTTContext_t * pContext,
- MQTTStorePacketForRetransmit storeFunction,
- MQTTRetrievePacketForRetransmit retrieveFunction,
- MQTTClearPacketForRetransmit clearFunction )
- {
- MQTTStatus_t status = MQTTSuccess;
- if( pContext == NULL )
- {
- LogError( ( "Argument cannot be NULL: pContext=%p\n",
- ( void * ) pContext ) );
- status = MQTTBadParameter;
- }
- else if( storeFunction == NULL )
- {
- LogError( ( "Invalid parameter: storeFunction is NULL" ) );
- status = MQTTBadParameter;
- }
- else if( retrieveFunction == NULL )
- {
- LogError( ( "Invalid parameter: retrieveFunction is NULL" ) );
- status = MQTTBadParameter;
- }
- else if( clearFunction == NULL )
- {
- LogError( ( "Invalid parameter: clearFunction is NULL" ) );
- status = MQTTBadParameter;
- }
- else
- {
- pContext->storeFunction = storeFunction;
- pContext->retrieveFunction = retrieveFunction;
- pContext->clearFunction = clearFunction;
- }
- return status;
- }
- /*-----------------------------------------------------------*/
- MQTTStatus_t MQTT_CancelCallback( const MQTTContext_t * pContext,
- uint16_t packetId )
- {
- MQTTStatus_t status = MQTTSuccess;
- if( pContext == NULL )
- {
- LogWarn( ( "pContext is NULL\n" ) );
- status = MQTTBadParameter;
- }
- else if( pContext->outgoingPublishRecords == NULL )
- {
- LogError( ( "QoS1/QoS2 is not initialized for use. Please, "
- "call MQTT_InitStatefulQoS to enable QoS1 and QoS2 "
- "publishes.\n" ) );
- status = MQTTBadParameter;
- }
- else
- {
- MQTT_PRE_STATE_UPDATE_HOOK( pContext );
- status = MQTT_RemoveStateRecord( pContext,
- packetId );
- MQTT_POST_STATE_UPDATE_HOOK( pContext );
- }
- return status;
- }
- /*-----------------------------------------------------------*/
- MQTTStatus_t MQTT_CheckConnectStatus( const MQTTContext_t * pContext )
- {
- MQTTConnectionStatus_t connectStatus;
- MQTTStatus_t status = MQTTSuccess;
- if( pContext == NULL )
- {
- LogError( ( "Argument cannot be NULL: pContext=%p",
- ( void * ) pContext ) );
- status = MQTTBadParameter;
- }
- if( status == MQTTSuccess )
- {
- MQTT_PRE_STATE_UPDATE_HOOK( pContext );
- connectStatus = pContext->connectStatus;
- MQTT_POST_STATE_UPDATE_HOOK( pContext );
- switch( connectStatus )
- {
- case MQTTConnected:
- status = MQTTStatusConnected;
- break;
- case MQTTDisconnectPending:
- status = MQTTStatusDisconnectPending;
- break;
- default:
- status = MQTTStatusNotConnected;
- break;
- }
- }
- return status;
- }
- /*-----------------------------------------------------------*/
- MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext,
- const MQTTConnectInfo_t * pConnectInfo,
- const MQTTPublishInfo_t * pWillInfo,
- uint32_t timeoutMs,
- bool * pSessionPresent )
- {
- size_t remainingLength = 0UL, packetSize = 0UL;
- MQTTStatus_t status = MQTTSuccess;
- MQTTPacketInfo_t incomingPacket = { 0 };
- MQTTConnectionStatus_t connectStatus;
- incomingPacket.type = ( uint8_t ) 0;
- if( ( pContext == NULL ) || ( pConnectInfo == NULL ) || ( pSessionPresent == NULL ) )
- {
- LogError( ( "Argument cannot be NULL: pContext=%p, "
- "pConnectInfo=%p, pSessionPresent=%p.",
- ( void * ) pContext,
- ( void * ) pConnectInfo,
- ( void * ) pSessionPresent ) );
- status = MQTTBadParameter;
- }
- if( status == MQTTSuccess )
- {
- /* Get MQTT connect packet size and remaining length. */
- status = MQTT_GetConnectPacketSize( pConnectInfo,
- pWillInfo,
- &remainingLength,
- &packetSize );
- /* coverity[sensitive_data_leak] */
- LogDebug( ( "CONNECT packet size is %lu and remaining length is %lu.",
- ( unsigned long ) packetSize,
- ( unsigned long ) remainingLength ) );
- }
- if( status == MQTTSuccess )
- {
- MQTT_PRE_STATE_UPDATE_HOOK( pContext );
- connectStatus = pContext->connectStatus;
- if( connectStatus != MQTTNotConnected )
- {
- status = ( connectStatus == MQTTConnected ) ? MQTTStatusConnected : MQTTStatusDisconnectPending;
- }
- if( status == MQTTSuccess )
- {
- status = sendConnectWithoutCopy( pContext,
- pConnectInfo,
- pWillInfo,
- remainingLength );
- }
- /* Read CONNACK from transport layer. */
- if( status == MQTTSuccess )
- {
- status = receiveConnack( pContext,
- timeoutMs,
- pConnectInfo->cleanSession,
- &incomingPacket,
- pSessionPresent );
- }
- if( ( status == MQTTSuccess ) && ( *pSessionPresent != true ) )
- {
- status = handleCleanSession( pContext );
- }
- if( status == MQTTSuccess )
- {
- pContext->connectStatus = MQTTConnected;
- /* Initialize keep-alive fields after a successful connection. */
- pContext->keepAliveIntervalSec = pConnectInfo->keepAliveSeconds;
- pContext->waitingForPingResp = false;
- pContext->pingReqSendTimeMs = 0U;
- }
- MQTT_POST_STATE_UPDATE_HOOK( pContext );
- }
- if( ( status == MQTTSuccess ) && ( *pSessionPresent == true ) )
- {
- /* Resend PUBRELs and PUBLISHES when reestablishing a session */
- status = handleUncleanSessionResumption( pContext );
- }
- if( status == MQTTSuccess )
- {
- LogInfo( ( "MQTT connection established with the broker." ) );
- }
- else if( ( status == MQTTStatusConnected ) || ( status == MQTTStatusDisconnectPending ) )
- {
- LogInfo( ( "MQTT Connection is either already established or a disconnect is pending, return status = %s.",
- MQTT_Status_strerror( status ) ) );
- }
- else if( pContext == NULL )
- {
- LogError( ( "MQTT connection failed with status = %s.",
- MQTT_Status_strerror( status ) ) );
- }
- else
- {
- LogError( ( "MQTT connection failed with status = %s.",
- MQTT_Status_strerror( status ) ) );
- MQTT_PRE_STATE_UPDATE_HOOK( pContext );
- if( pContext->connectStatus == MQTTConnected )
- {
- /* This will only be executed if after the connack is received
- * the retransmits fail for some reason on an unclean session
- * connection. In this case we need to retry the re-transmits
- * which can only be done using the connect API and that can only
- * be done once we are disconnected, hence we ask the user to
- * call disconnect here */
- pContext->connectStatus = MQTTDisconnectPending;
- }
- MQTT_POST_STATE_UPDATE_HOOK( pContext );
- }
- return status;
- }
- /*-----------------------------------------------------------*/
- MQTTStatus_t MQTT_Subscribe( MQTTContext_t * pContext,
- const MQTTSubscribeInfo_t * pSubscriptionList,
- size_t subscriptionCount,
- uint16_t packetId )
- {
- MQTTConnectionStatus_t connectStatus;
- size_t remainingLength = 0UL, packetSize = 0UL;
- /* Validate arguments. */
- MQTTStatus_t status = validateSubscribeUnsubscribeParams( pContext,
- pSubscriptionList,
- subscriptionCount,
- packetId );
- if( status == MQTTSuccess )
- {
- /* Get the remaining length and packet size.*/
- status = MQTT_GetSubscribePacketSize( pSubscriptionList,
- subscriptionCount,
- &remainingLength,
- &packetSize );
- LogDebug( ( "SUBSCRIBE packet size is %lu and remaining length is %lu.",
- ( unsigned long ) packetSize,
- ( unsigned long ) remainingLength ) );
- }
- if( status == MQTTSuccess )
- {
- MQTT_PRE_STATE_UPDATE_HOOK( pContext );
- connectStatus = pContext->connectStatus;
- if( connectStatus != MQTTConnected )
- {
- status = ( connectStatus == MQTTNotConnected ) ? MQTTStatusNotConnected : MQTTStatusDisconnectPending;
- }
- if( status == MQTTSuccess )
- {
- /* Send MQTT SUBSCRIBE packet. */
- status = sendSubscribeWithoutCopy( pContext,
- pSubscriptionList,
- subscriptionCount,
- packetId,
- remainingLength );
- }
- MQTT_POST_STATE_UPDATE_HOOK( pContext );
- }
- return status;
- }
- /*-----------------------------------------------------------*/
- MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,
- const MQTTPublishInfo_t * pPublishInfo,
- uint16_t packetId )
- {
- size_t headerSize = 0UL;
- size_t remainingLength = 0UL;
- size_t packetSize = 0UL;
- MQTTPublishState_t publishStatus = MQTTStateNull;
- MQTTConnectionStatus_t connectStatus;
- /* Maximum number of bytes required by the 'fixed' part of the PUBLISH
- * packet header according to the MQTT specifications.
- * Header byte 0 + 1 = 1
- * Length (max) + 4 = 5
- * Topic string length + 2 = 7
- *
- * Note that since publish is one of the most common operations in MQTT
- * connection, we have moved the topic string length to the 'fixed' part of
- * the header so efficiency. Otherwise, we would need an extra vector and
- * an extra call to 'send' (in case writev is not defined) to send the
- * topic length. */
- uint8_t mqttHeader[ 7U ];
- /* Validate arguments. */
- MQTTStatus_t status = validatePublishParams( pContext, pPublishInfo, packetId );
- if( status == MQTTSuccess )
- {
- /* Get the remaining length and packet size.*/
- status = MQTT_GetPublishPacketSize( pPublishInfo,
- &remainingLength,
- &packetSize );
- }
- if( status == MQTTSuccess )
- {
- status = MQTT_SerializePublishHeaderWithoutTopic( pPublishInfo,
- remainingLength,
- mqttHeader,
- &headerSize );
- }
- if( status == MQTTSuccess )
- {
- /* Take the mutex as multiple send calls are required for sending this
- * packet. */
- MQTT_PRE_STATE_UPDATE_HOOK( pContext );
- connectStatus = pContext->connectStatus;
- if( connectStatus != MQTTConnected )
- {
- status = ( connectStatus == MQTTNotConnected ) ? MQTTStatusNotConnected : MQTTStatusDisconnectPending;
- }
- if( ( status == MQTTSuccess ) && ( pPublishInfo->qos > MQTTQoS0 ) )
- {
- /* Set the flag so that the corresponding hook can be called later. */
- status = MQTT_ReserveState( pContext,
- packetId,
- pPublishInfo->qos );
- /* State already exists for a duplicate packet.
- * If a state doesn't exist, it will be handled as a new publish in
- * state engine. */
- if( ( status == MQTTStateCollision ) && ( pPublishInfo->dup == true ) )
- {
- status = MQTTSuccess;
- }
- }
- if( status == MQTTSuccess )
- {
- status = sendPublishWithoutCopy( pContext,
- pPublishInfo,
- mqttHeader,
- headerSize,
- packetId );
- }
- if( ( status == MQTTSuccess ) &&
- ( pPublishInfo->qos > MQTTQoS0 ) )
- {
- /* Update state machine after PUBLISH is sent.
- * Only to be done for QoS1 or QoS2. */
- status = MQTT_UpdateStatePublish( pContext,
- packetId,
- MQTT_SEND,
- pPublishInfo->qos,
- &publishStatus );
- if( status != MQTTSuccess )
- {
- LogError( ( "Update state for publish failed with status %s."
- " However PUBLISH packet was sent to the broker."
- " Any further handling of ACKs for the packet Id"
- " will fail.",
- MQTT_Status_strerror( status ) ) );
- }
- }
- /* mutex should be released and not before updating the state
- * because we need to make sure that the state is updated
- * after sending the publish packet, before the receive
- * loop receives ack for this and would want to update its state
- */
- MQTT_POST_STATE_UPDATE_HOOK( pContext );
- }
- if( status != MQTTSuccess )
- {
- LogError( ( "MQTT PUBLISH failed with status %s.",
- MQTT_Status_strerror( status ) ) );
- }
- return status;
- }
- /*-----------------------------------------------------------*/
- MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext )
- {
- int32_t sendResult = 0;
- MQTTStatus_t status = MQTTSuccess;
- size_t packetSize = 0U;
- /* MQTT ping packets are of fixed length. */
- uint8_t pingreqPacket[ 2U ];
- MQTTFixedBuffer_t localBuffer;
- MQTTConnectionStatus_t connectStatus;
- localBuffer.pBuffer = pingreqPacket;
- localBuffer.size = sizeof( pingreqPacket );
- if( pContext == NULL )
- {
- LogError( ( "pContext is NULL." ) );
- status = MQTTBadParameter;
- }
- if( status == MQTTSuccess )
- {
- /* Get MQTT PINGREQ packet size. */
- status = MQTT_GetPingreqPacketSize( &packetSize );
- if( status == MQTTSuccess )
- {
- assert( packetSize == localBuffer.size );
- LogDebug( ( "MQTT PINGREQ packet size is %lu.",
- ( unsigned long ) packetSize ) );
- }
- else
- {
- LogError( ( "Failed to get the PINGREQ packet size." ) );
- }
- }
- if( status == MQTTSuccess )
- {
- /* Serialize MQTT PINGREQ. */
- status = MQTT_SerializePingreq( &localBuffer );
- }
- if( status == MQTTSuccess )
- {
- /* Take the mutex as the send call should not be interrupted in
- * between. */
- MQTT_PRE_STATE_UPDATE_HOOK( pContext );
- connectStatus = pContext->connectStatus;
- if( connectStatus != MQTTConnected )
- {
- status = ( connectStatus == MQTTNotConnected ) ? MQTTStatusNotConnected : MQTTStatusDisconnectPending;
- }
- if( status == MQTTSuccess )
- {
- /* Send the serialized PINGREQ packet to transport layer.
- * Here, we do not use the vectored IO approach for efficiency as the
- * Ping packet does not have numerous fields which need to be copied
- * from the user provided buffers. Thus it can be sent directly. */
- sendResult = sendBuffer( pContext,
- localBuffer.pBuffer,
- packetSize );
- /* It is an error to not send the entire PINGREQ packet. */
- if( sendResult < ( int32_t ) packetSize )
- {
- LogError( ( "Transport send failed for PINGREQ packet." ) );
- status = MQTTSendFailed;
- }
- else
- {
- pContext->pingReqSendTimeMs = pContext->lastPacketTxTime;
- pContext->waitingForPingResp = true;
- LogDebug( ( "Sent %ld bytes of PINGREQ packet.",
- ( long int ) sendResult ) );
- }
- }
- MQTT_POST_STATE_UPDATE_HOOK( pContext );
- }
- return status;
- }
- /*-----------------------------------------------------------*/
- MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext,
- const MQTTSubscribeInfo_t * pSubscriptionList,
- size_t subscriptionCount,
- uint16_t packetId )
- {
- MQTTConnectionStatus_t connectStatus;
- size_t remainingLength = 0UL, packetSize = 0UL;
- /* Validate arguments. */
- MQTTStatus_t status = validateSubscribeUnsubscribeParams( pContext,
- pSubscriptionList,
- subscriptionCount,
- packetId );
- if( status == MQTTSuccess )
- {
- /* Get the remaining length and packet size.*/
- status = MQTT_GetUnsubscribePacketSize( pSubscriptionList,
- subscriptionCount,
- &remainingLength,
- &packetSize );
- LogDebug( ( "UNSUBSCRIBE packet size is %lu and remaining length is %lu.",
- ( unsigned long ) packetSize,
- ( unsigned long ) remainingLength ) );
- }
- if( status == MQTTSuccess )
- {
- /* Take the mutex because the below call should not be interrupted. */
- MQTT_PRE_STATE_UPDATE_HOOK( pContext );
- connectStatus = pContext->connectStatus;
- if( connectStatus != MQTTConnected )
- {
- status = ( connectStatus == MQTTNotConnected ) ? MQTTStatusNotConnected : MQTTStatusDisconnectPending;
- }
- if( status == MQTTSuccess )
- {
- status = sendUnsubscribeWithoutCopy( pContext,
- pSubscriptionList,
- subscriptionCount,
- packetId,
- remainingLength );
- }
- MQTT_POST_STATE_UPDATE_HOOK( pContext );
- }
- return status;
- }
- /*-----------------------------------------------------------*/
- MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext )
- {
- size_t packetSize = 0U;
- int32_t sendResult = 0;
- MQTTStatus_t status = MQTTSuccess;
- MQTTFixedBuffer_t localBuffer;
- uint8_t disconnectPacket[ 2U ];
- MQTTConnectionStatus_t connectStatus;
- localBuffer.pBuffer = disconnectPacket;
- localBuffer.size = 2U;
- /* Validate arguments. */
- if( pContext == NULL )
- {
- LogError( ( "pContext cannot be NULL." ) );
- status = MQTTBadParameter;
- }
- if( status == MQTTSuccess )
- {
- /* Get MQTT DISCONNECT packet size. */
- status = MQTT_GetDisconnectPacketSize( &packetSize );
- LogDebug( ( "MQTT DISCONNECT packet size is %lu.",
- ( unsigned long ) packetSize ) );
- }
- if( status == MQTTSuccess )
- {
- /* Serialize MQTT DISCONNECT packet. */
- status = MQTT_SerializeDisconnect( &localBuffer );
- }
- if( status == MQTTSuccess )
- {
- /* Take the mutex because the below call should not be interrupted. */
- MQTT_PRE_STATE_UPDATE_HOOK( pContext );
- connectStatus = pContext->connectStatus;
- if( connectStatus == MQTTNotConnected )
- {
- status = MQTTStatusNotConnected;
- }
- if( status == MQTTSuccess )
- {
- LogInfo( ( "Disconnected from the broker." ) );
- pContext->connectStatus = MQTTNotConnected;
- /* Reset the index and clean the buffer on a successful disconnect. */
- pContext->index = 0;
- ( void ) memset( pContext->networkBuffer.pBuffer, 0, pContext->networkBuffer.size );
- LogError( ( "MQTT Connection Disconnected Successfully" ) );
- /* Here we do not use vectors as the disconnect packet has fixed fields
- * which do not reside in user provided buffers. Thus, it can be sent
- * using a simple send call. */
- sendResult = sendBuffer( pContext,
- localBuffer.pBuffer,
- packetSize );
- if( sendResult < ( int32_t ) packetSize )
- {
- LogError( ( "Transport send failed for DISCONNECT packet." ) );
- status = MQTTSendFailed;
- }
- else
- {
- LogDebug( ( "Sent %ld bytes of DISCONNECT packet.",
- ( long int ) sendResult ) );
- }
- }
- MQTT_POST_STATE_UPDATE_HOOK( pContext );
- }
- return status;
- }
- /*-----------------------------------------------------------*/
- MQTTStatus_t MQTT_ProcessLoop( MQTTContext_t * pContext )
- {
- MQTTStatus_t status = MQTTBadParameter;
- if( pContext == NULL )
- {
- LogError( ( "Invalid input parameter: MQTT Context cannot be NULL." ) );
- }
- else if( pContext->getTime == NULL )
- {
- LogError( ( "Invalid input parameter: MQTT Context must have valid getTime." ) );
- }
- else if( pContext->networkBuffer.pBuffer == NULL )
- {
- LogError( ( "Invalid input parameter: The MQTT context's networkBuffer must not be NULL." ) );
- }
- else
- {
- pContext->controlPacketSent = false;
- status = receiveSingleIteration( pContext, true );
- }
- return status;
- }
- /*-----------------------------------------------------------*/
- MQTTStatus_t MQTT_ReceiveLoop( MQTTContext_t * pContext )
- {
- MQTTStatus_t status = MQTTBadParameter;
- if( pContext == NULL )
- {
- LogError( ( "Invalid input parameter: MQTT Context cannot be NULL." ) );
- }
- else if( pContext->getTime == NULL )
- {
- LogError( ( "Invalid input parameter: MQTT Context must have a valid getTime function." ) );
- }
- else if( pContext->networkBuffer.pBuffer == NULL )
- {
- LogError( ( "Invalid input parameter: MQTT context's networkBuffer must not be NULL." ) );
- }
- else
- {
- status = receiveSingleIteration( pContext, false );
- }
- return status;
- }
- /*-----------------------------------------------------------*/
- uint16_t MQTT_GetPacketId( MQTTContext_t * pContext )
- {
- uint16_t packetId = 0U;
- if( pContext != NULL )
- {
- MQTT_PRE_STATE_UPDATE_HOOK( pContext );
- packetId = pContext->nextPacketId;
- /* A packet ID of zero is not a valid packet ID. When the max ID
- * is reached the next one should start at 1. */
- if( pContext->nextPacketId == ( uint16_t ) UINT16_MAX )
- {
- pContext->nextPacketId = 1;
- }
- else
- {
- pContext->nextPacketId++;
- }
- MQTT_POST_STATE_UPDATE_HOOK( pContext );
- }
- return packetId;
- }
- /*-----------------------------------------------------------*/
- MQTTStatus_t MQTT_MatchTopic( const char * pTopicName,
- const uint16_t topicNameLength,
- const char * pTopicFilter,
- const uint16_t topicFilterLength,
- bool * pIsMatch )
- {
- MQTTStatus_t status = MQTTSuccess;
- bool topicFilterStartsWithWildcard = false;
- bool matchStatus = false;
- if( ( pTopicName == NULL ) || ( topicNameLength == 0u ) )
- {
- LogError( ( "Invalid paramater: Topic name should be non-NULL and its "
- "length should be > 0: TopicName=%p, TopicNameLength=%hu",
- ( void * ) pTopicName,
- ( unsigned short ) topicNameLength ) );
- status = MQTTBadParameter;
- }
- else if( ( pTopicFilter == NULL ) || ( topicFilterLength == 0u ) )
- {
- LogError( ( "Invalid paramater: Topic filter should be non-NULL and "
- "its length should be > 0: TopicName=%p, TopicFilterLength=%hu",
- ( void * ) pTopicFilter,
- ( unsigned short ) topicFilterLength ) );
- status = MQTTBadParameter;
- }
- else if( pIsMatch == NULL )
- {
- LogError( ( "Invalid paramater: Output parameter, pIsMatch, is NULL" ) );
- status = MQTTBadParameter;
- }
- else
- {
- /* Check for an exact match if the incoming topic name and the registered
- * topic filter length match. */
- if( topicNameLength == topicFilterLength )
- {
- matchStatus = strncmp( pTopicName, pTopicFilter, topicNameLength ) == 0;
- }
- if( matchStatus == false )
- {
- /* If an exact match was not found, match against wildcard characters in
- * topic filter.*/
- /* Determine if topic filter starts with a wildcard. */
- topicFilterStartsWithWildcard = ( pTopicFilter[ 0 ] == '+' ) ||
- ( pTopicFilter[ 0 ] == '#' );
- /* Note: According to the MQTT 3.1.1 specification, incoming PUBLISH topic names
- * starting with "$" character cannot be matched against topic filter starting with
- * a wildcard, i.e. for example, "$SYS/sport" cannot be matched with "#" or
- * "+/sport" topic filters. */
- if( !( ( pTopicName[ 0 ] == '$' ) && ( topicFilterStartsWithWildcard == true ) ) )
- {
- matchStatus = matchTopicFilter( pTopicName, topicNameLength, pTopicFilter, topicFilterLength );
- }
- }
- /* Update the output parameter with the match result. */
- *pIsMatch = matchStatus;
- }
- return status;
- }
- /*-----------------------------------------------------------*/
- MQTTStatus_t MQTT_GetSubAckStatusCodes( const MQTTPacketInfo_t * pSubackPacket,
- uint8_t ** pPayloadStart,
- size_t * pPayloadSize )
- {
- MQTTStatus_t status = MQTTSuccess;
- if( pSubackPacket == NULL )
- {
- LogError( ( "Invalid parameter: pSubackPacket is NULL." ) );
- status = MQTTBadParameter;
- }
- else if( pPayloadStart == NULL )
- {
- LogError( ( "Invalid parameter: pPayloadStart is NULL." ) );
- status = MQTTBadParameter;
- }
- else if( pPayloadSize == NULL )
- {
- LogError( ( "Invalid parameter: pPayloadSize is NULL." ) );
- status = MQTTBadParameter;
- }
- else if( pSubackPacket->type != MQTT_PACKET_TYPE_SUBACK )
- {
- LogError( ( "Invalid parameter: Input packet is not a SUBACK packet: "
- "ExpectedType=%02x, InputType=%02x",
- ( int ) MQTT_PACKET_TYPE_SUBACK,
- ( int ) pSubackPacket->type ) );
- status = MQTTBadParameter;
- }
- else if( pSubackPacket->pRemainingData == NULL )
- {
- LogError( ( "Invalid parameter: pSubackPacket->pRemainingData is NULL" ) );
- status = MQTTBadParameter;
- }
- /* A SUBACK must have a remaining length of at least 3 to accommodate the
- * packet identifier and at least 1 return code. */
- else if( pSubackPacket->remainingLength < 3U )
- {
- LogError( ( "Invalid parameter: Packet remaining length is invalid: "
- "Should be greater than 2 for SUBACK packet: InputRemainingLength=%lu",
- ( unsigned long ) pSubackPacket->remainingLength ) );
- status = MQTTBadParameter;
- }
- else
- {
- /* According to the MQTT 3.1.1 protocol specification, the "Remaining Length" field is a
- * length of the variable header (2 bytes) plus the length of the payload.
- * Therefore, we add 2 positions for the starting address of the payload, and
- * subtract 2 bytes from the remaining length for the length of the payload.*/
- *pPayloadStart = &pSubackPacket->pRemainingData[ sizeof( uint16_t ) ];
- *pPayloadSize = pSubackPacket->remainingLength - sizeof( uint16_t );
- }
- return status;
- }
- /*-----------------------------------------------------------*/
- const char * MQTT_Status_strerror( MQTTStatus_t status )
- {
- const char * str = NULL;
- switch( status )
- {
- case MQTTSuccess:
- str = "MQTTSuccess";
- break;
- case MQTTBadParameter:
- str = "MQTTBadParameter";
- break;
- case MQTTNoMemory:
- str = "MQTTNoMemory";
- break;
- case MQTTSendFailed:
- str = "MQTTSendFailed";
- break;
- case MQTTRecvFailed:
- str = "MQTTRecvFailed";
- break;
- case MQTTBadResponse:
- str = "MQTTBadResponse";
- break;
- case MQTTServerRefused:
- str = "MQTTServerRefused";
- break;
- case MQTTNoDataAvailable:
- str = "MQTTNoDataAvailable";
- break;
- case MQTTIllegalState:
- str = "MQTTIllegalState";
- break;
- case MQTTStateCollision:
- str = "MQTTStateCollision";
- break;
- case MQTTKeepAliveTimeout:
- str = "MQTTKeepAliveTimeout";
- break;
- case MQTTNeedMoreBytes:
- str = "MQTTNeedMoreBytes";
- break;
- case MQTTStatusConnected:
- str = "MQTTStatusConnected";
- break;
- case MQTTStatusNotConnected:
- str = "MQTTStatusNotConnected";
- break;
- case MQTTStatusDisconnectPending:
- str = "MQTTStatusDisconnectPending";
- break;
- case MQTTPublishStoreFailed:
- str = "MQTTPublishStoreFailed";
- break;
- case MQTTPublishRetrieveFailed:
- str = "MQTTPublishRetrieveFailed";
- break;
- default:
- str = "Invalid MQTT Status code";
- break;
- }
- return str;
- }
- /*-----------------------------------------------------------*/
- size_t MQTT_GetBytesInMQTTVec( const MQTTVec_t * pVec )
- {
- size_t memoryRequired = 0;
- size_t i;
- const TransportOutVector_t * pTransportVec = pVec->pVector;
- size_t vecLen = pVec->vectorLen;
- for( i = 0; i < vecLen; i++ )
- {
- memoryRequired += pTransportVec[ i ].iov_len;
- }
- return memoryRequired;
- }
- /*-----------------------------------------------------------*/
- void MQTT_SerializeMQTTVec( uint8_t * pAllocatedMem,
- const MQTTVec_t * pVec )
- {
- const TransportOutVector_t * pTransportVec = pVec->pVector;
- const size_t vecLen = pVec->vectorLen;
- size_t index = 0;
- size_t i = 0;
- for( i = 0; i < vecLen; i++ )
- {
- ( void ) memcpy( ( void * ) &pAllocatedMem[ index ], ( const void * ) pTransportVec[ i ].iov_base, pTransportVec[ i ].iov_len );
- index += pTransportVec[ i ].iov_len;
- }
- }
- /*-----------------------------------------------------------*/
|