GP-5972: post-review

GP-5792: minimize calls while running
GP-5972: better tx refs
GP-5972: fixes for typing errors
GP-5972: temp mods
GP-5972: vscode mods
GP-5972: post-review
GP-5972: post-review
GP-5972: updates docs
GP-5972: support for 32-bit
GP-5972: mods post-PR
GP-5972: better error handling
GP-5972: ss-only
GP-5972: help
GP-5972: help
GP-5972: mostly functional tests
GP-5972: lame hook tests
GP-5972: methods pass
GP-5972: first pass - works a little
GP5972: some functionalityGP-5972: mem+GP-5921: basicsGP-5921: pc, spGP-5972: regsGP-5972: regs/mem/modsGP-5972: ghidraxdbgGP-5972: _base -> clientGP-5972: mods/memGP-5972: availableGP-5972: bptsGP-5972: -pybagGP-5972: initial stateGP-5972: misc fixesGP-5972: bptsGP-5972: various methods/slightly better stateGP-5972: del bptsGP-5972: more bptsGP-5972: better attachGP-5972: better launchGP-5972: tests round 0GP-5972: cmd tests - setsGP-5972: cmd tests passGP-5972: methodsGP-5972: methods exc 3
GP-5972: x64dbg init
This commit is contained in:
d-millar
2025-11-12 12:18:40 -05:00
parent 38043db812
commit e28c2ead26
26 changed files with 6854 additions and 0 deletions
@@ -0,0 +1 @@
@@ -0,0 +1 @@
# Debugger-agent-x64dbg
@@ -0,0 +1,33 @@
/* ###
* IP: GHIDRA
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Not technically a Java project, but required to be a Help project
apply from: "${rootProject.projectDir}/gradle/javaProject.gradle"
apply from: "${rootProject.projectDir}/gradle/helpProject.gradle"
apply from: "$rootProject.projectDir/gradle/distributableGhidraModule.gradle"
apply from: "$rootProject.projectDir/gradle/nativeProject.gradle"
apply from: "$rootProject.projectDir/gradle/hasPythonPackage.gradle"
apply plugin: 'eclipse'
eclipse.project.name = 'Debug Debugger-agent-x64dbg'
dependencies {
// Only for Help :/
api project(':Debugger-rmi-trace')
}
tasks.assemblePyPackage {
}
@@ -0,0 +1,13 @@
##VERSION: 2.0
##MODULE IP: Apache License 2.0
##MODULE IP: MIT
Module.manifest||GHIDRA||||END|
README.md||GHIDRA||||END|
src/main/help/help/TOC_Source.xml||GHIDRA||||END|
src/main/help/help/topics/x64dbg/x64dbg.html||GHIDRA||||END|
src/main/py/LICENSE||GHIDRA||||END|
src/main/py/MANIFEST.in||GHIDRA||||END|
src/main/py/README.md||GHIDRA||||END|
src/main/py/pyproject.toml||GHIDRA||||END|
src/main/py/src/ghidraxdbg/py.typed||GHIDRA||||END|
src/main/py/src/ghidraxdbg/schema.xml||GHIDRA||||END|
@@ -0,0 +1,34 @@
:: ###
:: IP: GHIDRA
::
:: Licensed under the Apache License, Version 2.0 (the "License");
:: you may not use this file except in compliance with the License.
:: You may obtain a copy of the License at
::
:: http://www.apache.org/licenses/LICENSE-2.0
::
:: Unless required by applicable law or agreed to in writing, software
:: distributed under the License is distributed on an "AS IS" BASIS,
:: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
:: See the License for the specific language governing permissions and
:: limitations under the License.
:: ##
::@title x64dbg attach
::@desc <html><body width="300px">
::@desc <h3>Attach with <tt>x64dbg</tt> (in a Python interpreter)</h3>
::@desc <p>
::@desc This will attach to a running target on the local machine using <tt>x64dbg.dll</tt>.
::@desc For setup instructions, press <b>F1</b>.
::@desc </p>
::@desc </body></html>
::@menu-group x64dbg
::@icon icon.debugger
::@help x64dbg#attach
::@depends Debugger-rmi-trace
::@env OPT_PYTHON_EXE:file!="python" "Python command" "The path to the Python 3 interpreter. Omit the full path to resolve using the system PATH."
::@env OPT_TARGET_PID:int=0 "Process id" "The target process id"
::@env OPT_X64DBG_EXE:file="C:\\Software\\release\\x64\\x64dbg.exe" "Path to x64dbg.exe" "Path to x64dbg.exe (or equivalent)."
@echo off
"%OPT_PYTHON_EXE%" -i ..\support\local-x64dbg-attach.py
@@ -0,0 +1,38 @@
:: ###
:: IP: GHIDRA
::
:: Licensed under the Apache License, Version 2.0 (the "License");
:: you may not use this file except in compliance with the License.
:: You may obtain a copy of the License at
::
:: http://www.apache.org/licenses/LICENSE-2.0
::
:: Unless required by applicable law or agreed to in writing, software
:: distributed under the License is distributed on an "AS IS" BASIS,
:: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
:: See the License for the specific language governing permissions and
:: limitations under the License.
:: ##
::@title x64dbg
::@image-opt env:OPT_TARGET_IMG
::@desc <html><body width="300px">
::@desc <h3>Launch with <tt>x64dbg</tt> (in a Python interpreter)</h3>
::@desc <p>
::@desc This will launch the target on the local machine using <tt>x64dbg.dll</tt>.
::@desc For setup instructions, press <b>F1</b>.
::@desc </p>
::@desc </body></html>
::@menu-group x64dbg
::@icon icon.debugger
::@help x64dbg#local
::@depends Debugger-rmi-trace
::@env OPT_PYTHON_EXE:file!="python" "Python command" "The path to the Python 3 interpreter. Omit the full path to resolve using the system PATH."
:: Use env instead of args, because "all args except first" is terrible to implement in batch
::@env OPT_TARGET_IMG:file="" "Image" "The target binary executable image"
::@env OPT_TARGET_ARGS:str="" "Arguments" "Command-line arguments to pass to the target"
::@env OPT_TARGET_DIR:str="" "Dir" "Initial directory"
::@env OPT_X64DBG_EXE:file="C:\\Software\\release\\x64\\x64dbg.exe" "Path to x64dbg.exe" "Path to x64dbg.exe (or equivalent)."
@echo off
"%OPT_PYTHON_EXE%" -i ..\support\local-x64dbg.py
@@ -0,0 +1,62 @@
## ###
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
import os
import sys
def append_paths():
sys.path.append(
f"{os.getenv('MODULE_Debugger_rmi_trace_HOME')}/data/support")
from gmodutils import ghidra_module_pypath
sys.path.append(ghidra_module_pypath("Debugger-rmi-trace"))
sys.path.append(ghidra_module_pypath())
def main():
append_paths()
# Delay these imports until sys.path is patched
from ghidraxdbg import commands as cmd
from ghidraxdbg.hooks import on_state_changed
from ghidraxdbg.util import dbg
# So that the user can re-enter by typing repl()
global repl
repl = cmd.repl
cmd.ghidra_trace_connect(os.getenv('GHIDRA_TRACE_RMI_ADDR'))
cmd.ghidra_trace_attach(os.getenv('OPT_TARGET_PID'), start_trace=False)
try:
dbg.wait()
except KeyboardInterrupt as ki:
dbg.interrupt()
cmd.ghidra_trace_start(os.getenv('OPT_TARGET_PID'))
cmd.ghidra_trace_sync_enable()
cmd.ghidra_trace_txstart()
cmd.ghidra_trace_put_all()
cmd.repl()
if __name__ == '__main__':
try:
main()
except SystemExit as x:
if x.code != 0:
print(f"Exited with code {x.code}")
@@ -0,0 +1,69 @@
## ###
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
import os
import sys
def append_paths():
sys.path.append(
f"{os.getenv('MODULE_Debugger_rmi_trace_HOME')}/data/support")
from gmodutils import ghidra_module_pypath
sys.path.append(ghidra_module_pypath("Debugger-rmi-trace"))
sys.path.append(ghidra_module_pypath())
def main():
append_paths()
# Delay these imports until sys.path is patched
from ghidraxdbg import commands as cmd
from ghidraxdbg.hooks import on_state_changed
from ghidraxdbg.util import dbg
# So that the user can re-enter by typing repl()
global repl
repl = cmd.repl
cmd.ghidra_trace_connect(os.getenv('GHIDRA_TRACE_RMI_ADDR'))
args = os.getenv('OPT_TARGET_ARGS')
initdir = os.getenv('OPT_TARGET_DIR')
target = os.getenv('OPT_TARGET_IMG')
cmd.ghidra_trace_create(target, args=args, initdir=initdir, start_trace=False)
try:
dbg.wait()
except KeyboardInterrupt as ki:
dbg.interrupt()
cmd.ghidra_trace_start(target)
cmd.ghidra_trace_sync_enable()
cmd.ghidra_trace_txstart()
if target is None or target == "":
cmd.ghidra_trace_put_available()
else:
cmd.ghidra_trace_put_all()
cmd.repl()
if __name__ == '__main__':
try:
main()
except SystemExit as x:
if x.code != 0:
print(f"Exited with code {x.code}")
@@ -0,0 +1,16 @@
<?xml version='1.0' encoding='ISO-8859-1'?>
<!-- See Base's TOC_Source.xml for help -->
<tocroot>
<tocref id="TraceRmiLauncherServicePlugin">
<tocdef id="x64dbg" text="x64dbg integration via x64dbg_automate"
target="help/topics/x64dbg/x64dbg.html">
<tocdef id="x64dbg_local" text="Local"
target="help/topics/x64dbg/x64dbg.html#local" />
<tocdef id="x64dbg_attach" text="Attach"
target="help/topics/x64dbg/x64dbg.html#attach" />
</tocdef>
</tocref>
</tocroot>
@@ -0,0 +1,102 @@
<!DOCTYPE doctype PUBLIC "-//W3C//DTD HTML 4.0 Frameset//EN">
<HTML>
<HEAD>
<META name="generator" content=
"HTML Tidy for Java (vers. 2009-12-01), see jtidy.sourceforge.net">
<TITLE>Debugger Launchers: x64dbg Debugger</TITLE>
<META http-equiv="Content-Type" content="text/html; charset=windows-1252">
<LINK rel="stylesheet" type="text/css" href="help/shared/DefaultStyle.css">
</HEAD>
<BODY lang="EN-US">
<H1>Debugger Launchers: x64dbg Debugger</H1>
<P>Integration with <B><TT>x64dbg</TT></B> is achieved using the Python 3
API <B><TT>x64dbg-automate-pyclient</TT></B> and underlying plugin <B><TT>x64dbg-automate</TT></B>, kindly provided by Darius Houle
(see https://github.com/dariushoule/x64dbg-automate & x64dbg-automate-pyclient). The console
debugger launches a full <B><TT>x64dbg</TT></B> session by default, synchronized with the
Ghidra debugger UI.</P>
<P>Two launchers are included out of the box, one for a local process and one for a local pid:</P>
<H2><A name="local"></A>Local</H2>
<P>The plain "<TT>local-x64dbg</TT>" launches the current program as a user-mode process
on the local system. If there is no current program, the user may specify the <B>Image</B> option
explicitly or launch x64dbg without a target.</P>
<H3><A name="setup"></A>Setup</H3>
<P>Make sure you have installed the executables for <B><TT>x64dbg-automate</TT></B> (typically the contents
of x64dbg/build[32|64]/Release) in the plugins directory for <B><TT>x64dbg</TT></B> (release/x[32|64]/plugins).</P>
<P>If you have access to PyPI, setting up your Python 3 environment is done using Pip. (Please
note the version specifier for Protobuf.)</P>
<UL style="list-style-type: none">
<LI>
<PRE>
python3 -m pip install x64bag_automate protobuf
</PRE>
</LI>
</UL>
<P>If you are offline, or would like to use our provided packages, we still use Pip, but with a
more complicated invocation:</P>
<UL style="list-style-type: none">
<LI>
<PRE>
cd C:\path\to\ghidra_<EM>
version</EM>\Ghidra\Debug
python3 -m pip install --no-index -f Debugger-rmi-trace\pypkg\dist -f Debugger-agent-x64dbg\pypkg\dist x64dbg_automate protobuf
</PRE>
</LI>
</UL>
<H3>Options</H3>
<UL>
<LI><B><TT>python</TT> command</B>: This is the command or path to the Python interpreter. It
must be version 3. Python 2 is not supported.</LI>
<LI><B>Image</B>: This is the path to the target binary image (EXE file). Ghidra will try to
fill this in based on information gathered when the current program was imported. If the file
exists and is executable on the local machine, it will be filled in automatically. Otherwise,
it is up to you to locate it. <B>NOTE:</B> If you have patched the current program database,
these changes are <EM>not</EM> applied to the target. You can either 1) apply the same
patches to the target once it is running, or 2) export a patched copy of your image and
direct this launcher to run it.</LI>
<LI><B>Arguments</B>: These are the command-line arguments to pass into the target process.</LI>
<LI><B>Dir</B>: The initial directory for the target process.</LI>
<LI><B>Path to <TT>x64dbg.exe</TT></B>: where the x64dbg executable resides (or the x32dbg executable
for 32-bit programs).</LI>
</UL>
<P>Once running, you are presented with a command-line interface in Ghidra's Terminal. This CLI
accepts your usual x64dbg native commands. You can escape from this CLI and enter a Python 3 REPL
by entering "<TT>.exit</TT>". This is not an actual x64dbg command, but our implementation
understands this to mean exit the x64dbg REPL. From the Python 3 REPL, you can access the
underlying Python-based API <TT>x64dbg_automate</TT>. This is an uncommon need, but may be useful for
diagnostics and/or workarounds. To re-enter the x64dbg REPL, enter "<TT>repl()</TT>".
Alternatively, if you are trying to quit, but typed "<TT>.exit</TT>", just type
"<TT>quit()</TT>" to terminate the session.</P>
<H2><A name="attach"></A>Attach</H2>
<P>This launcher allows the user to attach to a local running process. Options are the same as
those for the base x64dbg, except <B>Process Id</B> replaces <B>Image</B>.</P>
<H3>Options</H3>
<UL>
<LI><B>ProcessId</B>: The pid of the process you wish to attach to.</LI>
</BODY>
</HTML>
@@ -0,0 +1,11 @@
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
@@ -0,0 +1 @@
graft src
@@ -0,0 +1,3 @@
# Ghidra Trace RMI
Package for connecting x64dbg to Ghidra via Trace RMI.
@@ -0,0 +1,32 @@
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"
[project]
name = "ghidraxdbg"
version = "12.0"
authors = [
{ name="Ghidra Development Team" },
]
description = "Ghidra's Plugin for x64dbg"
readme = "README.md"
requires-python = ">=3.9"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: Apache Software License",
"Operating System :: OS Independent",
]
dependencies = [
"ghidratrace==12.0",
"x64dbg_automate>=0.5.0"
]
[project.urls]
"Homepage" = "https://github.com/NationalSecurityAgency/ghidra"
"Bug Tracker" = "https://github.com/NationalSecurityAgency/ghidra/issues"
[tool.setuptools.package-data]
ghidradbg = ["*.tlb", "py.typed"]
[tool.setuptools]
include-package-data = true
@@ -0,0 +1,17 @@
## ###
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
from . import util, commands, methods, hooks
@@ -0,0 +1,236 @@
## ###
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
from typing import Dict, List, Optional, Tuple
from ghidratrace.client import Address, RegVal
from . import util
language_map: Dict[str, List[str]] = {
'x86_32': ['x86:LE:32:default'],
'x86_64': ['x86:LE:64:default']
}
data64_compiler_map: Dict[Optional[str], str] = {
None: 'pointer64',
}
x86_compiler_map: Dict[Optional[str], str] = {
'windows': 'windows',
'Cygwin': 'windows',
'default': 'windows',
}
default_compiler_map: Dict[Optional[str], str] = {
'windows': 'default',
}
windows_compiler_map: Dict[Optional[str], str] = {
'windows': 'windows',
}
compiler_map : Dict[str, Dict[Optional[str], str]]= {
'DATA:BE:64:default': data64_compiler_map,
'DATA:LE:64:default': data64_compiler_map,
'x86:LE:32:default': x86_compiler_map,
'x86:LE:64:default': x86_compiler_map
}
def get_arch() -> str:
try:
type = str(util.dbg.get_actual_processor_type())
except Exception as e:
print(f"Error getting actual processor type: {e}")
return "Unknown"
if type == "32":
return "x86_32"
if type == "64":
return "x86_64"
if type == None:
return "x86_64"
return "Unknown"
def get_endian() -> str:
parm = util.get_convenience_variable('endian')
if parm != 'auto':
return parm
return 'little'
def get_osabi() -> str:
parm = util.get_convenience_variable('osabi')
if not parm in ['auto', 'default']:
return parm
try:
os = "Windows" #util.dbg.cmd("vertarget")
if "Windows" not in os:
return "default"
except Exception:
print("Error getting target OS/ABI")
pass
return "windows"
def compute_ghidra_language() -> str:
# First, check if the parameter is set
lang = util.get_convenience_variable('ghidra-language')
if lang != 'auto':
return lang
# Get the list of possible languages for the arch. We'll need to sift
# through them by endian and probably prefer default/simpler variants. The
# heuristic for "simpler" will be 'default' then shortest variant id.
arch = get_arch()
endian = get_endian()
lebe = ':BE:' if endian == 'big' else ':LE:'
if not arch in language_map:
return 'DATA' + lebe + '64:default'
langs = language_map[arch]
matched_endian = sorted(
(l for l in langs if lebe in l),
key=lambda l: 0 if l.endswith(':default') else len(l)
)
if len(matched_endian) > 0:
return matched_endian[0]
# NOTE: I'm disinclined to fall back to a language match with wrong endian.
return 'DATA' + lebe + '64:default'
def compute_ghidra_compiler(lang: str) -> str:
# First, check if the parameter is set
comp = util.get_convenience_variable('ghidra-compiler')
if comp != 'auto':
return comp
# Check if the selected lang has specific compiler recommendations
if not lang in compiler_map:
print(f"{lang} not found in compiler map")
return 'default'
comp_map = compiler_map[lang]
if comp_map == data64_compiler_map:
print(f"Using the DATA64 compiler map")
osabi = get_osabi()
if osabi in comp_map:
return comp_map[osabi]
if None in comp_map:
return comp_map[None]
print(f"{osabi} not found in compiler map")
return 'default'
def compute_ghidra_lcsp() -> Tuple[str, str]:
lang = compute_ghidra_language()
comp = compute_ghidra_compiler(lang)
return lang, comp
class DefaultMemoryMapper(object):
def __init__(self, defaultSpace: str) -> None:
self.defaultSpace = defaultSpace
def map(self, proc: int, offset: int) -> Tuple[str, Address]:
space = self.defaultSpace
return self.defaultSpace, Address(space, offset)
def map_back(self, proc: int, address: Address) -> int:
if address.space == self.defaultSpace:
return address.offset
raise ValueError(f"Address {address} is not in process {proc}")
DEFAULT_MEMORY_MAPPER = DefaultMemoryMapper('ram')
memory_mappers: Dict[str, DefaultMemoryMapper] = {}
def compute_memory_mapper(lang: str) -> DefaultMemoryMapper:
if not lang in memory_mappers:
return DEFAULT_MEMORY_MAPPER
return memory_mappers[lang]
class DefaultRegisterMapper(object):
def __init__(self, byte_order: str) -> None:
if not byte_order in ['big', 'little']:
raise ValueError("Invalid byte_order: {}".format(byte_order))
self.byte_order = byte_order
def map_name(self, proc: int, name: str):
return name
def map_value(self, proc: int, name: str, value: int):
try:
# TODO: this seems half-baked
av = value.to_bytes(8, "big")
except Exception:
raise ValueError("Cannot convert {}'s value: '{}', type: '{}'"
.format(name, value, type(value)))
return RegVal(self.map_name(proc, name), av)
def map_name_back(self, proc: int, name: str) -> str:
return name
def map_value_back(self, proc: int, name: str, value: bytes):
return RegVal(self.map_name_back(proc, name), value)
class Intel_x86_64_RegisterMapper(DefaultRegisterMapper):
def __init__(self):
super().__init__('little')
def map_name(self, proc, name):
if name is None:
return 'UNKNOWN'
if name == 'efl':
return 'rflags'
if name.startswith('zmm'):
# Ghidra only goes up to ymm, right now
return 'ymm' + name[3:]
return super().map_name(proc, name)
def map_value(self, proc, name, value):
rv = super().map_value(proc, name, value)
if rv.name.startswith('ymm') and len(rv.value) > 32:
return RegVal(rv.name, rv.value[-32:])
return rv
def map_name_back(self, proc, name):
if name == 'rflags':
return 'eflags'
DEFAULT_BE_REGISTER_MAPPER = DefaultRegisterMapper('big')
DEFAULT_LE_REGISTER_MAPPER = DefaultRegisterMapper('little')
register_mappers = {
'x86:LE:632:default': DEFAULT_LE_REGISTER_MAPPER,
'x86:LE:64:default': Intel_x86_64_RegisterMapper()
}
def compute_register_mapper(lang: str)-> DefaultRegisterMapper:
if not lang in register_mappers:
if ':BE:' in lang:
return DEFAULT_BE_REGISTER_MAPPER
if ':LE:' in lang:
return DEFAULT_LE_REGISTER_MAPPER
return register_mappers[lang]
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,417 @@
## ###
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
from bisect import bisect_left, bisect_right
from dataclasses import dataclass, field
import functools
import sys
import threading
import time
import traceback
from typing import Any, Callable, Collection, Dict, Optional, TypeVar, cast
from ghidratrace.client import Schedule
from x64dbg_automate.events import EventType
from x64dbg_automate.models import BreakpointType
from . import commands, util
ALL_EVENTS = 0xFFFF
@dataclass(frozen=False)
class HookState:
installed = False
mem_catchpoint = None
@dataclass(frozen=False)
class ProcessState:
first = True
# For things we can detect changes to between stops
regions = False
modules = False
threads = False
breaks = True
watches = False
# For frames and threads that have already been synced since last stop
visited: set[Any] = field(default_factory=set)
waiting = False
def record(self, description: Optional[str] = None,
time: Optional[Schedule] = None) -> None:
first = self.first
self.first = False
trace = commands.STATE.require_trace()
if description is not None:
trace.snapshot(description, time=time)
if first:
commands.put_available()
commands.put_processes()
commands.put_environment()
commands.put_threads()
if self.threads:
commands.put_threads()
self.threads = False
thread = util.selected_thread()
if thread is not None:
if first or thread not in self.visited:
try:
commands.putreg()
commands.putmem('0x{:x}'.format(util.get_pc()),
"1", display_result=False)
commands.putmem('0x{:x}'.format(util.get_sp()-1),
"2", display_result=False)
commands.put_breakpoints(BreakpointType.BpNormal)
commands.put_breakpoints(BreakpointType.BpHardware)
commands.put_breakpoints(BreakpointType.BpMemory)
except Exception:
pass
#commands.put_frames()
self.visited.add(thread)
# TODO: hoping to support this at some point once the relevant APIs are exposed
# frame = util.selected_frame()
# hashable_frame = (thread, frame)
# if first or hashable_frame not in self.visited:
# self.visited.add(hashable_frame)
try:
if first or self.regions or self.modules:
commands.put_regions()
self.regions = False
self.modules = False
except:
pass
def record_continued(self) -> None:
try:
proc = util.selected_process()
commands.put_state(proc)
commands.put_breakpoints(BreakpointType.BpNormal)
commands.put_breakpoints(BreakpointType.BpHardware)
commands.put_breakpoints(BreakpointType.BpMemory)
except Exception:
pass
def record_exited(self, description: Optional[str] = None,
time: Optional[Schedule] = None) -> None:
# print("RECORD_EXITED")
trace = commands.STATE.require_trace()
if description is not None:
trace.snapshot(description, time=time)
proc = util.selected_process()
ipath = commands.PROCESS_PATTERN.format(procnum=proc)
procobj = trace.proxy_object_path(ipath)
#procobj.set_value('Exit Code', exit_code)
procobj.set_value('State', 'TERMINATED')
@dataclass(frozen=False)
class BrkState:
break_loc_counts: Dict[int, int] = field(default_factory=dict)
def update_brkloc_count(self, b, count: int) -> None:
self.break_loc_counts[b.GetID()] = count
def get_brkloc_count(self, b) -> int:
return self.break_loc_counts.get(b.GetID(), 0)
def del_brkloc_count(self, b) -> int:
if b not in self.break_loc_counts:
return 0 # TODO: Print a warning?
count = self.break_loc_counts[b.GetID()]
del self.break_loc_counts[b.GetID()]
return count
HOOK_STATE = HookState()
BRK_STATE = BrkState()
PROC_STATE: Dict[int, ProcessState] = {}
C = TypeVar('C', bound=Callable)
def log_errors(func: C) -> C:
"""Wrap a function in a try-except that prints and reraises the exception.
This is needed for exceptions that occur during event callbacks.
"""
@functools.wraps(func)
def _func(*args, **kwargs) -> Any:
try:
return func(*args, **kwargs)
except:
traceback.print_exc()
raise
return cast(C, _func)
@log_errors
def on_state_changed(*args) -> None:
# print("ON_STATE_CHANGED")
ev_type = args[0].event_type
# print(ev_type)
proc = util.selected_process()
if proc not in PROC_STATE:
return
PROC_STATE[proc].waiting = False
trace = commands.STATE.require_trace()
with trace.client.batch():
with trace.open_tx("State changed proc {}".format(proc)):
commands.put_state(proc)
try:
if ev_type == EventType.EVENT_RESUME_DEBUG:
on_cont()
elif ev_type == EventType.EVENT_PAUSE_DEBUG:
on_stop()
except Exception:
pass
@log_errors
def on_breakpoint_hit(*args) -> None:
# print("ON_THREADS_CHANGED")
proc = util.selected_process()
if proc not in PROC_STATE:
return
data = args[0].event_data
PROC_STATE[proc].breaks = True
@log_errors
def on_new_process(*args) -> None:
# print("ON_NEW_PROCESS")
trace = commands.STATE.trace
if trace is None:
return
with trace.client.batch():
with trace.open_tx("New Process {}".format(util.selected_process())):
commands.put_processes()
def on_process_selected() -> None:
# print("PROCESS_SELECTED")
proc = util.selected_process()
if proc not in PROC_STATE:
return
trace = commands.STATE.trace
if trace is None:
return
with trace.client.batch():
with trace.open_tx("Process {} selected".format(proc)):
PROC_STATE[proc].record()
commands.activate()
@log_errors
def on_process_deleted(*args) -> None:
# print("ON_PROCESS_DELETED")
exit_code = args[0]
proc = util.selected_process()
on_exited(proc)
if proc in PROC_STATE:
del PROC_STATE[proc]
trace = commands.STATE.trace
if trace is None:
return
with trace.client.batch():
with trace.open_tx("Process {} deleted".format(proc)):
commands.put_processes() # TODO: Could just delete the one....
@log_errors
def on_threads_changed(*args) -> None:
# print("ON_THREADS_CHANGED")
data = args[0].event_data
proc = util.selected_process()
if proc not in PROC_STATE:
return
util.threads[data.dwThreadId] = data
state = PROC_STATE[proc]
state.threads = True
state.waiting = False
trace = commands.STATE.require_trace()
with trace.client.batch():
with trace.open_tx("Threads changed proc {}".format(proc)):
#commands.put_threads()
commands.put_state(proc)
def on_thread_selected(*args) -> None:
# print("THREAD_SELECTED: args={}".format(args))
# sys.stdout.flush()
nthrd = args[0][1]
nproc = util.selected_process()
if nproc not in PROC_STATE:
return
trace = commands.STATE.trace
if trace is None:
return
with trace.client.batch():
with trace.open_tx("Thread {}.{} selected".format(nproc, nthrd)):
commands.put_state(nproc)
state = PROC_STATE[nproc]
if state.waiting:
state.record_continued()
else:
state.record()
commands.activate()
def on_register_changed(regnum) -> None:
# print("REGISTER_CHANGED")
proc = util.selected_process()
if proc not in PROC_STATE:
return
trace = commands.STATE.trace
if trace is None:
return
with trace.client.batch():
with trace.open_tx("Register {} changed".format(regnum)):
commands.putreg()
commands.activate()
def on_memory_changed(space) -> None:
proc = util.selected_process()
if proc not in PROC_STATE:
return
trace = commands.STATE.trace
if trace is None:
return
# Not great, but invalidate the whole space
# UI will only re-fetch what it needs
# But, some observations will not be recovered
try:
with trace.client.batch():
with trace.open_tx("Memory changed"):
commands.putmem_state(0, 2**64, 'unknown')
except Exception:
pass
def on_cont(*args) -> None:
# print("ON CONT")
proc = util.selected_process()
if proc not in PROC_STATE:
return
trace = commands.STATE.trace
if trace is None:
return
state = PROC_STATE[proc]
with trace.client.batch():
with trace.open_tx("Continued"):
state.record_continued()
return
def on_stop(*args) -> None:
# print("ON STOP")
proc = util.selected_process()
if proc not in PROC_STATE:
return
trace = commands.STATE.trace
if trace is None:
return
state = PROC_STATE[proc]
state.visited.clear()
time = None
with trace.client.batch():
with trace.open_tx("Stopped"):
description = "Stopped"
state.record(description, time)
try:
commands.put_event_thread()
except:
pass
commands.activate()
def on_exited(proc) -> None:
# print("ON EXITED")
if proc not in PROC_STATE:
# print("not in state")
return
trace = commands.STATE.trace
if trace is None:
return
state = PROC_STATE[proc]
state.visited.clear()
description = "Exited"
with trace.client.batch():
with trace.open_tx("Exited"):
state.record_exited(description)
commands.activate()
@log_errors
def on_modules_changed(*args) -> None:
# print("ON_MODULES_CHANGED")
#data = args[0].event_data
proc = util.selected_process()
if proc not in PROC_STATE:
return
state = PROC_STATE[proc]
state.modules = True
state.waiting = False
trace = commands.STATE.require_trace()
with trace.client.batch():
with trace.open_tx("Modules changed proc {}".format(proc)):
#commands.put_modules()
commands.put_state(proc)
def install_hooks() -> None:
# print("Installing hooks")
if HOOK_STATE.installed:
return
HOOK_STATE.installed = True
dbg = util.dbg.client
dbg.watch_debug_event(EventType.EVENT_OUTPUT_DEBUG_STRING, lambda x: on_breakpoint_hit(x))
dbg.watch_debug_event(EventType.EVENT_BREAKPOINT, lambda x: on_state_changed(x))
dbg.watch_debug_event(EventType.EVENT_SYSTEMBREAKPOINT, lambda x: on_state_changed(x))
dbg.watch_debug_event(EventType.EVENT_EXCEPTION, lambda x: on_state_changed(x))
dbg.watch_debug_event(EventType.EVENT_CREATE_THREAD, lambda x: on_threads_changed(x))
dbg.watch_debug_event(EventType.EVENT_EXIT_THREAD, lambda x: on_threads_changed(x))
dbg.watch_debug_event(EventType.EVENT_LOAD_DLL, lambda x: on_modules_changed(x))
dbg.watch_debug_event(EventType.EVENT_UNLOAD_DLL, lambda x: on_modules_changed(x))
dbg.watch_debug_event(EventType.EVENT_STEPPED, lambda x: on_state_changed(x))
dbg.watch_debug_event(EventType.EVENT_PAUSE_DEBUG, lambda x: on_state_changed(x))
dbg.watch_debug_event(EventType.EVENT_RESUME_DEBUG, lambda x: on_state_changed(x))
#dbg.watch_debug_event(EventType.EVENT_DEBUG, lambda x: on_state_changed(x))
def remove_hooks() -> None:
# print("Removing hooks")
if HOOK_STATE.installed:
HOOK_STATE.installed = False
def enable_current_process() -> None:
# print("Enable current process")
proc = util.selected_process()
PROC_STATE[proc] = ProcessState()
def disable_current_process() -> None:
proc = util.selected_process()
if proc in PROC_STATE:
# Silently ignore already disabled
del PROC_STATE[proc]
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,317 @@
<context>
<schema name="X64DbgRoot" canonical="yes" elementResync="NEVER" attributeResync="NEVER">
<interface name="EventScope" />
<attribute name="Sessions" schema="SessionContainer" required="yes" fixed="yes" />
<attribute name="Settings" schema="ANY" />
<attribute name="State" schema="State" />
<attribute-alias from="_state" to="State" />
<attribute name="Utility" schema="ANY" />
<attribute name="_display" schema="STRING" hidden="yes" />
<attribute name="_order" schema="INT" hidden="yes" />
<attribute schema="ANY"/>
</schema>
<schema name="SessionContainer" canonical="yes" elementResync="NEVER" attributeResync="NEVER">
<element schema="Session" />
<attribute name="_order" schema="INT" hidden="yes" />
<attribute schema="ANY"/>
</schema>
<schema name="Session" elementResync="NEVER" attributeResync="NEVER">
<interface name="Activatable" />
<interface name="FocusScope" />
<interface name="Aggregate" />
<interface name="ExecutionStateful" />
<element schema="VOID" />
<attribute name="Processes" schema="ProcessContainer" required="yes" fixed="yes" />
<attribute name="Available" schema="AvailableContainer" required="yes" fixed="yes" />
<attribute name="_event_thread" schema="OBJECT" hidden="yes" />
<attribute name="_focus" schema="Selectable" required="yes" hidden="yes" />
<attribute name="_display" schema="STRING" hidden="yes" />
<attribute name="_order" schema="INT" hidden="yes" />
<attribute schema="ANY"/>
</schema>
<schema name="State" canonical="yes" elementResync="NEVER" attributeResync="NEVER">
<element schema="VOID" />
<attribute name="_order" schema="INT" hidden="yes" />
<attribute schema="ANY"/>
</schema>
<schema name="Selectable" elementResync="NEVER" attributeResync="NEVER">
<element schema="OBJECT" />
<attribute name="_order" schema="INT" hidden="yes" />
<attribute schema="VOID" />
</schema>
<schema name="DebugBreakpointContainer" canonical="yes" elementResync="NEVER" attributeResync="NEVER">
<interface name="Aggregate" />
<element schema="VOID" />
<attribute name="Software Breakpoints" schema="BreakpointContainer" required="yes" />
<attribute name="Hardware Breakpoints" schema="BreakpointContainer" required="yes" />
<attribute name="Memory Breakpoints" schema="BreakpointContainer" required="yes" />
<attribute name="Events" schema="EventContainer" required="yes" />
<attribute name="Exceptions" schema="ExceptionContainer" required="yes" />
<attribute name="_order" schema="INT" hidden="yes" />
<attribute schema="VOID" />
</schema>
<schema name="BreakpointContainer" canonical="yes" elementResync="NEVER" attributeResync="NEVER">
<element schema="BreakpointSpec" />
<attribute name="_order" schema="INT" hidden="yes" />
<attribute schema="ANY" />
</schema>
<schema name="EventContainer" canonical="yes" elementResync="NEVER" attributeResync="NEVER">
<element schema="Event" />
<attribute name="_order" schema="INT" hidden="yes" />
<attribute schema="ANY" />
</schema>
<schema name="Event" canonical="yes" elementResync="NEVER" attributeResync="NEVER">
<element schema="ANY" />
<attribute name="Cont" schema="ContinueOption" />
<attribute name="Exec" schema="ExecutionOption" />
<attribute name="_order" schema="INT" hidden="yes" />
<attribute schema="ANY" />
</schema>
<schema name="ExceptionContainer" canonical="yes" elementResync="NEVER" attributeResync="NEVER">
<element schema="Exception" />
<attribute name="_order" schema="INT" hidden="yes" />
<attribute schema="ANY" />
</schema>
<schema name="Exception" canonical="yes" elementResync="NEVER" attributeResync="NEVER">
<element schema="ANY" />
<attribute name="Cont" schema="ContinueOption" />
<attribute name="Exec" schema="ExecutionOption" />
<attribute name="_order" schema="INT" hidden="yes" />
<attribute schema="ANY" />
</schema>
<schema name="ContinueOption" canonical="yes" elementResync="NEVER" attributeResync="NEVER">
<interface name="Togglable" />
<element schema="VOID" />
<attribute name="_order" schema="INT" hidden="yes" />
<attribute schema="ANY" />
</schema>
<schema name="ExecutionOption" canonical="yes" elementResync="NEVER" attributeResync="NEVER">
<interface name="Togglable" />
<element schema="VOID" />
<attribute name="_order" schema="INT" hidden="yes" />
<attribute schema="ANY" />
</schema>
<schema name="AvailableContainer" canonical="yes" elementResync="ALWAYS" attributeResync="NEVER">
<element schema="Attachable" />
<attribute name="_order" schema="INT" hidden="yes" />
<attribute schema="VOID" />
</schema>
<schema name="ProcessContainer" canonical="yes" elementResync="NEVER" attributeResync="NEVER">
<element schema="Process" />
<attribute name="_order" schema="INT" hidden="yes" />
<attribute schema="ANY" />
</schema>
<schema name="BreakpointSpec" canonical="yes" elementResync="NEVER" attributeResync="NEVER">
<interface name="BreakpointSpec" />
<interface name="BreakpointLocation" />
<interface name="Togglable" />
<element schema="VOID" />
<attribute name="Name" schema="STRING" hidden="yes" />
<attribute-alias from="_name" to="Name" />
<attribute name="Kinds" schema="STRING" hidden="yes" />
<attribute-alias from="_kinds" to="Kinds" />
<attribute name="_order" schema="INT" hidden="yes" />
<attribute name="Range" schema="RANGE" />
<attribute-alias from="_range" to="Range" />
<attribute name="Enabled" schema="BOOL" required="yes" />
<attribute-alias from="_enabled" to="Enabled" />
<attribute name="Commands" schema="STRING" />
<attribute name="Condition" schema="STRING" />
<attribute name="Hit Count" schema="INT" />
<attribute name="Ignore Count" schema="INT" />
<attribute name="Pending" schema="BOOL" />
<attribute name="Silent" schema="BOOL" />
<attribute name="Temporary" schema="BOOL" />
<attribute schema="VOID" />
</schema>
<schema name="Attachable" elementResync="NEVER" attributeResync="NEVER">
<element schema="VOID" />
<attribute name="PID" schema="LONG" />
<attribute-alias from="_pid" to="PID" />
<attribute name="_display" schema="STRING" hidden="yes" />
<attribute name="_order" schema="INT" hidden="yes" />
<attribute schema="VOID" />
</schema>
<schema name="Process" elementResync="NEVER" attributeResync="NEVER">
<interface name="Activatable" />
<interface name="Process" />
<interface name="Aggregate" />
<interface name="ExecutionStateful" />
<element schema="VOID" />
<attribute name="Threads" schema="ThreadContainer" required="yes" fixed="yes" />
<attribute name="Debug" schema="DebugBreakpointContainer" required="yes" fixed="yes" />
<attribute name="Exit Code" schema="LONG" />
<attribute-alias from="_exit_code" to="Exit Code" />
<attribute name="Environment" schema="Environment" required="yes" fixed="yes" />
<attribute name="Memory" schema="Memory" required="yes" fixed="yes" />
<attribute name="Modules" schema="ModuleContainer" required="yes" fixed="yes" />
<attribute name="Handle" schema="STRING" fixed="yes" />
<attribute name="Id" schema="STRING" fixed="yes" />
<attribute name="PID" schema="LONG" hidden="yes" />
<attribute-alias from="_pid" to="PID" />
<attribute name="State" schema="EXECUTION_STATE" required="yes" hidden="yes" />
<attribute-alias from="_state" to="State" />
<attribute name="_display" schema="STRING" hidden="yes" />
<attribute name="_short_display" schema="STRING" hidden="yes" />
<attribute name="_order" schema="INT" hidden="yes" />
<attribute schema="ANY" />
</schema>
<schema name="Environment" elementResync="NEVER" attributeResync="NEVER">
<interface name="Environment" />
<element schema="VOID" />
<attribute name="OS" schema="STRING" />
<attribute name="Arch" schema="STRING" />
<attribute name="Endian" schema="STRING" />
<attribute name="Debugger" schema="STRING" />
<attribute-alias from="_os" to="OS" />
<attribute-alias from="_arch" to="Arch" />
<attribute-alias from="_endian" to="Endian" />
<attribute-alias from="_debugger" to="Debugger" />
<attribute name="_order" schema="INT" hidden="yes" />
<attribute schema="VOID" />
</schema>
<schema name="ModuleContainer" canonical="yes" elementResync="ONCE" attributeResync="NEVER">
<element schema="Module" />
<attribute name="_order" schema="INT" hidden="yes" />
<attribute schema="ANY" />
</schema>
<schema name="Memory" canonical="yes" elementResync="NEVER" attributeResync="NEVER">
<interface name="Memory" />
<element schema="MemoryRegion" />
<attribute name="_order" schema="INT" hidden="yes" />
<attribute schema="VOID" />
</schema>
<schema name="ThreadContainer" canonical="yes" elementResync="NEVER" attributeResync="NEVER">
<element schema="Thread" />
<attribute name="_order" schema="INT" hidden="yes" />
<attribute schema="ANY" />
</schema>
<schema name="Method" elementResync="NEVER" attributeResync="NEVER">
<interface name="Method" />
<element schema="VOID" />
<attribute name="_display" schema="STRING" required="yes" fixed="yes" hidden="yes" />
<attribute name="_return_type" schema="TYPE" required="yes" fixed="yes" hidden="yes" />
<attribute name="_parameters" schema="MAP_PARAMETERS" required="yes" fixed="yes" hidden="yes" />
<attribute schema="VOID" fixed="yes" hidden="yes" />
</schema>
<schema name="Thread" elementResync="NEVER" attributeResync="NEVER">
<interface name="Activatable" />
<interface name="Thread" />
<interface name="ExecutionStateful" />
<interface name="Aggregate" />
<element schema="VOID" />
<attribute name="Stack" schema="StackFramesContainer" required="yes" fixed="yes" />
<attribute name="Registers" schema="RegisterValueContainer" required="yes" fixed="yes" />
<attribute name="Environment" schema="ANY" fixed="yes" />
<attribute name="Id" schema="STRING" fixed="yes" />
<attribute name="TID" schema="LONG" />
<attribute-alias from="_tid" to="TID" />
<attribute name="State" schema="EXECUTION_STATE" required="yes" hidden="yes" />
<attribute-alias from="_state" to="State" />
<attribute name="_display" schema="STRING" hidden="yes" />
<attribute name="_short_display" schema="STRING" hidden="yes" />
<attribute name="_order" schema="INT" hidden="yes" />
<attribute name="Advance" schema="Method" required="yes" fixed="yes" hidden="yes" />
<attribute schema="ANY" />
</schema>
<schema name="Module" elementResync="NEVER" attributeResync="NEVER">
<interface name="Module" />
<element schema="VOID" />
<attribute name="Sections" schema="SectionContainer" required="yes" fixed="yes" />
<attribute name="Symbols" schema="SymbolContainer" required="yes" fixed="yes" />
<attribute name="Range" schema="RANGE" />
<attribute name="Name" schema="STRING" />
<attribute-alias from="_module_name" to="Name" />
<attribute-alias from="_range" to="Range" />
<attribute name="_display" schema="STRING" hidden="yes" />
<attribute name="_order" schema="INT" hidden="yes" />
<attribute name="ToDisplayString" schema="BOOL" hidden="yes" />
<attribute schema="VOID" />
</schema>
<schema name="MemoryRegion" elementResync="NEVER" attributeResync="NEVER">
<interface name="MemoryRegion" />
<element schema="VOID" />
<attribute name="Base" schema="LONG" required="yes" fixed="yes" />
<attribute name="Object File" schema="STRING" fixed="yes" />
<attribute name="_readable" schema="BOOL" required="yes" hidden="yes" />
<attribute name="_writable" schema="BOOL" required="yes" hidden="yes" />
<attribute name="_executable" schema="BOOL" required="yes" hidden="yes" />
<attribute name="Range" schema="RANGE" required="yes" hidden="yes" />
<attribute-alias from="_range" to="Range" />
<attribute name="_display" schema="STRING" hidden="yes" />
<attribute name="_order" schema="INT" hidden="yes" />
<attribute schema="VOID" />
</schema>
<schema name="SectionContainer" canonical="yes" elementResync="NEVER" attributeResync="NEVER">
<element schema="Section" />
<attribute name="_order" schema="INT" hidden="yes" />
<attribute schema="VOID" />
</schema>
<schema name="StackFramesContainer" canonical="yes" elementResync="NEVER" attributeResync="NEVER">
<interface name="Aggregate" />
<element schema="VOID" />
<attribute name="Frames" schema="Stack" required="yes" />
<attribute name="_order" schema="INT" hidden="yes" />
<attribute schema="VOID" />
</schema>
<schema name="Stack" canonical="yes" elementResync="NEVER" attributeResync="NEVER">
<interface name="Stack" />
<element schema="StackFrame" />
<attribute name="_display" schema="STRING" hidden="yes" />
<attribute name="_order" schema="INT" hidden="yes" />
<attribute schema="ANY" />
</schema>
<schema name="SymbolContainer" canonical="yes" elementResync="ONCE" attributeResync="NEVER">
<element schema="Symbol" />
<attribute name="_order" schema="INT" hidden="yes" />
<attribute schema="VOID" />
</schema>
<schema name="Symbol" elementResync="NEVER" attributeResync="NEVER">
<element schema="VOID" />
<attribute name="_display" schema="STRING" hidden="yes" />
<attribute name="_order" schema="INT" hidden="yes" />
<attribute schema="VOID" />
</schema>
<schema name="StackFrame" elementResync="NEVER" attributeResync="NEVER">
<interface name="Activatable" />
<interface name="StackFrame" />
<interface name="Aggregate" />
<element schema="VOID" />
<attribute name="Function" schema="STRING" hidden="yes" />
<attribute-alias from="_function" to="Function" />
<attribute name="Instruction Offset" schema="ADDRESS" required="yes" />
<attribute-alias from="_pc" to="Instruction Offset" />
<attribute name="Stack Offset" schema="ADDRESS" />
<attribute name="Return Offset" schema="ADDRESS" />
<attribute name="Frame Offset" schema="ADDRESS" />
<attribute name="_display" schema="STRING" hidden="yes" />
<attribute name="_order" schema="INT" hidden="yes" />
<attribute schema="ANY" />
</schema>
<schema name="Section" elementResync="NEVER" attributeResync="NEVER">
<interface name="Section" />
<element schema="VOID" />
<attribute name="Range" schema="RANGE" />
<attribute-alias from="_range" to="Range" />
<attribute name="Offset" schema="STRING" fixed="yes" />
<attribute name="_display" schema="STRING" hidden="yes" />
<attribute name="_order" schema="INT" hidden="yes" />
<attribute schema="VOID" />
</schema>
<schema name="RegisterValueContainer" attributeResync="ONCE">
<interface name="RegisterContainer" />
<attribute name="General Purpose Registers" schema="RegisterBank" />
<attribute name="Floating Point Registers" schema="RegisterBank" />
<attribute name="Advanced Vector Extensions" schema="RegisterBank" />
<attribute name="Memory Protection Extensions" schema="RegisterBank" />
<attribute name="FloatingPoint" schema="RegisterBank" />
<attribute name="SIMD" schema="RegisterBank" />
<attribute name="User" schema="RegisterBank" />
<attribute name="_order" schema="INT" hidden="yes" />
<attribute schema="VOID" />
</schema>
<schema name="RegisterBank" canonical="yes" elementResync="ONCE" attributeResync="NEVER">
<attribute name="_order" schema="INT" hidden="yes" />
<attribute schema="VOID" />
</schema>
</context>
@@ -0,0 +1,292 @@
## ###
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
from ghidratrace.client import Schedule
from collections import namedtuple
from ctypes import POINTER, byref, c_ulong, c_ulonglong, create_string_buffer
import functools
import io
import os
import queue
import psutil
import re
import sys
import threading
import traceback
from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple, TypeVar, Union, cast
from x64dbg_automate import X64DbgClient
from x64dbg_automate.events import CreateThreadEventData
from x64dbg_automate.models import Context32, Context64, Instruction, MemPage, RegDump
DbgVersion = namedtuple('DbgVersion', ['full', 'name', 'dotted', 'arch'])
conv_map: Dict[str, str] = {}
threads: Dict[int, CreateThreadEventData] = {}
class DebuggeeRunningException(BaseException):
pass
class GhidraDbg(object):
def __init__(self) -> None:
self._new_base()
client = self._client
client.start_session()
def _new_base(self) -> None:
executable = os.getenv('OPT_X64DBG_EXE')
if executable is None:
return
self._client = X64DbgClient(executable)
@property
def client(self) -> X64DbgClient:
return self._client
def cmd(self, cmdline: str, quiet: bool = True) -> str:
# Here, we let it print without capture if quiet is False
if quiet:
buffer = io.StringIO()
#self.client.callbacks.stdout = buffer
self.client.cmd_sync(cmdline)
return "completed"
else:
self.client.cmd_sync(cmdline)
return ""
def wait(self) -> None:
self._client.wait_until_stopped()
def interrupt(self) -> None:
self._client.pause()
def eval(self, input: str) -> Optional[list[int]]:
try:
return self._client.eval_sync(input)
except:
return None
def get_actual_processor_type(self) -> int:
return self.client.debugee_bitness()
@property
def pid(self) -> Optional[int]:
try:
return self.client.get_debugger_pid()
except:
# There is no process
return None
dbg = GhidraDbg()
def compute_dbg_ver() -> DbgVersion:
ver = dbg.client.get_debugger_version()
executable = os.getenv('OPT_X64DBG_EXE')
bitness = dbg.client.debugee_bitness()
return DbgVersion('Unknown', 'Unknown', ver, 'x{}'.format(bitness))
DBG_VERSION = compute_dbg_ver()
def get_target():
return 0 #dbg.get_current_system_id()
def disassemble1(addr: int) -> Instruction | None:
return dbg.client.disassemble_at(addr)
def get_inst(addr: int) -> Instruction | None:
return disassemble1(addr)
def get_inst_sz(addr: int) -> int:
inst = disassemble1(addr)
if inst is None:
return 0
return int(inst.instr_size)
def selected_process() -> int:
try:
pid = dbg.client.debugee_pid()
return pid
except:
# NB: we're intentionally returning 0 instead of None
return 0
def selected_process_space() -> int:
try:
return selected_process()
except:
# NB: we're intentionally returning 0 instead of None
return 0
def selected_thread() -> Optional[int]:
try:
ev = dbg.eval('tid()')
if ev is None:
return None
return ev[0]
except:
return None
def selected_frame() -> Optional[int]:
try:
line = dbg.cmd('.frame').strip()
if not line:
return None
num_str = line.split(sep=None, maxsplit=1)[0]
return int(num_str, 16)
except OSError:
return None
except ValueError:
return None
def select_thread(id: int) -> bool:
return dbg.client.switch_thread(id)
def select_frame(id: int) -> str:
return dbg.cmd('.frame /c {}'.format(id))
def reset_frames() -> str:
return dbg.cmd('.cxr')
def parse_and_eval(expr: Union[str, int],
type: Optional[int] = None) -> Union[int, float, bytes]:
if isinstance(expr, int):
return expr
return int(expr, 16)
def get_pc() -> int:
ctxt = dbg.client.get_regs().context
if hasattr(ctxt, 'rip'):
return ctxt.rip
else:
return ctxt.eip
def get_sp() -> int:
ctxt = dbg.client.get_regs().context
if hasattr(ctxt, 'rsp'):
return ctxt.rsp
else:
return ctxt.esp
def process_list0(running: bool = False) -> Union[
Iterable[Tuple[int, str, int]], Iterable[Tuple[int]]]:
"""Get the list of all processes."""
nproc = selected_process()
proc = psutil.Process(nproc)
sysids = []
names = []
try:
sysids.append(nproc)
names.append(proc.name())
return zip(sysids, names)
except Exception:
return zip(sysids)
def process_list(running: bool = False) -> Union[
Iterable[Tuple[int, str, int]], Iterable[Tuple[int]]]:
"""Get the list of all processes."""
sysids = []
names = []
try:
for pid in psutil.pids():
sysids.append(pid)
proc = psutil.Process(pid)
names.append(proc.name())
return zip(sysids, names)
except Exception:
return zip(sysids)
def thread_list(running: bool = False) -> Union[
Iterable[Tuple[int, int, str]], Iterable[Tuple[int]]]:
"""Get the list of all threads."""
nproc = selected_process()
proc = psutil.Process(nproc)
sysids = []
try:
for t in proc.threads():
sysids.append(t.id)
return zip(sysids)
except Exception:
return zip(sysids)
def full_mem() -> List[MemPage]:
return []
def split_path(pathString: str) -> List[str]:
list = []
segs = pathString.split(".")
for s in segs:
if s.endswith("]"):
if "[" not in s:
print(f"Missing terminator: {s}")
index = s.index("[")
list.append(s[:index])
list.append(s[index:])
else:
list.append(s)
return list
def get_kind(obj) -> Optional[int]:
"""Get the kind."""
if obj is None:
return None
kind = obj.GetKind()
if kind is None:
return None
return obj.GetKind().value
def terminate_session() -> None:
dbg.client.terminate_session()
def get_convenience_variable(id: str) -> Any:
if id not in conv_map:
return "auto"
val = conv_map[id]
if val is None:
return "auto"
return val
def set_convenience_variable(id: str, value: Any) -> None:
conv_map[id] = value
@@ -0,0 +1,399 @@
/* ###
* IP: GHIDRA
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package agent.x64dbg.rmi;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import java.util.*;
import org.junit.Test;
import ghidra.app.plugin.core.debug.utils.ManagedDomainObject;
import ghidra.debug.api.tracermi.RemoteMethod;
import ghidra.program.model.address.AddressSpace;
import ghidra.trace.database.ToyDBTraceBuilder;
import ghidra.trace.model.Lifespan;
import ghidra.trace.model.Trace;
import ghidra.trace.model.memory.TraceMemorySpace;
import ghidra.trace.model.target.TraceObject;
import ghidra.trace.model.target.path.*;
import ghidra.trace.model.time.TraceSnapshot;
public class X64dbgHooksTest extends AbstractX64dbgTraceRmiTest {
private static final long RUN_TIMEOUT_MS = 5000;
private static final long RETRY_MS = 500;
record PythonAndTrace(PythonAndConnection conn, ManagedDomainObject mdo)
implements AutoCloseable {
public void execute(String cmd) {
conn.execute(cmd);
}
public String executeCapture(String cmd) {
return conn.executeCapture(cmd);
}
@Override
public void close() throws Exception {
try {
conn.execute("util.terminate_session()");
conn.close();
} catch (Exception e) {
//IGNORE
}
try {
mdo.close();
} catch (Exception e) {
//IGNORE
}
}
}
@SuppressWarnings("resource")
protected PythonAndTrace startAndSyncPython(String exec) throws Exception {
PythonAndConnection conn = startAndConnectPython();
try {
ManagedDomainObject mdo;
conn.execute("from ghidraxdbg.commands import *");
conn.execute(
"util.set_convenience_variable('ghidra-language', 'x86:LE:64:default')");
if (exec != null) {
start(conn, exec);
mdo = waitDomainObject("/New Traces/x64dbg/" + exec.substring(exec.lastIndexOf("\\")+1));
}
else {
conn.execute("ghidra_trace_start()");
mdo = waitDomainObject("/New Traces/x64dbg/noname");
}
clearBreakpoints(conn);
tb = new ToyDBTraceBuilder((Trace) mdo.get());
return new PythonAndTrace(conn, mdo);
}
catch (Exception e) {
clearBreakpoints(conn);
conn.execute("util.terminate_session()");
conn.close();
throw e;
}
}
protected long lastSnap(PythonAndTrace conn) {
return conn.conn.connection().getLastSnapshot(tb.trace);
}
static final int INIT_NOTEPAD_THREAD_COUNT = 4; // This could be fragile
//@Test - doesn't generate more than the initial 4
public void testOnNewThread() throws Exception {
try (PythonAndTrace conn = startAndSyncPython(NOTEPAD)) {
conn.execute("from ghidraxdbg.commands import *");
txPut(conn, "processes");
waitForPass(() -> {
TraceObject proc = tb.objAny0("Sessions[].Processes[]");
assertNotNull(proc);
assertEquals("STOPPED", tb.objValue(proc, lastSnap(conn), "_state"));
}, RUN_TIMEOUT_MS, RETRY_MS);
txPut(conn, "threads");
waitForPass(() -> assertEquals(INIT_NOTEPAD_THREAD_COUNT,
tb.objValues(lastSnap(conn), "Sessions[].Processes[].Threads[]").size()),
RUN_TIMEOUT_MS, RETRY_MS);
// Via method, go is asynchronous
RemoteMethod go = conn.conn.getMethod("go");
TraceObject proc = tb.objAny0("Sessions[].Processes[]");
go.invoke(Map.of("process", proc)); // Initial breakpoint
go.invoke(Map.of("process", proc));
waitForPass(() -> assertThat(
tb.objValues(lastSnap(conn), "Sessions[].Processes[].Threads[]").size(),
greaterThan(INIT_NOTEPAD_THREAD_COUNT)),
RUN_TIMEOUT_MS, RETRY_MS);
}
}
@Test
public void testOnNewModule() throws Exception {
try (PythonAndTrace conn = startAndSyncPython(NOTEPAD)) {
conn.execute("from ghidraxdbg.commands import *");
txPut(conn, "processes");
TraceObject proc = tb.objAny0("Sessions[].Processes[]");
waitForPass(() -> {
assertNotNull(proc);
assertEquals("STOPPED", tb.objValue(proc, lastSnap(conn), "_state"));
}, RUN_TIMEOUT_MS, RETRY_MS);
txPut(conn, "modules");
waitForPass(() -> assertThat(
tb.objValues(lastSnap(conn), "Sessions[].Processes[].Modules[]").size(),
greaterThan(0)),
RUN_TIMEOUT_MS, RETRY_MS);
int size = tb.objValues(lastSnap(conn), "Sessions[].Processes[].Modules[]").size();
// Via method, go is asynchronous
RemoteMethod go = conn.conn.getMethod("go");
go.invoke(Map.of("process", proc)); // Initial breakpoint
go.invoke(Map.of("process", proc));
waitForPass(() -> assertThat(
tb.objValues(lastSnap(conn), "Sessions[].Processes[].Modules[]").size(),
greaterThan(size)),
RUN_TIMEOUT_MS, RETRY_MS);
}
}
@Test
public void testOnThreadSelected() throws Exception {
try (PythonAndTrace conn = startAndSyncPython(NOTEPAD)) {
txPut(conn, "processes");
conn.execute("util.dbg.client.stepi()"); // no initial event
waitForPass(() -> {
TraceObject proc = tb.objAny0("Sessions[0].Processes[]");
assertNotNull(proc);
assertEquals("STOPPED", tb.objValue(proc, lastSnap(conn), "_state"));
}, RUN_TIMEOUT_MS, RETRY_MS);
txPut(conn, "threads");
waitForPass(() -> {
List<Object> values = tb.objValues(lastSnap(conn), "Sessions[0].Processes[].Threads[]");
assertEquals(INIT_NOTEPAD_THREAD_COUNT, values.size());
}, RUN_TIMEOUT_MS, RETRY_MS);
// Now the real test
List<Object> values = tb.objValues(lastSnap(conn), "Sessions[0].Processes[].Threads[]");
TraceObject thread = (TraceObject) values.get(0);
Object tid0 = tb.objValue(thread, lastSnap(conn), "TID");
conn.execute("util.select_thread("+tid0.toString()+")");
waitForPass(() -> {
String tnum = conn.executeCapture("print(util.selected_thread())").strip();
assertEquals(tid0.toString(), tnum);
}, RUN_TIMEOUT_MS, RETRY_MS);
thread = (TraceObject) values.get(1);
Object tid1 = tb.objValue(thread, lastSnap(conn), "TID");
conn.execute("util.select_thread("+tid1.toString()+")");
waitForPass(() -> {
String tnum = conn.executeCapture("print(util.selected_thread())").strip();
assertEquals(tid1.toString(), tnum);
}, RUN_TIMEOUT_MS, RETRY_MS);
thread = (TraceObject) values.get(2);
Object tid2 = tb.objValue(thread, lastSnap(conn), "TID");
conn.execute("util.select_thread("+tid2.toString()+")");
waitForPass(() -> {
String tnum = conn.executeCapture("print(util.selected_thread())").strip();
assertEquals(tid2.toString(), tnum);
}, RUN_TIMEOUT_MS, RETRY_MS);
}
}
protected String getIndex(TraceObject object, String pattern, int n) {
if (object == null) {
return null;
}
PathPattern pat = PathFilter.parse(pattern).getSingletonPattern();
KeyPath path = object.getCanonicalPath();
if (path.size() < pat.asPath().size()) {
return null;
}
List<String> matched = pat.matchKeys(path, false);
if (matched == null) {
return null;
}
if (matched.size() <= n) {
return null;
}
return matched.get(n);
}
protected String threadIndex(TraceObject object) {
return getIndex(object, "Sessions[].Processes[].Threads[]", 2);
}
protected String frameIndex(TraceObject object) {
return getIndex(object, "Sessions[].Processes[].Threads[].Stack.Frames[]", 3);
}
@Test
public void testOnRegisterChanged() throws Exception {
try (PythonAndTrace conn = startAndSyncPython(NOTEPAD)) {
conn.execute("ghidra_trace_txstart('Tx')");
conn.execute("ghidra_trace_putreg()");
conn.execute("ghidra_trace_txcommit()");
conn.execute("util.dbg.cmd('rax=0x1234')");
conn.execute("util.dbg.client.stepi()"); // no real event for register changes
String path = "Sessions[].Processes[].Threads[].Registers";
TraceObject registers = Objects.requireNonNull(tb.objAny(path, Lifespan.at(0)));
AddressSpace space = tb.trace.getBaseAddressFactory()
.getAddressSpace(registers.getCanonicalPath().toString());
TraceMemorySpace regs = tb.trace.getMemoryManager().getMemorySpace(space, false);
waitForPass(() -> assertEquals("1234",
regs.getValue(lastSnap(conn), tb.reg("RAX")).getUnsignedValue().toString(16)));
}
}
@Test
public void testOnCont() throws Exception {
try (PythonAndTrace conn = startAndSyncPython(NOTEPAD)) {
txPut(conn, "processes");
conn.execute("util.dbg.client.go()");
conn.execute("util.dbg.client.go()");
TraceObject proc = waitForValue(() -> tb.objAny0("Sessions[].Processes[]"));
waitForPass(() -> {
assertEquals("RUNNING", tb.objValue(proc, lastSnap(conn), "_state"));
}, RUN_TIMEOUT_MS, RETRY_MS);
}
}
@Test
public void testOnStop() throws Exception {
try (PythonAndTrace conn = startAndSyncPython(NOTEPAD)) {
txPut(conn, "processes");
TraceObject proc = waitForValue(() -> tb.objAny0("Sessions[].Processes[]"));
waitForPass(() -> {
conn.execute("util.terminate_session()");
assertEquals("STOPPED", tb.objValue(proc, lastSnap(conn), "_state"));
}, RUN_TIMEOUT_MS, RETRY_MS);
}
}
//@Test - TODO: currently missing relevant events
public void testOnExited() throws Exception {
try (PythonAndTrace conn = startAndSyncPython("netstat.exe")) {
txPut(conn, "processes");
// Do the synchronous wait here, since netstat should terminate
conn.execute("util.dbg.client.go()");
waitForPass(() -> {
TraceSnapshot snapshot =
tb.trace.getTimeManager().getSnapshot(lastSnap(conn), false);
assertNotNull(snapshot);
assertEquals("Exited with code 0", snapshot.getDescription());
TraceObject proc = tb.objAny0("Sessions[].Processes[]");
assertNotNull(proc);
Object val = tb.objValue(proc, lastSnap(conn), "_exit_code");
assertThat(val, instanceOf(Number.class));
assertEquals(0, ((Number) val).longValue());
conn.execute("util.terminate_session()");
}, RUN_TIMEOUT_MS, RETRY_MS);
}
}
@Test
public void testOnBreakpointCreated() throws Exception {
try (PythonAndTrace conn = startAndSyncPython(NOTEPAD)) {
txPut(conn, "breakpoints");
assertEquals(0,
tb.objValues(lastSnap(conn), "Sessions[].Processes[].Debug.Software Breakpoints[]").size());
conn.execute("pc = util.get_pc()");
conn.execute("util.dbg.client.set_breakpoint(address_or_symbol=pc)");
conn.execute("util.dbg.client.stepi()"); // no real event for bpt changes
waitForPass(() -> {
List<Object> brks =
tb.objValues(lastSnap(conn), "Sessions[].Processes[].Debug.Software Breakpoints[]");
assertEquals(1, brks.size());
});
}
}
//@Test - works but has timing issues
public void testOnBreakpointModified() throws Exception {
try (PythonAndTrace conn = startAndSyncPython(NOTEPAD)) {
txPut(conn, "breakpoints");
assertEquals(0,
tb.objValues(lastSnap(conn), "Sessions[].Processes[].Debug.Software Breakpoints[]").size());
conn.execute("pc = util.get_pc()");
conn.execute("util.dbg.client.set_breakpoint(address_or_symbol=pc)");
conn.execute("util.dbg.client.stepi()"); // no real event for bpt changes
TraceObject brk = waitForPass(() -> {
List<Object> brks =
tb.objValues(lastSnap(conn), "Sessions[].Processes[].Debug.Software Breakpoints[]");
assertEquals(1, brks.size());
return (TraceObject) brks.get(0);
});
assertEquals(true, tb.objValue(brk, lastSnap(conn), "Enabled"));
conn.execute("util.dbg.client.toggle_breakpoint(address_name_symbol_or_none=pc, on=False)");
conn.execute("util.dbg.client.stepi()");
conn.execute("util.dbg.client.wait_until_stopped()");
conn.execute("util.dbg.client.stepi()");
assertEquals(false, tb.objValue(brk, lastSnap(conn), "Enabled"));
}
}
@Test
public void testOnBreakpointDeleted() throws Exception {
try (PythonAndTrace conn = startAndSyncPython(NOTEPAD)) {
txPut(conn, "breakpoints");
assertEquals(0,
tb.objValues(lastSnap(conn), "Sessions[].Processes[].Debug.Software Breakpoints[]").size());
conn.execute("pc = util.get_pc()");
conn.execute("util.dbg.client.set_breakpoint(address_or_symbol=pc)");
conn.execute("util.dbg.client.stepi()"); // no real event for bpt changes
TraceObject brk = waitForPass(() -> {
List<Object> brks =
tb.objValues(lastSnap(conn), "Sessions[].Processes[].Debug.Software Breakpoints[]");
assertEquals(1, brks.size());
return (TraceObject) brks.get(0);
});
conn.execute("util.dbg.client.clear_breakpoint(address_name_symbol_or_none=pc)");
conn.execute("util.dbg.client.stepi()");
waitForPass(() -> assertEquals(0,
tb.objValues(lastSnap(conn), "Sessions[].Processes[].Debug.Software Breakpoints[]").size()));
}
}
private void start(PythonAndConnection conn, String obj) {
conn.execute("from ghidraxdbg.commands import *");
if (obj != null)
conn.execute("ghidra_trace_create('" + obj + "', wait=True)");
else
conn.execute("ghidra_trace_create()");
conn.execute("ghidra_trace_sync_enable()");
}
private void txPut(PythonAndTrace conn, String obj) {
conn.execute("ghidra_trace_txstart('Tx" + obj + "')");
conn.execute("ghidra_trace_put_" + obj + "()");
conn.execute("ghidra_trace_txcommit()");
}
private void clearBreakpoints(PythonAndConnection conn) {
conn.execute("util.dbg.client.clear_breakpoint(None)");
conn.execute("util.dbg.client.clear_hardware_breakpoint(None)");
conn.execute("util.dbg.client.clear_memory_breakpoint(None)");
}
}