// darkwing/server/api/stop_controller.rs

use axum::extract::{DefaultBodyLimit, Json};
use axum::routing::post;
use axum::Router;
use axum_typed_multipart::TypedMultipart;
use darkwing_diff::Patch;
use metrics::histogram;
use tempfile::NamedTempFile;
use tracing::{debug, info};
use crate::{measure_time, unreachable_if_none, unreachable_if_none_ref};
use crate::server::dtos::stop_dto::{StopRequest, StopResponse};
use crate::server::error::AppResult;
use crate::server::error::Error;
use crate::server::extractors::RequiredAuthentication;
use crate::server::services::cloud_file_services::S3ClientType;
use crate::server::services::Services;
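
/// Controller for the `stop` endpoint: accepts either a full datadir archive
/// or a binary diff against the previously stored archive, merges cookie
/// databases when versions diverge, and uploads the result to cloud storage.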
pub struct StopController;

impl StopController {
    pub fn app() -> Router {
        Router::new()
            .route("/", post(Self::main))
            // Datadir uploads can be large; allow request bodies up to 700 MiB.
            .layer(DefaultBodyLimit::max(1024 * 1024 * 700))
    }
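
    /// Handles the upload: validates the request, rebuilds the new datadir
    /// archive (by applying a diff or taking the full upload), merges cookie
    /// databases if the client synced from a stale version, and persists the
    /// result to primary and backup storage.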
pub async fn main(
RequiredAuthentication {
user,
team,
services,
token: _,
}: RequiredAuthentication,
TypedMultipart(request): TypedMultipart<StopRequest>,
) -> AppResult<Json<StopResponse>> {
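        // Exactly one of `diff` or `datadir` must be supplied.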
let is_diff_present = request.diff.is_some();
let is_datadir_present = request.datadir.is_some();
if (!is_diff_present && !is_datadir_present)
|| (is_diff_present && is_datadir_present)
{
            return Err(Error::BadRequest(
                "Specify either diff or datadir, but not both".to_string(),
            ));
}
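        // Record upload size and request count per upload type.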
if let Some(diff) = &request.diff {
let size = tokio::fs::metadata(diff.path()).await?.len();
histogram!("darkwing_stop_diff_size_bytes").record(size as f64);
metrics::counter!("darkwing_stop_requests_total", "type" => "diff")
.increment(1);
}
if let Some(datadir) = &request.datadir {
let size = tokio::fs::metadata(datadir.path()).await?.len();
histogram!("darkwing_stop_datadir_size_bytes").record(size as f64);
metrics::counter!("darkwing_stop_requests_total", "type" => "datadir")
.increment(1);
}
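        // Teams on a fully free plan cannot sync datadirs.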
if services.users.is_fully_free_plan(&team) {
return Err(Error::PaymentRequired);
}
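        // Verify the user may modify this browser profile.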
let access = measure_time!(
"stop_check_access",
services
.browser_profile_service
.check_access(user, request.browser_profile_id)
.await
)?;
if !access {
return Err(Error::Forbidden);
}
let browser_profile = measure_time!(
"stop_get_mini_profile",
services
.browser_profile_service
.get_mini_by_id(request.browser_profile_id as i64)
.await
)?;
        // The uploaded archive matches what is already stored; nothing to do.
        if browser_profile
            .datadir_hash
            .as_ref()
            .is_some_and(|hash| *hash == request.hash)
        {
return Ok(Json(StopResponse { hash: request.hash }));
}
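        // Cloud-storage key under which this profile's archive is kept.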
let datadir_path = measure_time!(
"stop_generate_storage_path",
services
.browser_profile_service
.generate_storage_path(browser_profile.clone())
)?;
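        // Keep the NamedTempFile handle alive; dropping it deletes the file.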
let old_archive = NamedTempFile::new()?;
let old_archive_path = old_archive.path().to_string_lossy().to_string();
        debug!(
            "forming old_archive_path. datadir_hash.is_some: {}, is_diff_present: {}",
            browser_profile.datadir_hash.is_some(),
            is_diff_present
        );
        // Fetch the previous archive from cloud storage when one exists; the
        // diff path requires it, and it also feeds patch generation below.
        let old_archive_path = if browser_profile.datadir_hash.is_some() {
measure_time!(
"stop_get_old_archive",
services
.cloud_files
.get_file(&datadir_path, old_archive_path.clone())
.await
)?
} else {
None
};
if is_diff_present && old_archive_path.is_none() {
return Err(Error::NoPreviousDatadir);
}
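        // Destination for the reconstructed archive.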
let new_archive = NamedTempFile::new()?;
let new_archive_path = new_archive.path().to_string_lossy().to_string();
if is_diff_present {
debug!("applying patch");
let (diff, diff_size) = measure_time!("stop_construct_patch", {
let diff_file = unreachable_if_none!(request.diff).into_temp_path();
let diff = tokio::fs::read(diff_file).await?;
let diff = Patch::from_bytes(&diff).map_err(Into::<Error>::into)?;
let diff_size = diff.get_size();
(diff, diff_size)
});
measure_time!(
"stop_apply_patch",
services
.local_files
.apply_patch(
unreachable_if_none_ref!(old_archive_path),
diff,
new_archive_path.clone(),
)
.await
)?;
            let applied_size =
                tokio::fs::metadata(new_archive_path.clone()).await?.len();
            // Fraction of bytes saved by shipping a diff instead of the full
            // archive (positive when the diff is smaller than the archive).
            histogram!("darkwing_stop_diff_size_reduction_ratio")
                .record((applied_size as f64 - diff_size as f64) / applied_size as f64);
} else {
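            // Full upload: copy the datadir archive into place as-is.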
measure_time!(
"stop_copy_datadir",
tokio::fs::copy(
unreachable_if_none!(request.datadir).into_temp_path(),
new_archive_path.clone(),
)
.await
)?;
}
debug!(
"new archive size before merge: {}",
tokio::fs::metadata(new_archive_path.clone()).await?.len()
);
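        // Decide whether the upload can be stored as-is or whether its cookie
        // databases must first be merged with the server-side archive (the
        // client synced from an older version than the one currently stored).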
        let new_archive_hash = match (
            request.previous_hash.clone(),
            browser_profile.datadir_hash.clone(),
        ) {
(Some(prev_hash), Some(db_hash)) if db_hash == prev_hash => {
debug!("skipping merge, as this version is based on the same datadir");
request.hash.clone()
}
(None, None) => {
                debug!(
                    "no previous hash in either request or db; this looks like the first sync, using the new hash"
                );
request.hash.clone()
}
(Some(_), None) => {
debug!("no db hash, but req have previous hash");
request.hash.clone()
}
_ => {
                debug!(
                    "merging sqlites, because db hash {:?} is not equal to {:?}",
                    browser_profile.datadir_hash.clone(),
                    request.previous_hash.clone()
                );
measure_time!(
"stop_merge_cookies",
services
.sqlite
.merge_cookies(old_archive_path.clone(), new_archive_path.clone())
.await
)?;
measure_time!(
"stop_calc_hash_after_merge",
services
.local_files
.calc_hash_from_path(new_archive_path.clone())
.await
)?
}
};
debug!(
"archive size after merge: {}",
tokio::fs::metadata(new_archive_path.clone()).await?.len()
);
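        // Build a patch from the previous archive to the new one so clients on
        // the old version can fetch a small diff. Failures here are non-fatal
        // and are only reported to Sentry.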
        if browser_profile.datadir_hash.is_some() && old_archive_path.is_some() {
let res = measure_time!(
"stop_prepare_patch",
prepare_patch(
datadir_path.clone(),
unreachable_if_none!(browser_profile.datadir_hash),
unreachable_if_none_ref!(old_archive_path).clone(),
new_archive_path.clone(),
&services,
)
.await
);
if let Err(err) = res {
sentry::capture_error(&err);
}
}
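        // Upload the new archive to primary and backup storage concurrently.
        // A primary failure fails the request; a backup failure is only reported.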
let (main_storage_result, backup_storage_result) = measure_time!(
"stop_put_files_to_storage",
tokio::join!(
services.cloud_files.put_file(
&datadir_path,
new_archive_path.clone(),
S3ClientType::Primary,
),
services.cloud_files.put_file(
&datadir_path,
new_archive_path.clone(),
S3ClientType::Backup,
)
)
);
main_storage_result?;
if let Err(err) = backup_storage_result {
sentry::capture_error(&err);
}
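        // Persist the new hash so future syncs can detect stale clients.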
measure_time!(
"stop_update_datadir_hash",
services
.browser_profile_service
.update_datadir_hash(
request.browser_profile_id as i64,
new_archive_hash.clone(),
)
.await
)?;
Ok(Json(StopResponse {
hash: new_archive_hash,
}))
}
}
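
/// Computes a binary diff from the old archive to the new one and uploads it
/// to cloud storage as `{remote_datadir_path}/{datadir_hash}.patch`, so that
/// clients holding the old version can download a patch instead of the full
/// archive.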
async fn prepare_patch(
remote_datadir_path: String,
datadir_hash: String,
old_archive_path: String,
new_archive_path: String,
services: &Services,
) -> AppResult<()> {
info!("preparing patch");
let patch = services
.local_files
.calc_diff(old_archive_path.clone(), new_archive_path.clone())
.await?;
info!("patch size: {}", patch.get_size());
let patch_bytes = patch.to_bytes().map_err(Into::<Error>::into)?;
let key = format!("{}/{}.patch", remote_datadir_path, datadir_hash);
services.cloud_files.put_diff(&key, patch_bytes).await?;
Ok(())
}