mirror of
https://github.com/calebstewart/pwncat.git
synced 2024-11-27 19:04:15 +01:00
All old commands ported over
This commit is contained in:
parent
b1f3c54087
commit
45810027d0
@ -106,11 +106,12 @@ def main():
|
||||
sys.stdout.buffer.write(data)
|
||||
sys.stdout.flush()
|
||||
except ConnectionResetError:
|
||||
handler.restore()
|
||||
handler.restore_local_term()
|
||||
util.warn("connection reset by remote host")
|
||||
finally:
|
||||
# Restore the shell
|
||||
handler.restore()
|
||||
handler.restore_local_term()
|
||||
util.success("local terminal restored")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
@ -18,6 +18,7 @@ from prompt_toolkit.lexers import PygmentsLexer
|
||||
from prompt_toolkit.document import Document
|
||||
from pygments.styles import get_style_by_name
|
||||
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
|
||||
from prompt_toolkit.history import InMemoryHistory
|
||||
from typing import Dict, Any, List, Iterable
|
||||
from enum import Enum, auto
|
||||
import argparse
|
||||
@ -29,6 +30,7 @@ import re
|
||||
from pprint import pprint
|
||||
|
||||
from pwncat.commands.base import CommandDefinition, Complete
|
||||
from pwncat.util import State
|
||||
from pwncat import util
|
||||
|
||||
|
||||
@ -50,6 +52,7 @@ class CommandParser:
|
||||
.Command(pty, self)
|
||||
)
|
||||
|
||||
history = InMemoryHistory()
|
||||
completer = CommandCompleter(pty, self.commands)
|
||||
lexer = PygmentsLexer(CommandLexer.build(self.commands))
|
||||
style = style_from_pygments_cls(get_style_by_name("monokai"))
|
||||
@ -66,10 +69,35 @@ class CommandParser:
|
||||
style=style,
|
||||
auto_suggest=auto_suggest,
|
||||
complete_while_typing=False,
|
||||
history=history,
|
||||
)
|
||||
self.toolbar = PromptSession(
|
||||
[
|
||||
("fg:ansiyellow bold", "(local) "),
|
||||
("fg:ansimagenta bold", "pwncat"),
|
||||
("", "$ "),
|
||||
],
|
||||
completer=completer,
|
||||
lexer=lexer,
|
||||
style=style,
|
||||
auto_suggest=auto_suggest,
|
||||
complete_while_typing=False,
|
||||
prompt_in_toolbar=True,
|
||||
history=history,
|
||||
)
|
||||
|
||||
self.pty = pty
|
||||
|
||||
def run_single(self):
|
||||
|
||||
try:
|
||||
line = self.toolbar.prompt().strip()
|
||||
except (EOFError, OSError, KeyboardInterrupt):
|
||||
pass
|
||||
else:
|
||||
if line != "":
|
||||
self.dispatch_line(line)
|
||||
|
||||
def run(self):
|
||||
|
||||
self.running = True
|
||||
@ -79,7 +107,7 @@ class CommandParser:
|
||||
try:
|
||||
line = self.prompt.prompt().strip()
|
||||
except (EOFError, OSError):
|
||||
self.pty.enter_raw()
|
||||
self.pty.state = State.RAW
|
||||
self.running = False
|
||||
continue
|
||||
|
||||
@ -165,7 +193,7 @@ class CommandLexer(RegexLexer):
|
||||
class RemotePathCompleter(Completer):
|
||||
""" Complete remote file names/paths """
|
||||
|
||||
def __init__(self, pty: "PtyHandler"):
|
||||
def __init__(self, pty: "pwncat.pty.PtyHandler"):
|
||||
self.pty = pty
|
||||
|
||||
def get_completions(self, document: Document, complete_event: CompleteEvent):
|
||||
|
@ -1,5 +1,6 @@
|
||||
#!/usr/bin/env python3
|
||||
from pwncat.commands.base import CommandDefinition, Complete, parameter
|
||||
from pwncat import util
|
||||
|
||||
|
||||
class Command(CommandDefinition):
|
||||
@ -9,4 +10,4 @@ class Command(CommandDefinition):
|
||||
ARGS = {}
|
||||
|
||||
def run(self, args):
|
||||
self.pty.enter_raw()
|
||||
self.pty.state = util.State.RAW
|
||||
|
@ -8,6 +8,7 @@ from pwncat.commands.base import (
|
||||
StoreForAction,
|
||||
)
|
||||
from pwncat import util, privesc
|
||||
from pwncat.util import State
|
||||
from colorama import Fore
|
||||
import argparse
|
||||
import shutil
|
||||
@ -174,6 +175,6 @@ class Command(CommandDefinition):
|
||||
)
|
||||
|
||||
self.pty.reset()
|
||||
self.pty.do_back([])
|
||||
self.pty.state = State.RAW
|
||||
except privesc.PrivescError as exc:
|
||||
util.error(f"escalation failed: {exc}")
|
||||
|
33
pwncat/commands/sync.py
Normal file
33
pwncat/commands/sync.py
Normal file
@ -0,0 +1,33 @@
|
||||
#!/usr/bin/env python3
|
||||
from pwncat.commands.base import CommandDefinition, Complete, parameter
|
||||
from pwncat import util
|
||||
import os
|
||||
|
||||
|
||||
class Command(CommandDefinition):
|
||||
""" Synchronize the remote terminal with the local terminal. This will
|
||||
attempt to set the remote prompt, terminal width, terminal height, and TERM
|
||||
environment variables to enable to cleanest interface to the remote system
|
||||
possible. """
|
||||
|
||||
PROG = "sync"
|
||||
ARGS = {}
|
||||
DEFAULTS = {}
|
||||
|
||||
def run(self, args):
|
||||
|
||||
# Get the terminal type
|
||||
TERM = os.environ.get("TERM", None)
|
||||
if TERM is None:
|
||||
util.warn("no local TERM set. falling back to 'xterm'")
|
||||
TERM = "xterm"
|
||||
|
||||
# Get the width and height
|
||||
columns, rows = os.get_terminal_size(0)
|
||||
|
||||
# Update the state
|
||||
self.pty.run(
|
||||
f"stty rows {rows};" f"stty columns {columns};" f"export TERM='{TERM}'"
|
||||
)
|
||||
|
||||
util.success("terminal state synchronized")
|
574
pwncat/pty.py
574
pwncat/pty.py
@ -1,5 +1,5 @@
|
||||
#!/usr/bin/env python3
|
||||
from typing import Dict, Optional, Iterable, IO, Callable
|
||||
from typing import Dict, Optional, Iterable, IO, Callable, Any
|
||||
from prompt_toolkit import PromptSession, ANSI
|
||||
from prompt_toolkit.shortcuts import ProgressBar
|
||||
from prompt_toolkit.completion import (
|
||||
@ -31,6 +31,7 @@ import os
|
||||
import re
|
||||
import io
|
||||
|
||||
from pwncat.util import State
|
||||
from pwncat import util
|
||||
from pwncat import downloader, uploader, privesc
|
||||
from pwncat.file import RemoteBinaryPipe
|
||||
@ -41,14 +42,6 @@ from pwncat.commands import CommandParser
|
||||
from colorama import Fore
|
||||
|
||||
|
||||
class State(enum.Enum):
|
||||
""" The current PtyHandler state """
|
||||
|
||||
NORMAL = enum.auto()
|
||||
RAW = enum.auto()
|
||||
COMMAND = enum.auto()
|
||||
|
||||
|
||||
def with_parser(f):
|
||||
@wraps(f)
|
||||
def _decorator(self, argv):
|
||||
@ -123,6 +116,7 @@ class CommandCompleter(Completer):
|
||||
# Split document.
|
||||
text = document.text_before_cursor.lstrip()
|
||||
stripped_len = len(document.text_before_cursor) - len(text)
|
||||
prev_term: Optional[str]
|
||||
|
||||
# If there is a space, check for the first term, and use a
|
||||
# subcompleter.
|
||||
@ -191,12 +185,12 @@ class PtyHandler:
|
||||
local terminal if requested and exit raw mode. """
|
||||
|
||||
self.client = client
|
||||
self.state = "normal"
|
||||
self._state = State.COMMAND
|
||||
self.saved_term_state = None
|
||||
self.input = b""
|
||||
self.lhost = None
|
||||
self.known_binaries = {}
|
||||
self.known_users = {}
|
||||
self.known_binaries: Dict[str, Optional[str]] = {}
|
||||
self.known_users: Dict[str, Any] = {}
|
||||
self.vars = {"lhost": util.get_ip_addr()}
|
||||
self.remote_prefix = "\\[\\033[01;31m\\](remote)\\[\\033[00m\\]"
|
||||
self.remote_prompt = (
|
||||
@ -205,7 +199,7 @@ class PtyHandler:
|
||||
)
|
||||
self.prompt = self.build_prompt_session()
|
||||
self.has_busybox = False
|
||||
self.busybox_path = None
|
||||
self.busybox_path: Optional[str] = None
|
||||
self.binary_aliases = {
|
||||
"python": [
|
||||
"python2",
|
||||
@ -222,9 +216,6 @@ class PtyHandler:
|
||||
self.gtfo: GTFOBins = GTFOBins("data/gtfobins.json", self.which)
|
||||
self.default_privkey = "./data/pwncat"
|
||||
|
||||
# Setup the argument parsers for local the local prompt
|
||||
self.setup_command_parsers()
|
||||
|
||||
# We should always get a response within 3 seconds...
|
||||
self.client.settimeout(1)
|
||||
|
||||
@ -280,11 +271,6 @@ class PtyHandler:
|
||||
if self.arch == "amd64":
|
||||
self.arch = "x86_64"
|
||||
|
||||
# Check if we are on BSD
|
||||
response = self.run("uname -a").decode("utf-8").strip()
|
||||
if "bsd" in response.lower():
|
||||
self.bootstrap_bsd()
|
||||
|
||||
# Ensure history is disabled
|
||||
util.info("disabling remote command history", overlay=True)
|
||||
self.run("unset HISTFILE; export HISTCONTROL=ignorespace")
|
||||
@ -357,10 +343,6 @@ class PtyHandler:
|
||||
# Disable automatic margins, which fuck up the prompt
|
||||
self.run("tput rmam")
|
||||
|
||||
# Synchronize the terminals
|
||||
util.info("synchronizing terminal state", overlay=True)
|
||||
self.do_sync([])
|
||||
|
||||
self.privesc = privesc.Finder(self)
|
||||
|
||||
# Save our terminal state
|
||||
@ -368,64 +350,11 @@ class PtyHandler:
|
||||
|
||||
self.command_parser = CommandParser(self)
|
||||
|
||||
# Synchronize the terminals
|
||||
self.command_parser.dispatch_line("sync")
|
||||
|
||||
# Force the local TTY to enter raw mode
|
||||
self.enter_raw()
|
||||
|
||||
def bootstrap_bsd(self):
|
||||
""" BSD acts differently than linux (since it isn't). While pwncat isn't
|
||||
specifically dependent on linux, it does depend on the interfaces provided
|
||||
by standard commands in linux. BSD has diverged. The easiest solution is to
|
||||
download a BSD version of busybox, which provides all the normal interfaces
|
||||
we expect. We can't use the normal busybox bootstrap, since it depends on
|
||||
some of these interfaces. We have to do it manually for BSD. """
|
||||
|
||||
if self.arch != "x86_64" and self.arch != "i386" or self.which("dd") is None:
|
||||
util.error(f"unable to support freebsd on {self.arch}.")
|
||||
util.error(
|
||||
"upload your own busybox and tell pwncat where to find it w/ the busybox command to enable full functionality"
|
||||
)
|
||||
return
|
||||
|
||||
dd = self.which("dd")
|
||||
length = os.path.getsize("data/busybox-bsd")
|
||||
command = f"{dd} of=/tmp/busybox bs=1 count={length}"
|
||||
|
||||
with ProgressBar("uploading busydbox via dd") as pb:
|
||||
counter = pb(length)
|
||||
last_update = time.time()
|
||||
|
||||
def on_progress(count, blocksz):
|
||||
counter.items_completed += blocksz
|
||||
if (time.time() - last_update) > 0.1:
|
||||
pb.invalidate()
|
||||
last_update = time.time()
|
||||
|
||||
with open("data/busybox-bsd", "rb") as filp:
|
||||
util.copyfileobj(filp, pipe, on_progress)
|
||||
|
||||
counter.done = True
|
||||
counter.stopped = True
|
||||
pb.invalidate()
|
||||
time.sleep(0.1)
|
||||
|
||||
# We now have busybox!
|
||||
self.run("chmod +x /tmp/busybox")
|
||||
|
||||
util.success("we now have busybox on bsd!")
|
||||
|
||||
# Notify everyone else about busybox
|
||||
self.has_busybox = True
|
||||
self.busybox_path = "/tmp/busybox"
|
||||
self.busybox_provides = (
|
||||
open("data/busybox-default-provides", "r").read().strip().split("\n")
|
||||
)
|
||||
|
||||
# Start the busybox shell
|
||||
util.info("starting busybox shell")
|
||||
self.process("exec /tmp/busybox ash", delim=False)
|
||||
self.shell = "/tmp/busybox ash"
|
||||
self.flush_output()
|
||||
self.reset()
|
||||
self.state = State.RAW
|
||||
|
||||
def bootstrap_busybox(self, url):
|
||||
""" Utilize the architecture we grabbed from `uname -m` to grab a
|
||||
@ -572,7 +501,7 @@ class PtyHandler:
|
||||
complete_while_typing=False,
|
||||
)
|
||||
|
||||
def which(self, name: str, request=True, quote=False) -> str:
|
||||
def which(self, name: str, request=True, quote=False) -> Optional[str]:
|
||||
""" Call which on the remote host and return the path. The results are
|
||||
cached to decrease the number of remote calls. """
|
||||
path = None
|
||||
@ -580,7 +509,7 @@ class PtyHandler:
|
||||
if self.has_busybox:
|
||||
if name in self.busybox_provides:
|
||||
if quote:
|
||||
return f"{shlex.quote(self.busybox_path)} {name}"
|
||||
return f"{shlex.quote(str(self.busybox_path))} {name}"
|
||||
else:
|
||||
return f"{self.busybox_path} {name}"
|
||||
|
||||
@ -612,339 +541,81 @@ class PtyHandler:
|
||||
r""" Process a new byte of input from stdin. This is to catch "\r~C" and open
|
||||
a local prompt """
|
||||
|
||||
# Send the new data to the client
|
||||
self.client.send(data)
|
||||
|
||||
# Only process data following a new line
|
||||
if data == b"\r":
|
||||
if self.input == b"":
|
||||
# Enter commmand mode after C-d
|
||||
if data == b"\x04":
|
||||
# Clear line
|
||||
self.client.send(b"\x15")
|
||||
# Enter command mode
|
||||
self.state = State.COMMAND
|
||||
# C-k is the prefix character
|
||||
elif data == b"\x0b":
|
||||
self.input = data
|
||||
elif len(data) == 0:
|
||||
return
|
||||
else:
|
||||
self.input += data
|
||||
|
||||
if self.input == b"\r~C":
|
||||
# Erase the current line on the remote host ("~C")
|
||||
# This is 2 backspace characters
|
||||
self.client.send(b"\x08" * 2 + b"\r")
|
||||
# Start processing local commands
|
||||
self.enter_command()
|
||||
elif len(self.input) >= 3:
|
||||
# Our only escapes are 3 characters (include the newline)
|
||||
self.client.send(data)
|
||||
else:
|
||||
# "C-k c" to enter command mode
|
||||
if data == b"c":
|
||||
self.client.send(b"\x15")
|
||||
self.state = State.SINGLE
|
||||
elif data == b":":
|
||||
self.state = State.SINGLE
|
||||
# "C-k C-k" or "C-k C-d" sends the second byte
|
||||
elif data == b"\x0b" or data == b"\x04":
|
||||
self.client.send(data)
|
||||
self.input = b""
|
||||
|
||||
def recv(self) -> bytes:
|
||||
""" Recieve data from the client """
|
||||
return self.client.recv(4096)
|
||||
|
||||
def enter_raw(self, save: bool = True):
|
||||
""" Enter raw mode on the local terminal """
|
||||
@property
|
||||
def state(self) -> State:
|
||||
return self._state
|
||||
|
||||
# Give the user a nice terminal
|
||||
@state.setter
|
||||
def state(self, value: State):
|
||||
if value == self._state:
|
||||
return
|
||||
|
||||
if value == State.RAW:
|
||||
self.flush_output()
|
||||
self.client.send(b"\n")
|
||||
|
||||
old_term_state = util.enter_raw_mode()
|
||||
|
||||
self.state = State.RAW
|
||||
|
||||
# Tell the command parser to stop
|
||||
util.success("pwncat is ready 🐈")
|
||||
self.saved_term_state = util.enter_raw_mode()
|
||||
self.command_parser.running = False
|
||||
|
||||
# Save the state if requested
|
||||
if save:
|
||||
self.saved_term_state = old_term_state
|
||||
|
||||
def enter_command(self):
|
||||
""" Enter commmand mode. This sets normal mode and uses prompt toolkit
|
||||
process commands from the user for the local machine """
|
||||
|
||||
self._state = value
|
||||
return
|
||||
if value == State.COMMAND:
|
||||
# Go back to normal mode
|
||||
self.restore()
|
||||
self.state = State.COMMAND
|
||||
|
||||
self.restore_local_term()
|
||||
self._state = State.COMMAND
|
||||
# Hopefully this fixes weird cursor position issues
|
||||
util.success("local terminal restored")
|
||||
# Setting the state to local command mode does not return until
|
||||
# command processing is complete.
|
||||
self.command_parser.run()
|
||||
return
|
||||
if value == State.SINGLE:
|
||||
# Go back to normal mode
|
||||
self.restore_local_term()
|
||||
self._state = State.SINGLE
|
||||
# Hopefully this fixes weird cursor position issues
|
||||
sys.stdout.write("\n")
|
||||
# Setting the state to local command mode does not return until
|
||||
# command processing is complete.
|
||||
self.command_parser.run_single()
|
||||
|
||||
self.command_parser.run()
|
||||
|
||||
# Go back to raw mode
|
||||
self.flush_output()
|
||||
self.client.send(b"\n")
|
||||
self.saved_term_state = util.enter_raw_mode()
|
||||
self._state = State.RAW
|
||||
return
|
||||
|
||||
# Process commands
|
||||
while self.state is State.COMMAND:
|
||||
try:
|
||||
try:
|
||||
line = self.prompt.prompt()
|
||||
except (EOFError, OSError):
|
||||
# The user pressed ctrl-d, go back
|
||||
self.enter_raw()
|
||||
continue
|
||||
|
||||
if len(line) > 0:
|
||||
if line[0] == "!":
|
||||
# Allow running shell commands
|
||||
subprocess.run(line[1:], shell=True)
|
||||
continue
|
||||
elif line[0] == "@":
|
||||
result = self.run(line[1:])
|
||||
sys.stdout.buffer.write(result)
|
||||
continue
|
||||
elif line[0] == "-":
|
||||
self.run(line[1:], wait=False)
|
||||
continue
|
||||
|
||||
try:
|
||||
argv = shlex.split(line)
|
||||
except ValueError as e:
|
||||
util.error(e.args[0])
|
||||
continue
|
||||
|
||||
# Empty command
|
||||
if len(argv) == 0:
|
||||
continue
|
||||
|
||||
try:
|
||||
method = getattr(self, f"do_{argv[0]}")
|
||||
except AttributeError:
|
||||
util.warn(f"{argv[0]}: command does not exist")
|
||||
continue
|
||||
|
||||
# Call the method
|
||||
method(argv[1:])
|
||||
except KeyboardInterrupt as exc:
|
||||
traceback.print_exc()
|
||||
continue
|
||||
|
||||
@with_parser
|
||||
def do_busybox(self, args):
|
||||
""" Attempt to upload a busybox binary which we can use as a consistent
|
||||
interface to local functionality """
|
||||
|
||||
if args.action == "list":
|
||||
if not self.has_busybox:
|
||||
util.error("busybox hasn't been installed yet (hint: run 'busybox'")
|
||||
return
|
||||
util.info("binaries which the remote busybox provides:")
|
||||
for name in self.busybox_provides:
|
||||
print(f" * {name}")
|
||||
elif args.action == "status":
|
||||
if not self.has_busybox:
|
||||
util.error("busybox hasn't been installed yet")
|
||||
return
|
||||
util.info(
|
||||
f"busybox is installed to: {Fore.BLUE}{self.busybox_path}{Fore.RESET}"
|
||||
)
|
||||
util.info(
|
||||
f"busybox provides {Fore.GREEN}{len(self.busybox_provides)}{Fore.RESET} applets"
|
||||
)
|
||||
elif args.action == "install":
|
||||
self.bootstrap_busybox(args.url, args.method)
|
||||
|
||||
def with_progress(
|
||||
self, title: str, target: Callable[[Callable], None], length: int = None
|
||||
):
|
||||
""" A shortcut to displaying a progress bar for various things. It will
|
||||
start a prompt_toolkit progress bar with the given title and a counter
|
||||
with the given length. Then, it will call `target` with an `on_progress`
|
||||
parameter. This parameter should be called for all progress updates. See
|
||||
the `do_upload` and `do_download` for examples w/ copyfileobj """
|
||||
|
||||
with ProgressBar(title) as pb:
|
||||
counter = pb(range(length))
|
||||
last_update = time.time()
|
||||
|
||||
def on_progress(copied, blocksz):
|
||||
""" Update the progress bar """
|
||||
if blocksz == -1:
|
||||
counter.stopped = True
|
||||
counter.done = True
|
||||
pb.invalidate()
|
||||
return
|
||||
|
||||
counter.items_completed += blocksz
|
||||
if counter.items_completed >= counter.total:
|
||||
counter.done = True
|
||||
counter.stopped = True
|
||||
if (time.time() - last_update) > 0.1:
|
||||
pb.invalidate()
|
||||
|
||||
target(on_progress)
|
||||
|
||||
# https://github.com/prompt-toolkit/python-prompt-toolkit/issues/964
|
||||
time.sleep(0.1)
|
||||
|
||||
@with_parser
|
||||
def do_download(self, args):
|
||||
""" Download a file from the remote host """
|
||||
|
||||
try:
|
||||
# Locate an appropriate downloader class
|
||||
DownloaderClass = downloader.find(self, args.method)
|
||||
except downloader.DownloadError as exc:
|
||||
util.error(f"{exc}")
|
||||
return
|
||||
|
||||
# Grab the arguments
|
||||
path = args.path
|
||||
basename = os.path.basename(args.path)
|
||||
outfile = args.output.format(basename=basename)
|
||||
|
||||
download = DownloaderClass(self, remote_path=path, local_path=outfile)
|
||||
|
||||
# Get the remote file size
|
||||
size = self.run(f'stat -c "%s" {shlex.quote(path)} 2>/dev/null || echo "none"')
|
||||
if b"none" in size:
|
||||
util.error(f"{path}: no such file or directory")
|
||||
return
|
||||
size = int(size)
|
||||
|
||||
with ProgressBar(
|
||||
[("#888888", "downloading with "), ("fg:ansiyellow", f"{download.NAME}")]
|
||||
) as pb:
|
||||
counter = pb(range(size))
|
||||
last_update = time.time()
|
||||
|
||||
def on_progress(copied, blocksz):
|
||||
""" Update the progress bar """
|
||||
if blocksz == -1:
|
||||
counter.stopped = True
|
||||
counter.done = True
|
||||
pb.invalidate()
|
||||
return
|
||||
|
||||
counter.items_completed += blocksz
|
||||
if counter.items_completed >= counter.total:
|
||||
counter.done = True
|
||||
counter.stopped = True
|
||||
if (time.time() - last_update) > 0.1:
|
||||
pb.invalidate()
|
||||
|
||||
try:
|
||||
download.serve(on_progress)
|
||||
if download.command():
|
||||
while not counter.done:
|
||||
time.sleep(0.2)
|
||||
finally:
|
||||
download.shutdown()
|
||||
|
||||
# https://github.com/prompt-toolkit/python-prompt-toolkit/issues/964
|
||||
time.sleep(0.1)
|
||||
|
||||
@with_parser
|
||||
def do_upload(self, args):
|
||||
""" Upload a file to the remote host """
|
||||
|
||||
if not os.path.isfile(args.path):
|
||||
util.error(f"{args.path}: no such file or directory")
|
||||
return
|
||||
|
||||
try:
|
||||
# Locate an appropriate downloader class
|
||||
UploaderClass = uploader.find(self, args.method)
|
||||
except uploader.UploadError as exc:
|
||||
util.error(f"{exc}")
|
||||
return
|
||||
|
||||
path = args.path
|
||||
basename = os.path.basename(args.path)
|
||||
name = basename
|
||||
outfile = args.output.format(basename=basename)
|
||||
|
||||
upload = UploaderClass(self, remote_path=outfile, local_path=path)
|
||||
|
||||
with ProgressBar(
|
||||
[("#888888", "uploading via "), ("fg:ansiyellow", f"{upload.NAME}")]
|
||||
) as pb:
|
||||
|
||||
counter = pb(range(os.path.getsize(path)))
|
||||
last_update = time.time()
|
||||
|
||||
def on_progress(copied, blocksz):
|
||||
""" Update the progress bar """
|
||||
counter.items_completed += blocksz
|
||||
if counter.items_completed >= counter.total:
|
||||
counter.done = True
|
||||
counter.stopped = True
|
||||
if (time.time() - last_update) > 0.1:
|
||||
pb.invalidate()
|
||||
|
||||
upload.serve(on_progress)
|
||||
upload.command()
|
||||
|
||||
try:
|
||||
while not counter.done:
|
||||
time.sleep(0.1)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
upload.shutdown()
|
||||
|
||||
# https://github.com/prompt-toolkit/python-prompt-toolkit/issues/964
|
||||
time.sleep(0.1)
|
||||
|
||||
def do_sync(self, argv):
|
||||
""" Synchronize the remote PTY with the local terminal settings """
|
||||
|
||||
TERM = os.environ.get("TERM", "xterm")
|
||||
columns, rows = os.get_terminal_size(0)
|
||||
|
||||
self.run(f"stty rows {rows}; stty columns {columns}; export TERM='{TERM}'")
|
||||
|
||||
def do_set(self, argv):
|
||||
""" Set or view the currently assigned variables """
|
||||
|
||||
if len(argv) == 0:
|
||||
util.info("local variables:")
|
||||
for k, v in self.vars.items():
|
||||
print(f" {k} = {shlex.quote(v)}")
|
||||
|
||||
util.info("user passwords:")
|
||||
for user, data in self.users.items():
|
||||
if data["password"] is not None:
|
||||
print(
|
||||
f" {Fore.GREEN}{user}{Fore.RESET} -> {Fore.CYAN}{shlex.quote(data['password'])}{Fore.RESET}"
|
||||
)
|
||||
return
|
||||
|
||||
parser = argparse.ArgumentParser(prog="set")
|
||||
parser.add_argument(
|
||||
"--password",
|
||||
"-p",
|
||||
action="store_true",
|
||||
help="set the password for the given user",
|
||||
)
|
||||
parser.add_argument("variable", help="the variable name or user")
|
||||
parser.add_argument("value", help="the new variable/user password value")
|
||||
|
||||
try:
|
||||
args = parser.parse_args(argv)
|
||||
except SystemExit:
|
||||
# The arguments were parsed incorrectly, return.
|
||||
return
|
||||
|
||||
if args.password is not None and args.variable not in self.users:
|
||||
util.error(f"{args.variable}: no such user")
|
||||
elif args.password is not None:
|
||||
self.users[args.variable]["password"] = args.value
|
||||
else:
|
||||
self.vars[args.variable] = args.value
|
||||
|
||||
def do_help(self, argv):
|
||||
""" View help for local commands """
|
||||
|
||||
if len(argv) == 0:
|
||||
commands = [x for x in dir(self) if x.startswith("do_")]
|
||||
else:
|
||||
commands = [x for x in dir(self) if x.startswith("do_") and x[3:] in argv]
|
||||
|
||||
for c in commands:
|
||||
help_msg = getattr(self, c).__doc__
|
||||
print(f"{c[3:]:15s}{help_msg}")
|
||||
|
||||
def do_reset(self, argv):
|
||||
""" Reset the remote terminal (calls sync, reset, and sets PS1) """
|
||||
self.reset()
|
||||
self.do_sync([])
|
||||
def restore_local_term(self):
|
||||
""" Save the local terminal state """
|
||||
util.restore_terminal(self.saved_term_state)
|
||||
|
||||
def run(self, cmd, wait=True, input: bytes = b"") -> bytes:
|
||||
""" Run a command in the context of the remote host and return the
|
||||
@ -1040,27 +711,27 @@ class PtyHandler:
|
||||
edelim = util.random_string(10) # "_PWNCAT_ENDDELIM_"
|
||||
|
||||
# List of ";" separated commands that will be run
|
||||
command = []
|
||||
commands: List[str] = []
|
||||
# Clear the prompt, or it will get displayed in our output due to the
|
||||
# background task
|
||||
command.append(" export PS1=")
|
||||
commands.append(" export PS1=")
|
||||
# Needed to disable job control messages in bash
|
||||
command.append("set +m")
|
||||
commands.append("set +m")
|
||||
# This is gross, but it allows us to recieve stderr and stdout, while
|
||||
# ignoring the job control start message.
|
||||
if "w" not in mode and not no_job:
|
||||
command.append(
|
||||
commands.append(
|
||||
f"{{ echo; echo {sdelim}; {cmd} && echo {edelim} || echo {edelim} & }} 2>/dev/null"
|
||||
)
|
||||
else:
|
||||
# This is dangerous. We are in raw mode, and if the process never
|
||||
# ends and doesn't provide a way to exit, then we are stuck.
|
||||
command.append(f"echo; echo {sdelim}; {cmd}; echo {edelim}")
|
||||
commands.append(f"echo; echo {sdelim}; {cmd}; echo {edelim}")
|
||||
# Re-enable normal job control in bash
|
||||
command.append("set -m")
|
||||
commands.append("set -m")
|
||||
|
||||
# Join them all into one command
|
||||
command = ";".join(command).encode("utf-8")
|
||||
command = ";".join(commands).encode("utf-8")
|
||||
|
||||
# Enter raw mode w/ no echo on the remote terminal
|
||||
# DANGER
|
||||
@ -1069,7 +740,7 @@ class PtyHandler:
|
||||
|
||||
self.client.sendall(command + b"\n")
|
||||
|
||||
while not self.recvuntil("\n").startswith(sdelim.encode("utf-8")):
|
||||
while not self.recvuntil(b"\n").startswith(sdelim.encode("utf-8")):
|
||||
continue
|
||||
|
||||
# Send the data if requested
|
||||
@ -1380,95 +1051,6 @@ class PtyHandler:
|
||||
|
||||
return result
|
||||
|
||||
def restore(self):
|
||||
""" Restore the terminal state """
|
||||
util.restore_terminal(self.saved_term_state)
|
||||
self.state = State.NORMAL
|
||||
|
||||
def setup_command_parsers(self):
|
||||
""" Setup the argparsers for the different local commands """
|
||||
|
||||
self.upload_parser = argparse.ArgumentParser(prog="upload")
|
||||
self.upload_parser.add_argument(
|
||||
"--method",
|
||||
"-m",
|
||||
choices=["", *uploader.get_names()],
|
||||
default=None,
|
||||
help="set the download method (default: auto)",
|
||||
)
|
||||
self.upload_parser.add_argument(
|
||||
"--output",
|
||||
"-o",
|
||||
default="./{basename}",
|
||||
help="path to the output file (default: basename of input)",
|
||||
)
|
||||
self.upload_parser.add_argument("path", help="path to the file to upload")
|
||||
|
||||
self.download_parser = argparse.ArgumentParser(prog="download")
|
||||
self.download_parser.add_argument(
|
||||
"--method",
|
||||
"-m",
|
||||
choices=downloader.get_names(),
|
||||
default=None,
|
||||
help="set the download method (default: auto)",
|
||||
)
|
||||
self.download_parser.add_argument(
|
||||
"--output",
|
||||
"-o",
|
||||
default="./{basename}",
|
||||
help="path to the output file (default: basename of input)",
|
||||
)
|
||||
self.download_parser.add_argument("path", help="path to the file to download")
|
||||
|
||||
self.back_parser = argparse.ArgumentParser(prog="back")
|
||||
|
||||
self.busybox_parser = argparse.ArgumentParser(prog="busybox")
|
||||
self.busybox_parser.add_argument(
|
||||
"--method",
|
||||
"-m",
|
||||
choices=uploader.get_names(),
|
||||
default="",
|
||||
help="set the upload method (default: auto)",
|
||||
)
|
||||
self.busybox_parser.add_argument(
|
||||
"--url",
|
||||
"-u",
|
||||
default=(
|
||||
"https://busybox.net/downloads/binaries/"
|
||||
"1.31.0-defconfig-multiarch-musl/"
|
||||
),
|
||||
help=(
|
||||
"url to download multiarch busybox binaries"
|
||||
"(default: 1.31.0-defconfig-multiarch-musl)"
|
||||
),
|
||||
)
|
||||
group = self.busybox_parser.add_mutually_exclusive_group(required=True)
|
||||
group.add_argument(
|
||||
"--install",
|
||||
"-i",
|
||||
action="store_const",
|
||||
dest="action",
|
||||
const="install",
|
||||
default="install",
|
||||
help="install busybox support for pwncat",
|
||||
)
|
||||
group.add_argument(
|
||||
"--list",
|
||||
"-l",
|
||||
action="store_const",
|
||||
dest="action",
|
||||
const="list",
|
||||
help="list all provided applets from the remote busybox",
|
||||
)
|
||||
group.add_argument(
|
||||
"--status",
|
||||
"-s",
|
||||
action="store_const",
|
||||
dest="action",
|
||||
const="status",
|
||||
help="show current pwncat busybox status",
|
||||
)
|
||||
|
||||
def whoami(self):
|
||||
result = self.run("whoami")
|
||||
return result.strip().decode("utf-8")
|
||||
|
@ -6,6 +6,7 @@ from prompt_toolkit.shortcuts import ProgressBar
|
||||
from functools import partial
|
||||
from colorama import Fore, Style
|
||||
from io import TextIOWrapper
|
||||
from enum import Enum, auto
|
||||
import netifaces
|
||||
import socket
|
||||
import string
|
||||
@ -24,6 +25,15 @@ CTRL_C = b"\x03"
|
||||
ALPHANUMERIC = string.ascii_letters + string.digits
|
||||
|
||||
|
||||
class State(Enum):
|
||||
""" The current PtyHandler state """
|
||||
|
||||
NORMAL = auto()
|
||||
RAW = auto()
|
||||
COMMAND = auto()
|
||||
SINGLE = auto()
|
||||
|
||||
|
||||
def human_readable_size(size, decimal_places=2):
|
||||
for unit in ["B", "KiB", "MiB", "GiB", "TiB"]:
|
||||
if size < 1024.0:
|
||||
@ -131,9 +141,6 @@ def enter_raw_mode():
|
||||
returns: the old state of the terminal
|
||||
"""
|
||||
|
||||
info("setting terminal to raw mode and disabling echo", overlay=True)
|
||||
success("pwncat is ready 🐈\n", overlay=True)
|
||||
|
||||
# Ensure we don't have any weird buffering issues
|
||||
sys.stdout.flush()
|
||||
|
||||
@ -176,7 +183,6 @@ def restore_terminal(state):
|
||||
# tty.setcbreak(sys.stdin)
|
||||
fcntl.fcntl(sys.stdin, fcntl.F_SETFL, state[1])
|
||||
sys.stdout.write("\n")
|
||||
info("local terminal restored")
|
||||
|
||||
|
||||
def get_ip_addr() -> str:
|
||||
|
Loading…
Reference in New Issue
Block a user