Skip to content

Instantly share code, notes, and snippets.

@allada
Last active April 1, 2024 00:13
Show Gist options
  • Save allada/23f6d9f11854c830d48828a1f00ae8a5 to your computer and use it in GitHub Desktop.
Save allada/23f6d9f11854c830d48828a1f00ae8a5 to your computer and use it in GitHub Desktop.
diff --git a/Cargo.lock b/Cargo.lock
index e563095..c0e1fee 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1915,6 +1915,7 @@ dependencies = [
"serde_json5",
"shlex",
"tokio",
+ "tokio-fork",
"tokio-stream",
"tonic 0.11.0",
"tracing",
@@ -2964,6 +2965,16 @@ dependencies = [
"windows-sys 0.48.0",
]
+[[package]]
+name = "tokio-fork"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "008f916174b9d488089bbf4309aa1b07fe7a87686318bd6d77a692b937de2bfb"
+dependencies = [
+ "libc",
+ "tokio",
+]
+
[[package]]
name = "tokio-io-timeout"
version = "1.2.0"
diff --git a/nativelink-worker/BUILD.bazel b/nativelink-worker/BUILD.bazel
index 9abaf1e..33286a5 100644
--- a/nativelink-worker/BUILD.bazel
+++ b/nativelink-worker/BUILD.bazel
@@ -78,6 +78,7 @@ rust_test_suite(
"@crates//:rand",
"@crates//:tokio",
+ "@crates//:tokio-fork",
"@crates//:tonic",
],
)
diff --git a/nativelink-worker/Cargo.toml b/nativelink-worker/Cargo.toml
index 0635ef9..210dad8 100644
--- a/nativelink-worker/Cargo.toml
+++ b/nativelink-worker/Cargo.toml
@@ -40,3 +40,4 @@ once_cell = "1.19.0"
pretty_assertions = "1.4.0"
prost-types = "0.12.3"
rand = "0.8.5"
+tokio-fork = "0.2.1"
diff --git a/nativelink-worker/tests/running_actions_manager_test.rs b/nativelink-worker/tests/running_actions_manager_test.rs
index 6c54afd..841e8b4 100644
--- a/nativelink-worker/tests/running_actions_manager_test.rs
+++ b/nativelink-worker/tests/running_actions_manager_test.rs
@@ -2933,191 +2933,237 @@ exit 1
// We've experienced deadlocks when uploading, so make only a single permit available and
// check it's able to handle uploading some directories with some files in.
- // Be default this test is ignored because it *must* be run single threaded... to run this
- // test execute:
- // cargo test -p nativelink-worker --test running_actions_manager_test -- --test-threads=1 --ignored
- #[tokio::test]
- #[ignore]
- async fn upload_with_single_permit() -> Result<(), Box<dyn std::error::Error>> {
+    // `fs::OPEN_FILE_SEMAPHORE` is process-global, so tests running concurrently in this
+    // process could take or release its permits mid-test. Run the real test in a forked
+    // child, which works on its own copy of the semaphore. The child also inherits any
+    // permits or locks that other test threads held at fork time; those threads do not
+    // exist in the child, so such permits/locks are never released there.
+    // `fork()` is Unix-only, hence the `cfg`.
+    #[cfg(target_family = "unix")]
+    #[test]
+    fn upload_with_single_permit() -> Result<(), Box<dyn std::error::Error>> {
+        // SAFETY: the child never returns into the libtest harness; it runs `inner_test`
+        // on its own fresh runtime and exits. It does rely on the allocator and other
+        // global state being usable after `fork()` in a multi-threaded process.
+        let fork = unsafe { tokio_fork::fork() }?;
+        match fork {
+            tokio_fork::Fork::Child => {
+                use std::io::Write as _;
+                // Unless run with `--nocapture`, libtest redirects `eprintln!` and the default
+                // panic hook into an in-memory buffer that this child would discard on exit.
+                // Write to the real stderr instead so failures are actually visible.
+                std::panic::set_hook(Box::new(|info| {
+                    let _ = writeln!(std::io::stderr(), "{info}");
+                }));
+                let result = std::panic::catch_unwind(|| {
+                    tokio::runtime::Builder::new_current_thread()
+                        .enable_all()
+                        .build()
+                        .unwrap()
+                        .block_on(inner_test())
+                });
+                // Never return into the test harness from the child; report the outcome
+                // solely through the exit code, which the parent checks.
+                let exit_code = match result {
+                    Ok(Ok(())) => 0,
+                    Ok(Err(e)) => {
+                        let _ = writeln!(std::io::stderr(), "Error: {e:?}");
+                        1
+                    }
+                    // The panic hook above has already printed the panic message.
+                    Err(_) => 1,
+                };
+                std::process::exit(exit_code);
+            }
+            tokio_fork::Fork::Parent(mut child) => {
+                let exit_status = child.block().expect("Failed to wait for child process");
+                assert!(
+                    exit_status.success(),
+                    "Child process failed. This probably means test failed"
+                );
+                return Ok(());
+            }
+        }
const WORKER_ID: &str = "foo_worker_id";
fn test_monotonic_clock() -> SystemTime {
static CLOCK: AtomicU64 = AtomicU64::new(0);
monotonic_clock(&CLOCK)
}
+ async fn inner_test() -> Result<(), Box<dyn std::error::Error>> {
+ let (_, slow_store, cas_store, ac_store) = setup_stores().await?;
+ let root_work_directory = make_temp_path("root_work_directory");
+ fs::create_dir_all(&root_work_directory).await?;
+
+ // Take all but one FD permit away.
+ let _permits = futures::stream::iter(1..fs::OPEN_FILE_SEMAPHORE.available_permits())
+ .then(|_| fs::OPEN_FILE_SEMAPHORE.acquire())
+ .try_collect::<Vec<_>>()
+ .await?;
+ assert_eq!(1, fs::OPEN_FILE_SEMAPHORE.available_permits());
- let (_, slow_store, cas_store, ac_store) = setup_stores().await?;
- let root_work_directory = make_temp_path("root_work_directory");
- fs::create_dir_all(&root_work_directory).await?;
-
- // Take all but one FD permit away.
- let _permits = futures::stream::iter(1..fs::OPEN_FILE_SEMAPHORE.available_permits())
- .then(|_| fs::OPEN_FILE_SEMAPHORE.acquire())
- .try_collect::<Vec<_>>()
- .await?;
- assert_eq!(1, fs::OPEN_FILE_SEMAPHORE.available_permits());
-
- let running_actions_manager = Arc::new(RunningActionsManagerImpl::new_with_callbacks(
- RunningActionsManagerArgs {
- root_work_directory,
- execution_configuration: ExecutionConfiguration::default(),
- cas_store: Pin::into_inner(cas_store.clone()),
- ac_store: Some(Pin::into_inner(ac_store.clone())),
- historical_store: Pin::into_inner(cas_store.clone()),
- upload_action_result_config:
- &nativelink_config::cas_server::UploadActionResultConfig {
- upload_ac_results_strategy:
- nativelink_config::cas_server::UploadCacheResultsStrategy::never,
+ let running_actions_manager = Arc::new(RunningActionsManagerImpl::new_with_callbacks(
+ RunningActionsManagerArgs {
+ root_work_directory,
+ execution_configuration: ExecutionConfiguration::default(),
+ cas_store: Pin::into_inner(cas_store.clone()),
+ ac_store: Some(Pin::into_inner(ac_store.clone())),
+ historical_store: Pin::into_inner(cas_store.clone()),
+ upload_action_result_config:
+ &nativelink_config::cas_server::UploadActionResultConfig {
+ upload_ac_results_strategy:
+ nativelink_config::cas_server::UploadCacheResultsStrategy::never,
+ ..Default::default()
+ },
+ max_action_timeout: Duration::MAX,
+ timeout_handled_externally: false,
+ },
+ Callbacks {
+ now_fn: test_monotonic_clock,
+ sleep_fn: |_duration| Box::pin(futures::future::pending()),
+ },
+ )?);
+ let action_result = {
+ const SALT: u64 = 55;
+ #[cfg(target_family = "unix")]
+ let arguments = vec![
+ "sh".to_string(),
+ "-c".to_string(),
+ "printf '123 ' > ./test.txt; mkdir ./tst; printf '456 ' > ./tst/tst.txt; printf 'foo-stdout '; >&2 printf 'bar-stderr '"
+ .to_string(),
+ ];
+ #[cfg(target_family = "windows")]
+ let arguments = vec![
+ "cmd".to_string(),
+ "/C".to_string(),
+ // Note: Windows adds two spaces after 'set /p=XXX'.
+ "echo | set /p=123> ./test.txt & mkdir ./tst & echo | set /p=456> ./tst/tst.txt & echo | set /p=foo-stdout & echo | set /p=bar-stderr 1>&2 & exit 0"
+ .to_string(),
+ ];
+ let working_directory = "some_cwd";
+ let command = Command {
+ arguments,
+ output_paths: vec!["test.txt".to_string(), "tst".to_string()],
+ working_directory: working_directory.to_string(),
+ ..Default::default()
+ };
+ let command_digest = serialize_and_upload_message(
+ &command,
+ cas_store.as_ref(),
+ &mut DigestHasherFunc::Sha256.hasher(),
+ )
+ .await?;
+ let input_root_digest = serialize_and_upload_message(
+ &Directory {
+ directories: vec![DirectoryNode {
+ name: working_directory.to_string(),
+ digest: Some(
+ serialize_and_upload_message(
+ &Directory::default(),
+ cas_store.as_ref(),
+ &mut DigestHasherFunc::Sha256.hasher(),
+ )
+ .await?
+ .into(),
+ ),
+ }],
..Default::default()
},
- max_action_timeout: Duration::MAX,
- timeout_handled_externally: false,
- },
- Callbacks {
- now_fn: test_monotonic_clock,
- sleep_fn: |_duration| Box::pin(futures::future::pending()),
- },
- )?);
- let action_result = {
- const SALT: u64 = 55;
- #[cfg(target_family = "unix")]
- let arguments = vec![
- "sh".to_string(),
- "-c".to_string(),
- "printf '123 ' > ./test.txt; mkdir ./tst; printf '456 ' > ./tst/tst.txt; printf 'foo-stdout '; >&2 printf 'bar-stderr '"
- .to_string(),
- ];
- #[cfg(target_family = "windows")]
- let arguments = vec![
- "cmd".to_string(),
- "/C".to_string(),
- // Note: Windows adds two spaces after 'set /p=XXX'.
- "echo | set /p=123> ./test.txt & mkdir ./tst & echo | set /p=456> ./tst/tst.txt & echo | set /p=foo-stdout & echo | set /p=bar-stderr 1>&2 & exit 0"
- .to_string(),
- ];
- let working_directory = "some_cwd";
- let command = Command {
- arguments,
- output_paths: vec!["test.txt".to_string(), "tst".to_string()],
- working_directory: working_directory.to_string(),
- ..Default::default()
- };
- let command_digest = serialize_and_upload_message(
- &command,
- cas_store.as_ref(),
- &mut DigestHasherFunc::Sha256.hasher(),
- )
- .await?;
- let input_root_digest = serialize_and_upload_message(
- &Directory {
- directories: vec![DirectoryNode {
- name: working_directory.to_string(),
- digest: Some(
- serialize_and_upload_message(
- &Directory::default(),
- cas_store.as_ref(),
- &mut DigestHasherFunc::Sha256.hasher(),
- )
- .await?
- .into(),
- ),
- }],
+ cas_store.as_ref(),
+ &mut DigestHasherFunc::Sha256.hasher(),
+ )
+ .await?;
+ let action = Action {
+ command_digest: Some(command_digest.into()),
+ input_root_digest: Some(input_root_digest.into()),
..Default::default()
- },
- cas_store.as_ref(),
- &mut DigestHasherFunc::Sha256.hasher(),
- )
- .await?;
- let action = Action {
- command_digest: Some(command_digest.into()),
- input_root_digest: Some(input_root_digest.into()),
- ..Default::default()
- };
- let action_digest = serialize_and_upload_message(
- &action,
- cas_store.as_ref(),
- &mut DigestHasherFunc::Sha256.hasher(),
- )
- .await?;
-
- let running_action_impl = running_actions_manager
- .create_and_add_action(
- WORKER_ID.to_string(),
- StartExecute {
- execute_request: Some(ExecuteRequest {
- action_digest: Some(action_digest.into()),
- ..Default::default()
- }),
- salt: SALT,
- queued_timestamp: None,
- },
+ };
+ let action_digest = serialize_and_upload_message(
+ &action,
+ cas_store.as_ref(),
+ &mut DigestHasherFunc::Sha256.hasher(),
)
.await?;
- run_action(running_action_impl.clone()).await?
- };
- let file_content = slow_store
- .as_ref()
- .get_part_unchunked(action_result.output_files[0].digest, 0, None, None)
- .await?;
- assert_eq!(from_utf8(&file_content)?, "123 ");
- let stdout_content = slow_store
- .as_ref()
- .get_part_unchunked(action_result.stdout_digest, 0, None, None)
- .await?;
- assert_eq!(from_utf8(&stdout_content)?, "foo-stdout ");
- let stderr_content = slow_store
- .as_ref()
- .get_part_unchunked(action_result.stderr_digest, 0, None, None)
- .await?;
- assert_eq!(from_utf8(&stderr_content)?, "bar-stderr ");
- let mut clock_time = make_system_time(0);
- assert_eq!(
- action_result,
- ActionResult {
- output_files: vec![FileInfo {
- name_or_path: NameOrPath::Path("test.txt".to_string()),
- digest: DigestInfo::try_new(
- "c69e10a5f54f4e28e33897fbd4f8701595443fa8c3004aeaa20dd4d9a463483b",
- 4
+ let running_action_impl = running_actions_manager
+ .create_and_add_action(
+ WORKER_ID.to_string(),
+ StartExecute {
+ execute_request: Some(ExecuteRequest {
+ action_digest: Some(action_digest.into()),
+ ..Default::default()
+ }),
+ salt: SALT,
+ queued_timestamp: None,
+ },
+ )
+ .await?;
+
+ run_action(running_action_impl.clone()).await?
+ };
+ let file_content = slow_store
+ .as_ref()
+ .get_part_unchunked(action_result.output_files[0].digest, 0, None, None)
+ .await?;
+ assert_eq!(from_utf8(&file_content)?, "123 ");
+ let stdout_content = slow_store
+ .as_ref()
+ .get_part_unchunked(action_result.stdout_digest, 0, None, None)
+ .await?;
+ assert_eq!(from_utf8(&stdout_content)?, "foo-stdout ");
+ let stderr_content = slow_store
+ .as_ref()
+ .get_part_unchunked(action_result.stderr_digest, 0, None, None)
+ .await?;
+ assert_eq!(from_utf8(&stderr_content)?, "bar-stderr ");
+ let mut clock_time = make_system_time(0);
+ assert_eq!(
+ action_result,
+ ActionResult {
+ output_files: vec![FileInfo {
+ name_or_path: NameOrPath::Path("test.txt".to_string()),
+ digest: DigestInfo::try_new(
+ "c69e10a5f54f4e28e33897fbd4f8701595443fa8c3004aeaa20dd4d9a463483b",
+ 4
+ )?,
+ is_executable: false,
+ }],
+ stdout_digest: DigestInfo::try_new(
+ "15019a676f057d97d1ad3af86f3cc1e623cb33b18ff28422bbe3248d2471cc94",
+ 11
)?,
- is_executable: false,
- }],
- stdout_digest: DigestInfo::try_new(
- "15019a676f057d97d1ad3af86f3cc1e623cb33b18ff28422bbe3248d2471cc94",
- 11
- )?,
- stderr_digest: DigestInfo::try_new(
- "2375ab8a01ca11e1ea7606dfb58756c153d49733cde1dbfb5a1e00f39afacf06",
- 12
- )?,
- exit_code: 0,
- output_folders: vec![DirectoryInfo {
- path: "tst".to_string(),
- tree_digest: DigestInfo::try_new(
- "95711c1905d4898a70209dd6e98241dcafb479c00241a1ea4ed8415710d706f3",
- 166,
+ stderr_digest: DigestInfo::try_new(
+ "2375ab8a01ca11e1ea7606dfb58756c153d49733cde1dbfb5a1e00f39afacf06",
+ 12
)?,
- },],
- output_file_symlinks: vec![],
- output_directory_symlinks: vec![],
- server_logs: HashMap::new(),
- execution_metadata: ExecutionMetadata {
- worker: WORKER_ID.to_string(),
- queued_timestamp: SystemTime::UNIX_EPOCH,
- worker_start_timestamp: increment_clock(&mut clock_time),
- input_fetch_start_timestamp: increment_clock(&mut clock_time),
- input_fetch_completed_timestamp: increment_clock(&mut clock_time),
- execution_start_timestamp: increment_clock(&mut clock_time),
- execution_completed_timestamp: increment_clock(&mut clock_time),
- output_upload_start_timestamp: increment_clock(&mut clock_time),
- output_upload_completed_timestamp: increment_clock(&mut clock_time),
- worker_completed_timestamp: increment_clock(&mut clock_time),
- },
- error: None,
- message: String::new(),
- }
- );
- Ok(())
+ exit_code: 0,
+ output_folders: vec![DirectoryInfo {
+ path: "tst".to_string(),
+ tree_digest: DigestInfo::try_new(
+ "95711c1905d4898a70209dd6e98241dcafb479c00241a1ea4ed8415710d706f3",
+ 166,
+ )?,
+ },],
+ output_file_symlinks: vec![],
+ output_directory_symlinks: vec![],
+ server_logs: HashMap::new(),
+ execution_metadata: ExecutionMetadata {
+ worker: WORKER_ID.to_string(),
+ queued_timestamp: SystemTime::UNIX_EPOCH,
+ worker_start_timestamp: increment_clock(&mut clock_time),
+ input_fetch_start_timestamp: increment_clock(&mut clock_time),
+ input_fetch_completed_timestamp: increment_clock(&mut clock_time),
+ execution_start_timestamp: increment_clock(&mut clock_time),
+ execution_completed_timestamp: increment_clock(&mut clock_time),
+ output_upload_start_timestamp: increment_clock(&mut clock_time),
+ output_upload_completed_timestamp: increment_clock(&mut clock_time),
+ worker_completed_timestamp: increment_clock(&mut clock_time),
+ },
+ error: None,
+ message: String::new(),
+ }
+ );
+ Ok(())
+ }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment