chore(gdb): improve resilience against memory corruption and older LVGL versions (#9885)

This commit is contained in:
Benign X
2026-03-21 13:06:03 +08:00
committed by GitHub
parent 556bd623c3
commit fca39997f5
13 changed files with 266 additions and 105 deletions

View File

@@ -1,22 +1,9 @@
import base64
import functools
from datetime import datetime
import gdb
def safe_collect(subsystem: str):
"""Decorator that wraps a collector function with try/except/warning."""
def decorator(fn):
@functools.wraps(fn)
def wrapper(*args, **kwargs):
try:
return fn(*args, **kwargs)
except Exception as e:
gdb.write(f"Warning: failed to collect {subsystem}: {e}\n")
return []
return wrapper
return decorator
from lvglgdb.lvgl.data_utils import safe_collect
# Registry of simple subsystems: (dict_key, lvgl_accessor_method, label)
@@ -39,14 +26,14 @@ def _collect_simple(lvgl, dict_key: str, accessor: str | None, label: str) -> li
For entries with accessor != None: [x.snapshot().as_dict() for x in lvgl.<accessor>()]
For entries with accessor == None: [s.as_dict() for s in lvgl.<dict_key>().snapshots()]
"""
try:
@safe_collect(label)
def _inner():
if accessor is not None:
return [x.snapshot().as_dict() for x in getattr(lvgl, accessor)()]
else:
return [s.as_dict() for s in getattr(lvgl, dict_key)().snapshots()]
except Exception as e:
gdb.write(f"Warning: failed to collect {label}: {e}\n")
return []
return [s.as_dict() for s in getattr(lvgl, dict_key)().snapshots()]
return _inner()
def collect_all() -> dict:
@@ -126,31 +113,24 @@ def _collect_displays(lvgl) -> list:
@safe_collect("object trees")
def _collect_object_trees(lvgl) -> list:
"""Collect object trees for all displays with layer name annotations."""
from lvglgdb.lvgl.snapshot import Snapshot
result = []
for disp in lvgl.displays():
# Read special layer pointers for name annotation
layer_addrs = {}
for name in ("bottom_layer", "act_scr", "top_layer", "sys_layer"):
try:
ptr = disp.super_value(name)
if int(ptr):
layer_addrs[int(ptr)] = name
except Exception:
pass
addrs = disp.layer_addrs
tree = {
"display_addr": hex(int(disp)),
"screens": [],
}
for screen in disp.screens:
@Snapshot.fallback(layer_name=lambda s: addrs.get(int(s)))
def _screen_snapshot(screen):
snap = screen.snapshot(
include_children=True, include_styles=True
include_children=True, include_styles=True,
).as_dict()
# Annotate with layer name if this screen is a known layer
screen_addr = int(screen)
snap["layer_name"] = layer_addrs.get(screen_addr)
tree["screens"].append(snap)
result.append(tree)
snap["layer_name"] = addrs.get(int(screen))
return snap
result.append({
"display_addr": hex(int(disp)),
"screens": safe_collect(disp.screens, _screen_snapshot),
})
return result

View File

@@ -1,3 +1,5 @@
import gdb
from lvglgdb.value import Value, ValueInput
from lvglgdb.lvgl.misc.lv_style import LVStyle, decode_selector
@@ -99,7 +101,10 @@ class LVObject(Value):
if not self.spec_attr:
return
for i in range(self.child_count):
yield LVObject(self.spec_attr.children[i])
try:
yield LVObject(self.spec_attr.children[i])
except Exception:
continue
@property
def obj_styles(self):
@@ -109,17 +114,16 @@ class LVObject(Value):
return
styles_arr = self.super_value("styles")
for i in range(count):
raw = styles_arr[i]
flags = []
if int(raw.is_local):
flags.append("local")
if int(raw.is_trans):
flags.append("trans")
if int(raw.is_theme):
flags.append("theme")
if int(raw.is_disabled):
flags.append("disabled")
yield ObjStyle(i, int(raw.selector), flags, LVStyle(raw.style))
try:
raw = styles_arr[i]
flags = []
raw_val = Value(raw)
for flag_name in ("is_local", "is_trans", "is_theme", "is_disabled"):
if raw_val.safe_field(flag_name, False, bool):
flags.append(flag_name.replace("is_", ""))
yield ObjStyle(i, int(raw.selector), flags, LVStyle(raw.style))
except (gdb.MemoryError, gdb.error):
continue
@property
def styles(self):
@@ -149,14 +153,35 @@ class LVObject(Value):
"group_addr": self._get_group_addr(),
}
if include_children:
d["children"] = [
c.snapshot(include_children=True, include_styles=include_styles).as_dict()
for c in self.children
]
d["children"] = self._collect_children(include_styles)
if include_styles:
d["styles"] = [s.snapshot().as_dict() for s in self.obj_styles]
d["styles"] = self._collect_styles()
return Snapshot(d, source=self, display_spec=self._DISPLAY_SPEC)
def _collect_children(self, include_styles):
"""Collect child snapshots, substituting corrupted snapshots on error."""
from lvglgdb.lvgl.snapshot import Snapshot
from lvglgdb.lvgl.data_utils import safe_collect
@Snapshot.fallback()
def _snap(c):
return c.snapshot(
include_children=True, include_styles=include_styles,
).as_dict()
return safe_collect(self.children, _snap)
def _collect_styles(self):
"""Collect style snapshots, substituting corrupted entries on error."""
from lvglgdb.lvgl.snapshot import Snapshot
from lvglgdb.lvgl.data_utils import safe_collect
@Snapshot.fallback()
def _snap(s):
return s.snapshot().as_dict()
return safe_collect(self.obj_styles, _snap)
def _get_group_addr(self):
"""Get group address from spec_attr, or None."""
spec = self.spec_attr
@@ -195,6 +220,9 @@ def dump_obj_styles(obj: ValueInput):
return [entry["prop_name"], value_str]
for s in styles:
if "error" in s:
print(f"(corrupted) {s.get('addr', '?')}{s['error']}")
continue
print(f"[{s['index']}] {s['selector_str']} {s['flags_str']}")
print_table(
s.get("properties", []), ["prop", "value"], _style_row,

View File

@@ -1,4 +1,7 @@
from typing import Optional
import functools
from typing import Callable, Iterable, Optional
import gdb
from lvglgdb.value import Value
@@ -16,3 +19,52 @@ def ptr_or_none(val: Value) -> Optional[str]:
"""Convert pointer to hex string or None if NULL."""
addr = int(val)
return hex(addr) if addr else None
def safe_collect(
items_or_label,
transform: Callable = None,
on_mem_error: Callable = None,
):
"""Unified safe collection with two usage modes.
Iteration mode — collect items, skipping failures:
safe_collect(items, transform, on_mem_error=None)
Decorator mode — wrap an entire collector function:
@safe_collect("subsystem name")
def collect_xxx(): ...
In decorator mode, exceptions cause a gdb warning and return [].
In iteration mode, gdb.MemoryError/gdb.error calls on_mem_error
(or skips), other exceptions skip silently.
"""
if isinstance(items_or_label, str):
label = items_or_label
def decorator(fn):
@functools.wraps(fn)
def wrapper(*args, **kwargs):
try:
return fn(*args, **kwargs)
except Exception as e:
import traceback
gdb.write(f"Warning: failed to collect {label}: {e}\n")
traceback.print_exc()
return []
return wrapper
return decorator
result = []
try:
for item in items_or_label:
try:
result.append(transform(item))
except (gdb.MemoryError, gdb.error) as e:
if on_mem_error is not None:
result.append(on_mem_error(item, e))
except Exception:
continue
except Exception:
pass
return result

View File

@@ -17,6 +17,8 @@ class LVDisplay(Value):
"empty_msg": "No displays.",
}
_LAYER_NAMES = ("bottom_layer", "act_scr", "top_layer", "sys_layer")
def __init__(self, disp: ValueInput):
super().__init__(Value.normalize(disp, "lv_display_t"))
@@ -36,6 +38,19 @@ class LVDisplay(Value):
for i in range(self.screen_cnt):
yield LVObject(screens[i])
@property
def layer_addrs(self) -> dict:
"""Map screen address -> layer name for known layer pointers."""
result = {}
for name in self._LAYER_NAMES:
try:
ptr = self.super_value(name)
if int(ptr):
result[int(ptr)] = name
except Exception:
pass
return result
# Buffer-related properties
@property
def buf_1(self):

View File

@@ -23,7 +23,7 @@ class LVDrawUnit(Value):
@property
def idx(self) -> int:
return int(self.super_value("idx"))
return self.safe_field("idx", -1, int)
@property
def next(self):

View File

@@ -100,11 +100,11 @@ class LVAnim(Value):
@property
def reverse_duration(self) -> int:
return int(self.super_value("reverse_duration"))
return self.safe_field("reverse_duration", 0, int)
@property
def reverse_delay(self) -> int:
return int(self.super_value("reverse_delay"))
return self.safe_field("reverse_delay", 0, int)
@property
def repeat_delay(self) -> int:
@@ -116,15 +116,15 @@ class LVAnim(Value):
@property
def is_paused(self) -> bool:
return bool(int(self.super_value("is_paused")))
return self.safe_field("is_paused", False, bool)
@property
def reverse_play_in_progress(self) -> bool:
return bool(int(self.super_value("reverse_play_in_progress")))
return self.safe_field("reverse_play_in_progress", False, bool)
@property
def early_apply(self) -> bool:
return bool(int(self.super_value("early_apply")))
return self.safe_field("early_apply", False, bool)
def _status_str(self) -> str:
"""Short status string for table display."""

View File

@@ -25,15 +25,26 @@ class LVCacheLRURBIterator(LVCacheIteratorBase):
return lru_cache, rb_size, ptr_size, rb_node_pp_t, void_pp_t
def _iter_ll_nodes(self, lru_cache, rb_size, ptr_size, rb_node_pp_t, void_pp_t):
"""Yield (ll_addr, data, back_ptr) for each ll node"""
"""Yield (ll_addr, data, back_ptr) for each ll node.
Skips nodes with inaccessible memory (e.g. corrupted pointers).
"""
inferior = gdb.selected_inferior()
for ll_node in LVList(lru_cache.ll):
ll_addr = int(ll_node)
rb_node = Value(ll_node).cast(rb_node_pp_t)
data = rb_node.data
back_ptr = int(
Value(int(data) + rb_size - ptr_size).cast(void_pp_t).dereference()
)
yield ll_addr, data, back_ptr
try:
ll_addr = int(ll_node)
rb_node = Value(ll_node).cast(rb_node_pp_t)
data = rb_node.data
data_addr = int(data)
if data_addr == 0:
continue
# Probe data pointer accessibility before dereferencing
inferior.read_memory(data_addr, 1)
back_ptr = int(
Value(data_addr + rb_size - ptr_size).cast(void_pp_t).dereference()
)
yield ll_addr, data, back_ptr
except (gdb.MemoryError, gdb.error):
continue
def _collect_entries(self):
"""Collect entries from LRU RB cache by traversing the linked list (MRU→LRU order)"""
@@ -46,10 +57,13 @@ class LVCacheLRURBIterator(LVCacheIteratorBase):
for ll_addr, data, back_ptr in self._iter_ll_nodes(
lru_cache, rb_size, ptr_size, rb_node_pp_t, void_pp_t
):
entry = LVCacheEntry.from_data_ptr(data, self.cache.datatype)
entry.extra = Value(back_ptr)
entry.ll_addr = ll_addr
self._entries.append(entry)
try:
entry = LVCacheEntry.from_data_ptr(data, self.cache.datatype)
entry.extra = Value(back_ptr)
entry.ll_addr = ll_addr
self._entries.append(entry)
except Exception:
continue
except Exception as e:
self._collect_error = f"_collect_entries failed: {e}"
@@ -158,7 +172,7 @@ class LVCacheLRURB(LVCache):
try:
name = str(self.name)
return "count" in name.lower() or "lru_rb_count" in str(self.clz).lower()
except:
except Exception:
return False
def is_size_based(self):
@@ -166,7 +180,7 @@ class LVCacheLRURB(LVCache):
try:
name = str(self.name)
return "size" in name.lower() or "lru_rb_size" in str(self.clz).lower()
except:
except Exception:
return False
def __iter__(self):

View File

@@ -50,17 +50,23 @@ class LVList(Value):
nodetype = self.nodetype if self.nodetype else self.lv_ll_node_t
node = self.current.cast(nodetype)
self.current = self._next(self.current)
try:
self.current = self._next(self.current)
except (gdb.MemoryError, gdb.error):
self.current = None
return node
@property
def len(self):
len = 0
count = 0
node = self.head
while node:
len += 1
node = self._next(node)
return len
count += 1
try:
node = self._next(node)
except (gdb.MemoryError, gdb.error):
break
return count
def snapshot(self):
from lvglgdb.lvgl.snapshot import Snapshot
@@ -72,4 +78,3 @@ class LVList(Value):
"nodetype": str(self.nodetype) if self.nodetype else None,
}
return Snapshot(d, source=self, display_spec=self._DISPLAY_SPEC)

View File

@@ -105,18 +105,26 @@ class LVRedBlackTreeIterator:
if not self.current:
raise StopIteration
data = self.tree.get_data(self.current)
try:
data = self.tree.get_data(self.current)
except (gdb.MemoryError, gdb.error):
data = None
# Move to next node (in-order traversal)
if self.current.right:
self.current = self.tree.minimum_from(self.current.right)
else:
parent = self.current.parent
while parent and self.current == parent.right:
# Advance to next node (in-order traversal)
try:
if self.current.right:
self.current = self.tree.minimum_from(self.current.right)
else:
parent = self.current.parent
while parent and self.current == parent.right:
self.current = parent
parent = parent.parent
self.current = parent
parent = parent.parent
self.current = parent
except (gdb.MemoryError, gdb.error):
self.current = None
if data is None:
return self.__next__()
return data
def __str__(self):

View File

@@ -113,12 +113,15 @@ class LVStyle(Value):
yield StyleEntry(prop_id, const_props[j].value)
j += 1
elif prop_cnt > 0:
# Normal style: values[prop_cnt] then props[prop_cnt] (uint8_t)
# Normal style: values[prop_cnt] then props[prop_cnt]
# C code: (lv_style_prop_t*)vp + prop_cnt * sizeof(lv_style_value_t)
# The pointer arithmetic uses lv_style_prop_t element size as stride.
base = self.values_and_props
value_t = gdb.lookup_type("lv_style_value_t")
prop_t = gdb.lookup_type("lv_style_prop_t")
values_ptr = base.cast(value_t, ptr=True)
props_offset = prop_cnt * value_t.sizeof
props_ptr = Value(int(base) + props_offset).cast("uint8_t", ptr=True)
props_offset = prop_cnt * value_t.sizeof * prop_t.sizeof
props_ptr = Value(int(base) + props_offset).cast(prop_t, ptr=True)
for j in range(prop_cnt):
prop_id = int(props_ptr[j])

View File

@@ -1,4 +1,5 @@
from typing import Any, Dict, Iterator, Optional
import functools
from typing import Any, Callable, Dict, Iterator, Optional
class Snapshot:
@@ -7,6 +8,9 @@ class Snapshot:
Holds a pure Python dict (JSON-serializable), an optional reference
to the original wrapper (_source), and an optional display spec that
describes how to format the data for terminal output.
A snapshot may be marked *corrupted* to indicate that the underlying
data could not be fully read (e.g. inaccessible target memory).
"""
__slots__ = ("_data", "_source", "_display_spec")
@@ -17,6 +21,38 @@ class Snapshot:
self._source = source
self._display_spec = display_spec
@classmethod
def fallback(cls, addr: Callable = lambda x: int(x), **extra_fns):
"""Decorator: on exception, return a corrupted snapshot dict.
Each extra kwarg may be a callable (called with the first arg)
or a static value.
"""
def decorator(fn):
@functools.wraps(fn)
def wrapper(item, *args, **kwargs):
try:
return fn(item, *args, **kwargs)
except Exception as e:
try:
resolved_addr = hex(addr(item))
except Exception:
resolved_addr = "0x?"
extra = {}
for k, v in extra_fns.items():
try:
extra[k] = v(item) if callable(v) else v
except Exception:
extra[k] = None
d = {"addr": resolved_addr,
"class_name": "(corrupted)",
"error": str(e)}
if extra:
d.update(extra)
return d
return wrapper
return decorator
# --- dict-like read access ---
def __getitem__(self, key: str) -> Any:

View File

@@ -38,15 +38,27 @@ class Value(gdb.Value):
if typ.code == gdb.TYPE_CODE_PTR:
target = typ.target().strip_typedefs()
if target.code in (gdb.TYPE_CODE_STRUCT, gdb.TYPE_CODE_UNION):
target_type = str(target)
# Prefer .name or .tag over str() to avoid polluted
# anonymous struct expansions like "const struct {...}".
target_type = target.name or target.tag
if target_type is not None:
# If val is already a pointer whose target matches target_type,
# keep the original type to avoid gdb.lookup_type() which may
# resolve to a different compilation unit's definition when
# macro-controlled fields cause multiple struct definitions.
typ = val.type.strip_typedefs()
if typ.code == gdb.TYPE_CODE_PTR:
target = typ.target()
tname = target.name or target.tag or ""
if tname == target_type:
return val
try:
gdb.lookup_type(target_type)
except gdb.error:
raise ValueError(f"Type not found: {target_type}")
typ = val.type.strip_typedefs()
if typ.code != gdb.TYPE_CODE_PTR:
if typ.code in (gdb.TYPE_CODE_INT, gdb.TYPE_CODE_ENUM):
val = Value(val.cast(gdb.lookup_type(target_type).pointer()))
@@ -61,11 +73,7 @@ class Value(gdb.Value):
return val
def __getitem__(self, key):
try:
value = super().__getitem__(key)
except gdb.error:
value = super().__getattr__(key)
return Value(value)
return Value(super().__getitem__(key))
def __getattr__(self, key):
if hasattr(super(), key):
@@ -88,6 +96,18 @@ class Value(gdb.Value):
def super_value(self, attr: str) -> "Value":
return self[attr]
def safe_field(self, attr: str, default=None, cast=None):
"""Access a struct field that may not exist in older LVGL versions.
Returns the field value (optionally converted by *cast*), or
*default* when the field is missing.
"""
try:
v = Value(gdb.Value.__getitem__(self, attr))
return cast(int(v)) if cast is not None else v
except (gdb.error, gdb.MemoryError):
return default
def as_string(self):
"""Convert to string if possible"""
try:

View File

@@ -1,6 +1,6 @@
[project]
name = "lvglgdb"
version = "0.4.0"
version = "0.4.1"
description = "LVGL GDB scripts"
readme = "README.md"
requires-python = ">=3.10"