VHT.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. from multiprocessing import Process,Semaphore
  2. import multiprocessing as mp
  3. import socket
  4. import cmsisdsp.sdf.nodes.host.message as msg
  5. HOST = '127.0.0.1' # The remote host
  6. PORT = 50007
  7. class ModelicaConnectionLost(Exception):
  8. pass
  9. def connectToServer(inputMode,theid):
  10. s=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  11. s.connect((HOST, PORT))
  12. # Identify as vht input
  13. if inputMode:
  14. print("Connecting as INPUT")
  15. theBytes=msg.list_to_bytes(msg.clientID(msg.VSIINPUT,theid))
  16. else:
  17. print("Connecting as OUTPUT")
  18. theBytes=msg.list_to_bytes(msg.clientID(msg.VSIOUTPUT,theid))
  19. #print("vs0: %d %d" % (int(theBytes[0]),int(theBytes[1])))
  20. msg.sendBytes(s,theBytes)
  21. return(s)
  22. def source(theid,size,queue,started):
  23. s=connectToServer(True,theid)
  24. started.release()
  25. try:
  26. while True:
  27. received=msg.receiveBytes(s,size)
  28. queue.put(received)
  29. except Exception as inst:
  30. print(inst)
  31. finally:
  32. queue.close()
  33. def sink(theid,size,queue,started):
  34. s=connectToServer(False,theid)
  35. data= bytes(size)
  36. msg.sendBytes(s,data)
  37. started.release()
  38. try:
  39. while True:
  40. tosend=queue.get(True,2)
  41. msg.sendBytes(s,tosend)
  42. except Exception as inst:
  43. print(inst)
  44. finally:
  45. queue.close()
  46. class Source:
  47. def __init__(self,theid,bufferSize):
  48. self._bufferSize_ = bufferSize
  49. self._srcQueue_ = mp.Queue()
  50. self._started_ = Semaphore()
  51. # Q15 data is sent so a *2 factor for bufferSize since
  52. # source function is working with bytes
  53. self._src_ = Process(target=source, args=(theid,2*bufferSize,self._srcQueue_,self._started_))
  54. self._src_.start()
  55. @property
  56. def queue(self):
  57. return(self._srcQueue_)
  58. def get(self):
  59. if self._src_.exitcode is None:
  60. return(msg.bytes_to_list(self.queue.get(True,2)))
  61. else:
  62. raise ModelicaConnectionLost
  63. def end(self):
  64. self._src_.terminate()
  65. def wait(self):
  66. self._started_.acquire()
  67. class Sink:
  68. def __init__(self,theid,bufferSize):
  69. self._bufferSize_ = bufferSize
  70. self._sinkQueue_ = mp.Queue()
  71. self._started_ = Semaphore()
  72. # Q15 data is sent so a *2 factor for bufferSize since
  73. # sink function is working with bytes
  74. self._sink_ = Process(target=sink, args=(theid,2*bufferSize,self._sinkQueue_,self._started_))
  75. self._sink_.start()
  76. @property
  77. def queue(self):
  78. return(self._sinkQueue_)
  79. def put(self,data):
  80. if self._sink_.exitcode is None:
  81. q15list=[int(x) for x in data]
  82. self.queue.put(msg.list_to_bytes(q15list),True,1)
  83. else:
  84. raise ModelicaConnectionLost
  85. def end(self):
  86. self._sink_.terminate()
  87. def wait(self):
  88. self._started_.acquire()