Description
Describe the bug
We're experiencing an intermittent issue when downloading a large number of files (400-500+) in parallel using the AWS SDK for Java v2's S3TransferManager with S3AsyncClient (CRT-based).
Error:
software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: subscription has been cancelled. (SDK Attempt Count: 1)
at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:130)
at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:95)
at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.retryPolicyDisallowedRetryException(RetryableStageHelper.java:168)
... 25 more
Caused by: java.util.concurrent.CancellationException: subscription has been cancelled.
at software.amazon.awssdk.utils.async.SimplePublisher.lambda$doProcessQueue$9(SimplePublisher.java:286)
at software.amazon.awssdk.utils.async.SimplePublisher$FailureMessage.get(SimplePublisher.java:413)
at software.amazon.awssdk.utils.async.SimplePublisher$FailureMessage.access$700(SimplePublisher.java:397)
at software.amazon.awssdk.utils.async.SimplePublisher.doProcessQueue(SimplePublisher.java:260)
at software.amazon.awssdk.utils.async.SimplePublisher.processEventQueue(SimplePublisher.java:224)
at software.amazon.awssdk.utils.async.SimplePublisher.access$1300(SimplePublisher.java:58)
at software.amazon.awssdk.utils.async.SimplePublisher$SubscriptionImpl.cancel(SimplePublisher.java:389)
at software.amazon.awssdk.core.async.listener.SubscriberListener$NotifyingSubscriber$NotifyingSubscription.cancel(SubscriberListener.java:124)
at software.amazon.awssdk.core.async.listener.SubscriberListener$NotifyingSubscriber$NotifyingSubscription.cancel(SubscriberListener.java:124)
at software.amazon.awssdk.core.internal.async.FileAsyncResponseTransformer$FileSubscriber$1.failed(FileAsyncResponseTransformer.java:244)
at software.amazon.awssdk.core.internal.async.FileAsyncResponseTransformer$FileSubscriber$1.failed(FileAsyncResponseTransformer.java:223)
at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:131)
at java.base/sun.nio.ch.SimpleAsynchronousFileChannelImpl$3.run(SimpleAsynchronousFileChannelImpl.java:389)
... 3 more
Context:
- Using S3TransferManager.downloadFile() to download files to local disk
- Files vary in size (some are quite large); the total size is around 240 GB
- Intermittent: some downloads succeed with the same workload, others fail
- We use the default parallelism settings of the S3 Transfer Manager
Questions:
What are the common causes of FileAsyncResponseTransformer subscription cancellation beyond network issues? Any guidance on the underlying cause or mitigation strategies would be greatly appreciated.
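For illustration, one mitigation we are considering (untested on our side) is to cap the CRT client's concurrency and throughput explicitly rather than relying on the defaults; the values below are placeholders:
S3AsyncClient s3AsyncClient = S3AsyncClient.crtBuilder()
        .credentialsProvider(DefaultCredentialsProvider.create())
        .region(Region.US_WEST_2)
        .maxConcurrency(64)                         // placeholder: limit concurrent requests made by the CRT client
        .targetThroughputInGbps(10.0)               // placeholder: cap aggregate throughput
        .minimumPartSizeInBytes(16L * 1024 * 1024)  // placeholder: larger parts -> fewer concurrent part requests
        .build();
If tuning these settings is the recommended mitigation, guidance on reasonable values for this kind of workload would also be helpful.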
Regression Issue
- Select this option if this issue appears to be a regression.
Expected Behavior
Files are downloaded successfully
Current Behavior
Downloads fail intermittently with a "subscription has been cancelled" SdkClientException
Reproduction Steps
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.DownloadFileRequest;
import java.io.File;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class S3DownloadConcurrencyTest {
    private static final int EXTERNAL_THREAD_POOL_SIZE = 32;
    private static final long MAX_BYTES_IN_BATCH = 1L * 1024 * 1024 * 1024; // 1GB

    public static void main(String[] args) throws Exception {
        // Create S3AsyncClient using CRT
        S3AsyncClient s3AsyncClient = S3AsyncClient.crtBuilder()
                .credentialsProvider(DefaultCredentialsProvider.create())
                .region(Region.US_WEST_2)
                .build();

        S3TransferManager transferManager = S3TransferManager.builder()
                .s3Client(s3AsyncClient)
                .build();

        ExecutorService downloadExecutor = Executors.newFixedThreadPool(EXTERNAL_THREAD_POOL_SIZE);
        File downloadDir = Files.createTempDirectory("s3-downloads").toFile();
        String bucketName = "your-test-bucket";

        Map<String, Long> fileNameToSize = new HashMap<>();
        for (int i = 0; i < 500; i++) {
            String key = "test-data/file-" + i + ".data";
            long fileSize = 480L * 1024 * 1024;
            fileNameToSize.put(key, fileSize);
        }

        List<List<String>> batches = partitionFiles(fileNameToSize);
        System.out.println("Downloading " + fileNameToSize.size() + " files (~240GB total) in "
                + batches.size() + " batches");

        for (List<String> batch : batches) {
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            System.out.println("Processing batch with " + batch.size() + " files");

            for (String key : batch) {
                CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
                    try {
                        File destination = new File(downloadDir, key.replaceAll("/", "_"));
                        DownloadFileRequest downloadRequest = DownloadFileRequest.builder()
                                .getObjectRequest(req -> req.bucket(bucketName).key(key))
                                .destination(destination)
                                .build();
                        transferManager.downloadFile(downloadRequest)
                                .completionFuture()
                                .join();
                        return null;
                    } catch (Exception e) {
                        throw new RuntimeException("Download failed for " + key, e);
                    }
                }, downloadExecutor);
                futures.add(future);
            }

            try {
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
                System.out.println("Batch completed successfully");
            } catch (Exception e) {
                System.err.println("Batch failed with error:");
                e.printStackTrace();
                break;
            }
        }

        transferManager.close();
        s3AsyncClient.close();
        downloadExecutor.shutdown();
    }

    private static List<List<String>> partitionFiles(Map<String, Long> fileNameToSize) {
        List<List<String>> batches = new ArrayList<>();
        List<String> currentBatch = new ArrayList<>();
        long currentBatchSize = 0;

        for (Map.Entry<String, Long> entry : fileNameToSize.entrySet()) {
            currentBatchSize += entry.getValue();
            currentBatch.add(entry.getKey());
            if (currentBatchSize >= MAX_BYTES_IN_BATCH) {
                batches.add(currentBatch);
                currentBatch = new ArrayList<>();
                currentBatchSize = 0;
            }
        }
        if (!currentBatch.isEmpty()) {
            batches.add(currentBatch);
        }
        return batches;
    }
}
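For reference, a hypothetical variant of the per-file submission that composes the Transfer Manager's completion futures directly instead of blocking one executor thread per download (it reuses the variables from the repro above; we have not verified whether it changes the failure rate):
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (String key : batch) {
    File destination = new File(downloadDir, key.replaceAll("/", "_"));
    DownloadFileRequest downloadRequest = DownloadFileRequest.builder()
            .getObjectRequest(req -> req.bucket(bucketName).key(key))
            .destination(destination)
            .build();
    // No blocking join() per file; the Transfer Manager's own future is collected directly.
    CompletableFuture<Void> future = transferManager.downloadFile(downloadRequest)
            .completionFuture()
            .thenAccept(completedFileDownload -> { /* download finished for this key */ });
    futures.add(future);
}
// Wait for the whole batch, as in the original repro.
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();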
Possible Solution
No response
Additional Information/Context
No response
AWS Java SDK version used
AWS SDK v2
JDK version used
JDK 17.0.16
Operating System and version
Ubuntu 22.04.5 LTS