chore(gdb): add cache sanity check with rb/ll cross-validation and check command (#9805)

This commit is contained in:
Benign X
2026-03-05 20:52:10 +08:00
committed by GitHub
parent 11f13318c1
commit 7db70fc54f
8 changed files with 413 additions and 93 deletions

View File

@@ -3,7 +3,7 @@ import gdb
from .core import DumpObj
from .display import DumpDisplayBuf
from .draw import InfoDrawUnit
from .misc import InfoStyle, DumpCache
from .misc import InfoStyle, DumpCache, CheckPrefix, CheckCache
from .debugger import Debugger
from .drivers import Lvglobal
@@ -23,6 +23,8 @@ Debugger()
DumpObj()
DumpDisplayBuf()
DumpCache()
CheckPrefix()
CheckCache()
# Infos
InfoStyle()

View File

@@ -1,4 +1,4 @@
from .lv_style import InfoStyle
from .lv_cache import DumpCache
from .lv_cache import DumpCache, CheckPrefix, CheckCache
__all__ = ["InfoStyle", "DumpCache"]
__all__ = ["InfoStyle", "DumpCache", "CheckPrefix", "CheckCache"]

View File

@@ -38,3 +38,53 @@ class DumpCache(gdb.Command):
return
cache.print_entries()
class CheckPrefix(gdb.Command):
    """Prefix command 'check' under which the check-* subcommands register."""

    def __init__(self):
        # Last argument True marks this as a prefix command so that
        # subcommands such as "check cache" can attach to it.
        super().__init__("check", gdb.COMMAND_USER, gdb.COMPLETE_NONE, True)

    def invoke(self, args, from_tty):
        # A bare "check" simply lists the available subcommands.
        gdb.execute("help check")
class CheckCache(gdb.Command):
    """run sanity check on specified cache"""

    def __init__(self):
        super(CheckCache, self).__init__(
            "check cache", gdb.COMMAND_USER, gdb.COMPLETE_EXPRESSION
        )

    def invoke(self, args, from_tty):
        """Parse the cache name and run that cache's sanity check.

        Usage: check cache [image|image_header]   (defaults to "image")
        """
        parser = argparse.ArgumentParser(description="Run cache sanity check.")
        parser.add_argument(
            "cache",
            type=str,
            # nargs="?" makes the positional optional; without it argparse
            # requires the argument and the declared default is never used.
            nargs="?",
            choices=["image", "image_header"],
            default="image",
            help="cache to check.",
        )

        from lvglgdb import curr_inst

        try:
            # Use a distinct name so the raw gdb argument string `args`
            # is not shadowed by the parsed namespace.
            opts = parser.parse_args(gdb.string_to_argv(args))
        except SystemExit:
            # argparse calls sys.exit() on error/--help; swallow it so the
            # gdb session is not terminated.
            return

        cache = None
        if opts.cache == "image":
            cache = curr_inst().image_cache()
        elif opts.cache == "image_header":
            cache = curr_inst().image_header_cache()
        if not cache:
            print("Invalid cache: ", opts.cache)
            return
        cache.sanity_check()

View File

@@ -77,6 +77,29 @@ class LVCache(Value):
elif count < int(cache_entries_cnt):
print(f" ... {cache_entries_cnt - count} more entries not shown")
def sanity_check(self, entry_checker=None):
    """Run the sanity check and print a PASS/FAIL table of the results.

    Args:
        entry_checker: optional callable(entry) -> list of error strings,
            forwarded to the iterator's own sanity_check.

    Returns:
        The list of error strings; empty means everything passed.
    """
    from prettytable import PrettyTable

    it = iter(self)
    errors = (
        [f"unsupported cache type: {self.name.as_string()}"]
        if it is None
        else it.sanity_check(entry_checker)
    )

    report = PrettyTable()
    report.field_names = ["#", "status", "detail"]
    report.align["detail"] = "l"
    if not errors:
        report.add_row([0, "PASS", f"all {len(it)} entries OK"])
    else:
        for idx, detail in enumerate(errors):
            report.add_row([idx, "FAIL", detail])
    print(report)
    return errors
def dump_cache_info(cache: ValueInput, datatype: Union[gdb.Type, str]):
"""Dump cache information"""

View File

@@ -11,6 +11,7 @@ class LVCacheIteratorBase:
self.cache = cache
self._entries: List[Value] = []
self._current_index = 0
self._collect_error: Optional[str] = None
self._collect_entries()
def __iter__(self):
@@ -34,3 +35,37 @@ class LVCacheIteratorBase:
def _collect_entries(self):
"""To be implemented by subclasses"""
raise NotImplementedError("Subclasses must implement _collect_entries")
@property
def extra_fields(self):
    """Extra column names contributed by this iterator (none by default).

    Subclasses override this together with get_extra() to add columns.
    """
    return []
def get_extra(self, entry):
    """Extra column values for *entry* (none by default); override in subclasses."""
    return []
def sanity_check(self, entry_checker=None):
    """Re-collect entries and report problems found during collection.

    Subclasses override this to add structure-specific checks.

    Args:
        entry_checker: optional callable(entry) -> list of error strings

    Returns:
        list of error strings, empty means all good
    """
    # Reset all iterator state first so stale entries from a previous
    # traversal can never mask or pollute this run's results.
    self._entries.clear()
    self._current_index = 0
    self._collect_error = None
    self._collect_entries()

    problems = []
    if self._collect_error:
        problems.append(self._collect_error)

    if entry_checker:
        for item in self._entries:
            try:
                found = entry_checker(item)
            except Exception as e:
                # A crashing checker is itself reported as a failure.
                problems.append(f"entry_checker raised: {e}")
            else:
                if found:
                    problems.extend(found)
    return problems

View File

@@ -2,41 +2,139 @@ import gdb
from lvglgdb.value import Value, ValueInput
from .lv_cache_iter_base import LVCacheIteratorBase
from .lv_rb import LVRedBlackTree
from .lv_cache_entry import LVCacheEntry
from .lv_cache import LVCache
from .lv_ll import LVList
class LVCacheLRURBIterator(LVCacheIteratorBase):
"""Iterator for LRU RB cache implementation - traverses linked list and red-black tree"""
"""Iterator for LRU RB cache implementation - traverses linked list in LRU order"""
def __init__(self, cache):
super().__init__(cache)
def _get_lru_params(self):
    """Resolve the lv_lru_rb_t_ view of the cache plus the types and sizes
    needed to walk its linked list.

    Returns:
        (lru_cache, rb_size, ptr_size, rb_node_pp_t, void_pp_t) tuple, or
        None when the cast to lv_lru_rb_t_ fails.
    """
    lru = self.cache.cast("lv_lru_rb_t_", ptr=True)
    if not lru:
        return None
    void_p = gdb.lookup_type("void").pointer()
    node_size = int(lru.rb.size)
    rb_node_pp = gdb.lookup_type("lv_rb_node_t").pointer().pointer()
    return lru, node_size, void_p.sizeof, rb_node_pp, void_p.pointer()
def _iter_ll_nodes(self, lru_cache, rb_size, ptr_size, rb_node_pp_t, void_pp_t):
    """Yield (ll_addr, data, back_ptr) for each ll node"""
    for ll_node in LVList(lru_cache.ll):
        ll_addr = int(ll_node)
        # Reinterpret the ll node's storage as a pointer to its rb node so
        # the rb node's payload ("data") can be read.
        rb_node = Value(ll_node).cast(rb_node_pp_t)
        data = rb_node.data
        # NOTE(review): the back-pointer is read as a void* from the last
        # pointer-sized slot of the rb node's data area (data + rb_size -
        # ptr_size) — confirm against the C-side lv_lru_rb layout.
        back_ptr = int(
            Value(int(data) + rb_size - ptr_size).cast(void_pp_t).dereference()
        )
        yield ll_addr, data, back_ptr
def _collect_entries(self):
"""Collect entries from LRU RB cache by traversing the linked list"""
"""Collect entries from LRU RB cache by traversing the linked list (MRU→LRU order)"""
try:
# Cast cache to lv_lru_rb_t_ to access internal structures
lru_cache = self.cache.cast("lv_lru_rb_t_", ptr=True)
if not lru_cache:
params = self._get_lru_params()
if not params:
return
lru_cache, rb_size, ptr_size, rb_node_pp_t, void_pp_t = params
# Access the linked list
rb = lru_cache.rb
if not rb or not rb.root:
return
for ll_addr, data, back_ptr in self._iter_ll_nodes(
lru_cache, rb_size, ptr_size, rb_node_pp_t, void_pp_t
):
entry = LVCacheEntry.from_data_ptr(data, self.cache.datatype)
entry.extra = Value(back_ptr)
entry.ll_addr = ll_addr
self._entries.append(entry)
rb = LVRedBlackTree(rb)
for node in rb:
self._entries.append(
LVCacheEntry.from_data_ptr(node, self.cache.datatype)
)
except Exception as e:
print(f"Error in _collect_lru_entries: {e}")
self._collect_error = f"_collect_entries failed: {e}"
import traceback
traceback.print_exc()
@property
def extra_fields(self):
    """This iterator contributes one extra column: the ll node address."""
    return ["ll"]
def get_extra(self, entry):
    """Render the entry's associated ll-node address as a hex string."""
    addr = int(entry.extra)
    return [f"{addr:#x}"]
def sanity_check(self, entry_checker=None):
    """Verify rb tree and ll linked list consistency.

    In addition to the base-class collection/entry checks this verifies:
      1. Node data sets match between the rb tree and the ll list.
      2. Neither structure contains duplicate data pointers.
      3. Cross-pointers form a closed loop: ll_node -> rb_node and
         rb_node.back_ptr -> ll_node.

    Args:
        entry_checker: optional callable(entry) -> list of error strings

    Returns:
        list of error strings, empty means all good
    """
    from collections import Counter

    errors = super().sanity_check(entry_checker)
    try:
        params = self._get_lru_params()
        if not params:
            errors.append("failed to cast cache to lv_lru_rb_t_")
            return errors
        lru_cache = params[0]

        # ll side: entries were collected by the base-class pass; verify
        # each entry's back-pointer closes the loop to its ll node.
        ll_data_list = []
        for entry in self._entries:
            data_addr = int(entry.get_data())
            ll_data_list.append(data_addr)
            back_ptr = int(entry.extra)
            ll_addr = entry.ll_addr
            if back_ptr != ll_addr:
                errors.append(
                    f"cross-ptr mismatch: data {data_addr:#x} "
                    f"back_ptr={back_ptr:#x} != ll_node={ll_addr:#x}"
                )

        # rb side: walk the tree directly (module-level LVRedBlackTree
        # import; the previous redundant local import was dropped).
        rb_data_list = [int(data) for data in LVRedBlackTree(lru_cache.rb)]

        def _report_duplicates(addrs, side):
            # One error per address that occurs more than once on `side`.
            for addr, cnt in Counter(addrs).items():
                if cnt > 1:
                    errors.append(
                        f"duplicate in {side}: {addr:#x} appears {cnt} times"
                    )

        _report_duplicates(ll_data_list, "ll")
        _report_duplicates(rb_data_list, "rb")

        # Membership mismatches. Together with the duplicate checks above
        # this already implies equal counts when no errors were found, so
        # the former separate count comparison (dead code) was removed.
        ll_data_set = set(ll_data_list)
        rb_data_set = set(rb_data_list)
        for addr in ll_data_set - rb_data_set:
            errors.append(f"node {addr:#x} in ll but not in rb tree")
        for addr in rb_data_set - ll_data_set:
            errors.append(f"node {addr:#x} in rb tree but not in ll")
    except Exception as e:
        errors.append(f"sanity_check error: {e}")
    return errors
class LVCacheLRURB(LVCache):
"""LVGL LRU-based cache using red-black tree iterator"""

View File

@@ -1,4 +1,3 @@
from typing import Union
import gdb
from prettytable import PrettyTable
from lvglgdb.value import Value
@@ -6,6 +5,43 @@ from .lv_cache import LVCache
from .lv_cache_entry import LVCacheEntry
class LVImageCacheData(Value):
    """Wrapper for lv_image_cache_data_t with sanity check"""

    def __init__(self, data_ptr):
        super().__init__(Value.normalize(data_ptr, "lv_image_cache_data_t"))

    def sanity_check(self, entry_addr):
        """Validate decoded image, size, data_size, src_type and src fields.

        Args:
            entry_addr: entry address used to prefix the error messages.

        Returns:
            list of error strings, empty when all fields look valid.
        """
        prefix = f"entry {entry_addr:#x}"
        errors = []

        decoded = self.decoded
        if not decoded:
            # Without a decoded image none of the dependent fields can be read.
            errors.append(f"{prefix}: null decoded pointer")
            return errors

        header = decoded.header
        width, height = int(header.w), int(header.h)
        if width <= 0 or height <= 0:
            errors.append(f"{prefix}: invalid size {width}x{height}")

        size = int(decoded.data_size)
        if size <= 0:
            errors.append(f"{prefix}: invalid data_size {size}")

        kind = int(self.src_type)
        # NOTE(review): 0/1 presumably map to LV_IMAGE_SRC_VARIABLE/FILE —
        # confirm against the lv_image_src_t enum.
        if kind not in (0, 1):
            errors.append(f"{prefix}: unknown src_type {kind}")

        source = self.src
        if not source or int(source) == 0:
            errors.append(f"{prefix}: null src pointer")
        return errors
class LVImageCache(object):
def __init__(self, cache: Value):
    # Wrap the raw cache value with the image-cache entry type so the
    # generic LVCache machinery knows how to cast each entry's data.
    self._cache = LVCache(cache, "lv_image_cache_data_t")
@@ -15,46 +51,52 @@ class LVImageCache(object):
def print_entries(self):
"""Print image cache entries using prettytable format"""
table = PrettyTable()
table.field_names = [
"entry",
"size",
"data_size",
"cf",
"rc",
"type",
"decoder",
"decoded",
"src",
]
table.align = "r" # Right align all columns by default
table.align["src"] = "l" # Left align source column
table.align["type"] = "c" # Center align type column
iterator = iter(self._cache)
extra_fields = iterator.extra_fields
for entry in self._cache:
table = PrettyTable()
fields = (
["entry"]
+ extra_fields
+ ["size", "data_size", "cf", "rc", "type", "decoder", "decoded", "src"]
)
table.field_names = fields
table.align = "r"
table.align["src"] = "l"
table.align["type"] = "c"
for entry in iterator:
entry: LVCacheEntry
data_ptr = entry.get_data()
if not data_ptr:
continue
decoded = data_ptr.decoded
ref_cnt = 0
decoded_ptr = 0
size_str = ""
data_size = 0
cf = 0
decoder_name = ""
type_str = "unkn"
src_str = ""
try:
ref_cnt = entry.get_ref_count()
decoded = data_ptr.decoded
decoded_ptr = int(decoded) if decoded else 0
src_type = int(data_ptr.src_type)
src = data_ptr.src
header = decoded.header
w = int(header.w)
h = int(header.h)
cf = int(header.cf)
data_size = int(decoded.data_size) if decoded else 0
decoded_ptr = int(decoded) if decoded else 0
decoder_name = data_ptr.decoder.name.as_string()
src_type = int(data_ptr.src_type)
src = data_ptr.src
ref_cnt = entry.get_ref_count()
size_str = f"{w}x{h}"
decoder_name = data_ptr.decoder.name.as_string()
if src_type == 0: # LV_IMAGE_SRC_VARIABLE
src_str = src.format_string(
symbols=True, address=True, styling=True
@@ -65,26 +107,41 @@ class LVImageCache(object):
src.cast("char", ptr=True).as_string() if src else "(null)"
)
type_str = "file"
else: # Unknown type
else:
src_str = f"{int(src):#x}" if src else "0x0"
type_str = "unkn"
table.add_row(
[
f"{int(entry):#x}",
size_str,
f"{data_size}",
f"{cf}",
f"{ref_cnt}",
type_str,
decoder_name,
f"{int(decoded_ptr):#x}",
src_str,
]
)
except gdb.error as e:
table.add_row(["ERROR", "", "", "", "", "", "", "", str(e)])
continue
src_str = src_str or str(e)
row = (
[f"{int(entry):#x}"]
+ iterator.get_extra(entry)
+ [
size_str,
f"{data_size}",
f"{cf}",
f"{ref_cnt}",
type_str,
decoder_name,
f"{decoded_ptr:#x}",
src_str,
]
)
table.add_row(row)
print(table)
@staticmethod
def _check_image_entry(entry):
    """Validate one cache entry by delegating to LVImageCacheData.

    Returns a list of error strings; gdb read failures are reported as
    errors rather than raised.
    """
    data = entry.get_data()
    if not data:
        return [f"entry {int(entry):#x}: null data pointer"]
    try:
        return LVImageCacheData(data).sanity_check(int(entry))
    except gdb.error as e:
        return [f"entry {int(entry):#x}: gdb error: {e}"]
def sanity_check(self):
    """Run sanity check on image cache with image-specific entry validation"""
    # Structural checks come from LVCache.sanity_check; per-entry field
    # validation is supplied via the _check_image_entry callback.
    return self._cache.sanity_check(self._check_image_entry)

View File

@@ -1,4 +1,3 @@
from typing import Union
import gdb
from prettytable import PrettyTable
from lvglgdb.value import Value
@@ -6,6 +5,34 @@ from .lv_cache import LVCache
from .lv_cache_entry import LVCacheEntry
class LVImageHeaderCacheData(Value):
    """Wrapper for lv_image_header_cache_data_t with sanity check"""

    def __init__(self, data_ptr):
        super().__init__(Value.normalize(data_ptr, "lv_image_header_cache_data_t"))

    def sanity_check(self, entry_addr):
        """Validate the header size, src_type and src fields.

        Args:
            entry_addr: entry address used to prefix the error messages.

        Returns:
            list of error strings, empty when all fields look valid.
        """
        prefix = f"entry {entry_addr:#x}"
        errors = []

        header = self.header
        width, height = int(header.w), int(header.h)
        if width <= 0 or height <= 0:
            errors.append(f"{prefix}: invalid size {width}x{height}")

        kind = int(self.src_type)
        # NOTE(review): 0/1 presumably map to LV_IMAGE_SRC_VARIABLE/FILE —
        # confirm against the lv_image_src_t enum.
        if kind not in (0, 1):
            errors.append(f"{prefix}: unknown src_type {kind}")

        source = self.src
        if not source or int(source) == 0:
            errors.append(f"{prefix}: null src pointer")
        return errors
class LVImageHeaderCache(object):
def __init__(self, cache: Value):
self._cache = LVCache(cache, "lv_image_header_cache_data_t")
@@ -14,34 +41,46 @@ class LVImageHeaderCache(object):
self._cache.print_info()
def print_entries(self):
"""Print image cache entries using prettytable format"""
table = PrettyTable()
table.field_names = ["size", "cf", "rc", "type", "decoder", "src"]
table.align = "r" # Right align all columns by default
table.align["src"] = "l" # Left align source column
table.align["type"] = "c" # Center align type column
"""Print image header cache entries using prettytable format"""
iterator = iter(self._cache)
extra_fields = iterator.extra_fields
for entry in self._cache:
table = PrettyTable()
fields = (
["entry"] + extra_fields + ["size", "cf", "rc", "type", "decoder", "src"]
)
table.field_names = fields
table.align = "r"
table.align["src"] = "l"
table.align["type"] = "c"
for entry in iterator:
entry: LVCacheEntry
data_ptr = entry.get_data()
if not data_ptr:
continue
ref_cnt = 0
size_str = ""
cf = 0
decoder_name = ""
type_str = "unkn"
src_str = ""
try:
ref_cnt = entry.get_ref_count()
src_type = int(data_ptr.src_type)
src = data_ptr.src
header = data_ptr.header
w = int(header.w)
h = int(header.h)
cf = int(header.cf)
src_type = int(data_ptr.src_type)
src = data_ptr.src
decoder_name = data_ptr.decoder.name.as_string()
ref_cnt = entry.get_ref_count()
size_str = f"{w}x{h}"
decoder_name = data_ptr.decoder.name.as_string()
if src_type == 0: # LV_IMAGE_SRC_VARIABLE
src_str = src.format_string(
symbols=True, address=True, styling=True
@@ -52,23 +91,39 @@ class LVImageHeaderCache(object):
src.cast("char", ptr=True).as_string() if src else "(null)"
)
type_str = "file"
else: # Unknown type
else:
src_str = f"{int(src):#x}" if src else "0x0"
type_str = "unkn"
table.add_row(
[
size_str,
f"{cf}",
f"{ref_cnt}",
type_str,
decoder_name,
src_str,
]
)
except gdb.error as e:
table.add_row(["ERROR", "", "", "", "", str(e)])
continue
src_str = src_str or str(e)
row = (
[f"{int(entry):#x}"]
+ iterator.get_extra(entry)
+ [
size_str,
f"{cf}",
f"{ref_cnt}",
type_str,
decoder_name,
src_str,
]
)
table.add_row(row)
print(table)
@staticmethod
def _check_header_entry(entry):
    """Validate one cache entry by delegating to LVImageHeaderCacheData.

    Returns a list of error strings; gdb read failures are reported as
    errors rather than raised.
    """
    data = entry.get_data()
    if not data:
        return [f"entry {int(entry):#x}: null data pointer"]
    try:
        return LVImageHeaderCacheData(data).sanity_check(int(entry))
    except gdb.error as e:
        return [f"entry {int(entry):#x}: gdb error: {e}"]
def sanity_check(self):
    """Run sanity check on image header cache with header-specific entry validation"""
    # Structural checks come from LVCache.sanity_check; per-entry field
    # validation is supplied via the _check_header_entry callback.
    return self._cache.sanity_check(self._check_header_entry)