165 lines
		
	
	
		
			6.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			165 lines
		
	
	
		
			6.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python3
 | |
| """
 | |
| Display the contents and structure of the phoneme dataset pkl files
 | |
| """
 | |
| 
 | |
| import pickle
 | |
| import numpy as np
 | |
| from pathlib import Path
 | |
| 
 | |
def show_phoneme_dataset_structure():
    """Inspect the newest phoneme dataset pickle in ``phoneme_segmented_data/``.

    Lists every ``phoneme_dataset_*.pkl`` and ``ctc_results_*.pkl`` file found,
    then loads the most recently modified dataset (a dict mapping phoneme
    string -> list of segment dicts) and reports: per-phoneme segment counts,
    one sample segment, a start/end-time sanity check, and the session/trial
    distribution.  All output goes to stdout; returns None.
    """
    data_dir = Path("phoneme_segmented_data")
    if not data_dir.exists():
        print("No phoneme_segmented_data directory found")
        return

    # Find all dataset files
    dataset_files = list(data_dir.glob("phoneme_dataset_*.pkl"))
    ctc_files = list(data_dir.glob("ctc_results_*.pkl"))

    print("=== PKL Files in phoneme_segmented_data/ ===")
    print(f"Found {len(dataset_files)} phoneme dataset files:")
    for f in sorted(dataset_files):
        size_mb = f.stat().st_size / (1024 * 1024)
        print(f"  {f.name} ({size_mb:.1f} MB)")

    print(f"\nFound {len(ctc_files)} CTC results files:")
    for f in sorted(ctc_files):
        size_mb = f.stat().st_size / (1024 * 1024)
        print(f"  {f.name} ({size_mb:.1f} MB)")

    if not dataset_files:
        print("No dataset files to examine")
        return

    # Load the most recently modified dataset file.
    latest_dataset = max(dataset_files, key=lambda x: x.stat().st_mtime)
    print(f"\n=== Examining Latest Dataset: {latest_dataset.name} ===")

    # NOTE(review): pickle.load is only safe on files this pipeline produced
    # itself — do not point this script at untrusted .pkl files.
    with open(latest_dataset, 'rb') as f:
        dataset = pickle.load(f)

    print(f"Dataset type: {type(dataset)}")
    print(f"Dataset size: {len(dataset)} phoneme types")

    # Show phoneme types and their counts
    print("\n=== Phoneme Types and Counts ===")
    total_segments = 0
    for phoneme, segments in sorted(dataset.items()):
        count = len(segments)
        total_segments += count
        print(f"'{phoneme}': {count:5d} segments")

    print(f"\nTotal segments across all phonemes: {total_segments:,}")

    # Show one sample segment: try each candidate phoneme in turn and stop
    # after the first one present in the dataset (the break below is
    # intentional — only a single sample is printed).
    print("\n=== Sample Segment Structure ===")
    sample_phonemes = [' | ', 'AA', 'IH', 'T', 'S']  # Common phonemes

    for phoneme in sample_phonemes:
        if phoneme in dataset and dataset[phoneme]:
            segment = dataset[phoneme][0]  # First segment
            print(f"\nSample segment for '{phoneme}':")
            print(f"  Type: {type(segment)}")
            if isinstance(segment, dict):
                for key, value in segment.items():
                    if key == 'confidence':
                        print(f"  {key}: {value:.6f}")
                    elif isinstance(value, np.integer):
                        # numpy scalar ints print as e.g. "np.int64(3)"; show plain int
                        print(f"  {key}: {int(value)}")
                    else:
                        print(f"  {key}: {value}")
            break

    # Check for time alignment issues (end must not precede start).
    print("\n=== Time Alignment Check ===")
    error_count = 0
    total_checked = 0

    for phoneme, segments in dataset.items():
        for segment in segments[:10]:  # Check first 10 of each phoneme
            if isinstance(segment, dict) and 'start_time' in segment and 'end_time' in segment:
                start_time = int(segment['start_time'])
                end_time = int(segment['end_time'])
                total_checked += 1

                if end_time < start_time:
                    error_count += 1
                    if error_count <= 5:  # Show first 5 errors
                        print(f"  ❌ Error: '{phoneme}' segment has start={start_time}, end={end_time}")

    if total_checked > 0:
        error_rate = (error_count / total_checked) * 100
        print(f"\nChecked {total_checked} segments:")
        print(f"  ✅ Valid segments: {total_checked - error_count}")
        print(f"  ❌ Invalid segments: {error_count}")
        print(f"  Error rate: {error_rate:.1f}%")

    # Show session and trial distribution
    print("\n=== Session and Trial Distribution ===")
    sessions = set()
    trials = set()

    for phoneme, segments in dataset.items():
        for segment in segments[:100]:  # Sample first 100 of each phoneme
            if isinstance(segment, dict):
                if 'session' in segment:
                    sessions.add(segment['session'])
                if 'trial_num' in segment:
                    trials.add(f"{segment.get('session', 'unknown')}_trial_{segment['trial_num']}")

    print(f"Sessions represented: {len(sessions)}")
    if len(sessions) <= 10:
        for session in sorted(sessions):
            print(f"  {session}")
    else:
        # Bug fix: previously this sliced the *unordered* set before sorting
        # (sorted(list(sessions)[:5])), printing an arbitrary hash-order
        # subset.  Sort first, then take the first five.
        for session in sorted(sessions)[:5]:
            print(f"  {session}")
        print(f"  ... and {len(sessions) - 5} more")

    print(f"Unique trials sampled: {len(trials)}")
 | |
| 
 | |
def show_ctc_results_structure():
    """Show the structure of CTC results pkl files"""

    ctc_files = list(Path("phoneme_segmented_data").glob("ctc_results_*.pkl"))

    # Nothing to do if no results were produced yet.
    if not ctc_files:
        print("\n=== No CTC Results Files Found ===")
        return

    # Pick the file with the newest modification time and unpickle it.
    newest = max(ctc_files, key=lambda p: p.stat().st_mtime)
    print(f"\n=== Examining Latest CTC Results: {newest.name} ===")

    with open(newest, 'rb') as fh:
        ctc_data = pickle.load(fh)

    print(f"CTC data type: {type(ctc_data)}")
    print(f"CTC data length: {len(ctc_data)}")

    if not ctc_data:
        return

    # Pretty-print the first record, with per-key formatting rules.
    sample = ctc_data[0]
    print("\nSample CTC result:")
    print(f"  Type: {type(sample)}")
    if not isinstance(sample, dict):
        return

    for key, value in sample.items():
        if key == 'ctc_score':
            print(f"  {key}: {value:.3f}")
        elif key == 'predicted_phonemes' and isinstance(value, list):
            shown = ' '.join(value[:10])  # First 10 phonemes
            if len(value) > 10:
                shown += f" ... ({len(value)} total)"
            print(f"  {key}: [{shown}]")
        elif key == 'alignment_info' and isinstance(value, list):
            print(f"  {key}: {len(value)} alignment segments")
        else:
            text = str(value)
            suffix = '...' if len(text) > 100 else ''
            print(f"  {key}: {text[:100]}{suffix}")
 | |
| 
 | |
# Script entry point: dump the phoneme dataset structure first, then the
# CTC results structure.  Both functions read from ./phoneme_segmented_data
# relative to the current working directory.
if __name__ == "__main__":
    show_phoneme_dataset_structure()
    show_ctc_results_structure()
