The format of the analysis output command is as follows:
-export [options] Logfile or -export [options] Directory
If the name of a log file is given, then that file will be processed. If a directory is given, it will look for all .qmdl files within that directory, merge them, then run analysis on the merged file.
The analyzer option lets the user specify which analyzer to run. Analyzer names use the format: Name of Workspace; subfolder; Name of Analyzer. For example, if a user wants to run LTE PDCP DL Stats Summary, as shown in Figure 5-1, the command would be:
#----------------------------------------------------------------------------
# Parse.py
#----------------------------------------------------------------------------
import glob
import json
import os
import random
import subprocess
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple
#---------------------------------------------------------------------------- # State management functions #---------------------------------------------------------------------------- # Thread lock for state file operations state_lock = threading.Lock()
def get_state_file_path(output_folder: str) -> Path:
    """Return the path of the hidden JSON file that tracks processed logs.

    The state file lives directly inside *output_folder* so that resuming a
    run only needs the output directory to be known.
    """
    state_dir = Path(output_folder)
    return state_dir.joinpath(".qcat_processing_state.json")
def load_processed_files(output_folder: str) -> Set[str]:
    """Load the set of already processed files from the state file.

    Returns an empty set when no state file exists or when it cannot be
    parsed, so processing simply starts from scratch in those cases.
    """
    state_file = get_state_file_path(output_folder)
    if not state_file.exists():
        return set()
    try:
        with open(state_file, 'r', encoding='utf-8') as handle:
            payload = json.load(handle)
        done = set(payload.get('processed_files', []))
        print(f"\nLoaded state: {len(done)} files already processed")
        return done
    except (json.JSONDecodeError, IOError) as exc:
        print(f"Warning: Could not load state file ({exc}). Starting fresh.")
    return set()
def save_processed_file(output_folder: str, file_path: str):
    """Add a successfully processed file to the state file. Thread-safe.

    Holds ``state_lock`` for the whole load/add/write cycle so concurrent
    workers cannot lose each other's updates.
    """
    with state_lock:  # Ensure thread-safe access to state file
        state_file = get_state_file_path(output_folder)
        # Merge the new entry into whatever is already recorded on disk.
        processed = load_processed_files(output_folder)
        processed.add(file_path)
        payload = {
            'processed_files': list(processed),
            'last_updated': time.strftime('%Y-%m-%d %H:%M:%S'),
        }
        try:
            with open(state_file, 'w', encoding='utf-8') as handle:
                json.dump(payload, handle, indent=2, ensure_ascii=False)
        except IOError as exc:
            print(f"Warning: Could not save state file ({exc}). Continuing without state tracking.")
def clear_state_file(output_folder: str):
    """Clear the state file (useful for restarting from scratch)."""
    state_file = get_state_file_path(output_folder)
    if not state_file.exists():
        return
    try:
        state_file.unlink()
    except IOError as exc:
        print(f"Warning: Could not clear state file ({exc}).")
    else:
        print(f"Cleared state file: {state_file}")
# NOTE(review): this is the tail of a filtering helper — the enclosing `def`
# line and the `blacklist` variable are not visible in this copy of the file.
# Confirm the missing header against the original source before editing.
for pattern in blacklist:
    # Drop any file whose name contains at least one whitespace-separated
    # token of the current blacklist pattern.
    files = [file for file in files if not any(part in file.name for part in pattern.split())]
return files
#----------------------------------------------------------------------------
# find_qcat_files()
# Recursively find all QCAT log files in the given directory
#----------------------------------------------------------------------------
def find_qcat_files(input_folder: str) -> tuple[Path, List[Path]]:
    """
    Find all QCAT log files recursively in the given folder.
    Common QCAT file extensions: .hdf, .qmdl, .qmdl2, .dlf, .isf
    Windows compatible path handling.
    """
    print(f"\nSearching for QCAT files in: {input_folder}")
    input_path = Path(input_folder)
    if not input_path.exists():
        print(f"ERROR: Input folder does not exist: {input_folder}")
        return input_path, []

    # Gather matches for every known extension; pathlib keeps this portable.
    matches: list[Path] = []
    for pattern in ('*.hdf', '*.qmdl', '*.qmdl2', '*.dlf', '*.isf'):
        matches.extend(input_path.rglob(pattern))

    # De-duplicate, order deterministically, then strip the common root so the
    # relative layout can be mirrored into the output folder.
    relative = [p.relative_to(input_path) for p in sorted(set(matches))]

    # Apply the blacklist filter defined elsewhere in this file.
    qcat_files = filter_files(relative)

    print(f"Found {len(qcat_files)} QCAT log files:")
    for entry in qcat_files:
        print(f" {entry}")
    return Path(input_folder), qcat_files
#----------------------------------------------------------------------------
# Single file processing function for thread pool
#----------------------------------------------------------------------------
def process_single_file(args: tuple[Path, Path, str, str]) -> tuple[str, bool, str]:
    """
    Process a single QCAT file. Designed to be used with thread pool.

    Args:
        args: Tuple containing (input_path_full, output_path, file_key, output_folder)

    Returns:
        tuple: (file_key, success_flag, error_message)
    """
    input_path_full, output_path, file_key, output_folder = args

    # Add a random delay (0-7.5 s jitter) so the workers don't all launch a
    # QCAT.exe instance at exactly the same moment.
    time.sleep(0.5 * random.randint(0, 15))  # To avoid opening too many QCAT at same time

    try:
        # Create the per-file output directory (mirrors the input layout).
        output_path.mkdir(parents=True, exist_ok=True)
        # NOTE(review): the "??"/"?" prefixes throughout look like mojibake of
        # original emoji markers — confirm against the original encoding.
        print(f"?? [Thread {threading.current_thread().ident}] Processing: {input_path_full.name}")
        print(f" Output: {output_path}")
        # Run QCAT in headless export mode; check=True raises on non-zero exit.
        result = subprocess.run([
            "C:\\Program Files\\Qualcomm\\QCAT7\\Bin\\QCAT.exe",
            "-export",
            # "-workspace=C:\\Users\\DELL\\Desktop\\ajax\\ws.aws",
            "-delimiter=,",
            f"-outputdir={output_path}",
            str(input_path_full)
        ], check=True, capture_output=True, text=True)
        print(f"? [Thread {threading.current_thread().ident}] Successfully processed: {file_key}")
        # Save state immediately after successful processing so an interrupted
        # run can resume without redoing this file.
        save_processed_file(output_folder, file_key)
        return file_key, True, ""
    except subprocess.CalledProcessError as e:
        error_msg = f"QCAT command failed: {e}"
        if e.stderr:
            error_msg += f"\nStderr: {e.stderr}"
        print(f"? [Thread {threading.current_thread().ident}] Failed to process {file_key}: {error_msg}")
        return file_key, False, error_msg
    except Exception as e:
        # Catch-all so one bad file never kills the whole thread pool.
        error_msg = f"Unexpected error: {e}"
        print(f"? [Thread {threading.current_thread().ident}] Failed to process {file_key}: {error_msg}")
        return file_key, False, error_msg
def process_multiple_files(base_dir: tuple[Path, List[Path]], output_folder: str, max_workers: int = 4) -> int:
    """
    Use ThreadPoolExecutor to process multiple QCAT files in parallel.

    Args:
        base_dir: Tuple of (input_path, qcat_files)
        output_folder: Output directory path
        max_workers: Maximum number of concurrent QCAT processes (default: 4)

    Returns:
        Number of successfully processed files
    """
    input_path, qcat_files = base_dir
    # Load state of already processed files
    processed_files = load_processed_files(output_folder)
    # Filter out already processed files
    files_to_process = []
    skipped_files = 0
    for input_file in qcat_files:
        file_key = str(input_file)  # Use relative path as key
        if file_key in processed_files:
            print(f"?? Skipping already processed file: {input_file}")
            skipped_files += 1
            continue
        input_path_full = (input_path / input_file).resolve()
        output_path = (Path(output_folder) / input_file).resolve()
        files_to_process.append((input_path_full, output_path, file_key, output_folder))
    if skipped_files > 0:
        print(f"\n?? Skipped {skipped_files} already processed files")
    if not files_to_process:
        print("?? No files to process")
        return len(processed_files)
    print(f"\n?? Starting parallel processing with {max_workers} workers")
    print(f"?? Files to process: {len(files_to_process)}")
    # Start the success count at the number of files done in earlier runs so
    # the return value reflects total progress, not just this run.
    successful_files = len(processed_files)  # Count already processed files
    failed_files = []
    # Process files using ThreadPoolExecutor
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks; map each future back to its file key for reporting.
        future_to_file = {
            executor.submit(process_single_file, args): args[2]
            for args in files_to_process
        }
        # Process completed tasks as they finish
        for future in as_completed(future_to_file):
            file_key = future_to_file[future]
            try:
                file_key_result, success, error_msg = future.result()
                if success:
                    successful_files += 1
                else:
                    failed_files.append((file_key_result, error_msg))
            except Exception as e:
                error_msg = f"Thread execution error: {e}"
                failed_files.append((file_key, error_msg))
                # NOTE(review): this message was split across lines in this copy
                # of the file (likely a lost emoji); reassembled as one string.
                print(f"? Thread failed for {file_key}: {error_msg}")
    # Print summary of failed files
    if failed_files:
        print(f"\n?? Failed files ({len(failed_files)}):")
        for file_key, error_msg in failed_files:
            print(f" - {file_key}: {error_msg}")
    return successful_files
#----------------------------------------------------------------------------
# main function
#----------------------------------------------------------------------------
def main():
    """
    Main function to process all QCAT files in the given folder and export 0xB193 packets.
    Uses ThreadPoolExecutor for parallel processing (Windows compatible, Python 3.10).
    Now includes state management to resume from where it left off if interrupted,
    and parallel processing support with configurable thread count.
    """
    # Check command line arguments
    if len(sys.argv) < 3:
        print("\nUsage: python Parse_cmd.py <input_folder> <output_folder> [--clear-state] [--threads=N]")
        print("\nDescription:")
        print(" Recursively finds all QCAT log files in input_folder and subfolders,")
        print(" extracts 0xB193 packets from each file using parallel QCAT instances,")
        print(" and exports them to Excel format in the output_folder.")
        print("\nSupported QCAT file extensions: .hdf, .qmdl, .qmdl2, .dlf, .isf")
        print("\nFeatures:")
        print(" - Windows compatible with Python 3.10")
        print(" - Parallel processing: Up to 4 concurrent QCAT instances by default")
        print(" - Memory efficient: Separate QCAT instance per thread")
        print(" - Recursive folder search")
        print(" - Excel export for each file with 0xB193 packets")
        print(" - State management: Resume processing if interrupted")
        print("\nOptions:")
        print(" --clear-state Clear the processing state and start fresh")
        print(" --threads=N Set number of parallel threads (1-8, default: 4)")
        print("\nExample:")
        print(" python Parse_cmd.py C:\\logs C:\\output")
        print(" python Parse_cmd.py C:\\logs C:\\output --clear-state")
        print(" python Parse_cmd.py C:\\logs C:\\output --threads=2")
        print(" python Parse_cmd.py C:\\logs C:\\output --clear-state --threads=6")
        sys.exit(1)

    input_folder = sys.argv[1]
    output_folder = sys.argv[2]

    # Parse command line options
    clear_state = '--clear-state' in sys.argv
    max_workers = 4  # Default
    # Parse --threads option
    for arg in sys.argv[3:]:
        if arg.startswith('--threads='):
            try:
                max_workers = int(arg.split('=')[1])
                if max_workers < 1 or max_workers > 8:
                    print("ERROR: Thread count must be between 1 and 8")
                    sys.exit(1)
            except ValueError:
                print("ERROR: Invalid thread count format. Use --threads=N")
                sys.exit(1)

    # Validate input folder using pathlib for Windows compatibility
    input_path = Path(input_folder)
    if not input_path.exists():
        print(f"ERROR: Input folder does not exist: {input_folder}")
        sys.exit(1)
    if not input_path.is_dir():
        print(f"ERROR: Input path is not a directory: {input_folder}")
        sys.exit(1)

    # Create output folder if it doesn't exist
    output_path = Path(output_folder)
    output_path.mkdir(parents=True, exist_ok=True)
    print(f"Output folder: {output_path.absolute()}")

    # Handle state clearing if requested
    if clear_state:
        clear_state_file(str(output_path))
        print("Processing state cleared. Starting fresh.")

    # Find all QCAT files
    base_dir, qcat_files = find_qcat_files(str(input_path))
    if not qcat_files:
        print("No QCAT log files found in the specified folder.")
        sys.exit(0)

    # Show current state
    processed_files = load_processed_files(str(output_path))
    remaining_files = [f for f in qcat_files if str(f) not in processed_files]
    print(f"\n?? Processing Status:")
    print(f" Total files found: {len(qcat_files)}")
    print(f" Already processed: {len(processed_files)}")
    print(f" Remaining to process: {len(remaining_files)}")
    if len(remaining_files) == 0:
        print(f"\n? All files have already been processed!")
        print(f"Use --clear-state flag if you want to reprocess all files.")
        sys.exit(0)

    # Process all files using parallel QCAT instances
    print(f"\n{'='*80}")
    print(f"Starting parallel processing of {len(remaining_files)} remaining files...")
    print(f"Using {max_workers} concurrent QCAT instances")
    print(f"State will be saved after each successful file")
    print(f"{'='*80}")

    # FIX: the actual processing call and its timing were missing in this copy
    # of the file — the summary below referenced `successful_files` and
    # `processing_time` before assignment (NameError at runtime).
    start_time = time.time()
    successful_files = process_multiple_files((base_dir, qcat_files), str(output_path), max_workers)
    processing_time = time.time() - start_time

    # Summary
    print(f"\n{'='*80}")
    print("PROCESSING COMPLETE")
    print(f"{'='*80}")
    print(f"Total files found: {len(qcat_files)}")
    print(f"Successfully processed: {successful_files}")
    print(f"Failed to process: {len(qcat_files) - successful_files}")
    print(f"Processing time: {processing_time:.1f} seconds")
    # len(qcat_files) > 0 is guaranteed here (we exited above when empty).
    print(f"Average time per file: {processing_time/len(qcat_files):.1f} seconds")
    print(f"Parallel efficiency: {max_workers} threads")
    print(f"Output folder: {output_path.absolute()}")
    if successful_files < len(qcat_files):
        failed_count = len(qcat_files) - successful_files
        print(f"\n?? WARNING: {failed_count} files failed to process.")
        print("Check the console output above for details.")
        print("You can restart the script to retry failed files.")
    else:
        print(f"\n?? All files processed successfully!")
    print(f"\nLook for files ending with '_0xB193_packets.xlsx' in the output folder.")
    print(f"State file saved at: {get_state_file_path(str(output_path))}")
#----------------------------------------------------------------------------- # Entry point #----------------------------------------------------------------------------- if __name__ == "__main__": main()
#---------------------------------------------------------------------------- # QcatClient has dependency on QUTS, so QUTS installation is required #---------------------------------------------------------------------------- from sys import platform if platform == "linux"or platform == "linux2": sys.path.append('/opt/qcom/QCAT7/Support/Python') elif platform == "win32": sys.path.append('C:\\Program Files\\Qualcomm\\QUTS\\Support\\python') sys.path.append('C:\\Program Files\\Qualcomm\\QCAT7\\Support\\Python') import QcatClient import Common.ttypes from QcatClient import QcatAutomationClient
#----------------------------------------------------------------------------
# Logging helper functions
#----------------------------------------------------------------------------
def log_with_thread(message: str, level: str = "INFO"):
    """Log message with thread name for better tracking in multithreading"""
    current = threading.current_thread()
    print(f"[{level}][{current.name}] {message}")
def log_thread_minimal(message: str):
    """Minimal logging for threads to reduce output"""
    current = threading.current_thread()
    print(f"[{current.name}] {message}")
#----------------------------------------------------------------------------
# find_filter_files()
# Find all filter files in the Filters directory
#----------------------------------------------------------------------------
def find_filter_files(filters_folder: str = "./Filters") -> List[Path]:
    """
    Find all filter files in the Filters directory.
    Filter files should be .txt files containing packet filter IDs.
    """
    root = Path(filters_folder)
    if not root.exists():
        print(f"WARNING: Filters folder does not exist: {filters_folder}")
        return []
    found = list(root.glob("*.txt"))
    print(f"\nFound {len(found)} filter files:")
    for entry in found:
        print(f" {entry.name}")
    return found
#----------------------------------------------------------------------------
# find_qcat_files()
# Recursively find all QCAT log files in the given directory
#----------------------------------------------------------------------------
def find_qcat_files(input_folder: str) -> Tuple[Path, List[Path]]:
    """
    Find all QCAT log files recursively in the given folder.
    Common QCAT file extensions: .hdf, .qmdl, .qmdl2, .dlf, .isf
    Windows compatible path handling.
    """
    print(f"\nSearching for QCAT files in: {input_folder}")
    root = Path(input_folder)
    if not root.exists():
        print(f"ERROR: Input folder does not exist: {input_folder}")
        return root, []

    # One recursive glob per known extension; pathlib keeps this portable.
    hits: List[Path] = []
    for pattern in ('*.hdf', '*.qmdl', '*.qmdl2', '*.dlf', '*.isf'):
        hits.extend(root.rglob(pattern))

    # De-duplicate, sort deterministically, and strip the common root prefix.
    qcat_files = sorted({p.relative_to(root) for p in hits})

    print(f"Found {len(qcat_files)} QCAT log files:")
    for entry in qcat_files:
        print(f" {entry}")
    return Path(input_folder), qcat_files
#----------------------------------------------------------------------------
# QcatProcessor class
# Manages a single QCAT instance for processing multiple files
#----------------------------------------------------------------------------
class QcatProcessor:
    """
    Windows-compatible QCAT processor that maintains a single QCAT instance
    and processes files by closing/opening within the same instance.
    """
    def __init__(self, client_name: str = "Parser"):
        """Initialize the QCAT processor with a single instance."""
        self.qcat_auto_client = None   # set in initialize()
        self.pid = None                # OS process id of the QCAT instance
        self.client_name = client_name
        self.is_initialized = False
        self.current_file = None       # log currently open in QCAT, if any
        self.current_filter = None     # filter file currently loaded

    def initialize(self, filter_file: Optional[Path] = None) -> bool:
        """
        Initialize the QCAT automation client and manager.
        Returns True if successful, False otherwise.
        """
        try:
            log_with_thread("Initializing QCAT automation client...")
            self.qcat_auto_client = QcatClient.QcatAutomationClient(self.client_name)
            # NOTE(review): `qcatApp` is referenced below but never assigned in
            # this copy of the file — a line obtaining the automation manager
            # from `self.qcat_auto_client` appears to be missing. Confirm
            # against the original source before running.
            if qcatApp is None:
                raise ValueError("Failed to get QCAT automation manager")
            self.qcatApp = qcatApp
            self.pid = self.qcatApp.GetProcessID()
            log_with_thread(f"QCAT pid: {self.pid}")
            # Load filter file if provided
            if filter_file is None:
                raise ValueError("Filter file is required")
            log_with_thread(f"Loading filter file: {filter_file.name}")
            self.current_filter = filter_file
            try:
                # NOTE(review): file handle is never closed; consider `with open(...)`.
                filters = map(int, open(filter_file, encoding="utf-8").readlines())
            except FileNotFoundError:
                log_with_thread(f"Filter file not found: {filter_file}", "ERROR")
                return False
            self.filters = []
            for f in filters:
                if f == -1:  # -1 acts as an end-of-list sentinel in the filter file
                    break
                self.filters.append(f)
            log_thread_minimal(f"Loaded {len(self.filters)} filters")
            self.is_initialized = True
            log_with_thread("QCAT initialization successful")
            return True
        except Exception as e:
            log_with_thread(f"Failed to initialize QCAT: {str(e)}", "ERROR")
            return False

    def process_file(self, base_dir: Path, input_file: Path, output_folder: str) -> Tuple[bool, int]:
        """
        Process a single file using the existing QCAT instance.
        Creates a subfolder for each input file and generates multiple Excel files.
        Returns (success, packet_count) tuple.
        """
        if not self.is_initialized:
            log_with_thread("QCAT not initialized", "ERROR")
            return False, 0
        input_path = (base_dir / input_file).resolve()
        log_thread_minimal(f"Processing: {input_path.name}")
        try:
            # Close current file if one is open
            if self.current_file:
                close_result = self.qcatApp.CloseFile()
                if close_result != 1:
                    log_with_thread("Failed to close previous file", "WARN")
                self.current_file = None
            # Re-apply the packet filter: reset all ids, then enable the loaded
            # ones and commit the change in one batch.
            self.qcatApp.SetAll("PacketFilter", 1)
            def sf(f: int):
                self.qcatApp.Set("PacketFilter", f, 0)
            results = list(map(sf, self.filters))
            result = self.qcatApp.Commit("PacketFilter")
            # Open the new log file
            filePath = str(input_path)
            if self.qcatApp.OpenLog([filePath]) != 1:
                log_with_thread(f"Failed to open {input_file}", "ERROR")
                return False, 0
            # Check encryption status only if needed (reduced logging)
            try:
                keyInfoStatus = self.qcatApp.EncryptionKeyExchangeStatus()
                if keyInfoStatus != "Available":  # Only log if not standard
                    log_with_thread(f"Encryption status: {keyInfoStatus}", "WARN")
            except:
                pass  # Suppress encryption status errors in threads
            # NOTE(review): `visible_packets` is never assigned in this copy —
            # the packet-counting/export step appears to be missing here.
            # Confirm against the original source.
            return True, visible_packets
        except Exception as e:
            log_with_thread(f"Exception while processing {input_file}: {str(e)}", "ERROR")
            return False, 0

    def cleanup(self):
        """Clean up the QCAT instance."""
        try:
            # NOTE(review): `self.qcatApp` only exists when initialize() got far
            # enough to assign it — this attribute access can raise
            # AttributeError, which is absorbed by the outer except below.
            if self.qcatApp and self.is_initialized:
                log_thread_minimal("Cleaning up QCAT...")
                # Close current file if open
                if self.current_file:
                    try:
                        self.qcatApp.CloseFile()
                    except:
                        log_with_thread("Could not close current file", "WARN")
            if self.pid:
                time.sleep(1)  # Give QCAT time to exit cleanly
                try:
                    import subprocess
                    subprocess.run(['taskkill', '/F', '/PID', str(self.pid)],
                                   capture_output=True, check=False)
                    log_thread_minimal(f"Force terminated QCAT process {self.pid}")
                except:
                    log_with_thread("Could not force terminate QCAT process", "WARN")
        except Exception as e:
            log_with_thread(f"Error during cleanup: {str(e)}", "WARN")
            # On Windows, we could try to terminate the process
            if self.pid:
                try:
                    import subprocess
                    subprocess.run(['taskkill', '/F', '/PID', str(self.pid)],
                                   capture_output=True, check=False)
                    log_thread_minimal(f"Force terminated QCAT process {self.pid}")
                except:
                    log_with_thread("Could not force terminate QCAT process", "WARN")
#---------------------------------------------------------------------------- # Process single filter file #---------------------------------------------------------------------------- defprocess_single_filter(filter_file: Path, base_dir: Path, qcat_files: List[Path], output_folder: str) -> Dict: """ Process all QCAT files with a single filter file. Returns a dictionary with results for this filter. """ # Set thread name for better logging thread_name = f"Filter-{filter_file.stem}" threading.current_thread().name = thread_name results = { 'filter_name': filter_file.stem, 'filter_file': str(filter_file), 'total_packets': 0, 'processed_files': 0, 'failed_files': 0, 'file_results': [] } processor = QcatProcessor(f"Parser_{filter_file.stem}") try: # Initialize QCAT with this filter ifnot processor.initialize(filter_file): log_with_thread(f"Failed to initialize QCAT for filter {filter_file.name}", "ERROR") return results log_with_thread(f"Processing {len(qcat_files)} files with filter: {filter_file.name}") for i, qcat_file inenumerate(qcat_files, 1): # Minimal progress logging every 10 files to reduce output if i % 10 == 1or i == len(qcat_files): log_thread_minimal(f"[{i}/{len(qcat_files)}] {qcat_file.name}") success, packet_count = processor.process_file(base_dir, qcat_file, output_folder) file_result = { 'file_name': str(qcat_file), 'success': success, 'packet_count': packet_count } results['file_results'].append(file_result) if success: results['processed_files'] += 1 results['total_packets'] += packet_count # Only log individual files if they have packets or fail if packet_count > 0: log_thread_minimal(f"✓ {qcat_file.name} ({packet_count} packets)") else: results['failed_files'] += 1 log_with_thread(f"✗ FAILED: {qcat_file.name}", "ERROR") finally: processor.cleanup() return results
#----------------------------------------------------------------------------
# Save results to file
#----------------------------------------------------------------------------
def save_results_to_file(all_results: List[Dict], output_folder: str):
    """
    Save the processing results to JSON and summary text files.
    """
    out_dir = Path(output_folder)

    # Machine-readable dump of everything collected.
    json_path = out_dir / "packet_count_results.json"
    with open(json_path, 'w', encoding='utf-8') as fh:
        json.dump(all_results, fh, indent=2, ensure_ascii=False)
    print(f"\nDetailed results saved to: {json_path}")

    # Human-readable per-filter summary.
    summary_path = out_dir / "packet_count_summary.txt"
    grand_total = 0
    with open(summary_path, 'w', encoding='utf-8') as fh:
        fh.write("QCAT Packet Count Processing Summary\n")
        fh.write("=" * 50 + "\n\n")
        for entry in all_results:
            fh.write(f"Filter: {entry['filter_name']}\n")
            fh.write(f" Total packets found: {entry['total_packets']}\n")
            fh.write(f" Successfully processed files: {entry['processed_files']}\n")
            fh.write(f" Failed files: {entry['failed_files']}\n")
            fh.write(f" Filter file: {entry['filter_file']}\n\n")
            grand_total += entry['total_packets']
        fh.write(f"GRAND TOTAL PACKETS: {grand_total}\n")
        fh.write(f"Total filters processed: {len(all_results)}\n")
    print(f"Summary saved to: {summary_path}")

    # Mirror the totals on the console.
    print(f"\n{'='*60}")
    print("FINAL SUMMARY")
    print(f"{'='*60}")
    console_total = 0
    for entry in all_results:
        print(f"Filter {entry['filter_name']}: {entry['total_packets']} packets")
        console_total += entry['total_packets']
    print(f"{'='*60}")
    print(f"GRAND TOTAL PACKETS: {console_total}")
#----------------------------------------------------------------------------
# Process multiple files with multiple filters in parallel
#----------------------------------------------------------------------------
def process_multiple_files_parallel(files: Tuple[Path, List[Path]], output_folder: str, max_workers: int = 10) -> List[Dict]:
    """
    Process multiple QCAT files with multiple filters in parallel.
    Returns list of results for each filter.
    """
    base_dir, qcat_files = files

    # One task per filter file; nothing to do when no filters are present.
    filter_files = find_filter_files()
    if not filter_files:
        print("ERROR: No filter files found")
        return []

    print(f"\n{'='*80}")
    print(f"Starting parallel processing:")
    print(f" Files to process: {len(qcat_files)}")
    print(f" Filters to apply: {len(filter_files)}")
    print(f" Max parallel workers: {max_workers}")
    print(f"{'='*80}")

    all_results: List[Dict] = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = {
            pool.submit(process_single_filter, flt, base_dir, qcat_files, output_folder): flt
            for flt in filter_files
        }
        # Collect each filter's result as soon as its worker finishes.
        for done in as_completed(pending):
            flt = pending[done]
            try:
                outcome = done.result()
            except Exception as exc:
                print(f"\n✗ [MAIN] Filter {flt.name} generated an exception: {exc}")
                # Record a placeholder so the summary still covers this filter.
                all_results.append({
                    'filter_name': flt.stem,
                    'filter_file': str(flt),
                    'total_packets': 0,
                    'processed_files': 0,
                    'failed_files': len(qcat_files),
                    'file_results': [],
                    'error': str(exc)
                })
            else:
                all_results.append(outcome)
                print(f"\n✓ [MAIN] Completed filter: {flt.name}")
                print(f" Total packets found: {outcome['total_packets']}")
                print(f" Processed files: {outcome['processed_files']}")
                print(f" Failed files: {outcome['failed_files']}")
    return all_results
# #---------------------------------------------------------------------------- # # Process multiple files with single QCAT instance (original function) # #---------------------------------------------------------------------------- # """ # Process multiple QCAT files using a single QCAT instance. # Returns (successful_count, total_packets) tuple. # """ # processor = QcatProcessor() # # Initialize QCAT once # if not processor.initialize(): # print("ERROR: Failed to initialize QCAT processor") # return 0, 0 # successful_files = 0 # total_packets_exported = 0 # base_dir, qcat_files = files # try: # print(f"\n{'='*80}") # print(f"Processing {len(qcat_files)} files with single QCAT instance...") # print(f"{'='*80}")
# for i, qcat_file in enumerate(qcat_files, 1): # print(f"\n[{i}/{len(qcat_files)}] {Path(qcat_file).name}") # success, packet_count = processor.process_file(base_dir, qcat_file, output_folder) # if success: # successful_files += 1 # total_packets_exported += packet_count # status = "✓ SUCCESS" # if packet_count > 0: # status += f" ({packet_count} packets)" # else: # status += " (no packets)" # else: # status = "✗ FAILED" # print(f" {status}") # # Small delay between files for stability # time.sleep(0.5)
# finally: # # Always cleanup, even if there was an error # processor.cleanup() # return successful_files, total_packets_exported
#----------------------------------------------------------------------------
# main function
#----------------------------------------------------------------------------
def main():
    """
    Main function to process all QCAT files in the given folder and export 0xB193 packets.
    Uses a single QCAT instance for all files (Windows compatible, Python 3.10).
    """
    # Check command line arguments
    if len(sys.argv) < 3:
        print("\nUsage: python Export0xB193Packets.py <input_folder> <output_folder>")
        print("\nDescription:")
        print(" Recursively finds all QCAT log files in input_folder and subfolders,")
        print(" extracts 0xB193 packets from each file using a SINGLE QCAT instance,")
        print(" and exports them to Excel format in the output_folder.")
        print("\nSupported QCAT file extensions: .hdf, .qmdl, .qmdl2, .dlf, .isf")
        print("\nFeatures:")
        print(" - Windows compatible with Python 3.10")
        print(" - Memory efficient: Single QCAT instance, close/open files as needed")
        print(" - Recursive folder search")
        print(" - Excel export for each file with 0xB193 packets")
        print("\nExample:")
        print(" python Export0xB193Packets.py C:\\logs C:\\output")
        sys.exit(1)

    # FIX: these assignments were missing in this copy of the file —
    # input_folder/output_folder were referenced below without being bound,
    # which raised NameError on any real run.
    input_folder = sys.argv[1]
    output_folder = sys.argv[2]

    # Validate input folder using pathlib for Windows compatibility
    input_path = Path(input_folder)
    if not input_path.exists():
        print(f"ERROR: Input folder does not exist: {input_folder}")
        sys.exit(1)
    if not input_path.is_dir():
        print(f"ERROR: Input path is not a directory: {input_folder}")
        sys.exit(1)

    # Create output folder if it doesn't exist
    output_path = Path(output_folder)
    output_path.mkdir(parents=True, exist_ok=True)
    print(f"Output folder: {output_path.absolute()}")

    # Find all QCAT files
    base_dir, qcat_files = find_qcat_files(str(input_path))
    if not qcat_files:
        print("No QCAT log files found in the specified folder.")
        sys.exit(0)

    # Process all files using parallel processing with multiple filters
    print(f"\n{'='*80}")
    print(f"Starting parallel processing of {len(qcat_files)} files...")
    print(f"Using multiple filters in parallel for memory efficiency")
    print(f"{'='*80}")

    all_results = process_multiple_files_parallel((base_dir, qcat_files), str(output_path))
    # Save results to files
    save_results_to_file(all_results, str(output_path))

    # Calculate summary statistics
    total_filters = len(all_results)
    total_packets_all_filters = sum(result['total_packets'] for result in all_results)
    total_successful_files = sum(result['processed_files'] for result in all_results)
    total_failed_files = sum(result['failed_files'] for result in all_results)

    # Summary
    print(f"\n{'='*80}")
    print("PROCESSING COMPLETE")
    print(f"{'='*80}")
    print(f"Total QCAT files found: {len(qcat_files)}")
    print(f"Total filters processed: {total_filters}")
    print(f"Total packets found (all filters): {total_packets_all_filters}")
    print(f"Successfully processed file instances: {total_successful_files}")
    print(f"Failed file instances: {total_failed_files}")
    print(f"Output folder: {output_path.absolute()}")
    if total_failed_files > 0:
        print(f"\nWARNING: {total_failed_files} file processing instances failed.")
        print("Check the console output above for details.")
    else:
        print(f"\n✓ All files processed successfully with all filters!")
#----------------------------------------------------------------------------- # Entry point #----------------------------------------------------------------------------- if __name__ == "__main__": main()