darkwing/server/services/cloud_file_services.rs

use anyhow::Context;
use async_trait::async_trait;
use aws_config::{BehaviorVersion, Region};
use aws_sdk_s3::{
presigning::PresigningConfig, primitives::ByteStream, types::StorageClass,
Client as S3Client,
};
use mockall::automock;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tracing::{debug, info};
use crate::{config::DarkwingConfig, server::error::AppResult};
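/// Selects which configured S3 backend (and bucket) an operation should target.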
pub enum S3ClientType {
Primary,
Backup,
}
impl std::fmt::Display for S3ClientType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
S3ClientType::Primary => write!(f, "primary"),
S3ClientType::Backup => write!(f, "backup"),
}
}
}
pub type DynCloudFileService = Arc<dyn CloudFileServiceTrait + Send + Sync>;
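/// Abstraction over the S3-backed file operations used by the server;
/// `#[automock]` generates `MockCloudFileServiceTrait` for use in tests.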
#[automock]
#[async_trait]
pub trait CloudFileServiceTrait {
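    /// Downloads the object at `key` to `local_path`; returns `None` when the
    /// key does not exist.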
async fn get_file(
&self,
key: &str,
local_path: String,
) -> AppResult<Option<String>>;
async fn put_file(
&self,
key: &str,
local_path: String,
client: S3ClientType,
) -> AppResult<()>;
async fn put_file_bytes(&self, key: &str, bytes: Vec<u8>) -> AppResult<()>;
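    /// Lists the object keys under the given prefix in the selected bucket.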
async fn list_files(
&self,
key: &str,
client: S3ClientType,
) -> AppResult<Vec<Option<String>>>;
async fn generate_presigned_url(&self, key: &str) -> AppResult<String>;
async fn does_file_exist(
&self,
key: &str,
client: S3ClientType,
) -> AppResult<bool>;
async fn delete_file(&self, key: &str) -> AppResult<()>;
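    /// Removes any existing `.patch` objects under the key's directory before
    /// uploading the new diff bytes.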
async fn put_diff(&self, key: &str, bytes: Vec<u8>) -> AppResult<()>;
}
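/// S3-backed implementation holding a primary and a backup client, both built
/// from `DarkwingConfig`.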
#[derive(Clone)]
pub struct CloudFileService {
config: Arc<DarkwingConfig>,
s3_client: S3Client,
backup_s3_client: S3Client,
}
impl CloudFileService {
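    /// Builds the primary and backup S3 clients from the static credentials,
    /// regions, and endpoints in `DarkwingConfig`.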
pub async fn new(config: Arc<DarkwingConfig>) -> Self {
        let s3_config = aws_config::defaults(BehaviorVersion::latest())
            .region(Region::new(config.aws_s3_region.clone()))
.endpoint_url(config.aws_s3_endpoint.clone())
.credentials_provider(aws_sdk_s3::config::Credentials::new(
config.aws_s3_access_key_id.clone(),
config.aws_s3_secret_access_key.clone(),
None,
None,
"darkwing provider",
))
.load()
.await;
let s3_client = S3Client::new(&s3_config);
        let backup_s3_config = aws_config::defaults(BehaviorVersion::latest())
            .region(Region::new(config.aws_s3_backup_region.clone()))
.endpoint_url(config.aws_s3_backup_endpoint.clone())
.credentials_provider(aws_sdk_s3::config::Credentials::new(
config.aws_s3_backup_access_key_id.clone(),
config.aws_s3_backup_secret_access_key.clone(),
None,
None,
"darkwing provider",
))
.load()
.await;
let backup_s3_client = S3Client::new(&backup_s3_config);
Self {
config,
s3_client,
backup_s3_client,
}
}
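    /// Fetches an object's body from the primary bucket, returning `Ok(None)`
    /// when the key does not exist.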
async fn get_file_inner(&self, key: &str) -> AppResult<Option<ByteStream>> {
info!("Getting file from S3: {}", key);
let resp = self
.s3_client
.get_object()
.bucket(&self.config.aws_s3_bucket)
.key(key)
.send()
.await;
match resp {
Ok(resp) => Ok(Some(resp.body)),
Err(e) => {
let service_error = e.into_service_error();
if service_error.is_no_such_key() {
return Ok(None);
}
Err(service_error.into())
}
}
}
}
#[async_trait]
impl CloudFileServiceTrait for CloudFileService {
async fn list_files(
&self,
key: &str,
client: S3ClientType,
) -> AppResult<Vec<Option<String>>> {
info!("Listing files from S3: {}", key);
let s3_client = match client {
S3ClientType::Primary => &self.s3_client,
S3ClientType::Backup => &self.backup_s3_client,
};
let bucket = match client {
S3ClientType::Primary => &self.config.aws_s3_bucket,
S3ClientType::Backup => &self.config.aws_s3_backup_bucket,
};
let resp = s3_client
.list_objects_v2()
.bucket(bucket)
.prefix(key)
.send()
.await
.context(format!("Failed to list objects in {} S3", client))?;
let files = resp
.contents
.unwrap_or_default()
.iter()
.map(|obj| obj.key.clone())
.collect();
Ok(files)
}
async fn get_file(
&self,
key: &str,
local_path: String,
) -> AppResult<Option<String>> {
let body = self.get_file_inner(key).await?;
let mut reader = match body {
Some(body) => body.into_async_read(),
None => return Ok(None),
};
        let file = tokio::fs::File::create(&local_path).await?;
let mut writer = tokio::io::BufWriter::new(file);
tokio::io::copy(&mut reader, &mut writer)
.await
.context("Failed to copy S3 object to temp file")?;
writer.flush().await.context("Failed to flush temp file")?;
Ok(Some(local_path))
}
async fn put_file_bytes(&self, key: &str, bytes: Vec<u8>) -> AppResult<()> {
self
.s3_client
.put_object()
.bucket(&self.config.aws_s3_bucket)
.key(key)
            .body(ByteStream::from(bytes))
.send()
.await
.context("Failed to put object to S3")?;
Ok(())
}
async fn put_file(
&self,
key: &str,
local_path: String,
client: S3ClientType,
) -> AppResult<()> {
info!("Putting file to S3: {}", key);
let s3_client = match client {
S3ClientType::Primary => &self.s3_client,
S3ClientType::Backup => &self.backup_s3_client,
};
let bucket = match client {
S3ClientType::Primary => &self.config.aws_s3_bucket,
S3ClientType::Backup => &self.config.aws_s3_backup_bucket,
};
let mut put_object_request =
s3_client.put_object().bucket(bucket).key(key).body(
ByteStream::read_from()
.path(local_path)
.build()
.await
.context("Failed to read file")?,
);
if let S3ClientType::Backup = client {
put_object_request =
put_object_request.storage_class(StorageClass::IntelligentTiering);
}
put_object_request
.send()
.await
.context(format!("Failed to put object to {} S3", client))?;
Ok(())
}
async fn generate_presigned_url(&self, key: &str) -> AppResult<String> {
info!("Generating presigned URL for S3: {}", key);
let config = PresigningConfig::builder()
.expires_in(self.config.presigned_url_expiration())
.build()
.context("Failed to build presigning config")?;
let request = self
.s3_client
.get_object()
.bucket(&self.config.aws_s3_bucket)
.key(key)
.presigned(config)
.await
.context("Failed to generate presigned URL")?;
Ok(request.uri().to_string())
}
async fn does_file_exist(
&self,
key: &str,
client: S3ClientType,
) -> AppResult<bool> {
let s3_client = match client {
S3ClientType::Primary => &self.s3_client,
S3ClientType::Backup => &self.backup_s3_client,
};
        let bucket = match client {
            S3ClientType::Primary => &self.config.aws_s3_bucket,
            S3ClientType::Backup => &self.config.aws_s3_backup_bucket,
        };
        let resp = s3_client
            .head_object()
            .bucket(bucket)
            .key(key)
            .send()
            .await;
        // Any `head_object` error (including a missing key) is treated as the
        // file not existing.
        Ok(resp.is_ok_and(|r| r.content_length.is_some()))
}
async fn delete_file(&self, key: &str) -> AppResult<()> {
self
.s3_client
.delete_object()
.bucket(&self.config.aws_s3_bucket)
.key(key)
.send()
.await
.context("Failed to delete object from S3")?;
Ok(())
}
async fn put_diff(&self, key: &str, bytes: Vec<u8>) -> AppResult<()> {
debug!("putting diff for key: {}", key);
        // Keys are assumed to be namespaced as "<prefix>/<id>/...", so the first
        // two segments identify the directory holding any stale `.patch` files.
        let directory = key.split('/').take(2).collect::<Vec<&str>>().join("/");
let files = self.list_files(&directory, S3ClientType::Primary).await?;
for file in files.into_iter().flatten() {
if file.ends_with(".patch") {
self.delete_file(&file).await?;
}
}
self.put_file_bytes(key, bytes).await
}
}
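
// A minimal sketch of how the `#[automock]`-generated `MockCloudFileServiceTrait`
// can stand in for the real service behind `DynCloudFileService`. The key, the
// expectation, and the test name are illustrative; this only assumes that
// `AppResult` is a `Result` whose `Ok` variant can be constructed directly and
// that `#[tokio::test]` is available in this crate.
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn mocked_service_reports_existing_file() {
        let mut mock = MockCloudFileServiceTrait::new();
        // Report every key/client combination as existing for this sketch.
        mock.expect_does_file_exist()
            .returning(|_key, _client| Ok(true));

        // Callers depend on the trait object, so the mock slots in wherever a
        // `DynCloudFileService` is expected.
        let service: DynCloudFileService = Arc::new(mock);
        assert!(service
            .does_file_exist("some/key", S3ClientType::Primary)
            .await
            .unwrap());
    }
}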