In Part 1, we covered the theory of A2A: agent cards, JSON-RPC, tasks, and artifacts. Now we're going to build a small agent by hand so the wire format becomes concrete.
No frameworks. No A2A SDK. Just Python, Flask, and the A2A 0.3.0 spec.
By the end of this post, you'll have:
- A working unit-conversion agent
- A client that discovers the agent and talks to it over JSON-RPC
- A minimal but much more accurate picture of what A2A 0.3.0 actually requires
What We're Building
Our agent will convert between:
- Temperature: Fahrenheit, Celsius, Kelvin
- Distance: miles, kilometers, meters, feet
- Weight: pounds, kilograms, ounces, grams
Why a unit converter? Because the conversion logic is boring, and that's useful. We want to focus on the protocol.
This agent does not use an LLM. That's intentional. A2A standardizes how agents communicate, not how they think. The thing behind the protocol could be GPT, a database, a rules engine, or a handful of arithmetic functions.
Prerequisites
You'll need Python 3.10+.
    mkdir a2a-converter && cd a2a-converter
    python -m venv venv
    source venv/bin/activate  # On Windows: venv\Scripts\activate
    pip install flask requests
We'll use Flask because it gives us a small HTTP server with almost no ceremony.
Step 1: The Agent Card
Every A2A agent starts with an agent card. This is the document clients fetch to learn the agent's name, capabilities, and transports.
Create agent_card.py:
    AGENT_CARD = {
        "name": "Unit Converter",
        "description": "Converts between common units of measurement including temperature, distance, and weight.",
        "url": "http://localhost:5000",
        "preferredTransport": "JSONRPC",
        "additionalInterfaces": [
            {
                "url": "http://localhost:5000",
                "transport": "JSONRPC",
            }
        ],
        "version": "1.0.0",
        "protocolVersion": "0.3.0",
        "capabilities": {
            "streaming": False,
            "pushNotifications": False
        },
        "defaultInputModes": ["text/plain"],
        "defaultOutputModes": ["text/plain"],
        "skills": [
            {
                "id": "temperature",
                "name": "Temperature Conversion",
                "description": "Convert between Fahrenheit, Celsius, and Kelvin.",
                "tags": ["temperature", "conversion", "units"],
                "examples": [
                    "Convert 100 Fahrenheit to Celsius",
                    "32°F in Kelvin"
                ]
            },
            {
                "id": "distance",
                "name": "Distance Conversion",
                "description": "Convert between miles, kilometers, meters, and feet.",
                "tags": ["distance", "length", "conversion", "units"],
                "examples": [
                    "Convert 5 miles to kilometers",
                    "1000 meters in feet"
                ]
            },
            {
                "id": "weight",
                "name": "Weight Conversion",
                "description": "Convert between pounds, kilograms, ounces, and grams.",
                "tags": ["weight", "mass", "conversion", "units"],
                "examples": [
                    "Convert 150 pounds to kilograms",
                    "250 grams in ounces"
                ]
            }
        ],
        "provider": {
            "organization": "A2A Tutorial"
        }
    }
Important fields:
- protocolVersion: "0.3.0" tells clients which A2A schema to expect.
- preferredTransport: "JSONRPC" tells clients which transport to use first.
- url is the primary endpoint for that preferred transport.
- additionalInterfaces lets you declare the transport explicitly.
- streaming: false and pushNotifications: false tell clients we only support synchronous request/response behavior.
You'll often see agent cards served from /.well-known/agent-card.json. That's the recommended well-known discovery convention, and it's the one we'll use here.
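Before relying on a card, a client may want to confirm that the fields discussed above are actually present. The sketch below is one way to do that. The helper name missing_card_fields and the field list are assumptions chosen to match this tutorial's card, not a full A2A schema check.

```python
# Hypothetical helper: the field list mirrors the card in this tutorial.
# It is an assumption for illustration, not the complete A2A schema.
TUTORIAL_CARD_FIELDS = (
    "name", "description", "url", "version", "protocolVersion",
    "capabilities", "defaultInputModes", "defaultOutputModes", "skills",
)


def missing_card_fields(card):
    """Return the tutorial-relevant field names that the card lacks."""
    return [field for field in TUTORIAL_CARD_FIELDS if field not in card]


# A deliberately incomplete card to show what the check reports.
sample_card = {
    "name": "Unit Converter",
    "url": "http://localhost:5000",
    "protocolVersion": "0.3.0",
}
print(missing_card_fields(sample_card))
```

A client could refuse to talk to an agent whose card fails this check, or just log the gaps and continue.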
Step 2: The Flask Server
Our server needs:
- GET /.well-known/agent-card.json for discovery
- A JSON-RPC endpoint at the URL declared in the card
In this tutorial, that endpoint is POST / because the card's url is http://localhost:5000.
Start server.py like this:
    import uuid
    from copy import deepcopy
    from datetime import datetime, timezone

    from flask import Flask, jsonify, request

    from agent_card import AGENT_CARD
    from converter import convert

    app = Flask(__name__)

    # In-memory task store; everything here is lost when the server restarts.
    TASKS = {}

    # A2A-specific JSON-RPC error codes.
    TASK_NOT_FOUND = -32001
    TASK_NOT_CANCELABLE = -32002
    # Used below to reject push notification configs. The spec calls -32003
    # PushNotificationNotSupportedError; UnsupportedOperationError is -32004.
    UNSUPPORTED_OPERATION = -32003
    CONTENT_TYPE_NOT_SUPPORTED = -32005

    TERMINAL_STATES = {"completed", "failed", "canceled", "rejected"}


    def utc_now():
        # ISO 8601 timestamp in UTC with a trailing "Z".
        return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")


    def build_message(role, text, context_id, task_id=None):
        message = {
            "kind": "message",
            "messageId": str(uuid.uuid4()),
            "contextId": context_id,
            "role": role,
            "parts": [{"kind": "text", "text": text}],
        }
        if task_id is not None:
            message["taskId"] = task_id
        return message


    def build_task(task_id, context_id, state, history, artifacts=None, status_text=None):
        task = {
            "kind": "task",
            "id": task_id,
            "contextId": context_id,
            "status": {
                "state": state,
                "timestamp": utc_now(),
            },
            "history": history,
        }
        if artifacts:
            task["artifacts"] = artifacts
        if status_text:
            task["status"]["message"] = build_message("agent", status_text, context_id, task_id)
        return task
There are two protocol details worth calling out here:
- In A2A 0.3.0, a Message has its own messageId.
- A Task has its own id and a contextId that ties related messages together.
That distinction matters. The JSON-RPC request id, the A2A message id, and the A2A task id are all different things.
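To make the three ids concrete, here is a small sketch of a message/send envelope with each id annotated. The function name build_request and the context value "ctx-demo" are illustrative assumptions.

```python
import uuid


def build_request(text, context_id):
    """Build a message/send envelope; each id lives at a different layer."""
    return {
        "jsonrpc": "2.0",
        "id": str(uuid.uuid4()),  # JSON-RPC request id: pairs request with response
        "method": "message/send",
        "params": {
            "message": {
                "kind": "message",
                "messageId": str(uuid.uuid4()),  # A2A message id: names this message
                "contextId": context_id,  # groups related messages and tasks
                "role": "user",
                "parts": [{"kind": "text", "text": text}],
            }
        },
    }


request_body = build_request("Convert 5 miles to kilometers", "ctx-demo")
# No task id appears in the request: the server assigns one and returns it
# as result["id"] in the response.
print(request_body["id"] != request_body["params"]["message"]["messageId"])
```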
Now add discovery and the JSON-RPC router:
    @app.route("/.well-known/agent-card.json")
    def agent_card():
        return jsonify(AGENT_CARD)


    @app.route("/", methods=["POST"])
    def jsonrpc():
        body = request.get_json(silent=True)
        if not isinstance(body, dict) or body.get("jsonrpc") != "2.0" or "method" not in body:
            request_id = body.get("id") if isinstance(body, dict) else None
            return jsonrpc_error(request_id, -32600, "Invalid Request")

        method = body["method"]
        request_id = body.get("id")
        params = body.get("params", {})

        if method == "message/send":
            return handle_message_send(request_id, params)
        if method == "tasks/get":
            return handle_tasks_get(request_id, params)
        if method == "tasks/cancel":
            return handle_tasks_cancel(request_id, params)

        return jsonrpc_error(request_id, -32601, f"Method not found: {method}")
This is the first place where a lot of simplified tutorials drift from the spec. message/send is not the whole story. If you're returning task objects, you also need a way to retrieve and manage them. So we'll implement:
- message/send
- tasks/get
- tasks/cancel
Step 3: Handle message/send
Now for the main method:
    def handle_message_send(request_id, params):
        if not isinstance(params, dict):
            return jsonrpc_error(request_id, -32602, "params must be an object")

        # Validate the A2A message envelope, not just the JSON-RPC wrapper.
        message = params.get("message")
        if not isinstance(message, dict):
            return jsonrpc_error(request_id, -32602, "params.message is required")
        if message.get("kind") != "message":
            return jsonrpc_error(request_id, -32602, "params.message.kind must be 'message'")
        if not message.get("messageId"):
            return jsonrpc_error(request_id, -32602, "params.message.messageId is required")

        parts = message.get("parts")
        if not isinstance(parts, list) or not parts:
            return jsonrpc_error(request_id, -32602, "params.message.parts must be a non-empty list")

        # Treat a missing or null configuration as empty so .get() below is safe.
        configuration = params.get("configuration") or {}
        if not isinstance(configuration, dict):
            return jsonrpc_error(request_id, -32602, "params.configuration must be an object")

        if configuration.get("pushNotificationConfig"):
            return jsonrpc_error(
                request_id,
                UNSUPPORTED_OPERATION,
                "Push notifications are not supported by this agent",
            )

        accepted_output_modes = configuration.get("acceptedOutputModes") or []
        if accepted_output_modes and "text/plain" not in accepted_output_modes:
            return jsonrpc_error(
                request_id,
                CONTENT_TYPE_NOT_SUPPORTED,
                "This agent only returns text/plain output",
            )

        # Use the first non-empty text part; skip parts whose text is not a string.
        text = ""
        for part in parts:
            if isinstance(part, dict) and part.get("kind") == "text":
                candidate = part.get("text")
                if isinstance(candidate, str) and candidate.strip():
                    text = candidate.strip()
                    break

        if not text:
            return jsonrpc_error(request_id, -32602, "No text content in message")

        # The server owns the task id; the context id is reused if the client sent one.
        task_id = str(uuid.uuid4())
        context_id = message.get("contextId") or str(uuid.uuid4())

        normalized_message = deepcopy(message)
        normalized_message["contextId"] = context_id
        history = [normalized_message]

        try:
            result = convert(text)
        except ValueError as exc:
            # Domain failure: return a failed task, not a JSON-RPC error.
            error_text = str(exc)
            history.append(build_message("agent", error_text, context_id, task_id))
            task = build_task(
                task_id,
                context_id,
                "failed",
                history,
                status_text=error_text,
            )
        else:
            history.append(build_message("agent", result, context_id, task_id))
            task = build_task(
                task_id,
                context_id,
                "completed",
                history,
                artifacts=[
                    {
                        "artifactId": str(uuid.uuid4()),
                        "name": "conversion-result",
                        "parts": [{"kind": "text", "text": result}],
                    }
                ],
                status_text="Conversion completed.",
            )

        TASKS[task_id] = deepcopy(task)
        return jsonrpc_result(request_id, task)
Three important points:
- We validate A2A fields, not just JSON-RPC fields.
- The server generates the task id. The client does not provide it.
- Application failures return a task with status.state: "failed", not a JSON-RPC error.
That last distinction is important:
- JSON-RPC errors are for protocol or transport problems
- task states are for domain-level outcomes
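On the receiving side, that split means a client has two different things to check. A minimal sketch, assuming a hypothetical helper named classify_response and hand-written sample responses shaped like the ones this server produces:

```python
def classify_response(response):
    """Label a JSON-RPC response as a protocol error or a task outcome."""
    if "error" in response:
        # Protocol-level failure: the request was rejected, no task exists.
        return "protocol-error", response["error"]["code"]
    # Domain-level outcome: a task exists and carries its own state.
    return "task", response["result"]["status"]["state"]


# Sample responses written by hand for illustration.
bad_params = {
    "jsonrpc": "2.0",
    "id": "req-1",
    "error": {"code": -32602, "message": "params.message is required"},
}
failed_task = {
    "jsonrpc": "2.0",
    "id": "req-2",
    "result": {"kind": "task", "id": "task-1", "status": {"state": "failed"}},
}

print(classify_response(bad_params))   # ('protocol-error', -32602)
print(classify_response(failed_task))  # ('task', 'failed')
```

A failed task can still be fetched later with tasks/get; a rejected request leaves nothing behind to fetch.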
Step 4: Implement tasks/get and tasks/cancel
Because we store tasks in memory, we can fetch them later:
    def handle_tasks_get(request_id, params):
        if not isinstance(params, dict):
            return jsonrpc_error(request_id, -32602, "params must be an object")

        task_id = params.get("id")
        if not task_id:
            return jsonrpc_error(request_id, -32602, "params.id is required")

        task = TASKS.get(task_id)
        if task is None:
            return jsonrpc_error(request_id, TASK_NOT_FOUND, f"Task not found: {task_id}", 404)

        # Work on a copy so trimming history never mutates the stored task.
        task_copy = deepcopy(task)
        history_length = params.get("historyLength")
        if history_length is not None:
            if not isinstance(history_length, int) or history_length < 0:
                return jsonrpc_error(request_id, -32602, "params.historyLength must be a non-negative integer")
            if history_length > 0:
                task_copy["history"] = task_copy.get("history", [])[-history_length:]

        return jsonrpc_result(request_id, task_copy)
And here's cancel:
    def handle_tasks_cancel(request_id, params):
        if not isinstance(params, dict):
            return jsonrpc_error(request_id, -32602, "params must be an object")

        task_id = params.get("id")
        if not task_id:
            return jsonrpc_error(request_id, -32602, "params.id is required")

        task = TASKS.get(task_id)
        if task is None:
            return jsonrpc_error(request_id, TASK_NOT_FOUND, f"Task not found: {task_id}", 404)

        # Tasks that already reached a terminal state cannot be canceled.
        state = task.get("status", {}).get("state")
        if state in TERMINAL_STATES:
            return jsonrpc_error(
                request_id,
                TASK_NOT_CANCELABLE,
                f"Task cannot be canceled from state: {state}",
                409,
            )

        canceled_message = "Task canceled."
        task["status"]["state"] = "canceled"
        task["status"]["timestamp"] = utc_now()
        task["status"]["message"] = build_message("agent", canceled_message, task["contextId"], task["id"])
        task.setdefault("history", []).append(
            build_message("agent", canceled_message, task["contextId"], task["id"])
        )
        TASKS[task_id] = deepcopy(task)
        return jsonrpc_result(request_id, task)
In this toy agent, work completes immediately, so canceling an already finished task returns TaskNotCancelableError (-32002). That's still useful, because it shows how the method behaves even when your agent is synchronous.
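The cancel rule reduces to a single state check. The sketch below isolates it; the helper name cancel_outcome is hypothetical, and "working" stands in for a non-terminal state that this synchronous agent never actually stays in.

```python
TERMINAL_STATES = {"completed", "failed", "canceled", "rejected"}
TASK_NOT_CANCELABLE = -32002


def cancel_outcome(state):
    """Return what tasks/cancel would produce for a task in the given state."""
    if state in TERMINAL_STATES:
        # Finished tasks cannot be canceled: the caller gets a JSON-RPC error.
        return ("error", TASK_NOT_CANCELABLE)
    # Any non-terminal state can move to "canceled".
    return ("result", "canceled")


for state in ("completed", "failed", "working"):
    print(state, "->", cancel_outcome(state))
```

With a long-running agent, the second branch is the one that would matter in practice.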
Add the small response helpers and startup block:
    def jsonrpc_result(request_id, result):
        return jsonify({"jsonrpc": "2.0", "id": request_id, "result": result})


    def jsonrpc_error(request_id, code, message, http_status=400):
        return (
            jsonify({
                "jsonrpc": "2.0",
                "id": request_id,
                "error": {"code": code, "message": message},
            }),
            http_status,
        )


    if __name__ == "__main__":
        app.run(port=5000, debug=True)
Step 5: The Conversion Logic
This part has nothing to do with A2A. It's just the service our agent happens to provide.
Create converter.py:
    import re

    # Each category converts through one base unit: Celsius, meters, or grams.
    UNITS = {
        "temperature": {
            "fahrenheit": {"to_base": lambda f: (f - 32) * 5 / 9, "from_base": lambda c: c * 9 / 5 + 32},
            "celsius": {"to_base": lambda c: c, "from_base": lambda c: c},
            "kelvin": {"to_base": lambda k: k - 273.15, "from_base": lambda c: c + 273.15},
        },
        "distance": {
            "miles": {"to_base": lambda m: m * 1609.344, "from_base": lambda m: m / 1609.344},
            "kilometers": {"to_base": lambda k: k * 1000, "from_base": lambda m: m / 1000},
            "meters": {"to_base": lambda m: m, "from_base": lambda m: m},
            "feet": {"to_base": lambda f: f * 0.3048, "from_base": lambda m: m / 0.3048},
        },
        "weight": {
            "pounds": {"to_base": lambda p: p * 453.592, "from_base": lambda g: g / 453.592},
            "kilograms": {"to_base": lambda k: k * 1000, "from_base": lambda g: g / 1000},
            "ounces": {"to_base": lambda o: o * 28.3495, "from_base": lambda g: g / 28.3495},
            "grams": {"to_base": lambda g: g, "from_base": lambda g: g},
        },
    }

    ALIASES = {
        "f": "fahrenheit", "°f": "fahrenheit", "fahr": "fahrenheit",
        "c": "celsius", "°c": "celsius",
        "k": "kelvin",
        "mi": "miles", "mile": "miles",
        "km": "kilometers", "kilometer": "kilometers",
        "m": "meters", "meter": "meters",
        "ft": "feet", "foot": "feet",
        "lb": "pounds", "lbs": "pounds", "pound": "pounds",
        "kg": "kilograms", "kilogram": "kilograms",
        "oz": "ounces", "ounce": "ounces",
        "g": "grams", "gram": "grams",
    }


    def _resolve_unit(name):
        name = name.lower().strip().rstrip(".")
        if name in ALIASES:
            name = ALIASES[name]
        for category, units in UNITS.items():
            if name in units:
                return category, name
        return None, None


    def _parse_input(text):
        text = text.strip()
        # Matches "<number> <unit> to|in <unit>", with an optional leading "convert".
        pattern = r"(?:convert\s+)?(-?\d+(?:\.\d+)?)\s*°?\s*(\w+)\s+(?:to|in)\s+°?\s*(\w+)"
        match = re.search(pattern, text, re.IGNORECASE)
        if not match:
            raise ValueError(
                "Could not parse your request. Try something like: "
                "'Convert 100 Fahrenheit to Celsius'"
            )
        value = float(match.group(1))
        from_unit = match.group(2)
        to_unit = match.group(3)
        return value, from_unit, to_unit


    def convert(text):
        value, from_name, to_name = _parse_input(text)
        from_category, from_unit = _resolve_unit(from_name)
        to_category, to_unit = _resolve_unit(to_name)

        if from_category is None:
            raise ValueError(f"Unknown unit: {from_name}")
        if to_category is None:
            raise ValueError(f"Unknown unit: {to_name}")
        if from_category != to_category:
            raise ValueError(
                f"Cannot convert between {from_name} ({from_category}) "
                f"and {to_name} ({to_category})"
            )

        base_value = UNITS[from_category][from_unit]["to_base"](value)
        result = UNITS[from_category][to_unit]["from_base"](base_value)
        return f"{value} {from_unit} = {result:.4g} {to_unit}"
That's it. The protocol layer doesn't care whether this function is regex-based, LLM-based, or a call into another system.
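The converter works by pivoting every value through one base unit per category, so each unit needs only a single factor instead of a formula for every pair. Here is a stripped-down sketch of that idea for distance alone. The factors are copied from converter.py, while the names TO_METERS and convert_distance are illustrative.

```python
# Distance factors copied from converter.py; meters are the base unit.
TO_METERS = {
    "miles": 1609.344,
    "kilometers": 1000.0,
    "meters": 1.0,
    "feet": 0.3048,
}


def convert_distance(value, from_unit, to_unit):
    """Convert by going source unit -> meters -> target unit."""
    meters = value * TO_METERS[from_unit]
    return meters / TO_METERS[to_unit]


# 5 miles = 5 * 1609.344 m = 8046.72 m = 8.04672 km
print(f"{convert_distance(5, 'miles', 'kilometers'):.4g}")  # 8.047
```

Temperature needs the lambda pairs used in converter.py rather than plain factors, because Fahrenheit and Kelvin involve an offset as well as a scale.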
Step 6: Test with curl
Start the server:
python server.py
Fetch the card:
curl http://localhost:5000/.well-known/agent-card.json | python -m json.tool
Now send a valid A2A 0.3.0 request:
    curl -X POST http://localhost:5000 \
      -H "Content-Type: application/json" \
      -d '{
        "jsonrpc": "2.0",
        "id": "req-1",
        "method": "message/send",
        "params": {
          "message": {
            "kind": "message",
            "messageId": "msg-1",
            "contextId": "ctx-1",
            "role": "user",
            "parts": [{"kind": "text", "text": "Convert 100 Fahrenheit to Celsius"}]
          },
          "configuration": {
            "acceptedOutputModes": ["text/plain"]
          }
        }
      }' | python -m json.tool
You should get back a task with:
- kind: "task"
- a server-generated task id
- the request's contextId
- status.state: "completed"
- an artifact containing the conversion result
Save the task id from that response, then fetch it:
    curl -X POST http://localhost:5000 \
      -H "Content-Type: application/json" \
      -d '{
        "jsonrpc": "2.0",
        "id": "req-2",
        "method": "tasks/get",
        "params": {
          "id": "TASK_ID_HERE",
          "historyLength": 1
        }
      }' | python -m json.tool
Try a failure case:
    curl -X POST http://localhost:5000 \
      -H "Content-Type: application/json" \
      -d '{
        "jsonrpc": "2.0",
        "id": "req-3",
        "method": "message/send",
        "params": {
          "message": {
            "kind": "message",
            "messageId": "msg-2",
            "contextId": "ctx-2",
            "role": "user",
            "parts": [{"kind": "text", "text": "Convert 100 miles to celsius"}]
          }
        }
      }' | python -m json.tool
That returns a task with status.state: "failed", because the request was valid A2A even though the conversion itself made no sense.
And if you try to cancel a task that's already finished:
    curl -X POST http://localhost:5000 \
      -H "Content-Type: application/json" \
      -d '{
        "jsonrpc": "2.0",
        "id": "req-4",
        "method": "tasks/cancel",
        "params": {
          "id": "TASK_ID_HERE"
        }
      }' | python -m json.tool
You'll get -32002, TaskNotCancelableError.
Step 7: The Client
Talking to the agent with curl proves the protocol works. Now let's build a client that follows the same discovery-first A2A flow another agent would use.
Create client.py:
    import uuid

    import requests


    def discover(base_url):
        card_url = f"{base_url}/.well-known/agent-card.json"
        response = requests.get(card_url, timeout=10)
        response.raise_for_status()
        return response.json()


    def get_jsonrpc_endpoint(card):
        # Prefer the card's primary url when JSON-RPC is the preferred transport.
        if card.get("preferredTransport") == "JSONRPC":
            return card["url"]

        for interface in card.get("additionalInterfaces", []):
            if interface.get("transport") == "JSONRPC":
                return interface["url"]

        raise ValueError("Agent card does not declare a JSON-RPC endpoint")


    def post_jsonrpc(agent_url, payload):
        response = requests.post(agent_url, json=payload, timeout=10)

        try:
            result = response.json()
        except ValueError:
            # Body was not JSON: surface the HTTP error if there is one.
            response.raise_for_status()
            raise

        if response.status_code >= 500:
            response.raise_for_status()

        return result


    def send_message(agent_url, text, context_id=None):
        payload = {
            "jsonrpc": "2.0",
            "id": str(uuid.uuid4()),
            "method": "message/send",
            "params": {
                "message": {
                    "kind": "message",
                    "messageId": str(uuid.uuid4()),
                    "contextId": context_id or str(uuid.uuid4()),
                    "role": "user",
                    "parts": [{"kind": "text", "text": text}],
                },
                "configuration": {
                    "acceptedOutputModes": ["text/plain"],
                },
            },
        }
        return post_jsonrpc(agent_url, payload)


    def get_task(agent_url, task_id, history_length=None):
        params = {"id": task_id}
        if history_length is not None:
            params["historyLength"] = history_length

        payload = {
            "jsonrpc": "2.0",
            "id": str(uuid.uuid4()),
            "method": "tasks/get",
            "params": params,
        }
        return post_jsonrpc(agent_url, payload)
The important detail here is that the client does not assume the endpoint path. It reads the agent card and chooses the JSON-RPC transport from the card metadata.
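To see why reading the card matters, consider a card whose preferred transport is not JSON-RPC. The sketch below applies the same selection rule as get_jsonrpc_endpoint to a made-up card. The function name pick_jsonrpc_url, the agent.example.com host, and both paths are assumptions for illustration only.

```python
def pick_jsonrpc_url(card):
    """Same selection rule as get_jsonrpc_endpoint, repeated so this runs alone."""
    if card.get("preferredTransport") == "JSONRPC":
        return card["url"]
    for interface in card.get("additionalInterfaces", []):
        if interface.get("transport") == "JSONRPC":
            return interface["url"]
    raise ValueError("Agent card does not declare a JSON-RPC endpoint")


# Hypothetical card: the host and paths are invented for this example.
grpc_first_card = {
    "url": "https://agent.example.com/grpc",
    "preferredTransport": "GRPC",
    "additionalInterfaces": [
        {"url": "https://agent.example.com/grpc", "transport": "GRPC"},
        {"url": "https://agent.example.com/a2a/v1", "transport": "JSONRPC"},
    ],
}

print(pick_jsonrpc_url(grpc_first_card))  # https://agent.example.com/a2a/v1
```

A client that hard-coded POST / would have hit the wrong endpoint here.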
Now add a response printer and a simple REPL:
    def print_response(result):
        if "error" in result:
            print(f"Error {result['error']['code']}: {result['error']['message']}")
            return

        task = result.get("result", {})
        state = task.get("status", {}).get("state")

        if state == "completed":
            for artifact in task.get("artifacts", []):
                for part in artifact.get("parts", []):
                    if part.get("kind") == "text":
                        print(part["text"])
        elif state == "failed":
            status_message = task.get("status", {}).get("message", {})
            for part in status_message.get("parts", []):
                if part.get("kind") == "text":
                    print(f"Agent error: {part['text']}")
                    return
            print("Agent error: task failed")
        else:
            print(f"Task state: {state}")


    def main():
        base_url = "http://localhost:5000"

        print(f"Discovering agent at {base_url}...")
        card = discover(base_url)
        agent_url = get_jsonrpc_endpoint(card)

        print(f"Found: {card['name']} - {card['description']}")
        print(f"Transport: {card['preferredTransport']} -> {agent_url}")
        print(f"Skills: {', '.join(s['name'] for s in card['skills'])}")
        print()

        while True:
            text = input("> ").strip()
            if not text or text.lower() in ("quit", "exit"):
                break

            result = send_message(agent_url, text)
            print_response(result)

            # Fetch the stored task to show that tasks/get sees the same state.
            task = result.get("result", {})
            if task.get("id"):
                latest = get_task(agent_url, task["id"], history_length=1)
                print(f"Stored task state: {latest.get('result', {}).get('status', {}).get('state')}")
            print()


    if __name__ == "__main__":
        main()
This is still a tiny client, but it's much closer to real A2A behavior:
- it discovers the agent
- it chooses a transport from the card
- it sends a fully shaped message/send request
- it can retrieve task state with tasks/get
The A2A Insight
The interesting thing here is not the converter. It's the shape.
This client could just as easily live inside another agent. If you wrapped it in a server and gave it its own agent card, you'd have an agent that receives a request, discovers another agent, delegates work, and returns the result.
That's agent-to-agent communication in practice. No second protocol. No special "AI-to-AI" channel. Just discovery plus JSON-RPC.
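As a rough sketch of that delegation step, the function below takes a send callable (for example, a thin wrapper around send_message from client.py) and turns the downstream response into a plain string. The names delegate_conversion and fake_send are hypothetical, and fake_send replaces the network call so the example runs offline.

```python
def delegate_conversion(text, send):
    """Forward text to a downstream agent and return its first text artifact.

    `send` is any callable that accepts the text and returns a JSON-RPC
    response dict.
    """
    response = send(text)
    if "error" in response:
        return f"Downstream protocol error {response['error']['code']}"

    task = response["result"]
    state = task["status"]["state"]
    if state != "completed":
        return f"Downstream task ended in state {state}"

    for artifact in task.get("artifacts", []):
        for part in artifact.get("parts", []):
            if part.get("kind") == "text":
                return part["text"]
    return "Downstream task completed without a text artifact"


def fake_send(text):
    # Stand-in for a real network call; echoes the input as an artifact.
    return {
        "jsonrpc": "2.0",
        "id": "req-1",
        "result": {
            "kind": "task",
            "id": "task-1",
            "status": {"state": "completed"},
            "artifacts": [
                {"artifactId": "a-1", "parts": [{"kind": "text", "text": f"echo: {text}"}]}
            ],
        },
    }


print(delegate_conversion("Convert 1 km to m", fake_send))
```

An outer agent would call something like this inside its own message/send handler and wrap the returned string in its own task.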
What We Still Didn't Build
Even after tightening this tutorial up for A2A 0.3.0, we're still skipping a lot:
- message/stream for streaming responses
- real long-running tasks
- push notification delivery
- authentication and securitySchemes
- persistent task storage
- richer content types beyond text/plain
But the foundation is now more accurate:
- agent card
- transport metadata
- message/send
- tasks/get
- tasks/cancel
- task ids, message ids, and context ids
That's enough to see what the protocol is doing under the hood.
If you want the finished code, it's on GitHub: waggle-examples/tutorials/a2a-raw-implementation.
