From 81fb010b1acd90293e41ea74b81bc9da4e708b88 Mon Sep 17 00:00:00 2001 From: Caleb Stewart Date: Wed, 13 May 2020 23:38:07 -0400 Subject: [PATCH 1/3] Added command framework with automatic argparse, syntax highlighting and tab completion including verbose help. Still in the progress of converting old commands before merging to master --- pwncat/commands/__init__.py | 330 ++++++++++++++++++++++++++++++++++++ pwncat/commands/back.py | 12 ++ pwncat/commands/base.py | 77 +++++++++ pwncat/commands/privesc.py | 204 ++++++++++++++++++++++ pwncat/commands/test.py | 26 +++ pwncat/privesc/__init__.py | 2 +- pwncat/pty.py | 159 ++--------------- 7 files changed, 660 insertions(+), 150 deletions(-) create mode 100644 pwncat/commands/__init__.py create mode 100644 pwncat/commands/back.py create mode 100644 pwncat/commands/base.py create mode 100644 pwncat/commands/privesc.py create mode 100644 pwncat/commands/test.py diff --git a/pwncat/commands/__init__.py b/pwncat/commands/__init__.py new file mode 100644 index 0000000..163fbaf --- /dev/null +++ b/pwncat/commands/__init__.py @@ -0,0 +1,330 @@ +#!/usr/bin/env python3 +from prompt_toolkit import PromptSession, ANSI +from prompt_toolkit.shortcuts import ProgressBar +from prompt_toolkit.completion import ( + Completer, + PathCompleter, + Completion, + CompleteEvent, + NestedCompleter, + WordCompleter, + merge_completers, +) +from pygments.lexer import RegexLexer, bygroups, include +from pygments.token import * +from pygments.style import Style +from prompt_toolkit.styles.pygments import style_from_pygments_cls +from prompt_toolkit.lexers import PygmentsLexer +from prompt_toolkit.document import Document +from pygments.styles import get_style_by_name +from prompt_toolkit.auto_suggest import AutoSuggestFromHistory +from typing import Dict, Any, List, Iterable +from enum import Enum, auto +import argparse +import pkgutil +import shlex +import os +import re + +from pprint import pprint + +from pwncat.commands.base import CommandDefinition, Complete +from pwncat import util + + +class CommandParser: + """ Handles dynamically loading command classes, parsing input, and + dispatching commands. """ + + def __init__(self, pty: "pwncat.pty.PtyHandler"): + """ We need to dynamically load commands from pwncat.commands """ + + self.commands: List["CommandDefinition"] = [] + + for loader, module_name, is_pkg in pkgutil.walk_packages(__path__): + if module_name == "base": + continue + self.commands.append( + loader.find_module(module_name).load_module(module_name).Command(pty) + ) + + completer = CommandCompleter(pty, self.commands) + lexer = PygmentsLexer(CommandLexer.build(self.commands)) + style = style_from_pygments_cls(get_style_by_name("monokai")) + auto_suggest = AutoSuggestFromHistory() + + self.prompt = PromptSession( + [ + ("fg:ansiyellow bold", "(local) "), + ("fg:ansimagenta bold", "pwncat"), + ("", "$ "), + ], + completer=completer, + lexer=lexer, + style=style, + auto_suggest=auto_suggest, + complete_while_typing=False, + ) + + self.pty = pty + + def run(self): + + self.running = True + + while self.running: + try: + try: + line = self.prompt.prompt().strip() + except (EOFError, OSError): + self.pty.enter_raw() + self.running = False + continue + + if line == "": + continue + + self.dispatch_line(line) + except KeyboardInterrupt: + continue + + def dispatch_line(self, line: str): + """ Parse the given line of command input and dispatch a command """ + + try: + # Spit the line with shell rules + argv = shlex.split(line) + except ValueError as e: + util.error(e.args[0]) + return + + # Search for a matching command + for command in self.commands: + if command.PROG == argv[0]: + break + else: + util.error(f"{argv[0]}: unknown command") + return + + try: + # Parse the arguments + args = command.parser.parse_args(argv[1:]) + + # Run the command + command.run(args) + except SystemExit: + # The arguments were icncorrect + return + + +class CommandLexer(RegexLexer): + + tokens = {} + + @classmethod + def build(cls, commands: List["CommandDefinition"]) -> "CommandLexer": + """ Build the RegexLexer token list from the command definitions """ + + root = [] + for command in commands: + root.append(("^" + re.escape(command.PROG), Name.Function, command.PROG)) + mode = [] + for args, descr in command.ARGS.items(): + for arg in args.split(","): + if not arg.startswith("-"): + continue + if descr[0] != Complete.NONE: + # Enter param state + mode.append((r"\s+" + re.escape(arg), descr[1], "param")) + else: + # Don't enter param state + mode.append((r"\s+" + re.escape(arg), descr[1])) + mode.append((r"\"", String, "string")) + mode.append((r".", Text)) + cls.tokens[command.PROG] = mode + + root.append((r".", Text)) + cls.tokens["root"] = root + cls.tokens["param"] = [ + (r"\"", String, "string"), + (r"\s", Text, "#pop"), + (r"[^\s]", Text), + ] + cls.tokens["string"] = [ + (r"[^\"\\]+", String), + (r"\\.", String.Escape), + ('"', String, "#pop"), + ] + + pprint(cls.tokens) + + return cls + + +class RemotePathCompleter(Completer): + """ Complete remote file names/paths """ + + def __init__(self, pty: "PtyHandler"): + self.pty = pty + + def get_completions(self, document: Document, complete_event: CompleteEvent): + + before = document.text_before_cursor.split()[-1] + path, partial_name = os.path.split(before) + + if path == "": + path = "." + + pipe = self.pty.subprocess(f"ls -1 -a {shlex.quote(path)}", "r") + + for name in pipe: + name = name.decode("utf-8").strip() + if name.startswith(partial_name): + yield Completion( + name, + start_position=-len(partial_name), + display=[("#ff0000", "(remote)"), ("", f" {name}")], + ) + + +class LocalPathCompleter(Completer): + """ Complete local file names/paths """ + + def __init__(self, pty: "PtyHandler"): + self.pty = pty + + def get_completions(self, document: Document, complete_event: CompleteEvent): + + before = document.text_before_cursor.split()[-1] + path, partial_name = os.path.split(before) + + if path == "": + path = "." + + # Ensure the directory exists + if not os.path.isdir(path): + return + + for name in os.listdir(path): + if name.startswith(partial_name): + yield Completion( + name, + start_position=-len(partial_name), + display=[("fg:ansiyellow", "(local)"), ("", f" {name}")], + ) + + +class CommandCompleter(Completer): + """ Complete commands from a given list of commands """ + + def __init__( + self, pty: "pwncat.pty.PtyHandler", commands: List["CommandDefinition"] + ): + """ Construct a new command completer """ + + self.layers = {} + local_file_completer = LocalPathCompleter(pty) + remote_file_completer = RemotePathCompleter(pty) + + for command in commands: + self.layers[command.PROG] = [None, [], {}] + option_names = [] + positional_completers = [] + for name_list, descr in command.ARGS.items(): + name_list = name_list.split(",") + if descr[0] == Complete.CHOICES: + completer = WordCompleter(descr[3]["choices"]) + elif descr[0] == Complete.LOCAL_FILE: + completer = local_file_completer + elif descr[0] == Complete.REMOTE_FILE: + completer = remote_file_completer + elif descr[0] == Complete.NONE: + completer = None + if len(name_list) == 1 and not name_list[0].startswith("-"): + self.layers[command.PROG][1].append(completer) + else: + for name in name_list: + self.layers[command.PROG][2][name] = completer + option_names.append(name) + self.layers[command.PROG][0] = WordCompleter(option_names) + + self.completer = WordCompleter(list(self.layers)) + + def get_completions( + self, document: Document, complete_event: CompleteEvent + ) -> Iterable[Completion]: + """ Get a list of completions for the given document """ + + text = document.text_before_cursor.lstrip() + try: + args = shlex.split(text) + except ValueError: + try: + args = shlex.split(text + '"') + except ValueError: + args = shlex.split(text + "'") + + # We haven't finished typing the command. Use our word completer for + # commands + if text == "" or (len(args) == 1 and not text.endswith(" ")): + yield from self.completer.get_completions(document, complete_event) + return + + # Not in a known command, can't autocomplete + if args[0] not in self.layers: + return + + command = self.layers[args[0]] + args = args[1:] + next_completer = command[0] + this_completer = command[0] + positional = 0 + # state = "options", completing options next + # state = "arguments", completing arguments to options next + state = "options" + + for arg in args: + if state == "options": + # Flag options + if arg.startswith("-"): + # Exact match, with a sub-completer + if arg in command[2] and command[2][arg] is not None: + # Completer for next argument + next_completer = command[2][arg] + state = "arguments" + # Exact match, with no arguments + elif arg in command[2]: + # Command has no argument, next completer is options + # completer + next_completer = command[0] + state = "options" + this_completer = command[0] + # Non-exact match + else: + next_completer = command[0] + this_completer = command[0] + state = "options" + # Appears to be a positional argument, grab next positional + # completer and increment positional count + else: + if positional < len(command[1]): + this_completer = command[1][positional] + next_completer = command[0] + state = "options" + positional += 1 + else: + this_completer = command[0] + next_completer = command[0] + state = "options" + else: + # Completing an argument to a option/switch. We can't verify + # it's legitimacy, so we assume it's right, and reset to a + # default state. + state = "options" + this_completer = next_completer + next_completer = command[0] + + if text.endswith(" "): + yield from next_completer.get_completions(document, complete_event) + else: + yield from this_completer.get_completions(document, complete_event) diff --git a/pwncat/commands/back.py b/pwncat/commands/back.py new file mode 100644 index 0000000..211eed2 --- /dev/null +++ b/pwncat/commands/back.py @@ -0,0 +1,12 @@ +#!/usr/bin/env python3 +from pwncat.commands.base import CommandDefinition, Complete, parameter + + +class Command(CommandDefinition): + """ Return to the remote terminal """ + + PROG = "back" + ARGS = {} + + def run(self, args): + self.pty.enter_raw() diff --git a/pwncat/commands/base.py b/pwncat/commands/base.py new file mode 100644 index 0000000..d270e10 --- /dev/null +++ b/pwncat/commands/base.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3 +from typing import Dict, Any +from pygments.token import * +from enum import Enum, auto +from functools import partial +import argparse + + +class Complete(Enum): + # Complete from the choices array in kwargs + CHOICES = auto() + # Complete from a local file + LOCAL_FILE = auto() + # Complete from a remote file + REMOTE_FILE = auto() + # This argument has no parameter + NONE = auto() + + +def parameter(complete, token=Name.Label, *args, **kwargs): + """ Build a parameter tuple from argparse arguments """ + return (complete, token, args, kwargs) + + +class CommandDefinition: + """ Default help/description goes here """ + + PROG = "unimplemented" + ARGS = {} + DEFAULTS = {} + + # An example definition of arguments + # PROG = "command" + # ARGS = { + # "--all,-a": parameter( + # Complete.NONE, action="store_true", help="A switch/option" + # ), + # "--file,-f": parameter(Complete.LOCAL_FILE, help="A local file"), + # "--rfile": parameter(Complete.REMOTE_FILE, help="A remote file"), + # "positional": parameter( + # Complete.CHOICES, choices=["a", "b", "c"], help="Choose one!" + # ), + # } + + def __init__(self, pty: "pwncat.pty.PtyHandler"): + """ Initialize a new command instance. Parse the local arguments array + into an argparse object. """ + + self.pty = pty + + # Create the parser object + self.parser = argparse.ArgumentParser(prog=self.PROG, description=self.__doc__) + + self.build_parser(self.parser, self.ARGS) + + def run(self, args): + """ Perform whatever your command is. `args` has already been parsed with + your argparse definitions. """ + raise NotImplementedError + + def build_parser(self, parser: argparse.ArgumentParser, args: Dict[str, Any]): + """ Fill the given parser with arguments based on the dict """ + + for arg, descr in args.items(): + names = arg.split(",") + if "choices" in descr[3] and callable(descr[3]["choices"]): + print("we're doing it", descr[3]["choices"]) + method = descr[3]["choices"] + + class wrapper: + def __iter__(wself): + yield from method(self) + + descr[3]["choices"] = wrapper() + parser.add_argument(*names, *descr[2], **descr[3]) + + parser.set_defaults(**self.DEFAULTS) diff --git a/pwncat/commands/privesc.py b/pwncat/commands/privesc.py new file mode 100644 index 0000000..d9a25bb --- /dev/null +++ b/pwncat/commands/privesc.py @@ -0,0 +1,204 @@ +#!/usr/bin/env python3 +from typing import List, Callable +from pwncat.commands.base import CommandDefinition, Complete, parameter +from pwncat import util, privesc +from colorama import Fore +import argparse +import shutil +import sys + + +class StoreConstOnce(argparse.Action): + """ Only allow the user to store a value in the destination once. This prevents + users from selection multiple actions in the privesc parser. """ + + def __call__(self, parser, namespace, values, option_string=None): + if hasattr(self, "__" + self.dest + "_seen"): + raise argparse.ArgumentError(self, "only one action may be specified") + setattr(self, "__" + self.dest + "_seen", True) + setattr(namespace, self.dest, self.const) + + +def StoreForAction(action: List[str]) -> Callable: + """ Generates a custom argparse Action subclass which verifies that the current + selected "action" option is one of the provided actions in this function. If + not, an error is raised. """ + + class StoreFor(argparse.Action): + """ Store the value if the currently selected action matches the list of + actions passed to this function. """ + + def __call__(self, parser, namespace, values, option_string=None): + if getattr(namespace, "action", None) not in action: + raise argparse.ArgumentError( + self, f"{option_string}: only valid for {action}", + ) + + setattr(namespace, self.dest, values) + + return StoreFor + + +class Command(CommandDefinition): + """ Attempt various privilege escalation methods. This command will attempt + search for privilege escalation across all known modules. Privilege escalation + routes can grant file read, file write or shell capabilities. The "escalate" + mode will attempt to abuse any of these to gain a shell. + + Further, escalation and file read/write actions will attempt to escalate multiple + times to reach the target user if possible, attempting all known escalation paths + until one arrives at the target user. """ + + def get_user_choices(self): + """ Get a list of all users on the remote machine. This is used for + parameter checking and tab completion of the "users" parameter below. """ + return list(self.pty.users) + + PROG = "privesc" + ARGS = { + "--list,-l": parameter( + Complete.NONE, + action=StoreConstOnce, + nargs=0, + const="list", + dest="action", + help="Enumerate and list available privesc techniques", + ), + "--all,-a": parameter( + Complete.NONE, + action="store_const", + dest="user", + const=None, + help="list escalations for all users", + ), + "--user,-u": parameter( + Complete.CHOICES, + default="root", + choices=get_user_choices, + metavar="USER", + help="the user to gain privileges as", + ), + "--max-depth,-m": parameter( + Complete.NONE, + default=None, + type=int, + help="Maximum depth for the privesc search (default: no maximum)", + ), + "--read,-r": parameter( + Complete.NONE, + action=StoreConstOnce, + nargs=0, + const="read", + dest="action", + help="Attempt to read a remote file as the specified user", + ), + "--write,-w": parameter( + Complete.NONE, + action=StoreConstOnce, + nargs=0, + const="write", + dest="action", + help="Attempt to write a remote file as the specified user", + ), + "--path,-p": parameter( + Complete.REMOTE_FILE, + action=StoreForAction(["write", "read"]), + help="Remote path for read or write actions", + ), + "--escalate,-e": parameter( + Complete.NONE, + action=StoreConstOnce, + nargs=0, + const="escalate", + dest="action", + help="Attempt to escalate to gain a full shell as the target user", + ), + "--data,-d": parameter( + Complete.LOCAL_FILE, + action=StoreForAction(["write"]), + default=None, + help="The local file to write to the remote file", + ), + } + DEFAULTS = {"action": "list"} + + def run(self, args): + + if args.action == "list": + techniques = self.pty.privesc.search(args.user) + if len(techniques) == 0: + util.warn("no techniques found") + else: + for tech in techniques: + util.info(f" - {tech}") + elif args.action == "read": + if not args.path: + self.parser.error("missing required argument: --path") + try: + read_pipe, chain = self.pty.privesc.read_file( + args.path, args.user, args.max_depth + ) + util.success("file successfully opened!") + + # Read the data from the pipe + shutil.copyfileobj(read_pipe, sys.stdout.buffer) + read_pipe.close() + + # Unwrap in case we had to privesc to get here + self.pty.privesc.unwrap(chain) + + except privesc.PrivescError as exc: + util.error(f"read file failed: {exc}") + elif args.action == "write": + # Make sure the correct arguments are present + if not args.path: + self.parser.error("missing required argument: --path") + if not args.data: + self.parser.error("missing required argument: --data") + + # Read in the data file + with open(args.data, "rb") as f: + data = f.read() + + try: + # Attempt to write the data to the remote file + chain = self.pty.privesc.write_file( + args.path, data, target_user=args.user, depth=args.max_depth, + ) + self.pty.privesc.unwrap(chain) + util.success("file written successfully!") + except privesc.PrivescError as exc: + util.error(f"file write failed: {exc}") + elif args.action == "escalate": + try: + chain = self.pty.privesc.escalate(args.user, args.max_depth) + + ident = self.pty.id + backdoor = False + if ident["euid"]["id"] == 0 and ident["uid"]["id"] != 0: + util.progress( + "EUID != UID. installing backdoor to complete privesc" + ) + try: + self.pty.privesc.add_backdoor() + backdoor = True + except privesc.PrivescError as exc: + util.warn(f"backdoor installation failed: {exc}") + + util.success("privilege escalation succeeded using:") + for i, (technique, _) in enumerate(chain): + arrow = f"{Fore.YELLOW}\u2ba1{Fore.RESET} " + print(f"{(i+1)*' '}{arrow}{technique}") + + if backdoor: + print( + ( + f"{(len(chain)+1)*' '}{arrow}" + f"{Fore.YELLOW}pwncat{Fore.RESET} backdoor" + ) + ) + + self.pty.reset() + self.pty.do_back([]) + except privesc.PrivescError as exc: + util.error(f"escalation failed: {exc}") diff --git a/pwncat/commands/test.py b/pwncat/commands/test.py new file mode 100644 index 0000000..8ee1e41 --- /dev/null +++ b/pwncat/commands/test.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python3 +from pygments.token import * + +from pwncat.commands.base import CommandDefinition, Complete, parameter + + +class Command(CommandDefinition): + """ Command Description """ + + PROG = "test" + ARGS = { + "--remote,-r": parameter( + Complete.REMOTE_FILE, Name.LABEL, help="Argument Help" + ), + "--local,-l": parameter(Complete.LOCAL_FILE, Name.LABEL, help="Argument Help"), + "--choice,-c": parameter( + Complete.CHOICES, + Name.LABEL, + choices=["one", "two", "three"], + help="Select one!", + ), + } + + def run(self, args): + + print(args.arg) diff --git a/pwncat/privesc/__init__.py b/pwncat/privesc/__init__.py index 9a811e6..33f1b86 100644 --- a/pwncat/privesc/__init__.py +++ b/pwncat/privesc/__init__.py @@ -22,7 +22,7 @@ from pwncat import util # privesc_methods = [SetuidMethod, SuMethod] # privesc_methods = [SuMethod, SudoMethod, SetuidMethod, DirtycowMethod, ScreenMethod] # privesc_methods = [SuMethod, SudoMethod, ScreenMethod, SetuidMethod] -privesc_methods = [SuMethod, SudoMethod] +privesc_methods = [SuMethod, SetuidMethod] class Finder: diff --git a/pwncat/pty.py b/pwncat/pty.py index 2c5c19e..6a514c1 100644 --- a/pwncat/pty.py +++ b/pwncat/pty.py @@ -36,6 +36,7 @@ from pwncat import downloader, uploader, privesc from pwncat.file import RemoteBinaryPipe from pwncat.lexer import LocalCommandLexer, PwncatStyle from pwncat.gtfobins import GTFOBins, Capability, Stream +from pwncat.commands import CommandParser from colorama import Fore @@ -365,6 +366,8 @@ class PtyHandler: # Save our terminal state self.stty_saved = self.run("stty -g").decode("utf-8").strip() + self.command_parser = CommandParser(self) + # Force the local TTY to enter raw mode self.enter_raw() @@ -645,6 +648,9 @@ class PtyHandler: self.state = State.RAW + # Tell the command parser to stop + self.command_parser.running = False + # Save the state if requested if save: self.saved_term_state = old_term_state @@ -660,6 +666,10 @@ class PtyHandler: # Hopefully this fixes weird cursor position issues sys.stdout.write("\n") + self.command_parser.run() + + return + # Process commands while self.state is State.COMMAND: try: @@ -730,155 +740,6 @@ class PtyHandler: elif args.action == "install": self.bootstrap_busybox(args.url, args.method) - @with_parser - def do_back(self, _): - """ Exit command mode """ - self.enter_raw(save=False) - - def do_privesc(self, argv): - """ Attempt privilege escalation """ - - parser = argparse.ArgumentParser(prog="privesc") - parser.add_argument( - "--list", - "-l", - action="store_true", - help="do not perform escalation. list potential escalation methods", - ) - parser.add_argument( - "--all", - "-a", - action="store_const", - dest="user", - const=None, - help="when listing methods, list for all users. when escalating, escalate to root.", - ) - parser.add_argument( - "--user", - "-u", - choices=[user for user in self.users], - default="root", - help="the target user", - ) - parser.add_argument( - "--max-depth", - "-m", - type=int, - default=None, - help="Maximum depth for the privesc search (default: no maximum)", - ) - parser.add_argument( - "--read", - "-r", - type=str, - default=None, - help="remote filename to try and read", - ) - parser.add_argument( - "--write", - "-w", - type=str, - default=None, - help="attempt to write to a remote file as the specified user", - ) - parser.add_argument( - "--data", - "-d", - type=str, - default=None, - help="the data to write a file. ignored if not write mode", - ) - parser.add_argument( - "--text", - "-t", - action="store_true", - default=False, - help="whether to use safe readers/writers", - ) - - try: - args = parser.parse_args(argv) - except SystemExit: - # The arguments were parsed incorrectly, return. - return - - if args.list: - techniques = self.privesc.search(args.user) - if len(techniques) == 0: - util.warn("no techniques found") - else: - for tech in techniques: - util.info(f" - {tech}") - elif args.read: - try: - read_pipe, chain = self.privesc.read_file( - args.read, args.user, args.max_depth - ) - - # Read the data from the pipe - sys.stdout.buffer.write(read_pipe.read(4096)) - read_pipe.close() - - # Unwrap in case we had to privesc to get here - self.privesc.unwrap(chain) - - except privesc.PrivescError as exc: - util.error(f"read file failed: {exc}") - elif args.write: - if args.data is None: - util.error("no data specified") - else: - if args.data.startswith("@"): - with open(args.data[1:], "rb") as f: - data = f.read() - else: - data = args.data.encode("utf-8") - try: - chain = self.privesc.write_file( - args.write, - data, - safe=not args.text, - target_user=args.user, - depth=args.max_depth, - ) - self.privesc.unwrap(chain) - util.success("file written successfully!") - except privesc.PrivescError as exc: - util.error(f"file write failed: {exc}") - else: - try: - chain = self.privesc.escalate(args.user, args.max_depth) - - ident = self.id - backdoor = False - if ident["euid"]["id"] == 0 and ident["uid"]["id"] != 0: - util.progress( - "EUID != UID. installing backdoor to complete privesc" - ) - try: - self.privesc.add_backdoor() - backdoor = True - except privesc.PrivescError as exc: - util.warn(f"backdoor installation failed: {exc}") - - util.success("privilege escalation succeeded using:") - for i, (technique, _) in enumerate(chain): - arrow = f"{Fore.YELLOW}\u2ba1{Fore.RESET} " - print(f"{(i+1)*' '}{arrow}{technique}") - - if backdoor: - print( - ( - f"{(len(chain)+1)*' '}{arrow}" - f"{Fore.YELLOW}pwncat{Fore.RESET} backdoor" - ) - ) - - self.reset() - self.do_back([]) - except privesc.PrivescError as exc: - util.error(f"escalation failed: {exc}") - def with_progress( self, title: str, target: Callable[[Callable], None], length: int = None ): From b1f3c54087b2dedd0ae97ab37648af33d0adcc0e Mon Sep 17 00:00:00 2001 From: Caleb Stewart Date: Thu, 14 May 2020 04:01:28 -0400 Subject: [PATCH 2/3] Added upload, download, and help commands. Improved error checking on PtyHandler.open --- pwncat/commands/__init__.py | 11 ++- pwncat/commands/base.py | 84 ++++++++++++++++++++- pwncat/commands/busybox.py | 78 ++++++++++++++++++++ pwncat/commands/download.py | 51 +++++++++++++ pwncat/commands/help.py | 25 +++++++ pwncat/commands/privesc.py | 39 ++-------- pwncat/commands/upload.py | 56 ++++++++++++++ pwncat/pty.py | 143 ++++++++++++++++++++++++++---------- pwncat/util.py | 69 ++++++++++++++++- 9 files changed, 478 insertions(+), 78 deletions(-) create mode 100644 pwncat/commands/busybox.py create mode 100644 pwncat/commands/download.py create mode 100644 pwncat/commands/help.py create mode 100644 pwncat/commands/upload.py diff --git a/pwncat/commands/__init__.py b/pwncat/commands/__init__.py index 163fbaf..04946c9 100644 --- a/pwncat/commands/__init__.py +++ b/pwncat/commands/__init__.py @@ -45,7 +45,9 @@ class CommandParser: if module_name == "base": continue self.commands.append( - loader.find_module(module_name).load_module(module_name).Command(pty) + loader.find_module(module_name) + .load_module(module_name) + .Command(pty, self) ) completer = CommandCompleter(pty, self.commands) @@ -139,6 +141,7 @@ class CommandLexer(RegexLexer): else: # Don't enter param state mode.append((r"\s+" + re.escape(arg), descr[1])) + mode.append((r"\s+(\-\-help|\-h)", Name.Label)) mode.append((r"\"", String, "string")) mode.append((r".", Text)) cls.tokens[command.PROG] = mode @@ -156,8 +159,6 @@ class CommandLexer(RegexLexer): ('"', String, "#pop"), ] - pprint(cls.tokens) - return cls @@ -246,7 +247,9 @@ class CommandCompleter(Completer): for name in name_list: self.layers[command.PROG][2][name] = completer option_names.append(name) - self.layers[command.PROG][0] = WordCompleter(option_names) + self.layers[command.PROG][0] = WordCompleter( + option_names + ["--help", "-h"] + ) self.completer = WordCompleter(list(self.layers)) diff --git a/pwncat/commands/base.py b/pwncat/commands/base.py index d270e10..510fda8 100644 --- a/pwncat/commands/base.py +++ b/pwncat/commands/base.py @@ -1,9 +1,11 @@ #!/usr/bin/env python3 -from typing import Dict, Any +from typing import Dict, Any, List, Callable from pygments.token import * from enum import Enum, auto from functools import partial import argparse +import shlex +import os class Complete(Enum): @@ -17,6 +19,71 @@ class Complete(Enum): NONE = auto() +class StoreConstOnce(argparse.Action): + """ Only allow the user to store a value in the destination once. This prevents + users from selection multiple actions in the privesc parser. """ + + def __call__(self, parser, namespace, values, option_string=None): + if hasattr(self, "__" + self.dest + "_seen"): + raise argparse.ArgumentError(self, "only one action may be specified") + setattr(self, "__" + self.dest + "_seen", True) + setattr(namespace, self.dest, self.const) + + +def StoreForAction(action: List[str]) -> Callable: + """ Generates a custom argparse Action subclass which verifies that the current + selected "action" option is one of the provided actions in this function. If + not, an error is raised. """ + + class StoreFor(argparse.Action): + """ Store the value if the currently selected action matches the list of + actions passed to this function. """ + + def __call__(self, parser, namespace, values, option_string=None): + if getattr(namespace, "action", None) not in action: + raise argparse.ArgumentError( + self, f"{option_string}: only valid for {action}", + ) + + setattr(namespace, self.dest, values) + + return StoreFor + + +def RemoteFileType(file_exist=True, directory_exist=False): + def _type(command: "CommandDefinition", name: str): + """ Ensures that the remote file named exists. This should only be used for + arguments which represent files on the remote system which should be viewable + by the running user (e.g. not helpful for privesc methods). """ + + # Attempt to find the "test" command + test = command.pty.which("test") + if test is None: + test = command.pty.which("[") + + # No test command, this is a nicety, not a necessity. + if test is None: + return name + + # Check if the file exists + if file_exist: + result = command.pty.run(f"{test} -f {shlex.quote(name)} && echo exists") + if b"exists" not in result: + raise argparse.ArgumentTypeError(f"{name}: no such file or directory") + elif directory_exist: + dirpath = os.path.dirname(name) + result = command.pty.run(f"{test} -d {shlex.quote(dirpath)} && echo exists") + if b"exists" not in result: + raise argparse.ArgumentTypeError( + f"{dirpath}: no such file or directory" + ) + + # it exists + return name + + return _type + + def parameter(complete, token=Name.Label, *args, **kwargs): """ Build a parameter tuple from argparse arguments """ return (complete, token, args, kwargs) @@ -42,11 +109,12 @@ class CommandDefinition: # ), # } - def __init__(self, pty: "pwncat.pty.PtyHandler"): + def __init__(self, pty: "pwncat.pty.PtyHandler", cmdparser: "CommandParser"): """ Initialize a new command instance. Parse the local arguments array into an argparse object. """ self.pty = pty + self.cmdparser = cmdparser # Create the parser object self.parser = argparse.ArgumentParser(prog=self.PROG, description=self.__doc__) @@ -63,8 +131,9 @@ class CommandDefinition: for arg, descr in args.items(): names = arg.split(",") + + # Patch choice to work with a callable if "choices" in descr[3] and callable(descr[3]["choices"]): - print("we're doing it", descr[3]["choices"]) method = descr[3]["choices"] class wrapper: @@ -72,6 +141,15 @@ class CommandDefinition: yield from method(self) descr[3]["choices"] = wrapper() + + # Patch "type" so we can see "self" + if ( + "type" in descr[3] + and isinstance(descr[3]["type"], tuple) + and descr[3]["type"][0] == "method" + ): + descr[3]["type"] = partial(descr[3]["type"][1], self) + parser.add_argument(*names, *descr[2], **descr[3]) parser.set_defaults(**self.DEFAULTS) diff --git a/pwncat/commands/busybox.py b/pwncat/commands/busybox.py new file mode 100644 index 0000000..eb37e81 --- /dev/null +++ b/pwncat/commands/busybox.py @@ -0,0 +1,78 @@ +#!/usr/bin/env python3 +from colorama import Fore +from pwncat.commands.base import ( + CommandDefinition, + Complete, + parameter, + StoreConstOnce, + StoreForAction, +) +from pwncat import util + + +class Command(CommandDefinition): + """ Manage installation of a known-good busybox binary on the remote system. + After installing busybox, pwncat will be able to utilize it's functionality + to augment or stabilize local binaries. This command can download a remote + busybox binary appropriate for the remote architecture and then upload it + to the remote system. """ + + PROG = "busybox" + ARGS = { + "--list,-l": parameter( + Complete.NONE, + action=StoreConstOnce, + nargs=0, + const="list", + dest="action", + help="List applets which the remote busybox provides", + ), + "--install,-i": parameter( + Complete.NONE, + action=StoreConstOnce, + nargs=0, + const="install", + dest="action", + help="Install busybox on the remote host for use with pwncat", + ), + "--status,-s": parameter( + Complete.NONE, + action=StoreConstOnce, + nargs=0, + const="status", + dest="action", + help="List the current busybox installation status", + ), + "--url,-u": parameter( + Complete.NONE, + action=StoreForAction(["install"]), + nargs=1, + help="The base URL to download busybox binaries from (default: 1.31.0-defconfig-multiarch-musl)", + default=( + "https://busybox.net/downloads/binaries/1.31.0-defconfig-multiarch-musl/" + ), + ), + } + DEFAULTS = {"action": "status"} + + def run(self, args): + + if args.action == "list": + if not self.pty.has_busybox: + util.error("busybox hasn't been installed yet (hint: run 'busybox'") + return + util.info("binaries which the remote busybox provides:") + for name in self.pty.busybox_provides: + print(f" * {name}") + elif args.action == "status": + if not self.pty.has_busybox: + util.error("busybox hasn't been installed yet") + return + util.info( + f"busybox is installed to: {Fore.BLUE}{self.pty.busybox_path}{Fore.RESET}" + ) + util.info( + f"busybox provides {Fore.GREEN}{len(self.pty.busybox_provides)}{Fore.RESET} applets" + ) + elif args.action == "install": + self.pty.bootstrap_busybox(args.url) diff --git a/pwncat/commands/download.py b/pwncat/commands/download.py new file mode 100644 index 0000000..b2b69f4 --- /dev/null +++ b/pwncat/commands/download.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python3 +from pwncat.commands.base import ( + CommandDefinition, + Complete, + parameter, + StoreConstOnce, + StoreForAction, + RemoteFileType, +) +from functools import partial +from colorama import Fore +from pwncat import util +import argparse +import datetime +import time +import os + + +class Command(CommandDefinition): + """ Download a file from the remote host to the local host""" + + PROG = "download" + ARGS = { + "source": parameter(Complete.REMOTE_FILE), + "destination": parameter(Complete.LOCAL_FILE), + } + + def run(self, args): + + try: + length = self.pty.get_file_size(args.source) + started = time.time() + with open(args.destination, "wb") as destination: + with self.pty.open(args.source, "rb", length=length) as source: + util.with_progress( + [ + ("", "downloading "), + ("fg:ansigreen", args.source), + ("", " to "), + ("fg:ansired", args.destination), + ], + partial(util.copyfileobj, source, destination), + length=length, + ) + elapsed = time.time() - started + util.success( + f"downloaded {Fore.CYAN}{util.human_readable_size(length)}{Fore.RESET} " + f"in {Fore.GREEN}{util.human_readable_delta(elapsed)}{Fore.RESET}" + ) + except (FileNotFoundError, PermissionError, IsADirectoryError) as exc: + self.parser.error(str(exc)) diff --git a/pwncat/commands/help.py b/pwncat/commands/help.py new file mode 100644 index 0000000..e1529f2 --- /dev/null +++ b/pwncat/commands/help.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 +from pwncat.commands.base import CommandDefinition, Complete, parameter +from pwncat import util + + +class Command(CommandDefinition): + """ List known commands and print their associated help documentation. """ + + def get_command_names(self): + """ Get the list of known commands """ + return [c.PROG for c in self.cmdparser.commands] + + PROG = "help" + ARGS = {"topic": parameter(Complete.CHOICES, choices=get_command_names, nargs="?")} + + def run(self, args): + if args.topic: + for command in self.cmdparser.commands: + if command.PROG == args.topic: + command.parser.print_help() + break + else: + util.info("the following commands are available:") + for command in self.cmdparser.commands: + print(f" * {command.PROG}") diff --git a/pwncat/commands/privesc.py b/pwncat/commands/privesc.py index d9a25bb..50cf5b1 100644 --- a/pwncat/commands/privesc.py +++ b/pwncat/commands/privesc.py @@ -1,6 +1,12 @@ #!/usr/bin/env python3 from typing import List, Callable -from pwncat.commands.base import CommandDefinition, Complete, parameter +from pwncat.commands.base import ( + CommandDefinition, + Complete, + parameter, + StoreConstOnce, + StoreForAction, +) from pwncat import util, privesc from colorama import Fore import argparse @@ -8,37 +14,6 @@ import shutil import sys -class StoreConstOnce(argparse.Action): - """ Only allow the user to store a value in the destination once. This prevents - users from selection multiple actions in the privesc parser. """ - - def __call__(self, parser, namespace, values, option_string=None): - if hasattr(self, "__" + self.dest + "_seen"): - raise argparse.ArgumentError(self, "only one action may be specified") - setattr(self, "__" + self.dest + "_seen", True) - setattr(namespace, self.dest, self.const) - - -def StoreForAction(action: List[str]) -> Callable: - """ Generates a custom argparse Action subclass which verifies that the current - selected "action" option is one of the provided actions in this function. If - not, an error is raised. """ - - class StoreFor(argparse.Action): - """ Store the value if the currently selected action matches the list of - actions passed to this function. """ - - def __call__(self, parser, namespace, values, option_string=None): - if getattr(namespace, "action", None) not in action: - raise argparse.ArgumentError( - self, f"{option_string}: only valid for {action}", - ) - - setattr(namespace, self.dest, values) - - return StoreFor - - class Command(CommandDefinition): """ Attempt various privilege escalation methods. This command will attempt search for privilege escalation across all known modules. Privilege escalation diff --git a/pwncat/commands/upload.py b/pwncat/commands/upload.py new file mode 100644 index 0000000..5a5d765 --- /dev/null +++ b/pwncat/commands/upload.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python3 +from pwncat.commands.base import ( + CommandDefinition, + Complete, + parameter, + StoreConstOnce, + StoreForAction, + RemoteFileType, +) +from functools import partial +from colorama import Fore +from pwncat import util +import argparse +import datetime +import time +import os + + +class Command(CommandDefinition): + """ Upload a file from the local host to the remote host""" + + PROG = "upload" + ARGS = { + "source": parameter(Complete.LOCAL_FILE), + "destination": parameter( + Complete.REMOTE_FILE, + type=("method", RemoteFileType(file_exist=False, directory_exist=True)), + ), + } + + def run(self, args): + + try: + length = os.path.getsize(args.source) + started = time.time() + with open(args.source, "rb") as source: + with self.pty.open( + args.destination, "wb", length=length + ) as destination: + util.with_progress( + [ + ("", "uploading "), + ("fg:ansigreen", args.source), + ("", " to "), + ("fg:ansired", args.destination), + ], + partial(util.copyfileobj, source, destination), + length=length, + ) + elapsed = time.time() - started + util.success( + f"uploaded {Fore.CYAN}{util.human_readable_size(length)}{Fore.RESET} " + f"in {Fore.GREEN}{util.human_readable_delta(elapsed)}{Fore.RESET}" + ) + except (FileNotFoundError, PermissionError, IsADirectoryError) as exc: + self.parser.error(str(exc)) diff --git a/pwncat/pty.py b/pwncat/pty.py index 6a514c1..d2079ae 100644 --- a/pwncat/pty.py +++ b/pwncat/pty.py @@ -427,7 +427,7 @@ class PtyHandler: self.flush_output() self.reset() - def bootstrap_busybox(self, url, method): + def bootstrap_busybox(self, url): """ Utilize the architecture we grabbed from `uname -m` to grab a precompiled busybox binary and upload it to the remote machine. This makes uploading/downloading and dependency tracking easier. It also @@ -454,36 +454,33 @@ class PtyHandler: util.warn(f"no busybox for architecture: {self.arch}") return - with ProgressBar(f"downloading busybox for {self.arch}") as pb: - counter = pb(int(r.headers["Content-Length"])) - with tempfile.NamedTemporaryFile("wb", delete=False) as filp: - last_update = time.time() - busybox_local_path = filp.name - for chunk in r.iter_content(chunk_size=1024 * 1024): - filp.write(chunk) - counter.items_completed += len(chunk) - if (time.time() - last_update) > 0.1: - pb.invalidate() - counter.stopped = True - pb.invalidate() - time.sleep(0.1) + # Grab the content length if provided + length = r.headers.get("Content-Length", None) + if length is not None: + length = int(length) # Stage a temporary file for busybox busybox_remote_path = ( self.run("mktemp -t busyboxXXXXX").decode("utf-8").strip() ) - # Upload busybox using the best known method to the remote server - self.do_upload( - ["-m", method, "-o", busybox_remote_path, busybox_local_path] - ) + # Open the remote file for writing + with self.open(busybox_remote_path, "wb", length=length) as filp: + + # Local function for transferring the content + def transfer(on_progress): + for chunk in r.iter_content(chunk_size=1024 * 1024): + filp.write(chunk) + on_progress(len(chunk)) + + # Run the transfer with a progress bar + util.with_progress( + f"uploading busybox for {self.arch}", transfer, length, + ) # Make busybox executable self.run(f"chmod +x {shlex.quote(busybox_remote_path)}") - # Remove local busybox copy - os.unlink(busybox_local_path) - util.success( f"uploaded busybox to {Fore.GREEN}{busybox_remote_path}{Fore.RESET}" ) @@ -504,22 +501,25 @@ class PtyHandler: if stat is not None: util.progress("enumerating remote binary permissions") which_provides = [f"`which {p}`" for p in provides] - permissions = ( - self.run(f"{stat} -c %A {' '.join(which_provides)}") - .decode("utf-8") - .strip() - .split("\n") - ) new_provides = [] - for name, perms in zip(provides, permissions): - if "No such" in perms: - # The remote system doesn't have this binary - continue - if "s" not in perms.lower(): - util.progress(f"keeping {Fore.BLUE}{name}{Fore.RESET} in busybox") - new_provides.append(name) - else: - util.progress(f"pruning {Fore.RED}{name}{Fore.RESET} from busybox") + + with self.subprocess( + f"{stat} -c %A {' '.join(which_provides)}", "r" + ) as pipe: + for name, perms in zip(provides, pipe): + perms = perms.decode("utf-8").strip().lower() + if "no such" in perms: + # The remote system doesn't have this binary + continue + if "s" not in perms: + util.progress( + f"keeping {Fore.BLUE}{name}{Fore.RESET} in busybox" + ) + new_provides.append(name) + else: + util.progress( + f"pruning {Fore.RED}{name}{Fore.RESET} from busybox" + ) util.success(f"pruned {len(provides)-len(new_provides)} setuid entries") provides = new_provides @@ -590,7 +590,7 @@ class PtyHandler: elif name not in self.known_binaries and request: # It hasn't been looked up before, request it. path = self.run(f"which {shlex.quote(name)}").decode("utf-8").strip() - if path == "": + if path == "" or "which: no" in path: path = None if name in self.binary_aliases and path is None: @@ -1100,6 +1100,35 @@ class PtyHandler: with self.open("/tmp/stream_test", "r") as filp: print(filp.read()) + def get_file_size(self, path: str): + """ Get the size of a remote file """ + + stat = self.which("stat") + if stat is None: + return None + + test = self.which("test") + if test is None: + test = self.which("[") + + if test is not None: + result = self.run( + f"{test} -e {shlex.quote(path)} && echo exists;" + f"{test} -r {shlex.quote(path)} && echo readable" + ) + if b"exists" not in result: + raise FileNotFoundError(f"No such file or directory: '{path}'") + if b"readable" not in result: + raise PermissionError(f"Permission denied: '{path}'") + + size = self.run(f"{stat} -c %s {shlex.quote(path)}").decode("utf-8").strip() + try: + size = int(size) + except ValueError: + return None + + return size + def open_read(self, path: str, mode: str): """ Open a remote file for reading """ @@ -1107,6 +1136,20 @@ class PtyHandler: binary_path = None stream = Stream.ANY + test = self.which("test") + if test is None: + test = self.which("[") + + if test is not None: + result = self.run( + f"{test} -e {shlex.quote(path)} && echo exists;" + f"{test} -r {shlex.quote(path)} && echo readable" + ) + if b"exists" not in result: + raise FileNotFoundError(f"No such file or directory: '{path}'") + if b"readable" not in result: + raise PermissionError(f"Permission denied: '{path}'") + # If we want binary transfer, we can't use Stream.PRINT if "b" in mode: stream = stream & ~Stream.PRINT @@ -1150,6 +1193,32 @@ class PtyHandler: method = None stream = Stream.ANY + test = self.which("test") + if test is None: + test = self.which("[") + + # Try to save ourselves... + if test is not None: + result = self.run( + f"{test} -e {shlex.quote(path)} && echo exists;" + f"{test} -d {shlex.quote(path)} && echo directory;" + f"{test} -w {shlex.quote(path)} && echo writable" + ) + if b"directory" in result: + raise IsADirectoryError(f"Is a directory: '{path}'") + if b"exists" in result and not b"writable" in result: + raise PermissionError(f"Permission denied: '{path}'") + if b"exists" not in result: + parent = os.path.dirname(path) + result = self.run( + f"{test} -d {shlex.quote(parent)} && echo exists;" + f"{test} -w {shlex.quote(parent)} && echo writable" + ) + if b"exists" not in result: + raise FileNotFoundError(f"No such file or directory: '{path}'") + if b"writable" not in result: + raise PermissionError(f"Permission denied: '{path}'") + # If we want binary transfer, we can't use Stream.PRINT if "b" in mode: stream = stream & ~Stream.PRINT diff --git a/pwncat/util.py b/pwncat/util.py index 19c99c5..e5301e1 100644 --- a/pwncat/util.py +++ b/pwncat/util.py @@ -2,6 +2,7 @@ from typing import Tuple, BinaryIO, Callable from http.server import BaseHTTPRequestHandler, HTTPServer from socketserver import TCPServer, BaseRequestHandler +from prompt_toolkit.shortcuts import ProgressBar from functools import partial from colorama import Fore, Style from io import TextIOWrapper @@ -13,6 +14,7 @@ import threading import logging import termios import fcntl +import time import tty import sys import os @@ -22,6 +24,37 @@ CTRL_C = b"\x03" ALPHANUMERIC = string.ascii_letters + string.digits +def human_readable_size(size, decimal_places=2): + for unit in ["B", "KiB", "MiB", "GiB", "TiB"]: + if size < 1024.0: + break + size /= 1024.0 + return f"{size:.{decimal_places}f}{unit}" + + +def human_readable_delta(seconds): + """ This produces a human-readable time-delta output suitable for output to + the terminal. It assumes that "seconds" is less than 1 day. I.e. it will only + display at most, hours minutes and seconds. """ + + if seconds < 60: + return f"{seconds:.2f} seconds" + + output = [] + output.append(f"{int(seconds%60)} seconds") + + minutes = seconds // 60 + output.append(f"{minutes % 60} minutes") + + if minutes < 60: + return f"{output[1]} and {output[0]}" + + hours = minutes // 60 + output.append(f"{hours} hours") + + return f"{output[2]}, {output[1]} and {output[0]}" + + def copyfileobj(src, dst, callback, nomv=False): """ Copy a file object to another file object with a callback. This method assumes that both files are binary and support readinto @@ -39,7 +72,7 @@ def copyfileobj(src, dst, callback, nomv=False): for chunk in iter(lambda: src.read(length), b""): dst.write(chunk) copied += len(chunk) - callback(copied, len(chunk)) + callback(len(chunk)) else: with memoryview(bytearray(length)) as mv: while True: @@ -52,7 +85,39 @@ def copyfileobj(src, dst, callback, nomv=False): else: dst.write(mv) copied += n - callback(copied, n) + callback(n) + + +def with_progress(title: str, target: Callable[[Callable], None], length: int = None): + """ A shortcut to displaying a progress bar for various things. It will + start a prompt_toolkit progress bar with the given title and a counter + with the given length. Then, it will call `target` with an `on_progress` + parameter. This parameter should be called for all progress updates. See + the `do_upload` and `do_download` for examples w/ copyfileobj """ + + with ProgressBar(title) as pb: + counter = pb(range(length)) + last_update = time.time() + + def on_progress(blocksz): + """ Update the progress bar """ + if blocksz == -1: + counter.stopped = True + counter.done = True + pb.invalidate() + return + + counter.items_completed += blocksz + if counter.items_completed >= counter.total: + counter.done = True + counter.stopped = True + if (time.time() - last_update) > 0.1: + pb.invalidate() + + target(on_progress) + + # https://github.com/prompt-toolkit/python-prompt-toolkit/issues/964 + time.sleep(0.1) def random_string(length: int = 8): From 45810027d0786b07e8b98f8358c80abe4454b631 Mon Sep 17 00:00:00 2001 From: Caleb Stewart Date: Thu, 14 May 2020 22:18:21 -0400 Subject: [PATCH 3/3] All old commands ported over --- pwncat/__main__.py | 5 +- pwncat/commands/__init__.py | 32 +- pwncat/commands/back.py | 3 +- pwncat/commands/privesc.py | 3 +- pwncat/commands/sync.py | 33 ++ pwncat/pty.py | 582 +++++------------------------------- pwncat/util.py | 14 +- 7 files changed, 162 insertions(+), 510 deletions(-) create mode 100644 pwncat/commands/sync.py diff --git a/pwncat/__main__.py b/pwncat/__main__.py index 0081fdb..d56562c 100644 --- a/pwncat/__main__.py +++ b/pwncat/__main__.py @@ -106,11 +106,12 @@ def main(): sys.stdout.buffer.write(data) sys.stdout.flush() except ConnectionResetError: - handler.restore() + handler.restore_local_term() util.warn("connection reset by remote host") finally: # Restore the shell - handler.restore() + handler.restore_local_term() + util.success("local terminal restored") if __name__ == "__main__": diff --git a/pwncat/commands/__init__.py b/pwncat/commands/__init__.py index 04946c9..3e8ac92 100644 --- a/pwncat/commands/__init__.py +++ b/pwncat/commands/__init__.py @@ -18,6 +18,7 @@ from prompt_toolkit.lexers import PygmentsLexer from prompt_toolkit.document import Document from pygments.styles import get_style_by_name from prompt_toolkit.auto_suggest import AutoSuggestFromHistory +from prompt_toolkit.history import InMemoryHistory from typing import Dict, Any, List, Iterable from enum import Enum, auto import argparse @@ -29,6 +30,7 @@ import re from pprint import pprint from pwncat.commands.base import CommandDefinition, Complete +from pwncat.util import State from pwncat import util @@ -50,6 +52,7 @@ class CommandParser: .Command(pty, self) ) + history = InMemoryHistory() completer = CommandCompleter(pty, self.commands) lexer = PygmentsLexer(CommandLexer.build(self.commands)) style = style_from_pygments_cls(get_style_by_name("monokai")) @@ -66,10 +69,35 @@ class CommandParser: style=style, auto_suggest=auto_suggest, complete_while_typing=False, + history=history, + ) + self.toolbar = PromptSession( + [ + ("fg:ansiyellow bold", "(local) "), + ("fg:ansimagenta bold", "pwncat"), + ("", "$ "), + ], + completer=completer, + lexer=lexer, + style=style, + auto_suggest=auto_suggest, + complete_while_typing=False, + prompt_in_toolbar=True, + history=history, ) self.pty = pty + def run_single(self): + + try: + line = self.toolbar.prompt().strip() + except (EOFError, OSError, KeyboardInterrupt): + pass + else: + if line != "": + self.dispatch_line(line) + def run(self): self.running = True @@ -79,7 +107,7 @@ class CommandParser: try: line = self.prompt.prompt().strip() except (EOFError, OSError): - self.pty.enter_raw() + self.pty.state = State.RAW self.running = False continue @@ -165,7 +193,7 @@ class CommandLexer(RegexLexer): class RemotePathCompleter(Completer): """ Complete remote file names/paths """ - def __init__(self, pty: "PtyHandler"): + def __init__(self, pty: "pwncat.pty.PtyHandler"): self.pty = pty def get_completions(self, document: Document, complete_event: CompleteEvent): diff --git a/pwncat/commands/back.py b/pwncat/commands/back.py index 211eed2..dc48610 100644 --- a/pwncat/commands/back.py +++ b/pwncat/commands/back.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 from pwncat.commands.base import CommandDefinition, Complete, parameter +from pwncat import util class Command(CommandDefinition): @@ -9,4 +10,4 @@ class Command(CommandDefinition): ARGS = {} def run(self, args): - self.pty.enter_raw() + self.pty.state = util.State.RAW diff --git a/pwncat/commands/privesc.py b/pwncat/commands/privesc.py index 50cf5b1..56bc269 100644 --- a/pwncat/commands/privesc.py +++ b/pwncat/commands/privesc.py @@ -8,6 +8,7 @@ from pwncat.commands.base import ( StoreForAction, ) from pwncat import util, privesc +from pwncat.util import State from colorama import Fore import argparse import shutil @@ -174,6 +175,6 @@ class Command(CommandDefinition): ) self.pty.reset() - self.pty.do_back([]) + self.pty.state = State.RAW except privesc.PrivescError as exc: util.error(f"escalation failed: {exc}") diff --git a/pwncat/commands/sync.py b/pwncat/commands/sync.py new file mode 100644 index 0000000..670cea5 --- /dev/null +++ b/pwncat/commands/sync.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python3 +from pwncat.commands.base import CommandDefinition, Complete, parameter +from pwncat import util +import os + + +class Command(CommandDefinition): + """ Synchronize the remote terminal with the local terminal. This will + attempt to set the remote prompt, terminal width, terminal height, and TERM + environment variables to enable to cleanest interface to the remote system + possible. """ + + PROG = "sync" + ARGS = {} + DEFAULTS = {} + + def run(self, args): + + # Get the terminal type + TERM = os.environ.get("TERM", None) + if TERM is None: + util.warn("no local TERM set. falling back to 'xterm'") + TERM = "xterm" + + # Get the width and height + columns, rows = os.get_terminal_size(0) + + # Update the state + self.pty.run( + f"stty rows {rows};" f"stty columns {columns};" f"export TERM='{TERM}'" + ) + + util.success("terminal state synchronized") diff --git a/pwncat/pty.py b/pwncat/pty.py index d2079ae..26dbc9e 100644 --- a/pwncat/pty.py +++ b/pwncat/pty.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -from typing import Dict, Optional, Iterable, IO, Callable +from typing import Dict, Optional, Iterable, IO, Callable, Any from prompt_toolkit import PromptSession, ANSI from prompt_toolkit.shortcuts import ProgressBar from prompt_toolkit.completion import ( @@ -31,6 +31,7 @@ import os import re import io +from pwncat.util import State from pwncat import util from pwncat import downloader, uploader, privesc from pwncat.file import RemoteBinaryPipe @@ -41,14 +42,6 @@ from pwncat.commands import CommandParser from colorama import Fore -class State(enum.Enum): - """ The current PtyHandler state """ - - NORMAL = enum.auto() - RAW = enum.auto() - COMMAND = enum.auto() - - def with_parser(f): @wraps(f) def _decorator(self, argv): @@ -123,6 +116,7 @@ class CommandCompleter(Completer): # Split document. text = document.text_before_cursor.lstrip() stripped_len = len(document.text_before_cursor) - len(text) + prev_term: Optional[str] # If there is a space, check for the first term, and use a # subcompleter. @@ -191,12 +185,12 @@ class PtyHandler: local terminal if requested and exit raw mode. """ self.client = client - self.state = "normal" + self._state = State.COMMAND self.saved_term_state = None self.input = b"" self.lhost = None - self.known_binaries = {} - self.known_users = {} + self.known_binaries: Dict[str, Optional[str]] = {} + self.known_users: Dict[str, Any] = {} self.vars = {"lhost": util.get_ip_addr()} self.remote_prefix = "\\[\\033[01;31m\\](remote)\\[\\033[00m\\]" self.remote_prompt = ( @@ -205,7 +199,7 @@ class PtyHandler: ) self.prompt = self.build_prompt_session() self.has_busybox = False - self.busybox_path = None + self.busybox_path: Optional[str] = None self.binary_aliases = { "python": [ "python2", @@ -222,9 +216,6 @@ class PtyHandler: self.gtfo: GTFOBins = GTFOBins("data/gtfobins.json", self.which) self.default_privkey = "./data/pwncat" - # Setup the argument parsers for local the local prompt - self.setup_command_parsers() - # We should always get a response within 3 seconds... self.client.settimeout(1) @@ -280,11 +271,6 @@ class PtyHandler: if self.arch == "amd64": self.arch = "x86_64" - # Check if we are on BSD - response = self.run("uname -a").decode("utf-8").strip() - if "bsd" in response.lower(): - self.bootstrap_bsd() - # Ensure history is disabled util.info("disabling remote command history", overlay=True) self.run("unset HISTFILE; export HISTCONTROL=ignorespace") @@ -357,10 +343,6 @@ class PtyHandler: # Disable automatic margins, which fuck up the prompt self.run("tput rmam") - # Synchronize the terminals - util.info("synchronizing terminal state", overlay=True) - self.do_sync([]) - self.privesc = privesc.Finder(self) # Save our terminal state @@ -368,64 +350,11 @@ class PtyHandler: self.command_parser = CommandParser(self) + # Synchronize the terminals + self.command_parser.dispatch_line("sync") + # Force the local TTY to enter raw mode - self.enter_raw() - - def bootstrap_bsd(self): - """ BSD acts differently than linux (since it isn't). While pwncat isn't - specifically dependent on linux, it does depend on the interfaces provided - by standard commands in linux. BSD has diverged. The easiest solution is to - download a BSD version of busybox, which provides all the normal interfaces - we expect. We can't use the normal busybox bootstrap, since it depends on - some of these interfaces. We have to do it manually for BSD. """ - - if self.arch != "x86_64" and self.arch != "i386" or self.which("dd") is None: - util.error(f"unable to support freebsd on {self.arch}.") - util.error( - "upload your own busybox and tell pwncat where to find it w/ the busybox command to enable full functionality" - ) - return - - dd = self.which("dd") - length = os.path.getsize("data/busybox-bsd") - command = f"{dd} of=/tmp/busybox bs=1 count={length}" - - with ProgressBar("uploading busydbox via dd") as pb: - counter = pb(length) - last_update = time.time() - - def on_progress(count, blocksz): - counter.items_completed += blocksz - if (time.time() - last_update) > 0.1: - pb.invalidate() - last_update = time.time() - - with open("data/busybox-bsd", "rb") as filp: - util.copyfileobj(filp, pipe, on_progress) - - counter.done = True - counter.stopped = True - pb.invalidate() - time.sleep(0.1) - - # We now have busybox! - self.run("chmod +x /tmp/busybox") - - util.success("we now have busybox on bsd!") - - # Notify everyone else about busybox - self.has_busybox = True - self.busybox_path = "/tmp/busybox" - self.busybox_provides = ( - open("data/busybox-default-provides", "r").read().strip().split("\n") - ) - - # Start the busybox shell - util.info("starting busybox shell") - self.process("exec /tmp/busybox ash", delim=False) - self.shell = "/tmp/busybox ash" - self.flush_output() - self.reset() + self.state = State.RAW def bootstrap_busybox(self, url): """ Utilize the architecture we grabbed from `uname -m` to grab a @@ -572,7 +501,7 @@ class PtyHandler: complete_while_typing=False, ) - def which(self, name: str, request=True, quote=False) -> str: + def which(self, name: str, request=True, quote=False) -> Optional[str]: """ Call which on the remote host and return the path. The results are cached to decrease the number of remote calls. """ path = None @@ -580,7 +509,7 @@ class PtyHandler: if self.has_busybox: if name in self.busybox_provides: if quote: - return f"{shlex.quote(self.busybox_path)} {name}" + return f"{shlex.quote(str(self.busybox_path))} {name}" else: return f"{self.busybox_path} {name}" @@ -612,339 +541,81 @@ class PtyHandler: r""" Process a new byte of input from stdin. This is to catch "\r~C" and open a local prompt """ - # Send the new data to the client - self.client.send(data) - - # Only process data following a new line - if data == b"\r": - self.input = data - elif len(data) == 0: - return + if self.input == b"": + # Enter commmand mode after C-d + if data == b"\x04": + # Clear line + self.client.send(b"\x15") + # Enter command mode + self.state = State.COMMAND + # C-k is the prefix character + elif data == b"\x0b": + self.input = data + else: + self.client.send(data) else: - self.input += data - - if self.input == b"\r~C": - # Erase the current line on the remote host ("~C") - # This is 2 backspace characters - self.client.send(b"\x08" * 2 + b"\r") - # Start processing local commands - self.enter_command() - elif len(self.input) >= 3: - # Our only escapes are 3 characters (include the newline) + # "C-k c" to enter command mode + if data == b"c": + self.client.send(b"\x15") + self.state = State.SINGLE + elif data == b":": + self.state = State.SINGLE + # "C-k C-k" or "C-k C-d" sends the second byte + elif data == b"\x0b" or data == b"\x04": + self.client.send(data) self.input = b"" def recv(self) -> bytes: """ Recieve data from the client """ return self.client.recv(4096) - def enter_raw(self, save: bool = True): - """ Enter raw mode on the local terminal """ + @property + def state(self) -> State: + return self._state - # Give the user a nice terminal - self.flush_output() - self.client.send(b"\n") - - old_term_state = util.enter_raw_mode() - - self.state = State.RAW - - # Tell the command parser to stop - self.command_parser.running = False - - # Save the state if requested - if save: - self.saved_term_state = old_term_state - - def enter_command(self): - """ Enter commmand mode. This sets normal mode and uses prompt toolkit - process commands from the user for the local machine """ - - # Go back to normal mode - self.restore() - self.state = State.COMMAND - - # Hopefully this fixes weird cursor position issues - sys.stdout.write("\n") - - self.command_parser.run() - - return - - # Process commands - while self.state is State.COMMAND: - try: - try: - line = self.prompt.prompt() - except (EOFError, OSError): - # The user pressed ctrl-d, go back - self.enter_raw() - continue - - if len(line) > 0: - if line[0] == "!": - # Allow running shell commands - subprocess.run(line[1:], shell=True) - continue - elif line[0] == "@": - result = self.run(line[1:]) - sys.stdout.buffer.write(result) - continue - elif line[0] == "-": - self.run(line[1:], wait=False) - continue - - try: - argv = shlex.split(line) - except ValueError as e: - util.error(e.args[0]) - continue - - # Empty command - if len(argv) == 0: - continue - - try: - method = getattr(self, f"do_{argv[0]}") - except AttributeError: - util.warn(f"{argv[0]}: command does not exist") - continue - - # Call the method - method(argv[1:]) - except KeyboardInterrupt as exc: - traceback.print_exc() - continue - - @with_parser - def do_busybox(self, args): - """ Attempt to upload a busybox binary which we can use as a consistent - interface to local functionality """ - - if args.action == "list": - if not self.has_busybox: - util.error("busybox hasn't been installed yet (hint: run 'busybox'") - return - util.info("binaries which the remote busybox provides:") - for name in self.busybox_provides: - print(f" * {name}") - elif args.action == "status": - if not self.has_busybox: - util.error("busybox hasn't been installed yet") - return - util.info( - f"busybox is installed to: {Fore.BLUE}{self.busybox_path}{Fore.RESET}" - ) - util.info( - f"busybox provides {Fore.GREEN}{len(self.busybox_provides)}{Fore.RESET} applets" - ) - elif args.action == "install": - self.bootstrap_busybox(args.url, args.method) - - def with_progress( - self, title: str, target: Callable[[Callable], None], length: int = None - ): - """ A shortcut to displaying a progress bar for various things. It will - start a prompt_toolkit progress bar with the given title and a counter - with the given length. Then, it will call `target` with an `on_progress` - parameter. This parameter should be called for all progress updates. See - the `do_upload` and `do_download` for examples w/ copyfileobj """ - - with ProgressBar(title) as pb: - counter = pb(range(length)) - last_update = time.time() - - def on_progress(copied, blocksz): - """ Update the progress bar """ - if blocksz == -1: - counter.stopped = True - counter.done = True - pb.invalidate() - return - - counter.items_completed += blocksz - if counter.items_completed >= counter.total: - counter.done = True - counter.stopped = True - if (time.time() - last_update) > 0.1: - pb.invalidate() - - target(on_progress) - - # https://github.com/prompt-toolkit/python-prompt-toolkit/issues/964 - time.sleep(0.1) - - @with_parser - def do_download(self, args): - """ Download a file from the remote host """ - - try: - # Locate an appropriate downloader class - DownloaderClass = downloader.find(self, args.method) - except downloader.DownloadError as exc: - util.error(f"{exc}") + @state.setter + def state(self, value: State): + if value == self._state: return - # Grab the arguments - path = args.path - basename = os.path.basename(args.path) - outfile = args.output.format(basename=basename) - - download = DownloaderClass(self, remote_path=path, local_path=outfile) - - # Get the remote file size - size = self.run(f'stat -c "%s" {shlex.quote(path)} 2>/dev/null || echo "none"') - if b"none" in size: - util.error(f"{path}: no such file or directory") + if value == State.RAW: + self.flush_output() + self.client.send(b"\n") + util.success("pwncat is ready 🐈") + self.saved_term_state = util.enter_raw_mode() + self.command_parser.running = False + self._state = value return - size = int(size) + if value == State.COMMAND: + # Go back to normal mode + self.restore_local_term() + self._state = State.COMMAND + # Hopefully this fixes weird cursor position issues + util.success("local terminal restored") + # Setting the state to local command mode does not return until + # command processing is complete. + self.command_parser.run() + return + if value == State.SINGLE: + # Go back to normal mode + self.restore_local_term() + self._state = State.SINGLE + # Hopefully this fixes weird cursor position issues + sys.stdout.write("\n") + # Setting the state to local command mode does not return until + # command processing is complete. + self.command_parser.run_single() - with ProgressBar( - [("#888888", "downloading with "), ("fg:ansiyellow", f"{download.NAME}")] - ) as pb: - counter = pb(range(size)) - last_update = time.time() - - def on_progress(copied, blocksz): - """ Update the progress bar """ - if blocksz == -1: - counter.stopped = True - counter.done = True - pb.invalidate() - return - - counter.items_completed += blocksz - if counter.items_completed >= counter.total: - counter.done = True - counter.stopped = True - if (time.time() - last_update) > 0.1: - pb.invalidate() - - try: - download.serve(on_progress) - if download.command(): - while not counter.done: - time.sleep(0.2) - finally: - download.shutdown() - - # https://github.com/prompt-toolkit/python-prompt-toolkit/issues/964 - time.sleep(0.1) - - @with_parser - def do_upload(self, args): - """ Upload a file to the remote host """ - - if not os.path.isfile(args.path): - util.error(f"{args.path}: no such file or directory") + # Go back to raw mode + self.flush_output() + self.client.send(b"\n") + self.saved_term_state = util.enter_raw_mode() + self._state = State.RAW return - try: - # Locate an appropriate downloader class - UploaderClass = uploader.find(self, args.method) - except uploader.UploadError as exc: - util.error(f"{exc}") - return - - path = args.path - basename = os.path.basename(args.path) - name = basename - outfile = args.output.format(basename=basename) - - upload = UploaderClass(self, remote_path=outfile, local_path=path) - - with ProgressBar( - [("#888888", "uploading via "), ("fg:ansiyellow", f"{upload.NAME}")] - ) as pb: - - counter = pb(range(os.path.getsize(path))) - last_update = time.time() - - def on_progress(copied, blocksz): - """ Update the progress bar """ - counter.items_completed += blocksz - if counter.items_completed >= counter.total: - counter.done = True - counter.stopped = True - if (time.time() - last_update) > 0.1: - pb.invalidate() - - upload.serve(on_progress) - upload.command() - - try: - while not counter.done: - time.sleep(0.1) - except KeyboardInterrupt: - pass - finally: - upload.shutdown() - - # https://github.com/prompt-toolkit/python-prompt-toolkit/issues/964 - time.sleep(0.1) - - def do_sync(self, argv): - """ Synchronize the remote PTY with the local terminal settings """ - - TERM = os.environ.get("TERM", "xterm") - columns, rows = os.get_terminal_size(0) - - self.run(f"stty rows {rows}; stty columns {columns}; export TERM='{TERM}'") - - def do_set(self, argv): - """ Set or view the currently assigned variables """ - - if len(argv) == 0: - util.info("local variables:") - for k, v in self.vars.items(): - print(f" {k} = {shlex.quote(v)}") - - util.info("user passwords:") - for user, data in self.users.items(): - if data["password"] is not None: - print( - f" {Fore.GREEN}{user}{Fore.RESET} -> {Fore.CYAN}{shlex.quote(data['password'])}{Fore.RESET}" - ) - return - - parser = argparse.ArgumentParser(prog="set") - parser.add_argument( - "--password", - "-p", - action="store_true", - help="set the password for the given user", - ) - parser.add_argument("variable", help="the variable name or user") - parser.add_argument("value", help="the new variable/user password value") - - try: - args = parser.parse_args(argv) - except SystemExit: - # The arguments were parsed incorrectly, return. - return - - if args.password is not None and args.variable not in self.users: - util.error(f"{args.variable}: no such user") - elif args.password is not None: - self.users[args.variable]["password"] = args.value - else: - self.vars[args.variable] = args.value - - def do_help(self, argv): - """ View help for local commands """ - - if len(argv) == 0: - commands = [x for x in dir(self) if x.startswith("do_")] - else: - commands = [x for x in dir(self) if x.startswith("do_") and x[3:] in argv] - - for c in commands: - help_msg = getattr(self, c).__doc__ - print(f"{c[3:]:15s}{help_msg}") - - def do_reset(self, argv): - """ Reset the remote terminal (calls sync, reset, and sets PS1) """ - self.reset() - self.do_sync([]) + def restore_local_term(self): + """ Save the local terminal state """ + util.restore_terminal(self.saved_term_state) def run(self, cmd, wait=True, input: bytes = b"") -> bytes: """ Run a command in the context of the remote host and return the @@ -1040,27 +711,27 @@ class PtyHandler: edelim = util.random_string(10) # "_PWNCAT_ENDDELIM_" # List of ";" separated commands that will be run - command = [] + commands: List[str] = [] # Clear the prompt, or it will get displayed in our output due to the # background task - command.append(" export PS1=") + commands.append(" export PS1=") # Needed to disable job control messages in bash - command.append("set +m") + commands.append("set +m") # This is gross, but it allows us to recieve stderr and stdout, while # ignoring the job control start message. if "w" not in mode and not no_job: - command.append( + commands.append( f"{{ echo; echo {sdelim}; {cmd} && echo {edelim} || echo {edelim} & }} 2>/dev/null" ) else: # This is dangerous. We are in raw mode, and if the process never # ends and doesn't provide a way to exit, then we are stuck. - command.append(f"echo; echo {sdelim}; {cmd}; echo {edelim}") + commands.append(f"echo; echo {sdelim}; {cmd}; echo {edelim}") # Re-enable normal job control in bash - command.append("set -m") + commands.append("set -m") # Join them all into one command - command = ";".join(command).encode("utf-8") + command = ";".join(commands).encode("utf-8") # Enter raw mode w/ no echo on the remote terminal # DANGER @@ -1069,7 +740,7 @@ class PtyHandler: self.client.sendall(command + b"\n") - while not self.recvuntil("\n").startswith(sdelim.encode("utf-8")): + while not self.recvuntil(b"\n").startswith(sdelim.encode("utf-8")): continue # Send the data if requested @@ -1380,95 +1051,6 @@ class PtyHandler: return result - def restore(self): - """ Restore the terminal state """ - util.restore_terminal(self.saved_term_state) - self.state = State.NORMAL - - def setup_command_parsers(self): - """ Setup the argparsers for the different local commands """ - - self.upload_parser = argparse.ArgumentParser(prog="upload") - self.upload_parser.add_argument( - "--method", - "-m", - choices=["", *uploader.get_names()], - default=None, - help="set the download method (default: auto)", - ) - self.upload_parser.add_argument( - "--output", - "-o", - default="./{basename}", - help="path to the output file (default: basename of input)", - ) - self.upload_parser.add_argument("path", help="path to the file to upload") - - self.download_parser = argparse.ArgumentParser(prog="download") - self.download_parser.add_argument( - "--method", - "-m", - choices=downloader.get_names(), - default=None, - help="set the download method (default: auto)", - ) - self.download_parser.add_argument( - "--output", - "-o", - default="./{basename}", - help="path to the output file (default: basename of input)", - ) - self.download_parser.add_argument("path", help="path to the file to download") - - self.back_parser = argparse.ArgumentParser(prog="back") - - self.busybox_parser = argparse.ArgumentParser(prog="busybox") - self.busybox_parser.add_argument( - "--method", - "-m", - choices=uploader.get_names(), - default="", - help="set the upload method (default: auto)", - ) - self.busybox_parser.add_argument( - "--url", - "-u", - default=( - "https://busybox.net/downloads/binaries/" - "1.31.0-defconfig-multiarch-musl/" - ), - help=( - "url to download multiarch busybox binaries" - "(default: 1.31.0-defconfig-multiarch-musl)" - ), - ) - group = self.busybox_parser.add_mutually_exclusive_group(required=True) - group.add_argument( - "--install", - "-i", - action="store_const", - dest="action", - const="install", - default="install", - help="install busybox support for pwncat", - ) - group.add_argument( - "--list", - "-l", - action="store_const", - dest="action", - const="list", - help="list all provided applets from the remote busybox", - ) - group.add_argument( - "--status", - "-s", - action="store_const", - dest="action", - const="status", - help="show current pwncat busybox status", - ) - def whoami(self): result = self.run("whoami") return result.strip().decode("utf-8") diff --git a/pwncat/util.py b/pwncat/util.py index e5301e1..142f296 100644 --- a/pwncat/util.py +++ b/pwncat/util.py @@ -6,6 +6,7 @@ from prompt_toolkit.shortcuts import ProgressBar from functools import partial from colorama import Fore, Style from io import TextIOWrapper +from enum import Enum, auto import netifaces import socket import string @@ -24,6 +25,15 @@ CTRL_C = b"\x03" ALPHANUMERIC = string.ascii_letters + string.digits +class State(Enum): + """ The current PtyHandler state """ + + NORMAL = auto() + RAW = auto() + COMMAND = auto() + SINGLE = auto() + + def human_readable_size(size, decimal_places=2): for unit in ["B", "KiB", "MiB", "GiB", "TiB"]: if size < 1024.0: @@ -131,9 +141,6 @@ def enter_raw_mode(): returns: the old state of the terminal """ - info("setting terminal to raw mode and disabling echo", overlay=True) - success("pwncat is ready 🐈\n", overlay=True) - # Ensure we don't have any weird buffering issues sys.stdout.flush() @@ -176,7 +183,6 @@ def restore_terminal(state): # tty.setcbreak(sys.stdin) fcntl.fcntl(sys.stdin, fcntl.F_SETFL, state[1]) sys.stdout.write("\n") - info("local terminal restored") def get_ip_addr() -> str: