In our last post, we saw how the GitHub Copilot SDK handles the reasoning loop out of the box, automatically refining search queries to track down data in local files.
However, local files only get you so far. To make your AI assistant truly powerful, it needs to interact with external APIs, databases, or production environments.
Today, we will move our agent from built-in system tools to a fully custom tool: a web search tool powered by Firecrawl. We will also re-architect our telemetry pipeline into an asynchronous event stream. Instead of blocking while we wait for a complete answer, we will decouple message processing from the agent run and emit real-time updates, the way a production application streams data to a user interface.
Part 1: Defining Custom Agent Skills
The GitHub Copilot SDK makes adding custom tools straightforward through the @define_tool decorator. You can declare inputs using Pydantic schemas, which the SDK uses to generate JSON schemas behind the scenes. This allows the LLM to understand what your tool does and what parameters it requires.
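To get a feel for what the model is shown, you can ask Pydantic for the JSON Schema directly, using the same FirecrawlSearchParams model as the tool below. This is a sketch assuming Pydantic v2 (`model_json_schema()`); the exact payload the SDK forwards to the model may wrap or trim this schema differently.

```python
from pydantic import BaseModel, Field


class FirecrawlSearchParams(BaseModel):
    query: str = Field(..., description="The search query to find information on the web")
    limit: int = Field(default=5, description="Maximum number of results to return")


# Pydantic v2 turns the model into a JSON Schema dict. Fields without a default
# (here only `query`) end up in the "required" list; descriptions are carried over.
schema = FirecrawlSearchParams.model_json_schema()
print(schema["required"])  # → ['query']
print(schema["properties"]["limit"])
```

The `description` strings are what tell the model when and how to fill each argument, so they are worth writing as carefully as the tool's own description.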
We'll build a search wrapper around the Firecrawl API to give our agent real-time access to the live internet:
```python
import os
from typing import Any

import httpx
from pydantic import BaseModel, Field
from copilot import define_tool

# Assumption: the Firecrawl API key is stored in the FIRECRAWL_API_KEY environment variable.
firecrawl_api_key = os.environ["FIRECRAWL_API_KEY"]


# Define the schema the LLM will analyze
class FirecrawlSearchParams(BaseModel):
    query: str = Field(..., description="The search query to find information on the web")
    limit: int = Field(default=5, description="Maximum number of results to return")


# Implement the API call logic
async def fetch_firecrawl_results(params: FirecrawlSearchParams) -> dict[str, Any]:
    endpoint = "https://api.firecrawl.dev/v2/search"
    headers = {
        "Authorization": f"Bearer {firecrawl_api_key}",
        "Content-Type": "application/json",
    }
    payload = {"query": params.query, "limit": params.limit}
    async with httpx.AsyncClient() as client:
        response = await client.post(endpoint, json=payload, headers=headers, timeout=30.0)
        response.raise_for_status()  # raise on HTTP errors instead of returning an error body
        return response.json()


# Bind the function to the SDK as a custom tool
@define_tool("web_search", description="Search the web")
async def firecrawl_search_tool(params: FirecrawlSearchParams) -> dict[str, Any]:
    return await fetch_firecrawl_results(params)
```
By exposing firecrawl_search_tool to the model, we let it look up live information on its own whenever its training data falls short.
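Conceptually, when the model decides to call the tool, something has to match the requested tool name, check the JSON arguments, await your handler, and send the result back as text. The sketch below imitates that round trip with the standard library only. `TOOLS`, `register`, `dispatch_tool_call`, and `fake_web_search` are hypothetical stand-ins, not the SDK's internals, and the `"query"` check is a minimal substitute for real schema validation.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable

# Hypothetical registry: tool name -> async handler taking decoded arguments.
ToolHandler = Callable[[dict[str, Any]], Awaitable[Any]]
TOOLS: dict[str, ToolHandler] = {}


def register(name: str) -> Callable[[ToolHandler], ToolHandler]:
    """Tiny stand-in for a tool decorator: record the handler under `name`."""
    def wrap(fn: ToolHandler) -> ToolHandler:
        TOOLS[name] = fn
        return fn
    return wrap


@register("web_search")
async def fake_web_search(args: dict[str, Any]) -> dict[str, Any]:
    # Offline stand-in for the Firecrawl call: fabricate placeholder results.
    limit = args.get("limit", 5)
    return {"results": [f"result {i} for {args['query']}" for i in range(limit)]}


async def dispatch_tool_call(name: str, raw_arguments: str) -> str:
    """Route a model-issued tool call to its handler and serialize the result."""
    if name not in TOOLS:
        return json.dumps({"error": f"unknown tool: {name}"})
    args = json.loads(raw_arguments)  # tool-call arguments arrive as JSON text
    if "query" not in args:           # minimal stand-in for schema validation
        return json.dumps({"error": "missing required argument: query"})
    result = await TOOLS[name](args)
    return json.dumps(result)         # the result goes back to the model as text


print(asyncio.run(dispatch_tool_call("web_search", '{"query": "Qwen", "limit": 2}')))
# → {"results": ["result 0 for Qwen", "result 1 for Qwen"]}
```

Returning errors as JSON rather than raising keeps the loop alive and gives the model something it can react to, for example by retrying with corrected arguments.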
Part 2: Moving to Asynchronous Event Streaming
In our previous implementations, we called .send_and_wait(), which blocks your application code until the entire multi-turn tool loop completes. This approach doesn't scale well for user-facing applications. If an agent executes three consecutive API queries, your user shouldn't be left staring at a static loading spinner.
To address this, we will build an asynchronous generator loop.
First, we create an asyncio.Queue to buffer events. Our handle_event listener acts as a lightweight producer: it hands each event to the event loop via call_soon_threadsafe and immediately returns control to the agent:
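```python
import asyncio

event_queue: asyncio.Queue = asyncio.Queue()


def handle_event(event):
    # Quickly push the raw event into the queue without blocking.
    # Assumes this listener runs on the main event loop's thread, where get_event_loop()
    # returns the running loop; from a foreign thread, capture the loop in main() instead.
    asyncio.get_event_loop().call_soon_threadsafe(event_queue.put_nowait, event)
```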
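call_soon_threadsafe only helps if it is called on the loop that owns the queue. If your SDK version invokes listeners from a background thread, `asyncio.get_event_loop()` there either raises or returns a different loop, so a safer pattern is to capture the running loop once inside async code and close over it. The sketch below simulates an SDK worker thread with `threading.Thread`; `make_handler` and the string events are hypothetical.

```python
import asyncio
import threading
from typing import Any, Callable


def make_handler(loop: asyncio.AbstractEventLoop, queue: asyncio.Queue) -> Callable[[Any], None]:
    """Build a listener that is safe to call from any thread."""
    def handle_event(event: Any) -> None:
        # Schedule put_nowait on the loop that owns the queue instead of
        # touching the queue directly from a foreign thread.
        loop.call_soon_threadsafe(queue.put_nowait, event)
    return handle_event


async def demo() -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    # Capture the running loop once, from inside async code.
    handler = make_handler(asyncio.get_running_loop(), queue)

    # Simulate an SDK that fires events from its own worker thread.
    def worker() -> None:
        for name in ("tool_start", "message", "idle"):
            handler(name)

    thread = threading.Thread(target=worker)
    thread.start()

    received = []
    while (event := await queue.get()) != "idle":
        received.append(event)
    await asyncio.to_thread(thread.join)  # join without blocking the event loop
    return received


print(asyncio.run(demo()))
# → ['tool_start', 'message']
```

Because call_soon_threadsafe schedules callbacks in FIFO order, events arrive in the order the worker emitted them.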
Next, we write a consumer, the async generator event_generator, which pulls raw events out of the queue and normalizes them into UI-friendly dictionaries. When the session reports a SessionIdleData event, the generator terminates:
```python
# AssistantUsageData, ToolExecutionStartData, AssistantMessageData and SessionIdleData
# are the SDK's event payload types; import them from your SDK version's events module.
async def event_generator():
    while True:
        event = await event_queue.get()
        try:
            if isinstance(event.data, AssistantUsageData):
                yield {"title": "Tokens used", "content": {"input": event.data.input_tokens, "output": event.data.output_tokens}}
            elif isinstance(event.data, ToolExecutionStartData):
                yield {"title": "Tool execution", "content": {"tool_name": event.data.tool_name, "arguments": event.data.arguments}}
            elif isinstance(event.data, AssistantMessageData):
                if event.data.content:  # skip empty messages
                    yield {"title": "Assistant message", "content": event.data.content}
            elif isinstance(event.data, SessionIdleData):
                break  # The agent is done processing
        finally:
            event_queue.task_done()  # mark the item handled, even when we break
```
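Because the generator only inspects the type of `event.data`, you can exercise it without a live session. The sketch below defines hypothetical dataclass stand-ins (`UsageData`, `ToolStartData`, `MessageData`, `IdleData`, `Event`) in place of the SDK's real payload classes, pre-loads a queue, and collects the titles the generator yields.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, AsyncIterator


# Hypothetical stand-ins for the SDK's event payload classes.
@dataclass
class UsageData:
    input_tokens: int
    output_tokens: int


@dataclass
class ToolStartData:
    tool_name: str
    arguments: dict[str, Any] = field(default_factory=dict)


@dataclass
class MessageData:
    content: str


@dataclass
class IdleData:
    pass


@dataclass
class Event:
    data: Any


async def event_generator(queue: asyncio.Queue) -> AsyncIterator[dict[str, Any]]:
    """Normalize raw events into UI-friendly dicts; stop at the idle event."""
    while True:
        event = await queue.get()
        try:
            if isinstance(event.data, UsageData):
                yield {"title": "Tokens used",
                       "content": {"input": event.data.input_tokens,
                                   "output": event.data.output_tokens}}
            elif isinstance(event.data, ToolStartData):
                yield {"title": "Tool execution",
                       "content": {"tool_name": event.data.tool_name,
                                   "arguments": event.data.arguments}}
            elif isinstance(event.data, MessageData) and event.data.content:
                yield {"title": "Assistant message", "content": event.data.content}
            elif isinstance(event.data, IdleData):
                break  # agent finished; end the stream
        finally:
            queue.task_done()


async def collect() -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    for data in (UsageData(161, 24),
                 ToolStartData("web_search", {"query": "what is Qwen 3.7?"}),
                 MessageData(""),              # empty message: skipped
                 MessageData("done"),
                 IdleData(),
                 MessageData("never seen")):   # queued after idle: not consumed
        queue.put_nowait(Event(data))
    return [item["title"] async for item in event_generator(queue)]


print(asyncio.run(collect()))
# → ['Tokens used', 'Tool execution', 'Assistant message']
```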
Part 3: Running the Async Loop
With our custom web search tool registered and our event queue waiting, we can execute the call using .send(). By pairing this with asyncio.create_task, the agent runs concurrently alongside our consumer loop:
```python
import asyncio

from copilot import CopilotClient

# Assumption: PermissionHandler, SystemMessageReplaceConfig, model and provider are
# imported/configured as in the previous posts; handle_event, event_generator and
# firecrawl_search_tool are defined above.


async def main():
    question = "what is Qwen 3.7?"
    async with CopilotClient() as client:
        async with await client.create_session(
            on_permission_request=PermissionHandler.approve_all,
            model=model,
            provider=provider,
            system_message=SystemMessageReplaceConfig(
                mode="replace",
                content="You are a helpful assistant. Use web_search for queries."
            ),
            tools=[firecrawl_search_tool],  # Injecting our tool logic
            available_tools=["web_search"],  # Whitelisting the execution capability
        ) as session:
            session.on(handle_event)
            # Fire off the question asynchronously without blocking
            send_task = asyncio.create_task(session.send(question))
            # Stream normalized results to our console/frontend as they happen
            async for info in event_generator():
                print(f"{info['title']}: {info['content']}")
            # Surface any exception raised while sending
            await send_task


asyncio.run(main())
```
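To see why create_task matters, the sketch below swaps the real session for a hypothetical `FakeSession` whose `send()` emits events with small delays. The consumer handles each update while `send()` is still running (it records `send_task.done()` as `False`), and `await send_task` re-raises any exception the producer hit. `FakeSession`, its event strings, and the timings are invented for illustration.

```python
import asyncio
from typing import Callable


class FakeSession:
    """Hypothetical stand-in for an SDK session: on() registers a listener, send() emits events."""

    def __init__(self) -> None:
        self._listeners: list[Callable[[str], None]] = []

    def on(self, listener: Callable[[str], None]) -> None:
        self._listeners.append(listener)

    async def send(self, prompt: str) -> None:
        for event in (f"tool call: {prompt}", "assistant message", "idle"):
            await asyncio.sleep(0.01)  # pretend each agent step takes time
            for listener in self._listeners:
                listener(event)


async def run_demo() -> list[tuple[str, bool]]:
    queue: asyncio.Queue = asyncio.Queue()
    session = FakeSession()
    session.on(queue.put_nowait)  # listener runs on the loop thread in this simulation

    # Start the producer without awaiting it, so the consumer below runs concurrently.
    send_task = asyncio.create_task(session.send("what is Qwen 3.7?"))

    seen = []
    while (event := await queue.get()) != "idle":
        # Record whether send() had already finished when this update arrived.
        seen.append((event, send_task.done()))
        print(f"update: {event} (send finished: {send_task.done()})")
    await send_task  # re-raises here if send() failed
    return seen


asyncio.run(run_demo())
```

Had the code awaited `session.send(...)` directly before reading the queue, every update would be handled only after the whole run finished, which is exactly the blocking behavior we are trying to avoid.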
The Output: Real-Time Telemetry
When we ask about Qwen 3.7, the event stream prints clean, decoupled logs. (This run used a slightly fuller version of event_generator that also reports the system message and cache token counts.)
```text
System message: {
  "first_line": "You are a helpful assistant. Use web_search for queries.",
  "content_length": 56
}
Tokens used: {'input': 161, 'output': 24, 'cache_read': 0, 'cache_write': 0}
Tool execution: {'tool_name': 'web_search', 'arguments': {'query': 'what is Qwen 3.7?'}}
Tokens used: {'input': 764, 'output': 266, 'cache_read': 0, 'cache_write': 0}
Assistant message: **Qwen 3.7** is a series of large language models released by Alibaba in May 2026. It is marketed as a significant advancement in AI, particularly regarding agentic workflows, reasoning, and multimodal capabilities. ...
```
Why This Design Matters
By separating the producer (session.send) from the consumer (event_generator), you control exactly how and where each update is delivered:
- UI Compatibility: You can map the output of event_generator directly to WebSockets or an SSE (Server-Sent Events) web endpoint.
- Component Tracking: You don't have to wait for the final text block to know if your system worked. The UI can immediately render a specialized component showing exactly which arguments the agent passed to the external tool ({'query': 'what is Qwen 3.7?'}) while the tool is running.
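As an illustration of the UI-compatibility point, each dictionary from the generator can be framed as a Server-Sent Events message: a `data:` line carrying JSON, terminated by a blank line. `to_sse`, `sse_stream`, and `sample_events` are hypothetical helpers; plugging the resulting strings into a specific web framework's streaming response is left out.

```python
import asyncio
import json
from typing import Any, AsyncIterator


def to_sse(item: dict[str, Any]) -> str:
    """Frame one normalized event as an SSE message (JSON on a single data line)."""
    # json.dumps escapes embedded newlines, so the payload stays on one line as SSE requires.
    return f"data: {json.dumps(item)}\n\n"


async def sse_stream(events: AsyncIterator[dict[str, Any]]) -> AsyncIterator[str]:
    """Adapt any async generator of event dicts (e.g. event_generator()) into SSE frames."""
    async for item in events:
        yield to_sse(item)


async def sample_events() -> AsyncIterator[dict[str, Any]]:
    # Stand-in for event_generator(): two events shaped like the log above.
    yield {"title": "Tool execution", "content": {"tool_name": "web_search"}}
    yield {"title": "Assistant message", "content": "line one\nline two"}


async def demo() -> list[str]:
    return [frame async for frame in sse_stream(sample_events())]


for frame in asyncio.run(demo()):
    print(frame, end="")
```

Keeping each frame on a single `data:` line means a browser `EventSource` can `JSON.parse` every message without reassembling multi-line payloads.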
In our next guide, we will explore how to put guardrails on the tools.
Try It Yourself
Want to check out this asynchronous data stream yourself? Click the badge below to jump directly into our interactive Google Colab notebook, swap in your own custom tool configurations, and watch your streaming components update live!