Move reopen functionality to a new trait

This is still not the ideal API, but it makes the code quite a bit more
readable since we no longer have to pass around closures everywhere that
multithreaded reads and writes to the same file are needed.

Signed-off-by: Andrew Gunnerson <accounts+github@chiller3.com>
This commit is contained in:
Andrew Gunnerson
2023-10-11 21:59:08 -04:00
parent 8ae1c54c13
commit 2e8e86766b
11 changed files with 173 additions and 157 deletions
+11 -24
View File
@@ -30,7 +30,7 @@ use crate::{
self, AlgorithmType, AppendedDescriptorMut, AppendedDescriptorRef, Descriptor, Footer,
Header,
},
stream::{self, PSeekFile},
stream::{self, PSeekFile, Reopen},
util,
};
@@ -145,7 +145,7 @@ fn write_raw_and_verify(
let raw_file = write_raw(path, reader, copy_size, cancel_signal)?;
let result = verify_and_repair(None, raw_file.reopen(), descriptor, true, cancel_signal);
let result = verify_and_repair(None, raw_file.reopen()?, descriptor, true, cancel_signal);
// Chop off the old hash tree and FEC data.
raw_file.set_len(f.original_image_size)?;
@@ -180,12 +180,8 @@ fn write_raw_and_update(
match info.header.appended_descriptor_mut()? {
AppendedDescriptorMut::HashTree(d) => {
d.image_size = image_size;
d.update(
|| Ok(Box::new(raw_file.reopen())),
|| Ok(Box::new(raw_file.reopen())),
cancel_signal,
)
.context("Failed to update hash tree descriptor")?;
d.update(&raw_file, &raw_file, cancel_signal)
.context("Failed to update hash tree descriptor")?;
}
AppendedDescriptorMut::Hash(d) => {
d.image_size = image_size;
@@ -384,7 +380,7 @@ fn verify_and_repair(
AppendedDescriptorRef::HashTree(d) => {
status!("Verifying hash tree descriptor{suffix}");
match d.verify(|| Ok(Box::new(file.reopen())), cancel_signal) {
match d.verify(&file, cancel_signal) {
Err(
e @ avb::Error::InvalidRootDigest { .. }
| e @ avb::Error::InvalidHashTree { .. },
@@ -392,17 +388,12 @@ fn verify_and_repair(
warning!("Failed to verify hash tree descriptor{suffix}: {e}");
warning!("Attempting to repair using FEC data{suffix}");
d.repair(
|| Ok(Box::new(file.reopen())),
|| Ok(Box::new(file.reopen())),
cancel_signal,
)
.with_context(|| format!("Failed to repair data{suffix}"))?;
d.repair(&file, &file, cancel_signal)
.with_context(|| format!("Failed to repair data{suffix}"))?;
d.verify(|| Ok(Box::new(file.reopen())), cancel_signal)
.map(|_| {
status!("Successfully repaired data{suffix}");
})
d.verify(&file, cancel_signal).map(|_| {
status!("Successfully repaired data{suffix}");
})
}
ret => ret,
}
@@ -518,11 +509,7 @@ fn repack_subcommand(cli: &RepackCli, cancel_signal: &AtomicBool) -> Result<()>
// Write new hash tree and FEC data instead of copying the original.
// THere could have been errors in the original FEC data itself.
if let AppendedDescriptorMut::HashTree(d) = info.header.appended_descriptor_mut()? {
d.update(
|| Ok(Box::new(file.reopen())),
|| Ok(Box::new(file.reopen())),
cancel_signal,
)?;
d.update(&file, &file, cancel_signal)?;
}
file
+4 -8
View File
@@ -53,7 +53,7 @@ fn write_fec(path: &Path, fec: &FecImage) -> Result<()> {
fn generate_subcommand(cli: &GenerateCli, cancel_signal: &AtomicBool) -> Result<()> {
let input = open_input(&cli.input, false)?;
let fec = FecImage::generate(|| Ok(Box::new(input.reopen())), cli.parity, cancel_signal)
let fec = FecImage::generate(&input, cli.parity, cancel_signal)
.context("Failed to generate FEC data")?;
write_fec(&cli.fec, &fec)?;
@@ -65,7 +65,7 @@ fn verify_subcommand(cli: &VerifyCli, cancel_signal: &AtomicBool) -> Result<()>
let input = open_input(&cli.input, false)?;
let fec = read_fec(&cli.fec)?;
fec.verify(|| Ok(Box::new(input.reopen())), cancel_signal)
fec.verify(&input, cancel_signal)
.context("Failed to verify data")?;
Ok(())
@@ -78,12 +78,8 @@ fn repair_subcommand(cli: &RepairCli, cancel_signal: &AtomicBool) -> Result<()>
// The separate buffered readers and writers are safe because the function
// guarantees that every thread touches disjoint offsets and every offset is
// read and written at most once.
fec.repair(
|| Ok(Box::new(input.reopen())),
|| Ok(Box::new(input.reopen())),
cancel_signal,
)
.context("Failed to repair file")?;
fec.repair(&input, &input, cancel_signal)
.context("Failed to repair file")?;
Ok(())
}
+30 -34
View File
@@ -9,7 +9,7 @@ use std::{
ffi::{OsStr, OsString},
fmt::Display,
fs::{self, File},
io::{self, BufReader, BufWriter, Cursor, Read, Seek, SeekFrom, Write},
io::{BufReader, BufWriter, Cursor, Read, Seek, SeekFrom, Write},
path::{Path, PathBuf},
sync::{atomic::AtomicBool, Mutex},
time::Instant,
@@ -43,8 +43,8 @@ use crate::{
build::tools::releasetools::OtaMetadata, chromeos_update_engine::DeltaArchiveManifest,
},
stream::{
self, CountingWriter, FromReader, HolePunchingWriter, PSeekFile, ReadSeek, SectionReader,
ToWriter,
self, CountingWriter, FromReader, HolePunchingWriter, PSeekFile, ReadSeek, ReadSeekReopen,
Reopen, SectionReader, ToWriter,
},
util,
};
@@ -149,7 +149,7 @@ pub fn get_required_images(
/// in `external_images`, the real file on the filesystem is opened. Otherwise,
/// the image is extracted from the payload.
fn open_input_streams(
open_payload: impl Fn() -> io::Result<Box<dyn ReadSeek>> + Sync,
payload: &(dyn ReadSeekReopen + Sync),
required_images: &HashMap<String, String>,
external_images: &HashMap<String, PathBuf>,
header: &PayloadHeader,
@@ -174,9 +174,8 @@ fn open_input_streams(
} else {
status!("Extracting from original payload: {name}");
let stream =
payload::extract_image_to_memory(&open_payload, header, name, cancel_signal)
.with_context(|| format!("Failed to extract from original payload: {name}"))?;
let stream = payload::extract_image_to_memory(payload, header, name, cancel_signal)
.with_context(|| format!("Failed to extract from original payload: {name}"))?;
input_streams.insert(name.clone(), Box::new(stream));
}
}
@@ -547,7 +546,7 @@ fn compress_image(
#[allow(clippy::too_many_arguments)]
fn patch_ota_payload(
open_payload: impl Fn() -> io::Result<Box<dyn ReadSeek>> + Sync,
payload: &(dyn ReadSeekReopen + Sync),
writer: impl Write,
external_images: &HashMap<String, PathBuf>,
boot_partition: &str,
@@ -558,8 +557,8 @@ fn patch_ota_payload(
cert_ota: &Certificate,
cancel_signal: &AtomicBool,
) -> Result<(String, u64)> {
let header =
PayloadHeader::from_reader(open_payload()?).context("Failed to load OTA payload header")?;
let header = PayloadHeader::from_reader(payload.reopen_boxed()?)
.context("Failed to load OTA payload header")?;
if !header.is_full_ota() {
bail!("Payload is a delta OTA, not a full OTA");
}
@@ -601,7 +600,7 @@ fn patch_ota_payload(
// from the old payload). The values will be replaced later if the images
// need to be patched (eg. boot or vbmeta image).
let mut input_streams = open_input_streams(
&open_payload,
payload,
&required_images,
external_images,
&header_locked,
@@ -666,7 +665,7 @@ fn patch_ota_payload(
let header_locked = header.lock().unwrap();
let mut payload_writer = PayloadWriter::new(writer, header_locked.clone(), key_ota.clone())
.context("Failed to write payload header")?;
let mut orig_payload_reader = open_payload().context("Failed to open payload")?;
let mut orig_payload_reader = payload.reopen_boxed().context("Failed to open payload")?;
while payload_writer
.begin_next_operation()
@@ -820,19 +819,16 @@ fn patch_ota_zip(
bail!("{path} is not stored uncompressed");
}
let payload_offset = reader.data_start();
let payload_size = reader.size();
// The zip library doesn't provide us with a seekable reader, so
// we make our own from the underlying file.
let payload_reader = SectionReader::new(
BufReader::new(raw_reader.reopen()?),
reader.data_start(),
reader.size(),
)?;
let (p, m) = patch_ota_payload(
|| {
// The zip library doesn't provide us with a seekable
// reader, so we make our own from the underlying file.
Ok(Box::new(SectionReader::new(
BufReader::new(raw_reader.reopen()),
payload_offset,
payload_size,
)?))
},
&payload_reader,
&mut writer,
external_images,
boot_partition,
@@ -923,18 +919,18 @@ fn extract_ota_zip(
})
.collect::<Result<HashMap<_, _>>>()?;
let payload_reader = SectionReader::new(
BufReader::new(raw_reader.reopen()?),
payload_offset,
payload_size,
)?;
// Extract the images. Each time we're asked to open a new file, we just
// clone the relevant PSeekFile. We only ever have one actual kernel file
// descriptor for each file.
payload::extract_images(
|| {
Ok(Box::new(SectionReader::new(
BufReader::new(raw_reader.reopen()),
payload_offset,
payload_size,
)?))
},
|name| Ok(Box::new(BufWriter::new(output_files[name].reopen()))),
&payload_reader,
|name| Ok(Box::new(BufWriter::new(output_files[name].reopen()?))),
header,
images.iter().map(|n| n.as_str()),
cancel_signal,
@@ -1020,7 +1016,7 @@ pub fn patch_subcommand(cli: &PatchCli, cancel_signal: &AtomicBool) -> Result<()
let raw_reader = File::open(&cli.input)
.map(PSeekFile::new)
.with_context(|| format!("Failed to open for reading: {:?}", cli.input))?;
let mut zip_reader = ZipArchive::new(BufReader::new(raw_reader.reopen()))
let mut zip_reader = ZipArchive::new(BufReader::new(raw_reader.reopen()?))
.with_context(|| format!("Failed to read zip: {:?}", cli.input))?;
// Open the output file for reading too, so we can verify offsets later.
@@ -1109,7 +1105,7 @@ pub fn extract_subcommand(cli: &ExtractCli, cancel_signal: &AtomicBool) -> Resul
let raw_reader = File::open(&cli.input)
.map(PSeekFile::new)
.with_context(|| format!("Failed to open for reading: {:?}", cli.input))?;
let mut zip = ZipArchive::new(BufReader::new(raw_reader.reopen()))
let mut zip = ZipArchive::new(BufReader::new(raw_reader.reopen()?))
.with_context(|| format!("Failed to read zip: {:?}", cli.input))?;
let payload_entry = zip
.by_name(ota::PATH_PAYLOAD)
@@ -1119,7 +1115,7 @@ pub fn extract_subcommand(cli: &ExtractCli, cancel_signal: &AtomicBool) -> Resul
// Open the payload data directly.
let mut payload_reader = SectionReader::new(
BufReader::new(raw_reader.reopen()),
BufReader::new(raw_reader.reopen()?),
payload_offset,
payload_size,
)
+20 -20
View File
@@ -28,8 +28,8 @@ use crate::{
padding,
},
stream::{
self, CountingReader, FromReader, ReadDiscardExt, ReadSeek, ReadStringExt, ToWriter,
WriteSeek, WriteStringExt, WriteZerosExt,
self, CountingReader, FromReader, ReadDiscardExt, ReadSeekReopen, ReadStringExt, ToWriter,
WriteSeekReopen, WriteStringExt, WriteZerosExt,
},
util,
};
@@ -427,7 +427,7 @@ impl HashTreeDescriptor {
///
/// NOTE: The result is **not** padded to the block size.
fn hash_one_level_parallel(
open_input: impl Fn() -> io::Result<Box<dyn ReadSeek>> + Sync,
input: &(dyn ReadSeekReopen + Sync),
image_size: u64,
block_size: u32,
algorithm: &'static Algorithm,
@@ -449,7 +449,7 @@ impl HashTreeDescriptor {
let start = c * chunk_size;
let size = chunk_size.min(image_size - start);
let mut reader = open_input()?;
let mut reader = input.reopen_boxed()?;
reader.seek(SeekFrom::Start(start))?;
Self::hash_one_level(reader, size, block_size, algorithm, salt, cancel_signal)
@@ -461,7 +461,7 @@ impl HashTreeDescriptor {
/// Calculate the hash tree for the given input in parallel.
fn calculate_hash_tree(
open_input: impl Fn() -> io::Result<Box<dyn ReadSeek>> + Sync,
input: &(dyn ReadSeekReopen + Sync),
image_size: u64,
block_size: u32,
algorithm: &'static Algorithm,
@@ -470,7 +470,7 @@ impl HashTreeDescriptor {
) -> io::Result<(Vec<u8>, Vec<u8>)> {
// Small files are hashed directly, exactly like a hash descriptor.
if image_size <= u64::from(block_size) {
let mut reader = open_input()?;
let mut reader = input.reopen_boxed()?;
let mut buf = vec![0u8; block_size as usize];
reader.read_exact(&mut buf)?;
@@ -500,7 +500,7 @@ impl HashTreeDescriptor {
} else {
// Initially read from file.
Self::hash_one_level_parallel(
&open_input,
input,
level_size,
block_size,
algorithm,
@@ -590,13 +590,13 @@ impl HashTreeDescriptor {
/// original state by truncating it to [`Self::image_size`].
pub fn update(
&mut self,
open_input: impl Fn() -> io::Result<Box<dyn ReadSeek>> + Sync,
open_output: impl Fn() -> io::Result<Box<dyn WriteSeek>> + Sync,
input: &(dyn ReadSeekReopen + Sync),
output: &(dyn WriteSeekReopen + Sync),
cancel_signal: &AtomicBool,
) -> Result<()> {
let algorithm = ring_algorithm(&self.hash_algorithm)?;
let (root_digest, hash_tree) = Self::calculate_hash_tree(
&open_input,
input,
self.image_size,
self.data_block_size,
algorithm,
@@ -610,7 +610,7 @@ impl HashTreeDescriptor {
let tree_size = hash_tree.len() as u64;
let mut writer = open_output()?;
let mut writer = output.reopen_boxed()?;
writer.seek(SeekFrom::Start(self.image_size))?;
writer
.write_all(&hash_tree)
@@ -633,7 +633,7 @@ impl HashTreeDescriptor {
// The FEC covers the hash tree as well.
let fec = Fec::new(self.image_size + tree_size, self.data_block_size, parity)?;
let fec_data = fec.generate(open_input, cancel_signal)?;
let fec_data = fec.generate(input, cancel_signal)?;
let fec_size = fec_data
.len()
.to_u64()
@@ -660,7 +660,7 @@ impl HashTreeDescriptor {
/// handles to the same file.
pub fn verify(
&self,
open_input: impl Fn() -> io::Result<Box<dyn ReadSeek>> + Sync,
input: &(dyn ReadSeekReopen + Sync),
cancel_signal: &AtomicBool,
) -> Result<()> {
self.check_offsets()?;
@@ -672,7 +672,7 @@ impl HashTreeDescriptor {
}
let (actual_root_digest, actual_hash_tree) = Self::calculate_hash_tree(
&open_input,
input,
self.image_size,
self.data_block_size,
algorithm,
@@ -687,7 +687,7 @@ impl HashTreeDescriptor {
});
}
let mut reader = open_input()?;
let mut reader = input.reopen_boxed()?;
reader.seek(SeekFrom::Start(self.tree_offset))?;
let mut hash_tree = vec![0u8; self.tree_size as usize];
@@ -716,7 +716,7 @@ impl HashTreeDescriptor {
.read_exact(&mut fec_data)
.map_err(|e| Error::ReadFieldError("fec_data", e))?;
fec.verify(open_input, &fec_data, cancel_signal)?;
fec.verify(input, &fec_data, cancel_signal)?;
}
Ok(())
@@ -732,8 +732,8 @@ impl HashTreeDescriptor {
/// actually valid.
pub fn repair(
&self,
open_input: impl Fn() -> io::Result<Box<dyn ReadSeek>> + Sync,
open_output: impl Fn() -> io::Result<Box<dyn WriteSeek>> + Sync,
input: &(dyn ReadSeekReopen + Sync),
output: &(dyn WriteSeekReopen + Sync),
cancel_signal: &AtomicBool,
) -> Result<()> {
self.check_offsets()?;
@@ -743,7 +743,7 @@ impl HashTreeDescriptor {
return Err(Error::FecMissing);
}
let mut reader = open_input()?;
let mut reader = input.reopen_boxed()?;
reader.seek(SeekFrom::Start(self.fec_offset))?;
let (fec, fec_size) = self.get_fec()?;
@@ -754,7 +754,7 @@ impl HashTreeDescriptor {
.read_exact(&mut fec_data)
.map_err(|e| Error::ReadFieldError("fec_data", e))?;
fec.repair(open_input, open_output, &fec_data, cancel_signal)?;
fec.repair(input, output, &fec_data, cancel_signal)?;
Ok(())
}
+22 -31
View File
@@ -19,7 +19,7 @@ use thiserror::Error;
use crate::{
format::verityrs,
stream::{self, FromReader, ReadSeek, ToWriter, WriteSeek, WriteZerosExt},
stream::{self, FromReader, ReadSeekReopen, ToWriter, WriteSeekReopen, WriteZerosExt},
util::NumBytes,
};
@@ -419,7 +419,7 @@ impl Fec {
/// This function is multithreaded and uses rayon's global thread pool.
pub fn generate(
&self,
open_input: impl Fn() -> io::Result<Box<dyn ReadSeek>> + Sync,
input: &(dyn ReadSeekReopen + Sync),
cancel_signal: &AtomicBool,
) -> Result<Vec<u8>> {
let fec_size = self.fec_size();
@@ -430,7 +430,7 @@ impl Fec {
.map(|(round, buf)| -> Result<()> {
stream::check_cancel(cancel_signal)?;
let reader = open_input()?;
let reader = input.reopen_boxed()?;
self.generate_one_round(reader, round as u64, buf)
})
.collect::<Result<()>>()?;
@@ -445,7 +445,7 @@ impl Fec {
/// This function is multithreaded and uses rayon's global thread pool.
pub fn verify(
&self,
open_input: impl Fn() -> io::Result<Box<dyn ReadSeek>> + Sync,
input: &(dyn ReadSeekReopen + Sync),
fec: &[u8],
cancel_signal: &AtomicBool,
) -> Result<()> {
@@ -463,7 +463,7 @@ impl Fec {
.map(|(round, buf)| -> Result<()> {
stream::check_cancel(cancel_signal)?;
let reader = open_input()?;
let reader = input.reopen_boxed()?;
self.verify_one_round(reader, round as u64, buf)
})
.collect::<Result<()>>()?;
@@ -484,8 +484,8 @@ impl Fec {
/// This function is multithreaded and uses rayon's global thread pool.
pub fn repair(
&self,
open_input: impl Fn() -> io::Result<Box<dyn ReadSeek>> + Sync,
open_output: impl Fn() -> io::Result<Box<dyn WriteSeek>> + Sync,
input: &(dyn ReadSeekReopen + Sync),
output: &(dyn WriteSeekReopen + Sync),
fec: &[u8],
cancel_signal: &AtomicBool,
) -> Result<u64> {
@@ -504,8 +504,8 @@ impl Fec {
.map(|(round, buf)| -> Result<u64> {
stream::check_cancel(cancel_signal)?;
let reader = open_input()?;
let writer = open_output()?;
let reader = input.reopen_boxed()?;
let writer = output.reopen_boxed()?;
self.repair_one_round(reader, writer, round as u64, buf)
})
.collect::<Result<Vec<u64>>>()?
@@ -542,16 +542,16 @@ impl FecImage {
/// Generate FEC data for a file. `parity` is the number of parity bytes per
/// 255-byte Reed-Solomon codeword.
pub fn generate(
open_input: impl Fn() -> io::Result<Box<dyn ReadSeek>> + Sync,
input: &(dyn ReadSeekReopen + Sync),
parity: u8,
cancel_signal: &AtomicBool,
) -> Result<Self> {
let data_size = {
let mut file = open_input()?;
let mut file = input.reopen_boxed()?;
file.seek(SeekFrom::End(0))?
};
let fec = Fec::new(data_size, FEC_BLOCK_SIZE as u32, parity)?;
let fec_data = fec.generate(open_input, cancel_signal)?;
let fec_data = fec.generate(input, cancel_signal)?;
Ok(Self {
fec: fec_data,
@@ -564,11 +564,11 @@ impl FecImage {
/// [`Self::repair()`] if performing a repair is not necessary.
pub fn verify(
&self,
open_input: impl Fn() -> io::Result<Box<dyn ReadSeek>> + Sync,
input: &(dyn ReadSeekReopen + Sync),
cancel_signal: &AtomicBool,
) -> Result<()> {
let fec = Fec::new(self.data_size, FEC_BLOCK_SIZE as u32, self.parity)?;
fec.verify(open_input, &self.fec, cancel_signal)
fec.verify(input, &self.fec, cancel_signal)
}
/// Repair a file using this instance's FEC data. The maximum correctable
@@ -588,12 +588,12 @@ impl FecImage {
/// that multiple threads will always read and write disjoint file offsets.
pub fn repair(
&self,
open_input: impl Fn() -> io::Result<Box<dyn ReadSeek>> + Sync,
open_output: impl Fn() -> io::Result<Box<dyn WriteSeek>> + Sync,
input: &(dyn ReadSeekReopen + Sync),
output: &(dyn WriteSeekReopen + Sync),
cancel_signal: &AtomicBool,
) -> Result<u64> {
let fec = Fec::new(self.data_size, FEC_BLOCK_SIZE as u32, self.parity)?;
fec.repair(open_input, open_output, &self.fec, cancel_signal)
fec.repair(input, output, &self.fec, cancel_signal)
}
/// Build one instance of the FEC header. The caller is responsible for
@@ -764,18 +764,15 @@ mod tests {
let num_codewords = fec.rounds as usize * block_size as usize;
// Generate FEC data.
let fec_data = fec
.generate(|| Ok(Box::new(file.reopen())), &cancel_signal)
.unwrap();
let fec_data = fec.generate(&file, &cancel_signal).unwrap();
// Verify that there are no errors.
fec.verify(|| Ok(Box::new(file.reopen())), &fec_data, &cancel_signal)
.unwrap();
fec.verify(&file, &fec_data, &cancel_signal).unwrap();
// Verify that errors are detected.
corrupt_byte(&mut file, 0);
assert_matches!(
fec.verify(|| Ok(Box::new(file.reopen())), &fec_data, &cancel_signal,),
fec.verify(&file, &fec_data, &cancel_signal),
Err(Error::HasErrors)
);
@@ -785,13 +782,7 @@ mod tests {
}
// Verify that all the single-byte errors can be fixed.
fec.repair(
|| Ok(Box::new(file.reopen())),
|| Ok(Box::new(file.reopen())),
&fec_data,
&cancel_signal,
)
.unwrap();
fec.repair(&file, &file, &fec_data, &cancel_signal).unwrap();
let repaired_digest = {
let mut buf = Vec::new();
@@ -826,7 +817,7 @@ mod tests {
file.write_all(&buf).unwrap();
}
let image = FecImage::generate(|| Ok(Box::new(file.reopen())), 2, &cancel_signal).unwrap();
let image = FecImage::generate(&file, 2, &cancel_signal).unwrap();
let mut fec_file = Cursor::new(Vec::new());
image.to_writer(&mut fec_file).unwrap();
+7 -7
View File
@@ -34,8 +34,8 @@ use crate::{
InstallOperation, PartitionInfo, PartitionUpdate, Signatures,
},
stream::{
self, CountingReader, CountingWriter, FromReader, HashingWriter, ReadDiscardExt, ReadSeek,
SharedCursor, WriteSeek,
self, CountingReader, CountingWriter, FromReader, HashingWriter, ReadDiscardExt,
ReadSeekReopen, Reopen, SharedCursor, WriteSeek,
},
};
@@ -871,7 +871,7 @@ pub fn apply_operation(
/// multithreaded and uses rayon's global thread pool. `open_payload` will be
/// called from multiple threads.
pub fn extract_image_to_memory(
open_payload: impl Fn() -> io::Result<Box<dyn ReadSeek>> + Sync,
payload: &(dyn ReadSeekReopen + Sync),
header: &PayloadHeader,
partition_name: &str,
cancel_signal: &AtomicBool,
@@ -888,8 +888,8 @@ pub fn extract_image_to_memory(
.operations
.par_iter()
.map(|op| -> Result<()> {
let reader = open_payload()?;
let writer = stream.reopen();
let reader = payload.reopen_boxed()?;
let writer = stream.reopen()?;
apply_operation(
reader,
@@ -911,7 +911,7 @@ pub fn extract_image_to_memory(
/// is done multithreaded and uses rayon's global thread pool. `open_payload`
/// and `open_output` will be called from multiple threads.
pub fn extract_images<'a>(
open_payload: impl Fn() -> io::Result<Box<dyn ReadSeek>> + Sync,
payload: &(dyn ReadSeekReopen + Sync),
open_output: impl Fn(&str) -> io::Result<Box<dyn WriteSeek>> + Sync,
header: &PayloadHeader,
partition_names: impl IntoIterator<Item = &'a str>,
@@ -938,7 +938,7 @@ pub fn extract_images<'a>(
operations
.into_par_iter()
.map(|(name, op)| -> Result<()> {
let reader = open_payload()?;
let reader = payload.reopen_boxed()?;
let writer = open_output(name)?;
apply_operation(
+70 -17
View File
@@ -5,7 +5,7 @@
use std::{
fs::File,
io::{self, Cursor, Read, Seek, SeekFrom, Write},
io::{self, BufReader, BufWriter, Cursor, Read, Seek, SeekFrom, Write},
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex, RwLock,
@@ -30,6 +30,28 @@ pub trait WriteSeek: Write + Seek {}
impl<W: Write + Seek> WriteSeek for W {}
/// A trait for seekable and reopenable readers.
pub trait ReadSeekReopen: ReadSeek {
fn reopen_boxed(&self) -> io::Result<Box<dyn ReadSeek>>;
}
impl<R: ReadSeek + Reopen + 'static> ReadSeekReopen for R {
fn reopen_boxed(&self) -> io::Result<Box<dyn ReadSeek>> {
Ok(Box::new(self.reopen()?))
}
}
/// A trait for seekable and reopenable writers.
pub trait WriteSeekReopen: WriteSeek {
fn reopen_boxed(&self) -> io::Result<Box<dyn WriteSeek>>;
}
impl<W: WriteSeek + Reopen + 'static> WriteSeekReopen for W {
fn reopen_boxed(&self) -> io::Result<Box<dyn WriteSeek>> {
Ok(Box::new(self.reopen()?))
}
}
/// Common function for reading a structure from a reader.
pub trait FromReader<R: Read>: Sized {
type Error;
@@ -168,6 +190,25 @@ impl<W: Write> WriteStringExt for W {
}
}
/// Extensions for file-like types to reopen themselves.
pub trait Reopen: Sized {
/// Open a new handle to the same file. The new handle is independently
/// seekable and the file offset is initially set to 0.
fn reopen(&self) -> io::Result<Self>;
}
impl<R: Read + Reopen> Reopen for BufReader<R> {
fn reopen(&self) -> io::Result<Self> {
Ok(BufReader::new(self.get_ref().reopen()?))
}
}
impl<W: Write + Reopen> Reopen for BufWriter<W> {
fn reopen(&self) -> io::Result<Self> {
Ok(BufWriter::new(self.get_ref().reopen()?))
}
}
/// A reader wrapper that implements [`Seek`], but only for reporting the
/// current file position.
pub struct CountingReader<R: Read> {
@@ -325,6 +366,14 @@ impl<R: Read + Seek> SectionReader<R> {
}
}
impl<R: Read + Seek + Reopen> Reopen for SectionReader<R> {
fn reopen(&self) -> io::Result<Self> {
let inner = self.inner.reopen()?;
Self::new(inner, self.start, self.size)
}
}
impl<R: Read + Seek> Read for SectionReader<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let to_read = self.size.saturating_sub(self.pos).min(buf.len() as u64) as usize;
@@ -417,13 +466,6 @@ impl PSeekFile {
}
}
pub fn reopen(&self) -> Self {
Self {
file: self.file.clone(),
offset: 0,
}
}
pub fn set_len(&self, size: u64) -> io::Result<()> {
let file_locked = self.file.read().unwrap();
file_locked.set_len(size)
@@ -458,6 +500,15 @@ impl PSeekFile {
}
}
impl Reopen for PSeekFile {
fn reopen(&self) -> io::Result<Self> {
Ok(Self {
file: self.file.clone(),
offset: 0,
})
}
}
impl Read for PSeekFile {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let n = self.read_at(buf)?;
@@ -530,12 +581,14 @@ impl SharedCursor {
..Default::default()
}
}
}
pub fn reopen(&self) -> Self {
Self {
impl Reopen for SharedCursor {
fn reopen(&self) -> io::Result<Self> {
Ok(Self {
inner: self.inner.clone(),
offset: 0,
}
})
}
}
@@ -668,8 +721,8 @@ mod tests {
use super::{
CountingReader, CountingWriter, HashingReader, HashingWriter, HolePunchingWriter,
PSeekFile, ReadDiscardExt, ReadStringExt, SectionReader, SharedCursor, WriteStringExt,
WriteZerosExt,
PSeekFile, ReadDiscardExt, ReadStringExt, Reopen, SectionReader, SharedCursor,
WriteStringExt, WriteZerosExt,
};
const FOOBAR_SHA256: [u8; 32] = [
@@ -848,8 +901,8 @@ mod tests {
fn pseek_file() {
let raw_file = tempfile::tempfile().unwrap();
let mut a = PSeekFile::new(raw_file);
let mut b = a.reopen();
let mut c = b.reopen();
let mut b = a.reopen().unwrap();
let mut c = b.reopen().unwrap();
b.write_all(b"foobar").unwrap();
c.write_all(b"hello").unwrap();
@@ -868,8 +921,8 @@ mod tests {
#[test]
fn shared_cursor() {
let mut a = SharedCursor::default();
let mut b = a.reopen();
let mut c = b.reopen();
let mut b = a.reopen().unwrap();
let mut c = b.reopen().unwrap();
b.write_all(b"foobar").unwrap();
c.write_all(b"hello").unwrap();
+2 -10
View File
@@ -138,10 +138,7 @@ fn round_trip_appended_hash_tree_image() {
// Verify the hash tree and FEC data.
match header.appended_descriptor().unwrap() {
AppendedDescriptorRef::HashTree(d) => {
d.verify(|| Ok(Box::new(reader.reopen())), &cancel_signal)
.unwrap();
}
AppendedDescriptorRef::HashTree(d) => d.verify(&reader, &cancel_signal).unwrap(),
AppendedDescriptorRef::Hash(_) => panic!("Expected hash tree descriptor"),
}
@@ -166,12 +163,7 @@ fn round_trip_appended_hash_tree_image() {
d.fec_offset = 0;
d.fec_size = 0;
d.update(
|| Ok(Box::new(writer.reopen())),
|| Ok(Box::new(writer.reopen())),
&cancel_signal,
)
.unwrap();
d.update(&writer, &writer, &cancel_signal).unwrap();
}
AppendedDescriptorMut::Hash(_) => panic!("Expected hash tree descriptor"),
}
+3 -2
View File
@@ -18,7 +18,7 @@ use std::{
};
use anyhow::{anyhow, bail, Context, Result};
use avbroot::stream::PSeekFile;
use avbroot::stream::{PSeekFile, Reopen};
use serde::{Deserialize, Serialize};
/// Minimum download chunk size per task.
@@ -309,7 +309,8 @@ fn download_ranges(
}
if let Some(thread_range) = remaining.pop_front() {
let file_cloned = file.reopen();
// PSeekFile's reopen can't fail.
let file_cloned = file.reopen().unwrap();
let thread_range_cloned = thread_range.clone();
let tx_cloned = tx.clone();
+3 -3
View File
@@ -27,7 +27,7 @@ use avbroot::{
cli::ota::{ExtractCli, PatchCli, VerifyCli},
format::{ota, payload::PayloadHeader},
protobuf::chromeos_update_engine::install_operation::Type,
stream::{self, FromReader, HashingReader, PSeekFile, SectionReader},
stream::{self, FromReader, HashingReader, PSeekFile, Reopen, SectionReader},
};
use clap::Parser;
use tempfile::TempDir;
@@ -98,7 +98,7 @@ fn strip_image(
let mut raw_reader = File::open(input)
.map(PSeekFile::new)
.with_context(|| format!("Failed to open for reading: {input:?}"))?;
let mut zip_reader = ZipArchive::new(BufReader::new(raw_reader.reopen()))
let mut zip_reader = ZipArchive::new(BufReader::new(raw_reader.reopen()?))
.with_context(|| format!("Failed to read zip: {input:?}"))?;
let payload_entry = zip_reader
.by_name(ota::PATH_PAYLOAD)
@@ -108,7 +108,7 @@ fn strip_image(
// Open the payload data directly.
let mut payload_reader = SectionReader::new(
BufReader::new(raw_reader.reopen()),
BufReader::new(raw_reader.reopen()?),
payload_offset,
payload_size,
)?;
+1 -1
View File
@@ -23,7 +23,7 @@ mod fuzz {
input.write_zeros_exact(fec.data_size).unwrap();
}
let _ = fec.verify(|| Ok(Box::new(input.reopen())), &cancel_signal);
let _ = fec.verify(&input, &cancel_signal);
}
});
}