| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
- #
- # Copyright (c) 2021 Project CHIP Authors
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import filecmp
- import logging
- import os
- import subprocess
- import sys
- import threading
- from xmlrpc.server import SimpleXMLRPCServer
- _DEFAULT_CHIP_ROOT = os.path.abspath(
- os.path.join(os.path.dirname(__file__), '..', '..', '..'))
- IP = '127.0.0.1'
- PORT = 9000
- if sys.platform == 'linux':
- IP = '10.10.10.5'
- class AppsRegister:
- _instance = None
- __accessories = {}
- def init(self):
- self.__startXMLRPCServer()
- def uninit(self):
- self.__stopXMLRPCServer()
- @property
- def accessories(self):
- """List of registered accessory applications."""
- return self.__accessories.values()
- def add(self, name, accessory):
- self.__accessories[name] = accessory
- def remove(self, name):
- self.__accessories.pop(name)
- def removeAll(self):
- self.__accessories = {}
- def get(self, name):
- return self.__accessories[name]
- def kill(self, name):
- accessory = self.__accessories[name]
- if accessory:
- accessory.kill()
- def killAll(self):
- for accessory in self.__accessories.values():
- accessory.kill()
- def start(self, name, args):
- accessory = self.__accessories[name]
- if accessory:
- # The args param comes directly from the sys.argv[2:] of Start.py and should contain a list of strings in
- # key-value pair, e.g. [option1, value1, option2, value2, ...]
- options = self.__createCommandLineOptions(args)
- return accessory.start(options)
- return False
- def stop(self, name):
- accessory = self.__accessories[name]
- if accessory:
- return accessory.stop()
- return False
- def reboot(self, name):
- accessory = self.__accessories[name]
- if accessory:
- return accessory.stop() and accessory.start()
- return False
- def factoryResetAll(self):
- for accessory in self.__accessories.values():
- accessory.factoryReset()
- def factoryReset(self, name):
- accessory = self.__accessories[name]
- if accessory:
- return accessory.factoryReset()
- return False
- def waitForMessage(self, name, message):
- accessory = self.__accessories[name]
- if accessory:
- # The message param comes directly from the sys.argv[2:] of WaitForMessage.py and should contain a list of strings that
- # comprise the entire message to wait for
- return accessory.waitForMessage(' '.join(message))
- return False
- def createOtaImage(self, otaImageFilePath, rawImageFilePath, rawImageContent, vid='0xDEAD', pid='0xBEEF'):
- # Write the raw image content
- with open(rawImageFilePath, 'w') as rawFile:
- rawFile.write(rawImageContent)
- # Add an OTA header to the raw file
- otaImageTool = _DEFAULT_CHIP_ROOT + '/src/app/ota_image_tool.py'
- cmd = [otaImageTool, 'create', '-v', vid, '-p', pid, '-vn', '2',
- '-vs', "2.0", '-da', 'sha256', rawImageFilePath, otaImageFilePath]
- s = subprocess.Popen(cmd)
- s.wait()
- if s.returncode != 0:
- raise Exception('Cannot create OTA image file')
- return True
- def compareFiles(self, file1, file2):
- if filecmp.cmp(file1, file2, shallow=False) is False:
- raise Exception('Files %s and %s do not match' % (file1, file2))
- return True
- def __startXMLRPCServer(self):
- self.server = SimpleXMLRPCServer((IP, PORT))
- self.server.register_function(self.start, 'start')
- self.server.register_function(self.stop, 'stop')
- self.server.register_function(self.reboot, 'reboot')
- self.server.register_function(self.factoryReset, 'factoryReset')
- self.server.register_function(self.waitForMessage, 'waitForMessage')
- self.server.register_function(self.compareFiles, 'compareFiles')
- self.server.register_function(self.createOtaImage, 'createOtaImage')
- self.server_thread = threading.Thread(target=self.server.serve_forever)
- self.server_thread.start()
- def __stopXMLRPCServer(self):
- self.server.shutdown()
- def __createCommandLineOptions(self, args):
- if not args:
- return {}
- # args should contain a list of strings in key-value pair, e.g. [option1, value1, option2, value2, ...]
- if (len(args) % 2) != 0:
- logging.warning("Unexpected command line options %r - not key/value pairs (odd length)" % (args,))
- return {}
- # Create a dictionary from the key-value pair list
- options = {args[i]: args[i+1] for i in range(0, len(args), 2)}
- return options
|