mirror of
https://github.com/calebstewart/pwncat.git
synced 2024-11-24 01:25:37 +01:00
Merged master
This commit is contained in:
commit
7655b40698
@ -106,11 +106,12 @@ def main():
|
||||
sys.stdout.buffer.write(data)
|
||||
sys.stdout.flush()
|
||||
except ConnectionResetError:
|
||||
handler.restore()
|
||||
handler.restore_local_term()
|
||||
util.warn("connection reset by remote host")
|
||||
finally:
|
||||
# Restore the shell
|
||||
handler.restore()
|
||||
handler.restore_local_term()
|
||||
util.success("local terminal restored")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
361
pwncat/commands/__init__.py
Normal file
361
pwncat/commands/__init__.py
Normal file
@ -0,0 +1,361 @@
|
||||
#!/usr/bin/env python3
|
||||
from prompt_toolkit import PromptSession, ANSI
|
||||
from prompt_toolkit.shortcuts import ProgressBar
|
||||
from prompt_toolkit.completion import (
|
||||
Completer,
|
||||
PathCompleter,
|
||||
Completion,
|
||||
CompleteEvent,
|
||||
NestedCompleter,
|
||||
WordCompleter,
|
||||
merge_completers,
|
||||
)
|
||||
from pygments.lexer import RegexLexer, bygroups, include
|
||||
from pygments.token import *
|
||||
from pygments.style import Style
|
||||
from prompt_toolkit.styles.pygments import style_from_pygments_cls
|
||||
from prompt_toolkit.lexers import PygmentsLexer
|
||||
from prompt_toolkit.document import Document
|
||||
from pygments.styles import get_style_by_name
|
||||
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
|
||||
from prompt_toolkit.history import InMemoryHistory
|
||||
from typing import Dict, Any, List, Iterable
|
||||
from enum import Enum, auto
|
||||
import argparse
|
||||
import pkgutil
|
||||
import shlex
|
||||
import os
|
||||
import re
|
||||
|
||||
from pprint import pprint
|
||||
|
||||
from pwncat.commands.base import CommandDefinition, Complete
|
||||
from pwncat.util import State
|
||||
from pwncat import util
|
||||
|
||||
|
||||
class CommandParser:
|
||||
""" Handles dynamically loading command classes, parsing input, and
|
||||
dispatching commands. """
|
||||
|
||||
def __init__(self, pty: "pwncat.pty.PtyHandler"):
|
||||
""" We need to dynamically load commands from pwncat.commands """
|
||||
|
||||
self.commands: List["CommandDefinition"] = []
|
||||
|
||||
for loader, module_name, is_pkg in pkgutil.walk_packages(__path__):
|
||||
if module_name == "base":
|
||||
continue
|
||||
self.commands.append(
|
||||
loader.find_module(module_name)
|
||||
.load_module(module_name)
|
||||
.Command(pty, self)
|
||||
)
|
||||
|
||||
history = InMemoryHistory()
|
||||
completer = CommandCompleter(pty, self.commands)
|
||||
lexer = PygmentsLexer(CommandLexer.build(self.commands))
|
||||
style = style_from_pygments_cls(get_style_by_name("monokai"))
|
||||
auto_suggest = AutoSuggestFromHistory()
|
||||
|
||||
self.prompt = PromptSession(
|
||||
[
|
||||
("fg:ansiyellow bold", "(local) "),
|
||||
("fg:ansimagenta bold", "pwncat"),
|
||||
("", "$ "),
|
||||
],
|
||||
completer=completer,
|
||||
lexer=lexer,
|
||||
style=style,
|
||||
auto_suggest=auto_suggest,
|
||||
complete_while_typing=False,
|
||||
history=history,
|
||||
)
|
||||
self.toolbar = PromptSession(
|
||||
[
|
||||
("fg:ansiyellow bold", "(local) "),
|
||||
("fg:ansimagenta bold", "pwncat"),
|
||||
("", "$ "),
|
||||
],
|
||||
completer=completer,
|
||||
lexer=lexer,
|
||||
style=style,
|
||||
auto_suggest=auto_suggest,
|
||||
complete_while_typing=False,
|
||||
prompt_in_toolbar=True,
|
||||
history=history,
|
||||
)
|
||||
|
||||
self.pty = pty
|
||||
|
||||
def run_single(self):
|
||||
|
||||
try:
|
||||
line = self.toolbar.prompt().strip()
|
||||
except (EOFError, OSError, KeyboardInterrupt):
|
||||
pass
|
||||
else:
|
||||
if line != "":
|
||||
self.dispatch_line(line)
|
||||
|
||||
def run(self):
|
||||
|
||||
self.running = True
|
||||
|
||||
while self.running:
|
||||
try:
|
||||
try:
|
||||
line = self.prompt.prompt().strip()
|
||||
except (EOFError, OSError):
|
||||
self.pty.state = State.RAW
|
||||
self.running = False
|
||||
continue
|
||||
|
||||
if line == "":
|
||||
continue
|
||||
|
||||
self.dispatch_line(line)
|
||||
except KeyboardInterrupt:
|
||||
continue
|
||||
|
||||
def dispatch_line(self, line: str):
|
||||
""" Parse the given line of command input and dispatch a command """
|
||||
|
||||
try:
|
||||
# Spit the line with shell rules
|
||||
argv = shlex.split(line)
|
||||
except ValueError as e:
|
||||
util.error(e.args[0])
|
||||
return
|
||||
|
||||
# Search for a matching command
|
||||
for command in self.commands:
|
||||
if command.PROG == argv[0]:
|
||||
break
|
||||
else:
|
||||
util.error(f"{argv[0]}: unknown command")
|
||||
return
|
||||
|
||||
try:
|
||||
# Parse the arguments
|
||||
args = command.parser.parse_args(argv[1:])
|
||||
|
||||
# Run the command
|
||||
command.run(args)
|
||||
except SystemExit:
|
||||
# The arguments were icncorrect
|
||||
return
|
||||
|
||||
|
||||
class CommandLexer(RegexLexer):
|
||||
|
||||
tokens = {}
|
||||
|
||||
@classmethod
|
||||
def build(cls, commands: List["CommandDefinition"]) -> "CommandLexer":
|
||||
""" Build the RegexLexer token list from the command definitions """
|
||||
|
||||
root = []
|
||||
for command in commands:
|
||||
root.append(("^" + re.escape(command.PROG), Name.Function, command.PROG))
|
||||
mode = []
|
||||
for args, descr in command.ARGS.items():
|
||||
for arg in args.split(","):
|
||||
if not arg.startswith("-"):
|
||||
continue
|
||||
if descr[0] != Complete.NONE:
|
||||
# Enter param state
|
||||
mode.append((r"\s+" + re.escape(arg), descr[1], "param"))
|
||||
else:
|
||||
# Don't enter param state
|
||||
mode.append((r"\s+" + re.escape(arg), descr[1]))
|
||||
mode.append((r"\s+(\-\-help|\-h)", Name.Label))
|
||||
mode.append((r"\"", String, "string"))
|
||||
mode.append((r".", Text))
|
||||
cls.tokens[command.PROG] = mode
|
||||
|
||||
root.append((r".", Text))
|
||||
cls.tokens["root"] = root
|
||||
cls.tokens["param"] = [
|
||||
(r"\"", String, "string"),
|
||||
(r"\s", Text, "#pop"),
|
||||
(r"[^\s]", Text),
|
||||
]
|
||||
cls.tokens["string"] = [
|
||||
(r"[^\"\\]+", String),
|
||||
(r"\\.", String.Escape),
|
||||
('"', String, "#pop"),
|
||||
]
|
||||
|
||||
return cls
|
||||
|
||||
|
||||
class RemotePathCompleter(Completer):
|
||||
""" Complete remote file names/paths """
|
||||
|
||||
def __init__(self, pty: "pwncat.pty.PtyHandler"):
|
||||
self.pty = pty
|
||||
|
||||
def get_completions(self, document: Document, complete_event: CompleteEvent):
|
||||
|
||||
before = document.text_before_cursor.split()[-1]
|
||||
path, partial_name = os.path.split(before)
|
||||
|
||||
if path == "":
|
||||
path = "."
|
||||
|
||||
pipe = self.pty.subprocess(f"ls -1 -a {shlex.quote(path)}", "r")
|
||||
|
||||
for name in pipe:
|
||||
name = name.decode("utf-8").strip()
|
||||
if name.startswith(partial_name):
|
||||
yield Completion(
|
||||
name,
|
||||
start_position=-len(partial_name),
|
||||
display=[("#ff0000", "(remote)"), ("", f" {name}")],
|
||||
)
|
||||
|
||||
|
||||
class LocalPathCompleter(Completer):
|
||||
""" Complete local file names/paths """
|
||||
|
||||
def __init__(self, pty: "PtyHandler"):
|
||||
self.pty = pty
|
||||
|
||||
def get_completions(self, document: Document, complete_event: CompleteEvent):
|
||||
|
||||
before = document.text_before_cursor.split()[-1]
|
||||
path, partial_name = os.path.split(before)
|
||||
|
||||
if path == "":
|
||||
path = "."
|
||||
|
||||
# Ensure the directory exists
|
||||
if not os.path.isdir(path):
|
||||
return
|
||||
|
||||
for name in os.listdir(path):
|
||||
if name.startswith(partial_name):
|
||||
yield Completion(
|
||||
name,
|
||||
start_position=-len(partial_name),
|
||||
display=[("fg:ansiyellow", "(local)"), ("", f" {name}")],
|
||||
)
|
||||
|
||||
|
||||
class CommandCompleter(Completer):
|
||||
""" Complete commands from a given list of commands """
|
||||
|
||||
def __init__(
|
||||
self, pty: "pwncat.pty.PtyHandler", commands: List["CommandDefinition"]
|
||||
):
|
||||
""" Construct a new command completer """
|
||||
|
||||
self.layers = {}
|
||||
local_file_completer = LocalPathCompleter(pty)
|
||||
remote_file_completer = RemotePathCompleter(pty)
|
||||
|
||||
for command in commands:
|
||||
self.layers[command.PROG] = [None, [], {}]
|
||||
option_names = []
|
||||
positional_completers = []
|
||||
for name_list, descr in command.ARGS.items():
|
||||
name_list = name_list.split(",")
|
||||
if descr[0] == Complete.CHOICES:
|
||||
completer = WordCompleter(descr[3]["choices"])
|
||||
elif descr[0] == Complete.LOCAL_FILE:
|
||||
completer = local_file_completer
|
||||
elif descr[0] == Complete.REMOTE_FILE:
|
||||
completer = remote_file_completer
|
||||
elif descr[0] == Complete.NONE:
|
||||
completer = None
|
||||
if len(name_list) == 1 and not name_list[0].startswith("-"):
|
||||
self.layers[command.PROG][1].append(completer)
|
||||
else:
|
||||
for name in name_list:
|
||||
self.layers[command.PROG][2][name] = completer
|
||||
option_names.append(name)
|
||||
self.layers[command.PROG][0] = WordCompleter(
|
||||
option_names + ["--help", "-h"]
|
||||
)
|
||||
|
||||
self.completer = WordCompleter(list(self.layers))
|
||||
|
||||
def get_completions(
|
||||
self, document: Document, complete_event: CompleteEvent
|
||||
) -> Iterable[Completion]:
|
||||
""" Get a list of completions for the given document """
|
||||
|
||||
text = document.text_before_cursor.lstrip()
|
||||
try:
|
||||
args = shlex.split(text)
|
||||
except ValueError:
|
||||
try:
|
||||
args = shlex.split(text + '"')
|
||||
except ValueError:
|
||||
args = shlex.split(text + "'")
|
||||
|
||||
# We haven't finished typing the command. Use our word completer for
|
||||
# commands
|
||||
if text == "" or (len(args) == 1 and not text.endswith(" ")):
|
||||
yield from self.completer.get_completions(document, complete_event)
|
||||
return
|
||||
|
||||
# Not in a known command, can't autocomplete
|
||||
if args[0] not in self.layers:
|
||||
return
|
||||
|
||||
command = self.layers[args[0]]
|
||||
args = args[1:]
|
||||
next_completer = command[0]
|
||||
this_completer = command[0]
|
||||
positional = 0
|
||||
# state = "options", completing options next
|
||||
# state = "arguments", completing arguments to options next
|
||||
state = "options"
|
||||
|
||||
for arg in args:
|
||||
if state == "options":
|
||||
# Flag options
|
||||
if arg.startswith("-"):
|
||||
# Exact match, with a sub-completer
|
||||
if arg in command[2] and command[2][arg] is not None:
|
||||
# Completer for next argument
|
||||
next_completer = command[2][arg]
|
||||
state = "arguments"
|
||||
# Exact match, with no arguments
|
||||
elif arg in command[2]:
|
||||
# Command has no argument, next completer is options
|
||||
# completer
|
||||
next_completer = command[0]
|
||||
state = "options"
|
||||
this_completer = command[0]
|
||||
# Non-exact match
|
||||
else:
|
||||
next_completer = command[0]
|
||||
this_completer = command[0]
|
||||
state = "options"
|
||||
# Appears to be a positional argument, grab next positional
|
||||
# completer and increment positional count
|
||||
else:
|
||||
if positional < len(command[1]):
|
||||
this_completer = command[1][positional]
|
||||
next_completer = command[0]
|
||||
state = "options"
|
||||
positional += 1
|
||||
else:
|
||||
this_completer = command[0]
|
||||
next_completer = command[0]
|
||||
state = "options"
|
||||
else:
|
||||
# Completing an argument to a option/switch. We can't verify
|
||||
# it's legitimacy, so we assume it's right, and reset to a
|
||||
# default state.
|
||||
state = "options"
|
||||
this_completer = next_completer
|
||||
next_completer = command[0]
|
||||
|
||||
if text.endswith(" "):
|
||||
yield from next_completer.get_completions(document, complete_event)
|
||||
else:
|
||||
yield from this_completer.get_completions(document, complete_event)
|
13
pwncat/commands/back.py
Normal file
13
pwncat/commands/back.py
Normal file
@ -0,0 +1,13 @@
|
||||
#!/usr/bin/env python3
|
||||
from pwncat.commands.base import CommandDefinition, Complete, parameter
|
||||
from pwncat import util
|
||||
|
||||
|
||||
class Command(CommandDefinition):
|
||||
""" Return to the remote terminal """
|
||||
|
||||
PROG = "back"
|
||||
ARGS = {}
|
||||
|
||||
def run(self, args):
|
||||
self.pty.state = util.State.RAW
|
155
pwncat/commands/base.py
Normal file
155
pwncat/commands/base.py
Normal file
@ -0,0 +1,155 @@
|
||||
#!/usr/bin/env python3
|
||||
from typing import Dict, Any, List, Callable
|
||||
from pygments.token import *
|
||||
from enum import Enum, auto
|
||||
from functools import partial
|
||||
import argparse
|
||||
import shlex
|
||||
import os
|
||||
|
||||
|
||||
class Complete(Enum):
|
||||
# Complete from the choices array in kwargs
|
||||
CHOICES = auto()
|
||||
# Complete from a local file
|
||||
LOCAL_FILE = auto()
|
||||
# Complete from a remote file
|
||||
REMOTE_FILE = auto()
|
||||
# This argument has no parameter
|
||||
NONE = auto()
|
||||
|
||||
|
||||
class StoreConstOnce(argparse.Action):
|
||||
""" Only allow the user to store a value in the destination once. This prevents
|
||||
users from selection multiple actions in the privesc parser. """
|
||||
|
||||
def __call__(self, parser, namespace, values, option_string=None):
|
||||
if hasattr(self, "__" + self.dest + "_seen"):
|
||||
raise argparse.ArgumentError(self, "only one action may be specified")
|
||||
setattr(self, "__" + self.dest + "_seen", True)
|
||||
setattr(namespace, self.dest, self.const)
|
||||
|
||||
|
||||
def StoreForAction(action: List[str]) -> Callable:
|
||||
""" Generates a custom argparse Action subclass which verifies that the current
|
||||
selected "action" option is one of the provided actions in this function. If
|
||||
not, an error is raised. """
|
||||
|
||||
class StoreFor(argparse.Action):
|
||||
""" Store the value if the currently selected action matches the list of
|
||||
actions passed to this function. """
|
||||
|
||||
def __call__(self, parser, namespace, values, option_string=None):
|
||||
if getattr(namespace, "action", None) not in action:
|
||||
raise argparse.ArgumentError(
|
||||
self, f"{option_string}: only valid for {action}",
|
||||
)
|
||||
|
||||
setattr(namespace, self.dest, values)
|
||||
|
||||
return StoreFor
|
||||
|
||||
|
||||
def RemoteFileType(file_exist=True, directory_exist=False):
|
||||
def _type(command: "CommandDefinition", name: str):
|
||||
""" Ensures that the remote file named exists. This should only be used for
|
||||
arguments which represent files on the remote system which should be viewable
|
||||
by the running user (e.g. not helpful for privesc methods). """
|
||||
|
||||
# Attempt to find the "test" command
|
||||
test = command.pty.which("test")
|
||||
if test is None:
|
||||
test = command.pty.which("[")
|
||||
|
||||
# No test command, this is a nicety, not a necessity.
|
||||
if test is None:
|
||||
return name
|
||||
|
||||
# Check if the file exists
|
||||
if file_exist:
|
||||
result = command.pty.run(f"{test} -f {shlex.quote(name)} && echo exists")
|
||||
if b"exists" not in result:
|
||||
raise argparse.ArgumentTypeError(f"{name}: no such file or directory")
|
||||
elif directory_exist:
|
||||
dirpath = os.path.dirname(name)
|
||||
result = command.pty.run(f"{test} -d {shlex.quote(dirpath)} && echo exists")
|
||||
if b"exists" not in result:
|
||||
raise argparse.ArgumentTypeError(
|
||||
f"{dirpath}: no such file or directory"
|
||||
)
|
||||
|
||||
# it exists
|
||||
return name
|
||||
|
||||
return _type
|
||||
|
||||
|
||||
def parameter(complete, token=Name.Label, *args, **kwargs):
|
||||
""" Build a parameter tuple from argparse arguments """
|
||||
return (complete, token, args, kwargs)
|
||||
|
||||
|
||||
class CommandDefinition:
|
||||
""" Default help/description goes here """
|
||||
|
||||
PROG = "unimplemented"
|
||||
ARGS = {}
|
||||
DEFAULTS = {}
|
||||
|
||||
# An example definition of arguments
|
||||
# PROG = "command"
|
||||
# ARGS = {
|
||||
# "--all,-a": parameter(
|
||||
# Complete.NONE, action="store_true", help="A switch/option"
|
||||
# ),
|
||||
# "--file,-f": parameter(Complete.LOCAL_FILE, help="A local file"),
|
||||
# "--rfile": parameter(Complete.REMOTE_FILE, help="A remote file"),
|
||||
# "positional": parameter(
|
||||
# Complete.CHOICES, choices=["a", "b", "c"], help="Choose one!"
|
||||
# ),
|
||||
# }
|
||||
|
||||
def __init__(self, pty: "pwncat.pty.PtyHandler", cmdparser: "CommandParser"):
|
||||
""" Initialize a new command instance. Parse the local arguments array
|
||||
into an argparse object. """
|
||||
|
||||
self.pty = pty
|
||||
self.cmdparser = cmdparser
|
||||
|
||||
# Create the parser object
|
||||
self.parser = argparse.ArgumentParser(prog=self.PROG, description=self.__doc__)
|
||||
|
||||
self.build_parser(self.parser, self.ARGS)
|
||||
|
||||
def run(self, args):
|
||||
""" Perform whatever your command is. `args` has already been parsed with
|
||||
your argparse definitions. """
|
||||
raise NotImplementedError
|
||||
|
||||
def build_parser(self, parser: argparse.ArgumentParser, args: Dict[str, Any]):
|
||||
""" Fill the given parser with arguments based on the dict """
|
||||
|
||||
for arg, descr in args.items():
|
||||
names = arg.split(",")
|
||||
|
||||
# Patch choice to work with a callable
|
||||
if "choices" in descr[3] and callable(descr[3]["choices"]):
|
||||
method = descr[3]["choices"]
|
||||
|
||||
class wrapper:
|
||||
def __iter__(wself):
|
||||
yield from method(self)
|
||||
|
||||
descr[3]["choices"] = wrapper()
|
||||
|
||||
# Patch "type" so we can see "self"
|
||||
if (
|
||||
"type" in descr[3]
|
||||
and isinstance(descr[3]["type"], tuple)
|
||||
and descr[3]["type"][0] == "method"
|
||||
):
|
||||
descr[3]["type"] = partial(descr[3]["type"][1], self)
|
||||
|
||||
parser.add_argument(*names, *descr[2], **descr[3])
|
||||
|
||||
parser.set_defaults(**self.DEFAULTS)
|
78
pwncat/commands/busybox.py
Normal file
78
pwncat/commands/busybox.py
Normal file
@ -0,0 +1,78 @@
|
||||
#!/usr/bin/env python3
|
||||
from colorama import Fore
|
||||
from pwncat.commands.base import (
|
||||
CommandDefinition,
|
||||
Complete,
|
||||
parameter,
|
||||
StoreConstOnce,
|
||||
StoreForAction,
|
||||
)
|
||||
from pwncat import util
|
||||
|
||||
|
||||
class Command(CommandDefinition):
|
||||
""" Manage installation of a known-good busybox binary on the remote system.
|
||||
After installing busybox, pwncat will be able to utilize it's functionality
|
||||
to augment or stabilize local binaries. This command can download a remote
|
||||
busybox binary appropriate for the remote architecture and then upload it
|
||||
to the remote system. """
|
||||
|
||||
PROG = "busybox"
|
||||
ARGS = {
|
||||
"--list,-l": parameter(
|
||||
Complete.NONE,
|
||||
action=StoreConstOnce,
|
||||
nargs=0,
|
||||
const="list",
|
||||
dest="action",
|
||||
help="List applets which the remote busybox provides",
|
||||
),
|
||||
"--install,-i": parameter(
|
||||
Complete.NONE,
|
||||
action=StoreConstOnce,
|
||||
nargs=0,
|
||||
const="install",
|
||||
dest="action",
|
||||
help="Install busybox on the remote host for use with pwncat",
|
||||
),
|
||||
"--status,-s": parameter(
|
||||
Complete.NONE,
|
||||
action=StoreConstOnce,
|
||||
nargs=0,
|
||||
const="status",
|
||||
dest="action",
|
||||
help="List the current busybox installation status",
|
||||
),
|
||||
"--url,-u": parameter(
|
||||
Complete.NONE,
|
||||
action=StoreForAction(["install"]),
|
||||
nargs=1,
|
||||
help="The base URL to download busybox binaries from (default: 1.31.0-defconfig-multiarch-musl)",
|
||||
default=(
|
||||
"https://busybox.net/downloads/binaries/1.31.0-defconfig-multiarch-musl/"
|
||||
),
|
||||
),
|
||||
}
|
||||
DEFAULTS = {"action": "status"}
|
||||
|
||||
def run(self, args):
|
||||
|
||||
if args.action == "list":
|
||||
if not self.pty.has_busybox:
|
||||
util.error("busybox hasn't been installed yet (hint: run 'busybox'")
|
||||
return
|
||||
util.info("binaries which the remote busybox provides:")
|
||||
for name in self.pty.busybox_provides:
|
||||
print(f" * {name}")
|
||||
elif args.action == "status":
|
||||
if not self.pty.has_busybox:
|
||||
util.error("busybox hasn't been installed yet")
|
||||
return
|
||||
util.info(
|
||||
f"busybox is installed to: {Fore.BLUE}{self.pty.busybox_path}{Fore.RESET}"
|
||||
)
|
||||
util.info(
|
||||
f"busybox provides {Fore.GREEN}{len(self.pty.busybox_provides)}{Fore.RESET} applets"
|
||||
)
|
||||
elif args.action == "install":
|
||||
self.pty.bootstrap_busybox(args.url)
|
51
pwncat/commands/download.py
Normal file
51
pwncat/commands/download.py
Normal file
@ -0,0 +1,51 @@
|
||||
#!/usr/bin/env python3
|
||||
from pwncat.commands.base import (
|
||||
CommandDefinition,
|
||||
Complete,
|
||||
parameter,
|
||||
StoreConstOnce,
|
||||
StoreForAction,
|
||||
RemoteFileType,
|
||||
)
|
||||
from functools import partial
|
||||
from colorama import Fore
|
||||
from pwncat import util
|
||||
import argparse
|
||||
import datetime
|
||||
import time
|
||||
import os
|
||||
|
||||
|
||||
class Command(CommandDefinition):
|
||||
""" Download a file from the remote host to the local host"""
|
||||
|
||||
PROG = "download"
|
||||
ARGS = {
|
||||
"source": parameter(Complete.REMOTE_FILE),
|
||||
"destination": parameter(Complete.LOCAL_FILE),
|
||||
}
|
||||
|
||||
def run(self, args):
|
||||
|
||||
try:
|
||||
length = self.pty.get_file_size(args.source)
|
||||
started = time.time()
|
||||
with open(args.destination, "wb") as destination:
|
||||
with self.pty.open(args.source, "rb", length=length) as source:
|
||||
util.with_progress(
|
||||
[
|
||||
("", "downloading "),
|
||||
("fg:ansigreen", args.source),
|
||||
("", " to "),
|
||||
("fg:ansired", args.destination),
|
||||
],
|
||||
partial(util.copyfileobj, source, destination),
|
||||
length=length,
|
||||
)
|
||||
elapsed = time.time() - started
|
||||
util.success(
|
||||
f"downloaded {Fore.CYAN}{util.human_readable_size(length)}{Fore.RESET} "
|
||||
f"in {Fore.GREEN}{util.human_readable_delta(elapsed)}{Fore.RESET}"
|
||||
)
|
||||
except (FileNotFoundError, PermissionError, IsADirectoryError) as exc:
|
||||
self.parser.error(str(exc))
|
25
pwncat/commands/help.py
Normal file
25
pwncat/commands/help.py
Normal file
@ -0,0 +1,25 @@
|
||||
#!/usr/bin/env python3
|
||||
from pwncat.commands.base import CommandDefinition, Complete, parameter
|
||||
from pwncat import util
|
||||
|
||||
|
||||
class Command(CommandDefinition):
|
||||
""" List known commands and print their associated help documentation. """
|
||||
|
||||
def get_command_names(self):
|
||||
""" Get the list of known commands """
|
||||
return [c.PROG for c in self.cmdparser.commands]
|
||||
|
||||
PROG = "help"
|
||||
ARGS = {"topic": parameter(Complete.CHOICES, choices=get_command_names, nargs="?")}
|
||||
|
||||
def run(self, args):
|
||||
if args.topic:
|
||||
for command in self.cmdparser.commands:
|
||||
if command.PROG == args.topic:
|
||||
command.parser.print_help()
|
||||
break
|
||||
else:
|
||||
util.info("the following commands are available:")
|
||||
for command in self.cmdparser.commands:
|
||||
print(f" * {command.PROG}")
|
180
pwncat/commands/privesc.py
Normal file
180
pwncat/commands/privesc.py
Normal file
@ -0,0 +1,180 @@
|
||||
#!/usr/bin/env python3
|
||||
from typing import List, Callable
|
||||
from pwncat.commands.base import (
|
||||
CommandDefinition,
|
||||
Complete,
|
||||
parameter,
|
||||
StoreConstOnce,
|
||||
StoreForAction,
|
||||
)
|
||||
from pwncat import util, privesc
|
||||
from pwncat.util import State
|
||||
from colorama import Fore
|
||||
import argparse
|
||||
import shutil
|
||||
import sys
|
||||
|
||||
|
||||
class Command(CommandDefinition):
|
||||
""" Attempt various privilege escalation methods. This command will attempt
|
||||
search for privilege escalation across all known modules. Privilege escalation
|
||||
routes can grant file read, file write or shell capabilities. The "escalate"
|
||||
mode will attempt to abuse any of these to gain a shell.
|
||||
|
||||
Further, escalation and file read/write actions will attempt to escalate multiple
|
||||
times to reach the target user if possible, attempting all known escalation paths
|
||||
until one arrives at the target user. """
|
||||
|
||||
def get_user_choices(self):
|
||||
""" Get a list of all users on the remote machine. This is used for
|
||||
parameter checking and tab completion of the "users" parameter below. """
|
||||
return list(self.pty.users)
|
||||
|
||||
PROG = "privesc"
|
||||
ARGS = {
|
||||
"--list,-l": parameter(
|
||||
Complete.NONE,
|
||||
action=StoreConstOnce,
|
||||
nargs=0,
|
||||
const="list",
|
||||
dest="action",
|
||||
help="Enumerate and list available privesc techniques",
|
||||
),
|
||||
"--all,-a": parameter(
|
||||
Complete.NONE,
|
||||
action="store_const",
|
||||
dest="user",
|
||||
const=None,
|
||||
help="list escalations for all users",
|
||||
),
|
||||
"--user,-u": parameter(
|
||||
Complete.CHOICES,
|
||||
default="root",
|
||||
choices=get_user_choices,
|
||||
metavar="USER",
|
||||
help="the user to gain privileges as",
|
||||
),
|
||||
"--max-depth,-m": parameter(
|
||||
Complete.NONE,
|
||||
default=None,
|
||||
type=int,
|
||||
help="Maximum depth for the privesc search (default: no maximum)",
|
||||
),
|
||||
"--read,-r": parameter(
|
||||
Complete.NONE,
|
||||
action=StoreConstOnce,
|
||||
nargs=0,
|
||||
const="read",
|
||||
dest="action",
|
||||
help="Attempt to read a remote file as the specified user",
|
||||
),
|
||||
"--write,-w": parameter(
|
||||
Complete.NONE,
|
||||
action=StoreConstOnce,
|
||||
nargs=0,
|
||||
const="write",
|
||||
dest="action",
|
||||
help="Attempt to write a remote file as the specified user",
|
||||
),
|
||||
"--path,-p": parameter(
|
||||
Complete.REMOTE_FILE,
|
||||
action=StoreForAction(["write", "read"]),
|
||||
help="Remote path for read or write actions",
|
||||
),
|
||||
"--escalate,-e": parameter(
|
||||
Complete.NONE,
|
||||
action=StoreConstOnce,
|
||||
nargs=0,
|
||||
const="escalate",
|
||||
dest="action",
|
||||
help="Attempt to escalate to gain a full shell as the target user",
|
||||
),
|
||||
"--data,-d": parameter(
|
||||
Complete.LOCAL_FILE,
|
||||
action=StoreForAction(["write"]),
|
||||
default=None,
|
||||
help="The local file to write to the remote file",
|
||||
),
|
||||
}
|
||||
DEFAULTS = {"action": "list"}
|
||||
|
||||
def run(self, args):
|
||||
|
||||
if args.action == "list":
|
||||
techniques = self.pty.privesc.search(args.user)
|
||||
if len(techniques) == 0:
|
||||
util.warn("no techniques found")
|
||||
else:
|
||||
for tech in techniques:
|
||||
util.info(f" - {tech}")
|
||||
elif args.action == "read":
|
||||
if not args.path:
|
||||
self.parser.error("missing required argument: --path")
|
||||
try:
|
||||
read_pipe, chain = self.pty.privesc.read_file(
|
||||
args.path, args.user, args.max_depth
|
||||
)
|
||||
util.success("file successfully opened!")
|
||||
|
||||
# Read the data from the pipe
|
||||
shutil.copyfileobj(read_pipe, sys.stdout.buffer)
|
||||
read_pipe.close()
|
||||
|
||||
# Unwrap in case we had to privesc to get here
|
||||
self.pty.privesc.unwrap(chain)
|
||||
|
||||
except privesc.PrivescError as exc:
|
||||
util.error(f"read file failed: {exc}")
|
||||
elif args.action == "write":
|
||||
# Make sure the correct arguments are present
|
||||
if not args.path:
|
||||
self.parser.error("missing required argument: --path")
|
||||
if not args.data:
|
||||
self.parser.error("missing required argument: --data")
|
||||
|
||||
# Read in the data file
|
||||
with open(args.data, "rb") as f:
|
||||
data = f.read()
|
||||
|
||||
try:
|
||||
# Attempt to write the data to the remote file
|
||||
chain = self.pty.privesc.write_file(
|
||||
args.path, data, target_user=args.user, depth=args.max_depth,
|
||||
)
|
||||
self.pty.privesc.unwrap(chain)
|
||||
util.success("file written successfully!")
|
||||
except privesc.PrivescError as exc:
|
||||
util.error(f"file write failed: {exc}")
|
||||
elif args.action == "escalate":
|
||||
try:
|
||||
chain = self.pty.privesc.escalate(args.user, args.max_depth)
|
||||
|
||||
ident = self.pty.id
|
||||
backdoor = False
|
||||
if ident["euid"]["id"] == 0 and ident["uid"]["id"] != 0:
|
||||
util.progress(
|
||||
"EUID != UID. installing backdoor to complete privesc"
|
||||
)
|
||||
try:
|
||||
self.pty.privesc.add_backdoor()
|
||||
backdoor = True
|
||||
except privesc.PrivescError as exc:
|
||||
util.warn(f"backdoor installation failed: {exc}")
|
||||
|
||||
util.success("privilege escalation succeeded using:")
|
||||
for i, (technique, _) in enumerate(chain):
|
||||
arrow = f"{Fore.YELLOW}\u2ba1{Fore.RESET} "
|
||||
print(f"{(i+1)*' '}{arrow}{technique}")
|
||||
|
||||
if backdoor:
|
||||
print(
|
||||
(
|
||||
f"{(len(chain)+1)*' '}{arrow}"
|
||||
f"{Fore.YELLOW}pwncat{Fore.RESET} backdoor"
|
||||
)
|
||||
)
|
||||
|
||||
self.pty.reset()
|
||||
self.pty.state = State.RAW
|
||||
except privesc.PrivescError as exc:
|
||||
util.error(f"escalation failed: {exc}")
|
33
pwncat/commands/sync.py
Normal file
33
pwncat/commands/sync.py
Normal file
@ -0,0 +1,33 @@
|
||||
#!/usr/bin/env python3
|
||||
from pwncat.commands.base import CommandDefinition, Complete, parameter
|
||||
from pwncat import util
|
||||
import os
|
||||
|
||||
|
||||
class Command(CommandDefinition):
|
||||
""" Synchronize the remote terminal with the local terminal. This will
|
||||
attempt to set the remote prompt, terminal width, terminal height, and TERM
|
||||
environment variables to enable to cleanest interface to the remote system
|
||||
possible. """
|
||||
|
||||
PROG = "sync"
|
||||
ARGS = {}
|
||||
DEFAULTS = {}
|
||||
|
||||
def run(self, args):
|
||||
|
||||
# Get the terminal type
|
||||
TERM = os.environ.get("TERM", None)
|
||||
if TERM is None:
|
||||
util.warn("no local TERM set. falling back to 'xterm'")
|
||||
TERM = "xterm"
|
||||
|
||||
# Get the width and height
|
||||
columns, rows = os.get_terminal_size(0)
|
||||
|
||||
# Update the state
|
||||
self.pty.run(
|
||||
f"stty rows {rows};" f"stty columns {columns};" f"export TERM='{TERM}'"
|
||||
)
|
||||
|
||||
util.success("terminal state synchronized")
|
26
pwncat/commands/test.py
Normal file
26
pwncat/commands/test.py
Normal file
@ -0,0 +1,26 @@
|
||||
#!/usr/bin/env python3
|
||||
from pygments.token import *
|
||||
|
||||
from pwncat.commands.base import CommandDefinition, Complete, parameter
|
||||
|
||||
|
||||
class Command(CommandDefinition):
|
||||
""" Command Description """
|
||||
|
||||
PROG = "test"
|
||||
ARGS = {
|
||||
"--remote,-r": parameter(
|
||||
Complete.REMOTE_FILE, Name.LABEL, help="Argument Help"
|
||||
),
|
||||
"--local,-l": parameter(Complete.LOCAL_FILE, Name.LABEL, help="Argument Help"),
|
||||
"--choice,-c": parameter(
|
||||
Complete.CHOICES,
|
||||
Name.LABEL,
|
||||
choices=["one", "two", "three"],
|
||||
help="Select one!",
|
||||
),
|
||||
}
|
||||
|
||||
def run(self, args):
|
||||
|
||||
print(args.arg)
|
56
pwncat/commands/upload.py
Normal file
56
pwncat/commands/upload.py
Normal file
@ -0,0 +1,56 @@
|
||||
#!/usr/bin/env python3
|
||||
from pwncat.commands.base import (
|
||||
CommandDefinition,
|
||||
Complete,
|
||||
parameter,
|
||||
StoreConstOnce,
|
||||
StoreForAction,
|
||||
RemoteFileType,
|
||||
)
|
||||
from functools import partial
|
||||
from colorama import Fore
|
||||
from pwncat import util
|
||||
import argparse
|
||||
import datetime
|
||||
import time
|
||||
import os
|
||||
|
||||
|
||||
class Command(CommandDefinition):
|
||||
""" Upload a file from the local host to the remote host"""
|
||||
|
||||
PROG = "upload"
|
||||
ARGS = {
|
||||
"source": parameter(Complete.LOCAL_FILE),
|
||||
"destination": parameter(
|
||||
Complete.REMOTE_FILE,
|
||||
type=("method", RemoteFileType(file_exist=False, directory_exist=True)),
|
||||
),
|
||||
}
|
||||
|
||||
def run(self, args):
|
||||
|
||||
try:
|
||||
length = os.path.getsize(args.source)
|
||||
started = time.time()
|
||||
with open(args.source, "rb") as source:
|
||||
with self.pty.open(
|
||||
args.destination, "wb", length=length
|
||||
) as destination:
|
||||
util.with_progress(
|
||||
[
|
||||
("", "uploading "),
|
||||
("fg:ansigreen", args.source),
|
||||
("", " to "),
|
||||
("fg:ansired", args.destination),
|
||||
],
|
||||
partial(util.copyfileobj, source, destination),
|
||||
length=length,
|
||||
)
|
||||
elapsed = time.time() - started
|
||||
util.success(
|
||||
f"uploaded {Fore.CYAN}{util.human_readable_size(length)}{Fore.RESET} "
|
||||
f"in {Fore.GREEN}{util.human_readable_delta(elapsed)}{Fore.RESET}"
|
||||
)
|
||||
except (FileNotFoundError, PermissionError, IsADirectoryError) as exc:
|
||||
self.parser.error(str(exc))
|
@ -22,7 +22,6 @@ from pwncat import util
|
||||
# privesc_methods = [SetuidMethod, SuMethod]
|
||||
# privesc_methods = [SuMethod, SudoMethod, SetuidMethod, DirtycowMethod, ScreenMethod]
|
||||
privesc_methods = [SuMethod, SudoMethod, SetuidMethod]
|
||||
# privesc_methods = [SuMethod, SetuidMethod]
|
||||
|
||||
|
||||
class Finder:
|
||||
|
872
pwncat/pty.py
872
pwncat/pty.py
File diff suppressed because it is too large
Load Diff
@ -2,9 +2,11 @@
|
||||
from typing import Tuple, BinaryIO, Callable
|
||||
from http.server import BaseHTTPRequestHandler, HTTPServer
|
||||
from socketserver import TCPServer, BaseRequestHandler
|
||||
from prompt_toolkit.shortcuts import ProgressBar
|
||||
from functools import partial
|
||||
from colorama import Fore, Style
|
||||
from io import TextIOWrapper
|
||||
from enum import Enum, auto
|
||||
import netifaces
|
||||
import socket
|
||||
import string
|
||||
@ -13,6 +15,7 @@ import threading
|
||||
import logging
|
||||
import termios
|
||||
import fcntl
|
||||
import time
|
||||
import tty
|
||||
import sys
|
||||
import os
|
||||
@ -22,6 +25,46 @@ CTRL_C = b"\x03"
|
||||
ALPHANUMERIC = string.ascii_letters + string.digits
|
||||
|
||||
|
||||
class State(Enum):
|
||||
""" The current PtyHandler state """
|
||||
|
||||
NORMAL = auto()
|
||||
RAW = auto()
|
||||
COMMAND = auto()
|
||||
SINGLE = auto()
|
||||
|
||||
|
||||
def human_readable_size(size, decimal_places=2):
|
||||
for unit in ["B", "KiB", "MiB", "GiB", "TiB"]:
|
||||
if size < 1024.0:
|
||||
break
|
||||
size /= 1024.0
|
||||
return f"{size:.{decimal_places}f}{unit}"
|
||||
|
||||
|
||||
def human_readable_delta(seconds):
|
||||
""" This produces a human-readable time-delta output suitable for output to
|
||||
the terminal. It assumes that "seconds" is less than 1 day. I.e. it will only
|
||||
display at most, hours minutes and seconds. """
|
||||
|
||||
if seconds < 60:
|
||||
return f"{seconds:.2f} seconds"
|
||||
|
||||
output = []
|
||||
output.append(f"{int(seconds%60)} seconds")
|
||||
|
||||
minutes = seconds // 60
|
||||
output.append(f"{minutes % 60} minutes")
|
||||
|
||||
if minutes < 60:
|
||||
return f"{output[1]} and {output[0]}"
|
||||
|
||||
hours = minutes // 60
|
||||
output.append(f"{hours} hours")
|
||||
|
||||
return f"{output[2]}, {output[1]} and {output[0]}"
|
||||
|
||||
|
||||
def copyfileobj(src, dst, callback, nomv=False):
|
||||
""" Copy a file object to another file object with a callback.
|
||||
This method assumes that both files are binary and support readinto
|
||||
@ -39,7 +82,7 @@ def copyfileobj(src, dst, callback, nomv=False):
|
||||
for chunk in iter(lambda: src.read(length), b""):
|
||||
dst.write(chunk)
|
||||
copied += len(chunk)
|
||||
callback(copied, len(chunk))
|
||||
callback(len(chunk))
|
||||
else:
|
||||
with memoryview(bytearray(length)) as mv:
|
||||
while True:
|
||||
@ -52,7 +95,39 @@ def copyfileobj(src, dst, callback, nomv=False):
|
||||
else:
|
||||
dst.write(mv)
|
||||
copied += n
|
||||
callback(copied, n)
|
||||
callback(n)
|
||||
|
||||
|
||||
def with_progress(title: str, target: Callable[[Callable], None], length: int = None):
|
||||
""" A shortcut to displaying a progress bar for various things. It will
|
||||
start a prompt_toolkit progress bar with the given title and a counter
|
||||
with the given length. Then, it will call `target` with an `on_progress`
|
||||
parameter. This parameter should be called for all progress updates. See
|
||||
the `do_upload` and `do_download` for examples w/ copyfileobj """
|
||||
|
||||
with ProgressBar(title) as pb:
|
||||
counter = pb(range(length))
|
||||
last_update = time.time()
|
||||
|
||||
def on_progress(blocksz):
|
||||
""" Update the progress bar """
|
||||
if blocksz == -1:
|
||||
counter.stopped = True
|
||||
counter.done = True
|
||||
pb.invalidate()
|
||||
return
|
||||
|
||||
counter.items_completed += blocksz
|
||||
if counter.items_completed >= counter.total:
|
||||
counter.done = True
|
||||
counter.stopped = True
|
||||
if (time.time() - last_update) > 0.1:
|
||||
pb.invalidate()
|
||||
|
||||
target(on_progress)
|
||||
|
||||
# https://github.com/prompt-toolkit/python-prompt-toolkit/issues/964
|
||||
time.sleep(0.1)
|
||||
|
||||
|
||||
def random_string(length: int = 8):
|
||||
@ -66,9 +141,6 @@ def enter_raw_mode():
|
||||
returns: the old state of the terminal
|
||||
"""
|
||||
|
||||
info("setting terminal to raw mode and disabling echo", overlay=True)
|
||||
success("pwncat is ready 🐈\n", overlay=True)
|
||||
|
||||
# Ensure we don't have any weird buffering issues
|
||||
sys.stdout.flush()
|
||||
|
||||
@ -111,7 +183,6 @@ def restore_terminal(state):
|
||||
# tty.setcbreak(sys.stdin)
|
||||
fcntl.fcntl(sys.stdin, fcntl.F_SETFL, state[1])
|
||||
sys.stdout.write("\n")
|
||||
info("local terminal restored")
|
||||
|
||||
|
||||
def get_ip_addr() -> str:
|
||||
|
Loading…
Reference in New Issue
Block a user