feat(simulation): add multi-video support into gstreamer plugin

Modified plugin `src/modules/simulation/gz_plugins/gstreamer`.
Previously when launched the plugin performed scanning of
all gazebo topics based on some regex, selected the first camera one
and launched gstreamer pipeline fetching images from the topic
and publishing into either udpsink or rtmp.
Now all the streaming functionality was moved into separate class:
`CameraStream`. It is responsible both for sub/unsub to gz topic
and gstreamer pipeline lifecycle.
Main class `GstCameraSystem` now stores a collection of
`CameraStream` instances which are created using gazebo's ECM
`eachNew` method (i.e. whenever a new camera appeared
in the world) and removed using `eachRemove` method
(whenever a camera disappears). The last thing worth noting is
that multi-vehicle streaming only works for udpsink, not RTMP,
since it requires careful consideration on how
the many rtmp paths should be specified.
On the other hand, udp is simple:
we use specified port as a base one and add instance number to it
to obtain exact port for each camera stream.
Moreover, `GstCameraSystem` keeps track on busy ports
and ensures that whenever restarted an instance
will stream its video to the same port as before.
This commit is contained in:
Peter Filimonov
2026-05-04 12:02:26 -03:00
parent ba36572571
commit b101ce90b6
3 changed files with 361 additions and 209 deletions
File diff suppressed because it is too large Load Diff
@@ -38,19 +38,72 @@
#include <thread>
#include <mutex>
#include <atomic>
#include <regex>
#include <unordered_set>
#include <gz/sim/System.hh>
#include <gz/transport/Node.hh>
#include <gz/msgs/image.pb.h>
#include <gz/sim/components/Name.hh>
#include <gz/sim/components/World.hh>
#include <gz/sim/components/ParentEntity.hh>
#include <gst/gst.h>
#include <gst/app/gstappsrc.h>
namespace custom
{
class CameraStream
{
public:
CameraStream(const std::string &udpHost, int udpPort, bool useCuda,
bool useRtmp, const std::string &rtmpLocation,
const std::string &cameraTopic);
void start();
void stop();
int getUdpPort() const
{
return _udpPort;
}
~CameraStream();
private:
std::string _cameraTopic;
// Transport
gz::transport::Node _node;
// Image processing
gz::msgs::Image _currentFrame;
std::mutex _frameMutex;
std::atomic<bool> _newFrameAvailable{};
// GStreamer elements
GMainLoop *_gstLoop{};
GstElement *_pipeline{};
GstElement *_source{};
std::thread _gstThread;
std::atomic<bool> _running{};
// stream params
int _width{0};
int _height{0};
double _rate{30.0};
std::string _udpHost;
int _udpPort = 5600;
bool _useRtmp{};
std::string _rtmpLocation;
bool _useCuda = true;
void onCameraInfo(const gz::msgs::Image &msg);
void onImage(const gz::msgs::Image &msg);
void gstThreadFunc();
};
class GstCameraSystem :
public gz::sim::System,
public gz::sim::ISystemConfigure,
@@ -58,7 +111,6 @@ class GstCameraSystem :
{
public:
GstCameraSystem();
~GstCameraSystem();
void Configure(const gz::sim::Entity &_entity,
const std::shared_ptr<const sdf::Element> &_sdf,
@@ -68,48 +120,51 @@ public:
void PostUpdate(const gz::sim::UpdateInfo &_info,
const gz::sim::EntityComponentManager &_ecm) override;
private:
void onImage(const gz::msgs::Image &msg);
void onCameraInfo(const gz::msgs::Image &msg);
// Video streams
std::unordered_map<gz::sim::Entity, std::unique_ptr<CameraStream>> _streams;
// Find first camera topic in the world
void findCameraTopic();
void gstThreadFunc();
// Transport
gz::transport::Node _node;
// Image processing
gz::msgs::Image _currentFrame;
std::mutex _frameMutex;
std::atomic<bool> _newFrameAvailable {};
// GStreamer elements
GMainLoop *_gstLoop {};
GstElement *_pipeline {};
GstElement *_source {};
std::thread _gstThread;
std::atomic<bool> _running {};
std::string _buildTopic(const gz::sim::EntityComponentManager &_ecm,
const gz::sim::Entity &sensorEntity,
const gz::sim::components::Name *sensorName,
const gz::sim::components::ParentEntity *parent);
// Configuration
std::string _worldName;
std::string _udpHost;
int _udpPort = 5600;
int _baseUdpPort = 5600;
bool _useRtmp {};
std::string _rtmpLocation;
bool _useCuda = true;
// Topic info
std::string _cameraTopic;
int _width {};
int _height {};
double _rate = 30.0;
// methods below are used to maintain consistency in udp ports when restarting instances. For example, if we have
// instance 1 streaming on port 5601 we also want it to stream to the same port if we shutdown and restart the
// instance
std::unordered_set<int> _usedUdpPorts;
int getAvailableUdpPort()
{
int port = _baseUdpPort;
while (_usedUdpPorts.count(port) > 0) {
port++;
}
_usedUdpPorts.insert(port);
return port;
}
void freeUdpPort(int port)
{
auto it_port = _usedUdpPorts.find(port);
if (it_port != _usedUdpPorts.end()) {
_usedUdpPorts.erase(it_port);
}
}
// Topic pattern for matching camera image topics
std::regex _cameraTopicPattern;
// Flag to control discovery
bool _initialized {};
};
} // namespace custom
@@ -47,6 +47,20 @@ For RTMP streams, you can use any RTMP-compatible player, such as:
- VLC: Media > Open Network Stream > rtmp://your-rtmp-url
- ffplay: `ffplay rtmp://your-rtmp-url`
## Multi-vehicle simulation
The port specified in the SDF file is used as a base port. Each instance gets base_port + instance_index:
| Instance | Port |
|----------|------|
| 0 | 5600 |
| 1 | 5601 |
| 2 | 5602 |
Note: ports are assigned by the GStreamer plugin in launch order, not by the -i flag value. Always start instances sequentially: 0, then 1, then 2, etc.
Restarting an individual instance preserves its port — e.g. a restarted instance 1 will use port 5601 again.
*Warning:* RTMP streaming for multi-vehicle simulation is currently not supported.
## Environment Variables
- `PX4_VIDEO_HOST_IP`: Can be set to override the default UDP destination IP