DcrClub 0 Posted November 13, 2024 I have been studying the players.db file for a while. I have always wanted to use the correct method to read out all the data and perform addition, deletion, modification and query processing. However, I have little knowledge of binary data processing. Now I can only parse all the data of the current player, but I cannot process the basic attributes of the player and the context attributes of the inventory materials. I don't understand this data structure. Is there any teacher who can help me process the code to realize the addition, deletion, modification and query of the inventory materials? import struct from io import BytesIO import sqlite3 import json import bson class DzChar: def __init__(self, id, uid, alive, data): self.ID = id self.UID = uid self.Alive = alive self.Items = [] self.decode_data(data) def decode_data(self, data): try: stream = BytesIO(data) # 1. 读取 HeaderData self.HeaderData = stream.read(16) self.HeaderGUID = self.HeaderData.hex().upper() # 2. 读取 CharacterName Length char_name_len_data = stream.read(1) if len(char_name_len_data) < 1: print("Error: Unable to read CharacterName length.") return char_name_len = struct.unpack('<B', char_name_len_data)[0] # 3. 读取 CharacterName character_name_data = stream.read(char_name_len) if len(character_name_data) < char_name_len: print("Error: Unable to read complete CharacterName.") return self.CharacterName = character_name_data.decode('utf-8') # 4. 读取 StatsData Length data_len_data = stream.read(2) if len(data_len_data) < 2: print("Error: Unable to read StatsData length.") return data_len = struct.unpack('<H', data_len_data)[0] # 5. 读取 StatsData self.StatsData = stream.read(data_len) # self.parse_stats_data() # 6. 读取 Version ver_data = stream.read(2) if len(ver_data) < 2: print("Error: Unable to read Version.") return self.Ver = struct.unpack('<H', ver_data)[0] # 7. 读取 Items Count items_count_data = stream.read(4) if len(items_count_data) < 4: print("Error: Unable to read Items Count.") return items_count = struct.unpack('<I', items_count_data)[0] # 8. 读取 Items for i in range(items_count): print(f"Reading item {i+1}/{items_count}") item = DzItem(self.UID, stream) self.Items.append(item) # 9. 检查是否有剩余数据 remaining = stream.read() if remaining: print(f"Remaining Data ({len(remaining)} bytes): {remaining.hex()}") else: print("No remaining data.") except Exception as e: print(f"Exception during decode_data: {e}") def parse_stats_data(self): try: # 假设 StatsData 是 JSON 格式的数据 stats_str = self.StatsData.decode('utf-8', errors='ignore') # 尝试解析为 JSON self.ParsedStatsData = json.loads(stats_str) print(f"Parsed StatsData: {json.dumps(self.ParsedStatsData, indent=4, ensure_ascii=False)}") except json.JSONDecodeError: print("StatsData is not valid JSON.") # 如果是其他格式,可以尝试其他解析方法 class DzItem: def __init__(self, uid, stream, parent=None): try: self.Parent = uid self.ParentItem = parent self.Childs = [] # 读取 Data Count data_count_data = stream.read(4) if len(data_count_data) < 4: print("Error: Unable to read Data Count.") return self.DataCount = struct.unpack('<I', data_count_data)[0] # 读取 Classname Length item_name_len_data = stream.read(1) if len(item_name_len_data) < 1: print("Error: Unable to read Classname length.") return item_name_len = struct.unpack('<B', item_name_len_data)[0] # 读取 Classname self.Classname = stream.read(item_name_len).decode('utf-8') # 读取 Skip (6 bytes) self.Skip = stream.read(6) # 读取 Slot Length slot_len_data = stream.read(1) if len(slot_len_data) < 1: print("Error: Unable to read Slot length.") return slot_len = struct.unpack('<B', slot_len_data)[0] # 读取 Slot self.Slot = stream.read(slot_len).decode('utf-8') # 读取 Custom Data Length custom_data_len_data = stream.read(4) if len(custom_data_len_data) < 4: print("Error: Unable to read Custom Data Length.") return custom_data_len = struct.unpack('<I', custom_data_len_data)[0] # 读取 PersistentGuid (16 bytes) self.PersistentGuid = stream.read(16).hex().upper() # 读取 Data (custom_data_len - 16 bytes) data_length = custom_data_len - 16 self.Data = stream.read(data_length) # 尝试解析 Data # self.parse_data() # 读取 Childs Count childs_count_data = stream.read(4) if len(childs_count_data) < 4: print("Error: Unable to read Childs Count.") return childs_count = struct.unpack('<I', childs_count_data)[0] # 读取 Childs for _ in range(childs_count): child = DzItem(uid, stream, self) self.add_child(child) except Exception as e: print(f"Exception during DzItem initialization: {e}") def parse_data(self): try: # 假设 Data 是 JSON 格式的数据 data_str = self.Data.decode('utf-8', errors='ignore') # 尝试解析为 JSON self.ParsedData = json.loads(data_str) print(f" Parsed Data: {json.dumps(self.ParsedData, indent=4, ensure_ascii=False)}") except json.JSONDecodeError: print(" Data is not valid JSON.") # 如果是其他格式,可以尝试其他解析方法 def add_child(self, item): self.Childs.append(item) class DzPlayersDb: def __init__(self, db_path): self.Players = [] connection = sqlite3.connect(db_path) cursor = connection.cursor() cursor.execute("SELECT * FROM Players") rows = cursor.fetchall() for row in rows: Id = row[0] Alive = row[1] UID = row[2] Data = row[3] self.Players.append(DzChar(Id, UID, bool(Alive), Data)) connection.close() def print_item(item, indent=0): indent_str = ' ' * indent print(f"{indent_str}Classname: {item.Classname}") print(f"{indent_str}Slot: {item.Slot}") print(f"{indent_str}PersistentGuid: {item.PersistentGuid}") print(f"{indent_str}Data: {item.Data.hex()}") # 如果解析出了 Data,可以打印解析后的数据 if hasattr(item, 'ParsedData'): print(f"{indent_str}Parsed Data: {json.dumps(item.ParsedData, indent=4, ensure_ascii=False)}") print(f"{indent_str}Number of Childs: {len(item.Childs)}") for child in item.Childs: print(f"{indent_str}Child Item:") print_item(child, indent + 1) def main(): db_path = 'players.db' # 确保这里的路径是您的数据库文件的正确路径 db = DzPlayersDb(db_path) for player in db.Players: print(f"\n--- Player ID: {player.ID} ---") print(f"UID: {player.UID}") print(f"Alive: {player.Alive}") print(f"Character Name: {player.CharacterName}") print(f"Version: {player.Ver}") print(f"Number of Items: {len(player.Items)}") # 如果解析出了 StatsData,可以打印解析后的数据 if hasattr(player, 'ParsedStatsData'): print(f"Parsed StatsData: {json.dumps(player.ParsedStatsData, indent=4, ensure_ascii=False)}") for idx, item in enumerate(player.Items, start=1): print(f"Item {idx}:") print_item(item) if __name__ == '__main__': main() Place the py file and players.db in the same directory Share this post Link to post Share on other sites
Sid Debian 135 Posted November 13, 2024 6 hours ago, DcrClub said: I have been studying the players.db file for a while. I have always wanted to use the correct method to read out all the data and perform addition, deletion, modification and query processing. However, I have little knowledge of binary data processing. Now I can only parse all the data of the current player, but I cannot process the basic attributes of the player and the context attributes of the inventory materials. I don't understand this data structure. Is there any teacher who can help me process the code to realize the addition, deletion, modification and query of the inventory materials? import struct from io import BytesIO import sqlite3 import json import bson class DzChar: def __init__(self, id, uid, alive, data): self.ID = id self.UID = uid self.Alive = alive self.Items = [] self.decode_data(data) def decode_data(self, data): try: stream = BytesIO(data) # 1. 读取 HeaderData self.HeaderData = stream.read(16) self.HeaderGUID = self.HeaderData.hex().upper() # 2. 读取 CharacterName Length char_name_len_data = stream.read(1) if len(char_name_len_data) < 1: print("Error: Unable to read CharacterName length.") return char_name_len = struct.unpack('<B', char_name_len_data)[0] # 3. 读取 CharacterName character_name_data = stream.read(char_name_len) if len(character_name_data) < char_name_len: print("Error: Unable to read complete CharacterName.") return self.CharacterName = character_name_data.decode('utf-8') # 4. 读取 StatsData Length data_len_data = stream.read(2) if len(data_len_data) < 2: print("Error: Unable to read StatsData length.") return data_len = struct.unpack('<H', data_len_data)[0] # 5. 读取 StatsData self.StatsData = stream.read(data_len) # self.parse_stats_data() # 6. 读取 Version ver_data = stream.read(2) if len(ver_data) < 2: print("Error: Unable to read Version.") return self.Ver = struct.unpack('<H', ver_data)[0] # 7. 读取 Items Count items_count_data = stream.read(4) if len(items_count_data) < 4: print("Error: Unable to read Items Count.") return items_count = struct.unpack('<I', items_count_data)[0] # 8. 读取 Items for i in range(items_count): print(f"Reading item {i+1}/{items_count}") item = DzItem(self.UID, stream) self.Items.append(item) # 9. 检查是否有剩余数据 remaining = stream.read() if remaining: print(f"Remaining Data ({len(remaining)} bytes): {remaining.hex()}") else: print("No remaining data.") except Exception as e: print(f"Exception during decode_data: {e}") def parse_stats_data(self): try: # 假设 StatsData 是 JSON 格式的数据 stats_str = self.StatsData.decode('utf-8', errors='ignore') # 尝试解析为 JSON self.ParsedStatsData = json.loads(stats_str) print(f"Parsed StatsData: {json.dumps(self.ParsedStatsData, indent=4, ensure_ascii=False)}") except json.JSONDecodeError: print("StatsData is not valid JSON.") # 如果是其他格式,可以尝试其他解析方法 class DzItem: def __init__(self, uid, stream, parent=None): try: self.Parent = uid self.ParentItem = parent self.Childs = [] # 读取 Data Count data_count_data = stream.read(4) if len(data_count_data) < 4: print("Error: Unable to read Data Count.") return self.DataCount = struct.unpack('<I', data_count_data)[0] # 读取 Classname Length item_name_len_data = stream.read(1) if len(item_name_len_data) < 1: print("Error: Unable to read Classname length.") return item_name_len = struct.unpack('<B', item_name_len_data)[0] # 读取 Classname self.Classname = stream.read(item_name_len).decode('utf-8') # 读取 Skip (6 bytes) self.Skip = stream.read(6) # 读取 Slot Length slot_len_data = stream.read(1) if len(slot_len_data) < 1: print("Error: Unable to read Slot length.") return slot_len = struct.unpack('<B', slot_len_data)[0] # 读取 Slot self.Slot = stream.read(slot_len).decode('utf-8') # 读取 Custom Data Length custom_data_len_data = stream.read(4) if len(custom_data_len_data) < 4: print("Error: Unable to read Custom Data Length.") return custom_data_len = struct.unpack('<I', custom_data_len_data)[0] # 读取 PersistentGuid (16 bytes) self.PersistentGuid = stream.read(16).hex().upper() # 读取 Data (custom_data_len - 16 bytes) data_length = custom_data_len - 16 self.Data = stream.read(data_length) # 尝试解析 Data # self.parse_data() # 读取 Childs Count childs_count_data = stream.read(4) if len(childs_count_data) < 4: print("Error: Unable to read Childs Count.") return childs_count = struct.unpack('<I', childs_count_data)[0] # 读取 Childs for _ in range(childs_count): child = DzItem(uid, stream, self) self.add_child(child) except Exception as e: print(f"Exception during DzItem initialization: {e}") def parse_data(self): try: # 假设 Data 是 JSON 格式的数据 data_str = self.Data.decode('utf-8', errors='ignore') # 尝试解析为 JSON self.ParsedData = json.loads(data_str) print(f" Parsed Data: {json.dumps(self.ParsedData, indent=4, ensure_ascii=False)}") except json.JSONDecodeError: print(" Data is not valid JSON.") # 如果是其他格式,可以尝试其他解析方法 def add_child(self, item): self.Childs.append(item) class DzPlayersDb: def __init__(self, db_path): self.Players = [] connection = sqlite3.connect(db_path) cursor = connection.cursor() cursor.execute("SELECT * FROM Players") rows = cursor.fetchall() for row in rows: Id = row[0] Alive = row[1] UID = row[2] Data = row[3] self.Players.append(DzChar(Id, UID, bool(Alive), Data)) connection.close() def print_item(item, indent=0): indent_str = ' ' * indent print(f"{indent_str}Classname: {item.Classname}") print(f"{indent_str}Slot: {item.Slot}") print(f"{indent_str}PersistentGuid: {item.PersistentGuid}") print(f"{indent_str}Data: {item.Data.hex()}") # 如果解析出了 Data,可以打印解析后的数据 if hasattr(item, 'ParsedData'): print(f"{indent_str}Parsed Data: {json.dumps(item.ParsedData, indent=4, ensure_ascii=False)}") print(f"{indent_str}Number of Childs: {len(item.Childs)}") for child in item.Childs: print(f"{indent_str}Child Item:") print_item(child, indent + 1) def main(): db_path = 'players.db' # 确保这里的路径是您的数据库文件的正确路径 db = DzPlayersDb(db_path) for player in db.Players: print(f"\n--- Player ID: {player.ID} ---") print(f"UID: {player.UID}") print(f"Alive: {player.Alive}") print(f"Character Name: {player.CharacterName}") print(f"Version: {player.Ver}") print(f"Number of Items: {len(player.Items)}") # 如果解析出了 StatsData,可以打印解析后的数据 if hasattr(player, 'ParsedStatsData'): print(f"Parsed StatsData: {json.dumps(player.ParsedStatsData, indent=4, ensure_ascii=False)}") for idx, item in enumerate(player.Items, start=1): print(f"Item {idx}:") print_item(item) if __name__ == '__main__': main() Place the py file and players.db in the same directory I can't help you with pytooh, but the base question is simple, how about the digging into PlayerBase class and subclasses to find structure of each variable/class instance and use that for processing binary data? When you know where is string and where is float - it's quite simpler to find where do you wrong 🙂 Share this post Link to post Share on other sites