diff --git a/Ghidra/Debug/Framework-TraceModeling/src/main/java/ghidra/trace/database/target/DBTraceObjectManager.java b/Ghidra/Debug/Framework-TraceModeling/src/main/java/ghidra/trace/database/target/DBTraceObjectManager.java index e281ab679c..5c54750deb 100644 --- a/Ghidra/Debug/Framework-TraceModeling/src/main/java/ghidra/trace/database/target/DBTraceObjectManager.java +++ b/Ghidra/Debug/Framework-TraceModeling/src/main/java/ghidra/trace/database/target/DBTraceObjectManager.java @@ -422,7 +422,8 @@ public class DBTraceObjectManager implements TraceObjectManager, DBTraceManager public Stream getAllValues() { return Stream.concat( valueMap.values().stream().map(v -> v.getWrapper()), - valueWbCache.streamAllValues().map(v -> v.getWrapper())); + StreamUtils.lock(lock.readLock(), + valueWbCache.streamAllValues().map(v -> v.getWrapper()))); } protected Stream streamValuesIntersectingData(Lifespan span, @@ -672,7 +673,7 @@ public class DBTraceObjectManager implements TraceObjectManager, DBTraceManager return new UnionAddressSetView( valueMap.getAddressSetView(Lifespan.at(snap), v -> acceptValue(v.getWrapper(), key, ifaceCls, predicate)), - valueWbCache.getObjectsAddresSet(snap, key, ifaceCls, predicate)); + valueWbCache.getObjectsAddressSet(snap, key, ifaceCls, predicate)); } public I getSuccessor(TraceObject seed, diff --git a/Ghidra/Debug/Framework-TraceModeling/src/main/java/ghidra/trace/database/target/DBTraceObjectValueWriteBehindCache.java b/Ghidra/Debug/Framework-TraceModeling/src/main/java/ghidra/trace/database/target/DBTraceObjectValueWriteBehindCache.java index 7437dd65b5..0f0b3cd537 100644 --- a/Ghidra/Debug/Framework-TraceModeling/src/main/java/ghidra/trace/database/target/DBTraceObjectValueWriteBehindCache.java +++ b/Ghidra/Debug/Framework-TraceModeling/src/main/java/ghidra/trace/database/target/DBTraceObjectValueWriteBehindCache.java @@ -176,28 +176,30 @@ class DBTraceObjectValueWriteBehindCache { } public Stream streamAllValues() { - return doStreamAllValues(); + return StreamUtils.sync(cachedValues, doStreamAllValues()); } public DBTraceObjectValueBehind get(DBTraceObject parent, String key, long snap) { - var keys = cachedValues.get(parent); - if (keys == null) { - return null; - } - var values = keys.get(key); - if (values == null) { - return null; - } + synchronized (cachedValues) { + var keys = cachedValues.get(parent); + if (keys == null) { + return null; + } + var values = keys.get(key); + if (values == null) { + return null; + } - var floor = values.floorEntry(snap); - if (floor == null) { - return null; - } + var floor = values.floorEntry(snap); + if (floor == null) { + return null; + } - if (!floor.getValue().getLifespan().contains(snap)) { - return null; + if (!floor.getValue().getLifespan().contains(snap)) { + return null; + } + return floor.getValue(); } - return floor.getValue(); } public Stream streamParents(DBTraceObject child, Lifespan lifespan) { @@ -236,25 +238,29 @@ class DBTraceObjectValueWriteBehindCache { } public Stream streamValues(DBTraceObject parent, Lifespan lifespan) { - // TODO: Better indexing? - var keys = cachedValues.get(parent); - if (keys == null) { - return Stream.of(); + synchronized (cachedValues) { + var keys = cachedValues.get(parent); + if (keys == null) { + return Stream.of(); + } + return StreamUtils.sync(cachedValues, + keys.values().stream().flatMap(v -> streamSub(v, lifespan, true))); } - return keys.values().stream().flatMap(v -> streamSub(v, lifespan, true)); } public Stream streamValues(DBTraceObject parent, String key, Lifespan lifespan, boolean forward) { - var keys = cachedValues.get(parent); - if (keys == null) { - return Stream.of(); + synchronized (cachedValues) { + var keys = cachedValues.get(parent); + if (keys == null) { + return Stream.of(); + } + var values = keys.get(key); + if (values == null) { + return Stream.of(); + } + return StreamUtils.sync(cachedValues, streamSub(values, lifespan, forward)); } - var values = keys.get(key); - if (values == null) { - return Stream.of(); - } - return streamSub(values, lifespan, forward); } static boolean intersectsRange(Object value, AddressRange range) { @@ -265,14 +271,16 @@ class DBTraceObjectValueWriteBehindCache { private Stream streamValuesIntersectingLifespan(Lifespan lifespan, String entryKey) { // TODO: In-memory spatial index? - var top = cachedValues.values().stream(); - var keys = entryKey == null - ? top.flatMap(v -> v.values().stream()) - : top.flatMap(v -> v.entrySet() - .stream() - .filter(e -> entryKey.equals(e.getKey())) - .map(e -> e.getValue())); - return keys.flatMap(v -> streamSub(v, lifespan, true)); + synchronized (cachedValues) { + var top = cachedValues.values().stream(); + var keys = entryKey == null + ? top.flatMap(v -> v.values().stream()) + : top.flatMap(v -> v.entrySet() + .stream() + .filter(e -> entryKey.equals(e.getKey())) + .map(e -> e.getValue())); + return StreamUtils.sync(cachedValues, keys.flatMap(v -> streamSub(v, lifespan, true))); + } } public Stream streamValuesIntersecting(Lifespan lifespan, @@ -302,38 +310,46 @@ class DBTraceObjectValueWriteBehindCache { return null; } - public AddressSetView getObjectsAddresSet(long snap, + public AddressSetView getObjectsAddressSet(long snap, String key, Class ifaceCls, Predicate predicate) { return new AbstractAddressSetView() { AddressSet collectRanges() { AddressSet result = new AddressSet(); - for (DBTraceObjectValueBehind v : StreamUtils - .iter(streamValuesIntersectingLifespan(Lifespan.at(snap), key))) { - AddressRange range = getIfRangeOrAddress(v.getValue()); - if (range == null) { - continue; + try (LockHold hold = LockHold.lock(manager.lock.readLock())) { + synchronized (cachedValues) { + for (DBTraceObjectValueBehind v : StreamUtils + .iter(streamValuesIntersectingLifespan(Lifespan.at(snap), key))) { + AddressRange range = getIfRangeOrAddress(v.getValue()); + if (range == null) { + continue; + } + if (!DBTraceObjectManager.acceptValue(v.getWrapper(), key, ifaceCls, + predicate)) { + continue; + } + result.add(range); + } } - if (!DBTraceObjectManager.acceptValue(v.getWrapper(), key, ifaceCls, - predicate)) { - continue; - } - result.add(range); } return result; } @Override public boolean contains(Address addr) { - for (DBTraceObjectValueBehind v : StreamUtils - .iter(streamValuesIntersectingLifespan(Lifespan.at(snap), key))) { - if (!addr.equals(v.getValue())) { - continue; + try (LockHold hold = LockHold.lock(manager.lock.readLock())) { + synchronized (cachedValues) { + for (DBTraceObjectValueBehind v : StreamUtils + .iter(streamValuesIntersectingLifespan(Lifespan.at(snap), key))) { + if (!addr.equals(v.getValue())) { + continue; + } + if (!DBTraceObjectManager.acceptValue(v.getWrapper(), key, ifaceCls, + predicate)) { + continue; + } + return true; + } } - if (!DBTraceObjectManager.acceptValue(v.getWrapper(), key, ifaceCls, - predicate)) { - continue; - } - return true; } return false; } diff --git a/Ghidra/Debug/ProposedUtils/src/main/java/ghidra/util/StreamUtils.java b/Ghidra/Debug/ProposedUtils/src/main/java/ghidra/util/StreamUtils.java index 292f40a9ac..cb5a2df2c1 100644 --- a/Ghidra/Debug/ProposedUtils/src/main/java/ghidra/util/StreamUtils.java +++ b/Ghidra/Debug/ProposedUtils/src/main/java/ghidra/util/StreamUtils.java @@ -19,10 +19,25 @@ import java.util.*; import java.util.stream.Stream; import java.util.stream.StreamSupport; +import ghidra.util.database.DBSynchronizedSpliterator; +import ghidra.util.database.SynchronizedSpliterator; + +/** + * Some utilities for streams + */ public class StreamUtils { private StreamUtils() { } + /** + * Union two sorted streams into a single sorted stream + * + * @param the type of elements + * @param streams the streams to be merged + * @param comparator the comparator that orders each stream and that will order the resulting + * stream + * @return the sorted stream + */ @SuppressWarnings("unchecked") public static Stream merge(Collection> streams, Comparator comparator) { @@ -33,8 +48,53 @@ public class StreamUtils { streams.stream().map(s -> s.spliterator()).toList(), comparator), false); } + /** + * Adapt a stream into an iterable + * + * @param the type of elements + * @param stream the stream + * @return an iterable over the same elements in the stream in the same order + */ @SuppressWarnings("unchecked") public static Iterable iter(Stream stream) { return () -> (Iterator) stream.iterator(); } + + /** + * Wrap the given stream into a synchronized stream on the given object's intrinsic lock + * + *

+ * NOTE: This makes no guarantees regarding the consistency or visit order if the + * underlying resource is modified between elements being visited. It merely prevents the stream + * client from accessing the underlying resource concurrently. For such guarantees, the client + * may need to acquire the lock for its whole use of the stream. + * + * @param the type of elements + * @param lock the object on which to synchronize + * @param stream the (un)synchronized stream + * @return the synchronized stream + */ + public static Stream sync(Object lock, Stream stream) { + var wrapped = new SynchronizedSpliterator(stream.spliterator(), lock); + return StreamSupport.stream(wrapped, stream.isParallel()); + } + + /** + * Wrap the given stream into a synchronized stream on the given lock + * + *

+ * NOTE: This makes no guarantees regarding the consistency or visit order if the + * underlying resource is modified between elements being visited. It merely prevents the stream + * client from accessing the underlying resource concurrently. For such guarantees, the client + * may need to acquire the lock for its whole use of the stream. + * + * @param the type of elements + * @param lock the lock + * @param stream the (un)synchronized stream + * @return the synchronized stream + */ + public static Stream lock(java.util.concurrent.locks.Lock lock, Stream stream) { + var wrapped = new DBSynchronizedSpliterator(stream.spliterator(), lock); + return StreamSupport.stream(wrapped, stream.isParallel()); + } } diff --git a/Ghidra/Debug/ProposedUtils/src/main/java/ghidra/util/database/DBSynchronizedSpliterator.java b/Ghidra/Debug/ProposedUtils/src/main/java/ghidra/util/database/DBSynchronizedSpliterator.java new file mode 100644 index 0000000000..d0aa128e34 --- /dev/null +++ b/Ghidra/Debug/ProposedUtils/src/main/java/ghidra/util/database/DBSynchronizedSpliterator.java @@ -0,0 +1,78 @@ +/* ### + * IP: GHIDRA + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ghidra.util.database; + +import java.util.Spliterator; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.function.Consumer; + +import ghidra.util.LockHold; + +/** + * Wraps an unsynchronized spliterator in one that synchronizes on a given {@link Lock} + * + * @param the type of elements + */ +public class DBSynchronizedSpliterator implements Spliterator { + private final Spliterator spliterator; + private final Lock lock; + + public DBSynchronizedSpliterator(Spliterator spliterator, Lock lock) { + this.spliterator = spliterator; + this.lock = lock; + } + + @Override + public boolean tryAdvance(Consumer action) { + AtomicReference ref = new AtomicReference<>(); + boolean result; + try (LockHold hold = LockHold.lock(lock)) { + result = spliterator.tryAdvance(ref::set); + } + if (!result) { + return false; + } + action.accept(ref.get()); + return true; + } + + @Override + public Spliterator trySplit() { + Spliterator newSplit; + try (LockHold hold = LockHold.lock(lock)) { + newSplit = spliterator.trySplit(); + } + if (newSplit == null) { + return null; + } + return new DBSynchronizedSpliterator<>(newSplit, lock); + } + + @Override + public long estimateSize() { + try (LockHold hold = LockHold.lock(lock)) { + return spliterator.estimateSize(); + } + } + + @Override + public int characteristics() { + try (LockHold hold = LockHold.lock(lock)) { + return spliterator.characteristics(); + } + } +} diff --git a/Ghidra/Debug/ProposedUtils/src/main/java/ghidra/util/database/SynchronizedSpliterator.java b/Ghidra/Debug/ProposedUtils/src/main/java/ghidra/util/database/SynchronizedSpliterator.java new file mode 100644 index 0000000000..857f61631b --- /dev/null +++ b/Ghidra/Debug/ProposedUtils/src/main/java/ghidra/util/database/SynchronizedSpliterator.java @@ -0,0 +1,76 @@ +/* ### + * IP: GHIDRA + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ghidra.util.database; + +import java.util.Spliterator; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +/** + * Wraps an unsynchronized spliterator in one that synchronizes on a given object's intrinsic lock, + * often the collection that provided the stream or spliterator. + * + * @param the type of elements + */ +public class SynchronizedSpliterator implements Spliterator { + private final Spliterator spliterator; + private final Object lock; + + public SynchronizedSpliterator(Spliterator spliterator, Object lock) { + this.spliterator = spliterator; + this.lock = lock; + } + + @Override + public boolean tryAdvance(Consumer action) { + AtomicReference ref = new AtomicReference<>(); + boolean result; + synchronized (lock) { + result = spliterator.tryAdvance(ref::set); + } + if (!result) { + return false; + } + action.accept(ref.get()); + return true; + } + + @Override + public Spliterator trySplit() { + Spliterator newSplit; + synchronized (lock) { + newSplit = spliterator.trySplit(); + } + if (newSplit == null) { + return null; + } + return new SynchronizedSpliterator<>(newSplit, lock); + } + + @Override + public long estimateSize() { + synchronized (lock) { + return spliterator.estimateSize(); + } + } + + @Override + public int characteristics() { + synchronized (lock) { + return spliterator.characteristics(); + } + } +}