0%

Angr学习记录

Angr简介

Angr提供了一个处理二进制文件的框架 主流的指令集指令在被载入CLE框架后 会根据功能被抽象为中间语言(IL) Angr提供的就是处理这些被载入为IL的程序的能力 可以用来实现自动化的程序调试 例如直接载入程序然后向模拟的标准输入流输入数据以达到直接利用原程序进行爆破的目的 同时Angr还内置了约束求解器

做做题

下面跟着Angr-ctf里的题目学习Angr的基础应用

angr_find

IDA加载附件看一下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
int __cdecl complex_function(int a1, int a2)
{
if ( a1 <= 64 || a1 > 90 )
{
puts("Try again.");
exit(1);
}
return (3 * a2 + a1 - 65) % 26 + 65;
}

int __cdecl main(int argc, const char **argv, const char **envp)
{
int i; // [esp+1Ch] [ebp-1Ch]
char s1[9]; // [esp+23h] [ebp-15h] BYREF
unsigned int v6; // [esp+2Ch] [ebp-Ch]

v6 = __readgsdword(0x14u);
printf("Enter the password: ");
__isoc99_scanf("%8s", s1);
for ( i = 0; i <= 7; ++i )
s1[i] = complex_function(s1[i], i);
if ( !strcmp(s1, "JACEJGCS") )
puts("Good Job.");
else
puts("Try again.");
return 0;
}

显然目标是要进入输出”Good Job.”的分支

image-20240718201531013

用angr爆破:

1
2
3
4
5
6
7
8
9
10
11
12
import angr

# Load the binary without automatically loading its shared libraries;
# this yields the Project object.
proj = angr.Project('00_angr_find', load_options={'auto_load_libs': False})
# Build the initial SimState at the program entry point. `factory` exposes the
# constructors for states and managers; a SimState holds registers, memory, etc.
state = proj.factory.entry_state()
# Wrap the state in a simulation manager, which drives the symbolic execution.
simg = proj.factory.simgr(state)
# Explore until some state reaches the wanted address, discarding any state
# that reaches the address to avoid.
simg.explore(find=0x8048675, avoid=0x8048663)
# Dump what the found state read from standard input (fd 0). Guard the lookup:
# `found` is empty when no path reached the target.
if simg.found:
    print(simg.found[0].posix.dumps(0))
else:
    print('Answer not found')

angr_find_condition

主函数进行了混淆 加入了很多实际上控制流不会经过的块 不过还好IDA的反汇编功能比较强大 伪代码看起来还是很清晰的:

image-20240719153400747

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
int __cdecl main(int argc, const char **argv, const char **envp)
{
int i; // [esp+18h] [ebp-40h]
int j; // [esp+1Ch] [ebp-3Ch]
char s1[20]; // [esp+24h] [ebp-34h] BYREF
char s2[20]; // [esp+38h] [ebp-20h] BYREF
unsigned int v8; // [esp+4Ch] [ebp-Ch]

v8 = __readgsdword(0x14u);
for ( i = 0; i <= 19; ++i )
s2[i] = 0;
qmemcpy(s2, "VXRRJEUR", 8);
printf("Enter the password: ");
__isoc99_scanf("%8s", s1);
for ( j = 0; j <= 7; ++j )
s1[j] = complex_function(s1[j], j + 8);
if ( !strcmp(s1, s2) )
puts("Good Job.");
else
puts("Try again.");
return 0;
}

整体逻辑和前面的题基本相同 引入新的爆破方法 顺便借机研究一下angr执行的模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import angr, sys

eip_record = []     # every address passed to is_succ, in first-seen order
branch_record = []  # addresses that is_succ has seen more than once


def is_succ(state):
    """Return True when the state's stdout (fd 1) contains b'Good Job'.

    Also records and prints the state's current EIP so the order in which
    explore() visits addresses can be studied.
    """
    eip = state.regs.eip.args[0]
    if eip not in eip_record:
        eip_record.append(eip)
    elif eip not in branch_record:
        branch_record.append(eip)
    # Address-based alternative to the stdout check:
    # return eip == 0x804CA3A
    print('EIP:', hex(eip)[2:])
    return b'Good Job' in state.posix.dumps(1)


def is_fail(state):
    """Return True when the state's stdout (fd 1) contains b'Try again'."""
    eip = state.regs.eip.args[0]
    # Address-based alternative to the stdout check:
    # return eip == 0x804CA25
    # Uncomment to see that explore() runs this check as well:
    # print('Excute is fail')
    return b'Try again' in state.posix.dumps(1)


def main(argv):
    """Solve 02_angr_find_condition using predicate functions for find/avoid."""
    proj = angr.Project('02_angr_find_condition', load_options={'auto_load_libs':False})
    simstate = proj.factory.entry_state()
    simmgr = proj.factory.simgr(simstate)
    # find/avoid accept either an integer, which is compared against EIP, or a
    # function, which is called with the state and whose return value decides.
    simmgr.explore(find=is_succ, avoid=is_fail)
    if simmgr.found:
        print(simmgr.found[0].posix.dumps(0))
    # Uncomment to list the recorded addresses:
    # print("\n".join([hex(addr) for addr in branch_record]))
    # print("\n".join([hex(addr) for addr in eip_record]))


if __name__ == '__main__':
    main(sys.argv)

去掉注释再执行可以看到实际上.explore函数每次执行is_succ的检测同时会执行is_fail 而每组执行的两个检测传入的state包含了相同(? 至少EIP相同)的运行状态 那么来看看每次执行的时候EIP执行到了哪里

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
EIP: 8048450
EIP: 8048420
EIP: 8100014
EIP: 804d290
WARNING | 2024-07-19 15:11:59,957 | angr.storage.memory_mixins.default_filler_mixin | The program is accessing register with an unspecified value. This could indicate unwanted behavior.
WARNING | 2024-07-19 15:11:59,959 | angr.storage.memory_mixins.default_filler_mixin | angr will cope with this by generating an unconstrained symbolic variable and continuing. You can resolve this by:
WARNING | 2024-07-19 15:11:59,959 | angr.storage.memory_mixins.default_filler_mixin | 1) setting a value to the initial state
WARNING | 2024-07-19 15:11:59,959 | angr.storage.memory_mixins.default_filler_mixin | 2) adding the state option ZERO_FILL_UNCONSTRAINED_{MEMORY,REGISTERS}, to make unknown regions hold null
WARNING | 2024-07-19 15:11:59,959 | angr.storage.memory_mixins.default_filler_mixin | 3) adding the state option SYMBOL_FILL_UNCONSTRAINED_{MEMORY,REGISTERS}, to suppress these messages.
WARNING | 2024-07-19 15:11:59,959 | angr.storage.memory_mixins.default_filler_mixin | Filling register edi with 4 unconstrained bytes referenced from 0x804d291 (__libc_csu_init+0x1 in 02_angr_find_condition (0x804d291))
WARNING | 2024-07-19 15:11:59,959 | angr.storage.memory_mixins.default_filler_mixin | Filling register ebx with 4 unconstrained bytes referenced from 0x804d293 (__libc_csu_init+0x3 in 02_angr_find_condition (0x804d293))
EIP: 8048480
EIP: 804d299
EIP: 8048394
EIP: 8048480
EIP: 804839d
EIP: 80483b2
EIP: 804d2b1
EIP: 804d2c0
EIP: 8048520
EIP: 804852b
EIP: 80484c0
EIP: 80484f3
EIP: 804d2db
EIP: 804d2e5
EIP: 820104c
EIP: 80485c8
EIP: 804860b
EIP: 80485fc
EIP: 80485fc
EIP: 80485fc
EIP: 80485fc
EIP: 80485fc
EIP: 80485fc
EIP: 80485fc
EIP: 80485fc
EIP: 80485fc
EIP: 80485fc
EIP: 80485fc
EIP: 80485fc
EIP: 80485fc
EIP: 80485fc
EIP: 80485fc
EIP: 80485fc
EIP: 80485fc
EIP: 80485fc
EIP: 80485fc
EIP: 80485fc
EIP: 8048611
EIP: 80483e0
EIP: 8100004
EIP: 804862e
EIP: 8048430
EIP: 8100018
EIP: 8048642
EIP: 804867f
EIP: 804864e
EIP: 8048569
EIP: 804857b
EIP: 8048575
EIP: 8048400
EIP: 8048595
EIP: 804857b
EIP: 810000c
EIP: 804866c
EIP: 8048400
EIP: 8048588
EIP: 804864e
EIP: 810000c
EIP: 8048569
EIP: 8048588
EIP: 804857b
EIP: 8048575
EIP: 8048400
EIP: 8048595
EIP: 804857b
EIP: 810000c
EIP: 804866c
EIP: 8048400
EIP: 8048588
EIP: 804864e
EIP: 810000c
EIP: 8048569
EIP: 8048588
EIP: 804857b
EIP: 8048575
EIP: 8048400
EIP: 8048595
EIP: 804857b
EIP: 810000c
EIP: 804866c
EIP: 8048400
EIP: 8048588
EIP: 804864e
EIP: 810000c
EIP: 8048569
EIP: 8048588
EIP: 804857b
EIP: 8048575
EIP: 8048400
EIP: 8048595
EIP: 804857b
EIP: 810000c
EIP: 804866c
EIP: 8048400
EIP: 8048588
EIP: 804864e
EIP: 810000c
EIP: 8048569
EIP: 8048588
EIP: 804857b
EIP: 8048575
EIP: 8048400
EIP: 8048595
EIP: 804857b
EIP: 810000c
EIP: 804866c
EIP: 8048400
EIP: 8048588
EIP: 804864e
EIP: 810000c
EIP: 8048569
EIP: 8048588
EIP: 804857b
EIP: 8048575
EIP: 8048400
EIP: 8048595
EIP: 804857b
EIP: 810000c
EIP: 804866c
EIP: 8048400
EIP: 8048588
EIP: 804864e
EIP: 810000c
EIP: 8048569
EIP: 8048588
EIP: 804857b
EIP: 8048575
EIP: 8048400
EIP: 8048595
EIP: 804857b
EIP: 810000c
EIP: 804866c
EIP: 8048400
EIP: 8048588
EIP: 804864e
EIP: 810000c
EIP: 8048569
EIP: 8048588
EIP: 804857b
EIP: 8048575
EIP: 8048400
EIP: 8048595
EIP: 804857b
EIP: 810000c
EIP: 804866c
EIP: 8048400
EIP: 8048588
EIP: 8048685
EIP: 810000c
EIP: 8048692
EIP: 8048588
EIP: 804869f
EIP: 80486ac
EIP: 8048b6c
EIP: 8048dcc
EIP: 8048efc
EIP: 8048f94
EIP: 8048fde
EIP: 80483d0
EIP: 8100000
WARNING | 2024-07-19 15:12:01,375 | angr.storage.memory_mixins.default_filler_mixin | Filling memory at 0x7ffeff2d with 11 unconstrained bytes referenced from 0x8100000 (strcmp+0x0 in extern-address space (0x0))
WARNING | 2024-07-19 15:12:01,379 | angr.storage.memory_mixins.default_filler_mixin | Filling memory at 0x7ffeff50 with 4 unconstrained bytes referenced from 0x8100000 (strcmp+0x0 in extern-address space (0x0))
EIP: 8048fee
EIP: 804900a
EIP: 8048ff5
EIP: 8048400
EIP: 8048400
EIP: 810000c
EIP: 810000c
EIP: 8049017
EIP: 8049002
b'HETOBRCU'

结合IDA中的汇编 每一次都是执行到一个块(Block)的开头地址才调用检查函数 关于块 [官方文档](https://docs.angr.io/en/latest/core-concepts/toplevel.html)有解释 可以看到循环的部分也记录了循环的次数 但是有分支的部分两个分支都有被记录而且被记录的次序紧挨着 这里猜测执行的模式是先扫描所有块 然后再在关键的块进行快照并爆破 以后再细究

为某段程序的符号执行设定初始值

angr_symbolic_registers | 寄存器的初始化

程序设置的读入函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
.text:0804890C                               ; int get_user_input()
.text:0804890C public get_user_input
.text:0804890C get_user_input proc near
.text:0804890C
.text:0804890C var_18= dword ptr -18h
.text:0804890C var_14= dword ptr -14h
.text:0804890C var_10= dword ptr -10h
.text:0804890C var_C= dword ptr -0Ch
.text:0804890C
.text:0804890C ; __unwind {
.text:0804890C 55 push ebp
.text:0804890D 89 E5 mov ebp, esp
.text:0804890F 83 EC 18 sub esp, 18h
.text:08048912 65 8B 0D 14 00 00 00 mov ecx, large gs:14h
.text:08048919 89 4D F4 mov [ebp+var_C], ecx
.text:0804891C 31 C9 xor ecx, ecx
.text:0804891E 8D 4D F0 lea ecx, [ebp+var_10]
.text:08048921 51 push ecx
.text:08048922 8D 4D EC lea ecx, [ebp+var_14]
.text:08048925 51 push ecx
.text:08048926 8D 4D E8 lea ecx, [ebp+var_18]
.text:08048929 51 push ecx
.text:0804892A 68 93 8A 04 08 push offset aXXX ; "%x %x %x"
.text:0804892F E8 9C FA FF FF call ___isoc99_scanf
.text:0804892F
.text:08048934 83 C4 10 add esp, 10h
.text:08048937 8B 4D E8 mov ecx, [ebp+var_18]
.text:0804893A 89 C8 mov eax, ecx
.text:0804893C 8B 4D EC mov ecx, [ebp+var_14]
.text:0804893F 89 CB mov ebx, ecx
.text:08048941 8B 4D F0 mov ecx, [ebp+var_10]
.text:08048944 89 CA mov edx, ecx
.text:08048946 90 nop
.text:08048947 8B 4D F4 mov ecx, [ebp+var_C]
.text:0804894A 65 33 0D 14 00 00 00 xor ecx, large gs:14h
.text:08048951 74 05 jz short locret_8048958
.text:08048951
.text:08048953 E8 48 FA FF FF call ___stack_chk_fail
.text:08048953
.text:08048958 locret_8048958: ; CODE XREF: get_user_input+45↑j
.text:08048958 C9 leave
.text:08048959 C3 retn

最后的结果等效于scanf("%x %x %x", eax, ebx, edx) 早期的angr无法一次输入多个数据(现在测试是可以直接按照前两题的做法出结果的) 那就需要我们直接跳过读入数据的过程对eax, ebx, edx赋值 那么这次的simstate就不能设置为entry_state()了 而应该将入口设置为读入数据后一行代码:

image-20240723101425292

1
2
start_addr = 0x804897B
simstate = proj.factory.blank_state(addr=start_addr)

然后初始化simstate 将eax, ebx, edx设定为待定值:

1
2
3
4
5
simstate.regs.eax = claripy.BVS('eax', 32)
simstate.regs.ebx = claripy.BVS('ebx', 32)
simstate.regs.edx = claripy.BVS('edx', 32)
# 如果不想再导入claripy模块可以写成下面的等效写法:
# simstate.regs.eax = simstate.solver.BVS('eax', 32)

完整脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import angr, sys, claripy

eip_record = []
branch_record = []

def is_succ(state):
    """Return True when the state's stdout (fd 1) contains b'Good Job'.

    Prints one concrete value for the *current* contents of eax/ebx/edx as a
    debugging trace; these are not necessarily the original inputs.
    """
    print(f'eax: {hex(state.solver.eval(state.regs.eax))[2:]} ebx: {hex(state.solver.eval(state.regs.ebx))[2:]} edx: {hex(state.solver.eval(state.regs.edx))[2:]}')
    return b'Good Job' in state.posix.dumps(1)


def is_fail(state):
    """Return True when the state's stdout (fd 1) contains b'Try again'."""
    return b'Try again' in state.posix.dumps(1)


def main(argv):
    """Solve 03_angr_symbolic_registers by starting after the input routine.

    Execution starts at start_addr with eax, ebx and edx replaced by fresh
    32-bit symbols. Nothing is read from stdin on this path, so the answer is
    obtained by evaluating those symbols in the found state.
    """
    proj = angr.Project('03_angr_symbolic_registers', load_options={'auto_load_libs':False})
    start_addr = 0x804897B
    simstate = proj.factory.blank_state(addr=start_addr)
    # Keep references to the symbols: the registers are overwritten as the
    # program runs, so they cannot be read back from the found state.
    eax = claripy.BVS('eax', 32)
    ebx = claripy.BVS('ebx', 32)
    edx = claripy.BVS('edx', 32)
    simstate.regs.eax = eax
    simstate.regs.ebx = ebx
    simstate.regs.edx = edx
    simmgr = proj.factory.simgr(simstate)
    simmgr.explore(find=is_succ, avoid=is_fail)
    if simmgr.found:
        found = simmgr.found[0]
        # The program reads its input with "%x %x %x", so print hexadecimal.
        print(' '.join(f'{found.solver.eval(sym):x}' for sym in (eax, ebx, edx)))
    else:
        print('Answer not found')


if __name__ == '__main__':
    main(sys.argv)

angr_symbolic_stack | 栈的初始化

和上一题一样 这次的程序要求一次输入两个值 不同的是这次直接存放到栈上 不用寄存器做媒介:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
.text:08048679                               ; int handle_user()
.text:08048679 public handle_user
.text:08048679 handle_user proc near ; CODE XREF: main+21↓p
.text:08048679
.text:08048679 var_10= dword ptr -10h
.text:08048679 var_C= dword ptr -0Ch
.text:08048679
.text:08048679 ; __unwind {
.text:08048679 55 push ebp
.text:0804867A 89 E5 mov ebp, esp
.text:0804867C 83 EC 18 sub esp, 18h
.text:0804867F 83 EC 04 sub esp, 4
.text:08048682 8D 45 F0 lea eax, [ebp+var_10]
.text:08048685 50 push eax
.text:08048686 8D 45 F4 lea eax, [ebp+var_C]
.text:08048689 50 push eax
.text:0804868A 68 B3 87 04 08 push offset aUU ; "%u %u"
.text:0804868F E8 DC FC FF FF call ___isoc99_scanf
.text:0804868F
.text:08048694 83 C4 10 add esp, 10h
.text:08048697 8B 45 F4 mov eax, [ebp+var_C]
.text:0804869A 83 EC 0C sub esp, 0Ch
.text:0804869D 50 push eax
.text:0804869E E8 06 FE FF FF call complex_function0
.text:0804869E
.text:080486A3 83 C4 10 add esp, 10h
.text:080486A6 89 45 F4 mov [ebp+var_C], eax
.text:080486A9 8B 45 F0 mov eax, [ebp+var_10]
.text:080486AC 83 EC 0C sub esp, 0Ch
.text:080486AF 50 push eax
.text:080486B0 E8 DC FE FF FF call complex_function1
.text:080486B0
.text:080486B5 83 C4 10 add esp, 10h
.text:080486B8 89 45 F0 mov [ebp+var_10], eax
.text:080486BB 8B 45 F4 mov eax, [ebp+var_C]
.text:080486BE 3D D1 24 30 77 cmp eax, 773024D1h
.text:080486C3 75 0A jnz short loc_80486CF
.text:080486C3
.text:080486C5 8B 45 F0 mov eax, [ebp+var_10]
.text:080486C8 3D CF 11 43 BC cmp eax, 0BC4311CFh
.text:080486CD 74 12 jz short loc_80486E1
.text:080486CD
.text:080486CF
.text:080486CF loc_80486CF: ; CODE XREF: handle_user+4A↑j
.text:080486CF 83 EC 0C sub esp, 0Ch
.text:080486D2 68 B9 87 04 08 push offset s ; "Try again."
.text:080486D7 E8 74 FC FF FF call _puts
.text:080486D7
.text:080486DC 83 C4 10 add esp, 10h
.text:080486DF EB 10 jmp short loc_80486F1
.text:080486DF
.text:080486E1 ; ---------------------------------------------------------------------------
.text:080486E1
.text:080486E1 loc_80486E1: ; CODE XREF: handle_user+54↑j
.text:080486E1 83 EC 0C sub esp, 0Ch
.text:080486E4 68 C4 87 04 08 push offset aGoodJob ; "Good Job."
.text:080486E9 E8 62 FC FF FF call _puts
.text:080486E9
.text:080486EE 83 C4 10 add esp, 10h
.text:080486EE
.text:080486F1
.text:080486F1 loc_80486F1: ; CODE XREF: handle_user+66↑j
.text:080486F1 90 nop
.text:080486F2 C9 leave
.text:080486F3 C3 retn

输入函数等效于scanf("%u %u", [val0](ebp - 0xC), [val1](ebp - 0x10)) 为了在栈上正确的位置进行值的符号化 简单看一下栈输入完后栈的结构:

1
2
3
       ebp - 4          ebp - 10h
...| ebx | ... | val0 | val1 | ...
ebp - 0 ebp - 8h ebp - Ch

也就是说第一个值(val0)在ebp-0Ch到ebp-8之间 第二个值(val1)在ebp-10h到ebp-0Ch之间 为了模拟进入函数的过程 初始状态中将esp赋给ebp:

1
2
state = proj.factory.blank_state(addr = 0x8048697)
state.regs.ebp = state.regs.esp

再将esp下移8字节(即完整脚本中的state.regs.esp -= 8) 然后依次压栈 对ebp-0Ch和ebp-10h处的栈空间进行符号化:

1
2
3
4
p1 = claripy.BVS('p1', 32)
p2 = claripy.BVS('p2', 32)
state.stack_push(p1)
state.stack_push(p2)

完整脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import angr, sys, claripy

def is_succ(state):
    """Return True when the state's stdout (fd 1) contains b'Good Job'."""
    return b'Good Job' in state.posix.dumps(1)


def is_fail(state):
    """Return True when the state's stdout (fd 1) contains b'Try again'."""
    return b'Try again' in state.posix.dumps(1)


def main(argv):
    """Solve 04_angr_symbolic_stack by placing two symbols on the stack."""
    proj = angr.Project('04_angr_symbolic_stack', load_options={'auto_load_libs': False})
    state = proj.factory.blank_state(addr = 0x8048697)
    # Imitate the function prologue: ebp takes the current esp.
    state.regs.ebp = state.regs.esp
    # Skip 8 bytes so the two 4-byte pushes below land at ebp-0xC and ebp-0x10.
    state.regs.esp -= 8
    p1 = claripy.BVS('p1', 32)
    p2 = claripy.BVS('p2', 32)
    state.stack_push(p1)
    state.stack_push(p2)
    simgr = proj.factory.simulation_manager(state)
    simgr.explore(find=is_succ, avoid=is_fail)
    if simgr.found:
        found = simgr.found[0]
        print(f'p1: {hex(found.solver.eval(p1))}\np2: {hex(found.solver.eval(p2))}')
    else:
        print('Answer not found')


if __name__ == '__main__':
    main(sys.argv)

angr_symbolic_memory | 内存空间初始化1

如果程序要用到的值不是本地变量而是全局变量 那就需要对对应的内存区域进行符号化再求解 这题校验的值放在.bss段上:

image-20240724112359400

image-20240724112412817

和之前一样跳过输入部分(现版本angr已经支持直接输入 可以按照第1, 2题的方法做 下面一题也是) 并对目标区域进行符号化(其实栈的初始化本质上也是内存区域初始化 可以用一样的方法进行值的符号化) 完整脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import angr, sys, claripy

def is_succ(state):
    """Return True when the state's stdout (fd 1) contains b'Good Job'."""
    return b'Good Job' in state.posix.dumps(1)


def is_fail(state):
    """Return True when the state's stdout (fd 1) contains b'Try again'."""
    return b'Try again' in state.posix.dumps(1)


def main(argv):
    """Solve 05_angr_symbolic_memory by making four 8-byte globals symbolic."""
    proj = angr.Project('05_angr_symbolic_memory', load_options={'auto_load_libs': False})
    # Alternative starting point: state = proj.factory.entry_state()
    state = proj.factory.blank_state(addr = 0x80485FE)
    bss = 0xA1BA1C0
    pws = [claripy.BVS('pw%d' % i, 64) for i in range(4)]
    for i, pw in enumerate(pws):
        # Initialise the values in .bss. memory.store() fills big-endian by
        # default; pass endness=proj.arch.memory_endness to force the
        # program's own byte order instead.
        state.memory.store(bss + i * 8, pw)
    simgr = proj.factory.simulation_manager(state)
    simgr.explore(find=is_succ, avoid=is_fail)
    if simgr.found:
        found = simgr.found[0]
        for pw in pws:
            print(found.solver.eval(pw, cast_to=bytes), end = ' ')
    else:
        print('Answer not found')


if __name__ == '__main__':
    main(sys.argv)

angr_symbolic_dynamic_memory | 内存空间初始化2

如果要符号化的内存区域一开始是不确定的 可以退而求其次不对原本要校验的区域进行符号化 而是更改要校验的区域将其固定下来 这题用malloc()进行内存分配:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
int __cdecl main(int argc, const char **argv, const char **envp)
{
char *v3; // ebx
char *v4; // ebx
int v6; // [esp-10h] [ebp-1Ch]
int i; // [esp+0h] [ebp-Ch]

buffer0 = malloc(9u);
buffer1 = malloc(9u);
memset(buffer0, 0, 9u);
memset(buffer1, 0, 9u);
printf("Enter the password: ");
__isoc99_scanf("%8s %8s", buffer0, buffer1, v6);
for ( i = 0; i <= 7; ++i )
{
v3 = &buffer0[i];
*v3 = complex_function(buffer0[i], i);
v4 = &buffer1[i];
*v4 = complex_function(buffer1[i], i + 32);
}
if ( !strncmp(buffer0, "UODXLZBI", 8u) && !strncmp(buffer1, "UAORRAYF", 8u) )
puts("Good Job.");
else
puts("Try again.");
free(buffer0);
free(buffer1);
return 0;
}

image-20240724113755497

malloc分配的内存在.heap堆空间上 和本地变量一样一开始是不确定其地址的 那就直接改变buffer0和buffer1这两个指针变量中存放的内存地址:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import angr, sys, claripy

def is_succ(state):
    """Return True when the state's stdout (fd 1) contains b'Good Job'."""
    return b'Good Job' in state.posix.dumps(1)


def is_fail(state):
    """Return True when the state's stdout (fd 1) contains b'Try again'."""
    return b'Try again' in state.posix.dumps(1)


def main(argv):
    """Solve 06_angr_symbolic_dynamic_memory by redirecting the buffer pointers."""
    proj = angr.Project('06_angr_symbolic_dynamic_memory', load_options={'auto_load_libs': False})
    start = 0x08048696
    state = proj.factory.blank_state(addr = start)
    p1 = 0xABCC8A4
    p2 = 0xABCC8AC
    bss1 = 0xABCC890
    bss2 = 0xABCC880
    # Replace the heap addresses that would be stored at p1/p2 with fixed
    # addresses of free space in .bss. Pointers are written in the program's
    # own byte order.
    state.memory.store(p1, bss1, endness=proj.arch.memory_endness)
    state.memory.store(p2, bss2, endness=proj.arch.memory_endness)
    pws = [state.solver.BVS('p1', 8*8), state.solver.BVS('p2', 8*8)]
    state.memory.store(bss1, pws[0])
    state.memory.store(bss2, pws[1])
    simgr = proj.factory.simulation_manager(state)
    simgr.explore(find=is_succ, avoid=is_fail)
    if simgr.found:
        found = simgr.found[0]
        print(found.solver.eval(pws[0], cast_to=bytes))
        print(found.solver.eval(pws[1], cast_to=bytes))
    else:
        print('Answer not found')


if __name__ == '__main__':
    main(sys.argv)

angr_symbolic_file | 文件内容初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
int __cdecl __noreturn main(int argc, const char **argv, const char **envp)
{
int i; // [esp+Ch] [ebp-Ch]

memset(buffer, 0, sizeof(buffer));
printf("Enter the password: ");
__isoc99_scanf("%64s", buffer);
ignore_me(buffer, 0x40u);
memset(buffer, 0, sizeof(buffer));
fp = fopen("OJKSQYDP.txt", "rb");
fread(buffer, 1u, 0x40u, fp);
fclose(fp);
unlink("OJKSQYDP.txt");
for ( i = 0; i <= 7; ++i )
*(_BYTE *)(i + 0x804A0A0) = complex_function(*(char *)(i + 0x804A0A0), i);
if ( strncmp(buffer, "AQWLCTXB", 9u) )
{
puts("Try again.");
exit(1);
}
puts("Good Job.");
exit(0);
}

ignore_me()的作用就是将输入的内容放进文件里保存以防下面读不到文件 这题可以在读取文件内容后开始对buffer初始化(测试后发现甚至可以从头开始执行 angr也能求解出答案) 这里从读取文件开始学习一下文件内容的符号化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import angr, sys, claripy

def is_succ(state):
    """Return True when the state's stdout (fd 1) contains b'Good Job'."""
    return b'Good Job' in state.posix.dumps(1)


def is_fail(state):
    """Return True when the state's stdout (fd 1) contains b'Try again'."""
    return b'Try again' in state.posix.dumps(1)


def main(argv):
    """Solve 07_angr_symbolic_file by supplying a file with symbolic content."""
    proj = angr.Project('07_angr_symbolic_file', load_options={'auto_load_libs': False})
    start = 0x080488D6
    file_name = 'OJKSQYDP.txt'
    file_size = 0x40
    file_content = claripy.BVS('file_content', file_size * 8)
    # Create a simulated file that the program can read during execution.
    simgr_file = angr.storage.SimFile(file_name, content=file_content, size=file_size)
    state = proj.factory.blank_state(addr = start)
    # Alternative starting point: state = proj.factory.entry_state()
    # Insert the file into the state's simulated filesystem.
    state.fs.insert(file_name, simgr_file)
    simgr = proj.factory.simulation_manager(state)
    simgr.explore(find=is_succ, avoid=is_fail)
    if simgr.found:
        print(simgr.found[0].solver.eval(file_content, cast_to=bytes))
        # When starting from entry_state(), the stdin dump can be used instead:
        # print(simgr.found[0].posix.dumps(0))
    else:
        print('Answer not found')


if __name__ == '__main__':
    main(sys.argv)

angr_constraints | 手动添加条件

当程序在循环进行多次判断 符号执行时会产生指数级的路径从而发生路径爆炸问题 比如这道题的判断逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
_BOOL4 __cdecl check_equals_AUPDNNPROEZRJWKB(char *input, unsigned int len)
{
int v3; // [esp+8h] [ebp-8h]
unsigned int i; // [esp+Ch] [ebp-4h]

v3 = 0;
for ( i = 0; i < len; ++i )
{
if ( input[i] == *(i + 0x804A040) ) // => input[i] == enc[i]
++v3;
}
return v3 == len;
}

int __cdecl main(int argc, const char **argv, const char **envp)
{
int i; // [esp+Ch] [ebp-Ch]

qmemcpy(&password, "AUPDNNPROEZRJWKB", 16);
memset(&buffer, 0, 0x11u);
printf("Enter the password: ");
__isoc99_scanf("%16s", &buffer);
for ( i = 0; i <= 15; ++i )
*(i + 134520912) = complex_function(*(i + 0x804A050), 15 - i);
if ( check_equals_AUPDNNPROEZRJWKB(&buffer, 16) )
puts("Good Job.");
else
puts("Try again.");
return 0;
}

密文长度为16bytes 也就是循环中对比的次数是16次 加上最后判断是否正确的一次判断 可能的路径达到了2 ^ 17条 如果直接让angr用这个函数进行是否正确的判断肯定会运行很长时间 我们直接在判断前中断程序 让程序进入我们自己设置的限制条件的代码段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import angr, sys, claripy

def is_succ(state):
    """Return True when the state's stdout (fd 1) contains b'Good Job'."""
    return b'Good Job' in state.posix.dumps(1)


def is_fail(state):
    """Return True when the state's stdout (fd 1) contains b'Try again'."""
    return b'Try again' in state.posix.dumps(1)


def main(argv):
    """Solve 08_angr_constraints by stopping before the check and adding a constraint.

    Execution stops at the address passed to explore(); the byte-by-byte
    comparison is replaced by a single equality constraint on the buffer.
    """
    proj = angr.Project('08_angr_constraints', load_options={'auto_load_libs': False})
    start = 0x08048625
    buf = 0x804A050
    state = proj.factory.blank_state(addr = start)
    # Keep a reference to the symbol written into the buffer; it is what gets
    # evaluated at the end.
    ans = state.solver.BVS('buf', 8*16)
    state.memory.store(buf, ans)
    simgr = proj.factory.simulation_manager(state)
    simgr.explore(find=0x08048673)
    if simgr.found:
        new_state = simgr.found[0]
        string_after_processed = new_state.memory.load(buf, 16)
        # Add the constraint the same way as with z3. The expected value is
        # given as bytes so that no str-to-bytes coercion is needed.
        new_state.solver.add(string_after_processed == b"AUPDNNPROEZRJWKB")
        # Evaluate the original symbol: after symbolic execution the memory at
        # buf holds F(x), not x.
        print(new_state.solver.eval(ans, cast_to=bytes))
    else:
        print('Answer not found')


if __name__ == '__main__':
    main(sys.argv)

hook某个函数

angr_hooks | 通过调用地址hook函数

这一题尝试用另一种方式解决路径爆炸问题 像Frida hook一样将会产生路径爆炸的函数替换成自己写的函数

和上一题的对比逻辑差不多 都是一位一位比:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
_BOOL4 __cdecl check_equals_XYMKBKUHNIQYNQXE(int a1, unsigned int a2)
{
int v3; // [esp+8h] [ebp-8h]
unsigned int i; // [esp+Ch] [ebp-4h]

v3 = 0;
for ( i = 0; i < a2; ++i )
{
if ( *(i + a1) == *(i + 0x804A044) )
++v3;
}
return v3 == a2;
}

int __cdecl main(int argc, const char **argv, const char **envp)
{
_BOOL4 v3; // eax
int i; // [esp+8h] [ebp-10h]
int j; // [esp+Ch] [ebp-Ch]

qmemcpy(password, "XYMKBKUHNIQYNQXE", 16);
memset(buffer, 0, 0x11u);
printf("Enter the password: ");
__isoc99_scanf("%16s", buffer);
for ( i = 0; i <= 15; ++i )
*(i + 0x804A054) = complex_function(*(i + 0x804A054), 18 - i);
equals = check_equals_XYMKBKUHNIQYNQXE(buffer, 16);
for ( j = 0; j <= 15; ++j )
*(j + 0x804A044) = complex_function(*(j + 0x804A044), j + 9);
__isoc99_scanf("%16s", buffer);
v3 = equals && !strncmp(buffer, password, 0x10u);
equals = v3;
if ( v3 )
puts("Good Job.");
else
puts("Try again.");
return 0;
}

0x804A044和0x804A054分别就是password和buffer的地址 检查函数被调用的位置:

1
2
3
4
5
6
7
.text:080486A9 83 EC 08                      sub     esp, 8
.text:080486AC 6A 10 push 10h
.text:080486AE 68 54 A0 04 08 push offset buffer
.text:080486B3 E8 ED FE FF FF call check_equals_XYMKBKUHNIQYNQXE
.text:080486B3
.text:080486B8 83 C4 10 add esp, 10h
.text:080486BB A3 68 A0 04 08 mov ds:equals, eax

call语句长度是5bytes 要hook的位置不是函数的开头 是call的位置 同时还需要call指令的长度 hook脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import angr, sys, claripy

def is_succ(state):
    """Return True when the state's stdout (fd 1) contains b'Good Job'."""
    return b'Good Job' in state.posix.dumps(1)


def is_fail(state):
    """Return True when the state's stdout (fd 1) contains b'Try again'."""
    return b'Try again' in state.posix.dumps(1)


def main(argv):
    """Solve 09_angr_hooks by hooking the call site of the check function."""
    proj = angr.Project('09_angr_hooks', load_options={'auto_load_libs': False})
    state = proj.factory.entry_state()
    to_hook = 0x80486B3  # address of the call instruction, not of the callee
    length = 0x5         # number of bytes the hook skips (the call instruction)
    simgr = proj.factory.simulation_manager(state)

    # Register the replacement through the project's hook decorator.
    @proj.hook(to_hook, length=length)
    def hook_func(hook_state):
        """Replacement for the call: compare both 16-byte buffers at once."""
        to_cmp = hook_state.memory.load(0x804A044, 0x10)
        arg = hook_state.memory.load(0x804A054, 0x10)
        # The return value is passed back in eax.
        hook_state.regs.eax = claripy.If(to_cmp == arg, claripy.BVV(1, 32), claripy.BVV(0, 32))

    simgr.explore(find=is_succ, avoid=is_fail)
    if simgr.found:
        print(simgr.found[0].posix.dumps(0))
    else:
        print('Answer not found')


if __name__ == '__main__':
    main(sys.argv)

angr_simprocedures | 通过符号hook函数

当要hook的函数在多处被调用 并且hook的目的是彻底更改函数执行的内容就要对所有的调用地址进行hook 非常麻烦 例如这一题:

image-20240729090200722

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int __cdecl main(int argc, const char **argv, const char **envp)
{
int i; // [esp+20h] [ebp-28h]
char s[17]; // [esp+2Bh] [ebp-1Dh] BYREF
unsigned int v6; // [esp+3Ch] [ebp-Ch]

v6 = __readgsdword(0x14u);
memcpy(&password, "ORSDDWXHZURJRBDH", 0x10u);
memset(s, 0, sizeof(s));
printf("Enter the password: ");
__isoc99_scanf("%16s", s);
for ( i = 0; i <= 15; ++i )
s[i] = complex_function(s[i], 18 - i);
if ( check_equals_ORSDDWXHZURJRBDH((int)s, 0x10u) )
puts("Good Job.");
else
puts("Try again.");
return 0;
}

程序被虚拟控制流混淆(虽然IDA的反汇编可以轻松优化出唯一会执行的路径) 这样就需要对函数的符号进行hook:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import angr, sys, claripy

def is_succ(state):
    """Return True when the state's stdout (fd 1) contains b'Good Job'."""
    return b'Good Job' in state.posix.dumps(1)


def is_fail(state):
    """Return True when the state's stdout (fd 1) contains b'Try again'."""
    return b'Try again' in state.posix.dumps(1)


def main(argv):
    """Solve 10_angr_simprocedures by hooking the check function by symbol."""
    proj = angr.Project('10_angr_simprocedures', load_options={'auto_load_libs': False})
    state = proj.factory.entry_state()

    class simpro(angr.SimProcedure):
        """Replacement for check_equals_ORSDDWXHZURJRBDH."""

        # The parameters must correspond to those of the hooked function.
        def run(self, user_input, length):
            # Expected value as bytes so that no str-to-bytes coercion is needed.
            to_cmp = b"ORSDDWXHZURJRBDH"
            content = self.state.memory.load(user_input, length)
            return claripy.If(content == to_cmp, claripy.BVV(1, 32), claripy.BVV(0, 32))

    # hook_symbol takes an instance of the SimProcedure, not the class.
    proj.hook_symbol('check_equals_ORSDDWXHZURJRBDH', simpro())
    simgr = proj.factory.simulation_manager(state)
    simgr.explore(find=is_succ, avoid=is_fail)
    if simgr.found:
        print(simgr.found[0].posix.dumps(0))
    else:
        print('Answer not found')


if __name__ == '__main__':
    main(sys.argv)

angr_sim_scanf | hook库函数

hook库函数的流程和上一题一样 这题也是虚拟控制流混淆 要用hook绕过angr只能输入一个参数的问题(现版本已经没有这个问题):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int __cdecl main(int argc, const char **argv, const char **envp)
{
int i; // [esp+20h] [ebp-28h]
char s[20]; // [esp+28h] [ebp-20h] BYREF
unsigned int v7; // [esp+3Ch] [ebp-Ch]

v7 = __readgsdword(0x14u);
memset(s, 0, sizeof(s));
qmemcpy(s, "SUQMKQFX", 8);
for ( i = 0; i <= 7; ++i )
s[i] = complex_function(s[i], i);
printf("Enter the password: ");
__isoc99_scanf("%u %u", buffer0, buffer1);
if ( !strncmp(buffer0, s, 4u) && !strncmp(buffer1, &s[4], 4u) )
puts("Good Job.");
else
puts("Try again.");
return 0;
}

虽然函数原型是可变参数 但是构造hook函数的时候根据要hook的那个scanf进行参数个数的设置就行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import angr, sys, claripy

def is_succ(state):
    """Return True when the state's stdout (fd 1) contains b'Good Job'."""
    return b'Good Job' in state.posix.dumps(1)


def is_fail(state):
    """Return True when the state's stdout (fd 1) contains b'Try again'."""
    return b'Try again' in state.posix.dumps(1)


def main(argv):
    """Solve 11_angr_sim_scanf by replacing __isoc99_scanf with a SimProcedure."""
    proj = angr.Project('11_angr_sim_scanf', load_options={'auto_load_libs': False})

    class sim_scanf(angr.SimProcedure):
        """Stand-in for scanf("%u %u", buf1, buf2) that writes two symbols."""

        def run(self, format_string, buf1, buf2):
            p1, p2 = claripy.BVS('p1', 8*4), claripy.BVS('p2', 8*4)
            # Store in the program's byte order so that each symbol is the
            # integer value the program would have read.
            self.state.memory.store(buf1, p1, endness=proj.arch.memory_endness)
            self.state.memory.store(buf2, p2, endness=proj.arch.memory_endness)
            # Stash the symbols on the state so they can be evaluated later.
            self.state.globals['ans'] = (p1, p2)

    proj.hook_symbol('__isoc99_scanf', sim_scanf())
    state = proj.factory.entry_state()
    simgr = proj.factory.simulation_manager(state)
    simgr.explore(find=is_succ, avoid=is_fail)
    if simgr.found:
        solved_state = simgr.found[0]
        p1, p2 = solved_state.globals['ans']
        # The program reads its input with "%u %u", so print the two values as
        # unsigned decimal integers that can be typed in directly.
        print(f'{solved_state.solver.eval(p1)} {solved_state.solver.eval(p2)}')
    else:
        print('Answer not found')


if __name__ == '__main__':
    main(sys.argv)

angr_veritesting | 不知道有没有用的技术

这题的官方解现在(2024.7.30)已经跑不出结果了 据官方文档 这种方法是结合静态符号分析来减少符号路径的条数 但是就算开启了也没办法解决本题路径爆炸问题 所以用之前的做法做这题 把路径爆炸的部分替换成用户函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import angr, sys, claripy

def is_succ(state):
    """Return True when the state's stdout (fd 1) contains b'Good Job'."""
    return b'Good Job' in state.posix.dumps(1)


def is_fail(state):
    """Return True when the state's stdout (fd 1) contains b'Try again'."""
    return b'Try again' in state.posix.dumps(1)


def main(argv):
    """Solve 12_angr_veritesting by stopping early and constraining each byte.

    Execution runs to the address passed to explore(); the 32 input bytes are
    then constrained one at a time and the resulting string is printed.
    """
    proj = angr.Project('12_angr_veritesting', load_options={'auto_load_libs': False})
    state = proj.factory.entry_state()
    simgr = proj.factory.simgr(state, veritesting = True)
    simgr.explore(find = 0x8048635)
    if not simgr.found:
        print('Answer not found')
        return

    def complex_function(a1, a2):
        """Python version of the transformation used to build the expected bytes."""
        return (a1 - 65 + 2 * a2) % 26 + 65

    now_state = simgr.found[0]
    # Start of the 32-byte region that is constrained below, relative to ebp.
    buf_addr = now_state.regs.ebp.args[0] - 0x2d
    chars = []
    for i in range(32):
        ans = now_state.memory.load(buf_addr + i, 1)
        now_state.add_constraints(ans == complex_function(75, i + 93))
        chars.append(now_state.solver.eval(ans, cast_to=bytes).decode())
    print(''.join(chars))


if __name__ == '__main__':
    main(sys.argv)

angr_static_binary | 静态编译程序的符号执行

如果程序是静态编译出来的 angr就会把库函数当成用户函数 在符号执行的时候会进入库函数进行完全的符号执行 相当于步入调试 以前识别出库函数使用的是步过调试 这时候就需要hook这些静态编译进程序的库函数 替换成angr自带的库函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import angr, sys, claripy

def is_succ(state):
    """Return True when the state's stdout (fd 1) contains b'Good Job'."""
    return b'Good Job' in state.posix.dumps(1)


def is_fail(state):
    """Return True when the state's stdout (fd 1) contains b'Try again'."""
    return b'Try again' in state.posix.dumps(1)


def main(argv):
    """Solve 13_angr_static_binary by hooking the statically linked library functions."""
    proj = angr.Project('13_angr_static_binary', load_options={'auto_load_libs': False})
    state = proj.factory.entry_state()
    simgr = proj.factory.simulation_manager(state)
    # Replace each statically linked library function with angr's built-in
    # SimProcedure of the same name, hooked at the function's address.
    proj.hook(0x8048D10, angr.SIM_PROCEDURES['glibc']['__libc_start_main']())
    proj.hook(0x804ED40, angr.SIM_PROCEDURES['libc']['printf']())
    proj.hook(0x804ED80, angr.SIM_PROCEDURES['glibc']['__isoc99_scanf']())
    proj.hook(0x8048280, angr.SIM_PROCEDURES['libc']['strcmp']())
    proj.hook(0x804F350, angr.SIM_PROCEDURES['libc']['puts']())
    simgr.explore(find=is_succ, avoid=is_fail)
    if simgr.found:
        print(simgr.found[0].posix.dumps(0))
    else:
        print('Answer not found')


if __name__ == '__main__':
    main(sys.argv)

hook的地址就用库函数的起始地址 区分是glibc还是libc的函数看前面有没有两个下划线就行

angr_shared_library | 符号执行动态链接库中的函数

angr的好处就是可以设定初始条件直接符号执行一段代码 而不用像调试器一样如果是库的话要附加到程序上才能调试 这题要符号执行一个外部库的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 14_angr_shared_library

int __cdecl main(int argc, const char **argv, const char **envp)
{
char s[16]; // [esp+1Ch] [ebp-1Ch] BYREF
unsigned int v5; // [esp+2Ch] [ebp-Ch]

v5 = __readgsdword(0x14u);
memset(s, 0, sizeof(s));
printf("Enter the password: ");
__isoc99_scanf("%8s", s);
if ( validate((int)s, 8) )
puts("Good Job.");
else
puts("Try again.");
return 0;
}

// lib14_angr_shared_library.so

_BOOL4 __cdecl validate(char *s1, int a2)
{
char *v3; // esi
char s2[20]; // [esp+4h] [ebp-24h] BYREF
int j; // [esp+18h] [ebp-10h]
int i; // [esp+1Ch] [ebp-Ch]

if ( a2 <= 7 )
return 0;
for ( i = 0; i <= 19; ++i )
s2[i] = 0;
qmemcpy(s2, "PVBLVTFT", 8);
for ( j = 0; j <= 7; ++j )
{
v3 = &s1[j];
*v3 = complex_function(s1[j], j);
}
return strcmp(s1, s2) == 0;
}

加载库需要指定基址 这里选base = 0x4000000:

1
2
3
4
5
6
7
8
base = 0x4000000
func_offset = 0x6D7

# 'base_addr' is the CLE option that sets the load address of the main object;
# 'custom_base_addr' is the old spelling of the same option.
proj = angr.Project('lib14_angr_shared_library.so', load_options={
    'main_opts': {
        'base_addr': base
    }
})

为了设定函数执行的初始值需要一个call_state来指定函数起始地址和参数 这里假定用户输入的字符串存放地址是0x3000000:

state = proj.factory.call_state(base + func_offset, 0x3000000, 8)

后面的流程就和符号执行普通的程序一样了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import angr, sys, claripy

def main(argv):
    """Symbolically execute one function of the shared library and print a passing input.

    The library is mapped at a fixed base, a call state is created at
    ``base + func_offset`` with a pointer to an 8-byte symbolic buffer and the
    length 8 as arguments, and exploration stops at ``base + 0x783``. There
    ``eax`` is constrained to 1 and the buffer contents are solved for.

    Args:
        argv: Command-line arguments; not used.
    """
    base = 0x4000000
    func_offset = 0x6D7
    # Address chosen to hold the symbolic input buffer.
    input_addr = 0x3000000

    # NOTE: cle spells the load-address option ``base_addr``; the legacy
    # ``custom_base_addr`` name is not honoured by current releases.
    proj = angr.Project('lib14_angr_shared_library.so', load_options={
        'main_opts': {
            'base_addr': base
        }
    })
    state = proj.factory.call_state(base + func_offset, input_addr, 8)

    # 8 symbolic bytes stored where the first argument points.
    pw = claripy.BVS('pw', 8 * 8)
    state.memory.store(input_addr, pw)

    simgr = proj.factory.simulation_manager(state)
    # NOTE(review): 0x783 is presumably an address near the function's return - confirm.
    simgr.explore(find=base + 0x783)
    if simgr.found:
        new_state = simgr.found[0]
        # Keep only solutions for which the function's result register is 1.
        new_state.add_constraints(new_state.regs.eax == 1)
        print(new_state.solver.eval(pw, cast_to=bytes))
    else:
        print('Answer not found')


if __name__ == '__main__':
    main(sys.argv)

用Angr实现去控制流平坦化

开工之前先简单了解一下控制流平坦化的原理 去平坦化的核心任务就是恢复各个真实块(relevant block) 即原程序中就有的块之间的联系 而用Angr完成这项任务的难点有:

  1. angr自带的块对象与IDA中graph view的block有区别 angr产生的控制流图(ControlFlow Graph, CFG)会被call指令分割 而IDA中的块直到跳转才会结束一个块 幸运的是官方的附带组件库angr_management中有一个转化angr CFG为IDA CFG的函数 但是同时也带来了第二个问题

  2. angr_management中的to_supergraph产生的IDA CFG实际上是networkx库中的DI-Graph(有向图)类的派生类 相比angr CFG 可操作性直线下降

  3. 对原来产生分支的块的处理

这里一步步地进行去平坦化并在遇到这些问题时进行解释

获取各种块以方便后续的操作

这里就要发挥IDA CFG的优势了 根据上文引用的资料 各个块之间的界限以及各自的特征(以有向图的视角看 特征就是一个块的in-degree和out-degree)还是非常明显的:

image-20240810222053426

将angr CFG变成IDA CFG后根据特征汇总每种块:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def Get_CFG(proj : angr.Project):
    """Return the IDA-style supergraph of the function selected on the command line.

    The function is looked up by name when ``args.addr`` is ``'main'`` and by
    hexadecimal address otherwise. Its transition graph is converted with
    ``graph.to_supergraph``.

    Args:
        proj: Loaded angr project to analyse.

    Returns:
        The graph produced by ``graph.to_supergraph``.
    """
    try:
        cfg = proj.analyses.CFGFast(normalize=True, force_complete_scan=False)
    except Exception:
        # Retry without force_complete_scan. Only ordinary errors are caught so
        # that Ctrl-C and SystemExit are not swallowed.
        cfg = proj.analyses.CFGFast(normalize=True)
    addr = args.addr if args.addr == 'main' else int(args.addr, 16)
    function_graph = cfg.functions[addr].transition_graph
    return graph.to_supergraph(function_graph)

def Get_Blocks(cfg) -> dict:
    """Classify the nodes of a flattened function's graph by their degrees.

    Args:
        cfg: Directed graph exposing ``nodes``, ``in_degree``, ``out_degree``,
            ``successors`` and ``predecessors``.

    Returns:
        Dict with the keys ``entry`` (no incoming edge), ``exit`` (no outgoing
        edge), ``main_dispatcher`` (first successor of the entry),
        ``pre_dispatcher`` (first predecessor of the main dispatcher that has
        incoming edges), ``relevant`` (entry, then the single-predecessor
        predecessors of the pre-dispatcher, then exit) and ``irrelevant``
        (every remaining node).
    """
    blocks = {}
    for candidate in cfg.nodes():
        if cfg.in_degree(candidate) == 0:
            blocks['entry'] = candidate
        elif cfg.out_degree(candidate) == 0:
            blocks['exit'] = candidate

    blocks['main_dispatcher'] = list(cfg.successors(blocks['entry']))[0]

    # The entry block also precedes the main dispatcher; it is told apart by
    # having no incoming edge.
    for pred in cfg.predecessors(blocks['main_dispatcher']):
        if cfg.in_degree(pred) == 0:
            continue
        blocks['pre_dispatcher'] = pred
        break

    inner = [
        pred
        for pred in cfg.predecessors(blocks['pre_dispatcher'])
        if cfg.in_degree(pred) == 1
    ]
    blocks['relevant'] = [blocks['entry'], *inner, blocks['exit']]

    leftover = []
    for candidate in cfg.nodes():
        if candidate in blocks['relevant']:
            continue
        if candidate == blocks['main_dispatcher'] or candidate == blocks['pre_dispatcher']:
            continue
        leftover.append(candidate)
    blocks['irrelevant'] = leftover
    return blocks

获取真实块的详细信息

这一步的目的是为下一步重建真实块之间的联系做铺垫 真实块在原程序中最多有两个out-degree 也就是说这里要对真实块进行分类 最简单的就是没有in-degree和没有out-degree的入口和终止块 剩下的程序内容可以分为有分支和无分支的真实块 稍微了解一下(x86_64)控制流平坦化的实现就能发现 原本的jxx跳转都需要转化为对整个控制流起决定性作用的关键值的分支化赋值 具体实现就是采用cmovxx指令:

image-20240810222920030

cmovxx指令会根据zf, sf等标志位来决定是否对目的寄存器进行赋值 例如cmovz eax, ecx会在zf=1时执行mov eax, ecx 反之什么也不干 在确定框架的前提下用angr块对象自带的.capstone.insns属性来获取汇编指令 并用汇编指令来判断该块是否会产生分支 这里就会体现第1, 2点的缺陷了 因为用IDA CFG创建的真实块如果调用某函数的话 在对应的angr CFG中会是分隔的一个或几个块 这里用递归的方法找到angr CFG中包含jmp指令的块来保证已经分析完一个完整IDA CFG块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def Get_branch(block_addr, all_branches, target, _from=None, flag=False) -> bool:
    """Record where a relevant block branches and where it ends.

    angr ends a block after every ``call``, so consecutive angr blocks are
    scanned, starting at ``block_addr``, until one ends in ``jmp`` or a return
    instruction. Every ``call`` met on the way is hooked with a no-op of the
    same length.

    On return ``all_branches[_from]`` is
    ``[cmov_address, cmov_mnemonic, terminator_address]``; the first two
    entries stay ``None`` when no ``cmovxx`` instruction was seen.

    Args:
        block_addr: Address at which scanning starts.
        all_branches: Dict that receives the record.
        target: Not used; kept so existing call sites stay valid.
        _from: Key of the record. ``None`` selects ``block_addr`` and
            (re)initialises the record.
        flag: Initial value of the result.

    Returns:
        True when a ``cmovxx`` was found or ``flag`` was already True.

    Raises:
        ValueError: If a scanned address decodes to no instructions.
    """
    if _from is None:
        _from = block_addr
        all_branches[_from] = [None, None]

    def NOP(state):
        pass

    while True:
        insns = proj.factory.block(block_addr).capstone.insns
        if not insns:
            raise ValueError(f'no instructions decoded at {hex(block_addr)}')
        for ins in insns:
            if ins.mnemonic == 'call':
                # Hook all calls to avoid errors in the CFG(too deep recursion)
                proj.hook(ins.address, hook=NOP, length=ins.size)
            elif ins.mnemonic.startswith('cmov'):
                all_branches[_from][0], all_branches[_from][1] = ins.address, ins.mnemonic
                flag = True
        last = insns[-1]
        # Capstone names the return instruction 'ret'; 'retn' is IDA's spelling
        # and is kept only for compatibility.
        if last.mnemonic in ('jmp', 'ret', 'retn'):
            all_branches[_from].append(last.address)
            return flag
        # The block was cut short (e.g. by a call): continue with the next one.
        block_addr = last.address + last.size

最后all_branches会以真实块的起始地址为键 记录每个真实块的3个信息: cmovxx指令所在的地址(若没有分支则为None), 分支类型即cmovxx的助记符(若没有则为None), 以及块末尾jmp/ret指令的地址 在后面的恢复控制流中会用到

重建控制流

接下来只需要根据分支类型来确定真实块间的关系就算是基本完成任务了 但是难点也来了 原本我的想法是不采用以真实块作为起点的宽搜方式 而是从入口块开始执行 期待angr的符号执行会在cmovxx指令自动分离出两个active 然后只要不断使用step()方法直到active.addr出现在真实块的起始地址里就能轻松获取控制流 但是angr只会在跳转产生分支 根本不鸟我 完全没有产生分支 似了

然后我的想法是根据分支类型来设置sf, zf等标志位 手动产生两条active 但是angr本来设计来就是遍历所有可能的跳转的 根本没有标志位 又似了

最后在网上找到了一种方法 通过控制angr底层的vex IR的执行来达到上面第二种尝试的目的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def Get_successors(node, targets : list, branch_sign=None, cond=None):
    """Execute symbolically from ``node`` until a state lands on one of ``targets``.

    Args:
        node: Block to start from; only ``node.addr`` is read.
        targets: Addresses at which the search stops.
        branch_sign: ``None`` runs the code unmodified. Any other value enables
            the ITE override described under ``cond`` (the value itself is not
            inspected).
        cond: Value written into the condition temporary of the first VEX ITE
            statement that is executed; only used when ``branch_sign`` is set.

    Returns:
        Address of the first active state once it is contained in ``targets``.

    Raises:
        RuntimeError: If no active state is left before a target is reached.
    """
    state = proj.factory.blank_state(addr=node.addr, remove_options={angr.sim_options.LAZY_SOLVES})

    if branch_sign is not None:
        def apply_branch(state):
            expressions = list(state.scratch.irsb.statements[state.inspect.statement].expressions)
            if expressions and isinstance(expressions[0], pyvex.expr.ITE):
                state.scratch.temps[expressions[0].cond.tmp] = cond
                # Only the first ITE is overridden: drop every statement breakpoint.
                state.inspect._breakpoints['statement'] = []

        state.inspect.b('statement', when=angr.BP_BEFORE, action=apply_branch)

    simgr = proj.factory.simulation_manager(state)
    while True:
        simgr.step()
        if not simgr.active:
            raise RuntimeError(f'no active state left while searching from {hex(node.addr)}')
        if simgr.active[0].addr in targets:
            return simgr.active[0].addr

# Rebuild the original control flow as a new directed graph whose nodes are the
# relevant blocks.
new_cfg = nx.DiGraph()
new_cfg.add_node(blocks['relevant'][0])
relevant_addrs = [node.addr for node in blocks['relevant'][1:]]
all_branches = {}


def _relevant_at(addr):
    # relevant_addrs omits the entry block, hence the +1 into blocks['relevant'].
    return blocks['relevant'][relevant_addrs.index(addr) + 1]


# Get the first real block of the function
end = Get_successors(blocks['relevant'][0], relevant_addrs)
end = _relevant_at(end)
new_cfg.add_edge(blocks['relevant'][0], end)

while True:
    # Work list: nodes already in the new graph that have no successor yet.
    nodes = [node for node in new_cfg.nodes() if new_cfg.out_degree(node) == 0 and node != blocks['exit']]
    if not nodes:
        break
    for node in nodes:
        # Record each branch in relevant block. If there's a branch, decide the operate after cmp(jz/jnz)
        Get_branch(node.addr, all_branches, blocks['pre_dispatcher'].addr)
        if all_branches[node.addr][0] is not None:
            # Run the block twice, forcing the cmov condition to 1 and then to 0.
            end_true = Get_successors(node, relevant_addrs, all_branches[node.addr][0], claripy.BVV(1, 1))
            end_false = Get_successors(node, relevant_addrs, all_branches[node.addr][0], claripy.BVV(0, 1))
            end_true, end_false = _relevant_at(end_true), _relevant_at(end_false)
            # The edge for cond == 1 is added first.
            new_cfg.add_edges_from([(node, end_true), (node, end_false)])
        else:
            end = Get_successors(node, relevant_addrs)
            end = _relevant_at(end)
            new_cfg.add_edge(node, end)

简单来说就是cmovxx的指令转化为vex IR时必定会产生v3 = ITE(cond, v1, v2)指令 通过更改cond就能达到上面的尝试的目的 同时分析的过程中打印出vex IR也能注意到 当原本的指令为cmovx v1, v2时 对应的vex IR总为v1 = ITE(cond, v1, v2) 相反 如果原来指令是cmovnx 那么vex IR就会是v1 = ITE(cond, v2, v1) 也就是说当设置cond为1时 执行的就是标志位满足执行mov指令的条件 那么当cond为1时cmovxx就完美对应了jxx

patch程序

上面已经完成了所有困难的任务了 接下来要做的就是根据重建的控制流图来patch程序 这里将所有的无用块都patch成nop以方便后续对某些无用块的利用 同时对有分支的真实块在cmovxx处开始patch 那么剩余的真实块也需要patch成nop以防指令识别错误:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# Read the whole input binary into a mutable buffer.
with open(args.path, 'rb') as f:
    origin = bytearray(f.read())
# File offsets below are computed as ``address - base``.
base = proj.loader.main_object.min_addr
arch = proj.arch.name
if arch != 'AMD64':
    # Only an x86-64 assembler is configured here; fail with a clear message
    # instead of calling .asm on the keystone module itself.
    raise NotImplementedError(f'no assembler configured for architecture {arch}')
ks = ks.Ks(ks.KS_ARCH_X86, ks.KS_MODE_64)
# Encoding of a single NOP (list of byte values), assembled once.
nop = ks.asm('nop')[0]

# Fill all the irrelevant blocks with NOP
for node in blocks['irrelevant'] + [blocks['pre_dispatcher']] + [blocks['main_dispatcher']]:
    origin[node.addr - base:node.addr - base + node.size] = nop * node.size

# Redirect the entry block to the first relevant block
node = blocks['main_dispatcher']
first_relevant = list(new_cfg.successors([node for node in new_cfg.nodes() if new_cfg.in_degree(node) == 0][0]))[0]
new_opcode = ks.asm(f'jmp {hex(first_relevant.addr)}', node.addr)[0]
origin[node.addr - base:node.addr - base + len(new_opcode)] = new_opcode

# Rebuild the network between nodes
nodes = [node for node in new_cfg if new_cfg.in_degree(node) and new_cfg.out_degree(node)]
for node in nodes:
    follows = list(new_cfg.successors(node))
    if len(follows) == 1:
        # No branch: overwrite the block's terminating jump with a direct jump.
        patch_addr = all_branches[node.addr][2]
        new_opcode = ks.asm(f'jmp {hex(follows[0].addr)}', patch_addr)[0]
        origin[patch_addr - base:patch_addr - base + len(new_opcode)] = new_opcode
    else:
        # Branch: everything from the cmov to the terminator is replaced.
        patch_addr = all_branches[node.addr][0]
        end_addr = all_branches[node.addr][2]
        # Fill the rest of the block with NOP
        origin[patch_addr - base:end_addr - base] = nop * (end_addr - patch_addr)
        # follows[0] is the successor recorded for cond == 1, follows[1] for cond == 0.
        f1, f2 = follows[0], follows[1]
        op1 = x86_64_book[all_branches[node.addr][1]] + ' ' + hex(f1.addr)
        new_opcode1 = ks.asm(op1, patch_addr)[0]
        op2 = 'jmp ' + hex(f2.addr)
        new_opcode2 = ks.asm(op2, patch_addr + len(new_opcode1))[0]
        origin[patch_addr - base:patch_addr - base + len(new_opcode1 + new_opcode2)] = new_opcode1 + new_opcode2

# Write the patched program
with open(args.output, 'wb') as f_:
    f_.write(bytes(origin))

这里还要提出 将真实块添加到new_cfg中时是以 先加入cond = 1到达的下一个真实块后加入cond = 0到达的下一个真实块的顺序加入的 这个脚本能成功运行基于networkx对加入顺序的记忆性 当用.successors()方法取出时会按同样的顺序取出

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import angr, logging, pyvex, claripy, binascii, argparse
import networkx as nx
import keystone as ks
from angrmanagement.utils import graph

# ---- Command-line interface ----
parser = argparse.ArgumentParser(description='Deflat the program')
parser.add_argument(
    '-i', '--path',
    help='The path to input program',
    required=True,
)
parser.add_argument(
    '-a', '--addr',
    help='The address of the function you want to deflat, if not given, deflat main',
    required=False,
    default='main',
    type=str,
)
parser.add_argument(
    '-o', '--output',
    help='The name of output program',
    required=True,
)
parser.add_argument(
    '-g', '--graph',
    help='Draw the graph of the CFG after deflated, off as default',
    required=False,
    action='store_true',
)
parser.add_argument(
    '-v', '--vex',
    help='Print the vex code of the relevant block, off as default',
    required=False,
    action='store_true',
)
args = parser.parse_args()

# Only errors from angr are logged.
logging.getLogger('angr').setLevel('ERROR')
proj = angr.Project(args.path, load_options={'auto_load_libs': False})

# Per-architecture tables: conditional-move mnemonic -> matching conditional
# jump, plus the mnemonics used for jmp / call / nop / return.
# 'ret' holds [accepted mnemonics, required operand string].
x86_64_book = {
    'cmovz': 'jz',
    'cmove': 'jz',
    'cmovnz': 'jnz',
    'cmovne': 'jnz',
    'cmovg': 'jg',
    'cmovl': 'jl',
    'cmovge': 'jge',
    'cmovle': 'jle',
    'jmp': 'jmp',
    'call': 'call',
    'nop': 'nop',
    'ret': [['ret', 'retn'], ''],
    'name': 'x86_64',
}
arm_book = {
    'moveq': 'beq',
    'movne': 'bne',
    'movgt': 'bgt',
    'movlt': 'blt',
    'movge': 'bge',
    'movle': 'ble',
    'jmp': 'b',
    'call': 'bl',
    'nop': 'nop',
    'ret': [['bx'], 'lr'],
    'name': 'arm',
}
# Keyed by proj.arch.name; 32-bit x86 shares the x86-64 table.
Ins_book = {
    'AMD64': x86_64_book,
    'ARMEL': arm_book,
    'X86': x86_64_book,
}
KSS = {
    'AMD64': ks.Ks(ks.KS_ARCH_X86, ks.KS_MODE_64),
    'ARMEL': ks.Ks(ks.KS_ARCH_ARM, ks.KS_MODE_ARM),
    'X86': ks.Ks(ks.KS_ARCH_X86, ks.KS_MODE_32),
}
book = Ins_book[proj.arch.name]
# From here on ``ks`` is the assembler instance, no longer the keystone module.
ks = KSS[proj.arch.name]

def Get_CFG(proj : angr.Project):
    """Return the IDA-style supergraph of the function selected on the command line.

    The function is looked up by name when ``args.addr`` is ``'main'`` and by
    hexadecimal address otherwise. Its transition graph is converted with
    ``graph.to_supergraph``.

    Args:
        proj: Loaded angr project to analyse.

    Returns:
        The graph produced by ``graph.to_supergraph``.
    """
    try:
        cfg = proj.analyses.CFGFast(normalize=True, force_complete_scan=False)
    except Exception:
        # Retry without force_complete_scan. Only ordinary errors are caught so
        # that Ctrl-C and SystemExit are not swallowed.
        cfg = proj.analyses.CFGFast(normalize=True)
    addr = args.addr if args.addr == 'main' else int(args.addr, 16)
    function_graph = cfg.functions[addr].transition_graph
    return graph.to_supergraph(function_graph)

def Get_Blocks(cfg) -> dict:
    """Classify the nodes of a flattened function's graph by their degrees.

    Args:
        cfg: Directed graph exposing ``nodes``, ``in_degree``, ``out_degree``,
            ``successors`` and ``predecessors``.

    Returns:
        Dict with the keys ``entry`` (no incoming edge), ``exit`` (no outgoing
        edge), ``main_dispatcher`` (first successor of the entry),
        ``pre_dispatcher`` (first predecessor of the main dispatcher that has
        incoming edges), ``relevant`` (entry, then the single-predecessor
        predecessors of the pre-dispatcher, then exit) and ``irrelevant``
        (every remaining node).
    """
    blocks = {}
    for candidate in cfg.nodes():
        if cfg.in_degree(candidate) == 0:
            blocks['entry'] = candidate
        elif cfg.out_degree(candidate) == 0:
            blocks['exit'] = candidate

    blocks['main_dispatcher'] = list(cfg.successors(blocks['entry']))[0]

    # The entry block also precedes the main dispatcher; it is told apart by
    # having no incoming edge.
    for pred in cfg.predecessors(blocks['main_dispatcher']):
        if cfg.in_degree(pred) == 0:
            continue
        blocks['pre_dispatcher'] = pred
        break

    inner = [
        pred
        for pred in cfg.predecessors(blocks['pre_dispatcher'])
        if cfg.in_degree(pred) == 1
    ]
    blocks['relevant'] = [blocks['entry'], *inner, blocks['exit']]

    leftover = []
    for candidate in cfg.nodes():
        if candidate in blocks['relevant']:
            continue
        if candidate == blocks['main_dispatcher'] or candidate == blocks['pre_dispatcher']:
            continue
        leftover.append(candidate)
    blocks['irrelevant'] = leftover
    return blocks

def Get_branch(block_addr, all_branches, target, _from=None, flag=False) -> bool:
    """Record where a relevant block branches and where it ends.

    Consecutive angr blocks are scanned, starting at ``block_addr``, until one
    ends in the architecture's jump or return instruction (as described by
    ``book``). Every call instruction met on the way is hooked with a no-op of
    the same length and reported on stdout.

    On return ``all_branches[_from]`` is
    ``[branch_address, branch_mnemonic, terminator_address]``; the first two
    entries stay ``None`` when no conditional move was seen.

    Args:
        block_addr: Address at which scanning starts.
        all_branches: Dict that receives the record.
        target: Not used by this function.
        _from: Key of the record. ``None`` selects ``block_addr`` and
            (re)initialises the record.
        flag: Initial value of the result.

    Returns:
        True when a conditional move was found or ``flag`` was already True.
    """
    # This function is used to get the address where the branch is located with it's type and the address of the end of the block
    if _from is None:
        _from = block_addr
        all_branches[_from] = [None, None]

    def NOP(state):
        pass

    record = all_branches[_from]
    is_arm = book['name'] == 'arm'
    current = block_addr
    while True:
        for ins in proj.factory.block(current).capstone.insns:
            mnemonic = ins.mnemonic
            if mnemonic == book['call']:
                # Hook all calls to avoid errors in the CFG(too deep recursion)
                proj.hook(ins.address, hook=NOP, length=ins.size)
                print(f'hooked {hex(ins.address)}')
                continue
            # x86: any cmovxx. ARM: any mnemonic that starts with 'mov' and is
            # longer than three characters.
            arm_cond_move = mnemonic.startswith('mov') and len(mnemonic) > 3 and is_arm
            if mnemonic.startswith('cmov') or arm_cond_move:
                record[0], record[1] = ins.address, mnemonic
                flag = True

        # ``ins`` is now the last instruction of the block just scanned.
        ends_with_jump = ins.mnemonic == book['jmp']
        ends_with_ret = ins.mnemonic in book['ret'][0] and ins.op_str == book['ret'][1]
        if ends_with_jump or ends_with_ret:
            record.append(ins.address)
            return flag
        # Block was cut short (e.g. by a call): continue right after it.
        current = ins.address + ins.size

def Get_successors(node, targets : list, branch_sign=None, cond=None):
    """Execute symbolically from ``node`` until a state lands on one of ``targets``.

    Args:
        node: Block to start from; only ``node.addr`` is read.
        targets: Addresses at which the search stops.
        branch_sign: ``None`` runs the code unmodified. Any other value enables
            the ITE override described under ``cond`` (the value itself is not
            inspected).
        cond: Value written into the condition temporary of the first VEX ITE
            statement that is executed; only used when ``branch_sign`` is set.

    Returns:
        Address of the first active state once it is contained in ``targets``.

    Raises:
        RuntimeError: If no active state is left before a target is reached.
    """
    state = proj.factory.blank_state(addr=node.addr, remove_options={angr.sim_options.LAZY_SOLVES})

    if branch_sign is not None:
        def apply_branch(state):
            expressions = list(state.scratch.irsb.statements[state.inspect.statement].expressions)
            if expressions and isinstance(expressions[0], pyvex.expr.ITE):
                if args.vex:
                    # ========Inspec of vexcode========
                    # pp() prints the IRSB itself and returns None, so it must
                    # not be wrapped in print().
                    proj.factory.block(node.addr).vex.pp()
                    # ========Inspec of vexcode========
                state.scratch.temps[expressions[0].cond.tmp] = cond
                # Only the first ITE is overridden: drop every statement breakpoint.
                state.inspect._breakpoints['statement'] = []

        state.inspect.b('statement', when=angr.BP_BEFORE, action=apply_branch)

    simgr = proj.factory.simulation_manager(state)
    while True:
        simgr.step()
        if not simgr.active:
            raise RuntimeError(f'no active state left while searching from {hex(node.addr)}')
        if simgr.active[0].addr in targets:
            return simgr.active[0].addr

def _fill_nop(image, start, length, nop):
    """Overwrite ``image[start:start + length]`` with whole copies of ``nop``.

    The buffer length never changes: when ``length`` is not a multiple of
    ``len(nop)`` the trailing remainder bytes are left untouched instead of
    being removed by a shorter slice assignment.

    Args:
        image: ``bytearray`` to patch in place.
        start: Offset of the first byte to overwrite.
        length: Size of the region in bytes.
        nop: Encoding of one NOP instruction (sequence of byte values).
    """
    count = length // len(nop)
    image[start:start + count * len(nop)] = bytes(nop) * count


if __name__ == '__main__':
    # Get the cfg of a function, if passed a function address, get the cfg of that function, default is main:
    cfg = Get_CFG(proj)

    # Get each kind of block:
    blocks = Get_Blocks(cfg)

    # Rebuild the cfg:
    new_cfg = nx.DiGraph()
    new_cfg.add_node(blocks['relevant'][0])
    # Addresses of every relevant block except the entry block.
    relevant_addrs = [node.addr for node in blocks['relevant'][1:]]
    all_branches = {}
    print(f'Entry: {hex(blocks["entry"].addr)}\nExit: {hex(blocks["exit"].addr)}\nMain Dispatcher: {hex(blocks["main_dispatcher"].addr)}\nPre Dispatcher: {hex(blocks["pre_dispatcher"].addr)}\nRelevant: {[hex(node.addr) for node in blocks["relevant"]]}\nIrrelevant: {[hex(node.addr) for node in blocks["irrelevant"]]}')
    # Pause so the classification above can be checked before continuing.
    input('...')
    # Get the first real block of the function
    end = Get_successors(blocks['relevant'][0], relevant_addrs)
    # relevant_addrs omits the entry block, hence the +1.
    end = blocks['relevant'][relevant_addrs.index(end) + 1]
    new_cfg.add_edge(blocks['relevant'][0], end)
    while True:
        # Work list: nodes already in the new graph that have no successor yet.
        nodes = [node for node in new_cfg.nodes() if new_cfg.out_degree(node) == 0 and node != blocks['exit']]
        if not nodes:
            break
        for node in nodes:
            # Record each branch in relevant block. If there's a branch, decide the operate after cmp(jz/jnz)
            Get_branch(node.addr, all_branches, blocks['pre_dispatcher'].addr)
            for branch in all_branches:
                print(f'{hex(branch)} : {all_branches[branch]}')
            if all_branches[node.addr][0] is None:
                end = Get_successors(node, relevant_addrs)
                end = blocks['relevant'][relevant_addrs.index(end) + 1]
                new_cfg.add_edge(node, end)
            else:
                # Run the block twice, forcing the condition to 1 and then to 0.
                end_true = Get_successors(node, relevant_addrs, all_branches[node.addr][0], claripy.BVV(1, 1))
                end_false = Get_successors(node, relevant_addrs, all_branches[node.addr][0], claripy.BVV(0, 1))
                end_true, end_false = blocks['relevant'][relevant_addrs.index(end_true) + 1], blocks['relevant'][relevant_addrs.index(end_false) + 1]
                # The edge for cond == 1 is added first; the patch step relies on that order.
                new_cfg.add_edges_from([(node, end_true), (node, end_false)])
    if args.graph:
        from matplotlib import pyplot as plt
        # ========Draw the new cfg========
        nx.draw(new_cfg, with_labels=True)
        plt.show()
        # ========Draw the new cfg========

    # Patch the origin program:
    with open(args.path, 'rb') as f:
        origin = bytearray(f.read())
    # NOTE(review): file offsets are computed as ``address - mapped_base``, which
    # assumes the patched code sits at the same offset in the file as in memory.
    base = proj.loader.main_object.mapped_base
    # Encoding of a single NOP for the selected architecture, assembled once.
    nop = ks.asm('nop')[0]
    # Fill all the irrelevant blocks with NOP
    for node in blocks['irrelevant'] + [blocks['pre_dispatcher']] + [blocks['main_dispatcher']]:
        _fill_nop(origin, node.addr - base, node.size, nop)
    # Redirect the entry block to the first relevant block
    node = blocks['main_dispatcher']
    first_relevant = list(new_cfg.successors([node for node in new_cfg.nodes() if new_cfg.in_degree(node) == 0][0]))[0]
    new_opcode = ks.asm(book['jmp'] + ' ' + hex(first_relevant.addr), node.addr)[0]
    origin[node.addr - base:node.addr - base + len(new_opcode)] = new_opcode
    # Rebuild the network between nodes
    nodes = [node for node in new_cfg if new_cfg.in_degree(node) and new_cfg.out_degree(node)]
    for node in nodes:
        follows = list(new_cfg.successors(node))
        print(f'{hex(node.addr)} --> {[hex(follow.addr) for follow in follows]}')
        if len(follows) == 1:
            # No branch: overwrite the block's terminating jump.
            patch_addr = all_branches[node.addr][2]
            new_opcode = ks.asm(book['jmp'] + ' ' + hex(follows[0].addr), patch_addr)[0]
            origin[patch_addr - base:patch_addr - base + len(new_opcode)] = new_opcode
        else:
            # Branch: everything from the conditional move to the terminator is replaced.
            patch_addr = all_branches[node.addr][0]
            end_addr = all_branches[node.addr][2]
            # Fill the rest of the block with NOP
            _fill_nop(origin, patch_addr - base, end_addr - patch_addr, nop)
            # follows[0] was reached with cond == 1, follows[1] with cond == 0.
            f1, f2 = follows[0], follows[1]
            op1 = book[all_branches[node.addr][1]] + ' ' + hex(f1.addr)
            new_opcode1 = ks.asm(op1, patch_addr)[0]
            op2 = book['jmp'] + ' ' + hex(f2.addr)
            new_opcode2 = ks.asm(op2, patch_addr + len(new_opcode1))[0]
            origin[patch_addr - base:patch_addr - base + len(new_opcode1 + new_opcode2)] = new_opcode1 + new_opcode2
    # Write the patched program
    with open(args.output, 'wb') as f_:
        f_.write(bytes(origin))

总结&后续打算

学到虚脱

把其他框架的也加进来增强兼容性(24/8/20 已实现) 后面应该还会再用angr实现一下普通花指令的梭哈和绕过虚拟控制流

参考

  1. https://security.tencent.com/index.php/blog/msg/112
  2. https://34r7hm4n.me/0x401RevTrain-Tools/angr/10_%E5%88%A9%E7%94%A8angr%E7%AC%A6%E5%8F%B7%E6%89%A7%E8%A1%8C%E5%8E%BB%E9%99%A4%E6%8E%A7%E5%88%B6%E6%B5%81%E5%B9%B3%E5%9D%A6%E5%8C%96/
  3. https://izayoishiki.github.io (请视奸他)

用Angr实现去虚拟跳转 / 控制流

实际上虚拟跳转比虚拟控制流要容易去除 因为在中间数被硬编码的前提下 IDA可以直接计算出最终跳转的结果 例如:

image-20240911145906286

但是使用Angr来去除虚拟跳转 / 控制流可以不用考虑那么多 只需要当作弱化的控制流平坦化处理就行 直接找到虚拟跳转的特征结构:

image-20240911151216660

据此确定要开始符号执行的位置 虚拟跳转目标的计算和原程序控制流是完全没有关系的 可以直接从特征结构的头开始符号执行:

1
2
3
4
5
6
7
8
9
10
11
# Load the target without its dependencies and pick the matching assembler.
proj = angr.Project('fix2.so', load_options={'auto_load_libs': False})
ks = KSS[proj.arch.name]
# Symbolic execution for each virtual jump starts this many bytes before it.
bias = -0x10

# Disassemble the whole [start, end] range as one block.
func_block = proj.factory.block(addr=start, size=end-start+1)
to_patch = []
for ins in func_block.capstone.insns:
    raw = ins.insn
    if raw.mnemonic == 'bl':
        # Stub out the callee; the first three characters of the operand
        # string are dropped before parsing it as hexadecimal.
        callee = int(ins.op_str[3:], 16)
        proj.hook(callee, angr.SIM_PROCEDURES["stubs"]["ReturnUnconstrained"](), replace=True)
        continue
    if raw.op_str.startswith('pc'):
        # An instruction whose first operand is pc is treated as a virtual jump.
        to_patch.append(ins.address)

然后记录每个虚拟跳转实际到达的地址 再据此进行patch即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# Maps the address of each virtual jump to the address it actually reaches.
patch_addr = {}
for addr in to_patch:
    # Start ``bias`` bytes before the jump so the target computation is executed.
    state = proj.factory.blank_state(addr = addr + bias, remove_options={angr.options.LAZY_SOLVES})
    simgr = proj.factory.simulation_manager(state)
    # Single-step up to the jump instruction ...
    while simgr.active[0].addr != addr:
        simgr.step(num_inst=1)
    # ... then execute the jump itself and read where it landed.
    simgr.step(num_inst=1)
    to_jmp = simgr.active[0].addr
    patch_addr[addr] = to_jmp

# NOTE(review): the project is loaded from 'fix2.so' but the bytes patched here
# come from 'fix.so' - confirm that this is intended.
with open('fix.so', 'rb') as f:
    data = bytearray(f.read())
# NOTE(review): presumably the load base with the Thumb bit set, so that
# ``addr - base`` is a file offset - confirm.
base = 0x400001
NOP = ks.asm('nop')[0]
for addr, jmp in patch_addr.items():
    # Blank the bytes in front of the jump, then place a direct branch at their start.
    data[addr - base + bias:addr - base] = NOP * (abs(bias) // 2)
    print(f'Fill {hex(addr - base + bias)} to {hex(addr - base)} with NOP({abs(bias) // 2})')
    opcode = ks.asm(f'b {hex(jmp)}', addr + bias)[0]
    data[addr - base + bias:addr - base + bias + len(opcode)] = opcode
    print(f'Patched {hex(addr)} -> {hex(jmp)}')
# The context manager closes the file even if the write fails.
with open('fix.so', 'wb') as fix_so:
    fix_so.write(data)

完整代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import angr, logging
import keystone as ks
import capstone as cs

# Only errors from angr are logged.
logging.getLogger('angr').setLevel('ERROR')
# Address range that is disassembled and searched for virtual jumps.
start, end = 0x401700, 0x40290C
# Assemblers keyed by proj.arch.name; ARM code is assembled in Thumb mode here.
KSS = {'AMD64' : ks.Ks(ks.KS_ARCH_X86, ks.KS_MODE_64), 'ARMEL' : ks.Ks(ks.KS_ARCH_ARM, ks.KS_MODE_THUMB), 'X86' : ks.Ks(ks.KS_ARCH_X86, ks.KS_MODE_32)}

proj = angr.Project('fix2.so', load_options={'auto_load_libs': False})
# From here on ``ks`` is the assembler instance, no longer the keystone module.
ks = KSS[proj.arch.name]
# Symbolic execution for each virtual jump starts this many bytes before it.
bias = -0x10

# Disassemble the whole [start, end] range as one block.
func_block = proj.factory.block(addr=start, size=end-start+1)
to_patch = []
for ins in func_block.capstone.insns:
    if ins.insn.mnemonic == 'bl':
        # Stub out the callee; the first three characters of the operand
        # string are dropped before parsing it as hexadecimal.
        proj.hook(int(ins.op_str[3:], 16), angr.SIM_PROCEDURES["stubs"]["ReturnUnconstrained"](), replace=True)
    elif ins.insn.op_str.startswith('pc'):
        # An instruction whose first operand is pc is treated as a virtual jump.
        to_patch.append(ins.address)

# Maps the address of each virtual jump to the address it actually reaches.
patch_addr = {}
for addr in to_patch:
    state = proj.factory.blank_state(addr = addr + bias, remove_options={angr.options.LAZY_SOLVES})
    simgr = proj.factory.simulation_manager(state)
    # Single-step up to the jump instruction ...
    while simgr.active[0].addr != addr:
        simgr.step(num_inst=1)
    # ... then execute the jump itself and read where it landed.
    simgr.step(num_inst=1)
    to_jmp = simgr.active[0].addr
    patch_addr[addr] = to_jmp

# NOTE(review): the project is loaded from 'fix2.so' but the bytes patched here
# come from 'fix.so' - confirm that this is intended.
with open('fix.so', 'rb') as f:
    data = bytearray(f.read())
# NOTE(review): presumably the load base with the Thumb bit set, so that
# ``addr - base`` is a file offset - confirm.
base = 0x400001
NOP = ks.asm('nop')[0]
for addr, jmp in patch_addr.items():
    # Blank the bytes in front of the jump, then place a direct branch at their start.
    data[addr - base + bias:addr - base] = NOP * (abs(bias) // 2)
    print(f'Fill {hex(addr - base + bias)} to {hex(addr - base)} with NOP({abs(bias) // 2})')
    opcode = ks.asm(f'b {hex(jmp)}', addr + bias)[0]
    data[addr - base + bias:addr - base + bias + len(opcode)] = opcode
    print(f'Patched {hex(addr)} -> {hex(jmp)}')
# The context manager closes the file even if the write fails.
with open('fix.so', 'wb') as fix_so:
    fix_so.write(data)

修复前后:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Decompiler output for sub_1700 BEFORE patching: the listing stops at the
// computed jump (MOV PC, R0), so nothing after it is shown.
void __fastcall sub_1700(int a1, int a2, int a3)
{
int v3; // [sp+8h] [bp-E8h]
char *v4; // [sp+Ch] [bp-E4h]
int v5; // [sp+10h] [bp-E0h]
int v6; // [sp+14h] [bp-DCh]
char v7; // [sp+1Ch] [bp-D4h] BYREF
int v8; // [sp+28h] [bp-C8h]

// The three arguments and the address of v7 are copied into locals.
v4 = &v7;
v3 = a3;
v5 = a2;
v6 = a1;
v8 = 1066412143;
// Jump through a register; the decompiler does not follow it.
__asm { MOV PC, R0; loc_174C }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Decompiler output for the same function AFTER patching: the code following
// the former computed jump is now shown as a while(1) loop. The "..." lines
// are elisions made in the article, not part of the decompiler output.
int __fastcall sub_1700(int a1, int a2, int a3)
{
char *v3; // r1
char *v4; // r1
int v5; // r0
...
int v93; // [sp+DCh] [bp-14h]
_BYTE *v94; // [sp+E0h] [bp-10h]

// Same prologue as before patching: arguments and a local's address are saved.
v48 = &v52;
v47 = a3;
v49 = a2;
v50 = a1;
v54 = 1066412143;
while ( 1 )
{
...
}
return _stack_chk_guard;
}