diff --git a/Cargo.lock b/Cargo.lock index 93277c8..0eaa9e2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -87,6 +87,17 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "async-recursion" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fd55a5ba1179988837d24ab4c7cc8ed6efdeff578ede0416b4225a5fca35bd0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.52", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -714,6 +725,15 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.9" @@ -843,12 +863,14 @@ checksum = "e4a24736216ec316047a1fc4252e27dabb04218aa4a3f37c6e7ddbf1f9782b54" name = "nix-weather" version = "0.0.1" dependencies = [ + "async-recursion", "clap", "clap_complete", "clap_mangen", "dns-lookup", "domain", "futures", + "itertools", "log", "openssl", "pretty_env_logger", diff --git a/Cargo.toml b/Cargo.toml index 53e3f59..8de4931 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,10 +16,12 @@ version = "0.0.1" build = "build.rs" [dependencies] +async-recursion = "1.0.5" clap = { version = "4.5.1", features = ["cargo"] } dns-lookup = "2.0.4" domain = { version = "0.9.3", features = ["tokio", "resolv"] } futures = "0.3.30" +itertools = "0.12.1" log = "0.4.21" openssl = { version = "0.10.63" } pretty_env_logger = "0.5.0" diff --git a/src/main.rs b/src/main.rs index 64ecd35..8e24726 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,8 +6,9 @@ use std::{io, net::{IpAddr, SocketAddr}}; use dns_lookup::lookup_host; -use futures::{stream, StreamExt}; +use futures::{stream, StreamExt, future::join_all}; use rayon::prelude::*; +use itertools::Itertools; #[allow(unused)] use log::{debug, error, info, trace, warn}; @@ -90,7 +91,9 @@ mod net { use std::net::SocketAddr; use reqwest::StatusCode; + use async_recursion::async_recursion; + #[async_recursion] pub async fn nar_exists(domain: &str, domain_addr: SocketAddr, hash: &str) -> usize { let response = reqwest::Client::builder() .resolve(domain, domain_addr) @@ -100,16 +103,18 @@ mod net { .send() .await .unwrap(); - if response.status() == StatusCode::from_u16(200).unwrap() { - 1 - } - else { - 0 + + match response.status().as_u16() { + 200 => 1, + // Retry on ConnectionReset + 104 => nar_exists(domain, domain_addr, hash).await, + _ => 0 } } } -#[tokio::main] +// #[tokio::main(flavor = "multi_thread", worker_threads = 100)] +#[tokio::main(flavor = "multi_thread")] async fn main() -> io::Result<()> { pretty_env_logger::init(); let matches = cli::build_cli().get_matches(); @@ -121,16 +126,28 @@ async fn main() -> io::Result<()> { let ip = ips[0]; - let binding = get_requisites("DBCAC"); //let connection_buffer = stream::iter(binding.lines().map(|line| line.to_owned()).collect::>()); //.buffer_unordered(20); - let connection_buffer = stream::iter(binding.lines().map(|line| line.to_owned()).collect::>()); //.buffer_unordered(20); + //let connection_buffer = stream::iter(binding.lines().map(|line| line.to_owned()).collect::>()); //.buffer_unordered(20); + + let binding = get_requisites("DBCAC"); + let connection_buffer = binding.lines().map(|line| line.to_owned()).collect::>(); + // FIXME we take ten just for testing - let stuff = connection_buffer.take(1000).then(|hash| async move { - info!("connecting to {hostname} {ip:#?} for {hash}"); - net::nar_exists(hostname, SocketAddr::new(ip.clone(), 443), &hash).await - }).collect::>(); + let tasks = connection_buffer + .into_iter() + //.take(1000) + .map(|hash| { + tokio::spawn(async move { + info!("connecting to {hostname} {ip:#?} for {hash}"); + net::nar_exists(hostname, SocketAddr::new(ip.clone(), 443), &hash).await + }) + }) + .collect_vec(); + + let sum: usize = join_all(tasks).await.into_iter().map(|result| result.unwrap()).sum(); + + println!("sum {:#?}", sum); //map(|hash| async {net::nar_exists(hostname, SocketAddr::new(ip.clone(), 443), hash).await}).collect::>(); - println!("sum {:#?}", stuff.await.par_iter().sum::()); // let response = reqwest::Client::builder() // .resolve(