LMD算法Python程序
本帖最后由 陈波 于 2015-10-13 13:52 编辑自己写的一个简单的LMD算法程序,需要 Python 3.0 或以上版本。
注意:未作计算优化,速度较慢。
欢迎大家提意见。
"""
This module implements Local Mean Decomposition signal processing algorithm
as described in .
References:
Jonathan S. Smith. The local mean decomposition and its application to EEG
perception data. Journal of the Royal Society Interface, 2005, 2(5):443-454
"""
#
# 算法控制参数
#
# 是否把信号的端点看作伪极值点
INCLUDE_ENDPOINTS = True
# 滑动平均算法: 最多迭代的次数
MAX_SMOOTH_ITERATION = 12
# 分离局部包络信号时最多迭代的次数
MAX_ENVELOPE_ITERATION = 200
ENVELOPE_EPSILON = 0.01
CONVERGENCE_EPSILON = 0.01
# 最多生成的积函数个数
MAX_NUM_PF = 8
# for debugging
DBG_FIG = None
def is_monotonous(signal):
"""判断信号是否是(非严格的)单调序列"""
def is_monotonous_increase(signal):
y0 = signal
for y1 in signal:
if y1 < y0:
return False
y0 = y1
return True
def is_monotonous_decrease(signal):
y0 = signal
for y1 in signal:
if y1 > y0:
return False
y0 = y1
return True
if len(signal) <= 0:
return True
else:
return is_monotonous_increase(signal) \
or is_monotonous_decrease(signal)
def find_extrema(signal):
"""找出信号的所有局部极值点位置."""
n = len(signal)
# 局部极值的位置列表
extrema = [ ]
lookfor = None
for x in range(1, n):
y1 = signal; y2 = signal
if lookfor == "min":
if y2 > y1:
extrema.append(x-1)
lookfor = "max"
elif lookfor == "max":
if y2 < y1:
extrema.append(x-1)
lookfor = "min"
else:
if y2 < y1:
lookfor = "min"
elif y2 > y1:
lookfor = "max"
# 优化(微调)极值点的位置:
# 当连续多个采样值相同时,取最中间位置作为极值点位置
def micro_adjust(x):
assert(0 <= x < n)
y = signal; lo = hi = x
while (lo-1 >= 0) and (signal == y): lo -= 1
while (hi+1 < n) and (signal == y): hi += 1
if INCLUDE_ENDPOINTS:
if lo == 0: return 0
if hi == n-1: return n-1
return (lo + hi) // 2
extrema =
if extrema and INCLUDE_ENDPOINTS:
if extrema != 0:
extrema.insert(0, 0)
if extrema[-1] != n-1:
extrema.append(n-1)
return extrema
def moving_average_smooth(signal, window):
"""使用移动加权平均算法平滑方波信号."""
n = len(signal)
# at least one nearby sample is needed for average
if window < 3:
window = 3
# adjust length of sliding window to an odd number for symmetry
if (window % 2) == 0:
window += 1
half = window // 2
weight = list(range(1, half+2)) + list(range(half, 0, -1))
assert(len(weight) == window)
def sliding(signal, window):
for x in range(n):
x1 = x - half; x2 = x1 + window
w1 = 0; w2 = window
if x1 < 0: w1 = -x1; x1 = 0
if x2 > n: w2 = n-x2; x2 = n
yield signal, weight
def weighted(signal, weight):
assert(len(signal) == len(weight))
return sum(y*w for y,w in zip(signal, weight)) / sum(weight)
def is_smooth(signal):
for x in range(1, n):
if signal == signal:
return False
return True
smoothed = signal
for i in range(MAX_SMOOTH_ITERATION):
smoothed =
if is_smooth(smoothed): break
return smoothed
def local_mean_and_envelope(signal, extrema):
"""根据极值点位置计算局部均值函数和局部包络函数."""
n = len(signal); k = len(extrema); assert(1 < k <= n)
# construct square signal
mean = [ ]; enve = [ ]
prev_mean = (signal] + signal]) / 2
prev_enve = abs(signal] - signal]) / 2
e = 1
for x in range(n):
if (x == extrema) and (e + 1 < k):
next_mean = (signal] + signal]) / 2
mean.append((prev_mean + next_mean) / 2)
prev_mean = next_mean
next_enve = abs(signal] - signal]) / 2
enve.append((prev_enve + next_enve) / 2)
prev_enve = next_enve
e += 1
else:
mean.append(prev_mean)
enve.append(prev_enve)
# smooth square signal
window = max(extrema-extrema for i in range(1, k)) // 3
return mean, moving_average_smooth(mean, window), \
enve, moving_average_smooth(enve, window)
def extract_product_function(signal):
s = signal; n = len(signal)
envelopes = [ ] # 每次迭代得到的包络信号
def component():
c = [ ]
for i in range(n):
y = s
for e in envelopes:
y = y * e
c.append(y)
return c
for i in range(MAX_ENVELOPE_ITERATION):
extrema = find_extrema(s)
if len(extrema) <= 3: break
m0, m, a0, a = local_mean_and_envelope(s, extrema)
for y in a:
if y <= 0: raise ValueError("invalid envelope signal")
# 对原信号进行调制
h =
t =
if DBG_FIG:
DBG_FIG.plot(i, component(), s, m0, m, a0, a, h, t)
# 得到纯调频信号时终止迭代
err = sum(abs(1-y) for y in a) / n
if err <= ENVELOPE_EPSILON: break
# 调制信号收敛时终止迭代
err = sum(abs(y1-y2) for y1,y2 in zip(s, t)) / n
if err <= CONVERGENCE_EPSILON: break
envelopes.append(a); s = t
return component()
def lmd(signal):
pf = [ ]
# 每次迭代得到一个PF分量, 直到残余函数接近为一个单调函数为止
residue = signal[:]
while (len(pf) < MAX_NUM_PF) and \
(not is_monotonous(residue)) and \
(len(find_extrema(residue)) >= 5):
component = extract_product_function(residue)
residue =
pf.append((component, residue))
return pf
谢谢您的分享,先收藏了 LZ,请问您有LMD算法的MATLAB代码么?谢谢!!! 紫云轩8023 发表于 2015-10-23 13:44
LZ,请问您有LMD算法的MATLAB代码么?谢谢!!!
matlab版有人发过,可以搜索一下
页:
[1]