chore(gdb): add CorruptedValue sentinel and is_ok protocol for memory corruption resilience (#9911)

This commit is contained in:
Benign X
2026-03-27 16:24:23 +08:00
committed by GitHub
parent 388c97ed9d
commit 9c12c953fb
23 changed files with 401 additions and 178 deletions
+3 -1
View File
@@ -1,4 +1,4 @@
from .value import Value, ValueInput
from .value import Value, ValueInput, CorruptedValue, CorruptedError
from .lvgl import (
curr_inst,
LVDisplay,
@@ -68,6 +68,8 @@ __all__ = [
"format_style_value",
"Value",
"ValueInput",
"CorruptedValue",
"CorruptedError",
"LVCacheEntry",
"LVCacheLRURB",
"LVCacheLRURBIterator",
+12 -5
View File
@@ -3,6 +3,7 @@ import gdb
from lvglgdb.lvgl import curr_inst
from lvglgdb.lvgl import LVObject, dump_obj_info
from lvglgdb.value import CorruptedError
class DumpObj(gdb.Command):
@@ -25,8 +26,11 @@ class DumpObj(gdb.Command):
return
# dump children
for child in obj.children:
self.dump_obj(child, depth + 1, limit=limit)
try:
for child in obj.children:
self.dump_obj(child, depth + 1, limit=limit)
except CorruptedError:
print(" " * (depth + 1) + "(corrupted children)")
def invoke(self, args, from_tty):
parser = argparse.ArgumentParser(description="Dump lvgl obj tree.")
@@ -58,6 +62,9 @@ class DumpObj(gdb.Command):
depth = 0
for disp in curr_inst().displays():
print(f"Display {hex(disp)}")
for screen in disp.screens:
print(f'{" " * (depth + 1)}Screen@{hex(screen)}')
self.dump_obj(screen, depth=depth + 1, limit=args.level)
try:
for screen in disp.screens:
print(f"{' ' * (depth + 1)}Screen@{hex(screen)}")
self.dump_obj(screen, depth=depth + 1, limit=args.level)
except CorruptedError:
print(f"{' ' * (depth + 1)}(corrupted screens)")
@@ -4,6 +4,7 @@ from datetime import datetime
import gdb
from lvglgdb.lvgl.data_utils import safe_collect
from lvglgdb.value import CorruptedError
# Registry of simple subsystems: (dict_key, lvgl_accessor_method, label)
@@ -181,7 +182,6 @@ def _collect_subjects(lvgl) -> list:
"""Collect subjects from object event lists across all displays."""
seen = set()
result = []
from lvglgdb.lvgl.core.lv_observer import LVSubject
for disp in lvgl.displays():
for screen in disp.screens:
_collect_subjects_from_obj(screen, seen, result)
@@ -199,14 +199,22 @@ def _collect_subjects_from_obj(obj, seen, result):
user_data = dsc.user_data
if not int(user_data):
continue
# Check if this looks like a subject (has subs_ll field)
subject = LVSubject(user_data)
addr = int(subject)
if addr not in seen:
seen.add(addr)
result.append(subject.snapshot().as_dict())
except Exception:
# Per-dsc failure: skip this descriptor, continue others
continue
for child in obj.children:
_collect_subjects_from_obj(child, seen, result)
try:
for child in obj.children:
try:
_collect_subjects_from_obj(child, seen, result)
except Exception:
# Per-child failure: skip this child, continue siblings
continue
except CorruptedError:
# Children pointer unreadable: stop traversal for this subtree
pass
+10 -8
View File
@@ -22,12 +22,14 @@ class InfoDrawUnit(gdb.Command):
print(f"Draw Unit: {unit}, Name: {name}")
type_name = DRAW_UNIT_TYPE_NAMES.get(name, "lv_draw_unit_t")
try:
target_type = gdb.lookup_type(type_name)
except gdb.error:
target_type = gdb.lookup_type("lv_draw_unit_t")
casted = unit.cast(target_type, ptr=True)
if casted is None:
casted = unit.cast(type_name, ptr=True)
if casted is None or not casted.is_ok:
casted = unit.cast("lv_draw_unit_t", ptr=True)
print(casted.dereference().format_string(pretty_structs=True, symbols=True))
if casted is None or not casted.is_ok:
print(f" (corrupted: {casted})")
return
deref = casted.dereference()
if not deref.is_ok:
print(f" (corrupted: {deref})")
return
print(deref.format_string(pretty_structs=True, symbols=True))
+1 -1
View File
@@ -53,7 +53,7 @@ class LVGL:
from ..draw.lv_draw_unit import LVDrawUnit
head = self.lv_global.draw_info.unit_head
if int(head):
if head:
yield from LVDrawUnit(head)
def image_cache(self):
+1 -1
View File
@@ -35,7 +35,7 @@ class LVGroup(Value):
@property
def obj_focus(self) -> Value:
focus_pp = self.super_value("obj_focus")
if not int(focus_pp):
if not focus_pp:
return None
return focus_pp.dereference()
+45 -42
View File
@@ -1,5 +1,3 @@
import gdb
from lvglgdb.value import Value, ValueInput
from lvglgdb.lvgl.misc.lv_style import LVStyle, decode_selector
@@ -59,12 +57,21 @@ class LVObject(Value):
def __init__(self, obj: ValueInput):
super().__init__(Value.normalize(obj, "lv_obj_t"))
@property
def obj_class(self):
from .lv_obj_class import LVObjClass
class_p = self.super_value("class_p")
if not class_p:
return None
return LVObjClass(class_p)
@property
def class_name(self):
name = self.class_p.name
if name:
return name.string()
return self.class_p.format_string(symbols=True, address=True, styling=True)
cls = self.obj_class
if not cls:
return "(no class)"
return cls.name
@property
def x1(self):
@@ -83,8 +90,14 @@ class LVObject(Value):
return int(self.coords.y2)
@property
def child_count(self):
return self.spec_attr.child_cnt if self.spec_attr else 0
def child_cnt(self) -> int:
"""Return child count, 0 if corrupted."""
if not self.spec_attr:
return 0
cnt = self.spec_attr.super_value("child_cnt")
if not cnt.is_ok:
return 0
return int(cnt)
@property
def event_list(self):
@@ -100,11 +113,8 @@ class LVObject(Value):
def children(self):
if not self.spec_attr:
return
for i in range(self.child_count):
try:
yield LVObject(self.spec_attr.children[i])
except Exception:
continue
for i in range(self.child_cnt):
yield LVObject(self.spec_attr.children[i].read_value())
@property
def obj_styles(self):
@@ -114,16 +124,13 @@ class LVObject(Value):
return
styles_arr = self.super_value("styles")
for i in range(count):
try:
raw = styles_arr[i]
flags = []
raw_val = Value(raw)
for flag_name in ("is_local", "is_trans", "is_theme", "is_disabled"):
if raw_val.safe_field(flag_name, False, bool):
flags.append(flag_name.replace("is_", ""))
yield ObjStyle(i, int(raw.selector), flags, LVStyle(raw.style))
except (gdb.MemoryError, gdb.error):
continue
raw = styles_arr[i]
flags = []
raw_val = Value(raw)
for flag_name in ("is_local", "is_trans", "is_theme", "is_disabled"):
if raw_val.safe_field(flag_name, False, bool):
flags.append(flag_name.replace("is_", ""))
yield ObjStyle(i, int(raw.selector), flags, LVStyle(raw.style))
@property
def styles(self):
@@ -138,20 +145,17 @@ class LVObject(Value):
from lvglgdb.lvgl.snapshot import Snapshot
from lvglgdb.lvgl.data_utils import ptr_or_none
d = {
"addr": hex(int(self)),
"class_name": self.class_name,
"coords": {
"x1": self.x1,
"y1": self.y1,
"x2": self.x2,
"y2": self.y2,
},
"child_count": int(self.child_count),
"style_count": int(self.style_cnt),
"parent_addr": ptr_or_none(self.super_value("parent")),
"group_addr": self._get_group_addr(),
}
d = Snapshot.safe_fields(self, [
("addr", lambda s: hex(int(s))),
("class_name", lambda s: s.class_name, "(corrupted)"),
("coords", lambda s: {
"x1": s.x1, "y1": s.y1, "x2": s.x2, "y2": s.y2,
}, {"x1": 0, "y1": 0, "x2": 0, "y2": 0}),
("child_count", lambda s: s.child_cnt, 0),
("style_count", lambda s: int(s.style_cnt), 0),
("parent_addr", lambda s: ptr_or_none(s.super_value("parent"))),
("group_addr", lambda s: s._get_group_addr()),
])
if include_children:
d["children"] = self._collect_children(include_styles)
if include_styles:
@@ -187,12 +191,11 @@ class LVObject(Value):
spec = self.spec_attr
if not spec or not int(spec):
return None
try:
grp = spec.group
addr = int(grp)
return hex(addr) if addr else None
except Exception:
grp = spec.safe_field("group")
if grp is None:
return None
addr = int(grp)
return hex(addr) if addr else None
def dump_obj_info(obj: LVObject):
@@ -43,8 +43,7 @@ class LVObjClass(Value):
@property
def name(self) -> str:
n = self.super_value("name")
return n.string() if int(n) else "(unnamed)"
return self.super_value("name").string(fallback="(unnamed)")
@property
def base_class(self):
+9 -12
View File
@@ -1,22 +1,24 @@
import functools
from typing import Callable, Iterable, Optional
from typing import Callable, Optional
import gdb
from lvglgdb.value import Value
def fmt_cb(cb: Value) -> str:
def fmt_cb(cb) -> str:
"""Format callback pointer as resolved symbol string or '-' for NULL.
Strips null bytes that may appear in some GDB output."""
if not cb.is_ok:
return str(cb)
addr = int(cb)
if not addr:
return "-"
return cb.format_string(symbols=True, address=True).replace("\x00", "")
def ptr_or_none(val: Value) -> Optional[str]:
def ptr_or_none(val) -> Optional[str]:
"""Convert pointer to hex string or None if NULL."""
if not val.is_ok:
return None
addr = int(val)
return hex(addr) if addr else None
@@ -24,20 +26,18 @@ def ptr_or_none(val: Value) -> Optional[str]:
def safe_collect(
items_or_label,
transform: Callable = None,
on_mem_error: Callable = None,
):
"""Unified safe collection with two usage modes.
Iteration mode — collect items, skipping failures:
safe_collect(items, transform, on_mem_error=None)
safe_collect(items, transform)
Decorator mode — wrap an entire collector function:
@safe_collect("subsystem name")
def collect_xxx(): ...
In decorator mode, exceptions cause a gdb warning and return [].
In iteration mode, gdb.MemoryError/gdb.error calls on_mem_error
(or skips), other exceptions skip silently.
In iteration mode, per-item exceptions are skipped silently.
"""
if isinstance(items_or_label, str):
label = items_or_label
@@ -60,9 +60,6 @@ def safe_collect(
for item in items_or_label:
try:
result.append(transform(item))
except (gdb.MemoryError, gdb.error) as e:
if on_mem_error is not None:
result.append(on_mem_error(item, e))
except Exception:
continue
except Exception:
+13 -9
View File
@@ -32,23 +32,27 @@ class LVDisplay(Value):
"""Get vertical resolution in pixels"""
return int(self.super_value("ver_res"))
@property
def screen_cnt(self) -> int:
"""Return screen count, 0 if corrupted."""
cnt = self.super_value("screen_cnt")
if not cnt.is_ok:
return 0
return int(cnt)
@property
def screens(self):
screens = self.super_value("screens")
for i in range(self.screen_cnt):
yield LVObject(screens[i])
yield LVObject(self.super_value("screens")[i].read_value())
@property
def layer_addrs(self) -> dict:
"""Map screen address -> layer name for known layer pointers."""
result = {}
for name in self._LAYER_NAMES:
try:
ptr = self.super_value(name)
if int(ptr):
result[int(ptr)] = name
except Exception:
pass
ptr = self.super_value(name)
if ptr.is_ok and int(ptr):
result[int(ptr)] = name
return result
# Buffer-related properties
@@ -77,6 +81,6 @@ class LVDisplay(Value):
"addr": hex(int(self)),
"hor_res": self.hor_res,
"ver_res": self.ver_res,
"screen_count": int(self.screen_cnt),
"screen_count": self.screen_cnt,
}
return Snapshot(d, source=self, display_spec=self._DISPLAY_SPEC)
+1 -1
View File
@@ -167,7 +167,7 @@ class LVDrawBuf(Value):
buf = io.BytesIO()
img.save(buf, format="PNG")
return buf.getvalue()
except (gdb.MemoryError, Exception):
except Exception:
return None
def _convert_to_image(
@@ -18,8 +18,7 @@ class LVDrawUnit(Value):
@property
def name(self) -> str:
n = self.super_value("name")
return n.string() if int(n) else "(unnamed)"
return self.super_value("name").string(fallback="(unnamed)")
@property
def idx(self) -> int:
+6 -2
View File
@@ -31,6 +31,10 @@ class LVCache(Value):
else datatype
)
@property
def name(self) -> str:
return self.super_value("name").string(fallback="(unnamed)")
def snapshot(self):
from lvglgdb.lvgl.snapshot import Snapshot
@@ -43,7 +47,7 @@ class LVCache(Value):
d = {
"addr": hex(int(self)),
"name": self.name.as_string(),
"name": self.name,
"node_size": int(self.node_size),
"max_size": int(self.max_size),
"current_size": int(self.size),
@@ -79,7 +83,7 @@ class LVCache(Value):
iterator = iter(self)
if iterator is None:
errors = [f"unsupported cache type: {self.name.as_string()}"]
errors = [f"unsupported cache type: {self.name}"]
else:
errors = iterator.sanity_check(entry_checker)
@@ -1,7 +1,7 @@
from typing import Union
import gdb
from lvglgdb.value import Value, ValueInput
from lvglgdb.value import CorruptedError, Value, ValueInput
class LVCacheEntry(Value):
@@ -58,7 +58,7 @@ class LVCacheEntry(Value):
try:
data = self.get_data()
return f"CacheEntry(ref_cnt={self.get_ref_count()}, valid={not self.is_invalid()}, data={data.dereference()})"
except gdb.error:
except CorruptedError:
pass
return super().__str__()
@@ -28,23 +28,24 @@ class LVCacheLRURBIterator(LVCacheIteratorBase):
"""Yield (ll_addr, data, back_ptr) for each ll node.
Skips nodes with inaccessible memory (e.g. corrupted pointers).
"""
inferior = gdb.selected_inferior()
for ll_node in LVList(lru_cache.ll):
try:
ll_addr = int(ll_node)
rb_node = Value(ll_node).cast(rb_node_pp_t)
data = rb_node.data
data_addr = int(data)
if data_addr == 0:
continue
# Probe data pointer accessibility before dereferencing
inferior.read_memory(data_addr, 1)
back_ptr = int(
Value(data_addr + rb_size - ptr_size).cast(void_pp_t).dereference()
)
yield ll_addr, data, back_ptr
except (gdb.MemoryError, gdb.error):
ll_addr = int(ll_node)
rb_node = Value(ll_node).cast(rb_node_pp_t)
if not rb_node:
continue
data = rb_node.data
if not data:
continue
data = data.read_value()
if not data.is_ok:
continue
data_addr = int(data)
if data_addr == 0:
continue
back_ptr = int(
Value(data_addr + rb_size - ptr_size).cast(void_pp_t).dereference()
)
yield ll_addr, data, back_ptr
def _collect_entries(self):
"""Collect entries from LRU RB cache by traversing the linked list (MRU→LRU order)"""
@@ -169,19 +170,13 @@ class LVCacheLRURB(LVCache):
def is_count_based(self):
"""Check if this is count-based LRU cache"""
try:
name = str(self.name)
return "count" in name.lower() or "lru_rb_count" in str(self.clz).lower()
except Exception:
return False
name = self.name
return "count" in name.lower() or "lru_rb_count" in str(self.clz).lower()
def is_size_based(self):
"""Check if this is size-based LRU cache"""
try:
name = str(self.name)
return "size" in name.lower() or "lru_rb_size" in str(self.clz).lower()
except Exception:
return False
name = self.name
return "size" in name.lower() or "lru_rb_size" in str(self.clz).lower()
def __iter__(self):
"""Create iterator for this LRU RB cache"""
@@ -1,5 +1,4 @@
import gdb
from lvglgdb.value import Value
from lvglgdb.value import CorruptedError, Value
from .lv_cache import LVCache
from .lv_cache_entry import LVCacheEntry
@@ -97,7 +96,7 @@ class LVImageCache(object):
data_size = int(decoded.data_size) if decoded else 0
size_str = f"{w}x{h}"
decoder_name = data_ptr.decoder.name.as_string()
decoder_name = data_ptr.decoder.name.string()
if src_type == 0: # LV_IMAGE_SRC_VARIABLE
src_str = src.format_string(
@@ -106,13 +105,13 @@ class LVImageCache(object):
type_str = "var"
elif src_type == 1: # LV_IMAGE_SRC_FILE
src_str = (
src.cast("char", ptr=True).as_string() if src else "(null)"
src.cast("char", ptr=True).string() if src else "(null)"
)
type_str = "file"
else:
src_str = f"{int(src):#x}" if src else "0x0"
except gdb.error as e:
except CorruptedError as e:
src_str = src_str or str(e)
extras = dict(zip(iterator.extra_fields, iterator.get_extra(entry)))
@@ -142,8 +141,8 @@ class LVImageCache(object):
return [f"entry {int(entry):#x}: null data pointer"]
try:
return LVImageCacheData(data_ptr).sanity_check(int(entry))
except gdb.error as e:
return [f"entry {int(entry):#x}: gdb error: {e}"]
except Exception as e:
return [f"entry {int(entry):#x}: error: {e}"]
def sanity_check(self):
"""Run sanity check on image cache with image-specific entry validation"""
@@ -20,8 +20,7 @@ class LVImageDecoder(Value):
@property
def name(self) -> str:
n = self.super_value("name")
return n.string() if int(n) else "(unnamed)"
return self.super_value("name").string(fallback="(unnamed)")
@property
def info_cb(self) -> Value:
@@ -1,5 +1,4 @@
import gdb
from lvglgdb.value import Value
from lvglgdb.value import CorruptedError, Value
from .lv_cache import LVCache
from .lv_cache_entry import LVCacheEntry
@@ -81,7 +80,7 @@ class LVImageHeaderCache(object):
cf = int(header.cf)
size_str = f"{w}x{h}"
decoder_name = data_ptr.decoder.name.as_string()
decoder_name = data_ptr.decoder.name.string()
if src_type == 0: # LV_IMAGE_SRC_VARIABLE
src_str = src.format_string(
@@ -90,13 +89,13 @@ class LVImageHeaderCache(object):
type_str = "var"
elif src_type == 1: # LV_IMAGE_SRC_FILE
src_str = (
src.cast("char", ptr=True).as_string() if src else "(null)"
src.cast("char", ptr=True).string() if src else "(null)"
)
type_str = "file"
else:
src_str = f"{int(src):#x}" if src else "0x0"
except gdb.error as e:
except CorruptedError as e:
src_str = src_str or str(e)
extras = dict(zip(iterator.extra_fields, iterator.get_extra(entry)))
@@ -124,8 +123,8 @@ class LVImageHeaderCache(object):
return [f"entry {int(entry):#x}: null data pointer"]
try:
return LVImageHeaderCacheData(data_ptr).sanity_check(int(entry))
except gdb.error as e:
return [f"entry {int(entry):#x}: gdb error: {e}"]
except Exception as e:
return [f"entry {int(entry):#x}: error: {e}"]
def sanity_check(self):
"""Run sanity check on image header cache with header-specific entry validation"""
+3 -11
View File
@@ -47,13 +47,8 @@ class LVList(Value):
if not self.current:
raise StopIteration
nodetype = self.nodetype if self.nodetype else self.lv_ll_node_t
node = self.current.cast(nodetype)
try:
self.current = self._next(self.current)
except (gdb.MemoryError, gdb.error):
self.current = None
node = self.current.cast(self.nodetype or self.lv_ll_node_t)
self.current = self._next(self.current)
return node
@property
@@ -62,10 +57,7 @@ class LVList(Value):
node = self.head
while node:
count += 1
try:
node = self._next(node)
except (gdb.MemoryError, gdb.error):
break
node = self._next(node)
return count
def snapshot(self):
+10 -16
View File
@@ -1,7 +1,7 @@
from typing import Union
import gdb
from lvglgdb.value import Value, ValueInput
from lvglgdb.value import CorruptedError, Value, ValueInput
class LVRedBlackTree(Value):
@@ -105,23 +105,17 @@ class LVRedBlackTreeIterator:
if not self.current:
raise StopIteration
try:
data = self.tree.get_data(self.current)
except (gdb.MemoryError, gdb.error):
data = None
data = self.tree.get_data(self.current)
# Advance to next node (in-order traversal)
try:
if self.current.right:
self.current = self.tree.minimum_from(self.current.right)
else:
parent = self.current.parent
while parent and self.current == parent.right:
self.current = parent
parent = parent.parent
if self.current.right:
self.current = self.tree.minimum_from(self.current.right)
else:
parent = self.current.parent
while parent and self.current == parent.right:
self.current = parent
except (gdb.MemoryError, gdb.error):
self.current = None
parent = parent.parent
self.current = parent
if data is None:
return self.__next__()
@@ -137,7 +131,7 @@ class LVRedBlackTreeIterator:
data = self.tree.get_data(current)
if data:
return f"LVRedBlackTreeIterator(current={data})"
except:
except CorruptedError:
pass
return f"LVRedBlackTreeIterator(current=0x{int(current):x})"
+3 -3
View File
@@ -2,7 +2,7 @@ from dataclasses import dataclass
from typing import Iterator
import gdb
from lvglgdb.value import Value, ValueInput
from lvglgdb.value import CorruptedError, Value, ValueInput
from .lv_style_consts import (
STYLE_PROP_NAMES,
PART_NAMES,
@@ -50,7 +50,7 @@ def format_style_value(prop_id: int, value: Value) -> str:
return f"{ptr:#x}" if ptr else "NULL"
else:
return str(int(value.num))
except gdb.error:
except CorruptedError:
return str(value)
@@ -74,7 +74,7 @@ def _style_value_data(prop_id: int, value: Value) -> dict:
return {"value_str": f"{ptr:#x}" if ptr else "NULL"}
else:
return {"value_str": str(int(value.num))}
except gdb.error:
except CorruptedError:
return {"value_str": str(value)}
+21 -2
View File
@@ -23,7 +23,7 @@ class Snapshot:
@classmethod
def fallback(cls, addr: Callable = lambda x: int(x), **extra_fns):
"""Decorator: on exception, return a corrupted snapshot dict.
"""Decorator: on CorruptedError, return a corrupted snapshot dict.
Each extra kwarg may be a callable (called with the first arg)
or a static value.
@@ -31,9 +31,10 @@ class Snapshot:
def decorator(fn):
@functools.wraps(fn)
def wrapper(item, *args, **kwargs):
from ..value import CorruptedError
try:
return fn(item, *args, **kwargs)
except Exception as e:
except CorruptedError as e:
try:
resolved_addr = hex(addr(item))
except Exception:
@@ -53,6 +54,24 @@ class Snapshot:
return wrapper
return decorator
@staticmethod
def safe_fields(source, field_specs: list) -> dict:
"""Collect fields with per-field CorruptedError handling.
field_specs: list of (key, callable) or (key, callable, default)
"""
from ..value import CorruptedError
d = {}
for spec in field_specs:
key, fn = spec[0], spec[1]
default = spec[2] if len(spec) > 2 else None
try:
d[key] = fn(source)
except CorruptedError:
d[key] = default
return d
# --- dict-like read access ---
def __getitem__(self, key: str) -> Any:
+214 -13
View File
@@ -7,6 +7,11 @@ class Value(gdb.Value):
def __init__(self, value: Union[gdb.Value, "Value"]):
super().__init__(value)
@property
def is_ok(self) -> bool:
"""True for valid Value, False for CorruptedValue."""
return True
@staticmethod
def normalize(val: "ValueInput", target_type: Optional[str] = None) -> "Value":
"""Normalize input to a typed Value pointer.
@@ -22,7 +27,11 @@ class Value(gdb.Value):
Raises:
ValueError: If target_type lookup fails or cast fails
CorruptedError: If val is a CorruptedValue sentinel
"""
if isinstance(val, CorruptedValue):
raise CorruptedError(val._addr, val._error)
if isinstance(val, str):
val = gdb.parse_and_eval(val)
@@ -72,26 +81,134 @@ class Value(gdb.Value):
return val
# -- Memory error helpers --
def _safe_addr(self) -> int:
"""Extract address for diagnostic purposes, returning 0 on failure."""
try:
return int(self)
except (gdb.error, OverflowError):
return 0
def _is_memory_error(self, e: gdb.error) -> bool:
"""Check if a gdb.error is actually a memory access failure."""
msg = str(e).lower()
return any(k in msg for k in ("memory", "access", "cannot read"))
def _wrap_memory_error(self, fn) -> Union["Value", "CorruptedValue"]:
"""Call fn, converting memory errors to CorruptedValue."""
try:
return fn()
except gdb.MemoryError as e:
return CorruptedValue(self._safe_addr(), e)
except gdb.error as e:
if self._is_memory_error(e):
return CorruptedValue(self._safe_addr(), e)
raise
def read_value(self) -> Union["Value", "CorruptedValue"]:
"""Force a memory read probe. Returns CorruptedValue if unreadable."""
try:
addr = int(gdb.Value.__int__(self))
except (gdb.MemoryError, gdb.error):
return CorruptedValue(self._safe_addr(), gdb.MemoryError(
f"Cannot access memory at address {hex(self._safe_addr())}"))
if addr:
try:
gdb.selected_inferior().read_memory(addr, 1)
except gdb.MemoryError as e:
return CorruptedValue(addr, e)
except gdb.error as e:
if self._is_memory_error(e):
return CorruptedValue(addr, e)
raise
return self
# -- Hooked GDB access methods --
def __bool__(self) -> bool:
"""Wrap gdb.Value truthiness check to handle memory errors."""
try:
return bool(gdb.Value.__bool__(self))
except gdb.MemoryError:
return False
except gdb.error as e:
if self._is_memory_error(e):
return False
msg = str(e).lower()
if "cannot convert" in msg:
return False
raise
def __int__(self) -> int:
"""Wrap gdb.Value int conversion to handle memory errors."""
try:
return int(gdb.Value.__int__(self))
except gdb.MemoryError:
return 0
except gdb.error as e:
if self._is_memory_error(e):
return 0
# "Cannot convert value to long" happens when a corrupted
# pointer dereferences into a struct that GDB cannot coerce
# to an integer. Treat as zero rather than propagating.
msg = str(e).lower()
if "cannot convert" in msg:
return 0
raise
def __getitem__(self, key):
return Value(super().__getitem__(key))
try:
raw = gdb.Value.__getitem__(self, key)
except gdb.MemoryError as e:
return CorruptedValue(self._safe_addr(), e)
except gdb.error as e:
if self._is_memory_error(e):
return CorruptedValue(self._safe_addr(), e)
raise
return Value(raw)
def __getattr__(self, key):
if hasattr(super(), key):
return Value(super().__getattribute__(key))
return Value(super().__getitem__(key))
try:
if hasattr(gdb.Value, key):
raw = gdb.Value.__getattribute__(self, key)
else:
raw = gdb.Value.__getitem__(self, key)
except gdb.MemoryError as e:
return CorruptedValue(self._safe_addr(), e)
except gdb.error as e:
if self._is_memory_error(e):
return CorruptedValue(self._safe_addr(), e)
raise
return Value(raw)
def dereference(self) -> Union["Value", "CorruptedValue"]:
try:
raw = gdb.Value.dereference(self)
except gdb.MemoryError as e:
return CorruptedValue(self._safe_addr(), e)
except gdb.error as e:
if self._is_memory_error(e):
return CorruptedValue(self._safe_addr(), e)
raise
return Value(raw)
def cast(
self, type_name: Union[str, gdb.Type], ptr: bool = False
) -> Optional["Value"]:
) -> Optional[Union["Value", "CorruptedValue"]]:
try:
gdb_type = (
gdb.lookup_type(type_name) if isinstance(type_name, str) else type_name
)
if ptr:
gdb_type = gdb_type.pointer()
return Value(super().cast(gdb_type))
except gdb.error:
return None
return Value(gdb.Value.cast(self, gdb_type))
except gdb.MemoryError as e:
return CorruptedValue(self._safe_addr(), e)
except gdb.error as e:
if self._is_memory_error(e):
return CorruptedValue(self._safe_addr(), e)
return None # Non-memory error preserves original behavior
def super_value(self, attr: str) -> "Value":
return self[attr]
@@ -108,12 +225,16 @@ class Value(gdb.Value):
except (gdb.error, gdb.MemoryError):
return default
def as_string(self):
"""Convert to string if possible"""
def string(self, *args, fallback=None, **kwargs) -> str:
"""Read a C string, returning fallback on memory error."""
try:
return self.string()
except gdb.error:
return str(self)
return gdb.Value.string(self, *args, **kwargs)
except gdb.MemoryError:
return fallback if fallback is not None else f"(corrupted@{hex(self._safe_addr())})"
except gdb.error as e:
if self._is_memory_error(e):
return fallback if fallback is not None else f"(corrupted@{hex(self._safe_addr())})"
raise
def __str__(self):
"""Provide better string representation for debugging"""
@@ -129,6 +250,8 @@ class Value(gdb.Value):
"""Provide detailed representation"""
try:
content = self.dereference()
if isinstance(content, CorruptedValue):
return f"Value({self.__str__()}: {content!r})"
return f"Value({self.__str__()}: {content})"
except gdb.error:
pass
@@ -136,5 +259,83 @@ class Value(gdb.Value):
return f"Value({self.__str__()})"
class CorruptedError(Exception):
"""Raised when a CorruptedValue field is accessed (terminal operation)."""
def __init__(self, addr: int, original_error: Exception):
self.addr = addr
self.original_error = original_error
super().__init__(
f"Corrupted value @{hex(addr)}: {original_error}"
)
class CorruptedValue:
"""Sentinel for inaccessible memory. Infectious through chained ops.
Not a subclass of Value or gdb.Value. Propagates through dereference()
and cast(), terminates with CorruptedError on field access.
bool(CorruptedValue) == False enables natural loop termination.
"""
def __init__(self, addr: int, error: Exception):
object.__setattr__(self, "_addr", addr)
object.__setattr__(self, "_error", error)
# -- Safe exits: never raise --
@property
def is_ok(self) -> bool:
"""Always False for CorruptedValue."""
return False
def __int__(self) -> int:
return self._addr
def __bool__(self) -> bool:
return False
def snapshot(self):
"""Return a corrupted Snapshot with diagnostic info."""
from .lvgl.snapshot import Snapshot
d = {
"addr": hex(self._addr),
"class_name": "(corrupted)",
"error": str(self._error),
}
return Snapshot(d, source=self)
def __repr__(self) -> str:
return f"CorruptedValue(@{hex(self._addr)}: {self._error})"
def __str__(self) -> str:
return f"(corrupted@{hex(self._addr)})"
# -- Propagating ops: return self, never raise --
def read_value(self):
return self
def dereference(self):
return self
def cast(self, *args, **kwargs):
return self
def string(self, *args, fallback=None, **kwargs) -> str:
"""Return fallback string for corrupted value."""
return fallback if fallback is not None else f"(corrupted@{hex(self._addr)})"
# -- Terminal ops: raise CorruptedError / AttributeError --
def __getattr__(self, key):
if key.startswith("__") and key.endswith("__"):
raise AttributeError(key)
raise CorruptedError(self._addr, self._error)
def __getitem__(self, key):
raise CorruptedError(self._addr, self._error)
# Type alias for all wrapper class __init__ parameters
ValueInput = Union[str, int, gdb.Value, Value]