Skip to content

Commit 0510219

Browse files
committed
update mqtt
1 parent 39e4620 commit 0510219

8 files changed

Lines changed: 947 additions & 197 deletions

File tree

src/UserGuide/Master/Table/API/Programming-MQTT.md

Lines changed: 49 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,27 @@
2222

2323
## 1. Overview
2424

25-
[MQTT](Message Queuing Telemetry Transport)(http://mqtt.org/) is a lightweight messaging protocol designed for IoT and low-bandwidth environments. It operates on a Publish/Subscribe (Pub/Sub) model, enabling efficient and reliable bidirectional communication between devices. Its core objectives are low power consumption, minimal bandwidth usage, and high real-time performance, making it ideal for unstable networks or resource-constrained scenarios (e.g., sensors, mobile devices).
25+
MQTT (Message Queuing Telemetry Transport) is a lightweight messaging protocol designed for IoT and low-bandwidth environments. It operates on a Publish/Subscribe (Pub/Sub) model, enabling efficient and reliable bidirectional communication between devices. Its core objectives are low power consumption, minimal bandwidth usage, and high real-time performance, making it ideal for unstable networks or resource-constrained scenarios (e.g., sensors, mobile devices).
2626

2727
IoTDB provides deep integration with the MQTT protocol, fully compliant with MQTT v3.1 (OASIS International Standard). The IoTDB server includes a built-in high-performance MQTT Broker module, eliminating the need for third-party middleware. Devices can directly write time-series data into the IoTDB storage engine via MQTT messages.
2828

29-
<img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; margin-right:auto; display:block;" src="/img/github/78357432-0c71cf80-75e4-11ea-98aa-c43a54d469ce.png">
29+
![](/img/mqtt-table-en-1.png)
3030

3131

32+
## 2. Configuration
3233

33-
## 2. Built-in MQTT Service
34-
The Built-in MQTT Service provide the ability of direct connection to IoTDB through MQTT. It listen the publish messages from MQTT clients
35-
and then write the data into storage immediately.
36-
The MQTT topic corresponds to IoTDB timeseries.The first segment of the MQTT topic (split by `/`) is used as the database name.The table name is derived from the `<measurement>` in the line protocol.
37-
The messages payload can be format to events by `PayloadFormatter` which loaded by java SPI, and the implementation of `PayloadFormatter` for table is `LinePayloadFormatter`.
38-
The following is the line protocol syntax of MQTT message payload and an example:
34+
By default, the IoTDB MQTT service loads configurations from `${IOTDB_HOME}/${IOTDB_CONF}/iotdb-system.properties`.
35+
36+
| **Property** | **Description** | **Default** |
37+
| ------------------------ | ---------------------------------------------------------------------------------------------------------------------- | ------------------- |
38+
| `enable_mqtt_service` | Enable/ disable the MQTT service. | FALSE |
39+
| `mqtt_host` | Host address bound to the MQTT service. | 127.0.0.1 |
40+
| `mqtt_port` | Port bound to the MQTT service. | 1883 |
41+
| `mqtt_handler_pool_size` | Thread pool size for processing MQTT messages. | 1 |
42+
| **`mqtt_payload_formatter`** | **Formatting method for MQTT message payloads. ​****Options: `json` (tree model), `line` (table model).** | **json** |
43+
| `mqtt_max_message_size` | Maximum allowed MQTT message size (bytes). | 1048576 |
44+
45+
## 3. Write Protocol
3946

4047
* Line Protocol Syntax
4148

@@ -49,23 +56,7 @@ The following is the line protocol syntax of MQTT message payload and an example
4956
myMeasurement,tag1=value1,tag2=value2 attr1=value1,attr2=value2 fieldKey="fieldValue" 1556813561098000000
5057
```
5158

52-
53-
54-
## 3. MQTT Configurations
55-
56-
By default, the IoTDB MQTT service loads configurations from `${IOTDB_HOME}/${IOTDB_CONF}/iotdb-system.properties`.
57-
58-
Configurations are as follows:
59-
60-
| **Property** | **Description** | **Default** |
61-
| ------------------------ | ---------------------------------------------------------------------------------------------------------------------- |-------------|
62-
| `enable_mqtt_service` | Enable/ disable the MQTT service. | false |
63-
| `mqtt_host` | Host address bound to the MQTT service. | 127.0.0.1 |
64-
| `mqtt_port` | Port bound to the MQTT service. | 1883 |
65-
| `mqtt_handler_pool_size` | Thread pool size for processing MQTT messages. | 1 |
66-
| **`mqtt_payload_formatter`** | **Formatting method for MQTT message payloads. ​****Options: `json` (tree model), `line` (table model).** | **json** |
67-
| `mqtt_max_message_size` | Maximum allowed MQTT message size (bytes). | 1048576 |
68-
59+
![](/img/mqtt-table-en-2.png)
6960

7061
## 4. Naming Conventions
7162

@@ -102,24 +93,47 @@ The table name is derived from the `<measurement>` in the line protocol.
10293
## 5. Coding Examples
10394
The following is an example which a mqtt client send messages to IoTDB server.
10495

105-
```java
96+
```java
10697
MQTT mqtt = new MQTT();
10798
mqtt.setHost("127.0.0.1", 1883);
10899
mqtt.setUserName("root");
109100
mqtt.setPassword("root");
110101

111102
BlockingConnection connection = mqtt.blockingConnection();
103+
String DATABASE = "myMqttTest";
112104
connection.connect();
113105

114-
for (int i = 0; i < 10; i++) {
115-
String payload = String.format("test%d,tag1=t1,tag2=t2 attr1=a1,attr2=a2 field1=\"value1\",field2=1i,field3=2u,field4=3i32,field5=t,field6=false,field7=4,field8=5f 1", random.nextDouble());
116-
117-
connection.publish("root.sg.d1.s1", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
118-
}
106+
String payload =
107+
"test1,tag1=t1,tag2=t2 attr3=a5,attr4=a4 field1=\"fieldValue1\",field2=1i,field3=1u 1";
108+
connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
109+
Thread.sleep(10);
110+
111+
payload = "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 2";
112+
connection.publish(DATABASE, payload.getBytes(), QoS.AT_LEAST_ONCE, false);
113+
Thread.sleep(10);
114+
115+
payload = "# It's a remark\n " + "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 6";
116+
connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
117+
Thread.sleep(10);
118+
119+
//batch write example
120+
payload =
121+
"test1,tag1=t1,tag2=t2 field7=t,field8=T,field9=true 3 \n "
122+
+ "test1,tag1=t1,tag2=t2 field7=f,field8=F,field9=FALSE 4";
123+
connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
124+
Thread.sleep(10);
125+
126+
//batch write example
127+
payload =
128+
"test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 field1=\"fieldValue1\",field2=1i,field3=1u 4 \n "
129+
+ "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 5";
130+
connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
131+
Thread.sleep(10);
119132

120133
connection.disconnect();
134+
```
135+
121136

122-
```
123137

124138
## 6. Customize your MQTT Message Format
125139

@@ -166,10 +180,10 @@ public class CustomizedLinePayloadFormatter implements PayloadFormatter {
166180
for (int i = 0; i < 3; i++) {
167181
long ts = i;
168182
TableMessage message = new TableMessage();
169-
183+
170184
// Parsing Database Name
171185
message.setDatabase("db" + i);
172-
186+
173187
//Parsing Table Names
174188
message.setTable("t" + i);
175189

@@ -200,7 +214,7 @@ public class CustomizedLinePayloadFormatter implements PayloadFormatter {
200214
message.setFields(fields);
201215
message.setDataTypes(dataTypes);
202216
message.setValues(values);
203-
217+
204218
//// Parsing timestamp
205219
message.setTimestamp(ts);
206220
ret.add(message);

src/UserGuide/Master/Tree/API/Programming-MQTT.md

Lines changed: 135 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@
2222

2323
## 1. Overview
2424

25-
[MQTT](Message Queuing Telemetry Transport)(http://mqtt.org/) is a lightweight messaging protocol designed for IoT and low-bandwidth environments. It operates on a Publish/Subscribe (Pub/Sub) model, enabling efficient and reliable bidirectional communication between devices. Its core objectives are low power consumption, minimal bandwidth usage, and high real-time performance, making it ideal for unstable networks or resource-constrained scenarios (e.g., sensors, mobile devices).
25+
MQTT (Message Queuing Telemetry Transport) is a lightweight messaging protocol designed for IoT and low-bandwidth environments. It operates on a Publish/Subscribe (Pub/Sub) model, enabling efficient and reliable bidirectional communication between devices. Its core objectives are low power consumption, minimal bandwidth usage, and high real-time performance, making it ideal for unstable networks or resource-constrained scenarios (e.g., sensors, mobile devices).
2626

2727
IoTDB provides deep integration with the MQTT protocol, fully compliant with MQTT v3.1 (OASIS International Standard). The IoTDB server includes a built-in high-performance MQTT Broker module, eliminating the need for third-party middleware. Devices can directly write time-series data into the IoTDB storage engine via MQTT messages.
2828

2929
<img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; margin-right:auto; display:block;" src="/img/github/78357432-0c71cf80-75e4-11ea-98aa-c43a54d469ce.png">
3030

31-
[Programming-MQTT.md](Programming-MQTT.md)
31+
3232
## 2. Built-in MQTT Service
3333
The Built-in MQTT Service provide the ability of direct connection to IoTDB through MQTT. It listen the publish messages from MQTT clients
3434
and then write the data into storage immediately.
@@ -62,15 +62,14 @@ The IoTDB MQTT service load configurations from `${IOTDB_HOME}/${IOTDB_CONF}/iot
6262

6363
Configurations are as follows:
6464

65-
| **Property** | DESCRIPTION | DEFAULT |
66-
| ------------- |:-------------:|:------:|
67-
| `enable_mqtt_service` | whether to enable the mqtt service | false |
68-
| `mqtt_host` | the mqtt service binding host | 127.0.0.1 |
69-
| `mqtt_port` | the mqtt service binding port | 1883 |
70-
| `mqtt_handler_pool_size` | the handler pool size for handing the mqtt messages | 1 |
71-
| **`mqtt_payload_formatter`** | **Formatting method for MQTT message payloads. ​****Options: `json` (tree model), `line` (table model).** | **json** |
72-
| `mqtt_max_message_size` | the max mqtt message size in byte| 1048576 |
73-
65+
| **Property** | **Description** | **Default** |
66+
| ------------------------ | ---------------------------------------------------------------------------------------------------------------------- | ------------------- |
67+
| `enable_mqtt_service` | Enable/ disable the MQTT service. | FALSE |
68+
| `mqtt_host` | Host address bound to the MQTT service. | 127.0.0.1 |
69+
| `mqtt_port` | Port bound to the MQTT service. | 1883 |
70+
| `mqtt_handler_pool_size` | Thread pool size for processing MQTT messages. | 1 |
71+
| **`mqtt_payload_formatter`** | **Formatting method for MQTT message payloads. ​****Options: `json` (tree model), `line` (table model).** | **json** |
72+
| `mqtt_max_message_size` | Maximum allowed MQTT message size (bytes). | 1048576 |
7473

7574
## 4. Coding Examples
7675
The following is an example which a mqtt client send messages to IoTDB server.
@@ -102,10 +101,41 @@ connection.disconnect();
102101

103102
## 5. Customize your MQTT Message Format
104103

105-
If you do not like the above Json format, you can customize your MQTT Message format by just writing several lines
106-
of codes. An example can be found in [example/mqtt-customize](https://github.com/apache/iotdb/tree/master/example/mqtt-customize) project.
104+
In a production environment, each device typically has its own MQTT client, and the message formats of these clients have been pre-defined. If communication is to be carried out in accordance with the MQTT message format supported by IoTDB, a comprehensive upgrade and transformation of all existing clients would be required, which would undoubtedly incur significant costs. However, we can easily achieve customization of the MQTT message format through simple programming means, without the need to modify the clients.
105+
An example can be found in [example/mqtt-customize](https://github.com/apache/iotdb/tree/rc/2.0.1/example/mqtt-customize) project.
106+
107+
Assuming the MQTT client sends the following message format:
108+
```json
109+
{
110+
"time":1586076045523,
111+
"deviceID":"car_1",
112+
"deviceType":"Gasoline car​​",
113+
"point":"Fuel level​​",
114+
"value":10.0
115+
}
116+
```
117+
Or in the form of an array of JSON:
118+
```java
119+
[
120+
{
121+
"time":1586076045523,
122+
"deviceID":"car_1",
123+
"deviceType":"Gasoline car​​",
124+
"point":"Fuel level",
125+
"value":10.0
126+
},
127+
{
128+
"time":1586076045524,
129+
"deviceID":"car_2",
130+
"deviceType":"NEV(new enegry vehicle)",
131+
"point":"Speed",
132+
"value":80.0
133+
}
134+
]
135+
```
136+
137+
Then you can set up the custom MQTT message format through the following steps:
107138

108-
Steps:
109139
1. Create a java project, and add dependency:
110140
```xml
111141
<dependency>
@@ -120,44 +150,115 @@ e.g.,
120150
```java
121151
package org.apache.iotdb.mqtt.server;
122152

123-
import io.netty.buffer.ByteBuf;
124153
import org.apache.iotdb.db.protocol.mqtt.Message;
125154
import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;
155+
import org.apache.iotdb.db.protocol.mqtt.TableMessage;
156+
157+
import com.google.common.collect.Lists;
158+
import com.google.gson.Gson;
159+
import com.google.gson.GsonBuilder;
160+
import com.google.gson.JsonArray;
161+
import com.google.gson.JsonElement;
162+
import com.google.gson.JsonObject;
163+
import com.google.gson.JsonParseException;
164+
import io.netty.buffer.ByteBuf;
165+
import org.apache.commons.lang3.NotImplementedException;
166+
import org.apache.tsfile.enums.TSDataType;
126167

127168
import java.nio.charset.StandardCharsets;
128169
import java.util.ArrayList;
129170
import java.util.Arrays;
130171
import java.util.List;
131172

173+
/**
174+
* The Customized JSON payload formatter. one json format supported: { "time":1586076045523,
175+
* "deviceID":"car_1", "deviceType":"NEV", "point":"Speed", "value":80.0 }
176+
*/
132177
public class CustomizedJsonPayloadFormatter implements PayloadFormatter {
178+
private static final String JSON_KEY_TIME = "time";
179+
private static final String JSON_KEY_DEVICEID = "deviceID";
180+
private static final String JSON_KEY_DEVICETYPE = "deviceType";
181+
private static final String JSON_KEY_POINT = "point";
182+
private static final String JSON_KEY_VALUE = "value";
183+
private static final Gson GSON = new GsonBuilder().create();
133184

134185
@Override
135186
public List<Message> format(String topic, ByteBuf payload) {
136-
// Suppose the payload is a json format
137187
if (payload == null) {
138-
return null;
188+
return new ArrayList<>();
139189
}
140-
141-
String json = payload.toString(StandardCharsets.UTF_8);
142-
// parse data from the json and generate Messages and put them into List<Meesage> ret
143-
List<Message> ret = new ArrayList<>();
144-
// this is just an example, so we just generate some Messages directly
145-
for (int i = 0; i < 2; i++) {
146-
long ts = i;
147-
Message message = new Message();
148-
message.setDevice("d" + i);
149-
message.setTimestamp(ts);
150-
message.setMeasurements(Arrays.asList("s1", "s2"));
151-
message.setValues(Arrays.asList("4.0" + i, "5.0" + i));
152-
ret.add(message);
190+
String txt = payload.toString(StandardCharsets.UTF_8);
191+
JsonElement jsonElement = GSON.fromJson(txt, JsonElement.class);
192+
if (jsonElement.isJsonObject()) {
193+
JsonObject jsonObject = jsonElement.getAsJsonObject();
194+
return formatTableRow(topic, jsonObject);
195+
} else if (jsonElement.isJsonArray()) {
196+
JsonArray jsonArray = jsonElement.getAsJsonArray();
197+
List<Message> messages = new ArrayList<>();
198+
for (JsonElement element : jsonArray) {
199+
JsonObject jsonObject = element.getAsJsonObject();
200+
messages.addAll(formatTableRow(topic, jsonObject));
201+
}
202+
return messages;
153203
}
154-
return ret;
204+
throw new JsonParseException("payload is invalidate");
205+
}
206+
207+
@Override
208+
@Deprecated
209+
public List<Message> format(ByteBuf payload) {
210+
throw new NotImplementedException();
211+
}
212+
213+
private List<Message> formatTableRow(String topic, JsonObject jsonObject) {
214+
TableMessage message = new TableMessage();
215+
String database = !topic.contains("/") ? topic : topic.substring(0, topic.indexOf("/"));
216+
String table = "test_table";
217+
218+
// Parsing Database Name
219+
message.setDatabase((database));
220+
221+
// Parsing Table Name
222+
message.setTable(table);
223+
224+
// Parsing Tags
225+
List<String> tagKeys = new ArrayList<>();
226+
tagKeys.add(JSON_KEY_DEVICEID);
227+
List<Object> tagValues = new ArrayList<>();
228+
tagValues.add(jsonObject.get(JSON_KEY_DEVICEID).getAsString());
229+
message.setTagKeys(tagKeys);
230+
message.setTagValues(tagValues);
231+
232+
// Parsing Attributes
233+
List<String> attributeKeys = new ArrayList<>();
234+
List<Object> attributeValues = new ArrayList<>();
235+
attributeKeys.add(JSON_KEY_DEVICETYPE);
236+
attributeValues.add(jsonObject.get(JSON_KEY_DEVICETYPE).getAsString());
237+
message.setAttributeKeys(attributeKeys);
238+
message.setAttributeValues(attributeValues);
239+
240+
// Parsing Fields
241+
List<String> fields = Arrays.asList(JSON_KEY_POINT);
242+
List<TSDataType> dataTypes = Arrays.asList(TSDataType.FLOAT);
243+
List<Object> values = Arrays.asList(jsonObject.get(JSON_KEY_VALUE).getAsFloat());
244+
message.setFields(fields);
245+
message.setDataTypes(dataTypes);
246+
message.setValues(values);
247+
248+
// Parsing timestamp
249+
message.setTimestamp(jsonObject.get(JSON_KEY_TIME).getAsLong());
250+
return Lists.newArrayList(message);
155251
}
156252

157253
@Override
158254
public String getName() {
159-
// set the value of mqtt_payload_formatter in iotdb-system.properties as the following string:
160-
return "CustomizedJson";
255+
// set the value of mqtt_payload_formatter in iotdb-common.properties as the following string:
256+
return "CustomizedJson2Table";
257+
}
258+
259+
@Override
260+
public String getType() {
261+
return PayloadFormatter.TABLE_TYPE;
161262
}
162263
}
163264
```
@@ -171,7 +272,7 @@ Then, in your server:
171272
1. Create ${IOTDB_HOME}/ext/mqtt/ folder, and put the jar into this folder.
172273
2. Update configuration to enable MQTT service. (`enable_mqtt_service=true` in `conf/iotdb-system.properties`)
173274
3. Set the value of `mqtt_payload_formatter` in `conf/iotdb-system.properties` as the value of getName() in your implementation
174-
, in this example, the value is `CustomizedJson`
275+
, in this example, the value is `CustomizedJson2Table`
175276
4. Launch the IoTDB server.
176277
5. Now IoTDB will use your implementation to parse the MQTT message.
177278

0 commit comments

Comments
 (0)