
    )Jf-P                         d Z ddlZddlZddlZddlZddlZddlZddlm	Z	m
Z
 ddlmZ  ej        e          Z G d dej                  Z G d de
j                  ZdS )	zBase class extended by connection adapters. This extends the
connection.Connection class to encapsulate connection behavior but still
isolate socket and low level communication.

    N)connection_workflownbio_interface)
connectionc                        e Zd ZdZ fdZ fdZd Zeej	        	 	 dd                        Z
ed             Zed             Zd	 Zd
 Zd Zd Zed             Zd Zd Zd Zd Zd Zd Zd Zd Zd Z xZS )BaseConnectionzBaseConnection class that should be extended by connection adapters.

    This class abstracts I/O loop and transport services from pika core.

    c                     |r,t          |t          j                  st          d|          || _        d| _        d| _        d| _        t          t          |           
                    |||||           dS )a  Create a new instance of the Connection object.

        :param None|pika.connection.Parameters parameters: Connection parameters
        :param None|method on_open_callback: Method to call on connection open
        :param None | method on_open_error_callback: Called if the connection
            can't be established or connection establishment is interrupted by
            `Connection.close()`: on_open_error_callback(Connection, exception).
        :param None | method on_close_callback: Called when a previously fully
            open connection is closed:
            `on_close_callback(Connection, exception)`, where `exception` is
            either an instance of `exceptions.ConnectionClosed` if closed by
            user or broker or exception of another type that describes the cause
            of connection failure.
        :param pika.adapters.utils.nbio_interface.AbstractIOServices nbio:
            asynchronous services
        :param bool internal_connection_workflow: True for autonomous connection
            establishment which is default; False for externally-managed
            connection workflow via the `create_connection()` factory.
        :raises: RuntimeError
        :raises: ValueError

        z%Expected instance of Parameters, not NF)internal_connection_workflow)
isinstancer   
Parameters
ValueError_nbio_connection_workflow
_transport_got_eofsuperr   __init__)self
parameterson_open_callbackon_open_error_callbackon_close_callbacknbior	   	__class__s          `/home/alex/cs2snipeproduction/venv/lib/python3.11/site-packages/pika/adapters/base_connection.pyr   zBaseConnection.__init__   s    0  	KjZ5JKK 	K*=GZIK K K 
$(!nd##,,")E 	- 	G 	G 	G 	G 	G    c                     t          t          |                                            d| _        d| _        d| _        dS )zInitialize or reset all of our internal state variables for a given
        connection. If we disconnect and reconnect, all of our state needs to
        be wiped.

        NF)r   r   _init_connection_stater   r   r   )r   r   s    r   r   z%BaseConnection._init_connection_stateE   s;     	nd##::<<<$(!r   c           	      h    d| j         j        d| j        | j                 d| j        d| j        d	S )N< z transport=z params=>)r   __name___STATE_NAMESconnection_stater   paramsr   s    r   __repr__zBaseConnection.__repr__Q   sB     0 N###T%6t7L%M%M%MOOOT[[[* 	+r   Nc                     t           )a  Asynchronously create a connection to an AMQP broker using the given
        configurations. Will attempt to connect using each config in the given
        order, including all compatible resolved IP addresses of the hostname
        supplied in each config, until one is established or all attempts fail.

        See also `_start_connection_workflow()`.

        :param sequence connection_configs: A sequence of one or more
            `pika.connection.Parameters`-based objects.
        :param callable on_done: as defined in
            `connection_workflow.AbstractAMQPConnectionWorkflow.start()`.
        :param object | None custom_ioloop: Provide a custom I/O loop that is
            native to the specific adapter implementation; if None, the adapter
            will use a default loop instance, which is typically a singleton.
        :param connection_workflow.AbstractAMQPConnectionWorkflow | None workflow:
            Pass an instance of an implementation of the
            `connection_workflow.AbstractAMQPConnectionWorkflow` interface;
            defaults to a `connection_workflow.AMQPConnectionWorkflow` instance
            with default values for optional args.
        :returns: Connection workflow instance in use. The user should limit
            their interaction with this object only to it's `abort()` method.
        :rtype: connection_workflow.AbstractAMQPConnectionWorkflow

        )NotImplementedError)clsconnection_configson_donecustom_ioloopworkflows        r   create_connectionz BaseConnection.create_connectionl   s
    > "!r   c           	      Z   |.t          j                    }t                              d|           t	          |t           j                  r|                               fd}|                    ||                                t          j	        | j
        |                     |S )a-  Helper function for custom implementations of `create_connection()`.

        :param sequence connection_configs: A sequence of one or more
            `pika.connection.Parameters`-based objects.
        :param callable connection_factory: A function that takes
            `pika.connection.Parameters` as its only arg and returns a brand new
            `pika.connection.Connection`-based adapter instance each time it is
            called. The factory must instantiate the connection with
            `internal_connection_workflow=False`.
        :param pika.adapters.utils.nbio_interface.AbstractIOServices nbio:
        :param connection_workflow.AbstractAMQPConnectionWorkflow | None workflow:
            Pass an instance of an implementation of the
            `connection_workflow.AbstractAMQPConnectionWorkflow` interface;
            defaults to a `connection_workflow.AMQPConnectionWorkflow` instance
            with default values for optional args.
        :param callable on_done: as defined in
            :py:meth:`connection_workflow.AbstractAMQPConnectionWorkflow.start()`.
        :returns: Connection workflow instance in use. The user should limit
            their interaction with this object only to it's `abort()` method.
        :rtype: connection_workflow.AbstractAMQPConnectionWorkflow

        Nz&Created default connection workflow %rc                  4    t          j         fd          S )z`AMQPConnector` factory.c                 4    t           |                     S N_StreamingProtocolShim)r%   connection_factorys    r   <lambda>zUBaseConnection._start_connection_workflow.<locals>.create_connector.<locals>.<lambda>   s     5&&v.. 0  0 r   )r   AMQPConnector)r6   r   s   r   create_connectorzCBaseConnection._start_connection_workflow.<locals>.create_connector   s/    &40 0 0 0  r   )r+   connector_factorynative_loopr,   )r   AMQPConnectionWorkflowLOGGERdebugr
   set_io_servicesstartget_native_ioloop	functoolspartial$_unshim_connection_workflow_callback)r*   r+   r6   r   r.   r,   r9   s     ``   r   _start_connection_workflowz)BaseConnection._start_connection_workflow   s    2 *ACCHLLA8LLLh 3 JKK 	+$$T***	 	 	 	 	 	 	1...00%c&N&-/ /	 	 	0 	0 	0 r   c                 4    | j                                         S )a3  
        :returns: the native I/O loop instance underlying async services selected
            by user or the default selected by the specialized connection
            adapter (e.g., Twisted reactor, `asyncio.SelectorEventLoop`,
            `select_connection.IOLoop`, etc.)
        :rtype: object
        )r   rA   r&   s    r   ioloopzBaseConnection.ioloop   s     z++---r   c                 8    | j                             ||          S )zXImplement
        :py:meth:`pika.connection.Connection._adapter_call_later()`.

        )r   
call_later)r   delaycallbacks      r   _adapter_call_laterz"BaseConnection._adapter_call_later   s    
 z$$UH555r   c                 .    |                                  dS )z\Implement
        :py:meth:`pika.connection.Connection._adapter_remove_timeout()`.

        N)cancel)r   
timeout_ids     r   _adapter_remove_timeoutz&BaseConnection._adapter_remove_timeout   s    
 	r   c                 |    t          |          st          d|          | j                            |           dS )zeImplement
        :py:meth:`pika.connection.Connection._adapter_add_callback_threadsafe()`.

        z%callback must be a callable, but got N)callable	TypeErrorr   add_callback_threadsafe)r   rK   s     r    _adapter_add_callback_threadsafez/BaseConnection._adapter_add_callback_threadsafe   sR    
 !! 	I)=EXGI I I 	
**844444r   c           	      4    t          j        d           _         j                             j                    fd} j                             j        g| j                                        t          j	         j
         j                             dS )a8  Initiate full-stack connection establishment asynchronously for
        internally-initiated connection bring-up.

        Upon failed completion, we will invoke
        `Connection._on_stream_terminated()`. NOTE: On success,
        the stack will be up already, so there is no corresponding callback.

        T)_until_first_amqp_attemptc                  >    t          j         fd j                  S )z`AMQPConnector` factoryc                 "    t                    S r3   r4   )_paramsr   s    r   r7   zRBaseConnection._adapter_connect_stream.<locals>.create_connector.<locals>.<lambda>   s     6t < < r   )r   r8   r   r&   s   r   r9   z@BaseConnection._adapter_connect_stream.<locals>.create_connector   s+    &4<<<<djJ J Jr   )r:   r;   r,   N)r   r<   r   r?   r   r@   r%   rA   rB   rC   rD   _on_connection_workflow_done)r   r9   s   ` r   _adapter_connect_streamz&BaseConnection._adapter_connect_stream   s     %8$N&*%, %, %,! 	!11$*===	J 	J 	J 	J 	J
 	!''[M.
4466%d&O&*&GI I	 	( 	J 	J 	J 	J 	Jr   c                 X    |}t          |t                    r|j        } | |           dS )z

        :param callable user_on_done: user's `on_done` callback as defined in
            :py:meth:`connection_workflow.AbstractAMQPConnectionWorkflow.start()`.
        :param _StreamingProtocolShim | Exception shim_or_exc:
        N)r
   r5   conn)user_on_doneshim_or_excresults      r   rD   z3BaseConnection._unshim_connection_workflow_callback   s9     f455 	![FVr   c                     | j         r
J d            | j        ,| j        s
J d            | j                                         dS | j                                         dS )zAsynchronously abort connection workflow. Upon
        completion, `Connection._on_stream_terminated()` will be called with None
        as the error argument.

        Assumption: may be called only while connection is opening.

        zK_abort_connection_workflow() may be called only when connection is opening.NzdUnexpected _abort_connection_workflow() call with no transport in external connection workflow mode.)_openedr   _internal_connection_workflowr   abortr&   s    r   _abort_connection_workflowz)BaseConnection._abort_connection_workflow	  s     < 	& 	&%	& 	& 	& ?"
 5 F FEF F F %++----- O!!#####r   c                 V   t                               d|           d| _        t          |t                    rd| _        t          |t          j                  rt                               d|           d}nyt           	                    d|           t          |t          j
                  rDt          |j        d         t          j                  rt          j                            |          }|                     |           dS || u sJ d                    | |                      dS )z`AMQPConnectionWorkflow` completion callback.

        :param BaseConnection | Exception conn_or_exc: Our own connection
            instance on success; exception on failure. See
            `AbstractAMQPConnectionWorkflow.start()` for details.

        z,Full-stack connection workflow completed: %rNz*Full-stack connection workflow aborted: %rz)Full-stack connection workflow failed: %rz4Expected self conn={!r} from workflow, but got {!r}.)r=   r>   r   r
   	Exceptionr   r   AMQPConnectionWorkflowAbortedinfoerrorAMQPConnectionWorkflowFailed
exceptionsAMQPConnectorSocketConnectErrorpikaAMQPConnectionError#_handle_connection_workflow_failureformat)r   conn_or_excs     r   r[   z+BaseConnection._on_connection_workflow_done/  sR    	C 	" 	" 	" %)! k9-- 	'"DO+-KM M %H') ) ) #H(* * *{2OQ Q %&'2268K;= =%
 #'/"E"E##% #%K 44[AAAAA $&&&FMM+' ' '&&&&r   c                     |t                               d           nt                               d|           | j        s|                     |           dS t                               d           dS )a  Handle failure of self-initiated stack bring-up and call
        `Connection._on_stream_terminated()` if connection is not in closed state
        yet. Called by adapter layer when the full-stack connection workflow
        fails.

        :param Exception | None error: exception instance describing the reason
            for failure or None if the connection workflow was aborted.
        Nz&Self-initiated stack bring-up aborted.z(Self-initiated stack bring-up failed: %rzO_handle_connection_workflow_failure(): suppressing - connection already closed.)r=   rk   rl   	is_closed_on_stream_terminatedr>   r   rl   s     r   rr   z2BaseConnection._handle_connection_workflow_failureY  s     =KK@AAAALLCUKKK~ 	E&&u----- LL D E E E E Er   c                 r    | j         s|                                  dS | j                                         dS )zAsynchronously bring down the streaming transport layer and invoke
        `Connection._on_stream_terminated()` asynchronously when complete.

        N)rc   rf   r   re   r&   s    r   _adapter_disconnect_streamz)BaseConnection._adapter_disconnect_streamo  sB    
 | 	$++----- O!!#####r   c                 :    | j                             |           dS )ztTake ownership of data and send it to AMQP server as soon as
        possible.

        :param bytes data:

        N)r   writer   datas     r   _adapter_emit_dataz!BaseConnection._adapter_emit_data{  s      	d#####r   c                 <    || _         |                                  dS )a  Introduces transport to protocol after transport is connected.

        :py:class:`.utils.nbio_interface.AbstractStreamProtocol` implementation.

        :param nbio_interface.AbstractStreamTransport transport:
        :raises Exception: Exception-based exception on error

        N)r   _on_stream_connected)r   	transports     r   _proto_connection_madez%BaseConnection._proto_connection_made  s%     $ 	!!#####r   c                 \   d| _         |'| j        rt          j                            d          }n2t          j                            d                    |                    }t                              |t          j	        nt          j
        d|           |                     |           dS )a  Called upon loss or closing of TCP connection.

        :py:class:`.utils.nbio_interface.AbstractStreamProtocol` implementation.

        NOTE: `connection_made()` and `connection_lost()` are each called just
        once and in that order. All other callbacks are called between them.

        :param BaseException | None error: An exception (check for
            `BaseException`) indicates connection failure. None indicates that
            connection was closed on this side, such as when it's aborted or
            when `AbstractStreamProtocol.eof_received()` returns a falsy result.
        :raises Exception: Exception-based exception on error

        NzTransport indicated EOFzStream connection lost: {!r}zconnection_lost: %r)r   r   rp   rn   StreamLostErrorrs   r=   logloggingDEBUGERRORrw   rx   s     r   _proto_connection_lostz%BaseConnection._proto_connection_lost  s     =} /77-/ / O33.55e<<> >E 	

EM7==w}(%	1 	1 	1 	""5)))))r   c                 H    t                               d           d| _        dS )a%  Called after the remote peer shuts its write end of the connection.
        :py:class:`.utils.nbio_interface.AbstractStreamProtocol` implementation.

        :returns: A falsy value (including None) will cause the transport to
            close itself, resulting in an eventual `connection_lost()` call
            from the transport. If a truthy value is returned, it will be the
            protocol's responsibility to close/abort the transport.
        :rtype: falsy|truthy
        :raises Exception: Exception-based exception on error

        zTransport indicated EOF.TF)r=   rl   r   r&   s    r   _proto_eof_receivedz"BaseConnection._proto_eof_received  s%     	/000 ur   c                 0    |                      |           dS )a  Called to deliver incoming data from the server to the protocol.

        :py:class:`.utils.nbio_interface.AbstractStreamProtocol` implementation.

        :param data: Non-empty data bytes.
        :raises Exception: Exception-based exception on error

        N)_on_data_availabler}   s     r   _proto_data_receivedz#BaseConnection._proto_data_received  s     	%%%%%r   )NN)r"   
__module____qualname____doc__r   r   r'   classmethodabcabstractmethodr/   rE   propertyrG   rL   rP   rU   r\   staticmethodrD   rf   r[   rr   rz   r   r   r   r   r   __classcell__)r   s   @r   r   r      s        (G (G (G (G (GT
 
 
 
 
+ + +6  )-#'	" " "  ["> - - [-^ . . X.6 6 6  	5 	5 	5J J J4   \$$ $$ $$L(' (' ('TE E E,
$ 
$ 
$$ $ $$ $ $* * *>  .	& 	& 	& 	& 	& 	& 	&r   r   c                   4    e Zd ZdZdZdZdZdZd Zd Z	d Z
dS )r5   zShim for callbacks from transport so that we BaseConnection can
    delegate to private methods, thus avoiding contamination of API with
    methods that look public, but aren't.

    Nc                 t    || _         |j        | _        |j        | _        |j        | _        |j        | _        dS )z-
        :param BaseConnection conn:
        N)	r^   r   connection_mader   connection_lostr   eof_receivedr   data_received)r   r^   s     r   r   z_StreamingProtocolShim.__init__  s=     	#:#: 4!6r   c                 ,    t          | j        |          S )zProxy inexistent attribute requests to our connection instance
        so that AMQPConnectionWorkflow/AMQPConnector may treat the shim as an
        actual connection.

        )getattrr^   )r   attrs     r   __getattr__z"_StreamingProtocolShim.__getattr__  s     ty$'''r   c                 L    d                     | j        j        | j                  S )Nz{}: {!r})rs   r   r"   r^   r&   s    r   r'   z_StreamingProtocolShim.__repr__  s      !8$)DDDr   )r"   r   r   r   r   r   r   r   r   r   r'    r   r   r5   r5     sf          OOLM	7 	7 	7( ( (E E E E Er   r5   )r   r   rB   r   pika.compatrp   pika.exceptionspika.tcp_socket_optspika.adapters.utilsr   r   r   	getLoggerr"   r=   
Connectionr   AbstractStreamProtocolr5   r   r   r   <module>r      s   
 


                  C C C C C C C C      		8	$	$}& }& }& }& }&Z* }& }& }&@!E !E !E !E !E^B !E !E !E !E !Er   