appnodes.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. ###########################################
  2. # Project: CMSIS DSP Library
  3. # Title: appnodes.py
  4. # Description: Application nodes for kws example
  5. #
  6. # $Date: 16 March 2022
  7. # $Revision: V1.10.0
  8. #
  9. # Target Processor: Cortex-M and Cortex-A cores
  10. # -------------------------------------------------------------------- */
  11. #
  12. # Copyright (C) 2010-2022 ARM Limited or its affiliates. All rights reserved.
  13. #
  14. # SPDX-License-Identifier: Apache-2.0
  15. #
  16. # Licensed under the Apache License, Version 2.0 (the License); you may
  17. # not use this file except in compliance with the License.
  18. # You may obtain a copy of the License at
  19. #
  20. # www.apache.org/licenses/LICENSE-2.0
  21. #
  22. # Unless required by applicable law or agreed to in writing, software
  23. # distributed under the License is distributed on an AS IS BASIS, WITHOUT
  24. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  25. # See the License for the specific language governing permissions and
  26. # limitations under the License.
  27. ############################################
  28. from cmsisdsp.sdf.nodes.simu import *
  29. from custom import *
  30. import cmsisdsp.fixedpoint as fix
  31. import cmsisdsp as dsp
  32. # Sink node displaying the recognized word
  33. class Sink(GenericSink):
  34. def __init__(self,inputSize,fifoin):
  35. GenericSink.__init__(self,inputSize,fifoin)
  36. def run(self):
  37. i=self.getReadBuffer()
  38. if i[0] == -1:
  39. print("Unknown")
  40. if i[0] == 0:
  41. print("Yes")
  42. return(0)
  43. # Source node getting sample from NumPy buffer
  44. # At the end of the buffer we generate 0 samples
  45. class Source(GenericSource):
  46. def __init__(self,outputSize,fifoout,buffer):
  47. GenericSource.__init__(self,outputSize,fifoout)
  48. self._offset=0
  49. self._buffer=np.array(buffer)
  50. def run(self):
  51. a=self.getWriteBuffer()
  52. if self._offset + self._outputSize >= len(self._buffer):
  53. a[0:self._outputSize] = np.zeros(self._outputSize,dtype=np.int16)
  54. else:
  55. a[0:self._outputSize] = self._buffer[self._offset:self._offset+self._outputSize]
  56. self._offset = self._offset + self._outputSize
  57. return(0)
  58. # Same implementation as the one in the python notebook
  59. def dsp_zcr_q15(w):
  60. m = dsp.arm_mean_q15(w)
  61. # Negate can saturate so we use CMSIS-DSP function which is working on array (and we have a scalar)
  62. m = dsp.arm_negate_q15(np.array([m]))[0]
  63. w = dsp.arm_offset_q15(w,m)
  64. f=w[:-1]
  65. g=w[1:]
  66. k=np.count_nonzero(np.logical_and(np.logical_or(np.logical_and(f>0,g<0), np.logical_and(f<0,g>0)),g>f))
  67. # k < len(f) so shift should be 0 except when k == len(f)
  68. # When k==len(f) normally quotient is 0x4000 and shift 1 and we convert
  69. # this to 0x7FFF
  70. status,quotient,shift_val=dsp.arm_divide_q15(k,len(f))
  71. if shift_val==1:
  72. return(dsp.arm_shift_q31(np.array([quotient]),shift)[0])
  73. else:
  74. return(quotient)
  75. # Same implementation as the one in the notebook
  76. class FIR(GenericNode):
  77. def __init__(self,inputSize,outSize,fifoin,fifoout):
  78. GenericNode.__init__(self,inputSize,outSize,fifoin,fifoout)
  79. self._firq15=dsp.arm_fir_instance_q15()
  80. def run(self):
  81. a=self.getReadBuffer()
  82. b=self.getWriteBuffer()
  83. errorStatus = 0
  84. blockSize=self._inputSize
  85. numTaps=10
  86. stateLength = numTaps + blockSize - 1
  87. dsp.arm_fir_init_q15(self._firq15,10,fix.toQ15(np.ones(10)/10.0),np.zeros(stateLength,dtype=np.int16))
  88. b[:] = dsp.arm_fir_q15(self._firq15,a)
  89. return(errorStatus)
  90. class Feature(GenericNode):
  91. def __init__(self,inputSize,outSize,fifoin,fifoout,window):
  92. GenericNode.__init__(self,inputSize,outSize,fifoin,fifoout)
  93. self._window=window
  94. def run(self):
  95. a=self.getReadBuffer()
  96. b=self.getWriteBuffer()
  97. errorStatus = 0
  98. b[:] = dsp_zcr_q15(dsp.arm_mult_q15(a,self._window))
  99. return(errorStatus)
  100. class KWS(GenericNode):
  101. def __init__(self,inputSize,outSize,fifoin,fifoout,coef_q15,coef_shift,intercept_q15,intercept_shift):
  102. GenericNode.__init__(self,inputSize,outSize,fifoin,fifoout)
  103. self._coef_q15=coef_q15
  104. self._coef_shift=coef_shift
  105. self._intercept_q15=intercept_q15
  106. self._intercept_shift=intercept_shift
  107. def run(self):
  108. a=self.getReadBuffer()
  109. b=self.getWriteBuffer()
  110. errorStatus = 0
  111. res=dsp.arm_dot_prod_q15(self._coef_q15,a)
  112. scaled=dsp.arm_shift_q15(np.array([self._intercept_q15]),self._intercept_shift-self._coef_shift)[0]
  113. # Because dot prod output is in Q34.30
  114. # and ret is on 64 bits
  115. scaled = np.int64(scaled) << 15
  116. res = res + scaled
  117. if res<0:
  118. b[0]=-1
  119. else:
  120. b[0]=0
  121. return(errorStatus)