陈波 发表于 2015-10-13 13:52

LMD算法Python程序

本帖最后由 陈波 于 2015-10-13 13:52 编辑

自己写的一个简单的LMD算法程序,需要 Python 3.0 或以上版本。
注意:未作计算优化,速度较慢。

欢迎大家提意见。

"""
This module implements Local Mean Decomposition signal processing algorithm
as described in .


References:

Jonathan S. Smith. The local mean decomposition and its application to EEG
perception data. Journal of the Royal Society Interface, 2005, 2(5):443-454
"""

#
# 算法控制参数
#

# 是否把信号的端点看作伪极值点
INCLUDE_ENDPOINTS = True

# 滑动平均算法: 最多迭代的次数
MAX_SMOOTH_ITERATION = 12

# 分离局部包络信号时最多迭代的次数
MAX_ENVELOPE_ITERATION = 200

ENVELOPE_EPSILON = 0.01
CONVERGENCE_EPSILON = 0.01

# 最多生成的积函数个数
MAX_NUM_PF = 8

# for debugging
DBG_FIG = None

def is_monotonous(signal):
    """判断信号是否是(非严格的)单调序列"""

    def is_monotonous_increase(signal):
      y0 = signal
      for y1 in signal:
            if y1 < y0:
                return False
            y0 = y1
      return True

    def is_monotonous_decrease(signal):
      y0 = signal
      for y1 in signal:
            if y1 > y0:
                return False
            y0 = y1
      return True

    if len(signal) <= 0:
      return True
    else:
      return is_monotonous_increase(signal) \
            or is_monotonous_decrease(signal)

def find_extrema(signal):
    """找出信号的所有局部极值点位置."""
   
    n = len(signal)

    # 局部极值的位置列表
    extrema = [ ]

    lookfor = None
    for x in range(1, n):
      y1 = signal; y2 = signal
      if lookfor == "min":
            if y2 > y1:
                extrema.append(x-1)
                lookfor = "max"
      elif lookfor == "max":
            if y2 < y1:
                extrema.append(x-1)
                lookfor = "min"
      else:
            if y2 < y1:
                lookfor = "min"
            elif y2 > y1:
                lookfor = "max"

    # 优化(微调)极值点的位置:
    # 当连续多个采样值相同时,取最中间位置作为极值点位置

    def micro_adjust(x):
      assert(0 <= x < n)
      y = signal; lo = hi = x
      while (lo-1 >= 0) and (signal == y): lo -= 1
      while (hi+1 < n) and (signal == y): hi += 1
      if INCLUDE_ENDPOINTS:
            if lo == 0: return 0
            if hi == n-1: return n-1
      return (lo + hi) // 2

    extrema =

    if extrema and INCLUDE_ENDPOINTS:
      if extrema != 0:
            extrema.insert(0, 0)
      if extrema[-1] != n-1:
            extrema.append(n-1)

    return extrema

def moving_average_smooth(signal, window):
    """使用移动加权平均算法平滑方波信号."""

    n = len(signal)

    # at least one nearby sample is needed for average
    if window < 3:
      window = 3

    # adjust length of sliding window to an odd number for symmetry
    if (window % 2) == 0:
      window += 1
      
    half = window // 2

    weight = list(range(1, half+2)) + list(range(half, 0, -1))
    assert(len(weight) == window)

    def sliding(signal, window):
      for x in range(n):
            x1 = x - half; x2 = x1 + window
            w1 = 0; w2 = window
            if x1 < 0: w1 = -x1; x1 = 0
            if x2 > n: w2 = n-x2; x2 = n
            yield signal, weight

    def weighted(signal, weight):
      assert(len(signal) == len(weight))
      return sum(y*w for y,w in zip(signal, weight)) / sum(weight)

    def is_smooth(signal):
      for x in range(1, n):
            if signal == signal:
                return False
      return True

    smoothed = signal
    for i in range(MAX_SMOOTH_ITERATION):
      smoothed =
      if is_smooth(smoothed): break

    return smoothed

def local_mean_and_envelope(signal, extrema):
    """根据极值点位置计算局部均值函数和局部包络函数."""
    n = len(signal); k = len(extrema); assert(1 < k <= n)
    # construct square signal
    mean = [ ]; enve = [ ]
    prev_mean = (signal] + signal]) / 2
    prev_enve = abs(signal] - signal]) / 2
    e = 1
    for x in range(n):
      if (x == extrema) and (e + 1 < k):
            next_mean = (signal] + signal]) / 2
            mean.append((prev_mean + next_mean) / 2)
            prev_mean = next_mean
            next_enve = abs(signal] - signal]) / 2
            enve.append((prev_enve + next_enve) / 2)
            prev_enve = next_enve
            e += 1
      else:
            mean.append(prev_mean)
            enve.append(prev_enve)
    # smooth square signal
    window = max(extrema-extrema for i in range(1, k)) // 3
    return mean, moving_average_smooth(mean, window), \
         enve, moving_average_smooth(enve, window)

def extract_product_function(signal):
    s = signal; n = len(signal)
    envelopes = [ ] # 每次迭代得到的包络信号

    def component():
      c = [ ]
      for i in range(n):
            y = s
            for e in envelopes:
                y = y * e
            c.append(y)
      return c

    for i in range(MAX_ENVELOPE_ITERATION):
      extrema = find_extrema(s)
      if len(extrema) <= 3: break
      m0, m, a0, a = local_mean_and_envelope(s, extrema)
      for y in a:
            if y <= 0: raise ValueError("invalid envelope signal")
      # 对原信号进行调制
      h =
      t =
      if DBG_FIG:
            DBG_FIG.plot(i, component(), s, m0, m, a0, a, h, t)
      # 得到纯调频信号时终止迭代
      err = sum(abs(1-y) for y in a) / n
      if err <= ENVELOPE_EPSILON: break
      # 调制信号收敛时终止迭代
      err = sum(abs(y1-y2) for y1,y2 in zip(s, t)) / n
      if err <= CONVERGENCE_EPSILON: break
      envelopes.append(a); s = t

    return component()

def lmd(signal):
    pf = [ ]
    # 每次迭代得到一个PF分量, 直到残余函数接近为一个单调函数为止
    residue = signal[:]
    while (len(pf) < MAX_NUM_PF) and \
          (not is_monotonous(residue)) and \
          (len(find_extrema(residue)) >= 5):
      component = extract_product_function(residue)
      residue =
      pf.append((component, residue))
    return pf



sorry 发表于 2015-10-21 14:59

谢谢您的分享,先收藏了

紫云轩8023 发表于 2015-10-23 13:44

LZ,请问您有LMD算法的MATLAB代码么?谢谢!!!

犟牛 发表于 2015-10-23 14:56

紫云轩8023 发表于 2015-10-23 13:44
LZ,请问您有LMD算法的MATLAB代码么?谢谢!!!

matlab版有人发过,可以搜索一下
页: [1]
查看完整版本: LMD算法Python程序