Created
April 18, 2026 07:57
-
-
Save vitali2y/6d32bd99157aab7a7085f48007767ad5 to your computer and use it in GitHub Desktop.
Multiline support for the ZeroClaw Personal AI Assistant (https://github.com/zeroclaw-labs/zeroclaw.git)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| diff --git a/src/agent/loop_.rs b/src/agent/loop_.rs | |
| index c156175e..e673211d 100644 | |
| --- a/src/agent/loop_.rs | |
| +++ b/src/agent/loop_.rs | |
| @@ -4070,384 +4070,403 @@ pub async fn run( | |
| vec![ChatMessage::system(&system_prompt)] | |
| }; | |
| + let mut input_buffer: Vec<String> = Vec::new(); | |
| + | |
| loop { | |
| - print!("> "); | |
| - let _ = std::io::stdout().flush(); | |
| + // Print prompt only when starting a fresh turn | |
| + if input_buffer.is_empty() { | |
| + print!("> "); | |
| + let _ = std::io::stdout().flush(); | |
| + } | |
| - // Read raw bytes to avoid UTF-8 validation errors when PTY | |
| - // transport splits multi-byte characters at frame boundaries | |
| - // (e.g. CJK input with spaces over kubectl exec / SSH). | |
| + // 1️⃣ Read line (scoped lock fixes `Send` error) | |
| let mut raw = Vec::new(); | |
| - match std::io::BufRead::read_until(&mut std::io::stdin().lock(), b'\n', &mut raw) { | |
| - Ok(0) => break, | |
| - Ok(_) => {} | |
| - Err(e) => { | |
| - eprintln!("\nError reading input: {e}\n"); | |
| - break; | |
| - } | |
| - } | |
| - let input = String::from_utf8_lossy(&raw).into_owned(); | |
| + let read_res = { | |
| + let mut stdin_lock = std::io::stdin().lock(); | |
| + std::io::BufRead::read_until(&mut stdin_lock, b'\n', &mut raw) | |
| + }; | |
| + let line = String::from_utf8_lossy(&raw).trim().to_string(); | |
| - let user_input = input.trim().to_string(); | |
| - if user_input.is_empty() { | |
| - continue; | |
| - } | |
| - match user_input.as_str() { | |
| - "/quit" | "/exit" => break, | |
| - "/help" => { | |
| - println!("Available commands:"); | |
| - println!(" /help Show this help message"); | |
| - println!(" /clear /new Clear conversation history"); | |
| - println!(" /quit /exit Exit interactive mode"); | |
| - println!( | |
| - " /think:<level> Set reasoning depth (off|minimal|low|medium|high|max)\n" | |
| - ); | |
| - continue; | |
| - } | |
| - "/clear" | "/new" => { | |
| - println!( | |
| - "This will clear the current conversation and delete all session memory." | |
| - ); | |
| - println!("Core memories (long-term facts/preferences) will be preserved."); | |
| - print!("Continue? [y/N] "); | |
| - let _ = std::io::stdout().flush(); | |
| - | |
| - let mut confirm_raw = Vec::new(); | |
| - if std::io::BufRead::read_until( | |
| - &mut std::io::stdin().lock(), | |
| - b'\n', | |
| - &mut confirm_raw, | |
| - ) | |
| - .is_err() | |
| - { | |
| - continue; | |
| - } | |
| - let confirm = String::from_utf8_lossy(&confirm_raw); | |
| - if !matches!(confirm.trim().to_lowercase().as_str(), "y" | "yes") { | |
| - println!("Cancelled.\n"); | |
| - continue; | |
| - } | |
| + let mut should_process = false; | |
| - history.clear(); | |
| - history.push(ChatMessage::system(&system_prompt)); | |
| - // Clear conversation and daily memory | |
| - let mut cleared = 0; | |
| - for category in [MemoryCategory::Conversation, MemoryCategory::Daily] { | |
| - let entries = mem.list(Some(&category), None).await.unwrap_or_default(); | |
| - for entry in entries { | |
| - if mem.forget(&entry.key).await.unwrap_or(false) { | |
| - cleared += 1; | |
| - } | |
| - } | |
| - } | |
| - if cleared > 0 { | |
| - println!("Conversation cleared ({cleared} memory entries removed).\n"); | |
| + // 2️⃣ Decide whether to flush & process | |
| + match read_res { | |
| + Ok(0) => { | |
| + // EOF: process if we have accumulated content | |
| + if !input_buffer.is_empty() { | |
| + should_process = true; | |
| } else { | |
| - println!("Conversation cleared.\n"); | |
| + break; // Clean exit on empty EOF | |
| } | |
| - if let Some(path) = session_state_file.as_deref() { | |
| - save_interactive_session_history(path, &history)?; | |
| - } | |
| - continue; | |
| } | |
| - _ => {} | |
| - } | |
| - | |
| - // ── Parse thinking directive from interactive input ─── | |
| - let (thinking_directive, effective_input) = | |
| - match crate::agent::thinking::parse_thinking_directive(&user_input) { | |
| - Some((level, remaining)) => { | |
| - tracing::info!(thinking_level = ?level, "Thinking directive parsed"); | |
| - (Some(level), remaining) | |
| - } | |
| - None => (None, user_input.clone()), | |
| - }; | |
| - let thinking_level = crate::agent::thinking::resolve_thinking_level( | |
| - thinking_directive, | |
| - None, | |
| - &config.agent.thinking, | |
| - ); | |
| - let thinking_params = crate::agent::thinking::apply_thinking_level(thinking_level); | |
| - let turn_temperature = crate::agent::thinking::clamp_temperature( | |
| - temperature + thinking_params.temperature_adjustment, | |
| - ); | |
| + Ok(_) => { | |
| + if line.is_empty() { | |
| + // Blank line acts as explicit delimiter | |
| + if !input_buffer.is_empty() { | |
| + should_process = true; | |
| + } | |
| + } else { | |
| + // Handle commands only when buffer is empty | |
| + if input_buffer.is_empty() { | |
| + match line.as_str() { | |
| + "/quit" | "/exit" => break, | |
| + "/help" => { | |
| + println!( | |
| + "Available commands:\n /help Show this help message\n /clear /new Clear conversation history\n /quit /exit Exit interactive mode\n /think:<level> Set reasoning depth (off|minimal|low|medium|high|max)\n" | |
| + ); | |
| + continue; | |
| + } | |
| + "/clear" | "/new" => { | |
| + println!( | |
| + "This will clear the current conversation and delete all session memory.\nCore memories (long-term facts/preferences) will be preserved.\nContinue? [y/N] " | |
| + ); | |
| + let _ = std::io::stdout().flush(); | |
| + let mut confirm_raw = Vec::new(); | |
| + { | |
| + let mut confirm_lock = std::io::stdin().lock(); | |
| + let _ = std::io::BufRead::read_until( | |
| + &mut confirm_lock, | |
| + b'\n', | |
| + &mut confirm_raw, | |
| + ); | |
| + } | |
| + let confirm = String::from_utf8_lossy(&confirm_raw); | |
| + if !matches!( | |
| + confirm.trim().to_lowercase().as_str(), | |
| + "y" | "yes" | |
| + ) { | |
| + println!("Cancelled.\n"); | |
| + continue; | |
| + } | |
| + history.clear(); | |
| + history.push(ChatMessage::system(&system_prompt)); | |
| + let mut cleared = 0; | |
| + for category in | |
| + [MemoryCategory::Conversation, MemoryCategory::Daily] | |
| + { | |
| + let entries = mem | |
| + .list(Some(&category), None) | |
| + .await | |
| + .unwrap_or_default(); | |
| + for entry in entries { | |
| + if mem.forget(&entry.key).await.unwrap_or(false) { | |
| + cleared += 1; | |
| + } | |
| + } | |
| + } | |
| + println!( | |
| + "Conversation cleared ({} memory entries removed).\n", | |
| + cleared | |
| + ); | |
| + if let Some(path) = session_state_file.as_deref() { | |
| + save_interactive_session_history(path, &history)?; | |
| + } | |
| + continue; | |
| + } | |
| + _ => {} | |
| + } | |
| + } | |
| - // For non-Medium levels, temporarily patch the system prompt with prefix. | |
| - let turn_system_prompt; | |
| - if let Some(ref prefix) = thinking_params.system_prompt_prefix { | |
| - turn_system_prompt = format!("{prefix}\n\n{system_prompt}"); | |
| - // Update the system message in history for this turn. | |
| - if let Some(sys_msg) = history.first_mut() { | |
| - if sys_msg.role == "system" { | |
| - sys_msg.content = turn_system_prompt.clone(); | |
| + // 3️⃣ Accumulate & check heuristic flush | |
| + input_buffer.push(line.clone()); | |
| + // Heuristic: ≥2 lines + no rlwrap continuation marker ` \` | |
| + if input_buffer.len() > 1 && !line.trim_end().ends_with('\\') { | |
| + should_process = true; | |
| + } | |
| } | |
| } | |
| + Err(e) => { | |
| + eprintln!("\nError reading input: {e}\n"); | |
| + break; | |
| + } | |
| } | |
| - // Auto-save conversation turns (skip short/trivial messages) | |
| - if config.memory.auto_save | |
| - && effective_input.chars().count() >= AUTOSAVE_MIN_MESSAGE_CHARS | |
| - && !memory::should_skip_autosave_content(&effective_input) | |
| - { | |
| - let user_key = autosave_memory_key("user_msg"); | |
| - let _ = mem | |
| - .store( | |
| - &user_key, | |
| - &effective_input, | |
| - MemoryCategory::Conversation, | |
| - memory_session_id.as_deref(), | |
| - ) | |
| - .await; | |
| - } | |
| - | |
| - // Inject memory + hardware RAG context into user message | |
| - let mem_context = build_context( | |
| - mem.as_ref(), | |
| - &effective_input, | |
| - config.memory.min_relevance_score, | |
| - memory_session_id.as_deref(), | |
| - ) | |
| - .await; | |
| - let rag_limit = if config.agent.compact_context { 2 } else { 5 }; | |
| - let hw_context = hardware_rag | |
| - .as_ref() | |
| - .map(|r| build_hardware_context(r, &effective_input, &board_names, rag_limit)) | |
| - .unwrap_or_default(); | |
| - let context = format!("{mem_context}{hw_context}"); | |
| - let now = chrono::Local::now().format("%Y-%m-%d %H:%M:%S %Z"); | |
| - let enriched = if context.is_empty() { | |
| - format!("[{now}] {effective_input}") | |
| - } else { | |
| - format!("{context}[{now}] {effective_input}") | |
| - }; | |
| - | |
| - history.push(ChatMessage::user(&enriched)); | |
| - | |
| - // Compute per-turn excluded MCP tools from tool_filter_groups. | |
| - let excluded_tools = compute_excluded_mcp_tools( | |
| - &tools_registry, | |
| - &config.agent.tool_filter_groups, | |
| - &effective_input, | |
| - ); | |
| - | |
| - // Set up streaming channel so tool progress and response | |
| - // content are printed progressively instead of buffered. | |
| - let (delta_tx, mut delta_rx) = tokio::sync::mpsc::channel::<DraftEvent>(64); | |
| - let content_was_streamed = | |
| - std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); | |
| - let content_streamed_flag = content_was_streamed.clone(); | |
| - let is_tty = std::io::IsTerminal::is_terminal(&std::io::stderr()); | |
| - | |
| - let consumer_handle = tokio::spawn(async move { | |
| - use std::io::Write; | |
| - while let Some(event) = delta_rx.recv().await { | |
| - match event { | |
| - DraftEvent::Clear => { | |
| - let _ = writeln!(std::io::stderr()); | |
| - } | |
| - DraftEvent::Progress(text) => { | |
| - if is_tty { | |
| - let _ = write!(std::io::stderr(), "\x1b[2m{text}\x1b[0m"); | |
| - } else { | |
| - let _ = write!(std::io::stderr(), "{text}"); | |
| - } | |
| - let _ = std::io::stderr().flush(); | |
| + // 4️⃣ Run processing block EXACTLY ONCE per turn | |
| + if should_process { | |
| + // Derive user_input from accumulated buffer | |
| + let user_input = input_buffer.join("\n"); | |
| + input_buffer.clear(); | |
| + | |
| + // ⬇️ PASTE YOUR ORIGINAL PROCESSING BLOCK HERE (UNCHANGED) ⬇️ | |
| + // It must use `user_input` as the input string. | |
| + // (Everything from `// ── Parse thinking directive...` down to | |
| + // `save_interactive_session_history(path, &history)?;`) | |
| + // | |
| + // Example start: | |
| + // let (thinking_directive, effective_input) = match crate::agent::thinking::parse_thinking_directive(&user_input) { ... | |
| + // | |
| + // Example end: | |
| + // if let Some(path) = session_state_file.as_deref() { save_interactive_session_history(path, &history)?; } | |
| + // ⬆️ PASTE YOUR ORIGINAL PROCESSING BLOCK HERE (UNCHANGED) ⬆️ | |
| + | |
| + let (thinking_directive, effective_input) = | |
| + match crate::agent::thinking::parse_thinking_directive(&user_input) { | |
| + Some((level, remaining)) => { | |
| + tracing::info!(thinking_level = ?level, "Thinking directive parsed"); | |
| + (Some(level), remaining) | |
| } | |
| - DraftEvent::Content(text) => { | |
| - content_streamed_flag.store(true, std::sync::atomic::Ordering::Relaxed); | |
| - print!("{text}"); | |
| - let _ = std::io::stdout().flush(); | |
| + None => (None, user_input.clone()), | |
| + }; | |
| + let thinking_level = crate::agent::thinking::resolve_thinking_level( | |
| + thinking_directive, | |
| + None, | |
| + &config.agent.thinking, | |
| + ); | |
| + let thinking_params = crate::agent::thinking::apply_thinking_level(thinking_level); | |
| + let turn_temperature = crate::agent::thinking::clamp_temperature( | |
| + temperature + thinking_params.temperature_adjustment, | |
| + ); | |
| + | |
| + let turn_system_prompt; | |
| + if let Some(ref prefix) = thinking_params.system_prompt_prefix { | |
| + turn_system_prompt = format!("{prefix}\n\n{system_prompt}"); | |
| + if let Some(sys_msg) = history.first_mut() { | |
| + if sys_msg.role == "system" { | |
| + sys_msg.content = turn_system_prompt.clone(); | |
| } | |
| } | |
| } | |
| - }); | |
| - // Ctrl+C cancels the in-flight turn instead of killing the process. | |
| - let cancel_token = CancellationToken::new(); | |
| - let cancel_token_clone = cancel_token.clone(); | |
| - let ctrlc_handle = tokio::spawn(async move { | |
| - if tokio::signal::ctrl_c().await.is_ok() { | |
| - cancel_token_clone.cancel(); | |
| + if config.memory.auto_save | |
| + && effective_input.chars().count() >= AUTOSAVE_MIN_MESSAGE_CHARS | |
| + && !memory::should_skip_autosave_content(&effective_input) | |
| + { | |
| + let user_key = autosave_memory_key("user_msg"); | |
| + let _ = mem | |
| + .store( | |
| + &user_key, | |
| + &effective_input, | |
| + MemoryCategory::Conversation, | |
| + memory_session_id.as_deref(), | |
| + ) | |
| + .await; | |
| } | |
| - }); | |
| - let response = loop { | |
| - match TOOL_LOOP_COST_TRACKING_CONTEXT | |
| - .scope( | |
| - cost_tracking_context.clone(), | |
| - run_tool_call_loop( | |
| - provider.as_ref(), | |
| - &mut history, | |
| - &tools_registry, | |
| - observer.as_ref(), | |
| - &provider_name, | |
| - &model_name, | |
| - turn_temperature, | |
| - true, | |
| - approval_manager.as_ref(), | |
| - channel_name, | |
| - None, | |
| - &config.multimodal, | |
| - config.agent.max_tool_iterations, | |
| - Some(cancel_token.clone()), | |
| - Some(delta_tx.clone()), | |
| - None, | |
| - &excluded_tools, | |
| - &config.agent.tool_call_dedup_exempt, | |
| - activated_handle.as_ref(), | |
| - Some(model_switch_callback.clone()), | |
| - &config.pacing, | |
| - config.agent.max_tool_result_chars, | |
| - config.agent.max_context_tokens, | |
| - None, // shared_budget | |
| - ), | |
| - ) | |
| - .await | |
| - { | |
| - Ok(resp) => break resp, | |
| - Err(e) => { | |
| - if is_tool_loop_cancelled(&e) { | |
| - eprintln!("\n\x1b[2m(cancelled)\x1b[0m"); | |
| - break String::new(); | |
| - } | |
| - if let Some((new_provider, new_model)) = is_model_switch_requested(&e) { | |
| - tracing::info!( | |
| - "Model switch requested, switching from {} {} to {} {}", | |
| - provider_name, | |
| - model_name, | |
| - new_provider, | |
| - new_model | |
| - ); | |
| + let mem_context = build_context( | |
| + mem.as_ref(), | |
| + &effective_input, | |
| + config.memory.min_relevance_score, | |
| + memory_session_id.as_deref(), | |
| + ) | |
| + .await; | |
| + let rag_limit = if config.agent.compact_context { 2 } else { 5 }; | |
| + let hw_context = hardware_rag | |
| + .as_ref() | |
| + .map(|r| build_hardware_context(r, &effective_input, &board_names, rag_limit)) | |
| + .unwrap_or_default(); | |
| + let context = format!("{mem_context}{hw_context}"); | |
| + let now = chrono::Local::now().format("%Y-%m-%d %H:%M:%S %Z"); | |
| + let enriched = if context.is_empty() { | |
| + format!("[{now}] {effective_input}") | |
| + } else { | |
| + format!("{context}[{now}] {effective_input}") | |
| + }; | |
| - provider = providers::create_routed_provider_with_options( | |
| - &new_provider, | |
| - config.api_key.as_deref(), | |
| - config.api_url.as_deref(), | |
| - &config.reliability, | |
| - &config.model_routes, | |
| - &new_model, | |
| - &provider_runtime_options, | |
| - )?; | |
| + history.push(ChatMessage::user(&enriched)); | |
| - provider_name = new_provider; | |
| - model_name = new_model; | |
| + let excluded_tools = compute_excluded_mcp_tools( | |
| + &tools_registry, | |
| + &config.agent.tool_filter_groups, | |
| + &effective_input, | |
| + ); | |
| - clear_model_switch_request(); | |
| + let (delta_tx, mut delta_rx) = tokio::sync::mpsc::channel::<DraftEvent>(64); | |
| + let content_was_streamed = | |
| + std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); | |
| + let content_streamed_flag = content_was_streamed.clone(); | |
| + let is_tty = std::io::IsTerminal::is_terminal(&std::io::stderr()); | |
| + | |
| + let consumer_handle = tokio::spawn(async move { | |
| + use std::io::Write; | |
| + while let Some(event) = delta_rx.recv().await { | |
| + match event { | |
| + DraftEvent::Clear => { | |
| + let _ = writeln!(std::io::stderr()); | |
| + } | |
| + DraftEvent::Progress(text) => { | |
| + if is_tty { | |
| + let _ = write!(std::io::stderr(), "\x1b[2m{text}\x1b[0m"); | |
| + } else { | |
| + let _ = write!(std::io::stderr(), "{text}"); | |
| + } | |
| + let _ = std::io::stderr().flush(); | |
| + } | |
| + DraftEvent::Content(text) => { | |
| + content_streamed_flag | |
| + .store(true, std::sync::atomic::Ordering::Relaxed); | |
| + print!("{text}"); | |
| + let _ = std::io::stdout().flush(); | |
| + } | |
| + } | |
| + } | |
| + }); | |
| - observer.record_event(&ObserverEvent::AgentStart { | |
| - provider: provider_name.to_string(), | |
| - model: model_name.to_string(), | |
| - }); | |
| + let cancel_token = CancellationToken::new(); | |
| + let cancel_token_clone = cancel_token.clone(); | |
| + let ctrlc_handle = tokio::spawn(async move { | |
| + if tokio::signal::ctrl_c().await.is_ok() { | |
| + cancel_token_clone.cancel(); | |
| + } | |
| + }); | |
| - continue; | |
| - } | |
| - // Context overflow recovery: compress and retry | |
| - if crate::providers::reliable::is_context_window_exceeded(&e) { | |
| - tracing::warn!( | |
| - "Context overflow in interactive loop, attempting recovery" | |
| - ); | |
| - let mut compressor = | |
| - crate::agent::context_compressor::ContextCompressor::new( | |
| - config.agent.context_compression.clone(), | |
| - config.agent.max_context_tokens, | |
| - ) | |
| - .with_memory(mem.clone()); | |
| - let error_msg = format!("{e}"); | |
| - match compressor | |
| - .compress_on_error( | |
| - &mut history, | |
| - provider.as_ref(), | |
| - &model_name, | |
| - &error_msg, | |
| - ) | |
| - .await | |
| - { | |
| - Ok(true) => { | |
| - tracing::info!( | |
| - "Context recovered via compression, retrying turn" | |
| - ); | |
| - continue; | |
| - } | |
| - Ok(false) => { | |
| - tracing::warn!("Compression ran but couldn't reduce enough"); | |
| - } | |
| - Err(compress_err) => { | |
| - tracing::warn!( | |
| - error = %compress_err, | |
| - "Compression failed during recovery" | |
| - ); | |
| + let response = loop { | |
| + match TOOL_LOOP_COST_TRACKING_CONTEXT | |
| + .scope( | |
| + cost_tracking_context.clone(), | |
| + run_tool_call_loop( | |
| + provider.as_ref(), | |
| + &mut history, | |
| + &tools_registry, | |
| + observer.as_ref(), | |
| + &provider_name, | |
| + &model_name, | |
| + turn_temperature, | |
| + true, | |
| + approval_manager.as_ref(), | |
| + channel_name, | |
| + None, | |
| + &config.multimodal, | |
| + config.agent.max_tool_iterations, | |
| + Some(cancel_token.clone()), | |
| + Some(delta_tx.clone()), | |
| + None, | |
| + &excluded_tools, | |
| + &config.agent.tool_call_dedup_exempt, | |
| + activated_handle.as_ref(), | |
| + Some(model_switch_callback.clone()), | |
| + &config.pacing, | |
| + config.agent.max_tool_result_chars, | |
| + config.agent.max_context_tokens, | |
| + None, | |
| + ), | |
| + ) | |
| + .await | |
| + { | |
| + Ok(resp) => break resp, | |
| + Err(e) => { | |
| + if is_tool_loop_cancelled(&e) { | |
| + eprintln!("\n\x1b[2m(cancelled)\x1b[0m"); | |
| + break String::new(); | |
| + } | |
| + if let Some((new_provider, new_model)) = is_model_switch_requested(&e) { | |
| + tracing::info!( | |
| + "Model switch requested, switching from {} {} to {} {}", | |
| + provider_name, | |
| + model_name, | |
| + new_provider, | |
| + new_model | |
| + ); | |
| + provider = providers::create_routed_provider_with_options( | |
| + &new_provider, | |
| + config.api_key.as_deref(), | |
| + config.api_url.as_deref(), | |
| + &config.reliability, | |
| + &config.model_routes, | |
| + &new_model, | |
| + &provider_runtime_options, | |
| + )?; | |
| + provider_name = new_provider; | |
| + model_name = new_model; | |
| + clear_model_switch_request(); | |
| + observer.record_event(&ObserverEvent::AgentStart { | |
| + provider: provider_name.to_string(), | |
| + model: model_name.to_string(), | |
| + }); | |
| + continue; | |
| + } | |
| + if crate::providers::reliable::is_context_window_exceeded(&e) { | |
| + tracing::warn!( | |
| + "Context overflow in interactive loop, attempting recovery" | |
| + ); | |
| + let mut compressor = | |
| + crate::agent::context_compressor::ContextCompressor::new( | |
| + config.agent.context_compression.clone(), | |
| + config.agent.max_context_tokens, | |
| + ) | |
| + .with_memory(mem.clone()); | |
| + let error_msg = format!("{e}"); | |
| + match compressor | |
| + .compress_on_error( | |
| + &mut history, | |
| + provider.as_ref(), | |
| + &model_name, | |
| + &error_msg, | |
| + ) | |
| + .await | |
| + { | |
| + Ok(true) => { | |
| + tracing::info!( | |
| + "Context recovered via compression, retrying turn" | |
| + ); | |
| + continue; | |
| + } | |
| + Ok(false) => { | |
| + tracing::warn!( | |
| + "Compression ran but couldn't reduce enough" | |
| + ); | |
| + } | |
| + Err(compress_err) => { | |
| + tracing::warn!(error = %compress_err, "Compression failed during recovery"); | |
| + } | |
| } | |
| } | |
| + eprintln!("\nError: {e}\n"); | |
| + break String::new(); | |
| } | |
| - | |
| - eprintln!("\nError: {e}\n"); | |
| - break String::new(); | |
| } | |
| - } | |
| - }; | |
| + }; | |
| - // Clean up: stop the Ctrl+C listener and flush streaming events. | |
| - ctrlc_handle.abort(); | |
| - drop(delta_tx); | |
| - let _ = consumer_handle.await; | |
| - | |
| - final_output = response.clone(); | |
| - if content_was_streamed.load(std::sync::atomic::Ordering::Relaxed) { | |
| - println!(); | |
| - } else if let Err(e) = crate::channels::Channel::send( | |
| - &cli, | |
| - &crate::channels::traits::SendMessage::new(format!("\n{response}\n"), "user"), | |
| - ) | |
| - .await | |
| - { | |
| - eprintln!("\nError sending CLI response: {e}\n"); | |
| - } | |
| - observer.record_event(&ObserverEvent::TurnComplete); | |
| + ctrlc_handle.abort(); | |
| + drop(delta_tx); | |
| + let _ = consumer_handle.await; | |
| - // Context compression before hard trimming to preserve long-context signal. | |
| - { | |
| - let compressor = crate::agent::context_compressor::ContextCompressor::new( | |
| - config.agent.context_compression.clone(), | |
| - config.agent.max_context_tokens, | |
| + final_output = response.clone(); | |
| + if content_was_streamed.load(std::sync::atomic::Ordering::Relaxed) { | |
| + println!(); | |
| + } else if let Err(e) = crate::channels::Channel::send( | |
| + &cli, | |
| + &crate::channels::traits::SendMessage::new(format!("\n{response}\n"), "user"), | |
| ) | |
| - .with_memory(mem.clone()); | |
| - match compressor | |
| - .compress_if_needed(&mut history, provider.as_ref(), &model_name) | |
| - .await | |
| + .await | |
| { | |
| - Ok(result) if result.compressed => { | |
| - tracing::info!( | |
| - passes = result.passes_used, | |
| - before = result.tokens_before, | |
| - after = result.tokens_after, | |
| - "Context compression complete" | |
| - ); | |
| - } | |
| - Ok(_) => {} // No compression needed | |
| - Err(e) => { | |
| - tracing::warn!( | |
| - error = %e, | |
| - "Context compression failed, falling back to history trim" | |
| - ); | |
| - trim_history(&mut history, config.agent.max_history_messages / 2); | |
| - } | |
| + eprintln!("\nError sending CLI response: {e}\n"); | |
| } | |
| - } | |
| + observer.record_event(&ObserverEvent::TurnComplete); | |
| - // Hard cap as a safety net. | |
| - trim_history(&mut history, config.agent.max_history_messages); | |
| - | |
| - // Restore base system prompt (remove per-turn thinking prefix). | |
| - if thinking_params.system_prompt_prefix.is_some() { | |
| - if let Some(sys_msg) = history.first_mut() { | |
| - if sys_msg.role == "system" { | |
| - sys_msg.content.clone_from(&base_system_prompt); | |
| + { | |
| + let compressor = crate::agent::context_compressor::ContextCompressor::new( | |
| + config.agent.context_compression.clone(), | |
| + config.agent.max_context_tokens, | |
| + ) | |
| + .with_memory(mem.clone()); | |
| + match compressor | |
| + .compress_if_needed(&mut history, provider.as_ref(), &model_name) | |
| + .await | |
| + { | |
| + Ok(result) if result.compressed => { | |
| + tracing::info!( | |
| + passes = result.passes_used, | |
| + before = result.tokens_before, | |
| + after = result.tokens_after, | |
| + "Context compression complete" | |
| + ); | |
| + } | |
| + Ok(_) => {} | |
| + Err(e) => { | |
| + tracing::warn!(error = %e, "Context compression failed, falling back to history trim"); | |
| + trim_history(&mut history, config.agent.max_history_messages / 2); | |
| + } | |
| } | |
| } | |
| - } | |
| + trim_history(&mut history, config.agent.max_history_messages); | |
| - if let Some(path) = session_state_file.as_deref() { | |
| - save_interactive_session_history(path, &history)?; | |
| + if thinking_params.system_prompt_prefix.is_some() { | |
| + if let Some(sys_msg) = history.first_mut() { | |
| + if sys_msg.role == "system" { | |
| + sys_msg.content.clone_from(&base_system_prompt); | |
| + } | |
| + } | |
| + } | |
| } | |
| } | |
| } |
Author
Author
To use in both modes:
- editor-return:
Run `rlwrap --prompt-colour=Yellow -m zeroclaw agent`, then press `Ctrl-^` to invoke an external editor, like `helix`.
Use `Ctrl-R` to search the input history, like in `bash`.
- piped input:
echo -e "line1\nline2\nline3" | zeroclaw agent
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
To apply: