Merge pull request #2120 from DonLakeFlyer/BurstDownload

New burst mode ftp file download
This commit is contained in:
Lorenz Meier
2015-05-04 17:24:01 +02:00
9 changed files with 617 additions and 385 deletions
File diff suppressed because it is too large Load Diff
+51 -50
View File
@@ -39,45 +39,44 @@
#include <dirent.h>
#include <queue.h>
#include <nuttx/wqueue.h>
#include <systemlib/err.h>
#include "mavlink_messages.h"
#include "mavlink_main.h"
#include "mavlink_stream.h"
#include "mavlink_bridge_header.h"
class MavlinkFtpTest;
/// @brief MAVLink remote file server. Support FTP like commands using MAVLINK_MSG_ID_FILE_TRANSFER_PROTOCOL message.
/// A limited number of requests (kRequestQueueSize) may be outstanding at a time. Additional messages will be discarded.
class MavlinkFTP
/// MAVLink remote file server. Support FTP like commands using MAVLINK_MSG_ID_FILE_TRANSFER_PROTOCOL message.
class MavlinkFTP : public MavlinkStream
{
public:
/// @brief Returns the one Mavlink FTP server in the system.
static MavlinkFTP* get_server(void);
/// @brief Contructor is only public so unit test code can new objects.
MavlinkFTP();
MavlinkFTP(Mavlink *mavlink);
~MavlinkFTP();
static MavlinkStream *new_instance(Mavlink *mavlink);
/// @brief Adds the specified message to the work queue.
void handle_message(Mavlink* mavlink, mavlink_message_t *msg);
typedef void (*ReceiveMessageFunc_t)(const mavlink_message_t *msg, MavlinkFtpTest* ftpTest);
/// Handle possible FTP message
void handle_message(const mavlink_message_t *msg);
typedef void (*ReceiveMessageFunc_t)(const mavlink_file_transfer_protocol_t* ftp_req, void *worker_data);
/// @brief Sets up the server to run in unit test mode.
/// @param rcvmsgFunc Function which will be called to handle outgoing mavlink messages.
/// @param ftp_test MavlinkFtpTest object which the function is associated with
void set_unittest_worker(ReceiveMessageFunc_t rcvMsgFunc, MavlinkFtpTest *ftp_test);
/// @param worker_data Data to pass to worker
void set_unittest_worker(ReceiveMessageFunc_t rcvMsgFunc, void *worker_data);
/// @brief This is the payload which is in mavlink_file_transfer_protocol_t.payload. We pad the structure ourselves to
/// 32 bit alignment to avoid usage of any pack pragmas.
struct PayloadHeader
{
uint16_t seqNumber; ///< sequence number for message
uint16_t seq_number; ///< sequence number for message
uint8_t session; ///< Session id for read and write commands
uint8_t opcode; ///< Command opcode
uint8_t size; ///< Size of data
uint8_t req_opcode; ///< Request opcode returned in kRspAck, kRspNak message
uint8_t padding[2]; ///< 32 bit aligment padding
uint8_t burst_complete; ///< Only used if req_opcode=kCmdBurstReadFile - 1: set of burst packets complete, 0: More burst packets coming.
uint8_t padding; ///< 32 bit aligment padding
uint32_t offset; ///< Offsets for List and Read commands
uint8_t data[]; ///< command data, varies by Opcode
};
@@ -100,6 +99,7 @@ public:
kCmdTruncateFile, ///< Truncate file at <path> to <offset> length
kCmdRename, ///< Rename <path1> to <path2>
kCmdCalcFileCRC32, ///< Calculate CRC32 for file at <path>
kCmdBurstReadFile, ///< Burst download session file
kRspAck = 128, ///< Ack response
kRspNak ///< Nak response
@@ -118,35 +118,22 @@ public:
kErrUnknownCommand ///< Unknown command opcode
};
// MavlinkStream overrides
virtual const char *get_name(void) const;
virtual uint8_t get_id(void);
virtual unsigned get_size(void);
private:
/// @brief Unit of work which is queued to work_queue
struct Request
{
work_s work; ///< work queue entry
Mavlink *mavlink; ///< Mavlink to reply to
uint8_t serverSystemId; ///< System ID to send from
uint8_t serverComponentId; ///< Component ID to send from
uint8_t serverChannel; ///< Channel to send to
uint8_t targetSystemId; ///< System ID to target reply to
mavlink_file_transfer_protocol_t message; ///< Protocol message
};
Request *_get_request(void);
void _return_request(Request *req);
void _lock_request_queue(void);
void _unlock_request_queue(void);
char *_data_as_cstring(PayloadHeader* payload);
static void _worker_trampoline(void *arg);
void _process_request(Request *req);
void _reply(Request *req);
void _process_request(mavlink_file_transfer_protocol_t* ftp_req, uint8_t target_system_id);
void _reply(mavlink_file_transfer_protocol_t* ftp_req);
int _copy_file(const char *src_path, const char *dst_path, size_t length);
ErrorCode _workList(PayloadHeader *payload);
ErrorCode _workOpen(PayloadHeader *payload, int oflag);
ErrorCode _workRead(PayloadHeader *payload);
ErrorCode _workBurst(PayloadHeader* payload, uint8_t target_system_id);
ErrorCode _workWrite(PayloadHeader *payload);
ErrorCode _workTerminate(PayloadHeader *payload);
ErrorCode _workReset(PayloadHeader* payload);
@@ -156,14 +143,13 @@ private:
ErrorCode _workTruncateFile(PayloadHeader *payload);
ErrorCode _workRename(PayloadHeader *payload);
ErrorCode _workCalcFileCRC32(PayloadHeader *payload);
static const unsigned kRequestQueueSize = 2; ///< Max number of queued requests
Request _request_bufs[kRequestQueueSize]; ///< Request buffers which hold work
dq_queue_t _request_queue; ///< Queue of available Request buffers
sem_t _request_queue_sem; ///< Semaphore for locking access to _request_queue
int _find_unused_session(void);
bool _valid_session(unsigned index);
uint8_t _getServerSystemId(void);
uint8_t _getServerComponentId(void);
uint8_t _getServerChannel(void);
// Overrides from MavlinkStream
virtual void send(const hrt_abstime t);
static const char kDirentFile = 'F'; ///< Identifies File returned from List command
static const char kDirentDir = 'D'; ///< Identifies Directory returned from List command
@@ -172,9 +158,24 @@ private:
/// @brief Maximum data size in RequestHeader::data
static const uint8_t kMaxDataLength = MAVLINK_MSG_FILE_TRANSFER_PROTOCOL_FIELD_PAYLOAD_LEN - sizeof(PayloadHeader);
static const unsigned kMaxSession = 2; ///< Max number of active sessions
int _session_fds[kMaxSession]; ///< Session file descriptors, 0 for empty slot
struct SessionInfo {
int fd;
uint32_t file_size;
bool stream_download;
uint32_t stream_offset;
uint16_t stream_seq_number;
uint8_t stream_target_system_id;
};
struct SessionInfo _session_info; ///< Session info, fd=-1 for no active session
ReceiveMessageFunc_t _utRcvMsgFunc; ///< Unit test override for mavlink message sending
MavlinkFtpTest *_ftp_test; ///< Additional parameter to _utRcvMsgFunc;
ReceiveMessageFunc_t _utRcvMsgFunc; ///< Unit test override for mavlink message sending
void *_worker_data; ///< Additional parameter to _utRcvMsgFunc;
/* do not allow copying this class */
MavlinkFTP(const MavlinkFTP&);
MavlinkFTP operator=(const MavlinkFTP&);
// Mavlink test needs to be able to call send
friend class MavlinkFtpTest;
};
+9
View File
@@ -131,6 +131,7 @@ Mavlink::Mavlink() :
_streams(nullptr),
_mission_manager(nullptr),
_parameters_manager(nullptr),
_mavlink_ftp(nullptr),
_mode(MAVLINK_MODE_NORMAL),
_channel(MAVLINK_COMM_0),
_logbuffer {},
@@ -868,6 +869,9 @@ Mavlink::handle_message(const mavlink_message_t *msg)
/* handle packet with parameter component */
_parameters_manager->handle_message(msg);
/* handle packet with ftp component */
_mavlink_ftp->handle_message(msg);
if (get_forwarding_on()) {
/* forward any messages to other mavlink instances */
@@ -1364,6 +1368,11 @@ Mavlink::task_main(int argc, char *argv[])
_parameters_manager->set_interval(interval_from_rate(120.0f));
LL_APPEND(_streams, _parameters_manager);
/* MAVLINK_FTP stream */
_mavlink_ftp = (MavlinkFTP *) MavlinkFTP::new_instance(this);
_mavlink_ftp->set_interval(interval_from_rate(120.0f));
LL_APPEND(_streams, _mavlink_ftp);
/* MISSION_STREAM stream, actually sends all MISSION_XXX messages at some rate depending on
* remote requests rate. Rate specified here controls how much bandwidth we will reserve for
* mission messages. */
+4 -2
View File
@@ -59,6 +59,7 @@
#include "mavlink_messages.h"
#include "mavlink_mission.h"
#include "mavlink_parameters.h"
#include "mavlink_ftp.h"
class Mavlink
{
@@ -296,8 +297,9 @@ private:
MavlinkOrbSubscription *_subscriptions;
MavlinkStream *_streams;
MavlinkMissionManager *_mission_manager;
MavlinkParametersManager *_parameters_manager;
MavlinkMissionManager *_mission_manager;
MavlinkParametersManager *_parameters_manager;
MavlinkFTP *_mavlink_ftp;
MAVLINK_MODE _mode;
-6
View File
@@ -134,8 +134,6 @@ MavlinkReceiver::MavlinkReceiver(Mavlink *parent) :
_time_offset(0)
{
// make sure the FTP server is started
(void)MavlinkFTP::get_server();
}
MavlinkReceiver::~MavlinkReceiver()
@@ -202,10 +200,6 @@ MavlinkReceiver::handle_message(mavlink_message_t *msg)
handle_message_request_data_stream(msg);
break;
case MAVLINK_MSG_ID_FILE_TRANSFER_PROTOCOL:
MavlinkFTP::get_server()->handle_message(_mavlink, msg);
break;
case MAVLINK_MSG_ID_SYSTEM_TIME:
handle_message_system_time(msg);
break;
-2
View File
@@ -73,8 +73,6 @@ public:
* @return 0 if updated / sent, -1 if unchanged
*/
int update(const hrt_abstime t);
static MavlinkStream *new_instance(const Mavlink *mavlink);
static const char *get_name_static();
virtual const char *get_name() const = 0;
virtual uint8_t get_id() = 0;
File diff suppressed because it is too large Load Diff
@@ -48,7 +48,18 @@ public:
virtual bool run_tests(void);
static void receive_message(const mavlink_message_t *msg, MavlinkFtpTest* ftpTest);
static void receive_message_handler_generic(const mavlink_file_transfer_protocol_t* ftp_req, void *worker_data);
/// Worker data for stream handler
struct BurstInfo {
MavlinkFtpTest* ftp_test_class;
int burst_state;
bool single_packet_file;
uint32_t file_size;
uint8_t* file_bytes;
};
static void receive_message_handler_burst(const mavlink_file_transfer_protocol_t* ftp_req, void *worker_data);
static const uint8_t serverSystemId = 50; ///< System ID for server
static const uint8_t serverComponentId = 1; ///< Component ID for server
@@ -75,31 +86,45 @@ private:
bool _terminate_badsession_test(void);
bool _read_test(void);
bool _read_badsession_test(void);
bool _burst_test(void);
bool _removedirectory_test(void);
bool _createdirectory_test(void);
bool _removefile_test(void);
void _receive_message(const mavlink_message_t *msg);
void _setup_ftp_msg(MavlinkFTP::PayloadHeader *payload_header, uint8_t size, const uint8_t *data, mavlink_message_t *msg);
bool _decode_message(const mavlink_message_t *msg, mavlink_file_transfer_protocol_t *ftp_msg, MavlinkFTP::PayloadHeader **payload);
void _receive_message_handler_generic(const mavlink_file_transfer_protocol_t* ftp_req);
void _setup_ftp_msg(const MavlinkFTP::PayloadHeader *payload_header, uint8_t size, const uint8_t *data, mavlink_message_t *msg);
bool _decode_message(const mavlink_file_transfer_protocol_t *ftp_msg, const MavlinkFTP::PayloadHeader **payload);
bool _send_receive_msg(MavlinkFTP::PayloadHeader *payload_header,
uint8_t size,
const uint8_t *data,
mavlink_file_transfer_protocol_t *ftp_msg_reply,
MavlinkFTP::PayloadHeader **payload_reply);
const MavlinkFTP::PayloadHeader **payload_reply);
void _cleanup_microsd(void);
MavlinkFTP *_ftp_server;
mavlink_message_t _reply_msg;
uint16_t _lastOutgoingSeqNumber;
struct ReadTestCase {
/// A single download test case
struct DownloadTestCase {
const char *file;
const uint16_t length;
bool singlePacketRead;
bool exactlyFillPacket;
};
static const ReadTestCase _rgReadTestCases[];
/// The set of test cases for download testing
static const DownloadTestCase _rgDownloadTestCases[];
/// States for stream download handler
enum {
burst_state_first_ack,
burst_state_last_ack,
burst_state_nak_eof,
burst_state_complete
};
bool _receive_message_handler_burst(const mavlink_file_transfer_protocol_t* ftp_req, BurstInfo* burst_info);
MavlinkFTP* _ftp_server;
uint16_t _expected_seq_number;
mavlink_file_transfer_protocol_t _reply_msg;
static const char _unittest_microsd_dir[];
static const char _unittest_microsd_file[];
@@ -38,6 +38,7 @@
MODULE_COMMAND = mavlink_tests
SRCS = mavlink_tests.cpp \
mavlink_ftp_test.cpp \
../mavlink_stream.cpp \
../mavlink_ftp.cpp \
../mavlink.c