o
    h+                     @  s   d Z ddlmZ ddlZddlZddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlZddlmZ dd	lmZ dd
lmZ ddlmZmZmZ ddlmZ ddlmZmZmZ ddlm Z  e!e"Z#G dd dZ$dS )z/StreamableHTTP Session Manager for MCP servers.    )annotationsN)AsyncIterator)
HTTPStatus)Any)uuid4)
TaskStatus)Request)Response)ReceiveScopeSend)Server)MCP_SESSION_ID_HEADER
EventStoreStreamableHTTPServerTransport)TransportSecuritySettingsc                   @  sR   e Zd ZdZ				d d!ddZejd"ddZd#ddZd#ddZ	d#ddZ
dS )$StreamableHTTPSessionManagera  
    Manages StreamableHTTP sessions with optional resumability via event store.

    This class abstracts away the complexity of session management, event storage,
    and request handling for StreamableHTTP transports. It handles:

    1. Session tracking for clients
    2. Resumability via an optional event store
    3. Connection management and lifecycle
    4. Request handling and transport setup

    Important: Only one StreamableHTTPSessionManager instance should be created
    per application. The instance cannot be reused after its run() context has
    completed. If you need to restart the manager, create a new instance.

    Args:
        app: The MCP server instance
        event_store: Optional event store for resumability support.
                     If provided, enables resumable connections where clients
                     can reconnect and receive missed events.
                     If None, sessions are still tracked but not resumable.
        json_response: Whether to use JSON responses instead of SSE streams
        stateless: If True, creates a completely fresh transport for each request
                   with no session tracking or state persistence between requests.
    NFappMCPServer[Any, Any]event_storeEventStore | Nonejson_responsebool	statelesssecurity_settings TransportSecuritySettings | Nonec                 C  sH   || _ || _|| _|| _|| _t | _i | _d | _	t | _
d| _d S )NF)r   r   r   r   r   anyioLock_session_creation_lock_server_instances_task_group	_run_lock_has_started)selfr   r   r   r   r    r$   c/var/www/html/openai_agents/venv/lib/python3.10/site-packages/mcp/server/streamable_http_manager.py__init__8   s   


z%StreamableHTTPSessionManager.__init__returnAsyncIterator[None]c              
   C s   | j 4 I dH  | jrtdd| _W d  I dH  n1 I dH s#w   Y  t 4 I dH ?}|| _td zdV  W td |j	  d| _| j
  ntd |j	  d| _| j
  w W d  I dH  dS 1 I dH suw   Y  dS )aw  
        Run the session manager with proper lifecycle management.

        This creates and manages the task group for all session operations.

        Important: This method can only be called once per instance. The same
        StreamableHTTPSessionManager instance cannot be reused after this
        context manager exits. Create a new instance if you need to restart.

        Use this in the lifespan context manager of your Starlette app:

        @contextlib.asynccontextmanager
        async def lifespan(app: Starlette) -> AsyncIterator[None]:
            async with session_manager.run():
                yield
        NzyStreamableHTTPSessionManager .run() can only be called once per instance. Create a new instance if you need to run again.Tz&StreamableHTTP session manager startedz,StreamableHTTP session manager shutting down)r!   r"   RuntimeErrorr   create_task_groupr    loggerinfocancel_scopecancelr   clear)r#   tgr$   r$   r%   runP   s,   (




.z StreamableHTTPSessionManager.runscoper   receiver
   sendr   Nonec                   sJ   | j du r
td| jr| |||I dH  dS | |||I dH  dS )a  
        Process ASGI request with proper session handling and transport setup.

        Dispatches to the appropriate handler based on stateless mode.

        Args:
            scope: ASGI scope
            receive: ASGI receive function
            send: ASGI send function
        Nz6Task group is not initialized. Make sure to use run().)r    r)   r   _handle_stateless_request_handle_stateful_request)r#   r2   r3   r4   r$   r$   r%   handle_requesty   s   
z+StreamableHTTPSessionManager.handle_requestc                   s~   t d tdjdjd tjdd	 fdd}jdus#J j|I dH   	|||I dH   
 I dH  dS )
z
        Process request in stateless mode - creating a new transport for each request.

        Args:
            scope: ASGI scope
            receive: ASGI receive function
            send: ASGI send function
        z7Stateless mode: Creating new transport for this requestNmcp_session_idis_json_response_enabledr   r   task_statusr=   TaskStatus[None]c              	     s      4 I d H @}|\}}|   zjj||j ddI d H  W n ty2   td Y nw W d   I d H  d S W d   I d H  d S 1 I d H sOw   Y  d S )NTr   zStateless session crashed)connectstartedr   r1   create_initialization_options	Exceptionr+   	exception)r=   streamsread_streamwrite_streamhttp_transportr#   r$   r%   run_stateless_server   s$   .zTStreamableHTTPSessionManager._handle_stateless_request.<locals>.run_stateless_server)r=   r>   )r+   debugr   r   r   r   TASK_STATUS_IGNOREDr    startr8   	terminate)r#   r2   r3   r4   rJ   r$   rH   r%   r6      s   
z6StreamableHTTPSessionManager._handle_stateless_requestc           
   	     s`  t ||}|jt}|dur+|jv r+j| }td ||||I dH  dS |du rtd j4 I dH V t	 j
}t|jjjd  jdusRJ  j j< td|  tjdd fdd}jdussJ j|I dH   |||I dH  W d  I dH  dS 1 I dH sw   Y  dS tdtjd}	|	|||I dH  dS )z
        Process request in stateful mode - maintaining session state between requests.

        Args:
            scope: ASGI scope
            receive: ASGI receive function
            send: ASGI send function
        Nz1Session already exists, handling request directlyzCreating new transportr9   z'Created new transport with session ID: r<   r=   r>   r'   r5   c                   s.     4 I d H }|\}}|   zPzjj||j ddI d H  W n  tyE } ztjd j d| dd W Y d }~nd }~ww W  jrb jj	v rb j
sbtd j d j	 j= n jr~ jj	v r~ j
s~td j d j	 j= w W d   I d H  d S 1 I d H sw   Y  d S )	NFr?   zSession z
 crashed: T)exc_infozCleaning up crashed session z from active instances.)r@   rA   r   r1   rB   rC   r+   errorr:   r   is_terminatedr,   )r=   rE   rF   rG   erH   r$   r%   
run_server   sV   
.zIStreamableHTTPSessionManager._handle_stateful_request.<locals>.run_serverz)Bad Request: No valid session ID provided)status_code)r=   r>   r'   r5   )r   headersgetr   r   r+   rK   r8   r   r   hexr   r   r   r   r:   r,   r   rL   r    rM   r	   r   BAD_REQUEST)
r#   r2   r3   r4   requestrequest_mcp_session_id	transportnew_session_idrS   responser$   rH   r%   r7      s>   



.5z5StreamableHTTPSessionManager._handle_stateful_request)NFFN)
r   r   r   r   r   r   r   r   r   r   )r'   r(   )r2   r   r3   r
   r4   r   r'   r5   )__name__
__module____qualname____doc__r&   
contextlibasynccontextmanagerr1   r8   r6   r7   r$   r$   r$   r%   r      s    
(
1r   )%ra   
__future__r   rb   loggingcollections.abcr   httpr   typingr   uuidr   r   	anyio.abcr   starlette.requestsr   starlette.responsesr	   starlette.typesr
   r   r   mcp.server.lowlevel.serverr   	MCPServermcp.server.streamable_httpr   r   r   mcp.server.transport_securityr   	getLoggerr^   r+   r   r$   r$   r$   r%   <module>   s$    
