diff --git a/app/update/src/main/java/org/phoebus/applications/update/Update.java b/app/update/src/main/java/org/phoebus/applications/update/Update.java index 320a3b1a24..f9b3c5aeba 100644 --- a/app/update/src/main/java/org/phoebus/applications/update/Update.java +++ b/app/update/src/main/java/org/phoebus/applications/update/Update.java @@ -9,6 +9,7 @@ import java.io.BufferedInputStream; import java.io.File; +import java.io.IOException; import java.io.InputStream; import java.nio.file.Files; import java.nio.file.StandardCopyOption; @@ -19,6 +20,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; import java.util.prefs.Preferences; @@ -163,38 +165,13 @@ protected File download(final JobMonitor monitor) throws Exception final File file = File.createTempFile("phoebus_update", ".zip"); file.deleteOnExit(); - // Watcher thread that displays file size in monitor + // Watcher thread that displays file size in monitor. + // Reference to the download stream so the watcher can close it on 'cancel'. final CountDownLatch done = new CountDownLatch(1); - final Thread download_thread = Thread.currentThread(); - final Thread watcher = new Thread(() -> - { - try - { - while (! done.await(1, TimeUnit.SECONDS)) - { - final long size = file.length(); - final long full = full_size.get(); - if (full > 0) - { - int percent = (int) ((size*100) / full); - monitor.updateTaskName(String.format("Downloading %d %% (%.3f/%.3f MB)", percent, size/1.0e6, full/1.0e6)); - } - else - monitor.updateTaskName(String.format("Downloading %.3f MB", size/1.0e6)); - - // Force the download thread to stop on 'cancel'. - // 'interrupt()' has no effect, and Files.copy is not - // otherwise instrumented. - if (monitor.isCanceled()) - download_thread.stop(); - } - monitor.updateTaskName(String.format("Download Finished")); - } - catch (Exception ex) - { - logger.log(Level.WARNING, "Download watch thread", ex); - } - }, "Watch Download"); + final AtomicReference download_stream = new AtomicReference<>(); + final Thread watcher = new Thread( + () -> watchDownload(monitor, file, full_size, done, download_stream), + "Watch Download"); watcher.setDaemon(true); watcher.start(); @@ -203,6 +180,7 @@ protected File download(final JobMonitor monitor) throws Exception final InputStream src = getDownloadStream(); ) { + download_stream.set(src); logger.info("Download into " + file); Files.copy(src, file.toPath(), StandardCopyOption.REPLACE_EXISTING); return file; @@ -218,6 +196,74 @@ protected File download(final JobMonitor monitor) throws Exception } } + /** Watch an in-progress download, reporting size to the monitor and + * aborting on 'cancel'. + * + *

Runs on the "Watch Download" thread until {@code done} is counted + * down by {@link #download(JobMonitor)}. + * + * @param monitor {@link JobMonitor} to update and poll for cancellation + * @param file Partially downloaded file, polled for its current size + * @param full_size Expected total size, or ≤ 0 if unknown + * @param done Latch that the download signals once the copy has finished + * @param download_stream Holds the source stream to close on 'cancel' + */ + private void watchDownload(final JobMonitor monitor, + final File file, + final AtomicLong full_size, + final CountDownLatch done, + final AtomicReference download_stream) + { + try + { + while (! done.await(1, TimeUnit.SECONDS)) + { + final long size = file.length(); + final long full = full_size.get(); + if (full > 0) + { + int percent = (int) ((size*100) / full); + monitor.updateTaskName(String.format("Downloading %d %% (%.3f/%.3f MB)", percent, size/1.0e6, full/1.0e6)); + } + else + monitor.updateTaskName(String.format("Downloading %.3f MB", size/1.0e6)); + + // Force the download to stop on 'cancel'. + // 'interrupt()' has no effect, and Files.copy is not + // otherwise instrumented, so close the source stream to + // break the in-progress copy out of its blocking read. + if (monitor.isCanceled()) + closeDownloadStream(download_stream.get()); + } + monitor.updateTaskName(String.format("Download Finished")); + } + catch (Exception ex) + { + logger.log(Level.WARNING, "Download watch thread", ex); + } + } + + /** Close the source stream to abort a cancelled download. + * + *

A failed close is logged rather than thrown so it doesn't end the + * watcher; it will retry on the next poll. + * + * @param src Stream to close, may be null + */ + private void closeDownloadStream(final InputStream src) + { + if (src == null) + return; + try + { + src.close(); + } + catch (IOException ex) + { + logger.log(Level.WARNING, "Cannot close download stream on cancel", ex); + } + } + /** Update installation * @param monitor {@link JobMonitor} * @param install_location Existing {@link Locations#install()} diff --git a/app/update/src/test/java/org/phoebus/applications/update/DownloadCancelTest.java b/app/update/src/test/java/org/phoebus/applications/update/DownloadCancelTest.java new file mode 100644 index 0000000000..59bb2db773 --- /dev/null +++ b/app/update/src/test/java/org/phoebus/applications/update/DownloadCancelTest.java @@ -0,0 +1,158 @@ +/******************************************************************************* + * Copyright (c) 2026 Oak Ridge National Laboratory. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + ******************************************************************************/ +package org.phoebus.applications.update; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.CountDownLatch; + +import org.junit.jupiter.api.Test; +import org.phoebus.framework.jobs.BasicJobMonitor; + +/** Verify the download cancellation behavior of {@link Update}. + * + *

The download watcher used to force-cancel via the now-removed + * {@code Thread.stop()}. It now closes the source stream on 'cancel', + * which must unblock the in-progress {@code Files.copy} on both the + * JDK 21 target (where {@code Thread.stop()} throws at runtime) and + * newer JDKs (where it no longer exists at all). + * + * @author Gianluca Martino + */ +@SuppressWarnings("nls") +public class DownloadCancelTest +{ + /** JobMonitor that always reports 'cancelled' */ + private static class CancelledMonitor extends BasicJobMonitor + { + @Override + public boolean isCanceled() + { + return true; + } + } + + /** InputStream whose read() blocks until the stream is closed, + * emulating a slow network download stuck inside Files.copy. + * Records the name of the thread that first closed it so the + * test can confirm the watcher (not JUnit's timeout interrupt) + * performed the abort. + */ + private static class BlockingStream extends InputStream + { + private final CountDownLatch closed = new CountDownLatch(1); + private volatile String closing_thread = null; + + @Override + public int read() throws IOException + { + try + { + closed.await(); + } + catch (InterruptedException ex) + { + Thread.currentThread().interrupt(); + } + throw new IOException("Stream closed"); + } + + @Override + public int read(final byte[] b, final int off, final int len) throws IOException + { + return read(); + } + + @Override + public void close() + { + if (closing_thread == null) + closing_thread = Thread.currentThread().getName(); + closed.countDown(); + } + + String getClosingThread() + { + return closing_thread; + } + } + + /** Minimal {@link Update} that streams the given bytes. */ + private static Update updateStreaming(final InputStream stream) + { + return new Update() + { + @Override + protected Instant getVersion() + { + return Instant.now(); + } + + @Override + protected Long getDownloadSize() + { + return 1000L; + } + + @Override + protected InputStream getDownloadStream() + { + return stream; + } + }; + } + + /** A cancelled download must abort instead of hanging, + * and the abort must be performed by the watcher thread. + */ + @Test + public void cancelAbortsDownload() + { + final BlockingStream stream = new BlockingStream(); + final Update update = updateStreaming(stream); + + // The monitor reports 'cancelled', so the watcher must close the + // stream within its ~1s poll interval, making download() throw + // instead of blocking forever. Generous timeout avoids CI flakiness. + assertTimeoutPreemptively(Duration.ofSeconds(30), () -> + assertThrows(IOException.class, () -> update.download(new CancelledMonitor()))); + + // The abort must come from the watcher closing the stream, not from + // JUnit's preemptive-timeout interrupt (which would mean it hung). + assertEquals("Watch Download", stream.getClosingThread()); + } + + /** A normal (non-cancelled) download must complete and return the file. */ + @Test + public void normalDownloadCompletes() throws Exception + { + final byte[] payload = "phoebus update payload".getBytes(StandardCharsets.UTF_8); + final Update update = updateStreaming(new ByteArrayInputStream(payload)); + + final File file = update.download(new BasicJobMonitor()); + try + { + assertArrayEquals(payload, Files.readAllBytes(file.toPath())); + } + finally + { + file.delete(); + } + } +}