167 lines
		
	
	
		
			6.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			167 lines
		
	
	
		
			6.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python3
 | |
| """
 | |
| 根据转换后的时间戳从原始数据集中还原音素对应的神经信号数据
 | |
| """
 | |
| 
 | |
| import pickle
 | |
| import numpy as np
 | |
| import h5py
 | |
| import os
 | |
| from pathlib import Path
 | |
| 
 | |
| def load_converted_phoneme_dataset(pkl_path):
 | |
|     """加载转换后的音素数据集"""
 | |
|     with open(pkl_path, 'rb') as f:
 | |
|         data = pickle.load(f)
 | |
|     return data
 | |
| 
 | |
| def load_h5_session_data(session_path):
 | |
|     """加载HDF5会话数据"""
 | |
|     with h5py.File(session_path, 'r') as f:
 | |
|         # 根据项目结构,神经特征应该在某个键下
 | |
|         print(f"HDF5文件键: {list(f.keys())}")
 | |
| 
 | |
|         # 尝试找到神经特征数据
 | |
|         if 'neuralFeatures' in f:
 | |
|             features = f['neuralFeatures'][:]
 | |
|         elif 'neural_features' in f:
 | |
|             features = f['neural_features'][:]
 | |
|         elif 'features' in f:
 | |
|             features = f['features'][:]
 | |
|         else:
 | |
|             # 检查所有键的形状
 | |
|             for key in f.keys():
 | |
|                 try:
 | |
|                     shape = f[key].shape
 | |
|                     print(f"  {key}: shape {shape}")
 | |
|                     # 寻找形状像神经特征的数据 (时间步, 特征数)
 | |
|                     if len(shape) == 2 and shape[1] == 512:
 | |
|                         features = f[key][:]
 | |
|                         print(f"使用 {key} 作为神经特征数据")
 | |
|                         break
 | |
|                 except:
 | |
|                     print(f"  {key}: 无法获取形状")
 | |
|             else:
 | |
|                 raise ValueError("未找到神经特征数据")
 | |
| 
 | |
|         print(f"神经特征形状: {features.shape}")
 | |
|         return features
 | |
| 
 | |
| def find_session_file(session_name, data_dir):
 | |
|     """根据会话名称找到对应的HDF5文件"""
 | |
|     data_path = Path(data_dir)
 | |
| 
 | |
|     # 尝试不同的文件命名模式
 | |
|     possible_patterns = [
 | |
|         f"{session_name}.h5",
 | |
|         f"{session_name}.hdf5",
 | |
|         f"*{session_name}*.h5",
 | |
|         f"*{session_name}*.hdf5"
 | |
|     ]
 | |
| 
 | |
|     for pattern in possible_patterns:
 | |
|         files = list(data_path.glob(pattern))
 | |
|         if files:
 | |
|             return files[0]
 | |
| 
 | |
|     # 如果没找到,列出所有文件看看
 | |
|     print(f"未找到会话 {session_name} 的文件")
 | |
|     print(f"数据目录 {data_dir} 中的文件:")
 | |
|     for f in data_path.glob("*.h5*"):
 | |
|         print(f"  {f.name}")
 | |
| 
 | |
|     return None
 | |
| 
 | |
| def restore_phoneme_samples(pkl_path, data_dir, num_samples=5):
 | |
|     """还原几个音素样本的神经信号数据"""
 | |
| 
 | |
|     print("=== 加载转换后的音素数据集 ===")
 | |
|     phoneme_data = load_converted_phoneme_dataset(pkl_path)
 | |
| 
 | |
|     print(f"音素数量: {len(phoneme_data)}")
 | |
|     print(f"第一个音素的键: {list(phoneme_data.keys())[:5]}")
 | |
| 
 | |
|     # 选择几个样本进行还原
 | |
|     sample_count = 0
 | |
| 
 | |
|     for phoneme, segments in phoneme_data.items():
 | |
|         if sample_count >= num_samples:
 | |
|             break
 | |
| 
 | |
|         if phoneme == '|':  # 跳过静音
 | |
|             continue
 | |
| 
 | |
|         print(f"\n=== 处理音素 '{phoneme}' ===")
 | |
|         print(f"该音素有 {len(segments)} 个segments")
 | |
| 
 | |
|         # 取第一个segment
 | |
|         segment = segments[0]
 | |
|         print(f"Segment信息:")
 | |
|         for key, value in segment.items():
 | |
|             if key != 'original_timestamps':
 | |
|                 print(f"  {key}: {value}")
 | |
| 
 | |
|         # 显示时间戳转换结果
 | |
|         original_ts = segment['original_timestamps']
 | |
|         print(f"原始时间戳转换:")
 | |
|         print(f"  输出时间戳: {segment['start_time']}-{segment['end_time']}")
 | |
|         print(f"  简单映射: {original_ts['simple']['start']}-{original_ts['simple']['end']}")
 | |
|         print(f"  保守映射: {original_ts['conservative']['start']}-{original_ts['conservative']['end']}")
 | |
|         print(f"  可能映射: {original_ts['likely']['start']}-{original_ts['likely']['end']}")
 | |
| 
 | |
|         # 尝试加载对应的会话数据
 | |
|         session_name = segment['session']
 | |
|         print(f"\n寻找会话文件: {session_name}")
 | |
| 
 | |
|         session_file = find_session_file(session_name, data_dir)
 | |
|         if session_file is None:
 | |
|             print(f"未找到会话 {session_name} 的数据文件")
 | |
|             continue
 | |
| 
 | |
|         print(f"找到会话文件: {session_file}")
 | |
| 
 | |
|         try:
 | |
|             # 加载神经特征数据
 | |
|             neural_features = load_h5_session_data(session_file)
 | |
| 
 | |
|             # 使用简单映射提取对应的神经信号
 | |
|             start_idx = original_ts['simple']['start']
 | |
|             end_idx = original_ts['simple']['end']
 | |
| 
 | |
|             if end_idx < neural_features.shape[0]:
 | |
|                 extracted_features = neural_features[start_idx:end_idx+1, :]
 | |
|                 print(f"\n成功提取神经信号:")
 | |
|                 print(f"  时间范围: {start_idx}-{end_idx} ({end_idx-start_idx+1} 个时间步)")
 | |
|                 print(f"  提取数据形状: {extracted_features.shape}")
 | |
|                 print(f"  时间长度: {(end_idx-start_idx+1) * 20}ms")
 | |
|                 print(f"  特征统计:")
 | |
|                 print(f"    均值: {np.mean(extracted_features):.4f}")
 | |
|                 print(f"    标准差: {np.std(extracted_features):.4f}")
 | |
|                 print(f"    最小值: {np.min(extracted_features):.4f}")
 | |
|                 print(f"    最大值: {np.max(extracted_features):.4f}")
 | |
| 
 | |
|                 # 显示前几个特征的值
 | |
|                 print(f"  前5个时间步的前10个特征:")
 | |
|                 for t in range(min(5, extracted_features.shape[0])):
 | |
|                     values = extracted_features[t, :10]
 | |
|                     print(f"    t+{t}: {values}")
 | |
| 
 | |
|                 sample_count += 1
 | |
|             else:
 | |
|                 print(f"时间戳超出数据范围: {end_idx} >= {neural_features.shape[0]}")
 | |
| 
 | |
|         except Exception as e:
 | |
|             print(f"处理会话 {session_name} 时出错: {e}")
 | |
|             continue
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     # 文件路径
 | |
|     pkl_path = "../phoneme_segmented_data/phoneme_dataset_20251009_202457_with_original_timestamps.pkl"
 | |
|     data_dir = "../data/hdf5_data_final"
 | |
| 
 | |
|     print("=== 音素数据还原测试 ===")
 | |
|     print(f"音素数据集: {pkl_path}")
 | |
|     print(f"神经数据目录: {data_dir}")
 | |
| 
 | |
|     restore_phoneme_samples(pkl_path, data_dir, num_samples=3)
 | |
|     print("\n=== 还原测试完成 ===") | 
