Streaming Guide

Agentle4j provides real-time streaming using virtual threads for efficient, non-blocking I/O. This guide covers all streaming patterns.


Why Stream?

Benefit              Description
-------------------  -----------------------------------------------
Better UX            Users see responses immediately
Lower Latency        The first token appears sooner
Progress Visibility  Users can see that the AI is working
Early Cancellation   Long responses can be stopped if no longer needed

Basic Streaming

Enable Streaming

Add .streaming() to your payload builder:

var payload = CreateResponsePayload.builder()
    .model("openai/gpt-4o")
    .addUserMessage("Write a short poem about Java programming")
    .streaming()  // Enable streaming mode
    .build();

responder.respond(payload)
    .onTextDelta(delta -> {
        System.out.print(delta);  // Print each chunk
        System.out.flush();       // Flush for immediate display
    })
    .onComplete(response -> {
        System.out.println("\n\n✅ Complete!");
        System.out.println("Total tokens: " + response.usage().totalTokens());
    })
    .onError(error -> {
        System.err.println("Error: " + error.getMessage());
    })
    .start();  // Don't forget to call start()!

Callback Reference

Callback               When Called        Parameter
---------------------  -----------------  -----------------
.onTextDelta(String)   Each text chunk    The text delta
.onComplete(Response)  Stream finished    The full response
.onError(Throwable)    An error occurred  The exception

Streaming with Progress Tracking

import java.util.concurrent.atomic.AtomicInteger;

var payload = CreateResponsePayload.builder()
    .model("openai/gpt-4o")
    .addUserMessage("Explain microservices architecture in detail")
    .streaming()
    .build();

AtomicInteger charCount = new AtomicInteger(0);
long startTime = System.currentTimeMillis();

responder.respond(payload)
    .onTextDelta(delta -> {
        System.out.print(delta);
        System.out.flush();
        charCount.addAndGet(delta.length());
    })
    .onComplete(response -> {
        long elapsed = System.currentTimeMillis() - startTime;

        System.out.println("\n\n--- Stats ---");
        System.out.println("Characters: " + charCount.get());
        System.out.println("Tokens: " + response.usage().totalTokens());
        System.out.println("Time: " + elapsed + " ms");
        System.out.println("Speed: " + (charCount.get() * 1000L / Math.max(elapsed, 1)) + " chars/sec");
    })
    .start();

Structured Streaming

Stream while generating structured JSON output:

// Define output schema
public record Article(
    String title,
    String content,
    List<String> tags,
    int readingTimeMinutes
) {}

// Build structured streaming payload
var payload = CreateResponsePayload.builder()
    .model("openai/gpt-4o")
    .addUserMessage("Write an article about AI in healthcare")
    .withStructuredOutput(Article.class)
    .streaming()  // Enable streaming
    .build();

responder.respond(payload)
    .onTextDelta(delta -> {
        // Watch JSON being generated character by character
        System.out.print(delta);
        System.out.flush();
    })
    .onParsedComplete(parsed -> {
        // Get the final typed object
        Article article = parsed.outputParsed();
        System.out.println("\n\n--- Parsed Result ---");
        System.out.println("Title: " + article.title());
        System.out.println("Tags: " + String.join(", ", article.tags()));
        System.out.println("Reading time: " + article.readingTimeMinutes() + " min");
    })
    .start();

onPartialJson - Zero-Class Approach

Access fields as they stream without defining extra classes:

responder.respond(structuredPayload)
    .onPartialJson(fields -> {
        // fields is a Map<String, Object>
        if (fields.containsKey("title")) {
            updateTitleUI(fields.get("title").toString());
        }
        if (fields.containsKey("content")) {
            updateContentUI(fields.get("content").toString());
        }
    })
    .start();
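Each `onPartialJson` event delivers the value of a field as seen so far, so naively appending it to the UI would duplicate text that was already shown. A minimal sketch of one way to track per-field growth and extract only the newly streamed suffix (assuming partial values arrive as growing versions of the final string; the class and method names here are illustrative, not part of Agentle4j):

```java
import java.util.HashMap;
import java.util.Map;

public class PartialFieldTracker {
    private final Map<String, String> lastSeen = new HashMap<>();

    /** Returns only the text added since the previous value of this field. */
    public String appendedSince(String field, String current) {
        String previous = lastSeen.getOrDefault(field, "");
        lastSeen.put(field, current);
        // Partial values normally grow by appending; fall back to the full
        // value if the stream revised earlier text.
        return current.startsWith(previous)
                ? current.substring(previous.length())
                : current;
    }

    public static void main(String[] args) {
        var tracker = new PartialFieldTracker();
        System.out.println(tracker.appendedSince("content", "Hel"));       // Hel
        System.out.println(tracker.appendedSince("content", "Hello, w"));  // lo, w
    }
}
```

Inside the `.onPartialJson(...)` callback you would then append `tracker.appendedSince("content", fields.get("content").toString())` to the UI instead of re-rendering the whole string.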

onPartialParsed - Typed Partial Updates

For type-safe partial updates, define a nullable mirror class:

// Nullable mirror class for partial parsing
record PartialArticle(
    @Nullable String title,
    @Nullable String content,
    @Nullable List<String> tags,
    @Nullable Integer readingTimeMinutes
) {}

responder.respond(structuredPayload)
    .onPartialParsed(PartialArticle.class, partial -> {
        if (partial.title() != null) {
            updateTitleUI(partial.title());
        }
        if (partial.content() != null) {
            appendContentUI(partial.content());
        }
    })
    .start();

Streaming with Tool Calls

Handle tool calls in real-time during streaming:

// Add tools to the payload
var payload = CreateResponsePayload.builder()
    .model("openai/gpt-4o")
    .addUserMessage("What's the weather in Tokyo and calculate 15% tip on $85.50?")
    .addTool(weatherTool)
    .addTool(calculatorTool)
    .streaming()
    .build();

responder.respond(payload)
    .onTextDelta(delta -> {
        System.out.print(delta);
        System.out.flush();
    })
    // Detect when tool call is identified
    .onToolCall((toolName, argsJson) -> {
        System.out.println("\n🔧 Tool called: " + toolName);
        System.out.println("   Arguments: " + argsJson);
    })
    // Auto-execute tools with tool store
    .withToolStore(toolStore)
    // Get tool results
    .onToolResult((toolName, result) -> {
        System.out.println("✅ " + toolName + " returned: " + result.output());
    })
    .onComplete(response -> {
        System.out.println("\n\n" + response.outputText());
    })
    .onError(e -> System.err.println("Error: " + e.getMessage()))
    .start();

Agent Streaming

Full agentic loop with streaming and all events:

agent.interactStream("Research and summarize AI trends, then email me the report")
    // Turn lifecycle
    .onTurnStart(turn -> {
        System.out.println("\n=== Turn " + turn + " ===");
    })
    .onTurnComplete(response -> {
        System.out.println("[Turn complete]");
    })

    // Text streaming
    .onTextDelta(delta -> {
        System.out.print(delta);
        System.out.flush();
    })

    // Tool execution events
    .onToolCall((name, args) -> {
        System.out.println("\n🔧 Calling: " + name);
    })
    .onToolExecuted(exec -> {
        System.out.println("✅ " + exec.toolName() + " done");
    })

    // Multi-agent events
    .onHandoff(handoff -> {
        System.out.println("→ Handing off to: " + handoff.targetAgent().name());
    })

    // Safety events
    .onGuardrailFailed(failed -> {
        System.err.println("⛔ Guardrail blocked: " + failed.reason());
    })

    // Completion
    .onComplete(result -> {
        System.out.println("\n\n✅ Finished!");
        System.out.println("Total turns: " + result.turnCount());
    })
    .onError(Throwable::printStackTrace)

    .start();

Agent Streaming Callbacks

Callback                              When Called
------------------------------------  --------------------------------
.onTurnStart(int)                     Start of each turn (LLM call)
.onTurnComplete(Response)             After each LLM response
.onTextDelta(String)                  Each text chunk
.onToolCall(name, args)               Tool call identified
.onToolExecuted(ToolExecution)        Tool execution completed
.onHandoff(Handoff)                   Agent handoff/routing
.onGuardrailFailed(GuardrailFailure)  Blocked by a guardrail
.onComplete(AgentResult)              Run finished
.onError(Throwable)                   Error occurred

Human-in-the-Loop Streaming

Add approval workflows for sensitive tool calls:

Synchronous Approval

agent.interactStream("Send an email to john@example.com")
    .onToolCallPending((toolCall, approve) -> {
        // Show what the agent wants to do
        System.out.println("🔧 Agent wants to execute: " + toolCall.name());
        System.out.println("   Arguments: " + toolCall.arguments());

        // Ask for approval (this blocks the stream until the user answers)
        System.out.print("Approve? (y/n): ");
        boolean approved = new Scanner(System.in).nextLine().equalsIgnoreCase("y");

        // Accept or reject
        approve.accept(approved);
    })
    .onToolExecuted(exec -> {
        System.out.println("✅ " + exec.toolName() + " completed");
    })
    .start();

Async Pause/Resume

For long approval processes (e.g., manager approval):

// Start with pause capability
agent.interactStream("Delete all customer records")
    .onPause(state -> {
        // Save state to database so the run can be resumed later
        try {
            String stateJson = objectMapper.writeValueAsString(state);
            database.save("pending_approval", stateJson);
        } catch (JsonProcessingException e) {
            throw new UncheckedIOException(e);  // checked exceptions can't escape the lambda
        }

        // Notify approver
        slackClient.sendMessage(
            "#approvals",
            "AI wants to: " + state.pendingToolCall().name()
        );
    })
    .start();

// Later, when approval received...
@PostMapping("/approve/{id}")
public void handleApproval(@PathVariable String id, @RequestBody boolean approved) {
    // Load saved state
    AgentRunState state = loadState(id);

    if (approved) {
        state.approveToolCall();
    } else {
        state.rejectToolCall("Manager denied");
    }

    // Resume agent
    AgentResult result = agent.resume(state).join();

    // Notify user
    sendResultToUser(result.output());
}

Streaming to UI (Web/JavaFX)

Server-Sent Events (SSE)

@GetMapping(value = "/chat/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamChat(@RequestParam String message) {
    return Flux.create(sink -> {
        var payload = CreateResponsePayload.builder()
            .model("openai/gpt-4o")
            .addUserMessage(message)
            .streaming()
            .build();

        responder.respond(payload)
            .onTextDelta(delta -> {
                sink.next(delta);
            })
            .onComplete(response -> {
                sink.complete();
            })
            .onError(sink::error)
            .start();
    });
}
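On the wire, each `sink.next(delta)` is framed by the server as an SSE `data:` line followed by a blank line. A small illustrative parser for that framing (assuming the default encoding with no `event:` or `id:` fields), handy for tests or a plain-Java client:

```java
import java.util.ArrayList;
import java.util.List;

public class SseDataParser {
    /** Extracts the payload of every "data:" line from a raw SSE stream. */
    public static List<String> dataEvents(String raw) {
        List<String> events = new ArrayList<>();
        for (String line : raw.split("\n")) {
            if (line.startsWith("data:")) {
                String value = line.substring("data:".length());
                // The SSE spec allows a single optional space after the colon
                if (value.startsWith(" ")) {
                    value = value.substring(1);
                }
                events.add(value);
            }
        }
        return events;
    }

    public static void main(String[] args) {
        String raw = "data: Hello\n\ndata: world\n\n";
        System.out.println(dataEvents(raw));  // [Hello, world]
    }
}
```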

JavaFX

var payload = CreateResponsePayload.builder()
    .model("openai/gpt-4o")
    .addUserMessage(userInput)
    .streaming()
    .build();

responder.respond(payload)
    .onTextDelta(delta -> {
        // Update UI on JavaFX Application Thread
        Platform.runLater(() -> {
            textArea.appendText(delta);
        });
    })
    .onComplete(response -> {
        Platform.runLater(() -> {
            statusLabel.setText("Complete!");
        });
    })
    .start();

Best Practices

✅ Do

// Always flush when printing
.onTextDelta(delta -> {
    System.out.print(delta);
    System.out.flush();  // Important!
})

// Always provide error handler
.onError(error -> {
    logger.error("Streaming error", error);
    showErrorToUser(error.getMessage());
})

// Don't forget to call start()
responder.respond(payload)
    .onTextDelta(...)
    .start();  // Required!

❌ Don't

// Don't do heavy processing in callbacks
.onTextDelta(delta -> {
    saveToDatabase(delta);  // Too slow!
    callExternalAPI(delta);  // Blocks streaming!
})

// Don't forget error handling
responder.respond(payload)
    .onTextDelta(System.out::print)
    // Missing .onError()!
    .start();
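If you do need to persist every delta, one option is to enqueue deltas from the callback and drain the queue on a separate virtual thread, so the slow work never blocks the stream. A self-contained sketch using only the JDK (`DeltaSink` and its in-memory `saved` list are illustrative; a real sink would write to your database):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class DeltaSink implements AutoCloseable {
    // Unique sentinel instance; compared by reference to stop the worker.
    private static final String POISON = new String("__poison__");

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Thread worker;
    final List<String> saved = new ArrayList<>();  // stands in for a database

    public DeltaSink() {
        // Drain the queue on a virtual thread so slow work stays off the stream
        worker = Thread.ofVirtual().start(() -> {
            try {
                String delta;
                while ((delta = queue.take()) != POISON) {
                    saved.add(delta);  // replace with a real, possibly slow, write
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    /** Cheap enough to call from .onTextDelta(...) — returns immediately. */
    public void accept(String delta) {
        queue.add(delta);
    }

    @Override
    public void close() {
        queue.add(POISON);
        try {
            worker.join();  // wait for the worker to finish draining
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        var sink = new DeltaSink();
        sink.accept("Hello");
        sink.accept("world");
        sink.close();
        System.out.println(sink.saved);  // [Hello, world]
    }
}
```

In the streaming pipeline this becomes `.onTextDelta(sink::accept)`, with `sink.close()` called from `.onComplete(...)` or a finally block.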

Next Steps