mirror of
https://github.com/calebstewart/pwncat.git
synced 2024-11-27 19:04:15 +01:00
Added initial configuration object and config script parsing including on_load hook
This commit is contained in:
parent
9a73d076f0
commit
3fbb4076d1
52
data/pwncatrc
Normal file
52
data/pwncatrc
Normal file
@ -0,0 +1,52 @@
|
|||||||
|
# Set your remote hosts file
|
||||||
|
set lhost 127.0.0.1
|
||||||
|
# Set your command prefix
|
||||||
|
set prefix c-k
|
||||||
|
# Set the default private key to use for privilege escalation
|
||||||
|
set privkey "~/.pwncat/id_rsa"
|
||||||
|
# Set the pwncat backdoor user and password
|
||||||
|
set backdoor_user "pwncat"
|
||||||
|
set backdoor_pass "pwncat"
|
||||||
|
|
||||||
|
# This will fail because we haven't finished loading
|
||||||
|
busybox -i
|
||||||
|
|
||||||
|
set on_load {
|
||||||
|
# This will succeed because `on_load` runs after the session is established.
|
||||||
|
busybox -i
|
||||||
|
}
|
||||||
|
|
||||||
|
# This would run at start after a successful connection
|
||||||
|
# privesc -l
|
||||||
|
|
||||||
|
# map takes a key followed by an action. The key is assumed to be followed after
|
||||||
|
# the prefix, which is set with the "set" command above.
|
||||||
|
|
||||||
|
# # pass the a second "C-k" through to the remote host
|
||||||
|
# map C-k "pass"
|
||||||
|
#
|
||||||
|
# # enter local command mode via a different shortcut (vice C-d)
|
||||||
|
# map c "set state command"
|
||||||
|
#
|
||||||
|
# # synchronize remote and local terminals
|
||||||
|
# map s "sync"
|
||||||
|
#
|
||||||
|
# # open a quick local command entry prompt
|
||||||
|
# map : "set state single"
|
||||||
|
#
|
||||||
|
# # send C-d directly to the remote host if prefixed. By default, C-d toggles back
|
||||||
|
# # and forth between local and remote prompts. This allows you to send C-d to the
|
||||||
|
# # remote host while at the remote prompt
|
||||||
|
# map C-d "pass"
|
||||||
|
#
|
||||||
|
# # Specify a shortcut to a local shell command
|
||||||
|
# shortcut ! "local"
|
||||||
|
#
|
||||||
|
# # Specify a shortuct to a remote shell command
|
||||||
|
# shortcut @ "run"
|
||||||
|
#
|
||||||
|
# # Alias the upload command with "up"
|
||||||
|
# alias up "upload"
|
||||||
|
#
|
||||||
|
# # Alias the download command with "down"
|
||||||
|
# alias down "download"
|
@ -56,6 +56,7 @@ def main():
|
|||||||
help="Method to create a pty on the remote host (default: script)",
|
help="Method to create a pty on the remote host (default: script)",
|
||||||
default="script",
|
default="script",
|
||||||
)
|
)
|
||||||
|
parser.add_argument("--config", "-c", help="Configuration script", default=None)
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
if args.type == "reverse":
|
if args.type == "reverse":
|
||||||
@ -82,7 +83,7 @@ def main():
|
|||||||
util.info(f"connection to {address[0]}:{address[1]} established", overlay=True)
|
util.info(f"connection to {address[0]}:{address[1]} established", overlay=True)
|
||||||
|
|
||||||
# Create a PTY handler to proctor communications with the remote PTY
|
# Create a PTY handler to proctor communications with the remote PTY
|
||||||
handler = PtyHandler(client)
|
handler = PtyHandler(client, args.config)
|
||||||
|
|
||||||
# Setup the selector to wait for data asynchronously from both streams
|
# Setup the selector to wait for data asynchronously from both streams
|
||||||
selector = selectors.DefaultSelector()
|
selector = selectors.DefaultSelector()
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
from typing import TextIO
|
||||||
from prompt_toolkit import PromptSession, ANSI
|
from prompt_toolkit import PromptSession, ANSI
|
||||||
from prompt_toolkit.shortcuts import ProgressBar
|
from prompt_toolkit.shortcuts import ProgressBar
|
||||||
from prompt_toolkit.completion import (
|
from prompt_toolkit.completion import (
|
||||||
@ -34,6 +35,63 @@ from pwncat.util import State
|
|||||||
from pwncat import util
|
from pwncat import util
|
||||||
|
|
||||||
|
|
||||||
|
def resolve_blocks(source: str):
|
||||||
|
""" This is a dumb lexer that turns strings of text with code blocks (squigly
|
||||||
|
braces) into a single long string separated by semicolons. All code blocks are
|
||||||
|
converted to strings recursively with correct escaping levels. The resulting
|
||||||
|
string can be sent to break_commands to iterate over the commands. """
|
||||||
|
|
||||||
|
result = []
|
||||||
|
in_brace = False
|
||||||
|
inside_quotes = False
|
||||||
|
i = 0
|
||||||
|
lineno = 1
|
||||||
|
|
||||||
|
while i < len(source):
|
||||||
|
if not inside_quotes:
|
||||||
|
if source[i] == '"':
|
||||||
|
inside_quotes = True
|
||||||
|
result.append("\\" * int(in_brace) + '"')
|
||||||
|
elif source[i] == "{" and not in_brace:
|
||||||
|
result.append('"')
|
||||||
|
in_brace = True
|
||||||
|
elif source[i] == "}":
|
||||||
|
if not in_brace:
|
||||||
|
raise ValueError(f"line {lineno}: mismatched closing brace")
|
||||||
|
in_brace = False
|
||||||
|
result.append('"')
|
||||||
|
elif source[i] == "\\":
|
||||||
|
result.append("\\" * (int(in_brace)))
|
||||||
|
elif source[i] == "\n" and in_brace:
|
||||||
|
result.append("\\n")
|
||||||
|
elif source[i] == "#":
|
||||||
|
# Comment
|
||||||
|
while i < len(source) and source[i] != "\n":
|
||||||
|
i += 1
|
||||||
|
else:
|
||||||
|
result.append(source[i])
|
||||||
|
else:
|
||||||
|
if source[i] == '"':
|
||||||
|
inside_quotes = False
|
||||||
|
result.append("\\" * int(in_brace) + '"')
|
||||||
|
elif source[i] == "\\":
|
||||||
|
result.append("\\" * (in_brace + 1))
|
||||||
|
elif source[i] == "\n":
|
||||||
|
raise ValueError(f"line {lineno}: newlines cannot appear in strings")
|
||||||
|
else:
|
||||||
|
result.append(source[i])
|
||||||
|
if source[i] == "\n":
|
||||||
|
lineno += 1
|
||||||
|
i += 1
|
||||||
|
|
||||||
|
if in_brace:
|
||||||
|
raise ValueError(f"mismatched braces")
|
||||||
|
if inside_quotes:
|
||||||
|
raise ValueError("missing ending quote")
|
||||||
|
|
||||||
|
return "".join(result).split("\n")
|
||||||
|
|
||||||
|
|
||||||
class CommandParser:
|
class CommandParser:
|
||||||
""" Handles dynamically loading command classes, parsing input, and
|
""" Handles dynamically loading command classes, parsing input, and
|
||||||
dispatching commands. """
|
dispatching commands. """
|
||||||
@ -87,6 +145,33 @@ class CommandParser:
|
|||||||
)
|
)
|
||||||
|
|
||||||
self.pty = pty
|
self.pty = pty
|
||||||
|
self.loading_complete = False
|
||||||
|
|
||||||
|
@property
|
||||||
|
def loaded(self):
|
||||||
|
return self.loading_complete
|
||||||
|
|
||||||
|
@loaded.setter
|
||||||
|
def loaded(self, value: bool):
|
||||||
|
assert value == True
|
||||||
|
self.loading_complete = True
|
||||||
|
self.eval(self.pty.config["on_load"], "on_load")
|
||||||
|
|
||||||
|
def eval(self, source: str, name: str = "<script>"):
|
||||||
|
""" Evaluate the given source file. This will execute the given string
|
||||||
|
as a script of commands. Syntax is the same except that commands may
|
||||||
|
be separated by semicolons, comments are accepted as following a "#" and
|
||||||
|
multiline strings are supported with '"{' and '}"' as delimeters. """
|
||||||
|
|
||||||
|
in_multiline_string = False
|
||||||
|
lineno = 1
|
||||||
|
|
||||||
|
for command in resolve_blocks(source):
|
||||||
|
try:
|
||||||
|
self.dispatch_line(command)
|
||||||
|
except Exception as exc:
|
||||||
|
util.error(f"{name}: command: {str(exc)}")
|
||||||
|
break
|
||||||
|
|
||||||
def run_single(self):
|
def run_single(self):
|
||||||
|
|
||||||
@ -121,6 +206,11 @@ class CommandParser:
|
|||||||
def dispatch_line(self, line: str):
|
def dispatch_line(self, line: str):
|
||||||
""" Parse the given line of command input and dispatch a command """
|
""" Parse the given line of command input and dispatch a command """
|
||||||
|
|
||||||
|
# Account for blank or whitespace only lines
|
||||||
|
line = line.strip()
|
||||||
|
if line == "":
|
||||||
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Spit the line with shell rules
|
# Spit the line with shell rules
|
||||||
argv = shlex.split(line)
|
argv = shlex.split(line)
|
||||||
@ -136,9 +226,17 @@ class CommandParser:
|
|||||||
util.error(f"{argv[0]}: unknown command")
|
util.error(f"{argv[0]}: unknown command")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
if not self.loading_complete and not command.LOCAL:
|
||||||
|
util.error(
|
||||||
|
f"{argv[0]}: non-local commands cannot run until after session setup."
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
args = [a.encode("utf-8").decode("unicode_escape") for a in argv[1:]]
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Parse the arguments
|
# Parse the arguments
|
||||||
args = command.parser.parse_args(argv[1:])
|
args = command.parser.parse_args(args)
|
||||||
|
|
||||||
# Run the command
|
# Run the command
|
||||||
command.run(args)
|
command.run(args)
|
||||||
|
@ -95,6 +95,7 @@ class CommandDefinition:
|
|||||||
PROG = "unimplemented"
|
PROG = "unimplemented"
|
||||||
ARGS = {}
|
ARGS = {}
|
||||||
DEFAULTS = {}
|
DEFAULTS = {}
|
||||||
|
LOCAL = False
|
||||||
|
|
||||||
# An example definition of arguments
|
# An example definition of arguments
|
||||||
# PROG = "command"
|
# PROG = "command"
|
||||||
|
46
pwncat/commands/set.py
Normal file
46
pwncat/commands/set.py
Normal file
@ -0,0 +1,46 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
from colorama import Fore
|
||||||
|
from pwncat.commands.base import CommandDefinition, Complete, parameter
|
||||||
|
from pwncat import util
|
||||||
|
|
||||||
|
|
||||||
|
class Command(CommandDefinition):
|
||||||
|
""" Set variable runtime variable parameters for pwncat """
|
||||||
|
|
||||||
|
def get_config_variables(self):
|
||||||
|
return list(self.pty.config.values)
|
||||||
|
|
||||||
|
PROG = "set"
|
||||||
|
ARGS = {
|
||||||
|
"variable": parameter(
|
||||||
|
Complete.CHOICES,
|
||||||
|
nargs="?",
|
||||||
|
choices=get_config_variables,
|
||||||
|
metavar="VARIABLE",
|
||||||
|
help="the variable name to modify",
|
||||||
|
),
|
||||||
|
"value": parameter(
|
||||||
|
Complete.LOCAL_FILE, nargs="?", help="the value for the given variable"
|
||||||
|
),
|
||||||
|
}
|
||||||
|
LOCAL = True
|
||||||
|
|
||||||
|
def run(self, args):
|
||||||
|
if args.variable is not None and args.value is not None:
|
||||||
|
try:
|
||||||
|
self.pty.config[args.variable] = args.value
|
||||||
|
except ValueError as exc:
|
||||||
|
util.error(str(exc))
|
||||||
|
elif args.variable is not None:
|
||||||
|
value = self.pty.config[args.variable]
|
||||||
|
print(
|
||||||
|
f" {Fore.CYAN}{args.variable}{Fore.RESET} = "
|
||||||
|
f"{Fore.YELLOW}{repr(value)}{Fore.RESET}"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
for name in self.pty.config:
|
||||||
|
value = self.pty.config[name]
|
||||||
|
print(
|
||||||
|
f" {Fore.CYAN}{name}{Fore.RESET} = "
|
||||||
|
f"{Fore.YELLOW}{repr(value)}{Fore.RESET}"
|
||||||
|
)
|
75
pwncat/config.py
Normal file
75
pwncat/config.py
Normal file
@ -0,0 +1,75 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
from prompt_toolkit.input.ansi_escape_sequences import REVERSE_ANSI_SEQUENCES
|
||||||
|
from prompt_toolkit.keys import ALL_KEYS, Keys
|
||||||
|
from typing import Any, Dict, List
|
||||||
|
import commentjson as json
|
||||||
|
import ipaddress
|
||||||
|
import re
|
||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
def key_type(value: str) -> bytes:
|
||||||
|
""" Converts a key name to a ansi keycode. The value can either be a single
|
||||||
|
printable character or a named key from prompt_toolkit Keys """
|
||||||
|
if len(value) == 1:
|
||||||
|
return value.encode("utf-8")
|
||||||
|
if value not in ALL_KEYS:
|
||||||
|
raise ValueError(f"invalid key: {value}")
|
||||||
|
key = [key for key in Keys if key.value == value][0]
|
||||||
|
return REVERSE_ANSI_SEQUENCES[key].encode("utf-8")
|
||||||
|
|
||||||
|
|
||||||
|
class KeyType:
|
||||||
|
def __init__(self, name: str):
|
||||||
|
if len(name) == 1:
|
||||||
|
self.name = name
|
||||||
|
self.value = name.encode("utf-8")
|
||||||
|
else:
|
||||||
|
if name not in ALL_KEYS:
|
||||||
|
raise ValueError(f"{name}: invalid key")
|
||||||
|
key = [key for key in Keys if key.value == name][0]
|
||||||
|
self.name = name
|
||||||
|
self.value = REVERSE_ANSI_SEQUENCES[key].encode("utf-8")
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f"Key(name={repr(self.name)})"
|
||||||
|
|
||||||
|
def __bytes__(self):
|
||||||
|
return self.value
|
||||||
|
|
||||||
|
|
||||||
|
def local_file_type(value: str) -> str:
|
||||||
|
""" Ensure the local file exists """
|
||||||
|
if not os.path.isfile(value):
|
||||||
|
raise ValueError(f"{value}: no such file or directory")
|
||||||
|
return value
|
||||||
|
|
||||||
|
|
||||||
|
class Config:
|
||||||
|
def __init__(self, pty: "pwncat.pty.PtyHandler"):
|
||||||
|
|
||||||
|
# Basic key-value store w/ typing
|
||||||
|
self.values: Dict[str, Dict[str, Any]] = {
|
||||||
|
"lhost": {"value": None, "type": ipaddress.ip_address},
|
||||||
|
"prefix": {"value": "C-k", "type": KeyType},
|
||||||
|
"privkey": {"value": "data/pwncat", "type": local_file_type},
|
||||||
|
"backdoor_user": {"value": "pwncat", "type": str},
|
||||||
|
"backdoor_pass": {"value": "pwncat", "type": str},
|
||||||
|
"on_load": {"value": "", "type": str},
|
||||||
|
}
|
||||||
|
|
||||||
|
# Map ascii escape sequences or printable bytes to lists of commands to
|
||||||
|
# run.
|
||||||
|
self.bindings: Dict[bytes, str] = {}
|
||||||
|
|
||||||
|
def __getitem__(self, name: str) -> Any:
|
||||||
|
""" Get a configuration item """
|
||||||
|
return self.values[name]["value"]
|
||||||
|
|
||||||
|
def __setitem__(self, name: str, value: Any):
|
||||||
|
""" Set a configuration item """
|
||||||
|
item = self.values[name]
|
||||||
|
item["value"] = item["type"](value)
|
||||||
|
|
||||||
|
def __iter__(self):
|
||||||
|
yield from self.values
|
@ -38,6 +38,7 @@ from pwncat.file import RemoteBinaryPipe
|
|||||||
from pwncat.lexer import LocalCommandLexer, PwncatStyle
|
from pwncat.lexer import LocalCommandLexer, PwncatStyle
|
||||||
from pwncat.gtfobins import GTFOBins, Capability, Stream
|
from pwncat.gtfobins import GTFOBins, Capability, Stream
|
||||||
from pwncat.commands import CommandParser
|
from pwncat.commands import CommandParser
|
||||||
|
from pwncat.config import Config
|
||||||
|
|
||||||
from colorama import Fore
|
from colorama import Fore
|
||||||
|
|
||||||
@ -179,11 +180,12 @@ class PtyHandler:
|
|||||||
"script",
|
"script",
|
||||||
]
|
]
|
||||||
|
|
||||||
def __init__(self, client: socket.SocketType, has_pty: bool = False):
|
def __init__(self, client: socket.SocketType, config_path: str):
|
||||||
""" Initialize a new Pty Handler. This will handle creating the PTY and
|
""" Initialize a new Pty Handler. This will handle creating the PTY and
|
||||||
setting the local terminal to raw. It also maintains the state to open a
|
setting the local terminal to raw. It also maintains the state to open a
|
||||||
local terminal if requested and exit raw mode. """
|
local terminal if requested and exit raw mode. """
|
||||||
|
|
||||||
|
self.config = Config(self)
|
||||||
self.client = client
|
self.client = client
|
||||||
self._state = State.COMMAND
|
self._state = State.COMMAND
|
||||||
self.saved_term_state = None
|
self.saved_term_state = None
|
||||||
@ -212,9 +214,14 @@ class PtyHandler:
|
|||||||
"sh": ["bash", "zsh", "dash"],
|
"sh": ["bash", "zsh", "dash"],
|
||||||
"nc": ["netcat", "ncat"],
|
"nc": ["netcat", "ncat"],
|
||||||
}
|
}
|
||||||
self.has_pty = has_pty
|
|
||||||
self.gtfo: GTFOBins = GTFOBins("data/gtfobins.json", self.which)
|
self.gtfo: GTFOBins = GTFOBins("data/gtfobins.json", self.which)
|
||||||
self.default_privkey = "./data/pwncat"
|
self.default_privkey = "./data/pwncat"
|
||||||
|
self.command_parser = CommandParser(self)
|
||||||
|
|
||||||
|
# Run the configuration script
|
||||||
|
with open(config_path, "r") as filp:
|
||||||
|
config_script = filp.read()
|
||||||
|
self.command_parser.eval(config_script, config_path)
|
||||||
|
|
||||||
# We should always get a response within 3 seconds...
|
# We should always get a response within 3 seconds...
|
||||||
self.client.settimeout(1)
|
self.client.settimeout(1)
|
||||||
@ -348,7 +355,8 @@ class PtyHandler:
|
|||||||
# Save our terminal state
|
# Save our terminal state
|
||||||
self.stty_saved = self.run("stty -g").decode("utf-8").strip()
|
self.stty_saved = self.run("stty -g").decode("utf-8").strip()
|
||||||
|
|
||||||
self.command_parser = CommandParser(self)
|
# The session is fully setup now
|
||||||
|
self.command_parser.loaded = True
|
||||||
|
|
||||||
# Synchronize the terminals
|
# Synchronize the terminals
|
||||||
self.command_parser.dispatch_line("sync")
|
self.command_parser.dispatch_line("sync")
|
||||||
|
Loading…
Reference in New Issue
Block a user