1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
| from scapy.all import rdpcap, TCP, IP
import pandas as pd
from typing import Dict, List, Tuple
import hashlib
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
import os
def calculate_md5(num: int) -> bytes:
"""计算整数的MD5,返回字节格式作为AES密钥"""
num_str = str(num)
md5_hash = hashlib.md5(num_str.encode('utf-8')).hexdigest()
# 对于AES-256,我们需要32字节密钥,所以用MD5重复填充到32字节
key = (md5_hash * 2)[:32].encode('utf-8') # 重复MD5并取前32字符
return key
def aes_ecb_decrypt(ciphertext_hex: str, key: bytes) -> bytes:
"""使用AES-256-ECB解密数据"""
try:
ciphertext = bytes.fromhex(ciphertext_hex)
## if ciphertext.startswith(b"8950"):
## return f"[AES解密错误: {str(e)}]".encode()
cipher = AES.new(key, AES.MODE_ECB)
decrypted = cipher.decrypt(ciphertext)
# 尝试去除PKCS7填充
try:
decrypted = unpad(decrypted, AES.block_size)
except ValueError:
# 如果没有填充或者填充不正确,保持原样
pass
return decrypted
except Exception as e:
return f"[AES解密错误: {str(e)}]".encode()
def extract_tcp_communication(pcap_file: str, ip1: str, ip2: str) -> List[Tuple[str, str, bytes]]:
"""
从PCAP文件中提取两个IP的TCP通讯数据,按通讯方向和端口号分组
参数:
pcap_file: PCAP/PCAPNG文件路径
ip1: 第一个目标IP
ip2: 第二个目标IP
返回:
包含(方向端口、十六进制数据、解密数据)的元组列表
"""
# 读取PCAP文件
print(f"正在读取文件: {pcap_file}")
packets = rdpcap(pcap_file)
print(f"共读取到 {len(packets)} 个数据包")
# 存储筛选后的TCP数据
tcp_data: List[Dict] = []
# 遍历所有数据包,筛选目标IP的TCP通讯
for pkt in packets:
# 只保留IP+TCP层的数据包
if IP in pkt and TCP in pkt:
ip_layer = pkt[IP]
tcp_layer = pkt[TCP]
# 筛选条件:通讯双方必须是 ip1 和 ip2(双向都包含)
src_ip = ip_layer.src
dst_ip = ip_layer.dst
if not ((src_ip == ip1 and dst_ip == ip2) or (src_ip == ip2 and dst_ip == ip1)):
continue
# 提取TCP关键信息
src_port = tcp_layer.sport
dst_port = tcp_layer.dport
payload = bytes(pkt[TCP].payload) if pkt[TCP].payload else b""
# 只处理有负载的数据包
if len(payload) > 0:
data = {
"src_ip": src_ip,
"dst_ip": dst_ip,
"src_port": src_port,
"dst_port": dst_port,
"timestamp": pkt.time,
"seq_num": tcp_layer.seq,
"payload": payload,
"payload_len": len(payload)
}
tcp_data.append(data)
if not tcp_data:
print("未找到目标IP之间的TCP通讯数据")
return []
# 转换为DataFrame,方便分组处理
df = pd.DataFrame(tcp_data)
print(f"筛选出 {len(df)} 个目标TCP数据包")
# 定义通讯方向和端口分组
df["方向端口"] = df.apply(
lambda x: f"{x['src_ip']}:{x['src_port']}→{x['dst_ip']}:{x['dst_port']}",
axis=1
)
# 按通讯方向和端口分组,合并同组的TCP数据
def merge_tcp_direction(group: pd.DataFrame) -> Tuple[str, str, bytes, int]:
"""合并同一通讯方向和端口的TCP数据包"""
direction = group["方向端口"].iloc[0]
src_port = group["src_port"].iloc[0]
# 按序列号排序,确保数据顺序正确
sorted_group = group.sort_values("seq_num")
# 合并所有负载数据
merged_payload = b"".join(sorted_group["payload"].tolist())
# 转换为十六进制字符串
hex_data = merged_payload.hex()
# 使用源端口的MD5作为AES密钥进行解密
aes_key = calculate_md5(src_port)
if bytes.fromhex(hex_data).startswith(b"8950"):
decrypted_data = ""
else:
decrypted_data = aes_ecb_decrypt(hex_data, aes_key)
# 获取数据包数量
packet_count = len(group)
return direction, hex_data, decrypted_data, packet_count, src_port
# 按方向和端口分组合并
results = []
successful_decrypts = 0
failed_decrypts = 0
for direction_port, group in df.groupby("方向端口"):
result = merge_tcp_direction(group)
direction, hex_data, decrypted_data, packet_count, src_port = result
# 统计解密结果
if isinstance(decrypted_data, bytes) and decrypted_data:
successful_decrypts += 1
status = "成功"
else:
failed_decrypts += 1
status = "失败"
print(f"方向: {direction_port}, 数据包: {packet_count}个, 解密: {status}")
results.append((direction_port, hex_data, decrypted_data, src_port))
print(f"\n解密统计: 成功 {successful_decrypts} 个, 失败 {failed_decrypts} 个")
return results
def save_all_decrypted_data(results: List[Tuple[str, str, bytes, int]], output_dir: str = "output"):
"""将每个流量的解密数据单独保存为ZIP文件"""
# 创建输出目录
os.makedirs(output_dir, exist_ok=True)
saved_files = []
for i, (direction_port, hex_data, decrypted_data, src_port) in enumerate(results, 1):
if isinstance(decrypted_data, bytes) and decrypted_data:
# 简洁的文件名:只使用索引
filename = f"flow_{i:03d}.zip"
filepath = os.path.join(output_dir, filename)
# 保存为ZIP文件
with open(filepath, 'wb') as f:
f.write(decrypted_data)
print(f"保存文件: {filename}, 源端口: {src_port}, 长度: {len(decrypted_data)} 字节")
saved_files.append((filename, len(decrypted_data), src_port, direction_port))
print(f"\n所有解密数据已保存到目录: {output_dir}")
print(f"总共保存了 {len(saved_files)} 个ZIP文件")
# 显示保存的文件统计
if saved_files:
print("\n=== 保存的文件列表 ===")
total_size = 0
for filename, size, src_port, direction in saved_files:
print(f" {filename} - 端口: {src_port} - {size} 字节")
print(f" 方向: {direction}")
total_size += size
print(f"总数据大小: {total_size} 字节")
return saved_files
if __name__ == "__main__":
# 配置参数
PCAP_FILE = "capture.pcapng" # 你的PCAPNG文件路径
TARGET_IP1 = "192.168.0.234" # 目标IP1
TARGET_IP2 = "120.232.61.180" # 目标IP2
OUTPUT_FILE = "all_decrypted_data.bin" # 输出文件路径
# 检查依赖
try:
from Crypto.Cipher import AES
except ImportError:
print("错误: 需要安装pycryptodome库")
print("请运行: pip install pycryptodome")
exit(1)
# 执行提取和AES解密
print("开始提取TCP通讯并进行AES-256-ECB解密...")
results = extract_tcp_communication(PCAP_FILE, TARGET_IP1, TARGET_IP2)
if results:
# 分析解密数据
## analyze_decrypted_data(results)
# 保存所有解密数据到文件
## os.makedirs("output_zip_files", exist_ok=True)
saved_files = save_all_decrypted_data(results, "output_zip_files")
print(f"\n=== 完成 ===")
print(f"总共处理了 {len(results)} 个通讯流")
print(f"最终输出文件: {OUTPUT_FILE}")
else:
print("未找到任何TCP通讯数据")
|