Skip to content

Commit 69a86c0

Browse files
committed
upload_chunk for upos
1 parent 2f5d24e commit 69a86c0

1 file changed

Lines changed: 39 additions & 25 deletions

File tree

ssup/src/uploader/upos.rs

Lines changed: 39 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::constants::{CONCURRENCY, USER_AGENT};
22
use crate::uploader::utils::read_chunk;
33
use crate::video::VideoPart;
44
use anyhow::{anyhow, bail};
5+
use bytes::Bytes;
56
use futures::{Stream, StreamExt, TryStreamExt};
67
use reqwest::header::{HeaderMap, HeaderValue};
78
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
@@ -77,6 +78,39 @@ impl Upos {
7778
})
7879
}
7980

81+
pub async fn upload_chunk(
82+
&self,
83+
chunk: Bytes,
84+
current_chunk: usize,
85+
chunks_num: usize,
86+
start: usize,
87+
total_size: u64,
88+
) -> anyhow::Result<Value> {
89+
let len = chunk.len();
90+
let params = Protocol {
91+
upload_id: &self.upload_id,
92+
chunks: chunks_num,
93+
total: total_size,
94+
chunk: current_chunk,
95+
size: len,
96+
part_number: current_chunk + 1,
97+
start,
98+
end: start + len,
99+
};
100+
101+
let url = self.url.clone();
102+
let response = self
103+
.client
104+
.put(url)
105+
.query(&params)
106+
.body(chunk)
107+
.send()
108+
.await?;
109+
response.error_for_status()?;
110+
111+
Ok(json!({"partNumber": params.part_number, "eTag": "etag"}))
112+
}
113+
80114
pub(crate) async fn upload_stream<P>(
81115
&self,
82116
file_path: P,
@@ -90,32 +124,16 @@ impl Upos {
90124
let chunk_size = self.bucket.chunk_size;
91125
let chunks_num = (total_size as f64 / chunk_size as f64).ceil() as usize; // 获取分块数量
92126

93-
let client = &self.client;
94-
let url = &self.url;
95-
let upload_id = &*self.upload_id;
96127
let stream = read_chunk(file, chunk_size)
97128
.enumerate()
98129
.map(move |(i, chunk)| async move {
99130
let chunk = chunk?;
100131
let len = chunk.len();
101-
let params = Protocol {
102-
upload_id,
103-
chunks: chunks_num,
104-
total: total_size,
105-
chunk: i,
106-
size: len,
107-
part_number: i + 1,
108-
start: i * chunk_size,
109-
end: i * chunk_size + len,
110-
};
132+
let part = self
133+
.upload_chunk(chunk, i, chunks_num, i * chunk_size, total_size)
134+
.await?;
111135

112-
let response = client.put(url).query(&params).body(chunk).send().await?;
113-
response.error_for_status()?;
114-
115-
Ok::<_, anyhow::Error>((
116-
json!({"partNumber": params.chunk + 1, "eTag": "etag"}),
117-
len,
118-
))
136+
Ok::<_, anyhow::Error>((part, len))
119137
})
120138
.buffer_unordered(*CONCURRENCY.read());
121139
Ok(stream)
@@ -135,11 +153,7 @@ impl Upos {
135153
self.get_ret_video_info(&parts, path).await
136154
}
137155

138-
pub(crate) async fn get_ret_video_info<P>(
139-
&self,
140-
parts: &[Value],
141-
path: P,
142-
) -> anyhow::Result<VideoPart>
156+
pub async fn get_ret_video_info<P>(&self, parts: &[Value], path: P) -> anyhow::Result<VideoPart>
143157
where
144158
P: AsRef<Path>,
145159
{

0 commit comments

Comments
 (0)