一、IO交互

文件下載過程中的IO交互主要分為以下幾個步驟:
1、APP通過 avSendIOCtrl(IOCTRL_FILEMANAGER_FILE_LIST_REQ) 查詢指定時間內(nèi)的文件列表
2、設(shè)備通過 avSendIOCtrl(IOCTRL_FILEMANAGER_FILE_LIST_RESP) 回復文件列表
3、用戶選擇文件后,APP通過 avSendIOCtrl(IOCTRL_FILEMANAGER_FILE_DOWNLOAD_REQ) 通知設(shè)備
4、設(shè)備根據(jù)資源情況,通過 avSendIOCtrl(IOCTRL_FILEMANAGER_FILE_DOWNLOAD_RESP) 告知APP使用的通道數(shù)和API類型
5、雙方創(chuàng)建通道并進行數(shù)據(jù)傳輸
6、傳輸完成后關(guān)閉通道
二、RDT通道的創(chuàng)建和銷毀
創(chuàng)建(APP端/設(shè)備端通用)
int createChannelForDownload(int sid, int iotc_channel_id)
{
return RDT_Create(sid, 5000, iotcChannelId);
}
銷毀(APP端/設(shè)備端通用)
void destoryChannelOfDownload(int rdt_id)
{
if (rdt_id < 0)
return;
RDT_Abort(rdt_id);
}
三、數(shù)據(jù)的傳輸
RDT協(xié)議本身提供的接口只有Read和Write,傳輸過程存在粘包的情況,需要另外設(shè)計切包組包的機制。每個數(shù)據(jù)包分為包頭、數(shù)據(jù)和包尾,格式參考:公版RDT幀定義
設(shè)備端
說明:RDT的數(shù)據(jù)傳輸是可靠的傳輸,每次調(diào)用RDT_Write,都將數(shù)據(jù)寫入本地的發(fā)送緩存區(qū),所以等所有的數(shù)據(jù)都使用RDT_Write寫入后,需要檢查發(fā)送緩存區(qū)是否為空,為空,則說明數(shù)據(jù)已經(jīng)送完。
寫入數(shù)據(jù)相關(guān)(偽)代碼:
寫入數(shù)據(jù)相關(guān)(偽)代碼:
#define RDT_HEADER_FRAMEBEGIN_ADDR 0
#define RDT_HEADER_FILENAME_ADDR 4
#define RDT_HEADER_FILESIZE_ADDR 68
#define RDT_HEADER_FRAMESIZE_ADDR 72
#define RDT_HEADER_ENDFLAG_ADDR 76
#define RDT_FRAME_BUFFER_ADDR 77
#define RDT_FRAME_BUFFER_SIZE 10243
#define RDT_FRAME_HEADER_SIZE 77
#define RDT_PAYLOAD_WRITE_POSTION 77
#define RDT_FRAME_TAIL_SIZE 2
#define RDT_FILENAME_MAX_LEN (RDT_HEADER_FILESIZE_ADDR - RDT_HEADER_FILENAME_ADDR) // 64字節(jié)
int createChannelForDownload(int sid, int iotc_channel_id);
void destroyChannelOfDownload(int rdt_id);
union typeXChange {
uint32_t uint32_data;
char char_data[4];
};
bool swapUint32ToCharBuffer(const uint32_t uint32_data, char* buffer, size_t bufferSize) {
if (!buffer || bufferSize != 4)
return false;
typeXChange tmp;
tmp.uint32_data = htonl(uint32_data); // 轉(zhuǎn)換為網(wǎng)絡(luò)字節(jié)序(大端)
memcpy(buffer, tmp.char_data, 4);
return true;
}
生成幀頭
void createPacketHeader(char* buffer_write_postion, size_t bufferSize, const char* fileName, uint32_t fileSize, uint32_t payloadSize,bool endFlag) {
// 校驗緩沖區(qū)是否足夠存儲header
if (!buffer_write_postion || bufferSize < RDT_FRAME_HEADER_SIZE)
return;
memset(buffer_write_postion, 0, RDT_FRAME_HEADER_SIZE);
// 寫入幀頭標識 IOTC
buffer_write_postion[0] = 'I';
buffer_write_postion[1] = 'O';
buffer_write_postion[2] = 'T';
buffer_write_postion[3] = 'C';
// 拷貝文件名:限制長度,手動加終止符
strncpy(buffer_write_postion + RDT_HEADER_FILENAME_ADDR, fileName, RDT_FILENAME_MAX_LEN);
buffer_write_postion[RDT_HEADER_FILESIZE_ADDR - 1] = '\0'; // 確保終止
// 寫入文件大小、payload大小(轉(zhuǎn)換為uint32_t,避免截斷)
swapUint32ToCharBuffer(fileSize, buffer_write_postion + RDT_HEADER_FILESIZE_ADDR, 4);
swapUint32ToCharBuffer(payloadSize, buffer_write_postion + RDT_HEADER_FRAMESIZE_ADDR, 4);
// 寫入結(jié)束標志
buffer_write_postion[RDT_HEADER_ENDFLAG_ADDR] = endFlag ? 1 : 0;
}
生成幀尾
void createPacketTail(char* buffer_write_postion, size_t bufferSize) {
if (!buffer_write_postion || bufferSize < RDT_FRAME_TAIL_SIZE)
return;
buffer_write_postion[0] = 'G';
buffer_write_postion[1] = 'C';
}
發(fā)送單個文件
int sendOneFile2Client(int rdt_id, file& f, bool isLastFile) {
int ret = 0;
bool rdt_endflag = false;
size_t fileSize = f.size();
std::string fileName = f.name();
uint32_t rdt_file_size = static_cast(fileSize); // 轉(zhuǎn)為uint32_t(若文件超4G需改用uint64_t)
const size_t buffer_for_payload_size = RDT_FRAME_BUFFER_SIZE - RDT_FRAME_HEADER_SIZE - RDT_FRAME_TAIL_SIZE;
// 檢查文件是否打開成功
if (!f.open()) {
fprintf(stderr, "File %s open failed\n", fileName.c_str());
return -1;
}
char* buffer = new char[RDT_FRAME_BUFFER_SIZE]();
if (!buffer) {
ret = -ENOMEM;
goto LAB_SEND_RETURN_ERROR;
}
while (true) {
// 讀取payload:用ssize_t區(qū)分成功/失敗
ssize_t readSize = f.read(buffer + RDT_PAYLOAD_WRITE_POSTION, buffer_for_payload_size);
if (readSize < 0) { // 讀取錯誤
fprintf(stderr, "File %s read failed, err=%zd\n", fileName.c_str(), readSize);
delete[] buffer;
ret = -2;
goto LAB_SEND_RETURN_ERROR;
} else if (readSize == 0) { // 讀取到文件尾
delete[] buffer;
break;
}
// 判斷是否為最后一包
if (readSize < buffer_for_payload_size) {
if (isLastFile) {
rdt_endflag = true;
}
}
// 構(gòu)建header和tail
createPacketHeader(buffer, RDT_FRAME_BUFFER_SIZE, fileName.c_str(), rdt_file_size, static_cast(readSize), rdt_endflag);
createPacketTail(buffer + RDT_FRAME_HEADER_SIZE + readSize, RDT_FRAME_TAIL_SIZE);
//發(fā)送完整包長度(header + payload + tail)
size_t send_len = RDT_FRAME_HEADER_SIZE + readSize + RDT_FRAME_TAIL_SIZE;
ret = RDT_Write(rdt_id, buffer, send_len);
if (ret < 0) {
fprintf(stderr, "RDT_Write failed, err=%d\n", ret);
delete[] buffer;
goto LAB_SEND_RETURN_ERROR;
}
// 最后一包:刷新并退出
if (rdt_endflag) {
RDT_Flush(rdt_id);
break;
}
}
if(buffer) delete[] buffer; // 釋放緩沖區(qū)
buffer = NULL;
f.close();
return 0; // 成功返回0
LAB_SEND_RETURN_ERROR:
if(buffer) delete[] buffer; // 釋放緩沖區(qū):
f.close(); // 確保文件關(guān)閉,避免泄漏
return ret;
}
發(fā)送文件列表
void sendFileList(void* arg, int sid, int iotc_channel_id) {
int rdt_id = createChannelForDownload(sid, iotc_channel_id);
if (rdt_id < 0) {
fprintf(stderr, "Create channel failed, rdt_id=%d\n", rdt_id);
return;
}
int ret = 0;
FileList* fileList = static_cast(arg);
int fileCount = fileList->count();
int fileIndex = 1;
//遍歷文件,發(fā)送文件列表
for (auto& file : fileList->getFiles()) {
bool isLastFile = (fileCount == fileIndex);
ret = sendOneFile2Client(rdt_id, file, isLastFile);
if (ret < 0) {
fprintf(stderr, "Send file %s error %d\n", file.name().c_str(), ret);
break;
}
fileIndex++;
}
// 檢查發(fā)送隊列是否清空
if (ret == 0) {
do {
st_RDT_Status status;
ret = RDT_Status_Check(rdt_id, &status);
if (ret < 0) {
fprintf(stderr, "RDT_Status_Check failed, err=%d\n", ret);
break;
}
if (status.BufSizeInSendQueue == 0) {
break;
} else {
usleep(100 * 1000); // 替代msleep,跨平臺
}
} while (true);
}
// 關(guān)閉通道
destroyChannelOfDownload(rdt_id);
}
APP端
接收端也需要按照對應(yīng)的格式進行切包和解析數(shù)據(jù),如果切包有誤,則可能導致數(shù)據(jù)解析異常。
讀取通道數(shù)據(jù)和解析方法(偽)代碼:
讀取通道數(shù)據(jù)和解析方法(偽)代碼:
//讀數(shù)據(jù)頭
#define HEADER_SIZE 77
#define RDT_BUFFER_SIZE 20480
int createChannelForDownload(int sid, int iotc_channel_id);
void destroyChannelOfDownload(int rdt_id);
// 解析RDT Header
bool parseRDTHeader(const char* headerBuffer, std::string& fileName, int& dataLength, bool& endFlag) {
// 示例:從headerBuffer解析fileName、dataLength、endFlag
fileName = "example.txt";
dataLength = 1024;
endFlag = false;
return true;
}
// 讀取完整的RDT Header
int readRDTHeader(int rdt_id, char* headerBuffer, int bufferSize) {
if (rdt_id < 0 || !headerBuffer || bufferSize < HEADER_SIZE) {
fprintf(stderr, "readRDTHeader: invalid params\n");
return -1;
}
memset(headerBuffer, 0, HEADER_SIZE); // 僅清空Header部分,避免越界
int restSize = HEADER_SIZE;
int headerIndex = 0;
int retryCount = 0; // 超時重試計數(shù)
do {
//傳入rdt_id參數(shù)
int size = RDT_Read(rdt_id, headerBuffer + headerIndex, restSize, 1000);
if (size > 0) {
headerIndex += size;
restSize -= size;
retryCount = 0; // 成功讀取,重置重試計數(shù)
} else if (size == RDT_ER_TIMEOUT) {
retryCount++;
if (retryCount >= MAX_READ_RETRY) { // 超過最大重試次數(shù),退出
fprintf(stderr, "readRDTHeader: timeout after %d retries\n", MAX_READ_RETRY);
return RDT_ER_TIMEOUT;
}
usleep(10 * 1000); // 超時后短暫休眠,避免頻繁重試
} else { // 其他錯誤(如通道關(guān)閉)
fprintf(stderr, "readRDTHeader: RDT_Read error %d\n", size);
return size;
}
} while (restSize > 0);
return 0;
}
讀取二進制數(shù)據(jù)并保存到文件
int readBinaryDataAndSaveOneFrame(file& f, int rdt_id,char* dataBuffer, int bufferSize,int dataLength) {
// bufferSize需大于0,dataLength需大于0
if (rdt_id < 0 || !dataBuffer || bufferSize <= 0 || dataLength <= readbinarydataandsaveoneframe:="" invalid="" return="" if="" file="" not="" int="" restdatasize="" dataindex="0;" retrycount="0;" while=""> 0) {
// 動態(tài)調(diào)整讀取大?。喝ufferSize和剩余數(shù)據(jù)的較小值
int readSize = (bufferSize < restDataSize) ? bufferSize : restDataSize;
int size = RDT_Read(rdt_id, dataBuffer + dataIndex, readSize, 1000);
if (size > 0) {
// 寫入文件:僅寫入實際讀取的size字節(jié)
int writeRet = f.write(dataBuffer + dataIndex, size);
if (writeRet < 0) {
fprintf(stderr, "readBinaryDataAndSaveOneFrame: file write error\n");
return -3;
}
restDataSize -= size;
dataIndex += size;
retryCount = 0;
} else if (size == RDT_ER_TIMEOUT) {
retryCount++;
if (retryCount >= MAX_READ_RETRY) {
fprintf(stderr, "readBinaryDataAndSaveOneFrame: timeout\n");
return RDT_ER_TIMEOUT;
}
usleep(100 * 1000);
} else { // 其他錯誤
fprintf(stderr, "readBinaryDataAndSaveOneFrame: RDT_Read error %d\n", size);
return size;
}
}
return 0;
}
讀取尾標識
int readTail(int rdt_id) {
const int TAIL_SIZE = 2;
char buffer[TAIL_SIZE] = {0};
int restSize = TAIL_SIZE;
int bufferIndex = 0; // 初始化偏移量
int retryCount = 0;
do {
int size = RDT_Read(rdt_id, buffer + bufferIndex, restSize, 1000);
if (size > 0) {
bufferIndex += size;
restSize -= size;
retryCount = 0;
} else if (size == RDT_ER_TIMEOUT) {
retryCount++;
if (retryCount >= MAX_READ_RETRY) {
fprintf(stderr, "readTail: timeout\n");
return RDT_ER_TIMEOUT;
}
usleep(100 * 1000);
} else {
fprintf(stderr, "readTail: RDT_Read error %d\n", size);
return size;
}
} while (restSize > 0);
// 校驗尾標識
if (buffer[0] != 'G' || buffer[1] != 'C') {
fprintf(stderr, "readTail: invalid tail (expected GC, got %c%c)\n", buffer[0], buffer[1]);
return -1;
}
return 0;
}
接收文件主邏輯
int recvFilesFromChannel(int rdt_id) {
if (rdt_id < 0) {
fprintf(stderr, "recvFilesFromChannel: invalid rdt_id\n");
return -1;
}
bool endFlag = false;
int ret = 0;
file f;
std::string fileName, lastFileName;
char headerBuffer[HEADER_SIZE] = {0};
char dataBuffer[RDT_BUFFER_SIZE] = {0};
while (!endFlag) {
// 1. 讀取Header
ret = readRDTHeader(rdt_id, headerBuffer, sizeof(headerBuffer));
if (ret < 0) {
fprintf(stderr, "recvFilesFromChannel: read header failed %d\n", ret);
break;
}
// 2. 解析Header
int dataLength = 0;
if (!parseRDTHeader(headerBuffer, fileName, dataLength, endFlag)) {
fprintf(stderr, "recvFilesFromChannel: parse header failed\n");
ret = -2;
break;
}
// 3. 處理文件打開/切換
if (!f.isOpen()) {
if (!f.open(fileName)) { // 檢查文件打開結(jié)果
fprintf(stderr, "recvFilesFromChannel: open file %s failed\n", fileName.c_str());
ret = -3;
break;
}
lastFileName = fileName;
fprintf(stdout, "recvFilesFromChannel: open file %s\n", fileName.c_str());
} else {
if (fileName != lastFileName) {
f.close(); // 關(guān)閉舊文件
if (!f.open(fileName)) {
fprintf(stderr, "recvFilesFromChannel: open new file %s failed\n", fileName.c_str());
ret = -3;
break;
}
lastFileName = fileName;
fprintf(stdout, "recvFilesFromChannel: switch to file %s\n", fileName.c_str());
}
}
// 4. 讀取二進制數(shù)據(jù)并保存
ret = readBinaryDataAndSaveOneFrame(f, rdt_id, dataBuffer, RDT_BUFFER_SIZE, dataLength);
if (ret < 0) {
fprintf(stderr, "recvFilesFromChannel: read data failed %d\n", ret);
break;
}
// 5. 讀取并校驗尾標識
ret = readTail(rdt_id);
if (ret < 0) {
fprintf(stderr, "recvFilesFromChannel: read tail failed %d\n", ret);
break;
}
}
// 確保文件關(guān)閉,避免資源泄漏
if (f.isOpen()) {
f.close();
fprintf(stdout, "recvFilesFromChannel: close file %s\n", lastFileName.c_str());
}
return ret;
}
接收線程
struct ChannelParams { // 定義參數(shù)結(jié)構(gòu)體,傳遞外部數(shù)據(jù)
int sid;
int iotc_channel_id;
};
void createRDTChannelAndSaveFiles(void* arg) {
if (!arg) {
fprintf(stderr, "createRDTChannelAndSaveFiles: arg is null\n");
return;
}
ChannelParams* params = static_cast(arg);
int rdt_id = createChannelForDownload(params->sid, params->iotc_channel_id);
if (rdt_id < 0) {
fprintf(stderr, "createRDTChannelAndSaveFiles: create channel failed %d\n", rdt_id);
return;
}
// 接收數(shù)據(jù)并處理錯誤
int ret = recvFilesFromChannel(rdt_id);
if (ret < 0) {
fprintf(stderr, "createRDTChannelAndSaveFiles: recv files failed %d\n", ret);
} else {
fprintf(stdout, "createRDTChannelAndSaveFiles: recv files success\n");
}
destroyChannelOfDownload(rdt_id);
}
四、結(jié)束標志的判斷
在文件傳輸過程中,結(jié)束標志的判斷非常重要。根據(jù)前面的代碼實現(xiàn),結(jié)束標志的處理主要體現(xiàn)在以下幾個方面:
- 包尾標識:每個數(shù)據(jù)包的結(jié)尾都有"GC"標識,用于確認數(shù)據(jù)包的完整性
- 文件結(jié)束標志:當傳輸最后一個文件時,會設(shè)置endflag為1
- 緩存區(qū)檢查:發(fā)送完成后,通過RDT_Status_Check檢查發(fā)送緩存區(qū)是否為空
- 讀取大小判斷:當讀取的數(shù)據(jù)大小小于緩沖區(qū)大小時,認為是文件的最后一包
這些機制共同確保了文件傳輸?shù)目煽啃院屯暾?,避免了?shù)據(jù)丟失或傳輸不完整的問題。
