Skip to content

Commit 752285f

Browse files
committed
first commit
0 parents  commit 752285f

68 files changed

Lines changed: 1359 additions & 0 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

README.md

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
# QuecPython 4G DTU 快速上手
2+
3+
## 硬件准备
4+
5+
- Windows 电脑一台,建议 Win10 系统。
6+
- 一套 [EC800GCN 华系列 DTU 开发板](https://python.quectel.com/doc/Getting_started/zh/evb/ec800g_hua_dtu.html)
7+
- 一张可用的 Nano Sim 卡。
8+
- 一根胶棒天线。
9+
- 一根 Mini USB 数据线。
10+
- 一个 USB 转 TTL 模块。
11+
12+
## 环境搭建
13+
14+
- 下载并安装 EC800G 系列模组驱动:[QuecPython_USB_Driver_Win10_U_G](https://python.quectel.com/wp-content/uploads/2024/09/Quectel_Windows_USB_DriverU_V1.0.19.zip)
15+
- 下载并安装 [VSCode](https://code.visualstudio.com/)
16+
- 下载并解压 [QPYCom](https://python.quectel.com/wp-content/uploads/2024/09/QPYcom_V3.6.0.zip) 工具到电脑的合适位置。
17+
- 下载并安装固件包[QPY_OCPU_BETA0002_EC800G_CNLD_FW](https://python.quectel.com/wp-content/uploads/2024/09/QPY_OCPU_BETA0002_EC800G_CNLD_FW.zip)。(固件包烧录请参阅 [使用 QPYcom 烧录固件](https://python.quectel.com/doc/Application_guide/zh/dev-tools/QPYcom/qpycom-dw.html)
18+
- 下载实验源码。
19+
20+
## 硬件连接
21+
22+
按照下图进行硬件连接:
23+
24+
![](./images/DTU_wires_connection.png)
25+
26+
① 连接胶棒天线。
27+
28+
② 插入 Nano Sim 卡。
29+
30+
③ 使用 mini USB 数据线连接至电脑。
31+
32+
④ 使用 USB 转 TTL 模块连接至电脑。TX(DTU开发板) 接 RX(USB 转 TTL 模块);RX(DTU开发板) 接 TX(USB 转 TTL 模块);GND(DTU开发板) 接 GND(USB 转 TTL 模块)共地。(⚠ 如果采用 RS485 接口,则两端的 A 线连接至 A 线,B 线连接至 B 线。)
33+
34+
⑤ 开发板采用 9~36 V 宽幅供电,正负极。
35+
36+
## 设备开发
37+
38+
### 修改配置文件
39+
40+
工程配置文件路径:`code/dtu_config.json`
41+
42+
本实验案例,基于 TCP 私有服务器数据透传做如下配置:
43+
44+
- 默认 `system_config.cloud` 配置项定义为 `"tcp"` 即 TCP 透传模式,系统会自行读取 `socket_private_cloud_config` 配置项。另外可选择 `\"mqtt\"` 配置,服务器参数根据实际情况修改 `mqtt_private_cloud_config` 配置项。
45+
46+
```python
47+
{
48+
"system_config": {
49+
"cloud": "tcp" # 默认配置 tcp 透传模式,可配 "mqtt"
50+
}
51+
}
52+
```
53+
54+
- 本实验采用 TCP 透传模式,用户需根据实际情况设置 `socket_private_cloud_config` 配置项中的 TCP 服务器域名(*domain*)和端口(*port*)。
55+
56+
```python
57+
{
58+
"socket_private_cloud_config": {
59+
"domain": "112.31.84.164", # 服务器域名/ip
60+
"port": 8305, # 端口号
61+
"timeout": 5, # 超时时间 (unit: s)
62+
"keep_alive": 5 # 心跳周期 (unit: s)
63+
}
64+
}
65+
```
66+
67+
- `uart_config` 配置项是串口配置参数,默认是根据当前实验开发板做的配置,不可更改。如用户采用其他开发板则需要根据实际硬件进行配置。
68+
69+
```python
70+
{
71+
"uart_config": {
72+
"port": 2, # 串口号,根据实际硬件配置选择,当前实验不可更改
73+
"baudrate": 115200, # 波特率
74+
"bytesize": 8, # 数据位
75+
"parity": 0, # 奇偶校验
76+
"stopbits": 1, # 停止位
77+
"flowctl": 0, # 流控
78+
"rs485_config": { # RS485 配置
79+
"gpio_num": 28, # 485 控制脚,当前实验不可更改
80+
"direction": 0 # 引脚电平变化控制,1表示引脚电平变化为:串口发送数据之前由低拉高、发送数据之后再由高拉低,0表示引脚电平变化为:串口发送数据之前由高拉低、发送数据之后再由低拉高
81+
}
82+
}
83+
}
84+
```
85+
86+
完整配置文件模版如下:
87+
88+
```json
89+
{
90+
"system_config": {
91+
"cloud": "tcp"
92+
},
93+
"mqtt_private_cloud_config": {
94+
"server": "mq.tongxinmao.com",
95+
"port": 18830,
96+
"client_id": "txm_1682300809",
97+
"user": "",
98+
"password": "",
99+
"clean_session": true,
100+
"qos": 0,
101+
"keepalive": 60,
102+
"subscribe": {"down": "/public/TEST/down"},
103+
"publish": {"up": "/public/TEST/up"}
104+
},
105+
"socket_private_cloud_config": {
106+
"domain": "112.31.84.164",
107+
"port": 8305,
108+
"timeout": 5,
109+
"keep_alive": 5
110+
},
111+
"uart_config": {
112+
"port": 2,
113+
"baudrate": 115200,
114+
"bytesize": 8,
115+
"parity": 0,
116+
"stopbits": 1,
117+
"flowctl": 0,
118+
"rs485_config": {
119+
"gpio_num": 28,
120+
"direction": 0
121+
}
122+
}
123+
}
124+
```
125+
126+
参数说明:
127+
128+
- `system_config.config`: 指定当前使用的私有云类型。目前支持tcp和mqtt。
129+
- `mqtt_private_cloud_config`: MQTT私有云配置。
130+
- `socket_private_cloud_config`: tcp私有云配置。
131+
- `uart_config`:串口参数配置。
132+
133+
### 脚本导入并运行
134+
135+
下载安装 **QPYCom** 工具后使用该工具下载脚本至 QuecPython 模组。
136+
137+
> 💡 **Tips**
138+
>
139+
> QPYCom 安装和使用教程:https://python.quectel.com/doc/Application_guide/zh/dev-tools/QPYcom/index.html
140+
141+
下载步骤:
142+
143+
① 选择带有 "<font color="red">**REPL**</font>" 字样的 COM 口。(⚠ 该COM 口是 QuecPython 交互口,可以用来执行 python 代码实现与模组交互。脚本导入也是通过这种方式来实现的。)
144+
145+
② 打开交互口
146+
147+
③ 选择 下载 标签页,并新建一个项目名称。(名称自定义。)
148+
149+
④ 右键 "usr" 目录,使用 **一键导入** 功能, 选择并导入 DTU 代码仓库中的 `code` 文件夹。(⚠ `usr` 目录是 QuecPython 模组的用户存储空间,一般用于存放待执行的 python 脚本代码)。
150+
151+
⑤ 点击下载按钮开始下载。
152+
153+
![](./images/download_scripts.png)
154+
155+
运行主脚本
156+
157+
① 选择 文件 标签页
158+
159+
② 右键 `_main.py`,并选择执行改文件。
160+
161+
![](./images/run_scripts.png)
162+
163+
⚠ 该脚本为工程的主入口脚本。
164+
165+
⚠ 此处手动运行改脚本仅为调试方便,若将该脚本命名为 `main.py` 则该脚本在模组上电开机后自动运行。其原理在于模组上电开机后默认会自行运行 `usr` 下的 `main.py` 脚本。
166+
167+
### 业务调试
168+
169+
程序运行后,在 REPL 交互页面可以看到日志输出如下图所示。
170+
171+
左侧图示,我们使用 QCOM 模拟 MCU 打开用于透传的模组串口(即 USB 转 TTL 模块对应的 COM 口)。
172+
173+
右侧图示,REPL 交互口输出的模组日志。
174+
175+
使用串口工具 QCOM 模拟 MCU 串口上行数据,通过 DTU 透传至 TCP 回显服务器,再由回显服务器将相同数据通过 DTU 下行透传至 QCOM。
176+
177+
![](./images/debugview.png)
178+
179+
⚠ 本案例中采用的是 TCP 回显服务器,所以 QCOM 上行数据,经过 DTU 透传至 TCP 服务器接收到之后会立即按原路径下行。
180+

code/_main.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import sim
2+
import osTimer
3+
from machine import Pin
4+
from usr import network
5+
from usr.dtu import DTU
6+
7+
8+
class Manager(object):
9+
10+
def __init__(self, dog_gpio=Pin.GPIO12):
11+
self.dog_pin = Pin(dog_gpio, Pin.OUT, Pin.PULL_DISABLE, 1)
12+
self.dog_feed_timer = osTimer()
13+
14+
self.dtu = DTU('Quectel')
15+
self.dtu.config.read_from_json('/usr/dtu_config.json')
16+
17+
def start(self):
18+
self.dog_feed_timer.start(3000, 1, self.__feed)
19+
if sim.getStatus() != 1:
20+
raise ValueError("sim card not ready")
21+
# 网络就绪
22+
network.wait_network_ready()
23+
# dtu应用启动
24+
self.dtu.run()
25+
26+
def __feed(self, args):
27+
if self.dog_pin.read():
28+
self.dog_pin.write(0)
29+
else:
30+
self.dog_pin.write(1)
31+
32+
33+
if __name__ == "__main__":
34+
manager = Manager()
35+
manager.start()

code/cloud_abc.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
2+
3+
class CloudABC(object):
4+
5+
def __init__(self, **kwargs):
6+
"""
7+
key arguments: kwargs, used for initial params for cloud (customer used).
8+
"""
9+
raise NotImplementedError("this method should me implemented for customer designed Cloud Object")
10+
11+
def connect(self):
12+
"""connect to Coud"""
13+
raise NotImplementedError("customer should implement this method to connect cloud")
14+
15+
def listen(self):
16+
"""listen message from cloud.
17+
18+
usually we use this method to start a thread for receiving message from the cloud and put message input a Queue, and then use `self.recv` method to get it on app side.
19+
"""
20+
raise NotImplementedError("customer should implement this method to listen cloud message")
21+
22+
def recv(self):
23+
"""receive a message"""
24+
raise NotImplementedError("customer should implement this method to recv a message")
25+
26+
def send(self, *args):
27+
"""send message
28+
29+
position arguments: args, customer designed method used for send message
30+
"""
31+
raise NotImplementedError("customer should implement this method to send a message")

code/configure.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
import ql_fs
2+
from usr.utils import Singleton
3+
from usr.threading import Lock
4+
5+
6+
@Singleton
7+
class Configure(object):
8+
GET = 0x01
9+
SET = 0x02
10+
DEL = 0x03
11+
LOCK = Lock()
12+
DEFAULT_CONFIG_PATH = '/usr/default_config.json'
13+
14+
def __init__(self):
15+
self.path = None
16+
self.settings = None
17+
self.reset_default()
18+
19+
def __repr__(self):
20+
return 'Configure(path=\'{}\')'.format(self.path)
21+
22+
def reset_default(self):
23+
if ql_fs.path_exists(self.DEFAULT_CONFIG_PATH):
24+
with self.LOCK:
25+
self.settings = ql_fs.read_json(self.DEFAULT_CONFIG_PATH)
26+
27+
def read_from_json(self, path):
28+
self.path = path
29+
with self.LOCK:
30+
if not ql_fs.path_exists(path):
31+
raise ValueError('\"{}\" not exists!'.format(path))
32+
self.settings = ql_fs.read_json(path)
33+
34+
def save(self):
35+
with self.LOCK:
36+
ql_fs.touch(self.path, self.settings)
37+
38+
def get(self, key):
39+
with self.LOCK:
40+
return self.execute(self.settings, key.split('.'), operate=self.GET)
41+
42+
def __getitem__(self, item):
43+
return self.get(item)
44+
45+
def set(self, key, value):
46+
with self.LOCK:
47+
return self.execute(self.settings, key.split('.'), value=value, operate=self.SET)
48+
49+
def __setitem__(self, key, value):
50+
return self.set(key, value)
51+
52+
def delete(self, key):
53+
with self.LOCK:
54+
return self.execute(self.settings, key.split('.'), operate=self.DEL)
55+
56+
def __delitem__(self, key):
57+
return self.delete(key)
58+
59+
def execute(self, dict_, keys, value=None, operate=None):
60+
if self.settings is None:
61+
raise ValueError('settings not loaded. pls use `Config.read_from_json` to load settings from a json file.')
62+
63+
key = keys.pop(0)
64+
65+
if len(keys) == 0:
66+
if operate == self.GET:
67+
return dict_[key]
68+
elif operate == self.SET:
69+
dict_[key] = value
70+
elif operate == self.DEL:
71+
del dict_[key]
72+
return
73+
74+
if key not in dict_:
75+
if operate == self.SET:
76+
dict_[key] = {} # auto create sub items recursively.
77+
else:
78+
return
79+
80+
return self.execute(dict_[key], keys, value=value, operate=operate)

0 commit comments

Comments
 (0)