feat(m2): LLM chat via Ollama — streaming responses with migration context #3
1 changed file with 24 additions and 10 deletions
|
|
@@ -57,27 +57,33 @@ pub async fn chat_stream(
|
||||||
anyhow::bail!("Ollama returned {status}: {text}");
|
anyhow::bail!("Ollama returned {status}: {text}");
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut buffer = String::new();
|
let mut buffer: Vec<u8> = Vec::new();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match response.chunk().await {
|
match response.chunk().await {
|
||||||
Ok(Some(bytes)) => {
|
Ok(Some(bytes)) => {
|
||||||
buffer.push_str(&String::from_utf8_lossy(&bytes));
|
buffer.extend_from_slice(&bytes);
|
||||||
while let Some(nl) = buffer.find('\n') {
|
while let Some(nl) = buffer.iter().position(|&b| b == b'\n') {
|
||||||
let line = buffer[..nl].trim().to_string();
|
let line_bytes = buffer[..nl].to_vec();
|
||||||
buffer.drain(..=nl);
|
buffer.drain(..=nl);
|
||||||
|
let line = String::from_utf8_lossy(&line_bytes);
|
||||||
|
let line = line.trim();
|
||||||
if line.is_empty() {
|
if line.is_empty() {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
match serde_json::from_str::<OllamaChunk>(&line) {
|
match serde_json::from_str::<OllamaChunk>(line) {
|
||||||
Ok(chunk) if chunk.done => {
|
Ok(chunk) if chunk.done => {
|
||||||
let _ = app.emit("robin:chat-done", ());
|
if let Err(e) = app.emit("robin:chat-done", ()) {
|
||||||
|
log::warn!("llm: failed to emit chat-done: {e}");
|
||||||
|
}
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
Ok(chunk) => {
|
Ok(chunk) => {
|
||||||
if let Some(msg) = chunk.message {
|
if let Some(msg) = chunk.message {
|
||||||
if !msg.content.is_empty() {
|
if !msg.content.is_empty() {
|
||||||
let _ = app.emit("robin:chat-token", msg.content);
|
if let Err(e) = app.emit("robin:chat-token", msg.content) {
|
||||||
|
log::warn!("llm: failed to emit chat-token: {e}");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@@ -89,13 +95,15 @@ pub async fn chat_stream(
|
||||||
}
|
}
|
||||||
Ok(None) => break,
|
Ok(None) => break,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
return Err(anyhow::anyhow!("stream read error: {e}"));
|
return Err(e).context("stream read error");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stream ended without a done:true line
|
// Stream ended without a done:true line
|
||||||
let _ = app.emit("robin:chat-done", ());
|
if let Err(e) = app.emit("robin:chat-done", ()) {
|
||||||
|
log::warn!("llm: failed to emit chat-done: {e}");
|
||||||
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@@ -133,10 +141,16 @@ mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn parse_empty_token_is_handled() {
|
fn parse_empty_content_chunk_deserializes() {
|
||||||
let json = r#"{"model":"llama3.2","created_at":"2024-01-01T00:00:00Z","message":{"role":"assistant","content":""},"done":false}"#;
|
let json = r#"{"model":"llama3.2","created_at":"2024-01-01T00:00:00Z","message":{"role":"assistant","content":""},"done":false}"#;
|
||||||
let chunk: OllamaChunk = serde_json::from_str(json).unwrap();
|
let chunk: OllamaChunk = serde_json::from_str(json).unwrap();
|
||||||
assert!(!chunk.done);
|
assert!(!chunk.done);
|
||||||
assert_eq!(chunk.message.unwrap().content, "");
|
assert_eq!(chunk.message.unwrap().content, "");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn malformed_json_fails_to_parse() {
|
||||||
|
let result = serde_json::from_str::<OllamaChunk>("not valid json");
|
||||||
|
assert!(result.is_err(), "malformed JSON must fail to parse");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue