2025年Solar应急响应公益月赛-10月

本文最后更新于 2025年10月29日 下午

ruoyi

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import os
import re
import sys
import zipfile
from pathlib import Path

def find_http_links(text):
    r"""Return every http:// or https:// URL in ``text``, in order of appearance.

    A URL runs from the scheme up to the first whitespace character or one of
    the delimiters ``< > " { } | \ ^ ` [ ]``. Duplicates are kept.
    """
    url_re = re.compile(r'https?://[^\s<>"{}|\\^`\[\]]+')
    return [hit.group(0) for hit in url_re.finditer(text)]


def read_file_safe(file_path):
    """Read a file as text without raising on undecodable or unreadable input.

    The content is decoded as UTF-8 when possible. Anything that is not valid
    UTF-8 (``.class`` files, images, legacy-encoded text, ...) is decoded as
    Latin-1, which maps every byte to exactly one character and therefore
    cannot fail; ASCII sequences such as URLs come through unchanged.

    Args:
        file_path: Path of the file to read.

    Returns:
        The decoded text, or "" when the file cannot be opened or read.
    """
    # Latin-1 must stay last: it accepts any byte sequence, so every encoding
    # listed after it would never be tried.
    for enc in ('utf-8', 'latin-1'):
        try:
            with open(file_path, 'r', encoding=enc) as f:
                return f.read()
        except UnicodeDecodeError:
            # Not valid in this encoding; fall through to the next one.
            continue
        except (OSError, ValueError):
            # Missing file, permission problem, invalid path, ...
            return ""
    return ""


def search_file_for_http(file_path):
    """Return the distinct HTTP/HTTPS links found in a single file.

    Links are returned in order of first appearance so that repeated runs
    produce identical output. A file that yields no text (empty, or not
    readable by ``read_file_safe``) gives an empty list.
    """
    content = read_file_safe(file_path)
    if not content:
        return []
    # dict.fromkeys de-duplicates while keeping insertion order; a set would
    # reorder the links from run to run because str hashing is randomised.
    return list(dict.fromkeys(find_http_links(content)))


def extract_jar(jar_path, extract_to):
    """Unpack the JAR (zip) archive at ``jar_path`` into ``extract_to``.

    Returns True on success. On any error a message is printed and False is
    returned instead of raising.
    """
    try:
        archive = zipfile.ZipFile(jar_path, 'r')
        with archive:
            archive.extractall(extract_to)
    except Exception as err:
        print(f"❌ 解压失败: {jar_path} | {err}")
        return False
    return True


def scan_directory_for_jars(directory):
    """Recursively collect, as strings, the paths of all ``*.jar`` entries below ``directory``."""
    root = Path(directory)
    return list(map(str, root.rglob("*.jar")))

# ----------------- 主逻辑 -----------------

def _write_report(report_file, jar_count, results):
    """Write the plain-text summary of every link found to ``report_file``."""
    with open(report_file, 'w', encoding='utf-8') as f:
        f.write("HTTP链接搜索报告\n")
        f.write("="*50 + "\n")
        f.write(f"总JAR文件数: {jar_count}\n")
        f.write(f"包含HTTP链接的文件数: {len(results)}\n\n")
        for r in results:
            f.write(f"\nJAR: {r['jar']}\n")
            f.write(f"文件: {r['file']}\n")
            f.write("链接:\n")
            for link in r['links']:
                f.write(f" - {link}\n")


def main(source_dir, output_dir=None):
    """Extract every JAR under ``source_dir`` and list the HTTP(S) links inside.

    Each archive is unpacked into its own sub-directory of the output
    directory, every extracted file is searched for links, the hits are
    printed, and a summary is written to ``http_links_report.txt`` in the
    output directory.

    Args:
        source_dir: Directory that is searched recursively for ``*.jar`` files.
        output_dir: Where archives are unpacked. Defaults to
            ``<source_dir>/jar_extracted``.
    """
    source_path = Path(source_dir)
    if not source_path.exists():
        print(f"❌ 源目录不存在: {source_dir}")
        return

    if output_dir is None:
        output_path = source_path / "jar_extracted"
    else:
        output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    print(f"📂 源目录: {source_path}")
    print(f"📂 解压目录: {output_path}")
    print("="*50)

    jar_files = scan_directory_for_jars(source_path)
    # When the output directory lives inside the source tree (the default),
    # a second run would otherwise pick up jars that an earlier run unpacked
    # there (e.g. nested library jars) and treat them as fresh input.
    source_root = source_path.resolve()
    output_root = output_path.resolve()
    if source_root in output_root.parents:
        jar_files = [
            jar for jar in jar_files
            if output_root not in Path(jar).resolve().parents
        ]

    print(f"✅ 找到 {len(jar_files)} 个JAR文件\n")
    if not jar_files:
        print("⚠️ 未找到JAR文件")
        return

    results = []
    used_dir_names = set()

    for idx, jar_path in enumerate(jar_files, 1):
        jar_name = Path(jar_path).name
        print(f"[{idx}/{len(jar_files)}] 处理: {jar_name}")

        # .stem drops only the final suffix; str.replace('.jar', '') would
        # also eat a ".jar" that appears in the middle of the name.
        dir_name = Path(jar_path).stem
        if dir_name in used_dir_names:
            # Same file name found in another directory: keep the two
            # extractions apart so links are attributed to the right jar.
            dir_name = f"{dir_name}_{idx}"
        used_dir_names.add(dir_name)

        extract_path = output_path / dir_name
        extract_path.mkdir(parents=True, exist_ok=True)

        if extract_jar(jar_path, extract_path):
            http_found = False
            # Sorted so console output and report are stable between runs.
            for file in sorted(extract_path.rglob("*")):
                if not file.is_file():
                    continue
                links = search_file_for_http(file)
                if not links:
                    continue
                http_found = True
                rel_file = file.relative_to(extract_path)
                results.append({
                    'jar': jar_name,
                    'file': str(rel_file),
                    'links': links
                })
                print(f" 🔗 {rel_file}")
                for link in links:
                    print(f" → {link}")
            if not http_found:
                print(" ℹ️ 未找到HTTP链接")
        print()

    # Summary on the console, then the same data in the report file.
    print("="*50)
    print("📊 搜索完成汇总")
    print(f"总JAR文件数: {len(jar_files)}")
    print(f"包含HTTP链接的文件数: {len(results)}")

    report_file = output_path / "http_links_report.txt"
    try:
        _write_report(report_file, len(jar_files), results)
    except (OSError, UnicodeError) as e:
        print(f"❌ 保存报告失败: {e}")
    else:
        print(f"📝 报告已保存: {report_file}")


# ----------------- 命令行入口 -----------------

if __name__ == "__main__":
    # Expected command line: <script> <jar directory> [extraction output directory]
    cli_args = sys.argv[1:]
    if not cli_args:
        print(f"用法: python {sys.argv[0]} <jar目录> [解压输出目录]")
        sys.exit(1)

    main(cli_args[0], cli_args[1] if len(cli_args) > 1 else None)

查看 framework 包 com\ruoyi\framework\interceptor\RepeatSubmitInterceptor.class

1
flag{preRedirect}

工厂应急

Q1

1
2
谁把泵关了?
提交格式:flag{0xtransaction_id_0xfunction_code_0xcoil_address}

脚本提取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import sys
from scapy.all import rdpcap, TCP, Raw

def parse_modbus(packet):
    """Parse the header of a Modbus/TCP frame.

    Args:
        packet: Either a scapy packet that carries a ``Raw`` layer, or the TCP
            payload itself as ``bytes``/``bytearray``.

    Returns:
        A dict with ``trans_id`` (int), ``func_code`` (int) and ``data`` (the
        bytes following the function code), or None when the payload is
        shorter than 8 bytes or its protocol identifier is not zero.
    """
    if isinstance(packet, (bytes, bytearray)):
        raw = bytes(packet)
    else:
        raw = bytes(packet[Raw].load)
    if len(raw) < 8:
        return None

    # Header layout: transaction id (2) | protocol id (2) | length (2) |
    # unit id (1) | function code (1). The protocol id is 0 for Modbus, so a
    # non-zero value means this payload belongs to some other TCP protocol and
    # must not be interpreted as a Modbus request.
    if raw[2:4] != b"\x00\x00":
        return None

    trans_id = int.from_bytes(raw[0:2], "big")
    func_code = raw[7]
    return {
        "trans_id": trans_id,
        "func_code": func_code,
        "data": raw[8:]
    }

def _coil_switched_off(func, data):
    """Return the address of the first coil a write request sets to 0, else None.

    ``func`` is the Modbus function code and ``data`` the bytes that follow it.
    """
    if func == 5:
        # Write Single Coil: address (2 bytes) + value (2 bytes), value 0 = off.
        # All four bytes must be present: an empty value slice would decode to
        # 0 and a truncated frame would be reported as a switch-off.
        if len(data) >= 4 and int.from_bytes(data[2:4], "big") == 0x0000:
            return int.from_bytes(data[0:2], "big")
        return None

    if func == 15 and len(data) > 5:
        # Write Multiple Coils: start address (2), quantity (2), byte count (1),
        # then the coil values packed with the lowest address in bit 0.
        start_addr = int.from_bytes(data[0:2], "big")
        qty = int.from_bytes(data[2:4], "big")
        byte_count = data[4]
        coil_data = data[5:5 + byte_count]
        bits = ''.join(f'{b:08b}'[::-1] for b in coil_data)
        # Slicing limits the search to the bits that are both announced (qty)
        # and actually present in the frame.
        offset = bits[:qty].find('0')
        if offset != -1:
            return start_addr + offset

    return None


def find_turn_off_pump(pcap_path):
    """Print the flag for the first Modbus write in a capture that turns a coil off.

    The capture is scanned in order; the first Write Single Coil (0x05) or
    Write Multiple Coils (0x0F) request that writes a 0 is reported as
    ``flag{0x<transaction id>_0x<function code>_0x<coil address>}``. A notice
    is printed when no such request exists.

    Args:
        pcap_path: Path of the capture file to read.
    """
    packets = rdpcap(pcap_path)

    for pkt in packets:
        if TCP not in pkt or not pkt.haslayer(Raw):
            continue
        try:
            mod = parse_modbus(pkt)
        except (AttributeError, IndexError):
            # Payload could not be read from this packet; keep scanning.
            continue
        if not mod:
            continue

        func = mod["func_code"]
        coil_addr = _coil_switched_off(func, mod["data"])
        if coil_addr is not None:
            flag = f"flag{{0x{mod['trans_id']:04x}_0x{func:02x}_0x{coil_addr:04x}}}"
            print(flag)
            return

    print("[-] 未发现关泵操作。")

if __name__ == "__main__":
    # Exactly one argument is needed: the capture file to analyse.
    cli_args = sys.argv[1:]
    if not cli_args:
        print("用法: python find_who_turned_off_pump_final.py traffic.pcap")
        sys.exit(1)
    find_turn_off_pump(cli_args[0])

flag

1
flag{0x1a2b_0x05_0x000d}

Q2

1
被写入的 NodeId

AI脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import sys
import re
import csv
from datetime import datetime
from scapy.all import rdpcap, Raw, IP, TCP, UDP

# Textual NodeId forms: ns=2;s=..., ns=1;i=123, ns=3;g=..., ns=0;b=...
RE_NODEID_TEXT = re.compile(rb'ns=\d+;(?:s=[^\s,;\"\']+|i=\d+|g=[0-9A-Fa-f\-]+|b=[A-Za-z0-9+/=]+)')
# The same forms encoded as UTF-16LE, where every ASCII character is followed
# by a NUL byte. Each repeated element is therefore a (character, NUL) pair;
# repeating only the character or only the NUL can never match real text.
RE_NODEID_UTF16 = re.compile(
    rb'n\0s\0=\0(?:\d\0)+;\0'
    rb'(?:s\0=\0(?:[^\0,;\"\']\0)+'
    rb'|i\0=\0(?:\d\0)+'
    rb'|g\0=\0(?:[0-9A-Fa-f\-]\0)+'
    rb'|b\0=\0(?:[A-Za-z0-9+/=]\0)+)',
    re.IGNORECASE,
)

def bytes_to_printable(b):
    """Return ``b`` as text.

    Byte strings are decoded as UTF-8 with undecodable bytes dropped; any
    other object is converted with ``str()``.
    """
    if isinstance(b, (bytes, bytearray)):
        # errors='ignore' means this decode cannot raise.
        return b.decode('utf-8', errors='ignore')
    return str(b)

def find_matches_in_payload(payload_bytes):
    """Return the distinct NodeId strings found in a raw payload.

    The payload is searched with ``RE_NODEID_TEXT`` first and then with
    ``RE_NODEID_UTF16``; UTF-16LE hits are decoded so both kinds come back as
    ordinary strings. The result keeps first-seen order.

    Args:
        payload_bytes: Raw payload bytes to search.

    Returns:
        List of unique NodeId strings, possibly empty.
    """
    # Both decodes use errors='ignore' and therefore cannot raise, so no
    # fallback handling is needed around them.
    matches = [
        m.group(0).decode('utf-8', errors='ignore')
        for m in RE_NODEID_TEXT.finditer(payload_bytes)
    ]
    matches.extend(
        m.group(0).decode('utf-16le', errors='ignore')
        for m in RE_NODEID_UTF16.finditer(payload_bytes)
    )
    # dict.fromkeys removes duplicates while preserving insertion order.
    return list(dict.fromkeys(matches))

def scan_pcap(path):
    """Scan a capture file for NodeId strings in packet payloads.

    Args:
        path: Path of the pcap file to read.

    Returns:
        A list with one dict per match, holding ``pkt_index`` (1-based
        position in the capture), ``time`` (UTC, ``YYYY-MM-DDTHH:MM:SSZ``, or
        "" when the packet has no timestamp), ``src``, ``dst`` and ``nodeid``.
    """
    # Local import: the module only imports `datetime` from the datetime package.
    from datetime import timezone

    print(f"[+] 读取 pcap:{path}")
    results = []
    for idx, p in enumerate(rdpcap(path), start=1):
        # Prefer the Raw layer; otherwise fall back to whatever follows the
        # TCP or UDP header, taking the first non-empty candidate.
        payload = b''
        if Raw in p:
            payload = bytes(p[Raw].load)
        else:
            for layer in (TCP, UDP):
                if layer in p:
                    candidate = bytes(getattr(p[layer], 'payload', b''))
                    if candidate:
                        payload = candidate
                        break
        if not payload:
            continue

        matches = find_matches_in_payload(payload)
        if not matches:
            continue

        # float() because the packet timestamp is not guaranteed to be a
        # plain float (NOTE(review): scapy appears to use a Decimal-like type
        # here - confirm). An aware UTC datetime replaces the deprecated
        # utcfromtimestamp().
        ts = getattr(p, 'time', None)
        if ts is not None:
            timestr = datetime.fromtimestamp(float(ts), tz=timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
        else:
            timestr = ""

        if IP in p:
            src = p[IP].src
            dst = p[IP].dst
        else:
            # Non-IP packet: use the outermost layer's addresses if it has any.
            src = getattr(p, 'src', '')
            dst = getattr(p, 'dst', '')

        for m in matches:
            results.append({
                'pkt_index': idx,
                'time': timestr,
                'src': src,
                'dst': dst,
                'nodeid': m,
            })
    return results

def print_results(results):
    """Print one line per match, or a notice when ``results`` is empty."""
    if results:
        print(f"[+] 共发现 {len(results)} 条匹配:")
        row_template = "#{pkt_index:6d} {time} {src} -> {dst} {nodeid}"
        for entry in results:
            print(row_template.format(**entry))
        return
    print("[!] 未发现匹配的 NodeId(文本或 UTF-16LE)。若流量加密则需要解密后再分析。")

def save_csv(results, outpath):
    """Write ``results`` to ``outpath`` as UTF-8 CSV with a header row, then print a confirmation."""
    columns = ['pkt_index', 'time', 'src', 'dst', 'nodeid']
    with open(outpath, 'w', newline='', encoding='utf-8') as handle:
        table = csv.DictWriter(handle, fieldnames=columns)
        table.writeheader()
        table.writerows(results)
    print(f"[+] 已保存 CSV: {outpath}")

if __name__ == '__main__':
    # Command line: <script> traffic.pcap [-o|--out out.csv]
    args = sys.argv[1:]
    if not args:
        print("Usage: python find_nodeid_in_pcap.py traffic.pcap [-o out.csv]")
        sys.exit(1)

    pcap = args[0]
    # The output option only counts when it is followed by a file name.
    has_out_option = len(args) >= 3 and args[1] in ('-o', '--out')
    out = args[2] if has_out_option else None

    res = scan_pcap(pcap)
    print_results(res)
    if out:
        save_csv(res, out)

1
flag{ns=2;s=Pump/SpeedSetpoint}

Q3

1
2
工程站域名解析结果
找出工程站域名 engws.plant.local 的 A 记录解析结果(目标 IP)。

过滤搜索

1
dns && (dns.qry.name == "engws.plant.local")

1
flag{10.0.0.8}

Q4

1
2
确定 HMI(源:10.0.0.x)到工程站(目的:10.0.0.x)上首个成功发起的时间点。
提交格式:flag{YYYY-MM-DDTHH:MM:SSZ}(UTC,精确到秒)

过滤

1
ip.src == 10.0.0.5 && ip.dst == 10.0.0.8 &&tcp

取首个成功完成 TCP 三次握手的连接的时间(UTC)

1
flag{2025-03-12T14:22:09Z}

Q5

1
在横向后不久,HMI 对工程站发起了 HTTP 请求。提交请求的 Host 与 URI,用下划线连接。

过滤搜索

1
ip.dst == 10.0.0.8 && http.request

拼接

1
flag{engws.plant.local_/rpc}

逆向

Brainiac

程序注册了异常处理函数,因此需要动态调试:在 Handler 里下断点,可以看到触发异常时它会把 RIP 设置为 GetModuleHandleA 返回的模块基址 + 2(即下文的 sub_7FF7D1540002)

在 process option 里填写

调试跟进,出现 MZ 字样时按 P 创建函数,出现如下函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// Decompiler output. Encrypts, in place, the buffer pointed to by the QWORD at
// a1+8 and appends a 16-byte trailer to it, then zeroes 8 bytes at
// (*(QWORD *)a1 + 4217) between two calls through the function pointer at a1+24.
// Fields read from a1: +0 base pointer, +8 buffer pointer, +16 buffer size
// (DWORD), +20 DWORD in/out slot passed to the callback, +24 callback pointer.
// Always returns 0.
__int64 __fastcall sub_7FF7D1540002(__int64 a1)
{
unsigned int v2; // r9d
unsigned __int64 v3; // rax
unsigned int v4; // r11d
int v5; // r10d
__int64 v6; // r8
int *v7; // r8
int v8; // edx
__int64 v9; // rax
__int64 v11; // [rsp+20h] [rbp+8h] BYREF
__int64 v12; // [rsp+28h] [rbp+10h] BYREF

// v2 = buffer size rounded up to the next multiple of 8 (the cipher block size).
v2 = *(_DWORD *)(a1 + 16);
if ( (v2 & 7) != 0 )
v2 = (v2 & 0xFFFFFFF8) + 8;
// Seed: low and high halves of the timestamp counter XORed with a constant.
v3 = __rdtsc();
v4 = 0;
v5 = v3 ^ HIDWORD(v3) ^ 0x76543210;
// Trailer written at offset v2: the seed, then (original size ^ seed ^ 0x89ABCDEF),
// then two constants whose little-endian bytes spell "ZXLO" and "CKER".
*(_DWORD *)(v2 + *(_QWORD *)(a1 + 8)) = v5;
*(_DWORD *)(*(_QWORD *)(a1 + 8) + v2 + 4i64) = *(_DWORD *)(a1 + 16) ^ v5 ^ 0x89ABCDEF;
*(_DWORD *)(*(_QWORD *)(a1 + 8) + v2 + 8i64) = 1330403418;
// Per 8-byte block: the two DWORDs are swapped and each is XORed with the next
// value of the sequence v5 -> 1919810 - 86066498 * v5. The loop's increment
// expression (v7[1] = v5 ^ v8) runs after the body, i.e. with the already
// twice-advanced v5 and the block's original first DWORD.
for ( *(_DWORD *)(*(_QWORD *)(a1 + 8) + v2 + 12i64) = 1380272963; v4 < v2; v7[1] = v5 ^ v8 )
{
v6 = v4;
v4 += 8;
v7 = (int *)(*(_QWORD *)(a1 + 8) + v6);
v8 = *v7;
*v7 = v7[1] ^ (1919810 - 86066498 * v5);
v5 = 1919810 - 86066498 * (1919810 - 86066498 * v5);
}
// NOTE(review): the argument pattern (-1, &address, &size, 4, &slot) looks like a
// memory-protection change on the current process (4 would be PAGE_READWRITE),
// with the second call restoring the value saved at a1+20 - confirm what a1+24
// actually points to.
v9 = *(_QWORD *)a1 + 4217i64;
v11 = 8i64;
v12 = v9;
(*(void (__fastcall **)(__int64, __int64 *, __int64 *, __int64, __int64))(a1 + 24))(-1i64, &v12, &v11, 4i64, a1 + 20);
*(_QWORD *)(*(_QWORD *)a1 + 4217i64) = 0i64;
(*(void (__fastcall **)(__int64, __int64 *, __int64 *, _QWORD, __int64))(a1 + 24))(
-1i64,
&v12,
&v11,
*(unsigned int *)(a1 + 20),
a1 + 20);
return 0i64;
}

脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import struct

# Dumped buffer: 56 bytes of encrypted data followed by a 16-byte trailer
# (seed, masked size, and the 8 bytes "ZXLOCKER").
ciphertext = bytes.fromhex("".join("""
EF 1C FD 7A 7C 33 44 12 EB A0 21 A5 C0 68 74 03
02 DA B0 DA B3 25 56 99 73 BE 39 05 A2 97 29 F2
D8 63 35 4E F7 A1 11 E6 E5 6F A9 DA F9 4E B7 B3
A7 C2 42 EC A4 62 DA 92 77 E4 37 C2 AF 29 9C 4B
5A 58 4C 4F 43 4B 45 52
""".split()))

encrypted_data = bytearray(ciphertext[:56])
metadata = ciphertext[56:]

# The first two little-endian DWORDs of the trailer are the seed and the
# original size XORed with the seed and 0x89ABCDEF.
v5_stored, size_xor = struct.unpack('<II', metadata[:8])
original_size = size_xor ^ v5_stored ^ 0x89ABCDEF

print(f"密文长度: {len(ciphertext)} 字节")
print(f"种子 v5: 0x{v5_stored:08X}")
print(f"原始大小: {original_size} 字节\n")


def decrypt_combined(data, seed, original_size):
    """Decrypt ``data`` and return the first ``original_size`` plaintext bytes.

    The data is processed in 8-byte blocks of two little-endian DWORDs. For
    each block the key stream ``k -> 1919810 - 86066498 * k (mod 2**32)`` is
    advanced twice: the first value unmasks the block's first DWORD, which
    holds the plaintext's second DWORD, and the second value unmasks the
    block's second DWORD, which holds the plaintext's first DWORD. A trailing
    partial block is ignored.

    Args:
        data: Encrypted bytes (``bytes`` or ``bytearray``).
        seed: Initial 32-bit key-stream state.
        original_size: Number of plaintext bytes to keep.

    Returns:
        The decrypted plaintext as ``bytes``.
    """
    def next_key(state):
        return (1919810 - 86066498 * state) & 0xFFFFFFFF

    result = bytearray()
    state = seed
    # len(data) - 7 as the stop value visits only complete 8-byte blocks.
    for offset in range(0, len(data) - 7, 8):
        enc0, enc1 = struct.unpack_from('<II', data, offset)
        first_key = next_key(state)
        state = next_key(first_key)
        # Undo the DWORD swap: plaintext order is (enc1 part, enc0 part).
        result += struct.pack('<II', enc1 ^ state, enc0 ^ first_key)

    return bytes(result[:original_size])


flag = decrypt_combined(encrypted_data, v5_stored, original_size)

# Show the result both as a hex dump and as ASCII.
banner = "=" * 60
hex_dump = ' '.join(f'{b:02X}' for b in flag)
print(banner)
print("完整FLAG解密结果:")
print(f"Hex: {hex_dump}")
print(f"ASCII: {flag.decode('ascii', errors='replace')}")
print(banner)

# Sanity check: a correct decryption contains the flag{...} wrapper.
looks_valid = b'flag{' in flag and b'}' in flag
if not looks_valid:
    print("\n⚠ 解密可能有误")
else:
    print("\n✓✓✓ 成功解密! ✓✓✓")
    print(f"\nFLAG: {flag.decode('ascii')}")

拿到结果

1
flag{M4lf0Rmed_Pe_W1th_B4d_ReL0Ca71oNs@solarsec_202510}

2025年Solar应急响应公益月赛-10月
http://example.com/2025/10/29/2025年Solar应急响应公益月赛-10月/
作者
butt3rf1y
发布于
2025年10月29日
许可协议