fix: handle connection reset
Signed-off-by: Christina Sørensen <christina@cafkafk.com>
This commit is contained in:
parent
ce5f8bab99
commit
0fc5a908aa
1 changed files with 34 additions and 11 deletions
45
src/main.rs
45
src/main.rs
|
@ -88,26 +88,46 @@ mod nix {
|
||||||
}
|
}
|
||||||
|
|
||||||
mod net {
|
mod net {
|
||||||
use std::net::SocketAddr;
|
use std::{net::SocketAddr, time::Duration};
|
||||||
|
|
||||||
use reqwest::{StatusCode, ClientBuilder, Client};
|
use reqwest::{StatusCode, ClientBuilder, Client};
|
||||||
use async_recursion::async_recursion;
|
use async_recursion::async_recursion;
|
||||||
|
use tokio::time::sleep;
|
||||||
|
|
||||||
#[async_recursion]
|
#[async_recursion]
|
||||||
pub async fn nar_exists(client: Client, domain: &str, hash: &str) -> usize {
|
pub async fn nar_exists(client: Client, domain: &str, hash: &str, slide: u64) -> usize {
|
||||||
let response = client
|
let response = client
|
||||||
.get(format!("https://{domain}/{hash}.narinfo"))
|
.head(format!("https://{domain}/{hash}.narinfo"))
|
||||||
.send()
|
.send()
|
||||||
.await
|
.await;
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
match response.status().as_u16() {
|
match response {
|
||||||
200 => 1,
|
Ok(response) if response.status().as_u16() == 200 => 1,
|
||||||
// Retry on ConnectionReset
|
Ok(response) if response.status().as_u16() == 404 => 0,
|
||||||
104 => nar_exists(client, domain, hash).await,
|
_ => {
|
||||||
_ => 0
|
// We're so fast now we get rate limited.
|
||||||
|
//
|
||||||
|
// Writng an actual sliding window seems kinda hard,
|
||||||
|
// so we do this instead.
|
||||||
|
sleep(Duration::from_millis(slide)).await;
|
||||||
|
nar_exists(client, domain, hash, slide * 2).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// match response.status().as_u16() {
|
||||||
|
// 200 => 1,
|
||||||
|
// // Retry on ConnectionReset
|
||||||
|
// 104 => {
|
||||||
|
// // We're so fast now we get rate limited.
|
||||||
|
// //
|
||||||
|
// // Writng an actual sliding window seems kinda hard,
|
||||||
|
// // so we do this instead.
|
||||||
|
// sleep(Duration::from_millis(slide)).await;
|
||||||
|
// nar_exists(client, domain, hash, slide * 2).await
|
||||||
|
// },
|
||||||
|
// _ => 0
|
||||||
|
// }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// #[tokio::main(flavor = "multi_thread", worker_threads = 100)]
|
// #[tokio::main(flavor = "multi_thread", worker_threads = 100)]
|
||||||
|
@ -129,6 +149,9 @@ async fn main() -> io::Result<()> {
|
||||||
let binding = get_requisites("DBCAC");
|
let binding = get_requisites("DBCAC");
|
||||||
let connection_buffer = binding.lines().map(|line| line.to_owned()).collect::<Vec<_>>();
|
let connection_buffer = binding.lines().map(|line| line.to_owned()).collect::<Vec<_>>();
|
||||||
|
|
||||||
|
// FIXME make constant
|
||||||
|
let slide = 100;
|
||||||
|
|
||||||
// FIXME we take ten just for testing
|
// FIXME we take ten just for testing
|
||||||
let tasks = connection_buffer
|
let tasks = connection_buffer
|
||||||
.into_iter()
|
.into_iter()
|
||||||
|
@ -137,7 +160,7 @@ async fn main() -> io::Result<()> {
|
||||||
let client = client.clone();
|
let client = client.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
info!("connecting to {domain} {domain_addr:#?} for {hash}");
|
info!("connecting to {domain} {domain_addr:#?} for {hash}");
|
||||||
net::nar_exists(client, domain, &hash).await
|
net::nar_exists(client, domain, &hash, slide).await
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
.collect_vec();
|
.collect_vec();
|
||||||
|
|
Loading…
Reference in a new issue