forked from fleneindre/lpgm-calculator
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexample.py
More file actions
302 lines (251 loc) · 9.61 KB
/
example.py
File metadata and controls
302 lines (251 loc) · 9.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
from obspy import read
from LPGMCalculator import LPGMCalculator
import matplotlib.pyplot as plt
import time
import numpy as np
from dvg_ringbuffer import RingBuffer
import matplotlib
matplotlib.use('TkAgg') # 使用較快的後端
# 濾波器開關(注意:LPGMCalculator 內部已有高通濾波器,這裡的濾波僅用於顯示波形)
FILTER = True
# 播放偏移(秒)- 跳過前面的資料從指定時間開始播放
OFFSET = 60 * 6
def read_from_mseed(file_path):
"""從 mseed 文件讀取三個分量的數據,返回 (data_x, data_y, data_z, sample_rate)"""
stream = read(file_path)
sample_rate = int(stream[0].stats.sampling_rate)
if len(stream) == 3:
return stream[0].data, stream[1].data, stream[2].data, sample_rate
# 根據通道名稱識別
channels = {}
for trace in stream:
ch = trace.stats.channel
if 'Z' in ch or ch.endswith('Z'):
channels['Z'] = trace.data
elif 'N' in ch or ch.endswith('N') or '1' in ch:
channels['N'] = trace.data
elif 'E' in ch or ch.endswith('E') or '2' in ch:
channels['E'] = trace.data
# 使用識別到的通道,否則使用前三個
return (channels.get('N', stream[0].data),
channels.get('E', stream[1].data if len(
stream) > 1 else stream[0].data),
channels.get('Z', stream[2].data if len(
stream) > 2 else stream[0].data),
sample_rate)
class BiquadFilter:
"""二階節濾波器類"""
def __init__(self, num_coeffs, den_coeffs):
if len(num_coeffs) != len(den_coeffs):
raise ValueError(
"num_coeffs and den_coeffs must have the same length")
self.stages = []
for num, den in zip(num_coeffs, den_coeffs):
b0, b1, b2 = num
a0, a1, a2 = den
if a0 != 1.0:
b0, b1, b2 = b0/a0, b1/a0, b2/a0
a1, a2 = a1/a0, a2/a0
self.stages.append({'b0': b0, 'b1': b1, 'b2': b2, 'a1': a1, 'a2': a2,
'z1': 0.0, 'z2': 0.0})
def apply(self, x):
"""應用濾波器到單個樣本"""
y = x
for s in self.stages:
out = s['b0'] * y + s['z1']
s['z1'] = s['b1'] * y - s['a1'] * out + s['z2']
s['z2'] = s['b2'] * y - s['a2'] * out
y = out
return y
def apply_buffer(self, x):
"""應用濾波器到整個緩衝區"""
return np.array([self.apply(xi) for xi in x])
def create_bpf_filter():
"""創建帶通濾波器(高通 + 低通)"""
# 低通濾波器係數
NUM_LPF = [
[0.8063260828207, 0, 0],
[1, -0.3349099821478, 1],
[0.8764452158503, 0, 0],
[1, -0.08269016387548, 1],
[0.8131516681065, 0, 0],
[1, 0.5521204464881, 1],
[1.228277124762, 0, 0],
[1, 1.705652561121, 1],
[0.00431639855615, 0, 0],
[1, -0.4218227257396, 1],
[1, 0, 0],
]
DEN_LPF = [
[1, 0, 0],
[1, -0.6719798550872, 0.938845023254],
[1, 0, 0],
[1, -0.8264759910073, 0.8561761588872],
[1, 0, 0],
[1, -1.10962299915, 0.7141202529829],
[1, 0, 0],
[1, -1.413006561919, 0.5638384962434],
[1, 0, 0],
[1, -0.6139497794955, 0.9834048810788],
[1, 0, 0],
]
# 高通濾波器係數
NUM_HPF = [
[0.9769037485204, 0, 0],
[1, -2, 1],
[0.9424328308459, 0, 0],
[1, -2, 1],
[0.9149691441131, 0, 0],
[1, -2, 1],
[0.8959987277275, 0, 0],
[1, -2, 1],
[0.8863374802187, 0, 0],
[1, -2, 1],
[1, 0, 0],
]
DEN_HPF = [
[1, 0, 0],
[1, -1.946073828052, 0.9615411660298],
[1, 0, 0],
[1, -1.877404882092, 0.8923264412918],
[1, 0, 0],
[1, -1.822694925196, 0.837181651256],
[1, 0, 0],
[1, -1.78490427193, 0.7990906389804],
[1, 0, 0],
[1, -1.765658260281, 0.7796916605933],
[1, 0, 0],
]
lpf = BiquadFilter(NUM_LPF, DEN_LPF)
hpf = BiquadFilter(NUM_HPF, DEN_HPF)
return hpf, lpf
# 從 mseed 文件讀取加速度數據(單位:counts)
acc_x_counts, acc_y_counts, acc_z_counts, sampleRate = read_from_mseed(
'example.mseed')
if len(acc_x_counts) != len(acc_y_counts) or len(acc_y_counts) != len(acc_z_counts):
raise ValueError(
"The lengths of the acceleration data from mseed file are not equal")
# 將 counts 轉換為 gal(counts/10000 = gal)
acc_x = acc_x_counts / 10000
acc_y = acc_y_counts / 10000
acc_z = acc_z_counts / 10000
# 原始加速度用於 LPGM 計算(LPGMCalculator 內部有自己的濾波器)
accel_raw = np.column_stack((acc_x, acc_y, acc_z))
# 濾波後的加速度僅用於波形顯示
if FILTER:
hpf, lpf = create_bpf_filter()
acc_x_f = lpf.apply_buffer(hpf.apply_buffer(acc_x))
acc_y_f = lpf.apply_buffer(hpf.apply_buffer(acc_y))
acc_z_f = lpf.apply_buffer(hpf.apply_buffer(acc_z))
accel_display = np.column_stack((acc_x_f, acc_y_f, acc_z_f))
else:
accel_display = accel_raw
lpgmCalculator = LPGMCalculator(sampleRate)
buffer_size = 30 * sampleRate
accPx = RingBuffer(buffer_size, dtype=np.float64)
accPx.extend(np.zeros(buffer_size))
accPy = RingBuffer(buffer_size, dtype=np.float64)
accPy.extend(np.zeros(buffer_size))
accPz = RingBuffer(buffer_size, dtype=np.float64)
accPz.extend(np.zeros(buffer_size))
t = np.linspace(-29.99, 0, len(accPx))
data_length = len(accel_raw)
# 目標幀率
target_fps = 30
frame_duration = 1.0 / target_fps # 每幀時間
# 預先建立圖表結構(只建立一次)
plt.ion() # 開啟互動模式
fig, axes = plt.subplots(3, 1, figsize=(10, 8))
fig.tight_layout(pad=3.0)
labels = ['Acc NS [gal]', 'Acc EW [gal]', 'Acc UD [gal]']
lines = []
for j, ax in enumerate(axes):
line, = ax.plot(t, np.zeros(len(t)), 'b-')
lines.append(line)
ax.set_ylabel(labels[j])
ax.set_xlim(t[0], t[-1])
ax.set_ylim(-100, 100) # 初始範圍,之後會動態調整
ax.grid(True)
if j == 2:
ax.set_xlabel('Time [s]')
title_text = axes[0].set_title('')
fig.canvas.draw() # 初次繪製
# 儲存背景以便 blitting
backgrounds = [fig.canvas.copy_from_bbox(ax.bbox) for ax in axes]
# 預處理 OFFSET 秒的資料(快速跳過,不繪圖)
offset_samples = int(OFFSET * sampleRate)
offset_samples = min(offset_samples, data_length)
for i in range(offset_samples):
rawAcc = accel_raw[i, :]
lpgmCalculator.update(rawAcc)
accPx.append(accel_display[i, 0])
accPy.append(accel_display[i, 1])
accPz.append(accel_display[i, 2])
start_time = time.time()
last_draw_time = start_time
current_sample = offset_samples
while current_sample < data_length:
# 計算當前真實時間對應的樣本位置(從 offset 開始算)
elapsed_time = time.time() - start_time
target_sample = offset_samples + int(elapsed_time * sampleRate)
target_sample = min(target_sample, data_length)
# 確保至少處理一個樣本(避免空轉)
if target_sample <= current_sample and current_sample < data_length:
target_sample = current_sample + 1
# 處理從 current_sample 到 target_sample 的所有樣本(資料處理不跳過)
while current_sample < target_sample:
# 使用原始加速度給 LPGM 計算
rawAcc = accel_raw[current_sample, :]
LPGM = lpgmCalculator.update(rawAcc)
# 使用濾波後的加速度給波形顯示
accPx.append(accel_display[current_sample, 0])
accPy.append(accel_display[current_sample, 1])
accPz.append(accel_display[current_sample, 2])
current_sample += 1
# 檢查是否該繪製新幀(限制幀率以確保流暢)
now = time.time()
if now - last_draw_time >= frame_duration and current_sample > 0:
last_draw_time = now
maxSva30 = lpgmCalculator.getMaxSva30()
vectPx = accPx[:]
vectPy = accPy[:]
vectPz = accPz[:]
PGA = np.max(np.sqrt(vectPx[-100:]**2 +
vectPy[-100:]**2 + vectPz[-100:]**2))
maxSva = lpgmCalculator.getMaxSva()
valMax = np.max([np.max(np.abs(vectPx)), np.max(
np.abs(vectPy)), np.max(np.abs(vectPz))])
strTitle = 'PGA {:.2f} gal Sva {:.2f} cm/s LPGM {} ({:.2f} cm/s)'.format(
PGA, np.max(maxSva[:100]), LPGM, maxSva30)
# 更新資料(不重繪整個圖表)
data = [vectPx, vectPy, vectPz]
need_rescale = False
for j, (line, ax, d) in enumerate(zip(lines, axes, data)):
line.set_ydata(d)
# 檢查是否需要調整 Y 軸範圍
current_ylim = ax.get_ylim()
new_limit = valMax + 5
if new_limit > current_ylim[1] * 0.9 or new_limit < current_ylim[1] * 0.3:
ax.set_ylim(-new_limit, new_limit)
need_rescale = True
title_text.set_text(strTitle)
if need_rescale:
# Y 軸範圍改變時需要重繪
fig.canvas.draw()
backgrounds = [fig.canvas.copy_from_bbox(ax.bbox) for ax in axes]
else:
# 使用 blitting 快速更新
for j, (ax, line) in enumerate(zip(axes, lines)):
fig.canvas.restore_region(backgrounds[j])
ax.draw_artist(line)
fig.canvas.blit(ax.bbox)
# 更新標題
fig.canvas.draw_idle()
fig.canvas.flush_events()
# 每秒打印一次狀態
if int(elapsed_time) != int(elapsed_time - frame_duration):
print(f"[{elapsed_time:.1f}s] {strTitle}")
# 短暫休眠避免 CPU 空轉
time.sleep(0.001)
plt.ioff()
plt.show()