"""
Example custom ASGI application for PyServe ASGI mounting.

This demonstrates how to create a raw ASGI application without any framework -
similar to Python's http.server but async.
"""

from typing import Dict, Any, List, Callable, Awaitable, Optional
import json

# Type aliases for the three objects every ASGI callable receives.
Scope = Dict[str, Any]
Receive = Callable[[], Awaitable[Dict[str, Any]]]
Send = Callable[[Dict[str, Any]], Awaitable[None]]


class SimpleASGIApp:
    """Tiny framework-free ASGI application with a fixed set of JSON routes.

    Exact-match routes live in ``self.routes``; paths starting with
    ``/echo/`` are handled as a dynamic ``/echo/{message}`` route. Anything
    else is answered with a JSON 404.
    """

    # Prefix of the dynamic echo route, shared by the dispatcher and handler.
    _ECHO_PREFIX = "/echo/"

    def __init__(self):
        self.routes: Dict[str, Callable] = {}
        self._setup_routes()

    def _setup_routes(self) -> None:
        """Populate the exact-match path -> handler table."""
        self.routes = {
            "/": self._handle_root,
            "/health": self._handle_health,
            "/echo": self._handle_echo,
            "/info": self._handle_info,
            "/headers": self._handle_headers,
        }

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """ASGI entry point: dispatch an HTTP request to its handler.

        Scopes whose ``type`` is not ``"http"`` are ignored (the call returns
        without sending anything).
        """
        if scope["type"] != "http":
            return

        path = scope.get("path", "/")
        handler = self.routes.get(path)

        if handler is None:
            if not path.startswith(self._ECHO_PREFIX):
                await self._send_response(
                    send,
                    status=404,
                    body={"error": "Not found", "path": path},
                )
                return
            handler = self._handle_echo_path

        await handler(scope, receive, send)

    @staticmethod
    def _decode_lossy(raw: bytes) -> str:
        """Decode client-supplied bytes as UTF-8 without ever raising.

        Undecodable bytes become U+FFFD so that malformed input from a client
        produces a normal response instead of an unhandled exception.
        """
        return raw.decode("utf-8", errors="replace")

    async def _handle_root(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Respond with a welcome message and the list of known endpoints."""
        await self._send_response(
            send,
            body={
                "message": "Welcome to Custom ASGI App mounted in PyServe!",
                "description": "This is a raw ASGI application without any framework",
                "endpoints": list(self.routes.keys()) + ["/echo/{message}"],
            },
        )

    async def _handle_health(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Respond with a static health payload."""
        await self._send_response(
            send,
            body={"status": "healthy", "app": "custom-asgi"},
        )

    async def _handle_echo(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Echo the request body back for POST; otherwise return a usage hint."""
        if scope.get("method", "GET") != "POST":
            await self._send_response(
                send,
                body={"message": "Send a POST request to echo data"},
            )
            return

        body = await self._read_body(receive)
        # The body is arbitrary client data, so it is decoded leniently.
        await self._send_response(send, body={"echo": self._decode_lossy(body)})

    async def _handle_echo_path(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Echo back whatever follows ``/echo/`` in the request path."""
        path = scope.get("path", "")
        message = path.replace(self._ECHO_PREFIX, "", 1)
        await self._send_response(send, body={"echo": message})

    async def _handle_info(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Respond with selected fields of the connection scope."""
        await self._send_response(
            send,
            body={
                "method": scope.get("method"),
                "path": scope.get("path"),
                "query_string": self._decode_lossy(scope.get("query_string") or b""),
                "root_path": scope.get("root_path", ""),
                "scheme": scope.get("scheme", "http"),
                # "server" may be present but None; treat that like "missing".
                "server": list(scope.get("server") or ()),
                "asgi": scope.get("asgi", {}),
            },
        )

    async def _handle_headers(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Respond with the request headers as a JSON object.

        When a header name occurs more than once, the last value wins.
        """
        headers = {
            self._decode_lossy(name): self._decode_lossy(value)
            for name, value in scope.get("headers", [])
        }
        await self._send_response(send, body={"headers": headers})

    async def _read_body(self, receive: Receive) -> bytes:
        """Read the request body by awaiting ``receive`` until it is complete.

        Reading stops at the first message whose ``more_body`` is falsy or
        absent.
        """
        chunks: List[bytes] = []
        while True:
            message = await receive()
            chunks.append(message.get("body") or b"")
            if not message.get("more_body", False):
                break
        # A single join avoids re-copying the accumulated body on each chunk.
        return b"".join(chunks)

    async def _send_response(
        self,
        send: Send,
        status: int = 200,
        body: Any = None,
        content_type: str = "application/json",
        headers: Optional[List[tuple]] = None,
    ) -> None:
        """Send a complete HTTP response (start message plus one body message).

        Args:
            send: ASGI send callable.
            status: HTTP status code.
            body: Payload. JSON-encoded when ``content_type`` is exactly
                ``"application/json"``; otherwise ``bytes`` are sent as-is and
                anything else is sent as ``str(body)`` encoded to UTF-8.
                ``None`` yields an empty body.
            content_type: Value of the ``content-type`` response header.
            headers: Extra ``(name, value)`` header pairs, placed after
                ``content-type`` and before ``content-length``.
        """
        response_headers = [
            (b"content-type", content_type.encode("utf-8")),
        ]
        if headers:
            response_headers.extend(headers)

        if body is None:
            body_bytes = b""
        elif content_type == "application/json":
            body_bytes = json.dumps(body, ensure_ascii=False).encode("utf-8")
        elif isinstance(body, bytes):
            body_bytes = body
        else:
            body_bytes = str(body).encode("utf-8")

        response_headers.append(
            (b"content-length", str(len(body_bytes)).encode("utf-8"))
        )

        await send({
            "type": "http.response.start",
            "status": status,
            "headers": response_headers,
        })
        await send({
            "type": "http.response.body",
            "body": body_bytes,
        })


app = SimpleASGIApp()


async def simple_asgi_app(scope: Scope, receive: Receive, send: Send) -> None:
    """Minimal function-style ASGI app: reply to any HTTP request with JSON.

    Scopes whose ``type`` is not ``"http"`` are ignored.
    """
    if scope["type"] != "http":
        return

    response_body = json.dumps({
        "message": "Hello from minimal ASGI app!",
        "path": scope.get("path", "/"),
    }).encode("utf-8")

    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [
            (b"content-type", b"application/json"),
            (b"content-length", str(len(response_body)).encode("utf-8")),
        ],
    })
    await send({
        "type": "http.response.body",
        "body": response_body,
    })


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8004)