1#ifndef __NBMQTT_INTERNAL_H
2#define __NBMQTT_INTERNAL_H
8#include <webclient/http_funcs.h>
11#include <iointernal.h>
// Compile-time sizing constants and stringification helpers for the MQTT
// internals.

// Constant 8. By its name this caps the number of MQTT sessions; the code that
// consumes it is not visible in this excerpt -- TODO confirm.
13#define MQTT_MAX_SESSIONS (8)
// Constant 16.
// NOTE(review): the cross-reference list at the end of this listing also
// reports a definition of MQTT_OBJ_PRIORITY_COUNT in mqtt.h; confirm the two
// definitions agree, otherwise this is a macro redefinition.
14#define MQTT_OBJ_PRIORITY_COUNT (16)
// ceil(MQTT_MAX_HANDLERS_PER_CONNECTION / 32): the whole-word quotient plus one
// extra word when the low five bits (the remainder modulo 32) are non-zero.
// Used as the element count of the uint32_t array in HandlerMatchMask;
// presumably one bit per handler -- TODO confirm against the mask code.
16#define MQTT_MATCH_HANDLER_MASK_SIZE ((MQTT_MAX_HANDLERS_PER_CONNECTION/32) +((MQTT_MAX_HANDLERS_PER_CONNECTION&0x1F)!=0))
// Turns the argument into a string literal exactly as written (no expansion).
18#define __MQTT_STR(x) #x
// Expands the argument first, then stringifies the result via __MQTT_STR.
// NOTE(review): names containing a double underscore are reserved for the
// implementation in C++.
19#define __MQTT_XSTR(x) __MQTT_STR(x)
21#if (MQTT_MAX_SUPPORTED_IN_FLIGHT > MQTT_MAX_MAX_SUPPORTED_IN_FLIGHT)
22#error __MQTT_STR(MQTT_MAX_SUPPORTED_IN_FLIGHT) ## " must be less than " ## __MQTT_XSTR(MQTT_MAX_MAX_SUPPORTED_IN_FLIGHT)
// Numeric state codes 0x0 through 0x3. The names suggest a progression of
// free -> allocated -> CONNECT sent -> established; the code that stores or
// compares these values is not visible in this excerpt -- TODO confirm.
25#define MQTT_STATE_FREE 0x0
26#define MQTT_STATE_ALLOCED 0x1
27#define MQTT_STATE_CONN_SENT 0x2
28#define MQTT_STATE_ESTABLISHED 0x3
44 eState_ConnectOnStart,
53 eState_Authenticating,
74 ePktState_NeedReceived,
75 ePktState_WritingReceived,
76 ePktState_NeedRelease,
77 ePktState_WritingRelease,
78 ePktState_NeedComplete,
79 ePktState_WritingComplete,
82struct MqttAckTracker {
91struct HandlerMatchMask {
92 uint32_t mask[MQTT_MATCH_HANDLER_MASK_SIZE];
95 void Assign(
const HandlerMatchMask &rhs);
96 void SetFrom(
const HandlerMatchMask &rhs);
97 HandlerMatchMask & operator=(
const HandlerMatchMask &rhs);
98 HandlerMatchMask & operator|=(
const HandlerMatchMask &rhs);
104 HandlerMatchMask matches;
107struct ffBufInterposer {
123 Pkt::eType_t expecting;
124 uint8_t subHandlerIdx;
138class FDInfo_fdWrBufInterposer :
public FDInfo
141 FDInfo_fdWrBufInterposer(
const char *name = NULL);
143 virtual int GetFdBelow(
int fd)
override;
156 PublishRequest *head;
157 PublishRequest *tail;
160struct SubUnsubQueue {
161 SubUnsubQueueMsg *head;
162 SubUnsubQueueMsg *tail;
170 ConnectRequest *connReq;
178 FdInterposer interposer;
192 uint32_t sessionStart;
193 uint32_t sessionTime;
194 tick_t needKeepAliveBy;
195 uint16_t keepAliveInterval;
196 uint16_t topicAliases_max;
197 uint16_t topicAliases_inUse;
198 Pkt::eReasonCode_t reason_disconn;
203 bool sharedSubsAvail:1;
206 uint16_t sessExpIntvl;
209 eClientState_t txState;
210 eClientState_t rxState;
218 uint32_t txInternalBuffersLen;
220 bool txBuffersAlreadyElevated;
221 bool txBuffersNeedReduced;
222 int txBufferSpaceAvail;
224 uint16_t rxNeedsToAckId;
225 Pkt::eType_t rxSendAckType;
226 Pkt::eReasonCode_t rxAckReason;
231 uint16_t nextHandlerIdx;
233 RateBucket rateLimit_TxPublish;
234 RateBucket rateLimit_Subscribe;
242 ConnAckCb_t connAckCb;
244 DisconnCb_t disconnCb;
246 uint32_t pubReqQueueWaiting;
247 uint32_t pubReqQueueActive;
249 SubUnsubQueue subReqQueue;
251 PacketInfo::Msg rxHdr;
252 HandlerMatchMask rxPubMatches;
258 Session(Client *owner);
261 inline Client *getOwner() {
return pOwner; }
263 static void fdNotify(
int fd,
FDChangeType change,
void *pData);
266 void fillRateBuckets();
268 int MakeTxSpace(uint32_t need, TickTimeout &t);
269 void SkipInterposerData(
int nbytes);
270 void ReadyInterposerData(
int nbytes);
272 int WaitForTxQuota(TickTimeout &t);
273 int MakeTcpTxSpace(uint32_t need, TickTimeout &t,
bool force =
false);
274 int WaitForSubscribeQuota(TickTimeout &t);
276 void SetInterposerAvailSpace(
int availSpace);
278 MQTT::eResult_t RegisterTopicHandler(TopicHandler *newHandler, uint16_t *retIdx);
279 int GetFreeHandlerIdx();
283 void RegisterRxAlias(
NBString *topicName, uint16_t aliasIdx);
284 void SetMatchingHandlers(
NBString *topicName, HandlerMatchMask *matches);
288 TxAckCtx * addTxAckCtx(uint16_t
id);
289 TxAckCtx * getTxAckCtx(uint16_t
id);
290 bool addRxPktIdCtx(uint16_t pktId);
291 int getRxPktIdCtx(uint16_t pktId);
292 int removeRxPktId(uint16_t pktId);
294 int QueueSubUnsub(SubUnsubQueueMsg *qmsg, TickTimeout &t);
295 int DequeueSubUnsub(SubUnsubQueueMsg *qmsg, TickTimeout &t);
296 int PublishFromQueue(TickTimeout &t);
297 int QueuePublish(PublishRequest *req,
int *prio, TickTimeout &t);
298 int DequeuePublish(PublishRequest *req,
int prio, TickTimeout &t);
307 int RxMsg_PubAck(Pkt::eType_t type);
308 int RxMsg_Disconnect();
311 int SendMsg_Connect(ConnectRequest *req, TickTimeout &t);
312 int SendMsg_Publish(PublishRequest *req, TickTimeout &t,
bool force =
false);
313 int SendMsg_Subscribe_UserProperties(SubscribeRequest *req);
314 int SendMsg_Subscribe_Filters(SubscribeRequest *req);
315 int SendMsg_Queued_SubUnsub(TickTimeout &t);
316 int SendMsg_SubUnsub(SubUnsubQueueMsg *qmsg, TickTimeout &t);
317 int SendMsg_Ack(TickTimeout &t);
318 int SendMsg_Disconnect(TickTimeout &t);
320 int CleanupConnection();
322 int RxMsg_ConnAck_Properties();
324 int RxMsg_ConnAck_Property_ClientId(
326 int RxMsg_ConnAck_Property_KeepAlive(
328 int RxMsg_ConnAck_Property_AuthMethod(
330 int RxMsg_ConnAck_Property_AuthData(
332 int RxMsg_ConnAck_Property_ResponseInfo(
334 int RxMsg_ConnAck_Property_ServerReference(
336 int RxMsg_ConnAck_Property_ReasonString(
338 int RxMsg_ConnAck_Property_ReceiveMax(
340 int RxMsg_ConnAck_Property_TopicAliasMax(
342 int RxMsg_ConnAck_Property_MaximumQoS(
344 int RxMsg_ConnAck_Property_RetainAvail(
346 int RxMsg_ConnAck_Property_UserProperty(
348 int RxMsg_ConnAck_Property_MaxPktSize(
350 int RxMsg_ConnAck_Property_WildSubAvail(
352 int RxMsg_ConnAck_Property_SubIdAvail(
354 int RxMsg_ConnAck_Property_SharedSubsAvail(
357 int RxMsg_Publish_Properties();
361 int RxMsg_Disconn_Properties();
362 int RxMsg_Disconn_Property_SessionExpirary(
364 int RxMsg_Disconn_Property_ReasonString(
366 int RxMsg_Disconn_Property_ServerReference(
369 int writeProperty(
const RequestProperty *prop, uint32_t *propsLen);
370 int SendMsg_PropertiesBuf(
const uint8_t *propBuf, uint32_t propsLen);
371 int SendMsg_PropertyList(RequestProperty *propList, uint32_t propsLen);
372 int SendMsg_Payload_PoolPtrList(
PoolPtr pp, uint32_t payloadLen);
375const char * GetString_State(MQTT::internal::eClientState_t state);
Lightweight alternative to C++ CString class.
Definition nbstring.h:118
Iterator for non-destructive examination of buffered data.
Definition buffers.h:532
FIFO buffer storage using linked pool buffers.
Definition buffers.h:443
FDChangeType
The notifications that a registered FD monitor can receive.
Definition iosys.h:439
#define MQTT_MAX_SUPPORTED_RX_TOPIC_ALIASES
Definition mqtt.h:59
#define MQTT_MAX_SUPPORTED_IN_FLIGHT
Definition mqtt.h:57
#define MQTT_OBJ_PRIORITY_COUNT
Definition mqtt.h:51
#define MQTT_MAX_HANDLERS_PER_CONNECTION
Definition mqtt.h:58
MQTT Namespace.
Definition mqtt.h:81
eResult_t
MQTT library return codes.
Definition mqtt.h:85
Main buffer structure for network and serial communication.
Definition buffers.h:90