[casetta] New organization for cas.py |
[ Thread Index | Date Index | More lists.tuxfamily.org/casetta Archives ]
Hello, I have just finished to implement the new organization for cas.py described here : http://dev.filyb.info/casetta/ticket/22. cas.py now uses TransferableObjects (for example for programs, we have a TransferableProgram class) Take some coffee because, it is going to be a loooong (but important ! ) email :-P I defined some guidelines for the creation of such objects: - Each TransferableObject, which inherits of all the properties of an object defined in data.py (for example, TransferableProgram derives from Program) must have a class attribute called header_len (length of a header) - Moreover, the must have all of the following methods. These methods are called by the functions of devices_serial.py : - parse_header(header) : This method fills the object's metadata with information from the header and returns the expect length of the object. The expected length of an object (got from the header) is not stored in objects attributes because its real data length may change after. - checksum_needed(index, data_len) : returns 0 if no checksum if needed as this given index / returns 1 if a checksum should be performed (ie after receiving the sheet of a picture) / returns 2 if we should send 0x06 and ignore if the checksum is valid or not (useful for color screencapture) / returns 3 when index == data_len -1 if you want to do a final checksum, 0 else - set_raw_data(raw_data) : stores the received raw_data into the object. This method will often contain only the instruction : self.raw_data = raw_data but it can contain some specials modifications (for screencaptures for example). - __len__() : returns the real data length (usually len(self.raw_data)+2) - build_header() : returns a header for this TransferalbeObject - get_raw_data_list() : Returns all data parts which should be transferred. Even if it contains only one element, it must return a list. If an object is nor supposed to be transferred, it is allowed not to have these last three methods. See attachments for more details The reorganization has lots of advantages : - More in the spirit of Object Orient Programming / More respectful towards the encapsulation principle - Less data.__class__ instructions which, I think should be avoided - Less code in devices_serial.py / Cleaner file / More logical organization : devices_serial.py functions only handle the low level communication - It is now much easier to add support for the transfer of new data types : before it was necessary to modify almost all the functions of cas.py and modify some functions in devices_serial.py. Now you just have to create a class which respects the guidelines mentioned above. However, it has one drawback. send_data and receive_data functions of devices_serial now manipulate TransferableObjects. For example the receive_data functions does not return a normal object bu a TransferableObject (ie receive_data returns a TransferableProgram). This is not really a problem because all Transferable objects inherits of all the properties of normal Object. However, it is a problem for send_data because it is not able to send a Program (for example) but it can only send a TransferableProgram (or another TransferableObject). It is possible to transform a Program into a TransferableProgram. If prgm is a datacls.Program, you only have to do : prgm = TransferableProgram(prgm.name,prgm.raw_data,prgm.date,prgm.use_base). I did not commit that to the SVN repository because it is a VERY big change and I would like to have your approval before. I tested this new cas.py and I was able to send/receive programs,pictures, receive backups(I didn't test backup receipt), receive Screencaptures. If you have difficulties to sleep, you can read this email twice and I guarantee you will have a good night :-D Thank you for reading this long and boring (but important ;-) ) email. Bye, Ps : Just in case you didn't know I just inform you that the main server of tuxfamily.org is going to die so we may have problems to connect to the SVN : http://tuxfamily.info/?p=100 Ps2 : I am very proud because this time I didn't forget the attachments :-) --
Fabien ANDRE aka Xion345 Linux User #418689 -- fabien.andre.g@xxxxxxxxxx -- xion345@xxxxxxxxxxxxx World domination. Fast. ( Linus Torvalds, Not dated ) |
# -*- coding: utf-8 -*- # ############################################################################ # # (c) 2007 Florian Birée aka Thesa <florian@xxxxxxxxxx> # # Website: <URL:http://casetta.tuxfamily.org> # # Version 0.3.0 # # changelog: # # # 0.3.0 version: # # o First release # This code is hardly inspired by cafix softwares (cafix.sourceforge.com) # A lot of thanks for cafix coders. # ############################################################################ # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, # MA 02110-1301, USA. # # http://www.gnu.org/copyleft/gpl.html # ############################################################################ """cas (CASIOLINK format) management module""" __author__ = "Florian Birée" __version__ = "0.3.0" __copyright__ = "Copyright (c) 2007, Florian Birée" __license__ = "GPL" __revision__ = "$Revision: $" # $Source: $ __date__ = "$Date: $" import datetime import errors import data as datacls # Constant PICTURE_PALLET = ['o', 'b', 'g', 'w'] SCREEN_PALLET = ['b', 'g', 'w', 'o'] ########## Restructuration Objets ############### def calc_checksum(data): crc = 0 for byte in data: crc = (crc + ord(byte)) % 256 crc = (0 - crc) % 256 return chr(crc) # Special Data class class End(datacls.Data): ## FIXME need methods """Special data to end a transfer.""" dType = 'end' header_len = 49 def __init__(self): """New end data""" datacls.Data.__init__(self) def parse_header(self, header): pass class TransferableProgram(datacls.Program): header_len = 49 def __init__(self,name = '', raw_data = '', date = None, password = "", use_base = False): datacls.Program.__init__(self, name, raw_data, date, password, use_base) def __len__(self): return len(self.raw_data) + 2 def build_header(self): header = 'TXT\x00PG' header += chr(self.__len__() / (256 ** 3)) header += chr((self.__len__() % 256 ** 3) / (256 ** 2)) header += chr((self.__len__() % 256 ** 2) / 256) header += chr(self.__len__() % 256) # 0 -> 9 header += self.name[:8] + '\xff' * (8 - len(self.name[:8])) #10->17 header += '\xff' * 8 header += self.password[:8] + '\xff' * (8 - len(self.password[:8])) if self.use_base: header += 'BN' else: header += 'NL' header += '\xff' * 12 header += calc_checksum(header) return header def get_raw_data_list(self): return [self.raw_data] def parse_header(self, header, add_date=True): self.name = header[10:18].replace('\xff', '') if add_date: self.date = datetime.date.today() # Password self.password = header[26:34].replace('\xff', '') # Base: if header[34:36] == 'BN': self.use_base = True else: self.use_base = False return ord(header[6]) * (256 ** 3) + ord(header[7]) * (256 ** 2) + \ ord(header[8]) * 256 + ord(header[9]) - 2 def checksum_needed(self, index, data_len): if index == (data_len-1): return 3 else: return 0 def set_raw_data(self, raw_data): self.raw_data = raw_data class TransferableBackup(datacls.Backup): header_len = 49 def __init__(self, name = '', raw_data = '', date = None): datacls.Backup.__init__(self, name, raw_data,date) def __len__(self): return len(self.raw_data) + 2 def build_header(self): header = self.raw_data[2048: 2048 + 48] header = header[:33] + '\x00\x10\x00\x00\x00\x00' + header[39:] header += calc_checksum(header) return header def get_raw_data_list(): return [self.raw_data] def parse_header(self, header, add_date=True): self.name = header[10:18].replace('\xff', '') if add_date: self.date = datetime.date.today() return ord(header[6]) * (256 ** 3) + ord(header[7]) * (256 ** 2) + \ ord(header[8]) * 256 + ord(header[9]) - 2 def checksum_needed(self, index, data_len): if index==(data_len-1): return 3 else: return 0 def set_raw_data(self, raw_data): self.raw_data = raw_data ## Pictures and Screencaptures class TransferablePicture(datacls.Picture): header_len = 49 data_len = 4112 def __init__(self, name = '', raw_data = '', date = None, pallet = None, \ color_byte = None): datacls.Picture.__init__(self, name, raw_data, date, pallet, color_byte) def __len__(self): return self.data_len def build_header(self): header = 'IMG\x00PC' header += '\x00\x40\x00\x80' # len ? header += self.name header += '\xff' * 8 header += 'DRUWF' header += '\x00\x04\x00\x01' # len ? header += '\xff' * 13 header += calc_checksum(header) return header def get_raw_data_list(self): return [self.raw_data[0:1028],self.raw_data[1028:2056],self.raw_data[2056:3084],self.raw_data[3084:4112]] def parse_header(self, header ,add_date=True): self.name = self.name = header[10:18].replace('\xff', '') if add_date: self.date = datetime.date.today() self.pallet = PICTURE_PALLET self.color_byte = 3 return self.data_len def checksum_needed(self, index, data_len): if (index + 1) % 1028 == 0 and index != 0 and index + 1 < data_len: return 1 elif index==(data_len-1): return 3 else: return 0 def set_raw_data(self, raw_data): self.raw_data = raw_data class TransferableMonoScreenCapture(datacls.ScreenCapture): header_len = 39 data_len = 1024 name = "Capture" def __init__(self, name = '', raw_data = '', date = None, pallet = None, \ color_byte = None): datacls.ScreenCapture.__init__(self, name, raw_data, date, pallet, \ color_byte) def __len__(): return self.data_len def parse_header(self, header,add_date=True): if add_date: self.date = datetime.date.today() self.pallet = SCREEN_PALLET self.color_byte = 0 return self.data_len def checksum_needed(self, index ,data_len): if index == data_len: return 3 else: return 0 def set_raw_data(self, raw_data): columns = [''] * 16 for index in range(len(raw_data)): col = index / 64 columns[col] += raw_data[index] for col in range(len(columns)): byte_list = list(columns[col]) byte_list.reverse() columns[col] = ''.join(byte_list) black_sheet = ''.join(columns) self.raw_data = '\x01' + black_sheet +\ '\x02' + '\x00' * 0x400 +\ '\x03' + '\x00' * 0x400 +\ '\x04' + '\x00' * 0x400 class TransferableColorScreenCapture(datacls.ScreenCapture): header_len = 39 data_len = 3075 name = "Capture" def __init__(self, name='', raw_data='', date = None, pallet=None, \ color_byte = None): datacls.ScreenCapture.__init__(self, name, raw_data, date, pallet, \ color_byte) def __len__(): return self.data_len def parse_header(self, add_date=True): if add_date == True: self.date = datetime.date.today() self.pallet = SCREEN_PALLET self.color_byte = 0 return self.data_len def checksum_needed(self, index, data_len): if (index == 1024 or index == 2048): return 2 # False checksum elif index == data_len: return 3 else: return 0 def set_raw_data(self, raw_data): self.raw_data=raw_data[:0x400 * 2 + 2] + '\x03' + '\x00' * 0x400 + raw_data[0x400 * 2 + 2:] def get_header_format(header): """Return [header_len, header_type, sub_type].""" sub_type = None type_id = header[0:3] sub_id = header[4:6] if type_id == "MEM" and sub_id == "BU": data = TransferableBackup() elif type_id == "DD@": data = TransferableMonoScreenCapture() elif type_id == "DC@": data = TransferableColorScreenCapture() # elif type_id == "FNC": # datatype = 4 # headerlen = 49 elif type_id == "IMG" and sub_id == "PC": data = TransferablePicture() elif type_id == "TXT" and sub_id == "PG": data = TransferableProgram() # elif type_id == "VAL": # datatype = 8 # headerlen = 49 # elif type_id == "REQ": # datatype = 7 # headerlen = 49 elif type_id == "END": data = End() else: return "unknown",49 return data, data.header_len ## Q: Are we able to send screencaptures ? ### A: Apparently not ! ## Add support for screencaptures - Done ## Add header checksum calculation - Done ## Add data.name in fillmetadata - Done ## Add get_raw_data_list method ## Add __len__ ### Au moment de l'appel de get_data_len, on a le header complet ! ### Autant appeler fillmetadata directment ### Renommer fillmetadata en parse_header ## Liste des méthodes : ## Envoi : ## - __len__ - OK ## - build_header - OK ## - get_raw_data_list - OK ## Reception : ## - parse_header - OK ## - check_needed - OK ## - store_raw_data - OK ################################################# # File format management classes class CasFile: """Cas (Casiolink) file format manager.""" name = 'cas' managed_data = [datacls.Program, datacls.Backup, datacls.Picture, \ datacls.ScreenCapture] read = True write = False list_ext = ['cas'] def __init__(self, filename = None): """Make a cas file object with filename""" self.filename = filename def open_file(self): """Open a file in the cas Format""" # Build the FileData file_data = datacls.FileData() cas_file = open(self.filename, 'r') ctn = cas_file.read() cas_file.close() # Init var index = 0 is_header = True header = "" header_len = 100 data = None data_len = 100 raw_data = "" # Loop on bytes in ctn while index < len(ctn): if is_header and ctn[index] != ':' and len(header) < header_len: header += ctn[index] if len(header) == 7: # Get metadata header_len, data_type, sub_type = \ get_header_format(header) elif is_header and len(header) == header_len: # Crc byte, init the data record is_header = False if data_type == 'end': break data_len = get_data_len(header, data_type, sub_type) # Make the data if data_type == 'program': data = file_data.new_record(datacls.Program) elif data_type == 'backup': data = file_data.new_record(datacls.Backup) elif data_type == 'picture': data = file_data.new_record(datacls.Picture) elif data_type == 'ScreenCapture': data = file_data.new_record(datacls.ScreenCapture) else: data = file_data.new_record(datacls.Data) fill_metadata(data, header, False) elif not is_header and len(raw_data) == 0 and ctn[index] == ':': # Not header, first : pass elif not is_header and len(raw_data) < data_len: # Raw_data if data_type == 'screencapture' and sub_type == 'color' \ and (len(raw_data) == 1024 or len(raw_data) == 2048): # Jump two bytes (checksum and :) index += 2 if data_type == 'picture' and (len(raw_data) + 1) % 1028 == 0 \ and len(raw_data) != 0 and len(raw_data) + 1 < data_len: # Jump two bytes (checksum and :) index += 2 raw_data += ctn[index] elif not is_header and len(raw_data) == data_len: # Data Crc, save raw_data (and make some convertions) if data_type == 'screencapture' and sub_type == 'color': data.raw_data = color_screencapture_to_raw(raw_data) elif data_type == 'screencapture' and sub_type == 'mono': data.raw_data = mono_screencapture_to_raw(raw_data) else: data.raw_data = raw_data # Init vars is_header = True header = "" header_len = 100 data = None raw_data = "" index += 1 return file_data
# -*- coding: utf-8 -*- # ############################################################################ # # (c) 2007 Florian Birée aka Thesa <florian@xxxxxxxxxx> # # Website: <URL:http://casetta.tuxfamily.org> # # Version 0.3.0 # # changelog: # # # 0.3.0 version: # # o First release # This code is hardly inspired by cafix softwares (cafix.sourceforge.com) # A lot of thanks for cafix coders. # ############################################################################ # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, # MA 02110-1301, USA. # # http://www.gnu.org/copyleft/gpl.html # ############################################################################ """Internal serial tansfer tool""" __author__ = "Florian Birée" __version__ = "0.3.0" __copyright__ = "Copyright (c) 2007, Florian Birée" __license__ = "GPL" __revision__ = "$Revision: $" # $Source: $ __date__ = "$Date: $" import serial import time import sys import cas import errors import data as datacls # Default functions def default_status(data_name, current_size, total_size, is_header = False): """Print the status of a transfert on the standard output.""" percent = ((current_size * 100) / total_size) if is_header: progress_line = 'Header ' else: progress_line = data_name + ' ' * (9 - len(data_name)) progress_line += '[' + '=' * (percent / 2) progress_line += ' ' * (50 - (percent / 2)) + '] ' progress_line += '(%i%%) %i/%i' % (percent, current_size, total_size) if current_size != 0: progress_line = '\r' + progress_line sys.stdout.write(progress_line) sys.stdout.flush() if current_size >= total_size: sys.stdout.write('\n') def default_overwrite(data_name): """Ask the user if he want to overwrite data_name on the calculator.""" user_rep = raw_input('Overwrite %s (Y/N): ' % data_name) return user_rep.lower().startswith('y') # Serial management classes class Connexion: """Casetta serial transfer tool.""" name = 'serial' managed_data = [datacls.Program, datacls.Backup, datacls.Picture, \ datacls.ScreenCapture] def __init__(self, serial_port = 0, status = default_status, \ overwrite = default_overwrite): """Initial configuration of the serial port status is a function which is runned as status(current_data_name, current_size, total_size, is_header) each time the status change. overwrite is a function which is runned as overwrite(data_name) and which must return True or False. """ if type(serial_port) == str and serial_port.isdigit(): serial_port = int(serial_port) self.serial = serial.Serial( port=serial_port, baudrate=9600, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=5, # or None xonxoff=0, #0 rtscts=0) #(o:1)0 self.status = status self.overwrite = overwrite self.cancel = False def check_cancel(self): """If the cancel flag is set, unset it and raise TransferAborted""" if self.cancel: self.cancel = False raise errors.TransferAborted() ### Low level serial functions def get_byte(self): """Read a single byte""" byte = self.serial.read() return byte def send_byte(self, byte): """Send a single byte""" self.serial.write(byte) def send_string(self, string, status = None, data_name = '', \ is_header = False): """Send a string of bytes""" for index in range(len(string)): self.check_cancel() self.send_byte(string[index]) time.sleep(0.005) if status != None: status(data_name, index, len(string) - 1, is_header) ### Send system def send(self, data_list): """Send all data in data_list Warning: a backup must be the only data to send """ while not self.calc_is_ready_to_receive(): self.check_cancel() for data in data_list: self.send_data(data) if data_list[-1].dType != "backup": # Send the END header self.send_header(cas.End()) def calc_is_ready_to_receive(self): """Return if the calculator is ready to receive some data""" interactive = False if interactive: code = '\x06' else: code = '\x16' self.send_byte(code) byte_rep = self.get_byte() if interactive and byte_rep == '\x06': return True elif not interactive and byte_rep == '\x13': return True else: return False def send_data(self, data): """Send a data to the calc""" self.send_header(data) response = self.get_byte() if response == '\x06': # Calculator accept the header pass elif response == '\x24': raise errors.DataNoManaged([data.__class__]) elif response == '\x2b': raise errors.ChecksumError() elif response == '\x21': #File already exist in the calculator. if self.overwrite(data.name) : self.send_byte('\x06') sec_resp = self.get_byte() if sec_resp != '\x06': raise errors.CannotOverwrite(data.name) # Calculator accept to overwrite. else: # send abort code self.send_byte('\x15') sec_resp = self.get_byte() if sec_resp != '\x06': raise errors.BadAnswerFromCalc(sec_resp) raise errors.TransferAborted() elif response == '\x51': raise errors.HeaderError() else: raise errors.BadAnswerFromCalc(response) # Gets the data list to be transfered raw_data_list = data.get_raw_data_list() for raw_data in raw_data_list: # CRC calcul crc = 0 for byte in raw_data: crc += ord(byte) crc = (abs(255 - (crc % 256)) + 1) % 256 # Other way to get the crc - maybe better #crc = 0 #for byte in raw_data: # crc += ord(byte) # if crc > 255: # crc -= 256 #crc = 256 - crc # Now sending the data self.send_string('\x3a' + raw_data, self.status, data.name, False) # All data sent, let the calc check the CRC self.send_byte(chr(crc)) response = self.get_byte() if response != '\x06': raise errors.ChecksumError() return def send_header(self, data): """Send the header corresponding to data""" header = data.build_header() if data.dType == "end": self.send_string('\x3a' + header, self.status, 'End', False) else: self.send_string('\x3a' + header, self.status, data.name, True) return ### Receive system def receive(self): """Return a file_data with all data receive from the calc.""" file_data = datacls.FileData() while not self.calc_is_ready_to_send(): self.check_cancel() # Starts to receive the data while True: data = self.receive_data() if data.dType == "end": break # Adds the data to the file file_data.data.append(data) if data.dType == "backup" or \ data.dType == "screencapture": break return file_data def calc_is_ready_to_send(self): """Waiting for receiving data""" byte = self.get_byte() if byte == '\x15': # Received request for interactive handshake (0x15) # -- responding with 0x13 self.send_byte("\x13") return True elif byte == '\x16': # Received request for noninteractive handshake (0x16) # -- responding with 0x13 self.send_byte("\x13") return True else: return False def receive_data(self): """Receive a data""" # Get header self.status('unknown', 0, 49, True) data, data_len = self.get_header() self.status('unknown', 49, 49, True) if data.dType == 'end': # End of the transfer : no data return cas.End() crc = 0 raw_data = '' # Starts the receive loop for index in range(data_len): self.check_cancel() byte = self.get_byte() crc = crc + ord(byte) # Do we need to perform a checksum at this given index ? if data.checksum_needed(index, data_len) == 2: # Pretends the cheksum we got is valid # Does not do real checksum check resp2 = self.get_byte() self.send_byte('\x06') resp2 = self.get_byte() crc = 0 if data.checksum_needed(index, data_len) == 1: # Real checksum check of the data part (sheet of a picture, # Value of a matrix) newcrc = ord(self.get_byte()) crc = abs(255 - (crc % 256)) + 1 if not newcrc == crc : raise errors.ChecksumError() crc = 0 self.send_byte('\x06') resp2 = self.get_byte() if not resp2 == '\x3a': raise errors.BadAnswerFromCalc(resp2) #if data_type == 1 and bytesproc + 1 == 62684: # print 'byte 62684 of a backup : send 0x06' # send_byte('\x06') raw_data += byte self.status(data.name, index, data_len - 1, False) # Receipt of raw_data is finished if data.checksum_needed(index, data_len) == 3: # Final checksum # Should be done for any data type # except for those who frequently generate errors # such as color screencaptures. newcrc = ord(self.get_byte()) crc = abs(255 - (crc % 256)) + 1 if not newcrc == crc: #and data_type != 'screencapture' and \ # sub_type != 'color': # Warning: the crc check is not done for color screencapture # because the crc of color screencapture is never # valid. raise errors.ChecksumError() self.send_byte('\x06') # Stores received raw data into the object data.set_raw_data(raw_data) return data def get_header(self): """Return [header, header_type]""" byte = self.get_byte() if not byte == '\x3a': raise errors.HeaderError() header = '' total_bytes = 49 # Default header size cur_byte = 0 h_crc = 0 while cur_byte < total_bytes: byte = self.get_byte() header += byte if cur_byte == 7: data, header_len = cas.get_header_format(header) total_bytes = header_len cur_byte += 1 if cur_byte < total_bytes: h_crc = (h_crc + ord(byte)) % 256 h_crc = (0 - h_crc) % 256 if h_crc != ord(byte) : raise errors.ChecksumError() # Check if we know the header type if data == "unknown": self.send_byte("\x00") raise errors.HeaderError() # Accepts the header self.send_byte("\x06") if not self.get_byte() == '\x3a': raise errors.BadAnswerFromCalc() return data, data.parse_header(header)
Mail converted by MHonArc 2.6.19+ | http://listengine.tuxfamily.org/ |