#!/usr/bin/python """ A minimal utility to send structured commands to the BPS board """ import sys import os sys.path.insert(0, '../codegen') import commands from utils import mls from analogvariables import * from entities.descriptors import PayloadFieldEnum, PayloadFieldU8, PayloadFieldU16, PayloadFieldU32 from commutils import * import serial ASK_CONFIRM = False SHOW_SENT_DATA = False SHOW_RECEIVED_DATA = False UART_DEV = '/dev/ttyUSB0' def ask_confirm(): answer = "" while answer not in ["y", "n"]: answer = raw_input("Confirm [Y/N]? ").lower() return answer == "y" if __name__ == '__main__': # command line options # example # python sendcommand.py COMMAND_NAME payload0 payload1 ... # COMMAND_NAME is the human readable name of the command (e.g.: ) # redirect stdout to null in order to avoid the warnings during object creation stdout = sys.stdout f = open(os.devnull, 'w') sys.stdout = f switch_list = commands.SwitchList() # analog_variable_list = commands.AnalogVariableList() # digital_variable_list = commands.DigitalVariableList() analog_variable_list = commands.AnalogVariableList( _var_enum_index_start=1, _alarm_enum_index_start=1) digital_variable_list = commands.DigitalVariableList( _var_enum_index_start=max(analog_variable_list.get_var_enum_indexes())+1, _alarm_enum_index_start=max(analog_variable_list.get_alarm_enum_indexes())+1) user_pin_list = commands.UserPinList() commands = commands.CommandList(switch_list, analog_variable_list, digital_variable_list, user_pin_list) # restore normal stdout sys.stdout = stdout # open the serial port ser = serial.Serial(UART_DEV, baudrate=19200, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=0.5) if len(sys.argv) <= 1: print "Command code missing" sys.exit(0) command_name = sys.argv[1].upper() filtered_commands = [command for command in commands.entries() if command.name.startswith(command_name)] if len(filtered_commands) == 0: print 'No command found with name starting with: "{}"'.format(command_name) elif len(filtered_commands) != 1: print 'More than one command found with name starting with: "{}":'.format(command_name) for command in filtered_commands: print ' {}'.format(command.name) sys.exit(0) # get the wanted command command = filtered_commands[0] # expected number of request fields: req_field_count = len(command.request_payload) if len(sys.argv) != req_field_count + 2: print 'Number of request payload fields not correct. The {} command expects {} fields; {} where given.'.format(command.name, len(command.request_payload), len(sys.argv) - 2) sys.exit(0) # prepare the payload payload_data = list() payload_field_values = list() idx = 2 for field in command.request_payload: val = int(sys.argv[idx]) payload_field_values.append(val) if isinstance(field, PayloadFieldU8): assert val < 2**8, 'Too big value for uint8' payload_data.append(val) elif isinstance(field, PayloadFieldU16): assert val < 2 ** 16, 'Too big value for uint16' payload_data.append((val >> 8) & 0xff) payload_data.append(val & 0xff) elif isinstance(field, PayloadFieldU32): assert val < 2 ** 32, 'Too big value for uint32' payload_data.append((val >> 24) & 0xff) payload_data.append((val >> 16) & 0xff) payload_data.append((val >> 8) & 0xff) payload_data.append(val & 0xff) elif isinstance(field, PayloadFieldEnum): assert val < 2 ** 8, 'Too big value for enum' payload_data.append(val) else: assert False, 'Unmanaged type {}'.format(field.__class__.__name__) idx += 1 print "Sending packet:" print ' Command code: {} (raw data: {})'.format(command.name, command.request_code) # print ' Payload data: {} (raw data: {})'.format(payload_field_values, payload_data) print ' Request payload:' idx = 0 for field in command.request_payload: print ' {} = {}'.format(field.name, payload_field_values[idx]) idx += 1 if ASK_CONFIRM: if not ask_confirm(): print "Operation aborted by user" sys.exit() rawdata = packet_create_rawdata(command.request_code, payload_data) # send the message over the serial link print "Packet sent." if SHOW_SENT_DATA: print "Sent data are:" for val in rawdata: print '\t{}\t(ascii: {})'.format(val, chr(val)) ser.write(rawdata) # wait for a response (reads up to 1000 bytes, does not parse the packet) print 'Waiting for response...' ans = ser.read(1000) bytes = [ord(c) for c in ans] print '{} bytes received.'.format(len(ans)) if SHOW_RECEIVED_DATA: if len(ans): print 'Received data:' for k in range(len(ans)): print '[byte #{}]\t0x{}\t(dec: {}, ascii: {})'.format(str(k), ans[k].encode('hex'), str(ord(ans[k])), ans[k]) if not len(ans): sys.exit(0) # parses the response command_code, payload_bytes = packet_extract_data(bytes) print "Received raw payolad:", payload_bytes print "Received packet:" print ' Command code: {}'.format(command_code) # print ' Payload data: {} (raw data: {})'.format(payload_field_values, payload_data) print ' Response payload:' byte_idx = 0 for field in command.response_payload: value = -1 if isinstance(field, PayloadFieldU8): value = payload_bytes[byte_idx] byte_idx += 1 elif isinstance(field, PayloadFieldU16): value = payload_bytes[byte_idx] << 8 value += payload_bytes[byte_idx+1] byte_idx += 2 elif isinstance(field, PayloadFieldU32): value = payload_bytes[byte_idx] << 24 value += payload_bytes[byte_idx+1] << 16 value += payload_bytes[byte_idx+2] << 8 value += payload_bytes[byte_idx+3] byte_idx += 4 elif isinstance(field, PayloadFieldEnum): value = field.get_value_by_index(payload_bytes[byte_idx]) byte_idx += 1 else: assert False, 'Unmanaged type {}'.format(field.__class__.__name__) print ' {} = {}'.format(field.name, value)