write rosbags

This commit is contained in:
Andreas Antener
2015-03-15 20:39:57 +01:00
parent 0daf2232ec
commit f3a2c66e89
3 changed files with 32 additions and 1 deletions
@@ -39,6 +39,7 @@ PKG = 'px4'
import unittest import unittest
import rospy import rospy
import rosbag
from numpy import linalg from numpy import linalg
import numpy as np import numpy as np
@@ -62,8 +63,9 @@ class DirectOffboardPosctlTest(unittest.TestCase):
def setUp(self): def setUp(self):
rospy.init_node('test_node', anonymous=True) rospy.init_node('test_node', anonymous=True)
self.bag = rosbag.Bag('direct_offboard_posctl_test.bag', 'w', compression="lz4")
rospy.Subscriber('iris/vehicle_control_mode', vehicle_control_mode, self.vehicle_control_mode_callback) rospy.Subscriber('iris/vehicle_control_mode', vehicle_control_mode, self.vehicle_control_mode_callback)
rospy.Subscriber("iris/vehicle_local_position", vehicle_local_position, self.position_callback) self.sub_vlp = rospy.Subscriber("iris/vehicle_local_position", vehicle_local_position, self.position_callback)
self.pub_spt = rospy.Publisher('iris/position_setpoint_triplet', position_setpoint_triplet, queue_size=10) self.pub_spt = rospy.Publisher('iris/position_setpoint_triplet', position_setpoint_triplet, queue_size=10)
self.rate = rospy.Rate(10) # 10hz self.rate = rospy.Rate(10) # 10hz
self.has_pos = False self.has_pos = False
@@ -74,12 +76,17 @@ class DirectOffboardPosctlTest(unittest.TestCase):
if self.fpa: if self.fpa:
self.fpa.stop() self.fpa.stop()
self.sub_vlp.unregister()
self.rate.sleep()
self.bag.close()
# #
# General callback functions used in tests # General callback functions used in tests
# #
def position_callback(self, data): def position_callback(self, data):
self.has_pos = True self.has_pos = True
self.local_position = data self.local_position = data
self.bag.write('vehicle_local_position', data)
def vehicle_control_mode_callback(self, data): def vehicle_control_mode_callback(self, data):
self.control_mode = data self.control_mode = data
@@ -39,8 +39,10 @@ PKG = 'px4'
import unittest import unittest
import rospy import rospy
import rosbag
from px4.msg import vehicle_control_mode from px4.msg import vehicle_control_mode
from px4.msg import vehicle_local_position
from std_msgs.msg import Header from std_msgs.msg import Header
from std_msgs.msg import Float64 from std_msgs.msg import Float64
from geometry_msgs.msg import PoseStamped, Quaternion from geometry_msgs.msg import PoseStamped, Quaternion
@@ -56,8 +58,10 @@ class MavrosOffboardAttctlTest(unittest.TestCase):
def setUp(self): def setUp(self):
rospy.init_node('test_node', anonymous=True) rospy.init_node('test_node', anonymous=True)
self.bag = rosbag.Bag('mavros_offboard_attctl_test.bag', 'w', compression="lz4")
rospy.wait_for_service('iris/mavros/cmd/arming', 30) rospy.wait_for_service('iris/mavros/cmd/arming', 30)
rospy.Subscriber('iris/vehicle_control_mode', vehicle_control_mode, self.vehicle_control_mode_callback) rospy.Subscriber('iris/vehicle_control_mode', vehicle_control_mode, self.vehicle_control_mode_callback)
self.sub_vlp = rospy.Subscriber("iris/vehicle_local_position", vehicle_local_position, self.vehicle_position_callback)
rospy.Subscriber("iris/mavros/position/local", PoseStamped, self.position_callback) rospy.Subscriber("iris/mavros/position/local", PoseStamped, self.position_callback)
self.pub_att = rospy.Publisher('iris/mavros/setpoint/attitude', PoseStamped, queue_size=10) self.pub_att = rospy.Publisher('iris/mavros/setpoint/attitude', PoseStamped, queue_size=10)
self.pub_thr = rospy.Publisher('iris/mavros/setpoint/att_throttle', Float64, queue_size=10) self.pub_thr = rospy.Publisher('iris/mavros/setpoint/att_throttle', Float64, queue_size=10)
@@ -66,9 +70,17 @@ class MavrosOffboardAttctlTest(unittest.TestCase):
self.control_mode = vehicle_control_mode() self.control_mode = vehicle_control_mode()
self.local_position = PoseStamped() self.local_position = PoseStamped()
def tearDown(self):
self.sub_vlp.unregister()
self.rate.sleep()
self.bag.close()
# #
# General callback functions used in tests # General callback functions used in tests
# #
def vehicle_position_callback(self, data):
self.bag.write('vehicle_local_position', data)
def position_callback(self, data): def position_callback(self, data):
self.has_pos = True self.has_pos = True
self.local_position = data self.local_position = data
@@ -40,11 +40,13 @@ PKG = 'px4'
import unittest import unittest
import rospy import rospy
import math import math
import rosbag
from numpy import linalg from numpy import linalg
import numpy as np import numpy as np
from px4.msg import vehicle_control_mode from px4.msg import vehicle_control_mode
from px4.msg import vehicle_local_position
from std_msgs.msg import Header from std_msgs.msg import Header
from geometry_msgs.msg import PoseStamped, Quaternion from geometry_msgs.msg import PoseStamped, Quaternion
from tf.transformations import quaternion_from_euler from tf.transformations import quaternion_from_euler
@@ -60,7 +62,9 @@ class MavrosOffboardPosctlTest(unittest.TestCase):
def setUp(self): def setUp(self):
rospy.init_node('test_node', anonymous=True) rospy.init_node('test_node', anonymous=True)
self.bag = rosbag.Bag('mavros_offboard_posctl_test.bag', 'w', compression="lz4")
rospy.Subscriber('iris/vehicle_control_mode', vehicle_control_mode, self.vehicle_control_mode_callback) rospy.Subscriber('iris/vehicle_control_mode', vehicle_control_mode, self.vehicle_control_mode_callback)
self.sub_vlp = rospy.Subscriber("iris/vehicle_local_position", vehicle_local_position, self.vehicle_position_callback)
rospy.Subscriber("iris/mavros/position/local", PoseStamped, self.position_callback) rospy.Subscriber("iris/mavros/position/local", PoseStamped, self.position_callback)
self.pub_spt = rospy.Publisher('iris/mavros/setpoint/local_position', PoseStamped, queue_size=10) self.pub_spt = rospy.Publisher('iris/mavros/setpoint/local_position', PoseStamped, queue_size=10)
self.rate = rospy.Rate(10) # 10hz self.rate = rospy.Rate(10) # 10hz
@@ -68,9 +72,17 @@ class MavrosOffboardPosctlTest(unittest.TestCase):
self.local_position = PoseStamped() self.local_position = PoseStamped()
self.control_mode = vehicle_control_mode() self.control_mode = vehicle_control_mode()
def tearDown(self):
self.sub_vlp.unregister()
self.rate.sleep()
self.bag.close()
# #
# General callback functions used in tests # General callback functions used in tests
# #
def vehicle_position_callback(self, data):
self.bag.write('vehicle_local_position', data)
def position_callback(self, data): def position_callback(self, data):
self.has_pos = True self.has_pos = True
self.local_position = data self.local_position = data