use clap::{Parser, Subcommand}; use pbr::{ProgressBar, Units}; use punt::Operation; use rusb::{self, DeviceHandle, UsbContext}; use std::{error::Error, fs::File, io::Read}; const TIMEOUT: std::time::Duration = std::time::Duration::from_millis(500); #[derive(Parser)] #[command(author, version, about, long_about = None)] struct Cli { #[command(subcommand)] command: Commands, } #[derive(Subcommand)] enum Commands { /// List connected devices List, /// Flash the adaptor Flash { #[arg()] file: String, }, /// Enter the bootloader EnterBootloader, /// Exit from the bootloader ExitBootloader, } fn match_usb_device( device: rusb::Device, vendor_id: u16, product_id: u16, vendor_string: &str, product_string: &str, ) -> rusb::Result>> { let device_desc = device.device_descriptor()?; if device_desc.vendor_id() != vendor_id || device_desc.product_id() != product_id { return Ok(None); } let device_handle = device.open()?; // Choose first language (the punt bootloader only supports English anyway) let language = device_handle.read_languages(TIMEOUT)?[0]; let v_string = device_handle.read_manufacturer_string(language, &device_desc, TIMEOUT)?; let p_string = device_handle.read_product_string(language, &device_desc, TIMEOUT)?; if v_string != vendor_string || p_string != product_string { return Ok(None); } Ok(Some(device_handle)) } fn enter_bootloader(handle: rusb::DeviceHandle) -> rusb::Result<()> { match handle.write_control( rusb::request_type( rusb::Direction::Out, rusb::RequestType::Vendor, rusb::Recipient::Device, ), 0xb0, 0, 0, &[0u8; 0], std::time::Duration::ZERO, ) { // We expect a communication failure here since this command immediately stops all USB // communications Err(rusb::Error::Io) => Ok(()), _ => Err(rusb::Error::Io), } } fn application_targets(context: &T) -> rusb::Result>> { Ok(context .devices()? .iter() .filter_map(|device| { match_usb_device(device, 0x16c0, 0x05e1, "25120.org", "HPGL XY").ok()? }) .collect()) } fn bootloader_targets( context: &T, ) -> rusb::Result>> { Ok(context .devices()? .iter() .filter_map(|device| { match_usb_device(device, 0x16c0, 0x05dc, "25120.org", "punt") .ok() .flatten() .map(TryInto::try_into)? .ok() }) .collect()) } fn main() -> Result<(), Box> { let cli = Cli::parse(); let context = rusb::Context::new()?; match cli.command { Commands::List => { println!("Connected devices in application mode:"); for handle in application_targets(&context)? { println!( "{}", handle.read_serial_number_string( handle.read_languages(TIMEOUT)?[0], &handle.device().device_descriptor()?, TIMEOUT )? ); } println!("Connected devices in bootloader mode:"); for handle in bootloader_targets(&context)? { println!("{}", handle.serial()); } } Commands::EnterBootloader => { for handle in application_targets(&context)? { enter_bootloader(handle)?; } } Commands::ExitBootloader => { for mut handle in bootloader_targets(&context)? { handle.exit_bootloader()?; } } Commands::Flash { file } => { let mut handles; loop { handles = bootloader_targets(&context)?; if !handles.is_empty() { break; } for handle in application_targets(&context)? { enter_bootloader(handle)?; } } for mut handle in handles { println!("Flashing {}.", handle.serial()); let bootloader_info = handle.bootloader_info()?; let start = bootloader_info.application_base; let mut file = File::open(&file)?; let mut buff = Vec::new(); file.read_to_end(&mut buff)?; let erase = handle.erase_area(start, buff.len())?; println!("Erasing {} pages.", erase.total()); let mut pb = ProgressBar::new(erase.total() as u64); pb.show_speed = false; pb.show_time_left = false; for status in erase { pb.set(status? as u64); } pb.finish(); let program = handle.program_at(buff.as_slice(), start)?; println!("Flashing {} bytes.", program.total()); let mut pb = ProgressBar::new(program.total() as u64); pb.set_units(Units::Bytes); for status in program { pb.set(status? as u64); } pb.finish(); match handle.verify(buff.as_slice(), start) { Ok(()) => println!("CRC verification successful."), Err(e) => { println!("CRC verification error: {}", e); return Err(e.into()); } } println!("Exiting bootloader."); handle.exit_bootloader()?; } } } // let targets: Vec<_> = context // .devices()? // .iter() // .filter_map(|device| { // match_usb_device(device, 0x16c0, 0x05e1, "25120.org", "HPGL XY").ok()? // }) // .collect(); // for mut handle in targets { // println!("Found device: {}", handle.device().bus_number()); // // device_handle.claim_interface(0)?; // println!("Entering Bootloader"); // enter_bootloader(handle)?; // } Ok(()) }