Spring Boot Integration¶
This guide was last updated on 2026-02-23.
This guide shows how to integrate Agentle4j into a Spring Boot application with best practices for production.
Overview¶
Agentle4j integrates seamlessly with Spring Boot:
- Responder as a Bean - Thread-safe, reusable HTTP client
- Agents as Services - AI-powered business logic
- REST Controllers - Expose AI capabilities via API
- SSE Streaming - Real-time responses to clients
- WebSocket - Bidirectional chat interfaces
Configuration¶
Application Properties¶
# application.yml
agentle4j:
api-key: ${OPENROUTER_API_KEY}
model: openai/gpt-4o
temperature: 0.7
max-retries: 3
Configuration Class¶
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConfigurationProperties(prefix = "agentle4j")
public class Agentle4jProperties {
private String apiKey;
private String model = "openai/gpt-4o";
private double temperature = 0.7;
private int maxRetries = 3;
// Getters and setters
public String getApiKey() { return apiKey; }
public void setApiKey(String apiKey) { this.apiKey = apiKey; }
public String getModel() { return model; }
public void setModel(String model) { this.model = model; }
public double getTemperature() { return temperature; }
public void setTemperature(double temperature) { this.temperature = temperature; }
public int getMaxRetries() { return maxRetries; }
public void setMaxRetries(int maxRetries) { this.maxRetries = maxRetries; }
}
Dependency Injection¶
Responder as a Singleton Bean¶
import com.paragon.responses.Responder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class Agentle4jConfig {
@Bean
public Responder responder(Agentle4jProperties props) {
return Responder.builder()
.openRouter()
.apiKey(props.getApiKey())
.maxRetries(props.getMaxRetries())
.build();
}
}
Thread Safety
Responder is thread-safe and reusable. Create it once as a singleton bean.
Agent as a Service¶
import com.paragon.agents.Agent;
import org.springframework.stereotype.Service;
@Service
public class CustomerSupportAgent {
private final Agent agent;
private final Agentle4jProperties props;
public CustomerSupportAgent(Responder responder, Agentle4jProperties props) {
this.props = props;
this.agent = Agent.builder()
.name("CustomerSupport")
.model(props.getModel())
.instructions("""
You are a helpful customer support assistant.
Be concise, friendly, and professional.
""")
.responder(responder)
.build();
}
public String chat(String userMessage) {
return agent.interact(userMessage).output();
}
}
Managing Multiple Agents¶
As your application grows, you'll likely need multiple agents with different purposes. Instead of creating a separate @Service wrapper for each agent, you can use Spring's dependency injection patterns to manage agents more elegantly.
Prompt Management with PromptProvider¶
Before defining agents, consider externalizing your prompts. Agentle4j provides the PromptProvider interface with built-in implementations:
Filesystem-Based Prompts¶
Store prompts as text files and load them at runtime:
import com.paragon.prompts.PromptProvider;
import com.paragon.prompts.FilesystemPromptProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class PromptConfig {
@Bean
public PromptProvider filePromptProvider() {
return FilesystemPromptProvider.create("./prompts");
}
}
Create prompt files in the prompts/ directory:
# prompts/question-generator.txt
You are an expert question creator. Generate high-quality questions based on the given content.
Requirements:
- Questions should be clear and unambiguous
- Include a mix of difficulty levels
- Cover key concepts from the source material
Langfuse-Managed Prompts¶
For production environments with prompt versioning and A/B testing:
import com.paragon.prompts.LangfusePromptProvider;
import okhttp3.OkHttpClient;
import java.time.Duration;
@Configuration
public class PromptConfig {
@Bean
public OkHttpClient okHttpClient() {
return new OkHttpClient.Builder()
.connectTimeout(Duration.ofSeconds(10))
.readTimeout(Duration.ofSeconds(30))
.build();
}
@Bean
public PromptProvider langfusePromptProvider(OkHttpClient httpClient) {
return LangfusePromptProvider.builder()
.httpClient(httpClient)
.publicKey(System.getenv("LANGFUSE_PUBLIC_KEY"))
.secretKey(System.getenv("LANGFUSE_SECRET_KEY"))
.build();
}
}
Retrieve prompts with version or label filters:
// Get latest version
Prompt prompt = provider.providePrompt("question-generator");
// Get specific version
Prompt v2 = provider.providePrompt("question-generator", Map.of("version", "2"));
// Get production label
Prompt prod = provider.providePrompt("question-generator", Map.of("label", "production"));
Template Compilation¶
Prompts support Handlebars-like templating:
Prompt template = Prompt.of("""
You are a {{role}} assistant for {{company}}.
{{#if includeDisclaimer}}
Always remind users this is AI-generated content.
{{/if}}
Topics you handle:
{{#each topics}}
- {{this}}
{{/each}}
""");
Prompt compiled = template.compile(Map.of(
"role", "customer support",
"company", "Acme Corp",
"includeDisclaimer", true,
"topics", List.of("billing", "technical issues", "returns")
));
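To build intuition for what `compile` does with the simple `{{variable}}` placeholders, here is a deliberately naive sketch in plain Java. It is illustrative only and is not Agentle4j's actual template engine, which also handles `{{#if}}` and `{{#each}}` blocks:

```java
import java.util.Map;

// Illustrative only: naive {{variable}} substitution. The real engine
// additionally supports {{#if}} conditionals and {{#each}} loops.
public class TinyTemplate {
    // Replaces every {{key}} occurrence with its value from the map.
    public static String render(String template, Map<String, String> vars) {
        String out = template;
        for (Map.Entry<String, String> e : vars.entrySet()) {
            out = out.replace("{{" + e.getKey() + "}}", e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        String rendered = render(
            "You are a {{role}} assistant for {{company}}.",
            Map.of("role", "customer support", "company", "Acme Corp"));
        System.out.println(rendered);
    }
}
```

Block helpers like `{{#if}}` require a real parser; for anything beyond flat substitution, rely on the library's `Prompt.compile` rather than string replacement.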
Direct Agent Injection with @Qualifier¶
For a known, finite set of agents, define each as a Spring bean:
import com.paragon.agents.Agent;
import com.paragon.prompts.Prompt;
import com.paragon.prompts.PromptProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class AgentConfig {
@Bean("questionAgent")
public Agent questionAgent(Responder responder,
PromptProvider prompts,
Agentle4jProperties props) {
return Agent.builder()
.name("QuestionGenerator")
.model(props.getModel())
.instructions(prompts.providePrompt("question-generator.txt"))
.responder(responder)
.build();
}
@Bean("summarizationAgent")
public Agent summarizationAgent(Responder responder,
PromptProvider prompts,
Agentle4jProperties props) {
return Agent.builder()
.name("Summarizer")
.model(props.getModel())
.instructions(prompts.providePrompt("summarizer.txt"))
.responder(responder)
.build();
}
@Bean("reviewerAgent")
public Agent reviewerAgent(Responder responder,
PromptProvider prompts,
Agentle4jProperties props) {
return Agent.builder()
.name("Reviewer")
.model(props.getModel())
.instructions(prompts.providePrompt("reviewer.txt"))
.responder(responder)
.build();
}
}
Now inject agents directly into your use case classes:
import com.paragon.agents.Agent;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
@Service
public class CriarQuestaoUseCase {
private final Agent questionAgent;
public CriarQuestaoUseCase(@Qualifier("questionAgent") Agent questionAgent) {
this.questionAgent = questionAgent;
}
public Questao criarQuestao(CriarQuestaoRequest input) {
var result = questionAgent.interact(input.content());
return parseQuestaoFromResult(result);
}
}
@Service
public class SummarizeDocumentUseCase {
private final Agent summarizationAgent;
public SummarizeDocumentUseCase(@Qualifier("summarizationAgent") Agent agent) {
this.summarizationAgent = agent;
}
public Summary summarize(Document doc) {
AgentResult result = summarizationAgent.interact(doc.content());
return new Summary(result.output());
}
}
Thread Safety
Agents are stateless and thread-safe. The same agent instance can handle concurrent requests because conversation state lives in AgentContext, not the agent itself.
Agent Factory Pattern¶
For dynamic agent creation or when agents are determined at runtime:
import com.paragon.agents.Agent;
import com.paragon.prompts.PromptProvider;
import org.springframework.stereotype.Component;
@Component
public class AgentFactory {
private final Responder responder;
private final PromptProvider prompts;
private final Agentle4jProperties props;
public AgentFactory(Responder responder,
PromptProvider prompts,
Agentle4jProperties props) {
this.responder = responder;
this.prompts = prompts;
this.props = props;
}
/**
* Creates an agent by loading its prompt from the filesystem.
*
* @param agentName name used for both the agent and prompt file lookup
* @return configured Agent instance
*/
public Agent create(String agentName) {
return Agent.builder()
.name(agentName)
.model(props.getModel())
.instructions(prompts.providePrompt(agentName + ".txt"))
.responder(responder)
.build();
}
/**
* Creates an agent with custom configuration.
*/
public Agent create(String agentName, String model, Prompt instructions) {
return Agent.builder()
.name(agentName)
.model(model)
.instructions(instructions)
.responder(responder)
.build();
}
}
Usage:
@Service
public class DynamicAgentUseCase {
private final AgentFactory agentFactory;
public DynamicAgentUseCase(AgentFactory agentFactory) {
this.agentFactory = agentFactory;
}
public String processWithAgent(String agentType, String input) {
Agent agent = agentFactory.create(agentType);
return agent.interact(input).output();
}
}
Agent Registry Pattern¶
For centralized management with caching (avoids recreating agents):
import com.paragon.agents.Agent;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class AgentRegistry {
private final ConcurrentHashMap<String, Agent> agents = new ConcurrentHashMap<>();
private final AgentFactory factory;
public AgentRegistry(AgentFactory factory) {
this.factory = factory;
}
/**
* Gets or creates an agent by name.
* Agents are cached after first creation.
*/
public Agent get(String name) {
return agents.computeIfAbsent(name, factory::create);
}
/**
* Checks if an agent is registered.
*/
public boolean has(String name) {
return agents.containsKey(name);
}
/**
* Registers a pre-configured agent.
*/
public void register(String name, Agent agent) {
agents.put(name, agent);
}
/**
* Returns all registered agent names.
*/
public Set<String> registeredAgents() {
return Set.copyOf(agents.keySet());
}
}
Usage in use cases:
@Service
public class FlexibleProcessingUseCase {
private final AgentRegistry agentRegistry;
public FlexibleProcessingUseCase(AgentRegistry agentRegistry) {
this.agentRegistry = agentRegistry;
}
public ProcessingResult process(String agentName, String input) {
Agent agent = agentRegistry.get(agentName);
AgentResult result = agent.interact(input);
return new ProcessingResult(result.output(), result.turnsUsed());
}
}
RouterAgent Injection¶
For classification and routing use cases:
import com.paragon.agents.Agent;
import com.paragon.agents.RouterAgent;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RouterConfig {
@Bean
public RouterAgent supportRouter(
Responder responder,
Agentle4jProperties props,
@Qualifier("billingAgent") Agent billing,
@Qualifier("techSupportAgent") Agent techSupport,
@Qualifier("salesAgent") Agent sales) {
return RouterAgent.builder()
.model(props.getModel())
.responder(responder)
.addRoute(billing, "billing, invoices, payments, refunds")
.addRoute(techSupport, "technical issues, bugs, errors, crashes")
.addRoute(sales, "pricing, demos, upgrades, new features")
.fallback(techSupport)
.build();
}
}
Use the router to classify and dispatch:
@Service
public class CustomerInquiryUseCase {
private final RouterAgent router;
public CustomerInquiryUseCase(RouterAgent router) {
this.router = router;
}
public InquiryResponse handle(String customerMessage) {
// Classify the message (returns Optional<Agent>)
Agent selectedAgent = router.classify(customerMessage)
.orElseThrow(() -> new IllegalStateException("No agent found for message"));
// Process with the appropriate agent
AgentResult result = selectedAgent.interact(customerMessage);
return new InquiryResponse(
selectedAgent.name(),
result.output()
);
}
}
Lazy Agent Initialization¶
For agents that are rarely used, use @Lazy to defer initialization:
@Configuration
public class AgentConfig {
@Bean("expensiveAgent")
@Lazy
public Agent expensiveAgent(Responder responder, PromptProvider prompts) {
// Only created when first injected/used
return Agent.builder()
.name("ExpensiveAnalyzer")
.model("openai/gpt-4-turbo") // More expensive model
.instructions(prompts.providePrompt("expensive-analyzer.txt"))
.responder(responder)
.build();
}
}
Or use ObjectProvider for optional/conditional injection:
import org.springframework.beans.factory.ObjectProvider;
@Service
public class ConditionalUseCase {
private final ObjectProvider<Agent> expensiveAgentProvider;
public ConditionalUseCase(
@Qualifier("expensiveAgent") ObjectProvider<Agent> expensiveAgentProvider) {
this.expensiveAgentProvider = expensiveAgentProvider;
}
public String process(String input, boolean useExpensiveModel) {
if (useExpensiveModel) {
Agent agent = expensiveAgentProvider.getIfAvailable();
if (agent != null) {
return agent.interact(input).output();
}
}
// Fallback to cheaper processing
return processWithCheaperMethod(input);
}
}
Best Practices for Organizing Agents¶
When your application has 10+ agents, organization becomes critical:
Package Structure¶
com.yourapp/
├── agents/
│ ├── config/
│ │ ├── AgentConfig.java # Bean definitions
│ │ ├── RouterConfig.java # Router definitions
│ │ └── PromptConfig.java # PromptProvider setup
│ ├── factory/
│ │ ├── AgentFactory.java
│ │ └── AgentRegistry.java
│ └── tools/
│ ├── WeatherTool.java
│ └── SearchTool.java
├── prompts/ # If using filesystem prompts
│ ├── question-generator.txt
│ ├── summarizer.txt
│ └── reviewer.txt
└── usecases/
├── CriarQuestaoUseCase.java
└── SummarizeDocumentUseCase.java
Naming Conventions¶
| Item | Convention | Example |
|---|---|---|
| Agent bean | `{purpose}Agent` | `questionAgent`, `summarizationAgent` |
| Prompt file | `{purpose}.txt` or `{purpose}-{version}.txt` | `question-generator.txt`, `summarizer-v2.txt` |
| Router bean | `{domain}Router` | `supportRouter`, `contentRouter` |
| Use case | `{Action}{Entity}UseCase` | `CriarQuestaoUseCase`, `SummarizeDocumentUseCase` |
Configuration via YAML¶
Define agent metadata in application.yml:
agentle4j:
api-key: ${OPENROUTER_API_KEY}
default-model: openai/gpt-4o
agents:
question-generator:
prompt-file: question-generator.txt
model: openai/gpt-4o
temperature: 0.7
summarizer:
prompt-file: summarizer.txt
model: openai/gpt-4o-mini
temperature: 0.3
reviewer:
prompt-file: reviewer.txt
model: openai/gpt-4o
temperature: 0.5
And create a configuration properties class. Note that `Agentle4jProperties` above already binds the `agentle4j` prefix; registering two classes on the same prefix will conflict, so either merge these fields into that class or move the agent definitions under a distinct prefix:
@Configuration
@ConfigurationProperties(prefix = "agentle4j")
public class AgentConfigProperties {
private String defaultModel = "openai/gpt-4o";
private Map<String, AgentDefinition> agents = new HashMap<>();
// Getters and setters
public static class AgentDefinition {
private String promptFile;
private String model;
private double temperature = 0.7;
// Getters and setters
}
}
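The point of keeping a `default-model` alongside per-agent entries is fallback: an agent definition that omits `model` inherits the global default. This plain-Java sketch (no Spring; the `AgentDefinition` shape mirrors the class above, but the resolver is hypothetical) shows the resolution rule:

```java
import java.util.Map;

// Sketch of the default-model fallback used when building agents from
// YAML definitions. Plain Java; the resolver helper is hypothetical.
public class AgentDefaults {
    record AgentDefinition(String promptFile, String model, Double temperature) {}

    // The agent's own model wins; otherwise fall back to the global default.
    public static String resolveModel(String defaultModel, AgentDefinition def) {
        return def.model() != null ? def.model() : defaultModel;
    }

    public static void main(String[] args) {
        Map<String, AgentDefinition> agents = Map.of(
            "summarizer", new AgentDefinition("summarizer.txt", "openai/gpt-4o-mini", 0.3),
            "reviewer",   new AgentDefinition("reviewer.txt", null, null));
        // "reviewer" has no model, so the default applies.
        System.out.println(resolveModel("openai/gpt-4o", agents.get("reviewer")));
    }
}
```

A factory or registry bean can apply the same rule when iterating the `agents` map at startup, so adding an agent becomes a YAML change plus a prompt file rather than a code change.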
Key Reminders¶
Thread Safety
Agents are stateless and thread-safe. Create them once and reuse. State belongs in AgentContext.
Prompt Versioning
Use Langfuse labels (production, staging) or git-versioned prompt files for safe deployments.
Memory Usage
Each Agent instance is lightweight (~1KB). Having 50+ agent beans is not a memory concern.
REST API Endpoints¶
Simple Chat Controller¶
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/api/chat")
public class ChatController {
private final CustomerSupportAgent agent;
public ChatController(CustomerSupportAgent agent) {
this.agent = agent;
}
// Synchronous endpoint (efficient with Virtual Threads)
@PostMapping
public ChatResponse chat(@RequestBody ChatRequest request) {
String response = agent.chat(request.message());
return new ChatResponse(response);
}
public record ChatRequest(String message) {}
public record ChatResponse(String response) {}
}
Structured Output Endpoint¶
import com.paragon.responses.Responder;
import com.paragon.responses.spec.CreateResponsePayload;
@RestController
@RequestMapping("/api/analyze")
public class AnalysisController {
private final Responder responder;
private final Agentle4jProperties props;
public AnalysisController(Responder responder, Agentle4jProperties props) {
this.responder = responder;
this.props = props;
}
// Extract structured data
@PostMapping("/sentiment")
public SentimentResult analyzeSentiment(@RequestBody TextInput input) {
var payload = CreateResponsePayload.builder()
.model(props.getModel())
.addUserMessage("Analyze sentiment: " + input.text())
.withStructuredOutput(SentimentResult.class)
.build();
return responder.respond(payload).outputParsed();
}
public record TextInput(String text) {}
public record SentimentResult(
String sentiment,
double confidence,
List<String> keywords
) {}
}
Server-Sent Events (SSE) Streaming¶
Stream AI responses in real-time to the client:
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
@RestController
@RequestMapping("/api/stream")
public class StreamingController {
private final Responder responder;
private final Agentle4jProperties props;
public StreamingController(Responder responder, Agentle4jProperties props) {
this.responder = responder;
this.props = props;
}
@GetMapping(value = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamChat(@RequestParam String message) {
SseEmitter emitter = new SseEmitter(60_000L); // 60s timeout
var payload = CreateResponsePayload.builder()
.model(props.getModel())
.addUserMessage(message)
.streaming()
.build();
responder.respond(payload)
.onTextDelta(delta -> {
try {
emitter.send(SseEmitter.event()
.name("delta")
.data(delta));
} catch (Exception e) {
emitter.completeWithError(e);
}
})
.onComplete(response -> {
try {
emitter.send(SseEmitter.event()
.name("complete")
.data(Map.of("tokens", response.usage().totalTokens())));
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
})
.onError(emitter::completeWithError)
.start();
return emitter;
}
}
Client-Side JavaScript¶
const eventSource = new EventSource('/api/stream/chat?message=Hello');
eventSource.addEventListener('delta', (e) => {
document.getElementById('output').textContent += e.data;
});
eventSource.addEventListener('complete', (e) => {
const data = JSON.parse(e.data);
console.log('Tokens used:', data.tokens);
eventSource.close();
});
eventSource.onerror = () => eventSource.close();
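It can help to see what `SseEmitter` actually puts on the wire: SSE is a plain-text protocol where each event is an optional `event:` line, one or more `data:` lines, and a blank line, which is exactly what the browser's `EventSource` parses. A minimal sketch of the frame format (illustrative; Spring handles this for you):

```java
// Builds the text frame an SSE server effectively writes per event.
// Illustrative only; SseEmitter produces this format internally.
public class SseFrame {
    public static String frame(String event, String data) {
        StringBuilder sb = new StringBuilder();
        if (event != null) sb.append("event: ").append(event).append('\n');
        // Multi-line payloads become multiple data: lines.
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        return sb.append('\n').toString(); // blank line terminates the event
    }

    public static void main(String[] args) {
        // prints: event: delta / data: Hello / (blank line)
        System.out.print(frame("delta", "Hello"));
    }
}
```

This is also why the client registers handlers with `addEventListener('delta', ...)`: the `event:` name selects which listener fires.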
Streaming Structured Outputs (Partial JSON)¶
Stream structured data with partial JSON updates as fields are generated:
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
@RestController
@RequestMapping("/api/stream")
public class StructuredStreamingController {
private final Responder responder;
private final Agentle4jProperties props;
public StructuredStreamingController(Responder responder, Agentle4jProperties props) {
this.responder = responder;
this.props = props;
}
// Define your structured output type
public record Article(
String title,
String summary,
List<String> keyPoints,
String author
) {}
@GetMapping(value = "/article", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamStructuredArticle(@RequestParam String topic) {
SseEmitter emitter = new SseEmitter(120_000L); // 2 min timeout for longer content
var payload = CreateResponsePayload.builder()
.model(props.getModel())
.addUserMessage("Write an article about: " + topic)
.withStructuredOutput(Article.class)
.streaming()
.build();
responder.respond(payload)
// Stream partial JSON as fields are generated
.onPartialJson(fields -> {
try {
emitter.send(SseEmitter.event()
.name("partial")
.data(fields)); // Send Map<String, Object> with current fields
} catch (Exception e) {
emitter.completeWithError(e);
}
})
// Get the final typed object
.onParsedComplete(response -> {
try {
Article article = response.outputParsed();
emitter.send(SseEmitter.event()
.name("complete")
.data(article));
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
})
.onError(emitter::completeWithError)
.start();
return emitter;
}
}
How Partial JSON Works
The parser auto-completes incomplete JSON. As the LLM generates text, you'll receive progressive updates:
- First: `{"title": "AI in Healthcare"}`
- Then: `{"title": "AI in Healthcare", "summary": "Artificial intel..."}`
- Finally: the complete object with all fields
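The core idea behind auto-completion is simple: close any unterminated string, then close open braces and brackets in reverse order, so each partial chunk parses as valid JSON. This sketch illustrates the idea only; it is not the library's actual parser:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustration of JSON auto-completion, NOT the library's real parser:
// terminate a dangling string, then close open {/[ in reverse order.
public class PartialJson {
    public static String complete(String partial) {
        Deque<Character> open = new ArrayDeque<>();
        boolean inString = false;
        for (int i = 0; i < partial.length(); i++) {
            char c = partial.charAt(i);
            if (inString) {
                if (c == '\\') i++;                 // skip escaped character
                else if (c == '"') inString = false;
            } else if (c == '"') inString = true;
            else if (c == '{' || c == '[') open.push(c);
            else if (c == '}' || c == ']') open.pop();
        }
        StringBuilder sb = new StringBuilder(partial);
        if (inString) sb.append('"');               // close dangling string
        while (!open.isEmpty()) sb.append(open.pop() == '{' ? '}' : ']');
        return sb.toString();
    }
}
```

Because every completed snapshot is parseable, `onPartialJson` can hand you a `Map` of whatever fields exist so far, and the UI can render them immediately.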
Client-Side JavaScript for Structured Streaming¶
const eventSource = new EventSource('/api/stream/article?topic=AI%20in%20Healthcare');
eventSource.addEventListener('partial', (e) => {
const fields = JSON.parse(e.data);
// Update UI fields as they arrive
if (fields.title) {
document.getElementById('title').textContent = fields.title;
}
if (fields.summary) {
document.getElementById('summary').textContent = fields.summary;
}
if (fields.keyPoints) {
const list = document.getElementById('keyPoints');
list.innerHTML = fields.keyPoints.map(p => `<li>${p}</li>`).join('');
}
});
eventSource.addEventListener('complete', (e) => {
const article = JSON.parse(e.data);
console.log('Final article:', article);
eventSource.close();
});
eventSource.onerror = () => {
console.error('Stream error');
eventSource.close();
};
Type-Safe Partial Updates¶
For more robust type safety, use onPartialParsed with a nullable mirror class:
// Nullable mirror class for partial parsing
public record PartialArticle(
@Nullable String title,
@Nullable String summary,
@Nullable List<String> keyPoints,
@Nullable String author
) {}
@GetMapping(value = "/article/typed", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamTypedArticle(@RequestParam String topic) {
SseEmitter emitter = new SseEmitter(120_000L);
var payload = CreateResponsePayload.builder()
.model(props.getModel())
.addUserMessage("Write an article about: " + topic)
.withStructuredOutput(Article.class)
.streaming()
.build();
responder.respond(payload)
.onPartialParsed(PartialArticle.class, partial -> {
try {
// Type-safe access to partial fields
emitter.send(SseEmitter.event()
.name("partial")
.data(Map.of(
"title", partial.title() != null ? partial.title() : "",
"summary", partial.summary() != null ? partial.summary() : "",
"keyPoints", partial.keyPoints() != null ? partial.keyPoints() : List.of(),
"author", partial.author() != null ? partial.author() : ""
)));
} catch (Exception e) {
emitter.completeWithError(e);
}
})
.onParsedComplete(response -> {
try {
emitter.send(SseEmitter.event()
.name("complete")
.data(response.outputParsed()));
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
})
.onError(emitter::completeWithError)
.start();
return emitter;
}
Streaming with Tool Calls¶
Stream AI responses that include function/tool calls in real-time:
import com.paragon.responses.spec.FunctionTool;
import com.paragon.responses.spec.FunctionToolStore;
import com.paragon.responses.annotations.FunctionMetadata;
@RestController
@RequestMapping("/api/stream")
public class ToolStreamingController {
private final Responder responder;
private final FunctionToolStore toolStore;
private final Agentle4jProperties props;
public ToolStreamingController(
Responder responder,
FunctionToolStore toolStore,
Agentle4jProperties props) {
this.responder = responder;
this.toolStore = toolStore;
this.props = props;
}
@GetMapping(value = "/assistant", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamWithTools(@RequestParam String message) {
SseEmitter emitter = new SseEmitter(120_000L);
var payload = CreateResponsePayload.builder()
.model(props.getModel())
.addDeveloperMessage("You are a helpful assistant with access to tools.")
.addUserMessage(message)
.addTool(toolStore.get("get_weather"))
.addTool(toolStore.get("search_products"))
.addTool(toolStore.get("get_order_status"))
.streaming()
.build();
responder.respond(payload)
// Stream text as it's generated
.onTextDelta(delta -> {
try {
emitter.send(SseEmitter.event()
.name("text")
.data(delta));
} catch (Exception e) {
emitter.completeWithError(e);
}
})
// Notify when a tool is called
.onToolCall((toolName, argsJson) -> {
try {
emitter.send(SseEmitter.event()
.name("tool_call")
.data(Map.of(
"tool", toolName,
"arguments", argsJson,
"status", "executing"
)));
} catch (Exception e) {
emitter.completeWithError(e);
}
})
// Auto-execute tools and get results
.withToolStore(toolStore)
.onToolResult((toolName, result) -> {
try {
emitter.send(SseEmitter.event()
.name("tool_result")
.data(Map.of(
"tool", toolName,
"output", result.output(),
"status", "completed"
)));
} catch (Exception e) {
emitter.completeWithError(e);
}
})
.onComplete(response -> {
try {
emitter.send(SseEmitter.event()
.name("complete")
.data(Map.of(
"finalText", response.outputText(),
"tokens", response.usage().totalTokens()
)));
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
})
.onError(emitter::completeWithError)
.start();
return emitter;
}
}
Client-Side: Tool Call Progress UI¶
const eventSource = new EventSource('/api/stream/assistant?message=What is the weather in Tokyo and find me running shoes');
eventSource.addEventListener('text', (e) => {
document.getElementById('response').textContent += e.data;
});
eventSource.addEventListener('tool_call', (e) => {
const data = JSON.parse(e.data);
addToolStatus(`🔧 Calling ${data.tool}...`, 'pending');
});
eventSource.addEventListener('tool_result', (e) => {
const data = JSON.parse(e.data);
updateToolStatus(data.tool, `✅ ${data.tool}: ${data.output}`, 'success');
});
eventSource.addEventListener('complete', (e) => {
const data = JSON.parse(e.data);
console.log('Final response:', data.finalText);
console.log('Tokens used:', data.tokens);
eventSource.close();
});
eventSource.onerror = () => {
console.error('Stream error');
eventSource.close();
};
Agent Streaming (Multi-Turn with Tools)¶
Stream full agent interactions with tool execution, handoffs, and guardrails:
import com.paragon.agents.Agent;
import com.paragon.agents.AgentResult;
@RestController
@RequestMapping("/api/agent")
public class AgentStreamingController {
private final Agent customerServiceAgent;
public AgentStreamingController(Agent customerServiceAgent) {
this.customerServiceAgent = customerServiceAgent;
}
@GetMapping(value = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamAgentChat(
@RequestParam String message,
@RequestParam(required = false) String sessionId) {
SseEmitter emitter = new SseEmitter(180_000L); // 3 min for complex interactions
customerServiceAgent.interactStream(message)
// Track multi-turn progress
.onTurnStart(turn -> {
try {
emitter.send(SseEmitter.event()
.name("turn_start")
.data(Map.of("turn", turn)));
} catch (Exception e) {
emitter.completeWithError(e);
}
})
.onTurnComplete(response -> {
try {
emitter.send(SseEmitter.event()
.name("turn_complete")
.data(Map.of("tokens", response.usage().totalTokens())));
} catch (Exception e) {
emitter.completeWithError(e);
}
})
// Stream text
.onTextDelta(delta -> {
try {
emitter.send(SseEmitter.event()
.name("text")
.data(delta));
} catch (Exception e) {
emitter.completeWithError(e);
}
})
// Tool execution events
.onToolCall((name, args) -> {
try {
emitter.send(SseEmitter.event()
.name("tool_call")
.data(Map.of("tool", name, "args", args)));
} catch (Exception e) {
emitter.completeWithError(e);
}
})
.onToolExecuted(exec -> {
try {
emitter.send(SseEmitter.event()
.name("tool_executed")
.data(Map.of(
"tool", exec.toolName(),
"output", exec.output().toString(),
"durationMs", exec.duration().toMillis()
)));
} catch (Exception e) {
emitter.completeWithError(e);
}
})
// Handoff events (multi-agent routing)
.onHandoff(handoff -> {
try {
emitter.send(SseEmitter.event()
.name("handoff")
.data(Map.of(
"to", handoff.targetAgent().name(),
"description", handoff.description()
)));
} catch (Exception e) {
emitter.completeWithError(e);
}
})
// Guardrail events
.onGuardrailFailed(failure -> {
try {
emitter.send(SseEmitter.event()
.name("guardrail_blocked")
.data(Map.of(
"reason", failure.reason()
)));
} catch (Exception e) {
emitter.completeWithError(e);
}
})
// Completion
.onComplete(result -> {
try {
emitter.send(SseEmitter.event()
.name("complete")
.data(Map.of(
"output", result.output(),
"turnsUsed", result.turnsUsed(),
"status", result.isSuccess() ? "success" : "error"
)));
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
})
.onError(error -> {
try {
emitter.send(SseEmitter.event()
.name("error")
.data(Map.of("message", error.getMessage())));
emitter.completeWithError(error);
} catch (Exception e) {
emitter.completeWithError(e);
}
})
.start();
return emitter;
}
}
Real-World Streaming Examples¶
E-Commerce Order Assistant¶
A production-ready example combining structured output, tool calls, and streaming:
@Service
public class OrderAssistantService {
private final Agent orderAgent;
private final ObjectMapper objectMapper;
public OrderAssistantService(Responder responder, OrderTools orderTools) {
this.objectMapper = new ObjectMapper();
this.orderAgent = Agent.builder()
.name("OrderAssistant")
.model("openai/gpt-4o")
.instructions("""
You are an e-commerce order assistant. Help customers with:
- Checking order status
- Processing returns
- Tracking shipments
- Answering product questions
Always verify the order ID before taking actions.
Be concise and helpful.
""")
.responder(responder)
.addTool(orderTools.getOrderStatus())
.addTool(orderTools.trackShipment())
.addTool(orderTools.initiateReturn())
.addTool(orderTools.searchProducts())
.build();
}
public void streamOrderAssistance(String userId, String message, SseEmitter emitter) {
orderAgent.interactStream(message)
.onTextDelta(delta -> sendSafe(emitter, "text", delta))
.onToolCall((name, args) -> {
sendSafe(emitter, "action", Map.of(
"type", "tool_started",
"tool", name,
"message", getToolMessage(name)
));
})
.onToolExecuted(exec -> {
sendSafe(emitter, "action", Map.of(
"type", "tool_completed",
"tool", exec.toolName(),
"success", true
));
})
.onComplete(result -> {
sendSafe(emitter, "complete", Map.of(
"success", result.isSuccess()
));
emitter.complete();
})
.onError(e -> {
sendSafe(emitter, "error", Map.of("message", e.getMessage()));
emitter.completeWithError(e);
})
.start();
}
private String getToolMessage(String toolName) {
return switch (toolName) {
case "get_order_status" -> "Looking up your order...";
case "track_shipment" -> "Checking shipment tracking...";
case "initiate_return" -> "Processing your return request...";
case "search_products" -> "Searching our catalog...";
default -> "Processing...";
};
}
private void sendSafe(SseEmitter emitter, String event, Object data) {
try {
emitter.send(SseEmitter.event().name(event).data(data));
} catch (Exception ignored) {}
}
}
Document Analysis with Progress¶
Stream analysis of documents with detailed progress updates:
@RestController
@RequestMapping("/api/analyze")
public class DocumentAnalysisController {
private final Responder responder;
public record AnalysisResult(
String summary,
List<String> keyFindings,
List<String> actionItems,
String sentiment,
double confidenceScore
) {}
@PostMapping(value = "/document", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter analyzeDocument(@RequestBody DocumentRequest request) {
SseEmitter emitter = new SseEmitter(300_000L); // 5 min for long documents
var payload = CreateResponsePayload.builder()
.model("openai/gpt-4o")
.addDeveloperMessage("""
Analyze the provided document and extract:
- A concise summary
- Key findings (up to 5)
- Action items if any
- Overall sentiment
- Confidence score (0-1)
""")
.addUserMessage("Document to analyze:\n\n" + request.content())
.withStructuredOutput(AnalysisResult.class)
.streaming()
.build();
// Track which fields have been seen for progress
Set<String> seenFields = ConcurrentHashMap.newKeySet();
responder.respond(payload)
.onPartialJson(fields -> {
try {
// Send progress updates as new fields appear
for (String field : fields.keySet()) {
if (seenFields.add(field)) {
emitter.send(SseEmitter.event()
.name("progress")
.data(Map.of(
"step", getProgressStep(field),
"message", getProgressMessage(field)
)));
}
}
// Send partial data
emitter.send(SseEmitter.event()
.name("partial")
.data(fields));
} catch (Exception e) {
emitter.completeWithError(e);
}
})
.onParsedComplete(response -> {
try {
emitter.send(SseEmitter.event()
.name("complete")
.data(response.outputParsed()));
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
})
.onError(emitter::completeWithError)
.start();
return emitter;
}
private int getProgressStep(String field) {
return switch (field) {
case "summary" -> 1;
case "keyFindings" -> 2;
case "actionItems" -> 3;
case "sentiment" -> 4;
case "confidenceScore" -> 5;
default -> 0;
};
}
private String getProgressMessage(String field) {
return switch (field) {
case "summary" -> "Generating summary...";
case "keyFindings" -> "Identifying key findings...";
case "actionItems" -> "Extracting action items...";
case "sentiment" -> "Analyzing sentiment...";
case "confidenceScore" -> "Calculating confidence...";
default -> "Processing...";
};
}
public record DocumentRequest(String content) {}
}
Real-Time Translation with Streaming¶
For long texts, stream the translation to the client as it is generated:
@RestController
@RequestMapping("/api/translate")
public class TranslationStreamController {
private final Responder responder;
public record Translation(
String translatedText,
String detectedSourceLanguage,
List<String> alternatives
) {}
@PostMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamTranslation(
@RequestBody TranslationRequest request) {
SseEmitter emitter = new SseEmitter(120_000L);
var payload = CreateResponsePayload.builder()
.model("openai/gpt-4o")
.addDeveloperMessage(String.format(
"Translate the following text to %s. Detect the source language. " +
"Provide 2-3 alternative translations if applicable.",
request.targetLanguage()))
.addUserMessage(request.text())
.withStructuredOutput(Translation.class)
.streaming()
.build();
StringBuilder translationBuffer = new StringBuilder();
responder.respond(payload)
.onPartialJson(fields -> {
try {
// Stream the translation text progressively
if (fields.containsKey("translatedText")) {
String current = fields.get("translatedText").toString();
// Guard: partial JSON may occasionally re-emit a shorter prefix
if (current.length() > translationBuffer.length()) {
String newChunk = current.substring(translationBuffer.length());
translationBuffer.setLength(0);
translationBuffer.append(current);
emitter.send(SseEmitter.event()
.name("translation_chunk")
.data(newChunk));
}
}
// Send detected language when available
if (fields.containsKey("detectedSourceLanguage")) {
emitter.send(SseEmitter.event()
.name("language_detected")
.data(fields.get("detectedSourceLanguage")));
}
} catch (Exception e) {
emitter.completeWithError(e);
}
})
.onParsedComplete(response -> {
try {
Translation result = response.outputParsed();
emitter.send(SseEmitter.event()
.name("complete")
.data(result));
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
})
.onError(emitter::completeWithError)
.start();
return emitter;
}
public record TranslationRequest(String text, String targetLanguage) {}
}
WebSocket Chat¶
For bidirectional real-time chat:
WebSocket Configuration¶
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.*;
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
private final ChatWebSocketHandler chatHandler;
public WebSocketConfig(ChatWebSocketHandler chatHandler) {
this.chatHandler = chatHandler;
}
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(chatHandler, "/ws/chat")
.setAllowedOrigins("*");
}
}
WebSocket Handler¶
import org.springframework.stereotype.Component;
import org.springframework.web.socket.*;
import org.springframework.web.socket.handler.TextWebSocketHandler;
@Component
public class ChatWebSocketHandler extends TextWebSocketHandler {
private final CustomerSupportAgent agent;
public ChatWebSocketHandler(CustomerSupportAgent agent) {
this.agent = agent;
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
String userInput = message.getPayload();
agent.chatAsync(userInput)
.thenAccept(result -> {
try {
session.sendMessage(new TextMessage(result.output()));
} catch (Exception e) {
// Log error
}
});
}
}
Error Handling¶
Global Exception Handler¶
import org.springframework.web.bind.annotation.*;
import org.springframework.http.ResponseEntity;
@RestControllerAdvice
public class AIExceptionHandler {
@ExceptionHandler(CompletionException.class)
public ResponseEntity<ErrorResponse> handleAIError(CompletionException e) {
Throwable cause = e.getCause();
// Guard against a null cause or message before matching status codes
String message = cause != null && cause.getMessage() != null
? cause.getMessage() : "";
if (message.contains("429")) {
return ResponseEntity.status(429)
.body(new ErrorResponse("Rate limited. Please try again later."));
}
if (message.contains("401")) {
return ResponseEntity.status(500)
.body(new ErrorResponse("AI service configuration error."));
}
return ResponseEntity.status(500)
.body(new ErrorResponse("AI service temporarily unavailable."));
}
public record ErrorResponse(String error) {}
}
Production Best Practices¶
1. Environment Variables¶
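Keep the API key out of version control and supply it through the environment; the `${OPENROUTER_API_KEY}` placeholder in `application.yml` resolves it at startup. A minimal sketch (the key value shown is a placeholder):

```shell
# Supply the key via the environment, never hardcoded in application.yml
export OPENROUTER_API_KEY="replace-with-your-key"

# Verify it is visible to the application process before starting it
echo "OPENROUTER_API_KEY is ${#OPENROUTER_API_KEY} characters long"
```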
2. Health Check¶
import org.springframework.boot.actuate.health.*;
@Component
public class AIHealthIndicator implements HealthIndicator {
private final Responder responder;
private final Agentle4jProperties props;
public AIHealthIndicator(Responder responder, Agentle4jProperties props) {
this.responder = responder;
this.props = props;
}
@Override
public Health health() {
try {
var payload = CreateResponsePayload.builder()
.model(props.getModel())
.addUserMessage("ping")
.maxTokens(1)
.build();
responder.respond(payload);
return Health.up().build();
} catch (Exception e) {
return Health.down()
.withDetail("error", e.getMessage())
.build();
}
}
}
3. Rate Limiting¶
import io.github.bucket4j.*;
@Service
public class RateLimitedChatService {
private final CustomerSupportAgent agent;
private final Bucket bucket;
public RateLimitedChatService(CustomerSupportAgent agent) {
this.agent = agent;
this.bucket = Bucket.builder()
.addLimit(Bandwidth.simple(100, Duration.ofMinutes(1)))
.build();
}
public String chat(String message) {
if (!bucket.tryConsume(1)) {
throw new RateLimitExceededException("Too many requests");
}
return agent.chat(message);
}
}
4. Caching Responses¶
import org.springframework.cache.annotation.*;
@Service
@CacheConfig(cacheNames = "ai-responses")
public class CachedAnalysisService {
private final Responder responder;
// Cache identical analysis requests. Use the input itself as the key;
// hashCode() alone risks collisions serving the wrong cached result.
@Cacheable(key = "#input")
public SentimentResult analyzeSentiment(String input) {
// AI call...
}
}
Complete Example Application¶
Here's a minimal complete Spring Boot application:
@SpringBootApplication
@EnableConfigurationProperties(Agentle4jProperties.class)
public class AIApplication {
public static void main(String[] args) {
SpringApplication.run(AIApplication.class, args);
}
}
# application.yml
server:
  port: 8080

agentle4j:
  api-key: ${OPENROUTER_API_KEY}
  model: openai/gpt-4o-mini
  temperature: 0.7
  max-retries: 3
<!-- pom.xml additions -->
<dependency>
  <groupId>io.github.paragon-intelligence</groupId>
  <artifactId>agentle4j</artifactId>
  <version>0.7.0</version>
</dependency>
Use Cases¶
| Use Case | Approach |
|---|---|
| Chatbot API | Agent as @Service + REST Controller |
| Real-time Chat | WebSocket + streaming |
| Data Extraction | Structured output endpoint |
| Document Analysis | Async processing with queues |
| Multi-tenant SaaS | Per-request Agent with tenant context |
| Batch Processing | @Async + CompletableFuture |
Asynchronous Job Patterns¶
For long-running AI tasks (document analysis, code generation, content creation), use async patterns where clients submit jobs and check status later.
Job Submission & Status API¶
A REST API pattern for fire-and-forget job submission with polling:
import com.paragon.agents.Agent;
import com.paragon.agents.AgentResult;
import org.springframework.web.bind.annotation.*;
import java.util.UUID;
import java.util.concurrent.*;
@RestController
@RequestMapping("/api/jobs")
public class AsyncJobController {
private final Agent analysisAgent;
private final JobStore jobStore;
public AsyncJobController(Agent analysisAgent, JobStore jobStore) {
this.analysisAgent = analysisAgent;
this.jobStore = jobStore;
}
// Submit a job and return immediately
@PostMapping
public JobSubmissionResponse submitJob(@RequestBody JobRequest request) {
String jobId = UUID.randomUUID().toString();
Job job = new Job(jobId, request.input(), JobStatus.PENDING, null, null);
jobStore.save(job);
// Fire and forget - fully async, no blocking
processJobAsync(jobId, request.input());
return new JobSubmissionResponse(jobId, JobStatus.PENDING);
}
// Check job status
@GetMapping("/{jobId}")
public JobStatusResponse getJobStatus(@PathVariable String jobId) {
Job job = jobStore.get(jobId)
.orElseThrow(() -> new JobNotFoundException(jobId));
return new JobStatusResponse(
job.id(),
job.status(),
job.result(),
job.error()
);
}
// Cancel a pending job
@DeleteMapping("/{jobId}")
public void cancelJob(@PathVariable String jobId) {
Job job = jobStore.get(jobId)
.orElseThrow(() -> new JobNotFoundException(jobId));
if (job.status() == JobStatus.PENDING || job.status() == JobStatus.RUNNING) {
jobStore.save(job.withStatus(JobStatus.CANCELLED));
}
}
private void processJobAsync(String jobId, String input) {
// Update to RUNNING
jobStore.get(jobId).ifPresent(job ->
jobStore.save(job.withStatus(JobStatus.RUNNING)));
// Submit to a virtual thread for background processing
Thread.startVirtualThread(() -> {
try {
var result = analysisAgent.interact(input);
// Don't overwrite a job that was cancelled while running
boolean cancelled = jobStore.get(jobId)
.map(j -> j.status() == JobStatus.CANCELLED)
.orElse(true);
if (cancelled) {
return;
}
if (result.isSuccess()) {
jobStore.save(new Job(jobId, input, JobStatus.COMPLETED, result.output(), null));
} else {
String error = result.error() != null ? result.error().getMessage() : "Unknown error";
jobStore.save(new Job(jobId, input, JobStatus.FAILED, null, error));
}
} catch (Exception e) {
jobStore.save(new Job(jobId, input, JobStatus.FAILED, null, e.getMessage()));
}
});
}
// DTOs
public record JobRequest(String input) {}
public record JobSubmissionResponse(String jobId, JobStatus status) {}
public record JobStatusResponse(
String jobId,
JobStatus status,
String result,
String error
) {}
public record Job(
String id,
String input,
JobStatus status,
String result,
String error
) {
public Job withStatus(JobStatus newStatus) {
return new Job(id, input, newStatus, result, error);
}
}
public enum JobStatus { PENDING, RUNNING, COMPLETED, FAILED, CANCELLED }
}
In-Memory Job Store¶
A simple in-memory job store for single-instance deployments. Jobs are lost on restart; see the database-backed persistence section below for multi-instance setups:
import org.springframework.stereotype.Component;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class JobStore {
private final ConcurrentHashMap<String, AsyncJobController.Job> jobs = new ConcurrentHashMap<>();
public void save(AsyncJobController.Job job) {
jobs.put(job.id(), job);
}
public Optional<AsyncJobController.Job> get(String jobId) {
return Optional.ofNullable(jobs.get(jobId));
}
public void remove(String jobId) {
jobs.remove(jobId);
}
}
Client Usage¶
# Submit a job
curl -X POST http://localhost:8080/api/jobs \
-H "Content-Type: application/json" \
-d '{"input": "Analyze this 50-page document..."}'
# Response: {"jobId": "abc-123", "status": "PENDING"}
# Poll for status
curl http://localhost:8080/api/jobs/abc-123
# Response (in progress): {"jobId": "abc-123", "status": "RUNNING", "result": null}
# Response (complete): {"jobId": "abc-123", "status": "COMPLETED", "result": "Analysis: ..."}
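When polling from a JVM client, back off between status checks rather than hammering the endpoint at a fixed interval. A sketch of capped exponential backoff (the class and method names are illustrative, not part of Agentle4j):

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class PollingBackoff {
    // Computes capped exponential backoff delays for job-status polling:
    // 1s, 2s, 4s, ... doubling until the cap, then flat at the cap.
    public static List<Duration> delays(Duration initial, Duration max, int attempts) {
        List<Duration> out = new ArrayList<>();
        Duration d = initial;
        for (int i = 0; i < attempts; i++) {
            out.add(d);
            d = d.multipliedBy(2);
            if (d.compareTo(max) > 0) d = max;
        }
        return out;
    }

    public static void main(String[] args) {
        // [PT1S, PT2S, PT4S, PT8S, PT16S, PT30S, PT30S]
        System.out.println(delays(Duration.ofSeconds(1), Duration.ofSeconds(30), 7));
    }
}
```

Sleeping for each delay between `GET /api/jobs/{jobId}` calls keeps load roughly constant regardless of how long the job runs.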
RabbitMQ Message Queue Pattern¶
For distributed systems, use RabbitMQ to decouple job submission from processing:
Dependencies¶
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Configuration¶
# application.yml
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

app:
  queues:
    ai-jobs: ai-job-queue
    ai-results: ai-result-queue
Queue Configuration¶
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
public static final String JOB_QUEUE = "ai-job-queue";
public static final String RESULT_QUEUE = "ai-result-queue";
@Bean
public Queue jobQueue() {
return QueueBuilder.durable(JOB_QUEUE).build();
}
@Bean
public Queue resultQueue() {
return QueueBuilder.durable(RESULT_QUEUE).build();
}
@Bean
public Jackson2JsonMessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(messageConverter());
return template;
}
}
Job Publisher (API Service)¶
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.*;
import java.util.UUID;
@RestController
@RequestMapping("/api/jobs")
public class JobPublisherController {
private final RabbitTemplate rabbitTemplate;
private final JobStore jobStore;
public JobPublisherController(RabbitTemplate rabbitTemplate, JobStore jobStore) {
this.rabbitTemplate = rabbitTemplate;
this.jobStore = jobStore;
}
@PostMapping
public JobSubmissionResponse submitJob(@RequestBody JobRequest request) {
String jobId = UUID.randomUUID().toString();
// Save initial job state
jobStore.save(new Job(jobId, JobStatus.PENDING, null, null));
// Publish to queue
AIJobMessage message = new AIJobMessage(jobId, request.input(), request.callbackUrl());
rabbitTemplate.convertAndSend(RabbitMQConfig.JOB_QUEUE, message);
return new JobSubmissionResponse(jobId, JobStatus.PENDING);
}
@GetMapping("/{jobId}")
public Job getJob(@PathVariable String jobId) {
return jobStore.get(jobId)
.orElseThrow(() -> new JobNotFoundException(jobId));
}
// DTOs
public record JobRequest(String input, String callbackUrl) {}
public record JobSubmissionResponse(String jobId, JobStatus status) {}
public record AIJobMessage(String jobId, String input, String callbackUrl) {}
public record Job(String id, JobStatus status, String result, String error) {}
public enum JobStatus { PENDING, RUNNING, COMPLETED, FAILED }
}
Job Consumer (Worker Service)¶
import com.paragon.agents.Agent;
import com.paragon.agents.AgentResult;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
@Service
public class AIJobConsumer {
private final Agent analysisAgent;
private final RabbitTemplate rabbitTemplate;
private final JobStore jobStore;
private final RestTemplate restTemplate;
public AIJobConsumer(
Agent analysisAgent,
RabbitTemplate rabbitTemplate,
JobStore jobStore,
RestTemplate restTemplate) {
this.analysisAgent = analysisAgent;
this.rabbitTemplate = rabbitTemplate;
this.jobStore = jobStore;
this.restTemplate = restTemplate;
}
@RabbitListener(queues = RabbitMQConfig.JOB_QUEUE)
public void processJob(JobPublisherController.AIJobMessage message) {
String jobId = message.jobId();
// Update status to RUNNING
jobStore.save(new JobPublisherController.Job(
jobId, JobPublisherController.JobStatus.RUNNING, null, null));
// Execute AI task - fully async
analysisAgent.interact(message.input())
.thenAccept(result -> {
JobPublisherController.Job completedJob;
if (result.isSuccess()) {
completedJob = new JobPublisherController.Job(
jobId, JobPublisherController.JobStatus.COMPLETED, result.output(), null);
} else {
String error = result.error() != null ? result.error().getMessage() : "Unknown error";
completedJob = new JobPublisherController.Job(
jobId, JobPublisherController.JobStatus.FAILED, null, error);
}
jobStore.save(completedJob);
// Notify via callback if provided
if (message.callbackUrl() != null && !message.callbackUrl().isEmpty()) {
notifyCallback(message.callbackUrl(), completedJob);
}
// Publish result to result queue
rabbitTemplate.convertAndSend(RabbitMQConfig.RESULT_QUEUE, completedJob);
})
.exceptionally(e -> {
JobPublisherController.Job failedJob = new JobPublisherController.Job(
jobId, JobPublisherController.JobStatus.FAILED, null, e.getMessage());
jobStore.save(failedJob);
if (message.callbackUrl() != null) {
notifyCallback(message.callbackUrl(), failedJob);
}
return null;
});
}
private void notifyCallback(String callbackUrl, JobPublisherController.Job job) {
try {
restTemplate.postForEntity(callbackUrl, job, Void.class);
} catch (Exception e) {
// Log but don't fail - callback is best-effort
System.err.println("Failed to notify callback: " + e.getMessage());
}
}
}
Webhook Notification Pattern¶
Let clients register a callback URL to receive job completion notifications:
@RestController
@RequestMapping("/api/jobs")
public class WebhookJobController {
private final Agent agent;
private final JobStore jobStore;
private final RestTemplate restTemplate;
public WebhookJobController(Agent agent, JobStore jobStore, RestTemplate restTemplate) {
this.agent = agent;
this.jobStore = jobStore;
this.restTemplate = restTemplate;
}
@PostMapping
public JobResponse submitWithWebhook(@RequestBody WebhookJobRequest request) {
String jobId = UUID.randomUUID().toString();
jobStore.save(new Job(jobId, JobStatus.PENDING, null));
// Fully async - no executor needed, CompletableFuture handles it
agent.interact(request.input())
.thenAccept(result -> {
Job completedJob = result.isSuccess()
? new Job(jobId, JobStatus.COMPLETED, result.output())
: new Job(jobId, JobStatus.FAILED, result.error().getMessage());
jobStore.save(completedJob);
// Send webhook notification
if (request.webhookUrl() != null) {
sendWebhook(request.webhookUrl(), completedJob, request.webhookSecret());
}
})
.exceptionally(e -> {
Job failed = new Job(jobId, JobStatus.FAILED, e.getMessage());
jobStore.save(failed);
if (request.webhookUrl() != null) {
sendWebhook(request.webhookUrl(), failed, request.webhookSecret());
}
return null;
});
return new JobResponse(jobId, JobStatus.PENDING);
}
private void sendWebhook(String url, Job job, String secret) {
try {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
// Sign the payload with the secret for verification
if (secret != null) {
String signature = HmacUtils.hmacSha256Hex(secret, job.toString());
headers.set("X-Webhook-Signature", signature);
}
HttpEntity<Job> entity = new HttpEntity<>(job, headers);
restTemplate.postForEntity(url, entity, Void.class);
} catch (Exception e) {
System.err.println("Webhook delivery failed: " + e.getMessage());
// Consider implementing retry logic with exponential backoff
}
}
public record WebhookJobRequest(
String input,
String webhookUrl, // URL to notify on completion
String webhookSecret // Secret for HMAC signing
) {}
public record JobResponse(String jobId, JobStatus status) {}
public record Job(String id, JobStatus status, String result) {}
public enum JobStatus { PENDING, RUNNING, COMPLETED, FAILED }
}
Client Webhook Receiver¶
// Client's webhook endpoint
@RestController
@RequestMapping("/webhooks")
public class WebhookReceiverController {
@PostMapping("/ai-job-complete")
public ResponseEntity<Void> handleJobComplete(
@RequestHeader("X-Webhook-Signature") String signature,
@RequestBody JobResult result) {
// Verify the signature. In practice, sign and verify the raw JSON request
// body; record toString() output may differ between sender and receiver.
String expectedSignature = HmacUtils.hmacSha256Hex(
System.getenv("WEBHOOK_SECRET"),
result.toString()
);
if (!signature.equals(expectedSignature)) {
return ResponseEntity.status(HttpStatus.UNAUTHORIZED).build();
}
// Process the result
System.out.println("Job " + result.id() + " completed: " + result.status());
if (result.status() == JobStatus.COMPLETED) {
// Handle successful result
saveResultToDatabase(result);
notifyUser(result);
} else {
// Handle failure
alertOperations(result);
}
return ResponseEntity.ok().build();
}
public record JobResult(String id, JobStatus status, String result) {}
}
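The `HmacUtils` helper used above comes from Apache Commons Codec. If you would rather avoid the extra dependency, the same hex-encoded HMAC-SHA256 can be computed with the JDK alone (the class name here is illustrative):

```java
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.util.HexFormat;

public class WebhookSigner {
    // HMAC-SHA256 over the raw payload bytes, hex-encoded (JDK 17+ for HexFormat)
    public static String hmacSha256Hex(String secret, String payload) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
            return HexFormat.of().formatHex(mac.doFinal(payload.getBytes(StandardCharsets.UTF_8)));
        } catch (Exception e) {
            throw new IllegalStateException("HMAC computation failed", e);
        }
    }
}
```

When comparing signatures, prefer `java.security.MessageDigest.isEqual(...)` over `String.equals` so the comparison runs in constant time and does not leak timing information.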
Database-Backed Job Persistence¶
For production systems with multiple instances, persist jobs to a database:
import jakarta.persistence.*;
import org.springframework.data.jpa.repository.JpaRepository;
import java.time.Instant;
@Entity
@Table(name = "ai_jobs")
public class AIJob {
@Id
private String id;
@Column(columnDefinition = "TEXT")
private String input;
@Enumerated(EnumType.STRING)
private JobStatus status;
@Column(columnDefinition = "TEXT")
private String result;
private String error;
private String webhookUrl;
private Instant createdAt;
private Instant startedAt;
private Instant completedAt;
// Getters, setters, constructors
public enum JobStatus { PENDING, RUNNING, COMPLETED, FAILED, CANCELLED }
}
public interface AIJobRepository extends JpaRepository<AIJob, String> {
List<AIJob> findByStatus(AIJob.JobStatus status);
List<AIJob> findByStatusAndCreatedAtBefore(
AIJob.JobStatus status,
Instant cutoff
);
}
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class AIJobService {
private final AIJobRepository jobRepository;
private final Agent agent;
private final RestTemplate restTemplate;
public AIJobService(AIJobRepository jobRepository, Agent agent, RestTemplate restTemplate) {
this.jobRepository = jobRepository;
this.agent = agent;
this.restTemplate = restTemplate;
}
@Transactional
public AIJob submitJob(String input, String webhookUrl) {
AIJob job = new AIJob();
job.setId(UUID.randomUUID().toString());
job.setInput(input);
job.setStatus(AIJob.JobStatus.PENDING);
job.setWebhookUrl(webhookUrl);
job.setCreatedAt(Instant.now());
return jobRepository.save(job);
}
// Scheduled job processor - picks up PENDING jobs
@Scheduled(fixedDelay = 5000) // Every 5 seconds
@Transactional
public void processPendingJobs() {
List<AIJob> pendingJobs = jobRepository.findByStatus(AIJob.JobStatus.PENDING);
for (AIJob job : pendingJobs) {
processJob(job);
}
}
private void processJob(AIJob job) {
job.setStatus(AIJob.JobStatus.RUNNING);
job.setStartedAt(Instant.now());
jobRepository.save(job);
// Fully async processing
agent.interact(job.getInput())
.thenAccept(result -> {
if (result.isSuccess()) {
job.setStatus(AIJob.JobStatus.COMPLETED);
job.setResult(result.output());
} else {
job.setStatus(AIJob.JobStatus.FAILED);
job.setError(result.error() != null ? result.error().getMessage() : "Unknown error");
}
job.setCompletedAt(Instant.now());
jobRepository.save(job);
// Send webhook notification
if (job.getWebhookUrl() != null) {
sendWebhook(job);
}
})
.exceptionally(e -> {
job.setStatus(AIJob.JobStatus.FAILED);
job.setError(e.getMessage());
job.setCompletedAt(Instant.now());
jobRepository.save(job);
return null;
});
}
private void sendWebhook(AIJob job) {
try {
restTemplate.postForEntity(job.getWebhookUrl(), job, Void.class);
} catch (Exception e) {
System.err.println("Webhook failed for job " + job.getId() + ": " + e.getMessage());
}
}
// Cleanup old completed jobs
@Scheduled(cron = "0 0 2 * * ?") // Daily at 2 AM
@Transactional
public void cleanupOldJobs() {
Instant cutoff = Instant.now().minus(Duration.ofDays(30));
List<AIJob> oldJobs = jobRepository.findByStatusAndCreatedAtBefore(
AIJob.JobStatus.COMPLETED, cutoff);
jobRepository.deleteAll(oldJobs);
}
}
Human-in-the-Loop with Job Persistence¶
Combine async jobs with human approval for sensitive AI operations:
@Service
public class ApprovalJobService {
private final AIJobRepository jobRepository;
private final Agent agent;
/**
* Submits a job that may require human approval.
* Returns immediately with PENDING status; use getJob() to check for AWAITING_APPROVAL.
*/
public AIJob submitForApproval(String input) {
AIJob job = new AIJob();
job.setId(UUID.randomUUID().toString());
job.setInput(input);
job.setStatus(AIJob.JobStatus.PENDING);
job.setCreatedAt(Instant.now());
jobRepository.save(job);
// Fully async - may pause for approval
agent.interact(input)
.thenAccept(result -> {
if (result.isPaused()) {
// Store paused state for later resumption
job.setStatus(AIJob.JobStatus.AWAITING_APPROVAL);
job.setPausedState(serialize(result.pausedState()));
job.setPendingToolName(result.pausedState().pendingToolCall().name());
job.setPendingToolArgs(result.pausedState().pendingToolCall().arguments());
} else if (result.isSuccess()) {
job.setStatus(AIJob.JobStatus.COMPLETED);
job.setResult(result.output());
} else {
job.setStatus(AIJob.JobStatus.FAILED);
job.setError(result.error().getMessage());
}
jobRepository.save(job);
})
.exceptionally(e -> {
job.setStatus(AIJob.JobStatus.FAILED);
job.setError(e.getMessage());
jobRepository.save(job);
return null;
});
return job; // Returns immediately with PENDING status
}
@Transactional
public AIJob approveAndResume(String jobId, String approvalOutput) {
AIJob job = jobRepository.findById(jobId)
.orElseThrow(() -> new JobNotFoundException(jobId));
if (job.getStatus() != AIJob.JobStatus.AWAITING_APPROVAL) {
throw new IllegalStateException("Job is not awaiting approval");
}
// Restore and resume
AgentRunState state = deserialize(job.getPausedState());
state.approveToolCall(approvalOutput);
AgentResult result = agent.resume(state);
// May pause again for another tool, or complete
if (result.isPaused()) {
job.setPausedState(serialize(result.pausedState()));
job.setPendingToolName(result.pausedState().pendingToolCall().name());
} else if (result.isSuccess()) {
job.setStatus(AIJob.JobStatus.COMPLETED);
job.setResult(result.output());
job.setCompletedAt(Instant.now());
} else {
job.setStatus(AIJob.JobStatus.FAILED);
job.setError(result.error().getMessage());
}
return jobRepository.save(job);
}
@Transactional
public AIJob rejectAndResume(String jobId, String reason) {
AIJob job = jobRepository.findById(jobId)
.orElseThrow(() -> new JobNotFoundException(jobId));
AgentRunState state = deserialize(job.getPausedState());
state.rejectToolCall(reason);
AgentResult result = agent.resume(state);
if (result.isSuccess()) {
job.setStatus(AIJob.JobStatus.COMPLETED);
job.setResult(result.output());
} else {
job.setStatus(AIJob.JobStatus.FAILED);
job.setError(result.error().getMessage());
}
job.setCompletedAt(Instant.now());
return jobRepository.save(job);
}
}
Production Considerations¶
Idempotency
Ensure job processing is idempotent. If a worker crashes mid-job, the same job may be retried. Use database transactions and unique constraints.
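The core of idempotent processing is an atomic claim: only the worker that flips a job from PENDING to RUNNING may process it, so a redelivered message becomes a no-op. Sketched here with an in-process map (illustrative class name); against a database the equivalent is a conditional `UPDATE ai_jobs SET status = 'RUNNING' WHERE id = ? AND status = 'PENDING'` checked for an affected row count of 1:

```java
import java.util.concurrent.ConcurrentHashMap;

public class JobClaim {
    private final ConcurrentHashMap<String, String> status = new ConcurrentHashMap<>();

    public void submit(String jobId) {
        status.put(jobId, "PENDING");
    }

    // Atomic compare-and-set: only one caller wins the PENDING -> RUNNING flip
    public boolean tryClaim(String jobId) {
        return status.replace(jobId, "PENDING", "RUNNING");
    }
}
```

A worker that fails `tryClaim` simply acknowledges the message and moves on, since another worker already owns the job.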
Timeouts
Set appropriate timeouts for AI operations to prevent hung jobs.
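For example, `CompletableFuture.orTimeout` fails a pending call after a deadline instead of letting it hang forever. A minimal sketch (the helper name is illustrative; tune the timeout to your model and prompt sizes):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class TimeoutDemo {
    // Wraps a (possibly hung) future with a deadline; maps any failure,
    // including the timeout, to a fallback message instead of blocking forever
    public static String withDeadline(CompletableFuture<String> call, long millis) {
        return call
                .orTimeout(millis, TimeUnit.MILLISECONDS)
                .exceptionally(e -> "FAILED: " + e.getClass().getSimpleName())
                .join();
    }

    public static void main(String[] args) {
        // A future that never completes, standing in for a hung AI call
        System.out.println(withDeadline(new CompletableFuture<>(), 200));
    }
}
```

In the job services above, the same `orTimeout(...)` call can be chained onto `agent.interact(...)` before `thenAccept`, with the `exceptionally` branch marking the job FAILED.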
Scaling Workers
With RabbitMQ, scale workers horizontally by running multiple instances. Each instance will compete for messages from the queue.
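Per-instance consumer concurrency is tuned in configuration; a sketch (values are illustrative starting points):

```yaml
spring:
  rabbitmq:
    listener:
      simple:
        concurrency: 4        # initial consumers per instance
        max-concurrency: 16   # scale up under load
        prefetch: 1           # one unacked message per consumer; fair dispatch
```

A `prefetch` of 1 keeps long-running AI jobs evenly distributed across workers instead of queuing behind a busy consumer.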
Dead Letter Queues
Configure RabbitMQ dead letter queues so that failed or rejected messages are captured for inspection instead of being lost.
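Dead-lettering is driven by two standard queue arguments, `x-dead-letter-exchange` and `x-dead-letter-routing-key`. The sketch below builds that argument map with plain stdlib code; in the `RabbitMQConfig` above you would pass the same map via `QueueBuilder.durable(JOB_QUEUE).withArguments(args).build()` (the exchange and routing-key names are assumptions for illustration):

```java
import java.util.Map;

public class DeadLetterArgs {
    // Queue arguments that tell RabbitMQ where to route rejected
    // or expired messages instead of discarding them
    public static Map<String, Object> deadLetterArgs() {
        return Map.of(
            "x-dead-letter-exchange", "ai-dlx",               // assumed DLX name
            "x-dead-letter-routing-key", "ai-job-queue.dlq"   // assumed DLQ routing key
        );
    }
}
```

Bind a durable `ai-job-queue.dlq` queue to the `ai-dlx` exchange and alert on its depth; messages landing there indicate jobs that repeatedly failed processing.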
Next Steps¶
- Agents Guide - Advanced agent patterns
- Streaming Guide - Streaming details
- Observability Guide - Monitoring in production