NetBurner 3.5.6
PDF Version
mqtt_internal.h
1#ifndef __NBMQTT_INTERNAL_H
2#define __NBMQTT_INTERNAL_H
3/*NB_REVISION*/
4
5/*NB_COPYRIGHT*/
6#include <predef.h>
7#include <nbrtos.h>
8#include <webclient/http_funcs.h>
9#include <mqtt/mqtt.h>
10#include <iosys.h>
11#include <iointernal.h>
12
13#define MQTT_MAX_SESSIONS (8)
14#define MQTT_OBJ_PRIORITY_COUNT (16)
15
16#define MQTT_MATCH_HANDLER_MASK_SIZE ((MQTT_MAX_HANDLERS_PER_CONNECTION/32) +((MQTT_MAX_HANDLERS_PER_CONNECTION&0x1F)!=0))
17
18#define __MQTT_STR(x) #x
19#define __MQTT_XSTR(x) __MQTT_STR(x)
20
21#if (MQTT_MAX_SUPPORTED_IN_FLIGHT > MQTT_MAX_MAX_SUPPORTED_IN_FLIGHT)
22#error __MQTT_STR(MQTT_MAX_SUPPORTED_IN_FLIGHT) ## " must be less than " ## __MQTT_XSTR(MQTT_MAX_MAX_SUPPORTED_IN_FLIGHT)
23#endif
24
25#define MQTT_STATE_FREE 0x0
26#define MQTT_STATE_ALLOCED 0x1
27#define MQTT_STATE_CONN_SENT 0x2
28#define MQTT_STATE_ESTABLISHED 0x3
29
30namespace MQTT {
31class Client;
32
33namespace internal {
34enum {
35 eTransport_TCP,
36 eTransport_TLS,
37 eTransport_WS,
38 eTransport_WSS
39};
40
41typedef enum {
42 eState_Free,
43 eState_Alloced,
44 eState_ConnectOnStart,
45 eState_Inited,
46 eState_StartConnect,
47 eState_NetConnected,
48 eState_NoBrokerURI,
49 eState_Aborted,
50 eState_Reconnect,
51 eState_Connecting,
52 eState_ConnAck,
53 eState_Authenticating,
54 eState_Disconnecting,
55 eState_Closing,
56 eState_Established,
57 eState_TxObjPub,
58 eState_TxSubUnsub,
59 eState_TxPub,
60 eState_TxObjSubUnsub,
61 eState_MsgFixed,
62 eState_MsgDispatch,
63 eState_MsgAck,
64 eState_PingReq,
65 eState_PingResp,
66 eState_ProtocolError,
67} eClientState_t;
68
69typedef enum {
70 ePktState_Done,
71 ePktState_Writing,
72 ePktState_NeedAck,
73 ePktState_WritingAck,
74 ePktState_NeedReceived,
75 ePktState_WritingReceived,
76 ePktState_NeedRelease,
77 ePktState_WritingRelease,
78 ePktState_NeedComplete,
79 ePktState_WritingComplete,
80} ePacketState_t;
81
82struct MqttAckTracker {
83 uint16_t window;
84 uint16_t inFlight[MQTT_MAX_SUPPORTED_IN_FLIGHT];
85 uint8_t nextIdx;
86 uint8_t lastIdx;
87
88};
89
90
91struct HandlerMatchMask {
92 uint32_t mask[MQTT_MATCH_HANDLER_MASK_SIZE];
93 int num;
94 void reset();
95 void Assign(const HandlerMatchMask &rhs);
96 void SetFrom(const HandlerMatchMask &rhs);
97 HandlerMatchMask & operator=(const HandlerMatchMask &rhs);
98 HandlerMatchMask & operator|=(const HandlerMatchMask &rhs);
99};
100
101
102struct RxTopicAlias {
103 NBString name;
104 HandlerMatchMask matches;
105};
106
107struct ffBufInterposer {
108 PoolPtr ppCurr;
109 uint8_t *pCurr;
110 int wrFd;
111 int thisFd;
112 bool closed;
113 uint32_t offset;
114
115 PoolPtr ppStart;
116 uint8_t *origStart;
117 PoolPtr ppEnd;
118 uint8_t *end;
119};
120
121struct TxAckCtx {
122 uint16_t id;
123 Pkt::eType_t expecting;
124 uint8_t subHandlerIdx;
125 AckCbCtx ctx;
126};
127
128struct Session;
129
130struct FdInterposer {
131 Session *pOwner;
132 int baseFd;
133 uint32_t availSpace;
134 uint32_t remLen;
135 bool closed;
136};
137
138class FDInfo_fdWrBufInterposer : public FDInfo
139{
140public:
141 FDInfo_fdWrBufInterposer(const char *name = NULL);
142
143 virtual int GetFdBelow(int fd) override;
144};
145
146
147struct RateBucket {
148 uint8_t bucket;
149 uint8_t max;
150 uint8_t count;
151 uint8_t ticks;
152 tick_t lastFill;
153};
154
155struct PublishQueue {
156 PublishRequest *head;
157 PublishRequest *tail;
158};
159
160struct SubUnsubQueue {
161 SubUnsubQueueMsg *head;
162 SubUnsubQueueMsg *tail;
163};
164
165struct Session {
166 Client *pOwner;
167 OS_CRIT wrCrit;
168 OS_CRIT qCrit;
169 OS_CRIT rdCrit;
170 ConnectRequest *connReq;
171 NBString reasonStr;
172 NBString respInfo;
173 NBString serverRef;
174 NBString authMethod;
175 NBString rxPubTopic;
176 fifo_buffer_storage authData;
177
178 FdInterposer interposer;
179 int transportFd;
180 int tcpTxFd;
181 int rxFd;
182 int interFd;
183 int notifFd;
184 int dnsFd;
185 bool txWasBlocked;
186 int transportType;
187 fifo_buffer_storage *fdRxBuf;
188 fifo_buffer_storage tlsRxBuf;
189
190 uint32_t txMaxSize;
191 uint32_t rxMaxSize;
192 uint32_t sessionStart;
193 uint32_t sessionTime;
194 tick_t needKeepAliveBy;
195 uint16_t keepAliveInterval;
196 uint16_t topicAliases_max;
197 uint16_t topicAliases_inUse;
198 Pkt::eReasonCode_t reason_disconn;
199
200 bool retainAvail:1;
201 bool wildSubAvail:1;
202 bool subIdsAvail:1;
203 bool sharedSubsAvail:1;
204 bool updateFdSets:1;
205
206 uint16_t sessExpIntvl;
207 uint8_t maxQoS;
208
209 eClientState_t txState;
210 eClientState_t rxState;
211
212 int txNeedSpace;
213 int rxNeedData;
214
215#ifdef TCP_NOCOPY_TX
216 // the amount of space allocated within each PoolBuffer in the tcp TxBuffer
217 // for data (in the event of TCP_TX_NOCOPY it's less than ETHER_BUFFER_SIZE)
218 uint32_t txInternalBuffersLen;
219#endif
220 bool txBuffersAlreadyElevated;
221 bool txBuffersNeedReduced;
222 int txBufferSpaceAvail;
223
224 uint16_t rxNeedsToAckId;
225 Pkt::eType_t rxSendAckType;
226 Pkt::eReasonCode_t rxAckReason;
227
228 uint16_t txWindow;
229 uint16_t rxWindow;
230 beuint16_t seqNum;
231 uint16_t nextHandlerIdx;
232
233 RateBucket rateLimit_TxPublish;
234 RateBucket rateLimit_Subscribe;
235
236 int logFd;
237 int logLevel;
238
239// MqttPublisher *currPublisher;
240// MqttSubHandler *currSubHandle;
241
242 ConnAckCb_t connAckCb;
243 void *connAckCtx;
244 DisconnCb_t disconnCb;
245
246 uint32_t pubReqQueueWaiting;
247 uint32_t pubReqQueueActive;
248 PublishQueue pubReqQueue[MQTT_OBJ_PRIORITY_COUNT];
249 SubUnsubQueue subReqQueue;
250
251 PacketInfo::Msg rxHdr;
252 HandlerMatchMask rxPubMatches;
253 RxTopicAlias rxAliases[MQTT_MAX_SUPPORTED_RX_TOPIC_ALIASES];
254 TxAckCtx txNotAcked[MQTT_MAX_SUPPORTED_IN_FLIGHT];
255 uint16_t rxNotReleased[MQTT_MAX_SUPPORTED_IN_FLIGHT];
256 TopicHandler handlers[MQTT_MAX_HANDLERS_PER_CONNECTION];
257
258 Session(Client *owner);
259 ~Session() {};
260 int netConnect();
261 inline Client *getOwner() { return pOwner; }
262
263 static void fdNotify(int fd, FDChangeType change, void *pData);
264 void DnsNotify();
265
266 void fillRateBuckets();
267 void ResetState();
268 int MakeTxSpace(uint32_t need, TickTimeout &t);
269 void SkipInterposerData(int nbytes);
270 void ReadyInterposerData(int nbytes);
271
272 int WaitForTxQuota(TickTimeout &t);
273 int MakeTcpTxSpace(uint32_t need, TickTimeout &t, bool force = false);
274 int WaitForSubscribeQuota(TickTimeout &t);
275
276 void SetInterposerAvailSpace(int availSpace);
277
278 MQTT::eResult_t RegisterTopicHandler(TopicHandler *newHandler, uint16_t *retIdx);
279 int GetFreeHandlerIdx();
280 MQTT::eResult_t RemoveTopicHandler(uint16_t handlerIdx);
281 MQTT::eResult_t RemoveTopicHandlerByFilter(const NBString &filter, uint16_t handlerIdx, bool strict = true);
282 MQTT::eResult_t SetRxMatchesFromAlias(uint16_t alias);
283 void RegisterRxAlias(NBString *topicName, uint16_t aliasIdx);
284 void SetMatchingHandlers(NBString *topicName, HandlerMatchMask *matches);
285
286 int GetNewPktId();
287
288 TxAckCtx * addTxAckCtx(uint16_t id);
289 TxAckCtx * getTxAckCtx(uint16_t id);
290 bool addRxPktIdCtx(uint16_t pktId);
291 int getRxPktIdCtx(uint16_t pktId);
292 int removeRxPktId(uint16_t pktId);
293
294 int QueueSubUnsub(SubUnsubQueueMsg *qmsg, TickTimeout &t);
295 int DequeueSubUnsub(SubUnsubQueueMsg *qmsg, TickTimeout &t);
296 int PublishFromQueue(TickTimeout &t);
297 int QueuePublish(PublishRequest *req, int *prio, TickTimeout &t);
298 int DequeuePublish(PublishRequest *req, int prio, TickTimeout &t);
299
300 int RxFixedHeader();
301 int RxMsg();
302
303
304 int RxMsg_ConnAck();
305 int RxMsg_Publish();
306 int RxMsg_PubRel();
307 int RxMsg_PubAck(Pkt::eType_t type);
308 int RxMsg_Disconnect();
309
310
311 int SendMsg_Connect(ConnectRequest *req, TickTimeout &t);
312 int SendMsg_Publish(PublishRequest *req, TickTimeout &t, bool force = false);
313 int SendMsg_Subscribe_UserProperties(SubscribeRequest *req);
314 int SendMsg_Subscribe_Filters(SubscribeRequest *req);
315 int SendMsg_Queued_SubUnsub(TickTimeout &t);
316 int SendMsg_SubUnsub(SubUnsubQueueMsg *qmsg, TickTimeout &t);
317 int SendMsg_Ack(TickTimeout &t);
318 int SendMsg_Disconnect(TickTimeout &t);
319
320 int CleanupConnection();
321
322 int RxMsg_ConnAck_Properties();
323 int RxMsg_ConnAck_SessionExpirary(fifo_buffer_storage::PeekIterator &pkr, uint32_t &remProps);
324 int RxMsg_ConnAck_Property_ClientId(
325 fifo_buffer_storage::PeekIterator &pkr, uint32_t &remProps);
326 int RxMsg_ConnAck_Property_KeepAlive(
327 fifo_buffer_storage::PeekIterator &pkr, uint32_t &remProps);
328 int RxMsg_ConnAck_Property_AuthMethod(
329 fifo_buffer_storage::PeekIterator &pkr, uint32_t &remProps);
330 int RxMsg_ConnAck_Property_AuthData(
331 fifo_buffer_storage::PeekIterator &pkr, uint32_t &remProps);
332 int RxMsg_ConnAck_Property_ResponseInfo(
333 fifo_buffer_storage::PeekIterator &pkr, uint32_t &remProps);
334 int RxMsg_ConnAck_Property_ServerReference(
335 fifo_buffer_storage::PeekIterator &pkr, uint32_t &remProps);
336 int RxMsg_ConnAck_Property_ReasonString(
337 fifo_buffer_storage::PeekIterator &pkr, uint32_t &remProps);
338 int RxMsg_ConnAck_Property_ReceiveMax(
339 fifo_buffer_storage::PeekIterator &pkr, uint32_t &remProps);
340 int RxMsg_ConnAck_Property_TopicAliasMax(
341 fifo_buffer_storage::PeekIterator &pkr, uint32_t &remProps);
342 int RxMsg_ConnAck_Property_MaximumQoS(
343 fifo_buffer_storage::PeekIterator &pkr, uint32_t &remProps);
344 int RxMsg_ConnAck_Property_RetainAvail(
345 fifo_buffer_storage::PeekIterator &pkr, uint32_t &remProps);
346 int RxMsg_ConnAck_Property_UserProperty(
347 fifo_buffer_storage::PeekIterator &pkr, uint32_t &remProps);
348 int RxMsg_ConnAck_Property_MaxPktSize(
349 fifo_buffer_storage::PeekIterator &pkr, uint32_t &remProps);
350 int RxMsg_ConnAck_Property_WildSubAvail(
351 fifo_buffer_storage::PeekIterator &pkr, uint32_t &remProps);
352 int RxMsg_ConnAck_Property_SubIdAvail(
353 fifo_buffer_storage::PeekIterator &pkr, uint32_t &remProps);
354 int RxMsg_ConnAck_Property_SharedSubsAvail(
355 fifo_buffer_storage::PeekIterator &pkr, uint32_t &remProps);
356
357 int RxMsg_Publish_Properties();
358 int RxMsg_Publish_SubId(fifo_buffer_storage::PeekIterator &pkr, uint32_t &remProps);
359 int RxMsg_Publish_TopicAlias(fifo_buffer_storage::PeekIterator &pkr, uint32_t &remProps);
360
361 int RxMsg_Disconn_Properties();
362 int RxMsg_Disconn_Property_SessionExpirary(
363 fifo_buffer_storage::PeekIterator &pkr, uint32_t &remProps);
364 int RxMsg_Disconn_Property_ReasonString(
365 fifo_buffer_storage::PeekIterator &pkr, uint32_t &remProps);
366 int RxMsg_Disconn_Property_ServerReference(
367 fifo_buffer_storage::PeekIterator &pkr, uint32_t &remProps);
368
369 int writeProperty(const RequestProperty *prop, uint32_t *propsLen);
370 int SendMsg_PropertiesBuf(const uint8_t *propBuf, uint32_t propsLen);
371 int SendMsg_PropertyList(RequestProperty *propList, uint32_t propsLen);
372 int SendMsg_Payload_PoolPtrList(PoolPtr pp, uint32_t payloadLen);
373};
374
375const char * GetString_State(MQTT::internal::eClientState_t state);
376
377}
378}
379
380#endif /* ----- #ifndef __NBMQTT_INTERNAL_H ----- */
Lightweight alternative to C++ CString class.
Definition nbstring.h:118
Iterator for non-destructive examination of buffered data.
Definition buffers.h:532
FIFO buffer storage using linked pool buffers.
Definition buffers.h:443
FDChangeType
The notifications that a registered FD monitor can receive.
Definition iosys.h:439
#define MQTT_MAX_SUPPORTED_RX_TOPIC_ALIASES
Definition mqtt.h:59
#define MQTT_MAX_SUPPORTED_IN_FLIGHT
Definition mqtt.h:57
#define MQTT_OBJ_PRIORITY_COUNT
Definition mqtt.h:51
#define MQTT_MAX_HANDLERS_PER_CONNECTION
Definition mqtt.h:58
MQTT Namespace.
Definition mqtt.h:81
eResult_t
MQTT library return codes.
Definition mqtt.h:85
Main buffer structure for network and serial communication.
Definition buffers.h:90