| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- ###########################################
- # Project: CMSIS DSP Library
- # Title: simu.py
- # Description: Support Python classes for the Python SDF scheduler
- #
- # $Date: 29 July 2021
- # $Revision: V1.10.0
- #
- # Target Processor: Cortex-M and Cortex-A cores
- # -------------------------------------------------------------------- */
- #
- # Copyright (C) 2010-2021 ARM Limited or its affiliates. All rights reserved.
- #
- # SPDX-License-Identifier: Apache-2.0
- #
- # Licensed under the Apache License, Version 2.0 (the License); you may
- # not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an AS IS BASIS, WITHOUT
- # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- ############################################
- """ Classes for Python version of the generated schedule
- """
- import numpy as np
class FIFOBase:
    """Common ancestor of the FIFO implementations used by the Python schedule.

    It defines no behaviour of its own; it only gives FIFO classes a shared
    base type.
    """
class FIFO(FIFOBase):
    """Linear FIFO working on top of a buffer supplied by the caller.

    Producers and consumers do not copy samples in or out: they receive
    slices of the underlying buffer and read/write them directly.  The
    ``type`` property reads ``dtype`` from the buffer, so the buffer is
    expected to be array-like with a ``dtype`` attribute (e.g. a NumPy array).
    """

    def __init__(self, fifoSize, fifoBuf):
        """Wrap ``fifoBuf`` and start with empty read/write positions.

        ``fifoSize`` is recorded in ``_length``; ``fifoBuf`` is the storage
        that every returned slice refers to.
        """
        self._length, self._buffer = fifoSize, fifoBuf
        self._readPos = self._writePos = 0

    def getWriteBuffer(self, nb):
        """Reserve ``nb`` elements for writing and return that slice.

        When some elements have already been consumed, the unread elements
        are first moved to the start of the buffer so the free space is
        contiguous at the end.
        """
        if self._readPos > 0:
            # Compact: slide the unread region down to index 0.
            pending = self._writePos - self._readPos
            self._buffer[:pending] = self._buffer[self._readPos:self._writePos]
            self._writePos = pending
            self._readPos = 0

        start = self._writePos
        stop = start + nb
        chunk = self._buffer[start:stop]
        self._writePos = stop
        return chunk

    def getReadBuffer(self, nb):
        """Return the next ``nb`` elements and advance the read position."""
        start = self._readPos
        stop = start + nb
        chunk = self._buffer[start:stop]
        self._readPos = stop
        return chunk

    @property
    def type(self):
        """Element type of the FIFO, taken from the buffer's ``dtype``."""
        return self._buffer.dtype

    def dump(self):
        """Print the whole underlying buffer (debugging aid)."""
        print(self._buffer)
class GenericNode:
    """Processing node with one input FIFO and one output FIFO.

    The node remembers how many samples it consumes and produces per
    execution, and hands out FIFO slices of exactly those sizes.
    """

    def __init__(self, inputSize, outputSize, fifoin, fifoout):
        """Record the input/output FIFOs and the per-run sample counts."""
        self._src, self._dst = fifoin, fifoout
        self._inputSize, self._outputSize = inputSize, outputSize

    def getWriteBuffer(self):
        """Return an output slice of ``outputSize`` samples from the output FIFO."""
        sink = self._dst
        return sink.getWriteBuffer(self._outputSize)

    def getReadBuffer(self):
        """Return an input slice of ``inputSize`` samples from the input FIFO."""
        source = self._src
        return source.getReadBuffer(self._inputSize)
class GenericNode12:
    """Processing node with one input FIFO and two output FIFOs."""

    def __init__(self, inputSize, outputSize1, outputSize2, fifoin, fifoout1, fifoout2):
        """Record the FIFOs and the number of samples moved on each of them."""
        self._src = fifoin
        self._dst1, self._dst2 = fifoout1, fifoout2
        self._inputSize = inputSize
        self._outputSize1, self._outputSize2 = outputSize1, outputSize2

    def getWriteBuffer1(self):
        """Return a slice of ``outputSize1`` samples from the first output FIFO."""
        first = self._dst1
        return first.getWriteBuffer(self._outputSize1)

    def getWriteBuffer2(self):
        """Return a slice of ``outputSize2`` samples from the second output FIFO."""
        second = self._dst2
        return second.getWriteBuffer(self._outputSize2)

    def getReadBuffer(self):
        """Return a slice of ``inputSize`` samples from the input FIFO."""
        source = self._src
        return source.getReadBuffer(self._inputSize)
class GenericNode21:
    """Processing node with two input FIFOs and one output FIFO."""

    def __init__(self, inputSize1, inputSize2, outputSize, fifoin1, fifoin2, fifoout):
        """Record the FIFOs and the number of samples moved on each of them."""
        self._src1, self._src2 = fifoin1, fifoin2
        self._dst = fifoout
        self._inputSize1, self._inputSize2 = inputSize1, inputSize2
        self._outputSize = outputSize

    def getWriteBuffer(self):
        """Return a slice of ``outputSize`` samples from the output FIFO."""
        sink = self._dst
        return sink.getWriteBuffer(self._outputSize)

    def getReadBuffer1(self):
        """Return a slice of ``inputSize1`` samples from the first input FIFO."""
        first = self._src1
        return first.getReadBuffer(self._inputSize1)

    def getReadBuffer2(self):
        """Return a slice of ``inputSize2`` samples from the second input FIFO."""
        second = self._src2
        return second.getReadBuffer(self._inputSize2)
class GenericSource:
    """Node that only produces samples: it has an output FIFO and no input."""

    def __init__(self, outputSize, fifoout):
        """Record the output FIFO and the number of samples produced per run."""
        self._dst, self._outputSize = fifoout, outputSize

    def getWriteBuffer(self):
        """Return a slice of ``outputSize`` samples from the output FIFO."""
        sink = self._dst
        return sink.getWriteBuffer(self._outputSize)
class GenericSink:
    """Node that only consumes samples: it has an input FIFO and no output."""

    def __init__(self, inputSize, fifoin):
        """Record the input FIFO and the number of samples consumed per run."""
        self._src, self._inputSize = fifoin, inputSize

    def getReadBuffer(self):
        """Return a slice of ``inputSize`` samples from the input FIFO."""
        source = self._src
        return source.getReadBuffer(self._inputSize)
class SlidingBuffer(GenericNode):
    """Turn a sample stream into overlapping windows.

    Each run consumes ``windowSize - overlap`` new samples and emits one
    window of ``windowSize`` samples: the last ``overlap`` samples of the
    previous window followed by the new samples.  The very first window
    starts with ``overlap`` zeros.
    """

    def __init__(self, windowSize, overlap, fifoin, fifoout):
        """Create the node.

        Args:
            windowSize: number of samples in each emitted window.
            overlap: number of samples shared between consecutive windows.
            fifoin: FIFO the new samples are read from.
            fifoout: FIFO the windows are written to.

        Raises:
            OverlapTooBig: if ``overlap`` is not strictly smaller than
                ``windowSize`` (the node would consume no new samples).
                This mirrors the check done by ``OverlapAdd``.
        """
        if windowSize - overlap <= 0:
            raise OverlapTooBig
        GenericNode.__init__(self, windowSize - overlap, windowSize, fifoin, fifoout)
        self._windowSize = windowSize
        self._overlap = overlap
        # Tail of the previous window; zeros before the first run.
        self._memory = np.zeros(overlap)

    def run(self):
        """Emit one window and remember its tail for the next run.

        Returns:
            0, the status value used by the node ``run`` methods in this file.
        """
        hop = self._windowSize - self._overlap
        a = self.getReadBuffer()
        b = self.getWriteBuffer()

        # Window = remembered tail + freshly read samples.
        b[:self._overlap] = self._memory
        b[self._overlap:self._windowSize] = a[:hop]

        # Keep a copy of the new tail.  Re-binding (instead of assigning into
        # the initial float64 array) preserves the dtype of the output buffer,
        # so complex samples keep their imaginary part.
        self._memory = np.array(b[hop:self._windowSize])

        return 0
class OverlapTooBig(Exception):
    """Raised when an overlap is not strictly smaller than the window size."""
class OverlapAdd(GenericNode):
    """Recombine overlapping windows by adding their overlapping parts.

    Each run consumes one window of ``windowSize`` samples and emits
    ``windowSize - overlap`` samples.  The first ``overlap`` samples of the
    window are added to the samples remembered from previous windows; what
    is not emitted yet is remembered for the following runs.
    """

    def __init__(self, windowSize, overlap, fifoin, fifoout):
        """Create the node.

        Args:
            windowSize: number of samples in each consumed window.
            overlap: number of samples shared between consecutive windows.
            fifoin: FIFO the windows are read from.
            fifoout: FIFO the recombined samples are written to.

        Raises:
            OverlapTooBig: if ``overlap`` is not strictly smaller than
                ``windowSize``.
        """
        GenericNode.__init__(self, windowSize, windowSize - overlap, fifoin, fifoout)
        if windowSize - overlap <= 0:
            raise OverlapTooBig
        self._windowSize = windowSize
        self._overlap = overlap
        # Samples still waiting to be summed with future windows; always
        # ``overlap`` long, zeros before the first run.
        self._memory = np.zeros(overlap)

    def run(self):
        """Consume one window, emit ``windowSize - overlap`` samples.

        Returns:
            0, the status value used by the node ``run`` methods in this file.
        """
        hop = self._windowSize - self._overlap
        a = self.getReadBuffer()
        b = self.getWriteBuffer()

        # Pending signal = (head of the window + remembered samples) followed
        # by the rest of the window.  It is ``windowSize`` long.  The sum is a
        # new array whose dtype is the promotion of both operands, so complex
        # input is not truncated to the float64 dtype of the initial memory.
        summed = a[:self._overlap] + self._memory
        pending = np.concatenate((summed, a[self._overlap:self._windowSize]))

        # The first ``hop`` samples are final; the remaining ``overlap``
        # samples must still be combined with the next window(s).  This single
        # split covers overlap < hop, overlap == hop and overlap > hop.
        b[:hop] = pending[:hop]
        self._memory = pending[hop:].copy()

        return 0
|