
    )Jf5                        d Z ddlZddlZddlZddlZddlZddlZddlmZ  ej	        e
          Z G d de          Z G d de          Z G d d	e          Z G d
 de          Z G d de          Z G d de          Z G d de          Z G d de          Z G d de          Z G d de          Z G d de          Z G d de          Z G d dej        j                  Z G d de          ZdS ) a,  Implements `AMQPConnectionWorkflow` - the default workflow of performing
multiple TCP/[SSL]/AMQP connection attempts with timeouts and retries until one
succeeds or all attempts fail.

Defines the interface `AbstractAMQPConnectionWorkflow` that facilitates
implementing custom connection workflows.

    N)__version__c                       e Zd ZdZdS )AMQPConnectorExceptionzBase exception for this moduleN__name__
__module____qualname____doc__     j/home/alex/cs2snipeproduction/venv/lib/python3.11/site-packages/pika/adapters/utils/connection_workflow.pyr   r      s        ((((r   r   c                       e Zd ZdZdS )AMQPConnectorStackTimeoutz:Overall TCP/[SSL]/AMQP stack connection attempt timed out.Nr   r   r   r   r   r      s        DDDDr   r   c                       e Zd ZdZdS )AMQPConnectorAbortedz Asynchronous request was abortedNr   r   r   r   r   r      s        ****r   r   c                       e Zd ZdZdS )AMQPConnectorWrongStatezjAMQPConnector operation requested in wrong state, such as aborting after
    completion was reported.
    Nr   r   r   r   r   r   "              r   r   c                   (     e Zd ZdZ fdZd Z xZS )AMQPConnectorPhaseErrorBasezMWrapper for exception that occurred during a particular bring-up phase.

    c                 P     t          t          |           j        |  || _        dS )z

        :param BaseException exception: error that occurred while waiting for a
            subclass-specific protocol bring-up phase to complete.
        :param args: args for parent class
        N)superr   __init__	exception)selfr   args	__class__s      r   r   z$AMQPConnectorPhaseErrorBase.__init__-   s+     	:)40094@@"r   c                 L    d                     | j        j        | j                  S )Nz{}: {!r})formatr   r   r   r   s    r   __repr__z$AMQPConnectorPhaseErrorBase.__repr__7   s      !8$.IIIr   r   r   r	   r
   r   r!   __classcell__r   s   @r   r   r   (   sX         # # # # #J J J J J J Jr   r   c                       e Zd ZdZdS )AMQPConnectorSocketConnectErrorz*Error connecting TCP socket to remote peerNr   r   r   r   r&   r&   ;   s        4444r   r&   c                       e Zd ZdZdS ) AMQPConnectorTransportSetupErrorzOError setting up transport after TCP connected but before AMQP handshake.

    Nr   r   r   r   r(   r(   ?   r   r   r(   c                       e Zd ZdZdS )AMQPConnectorAMQPHandshakeErrorzError during AMQP handshakeNr   r   r   r   r*   r*   E   s        %%%%r   r*   c                       e Zd ZdZdS )AMQPConnectionWorkflowAbortedz%AMQP Connection workflow was aborted.Nr   r   r   r   r,   r,   I   s        ////r   r,   c                       e Zd ZdZdS ) AMQPConnectionWorkflowWrongStatezuAMQP Connection Workflow operation requested in wrong state, such as
    aborting after completion was reported.
    Nr   r   r   r   r.   r.   M   r   r   r.   c                   (     e Zd ZdZ fdZd Z xZS )AMQPConnectionWorkflowFailedz5Indicates that AMQP connection workflow failed.

    c                 j     t          t          |           j        |  t          |          | _        dS )z
        :param sequence exceptions: Exceptions that occurred during the
            workflow.
        :param args: args to pass to base class

        N)r   r0   r   tuple
exceptions)r   r3   r   r   s      r   r   z%AMQPConnectionWorkflowFailed.__init__X   s3     	;*D11:DAA
++r   c                     d                     | j        j        t          | j                  | j        d         t          | j                  dk    r| j        d         nd           S )NzG{}: {} exceptions in all; last exception - {!r}; first exception - {!r}   r   )r   r   r   lenr3   r    s    r   r!   z%AMQPConnectionWorkflowFailed.__repr__b   s_    #$*FN+S-A-AOB'*-do*>*>*B*BDOA&&%N %N	Nr   r"   r$   s   @r   r0   r0   S   sX         , , , , ,N N N N N N Nr   r0   c                   r    e Zd ZdZdZdZdZdZdZdZ	dZ
d	 Zd
 Zd Zd Zd Zd Zd Zd Zd Zd ZddZdS )AMQPConnectorz;Performs a single TCP/[SSL]/AMQP connection workflow.

    r   r6                  c                     || _         || _        d| _        d| _        d| _        d| _        d| _        d| _        d| _        d| _	        | j
        | _        dS )a  

        :param callable conn_factory: A function that takes
            `pika.connection.Parameters` as its only arg and returns a brand new
            `pika.connection.Connection`-based adapter instance each time it is
            called. The factory must instantiate the connection with
            `internal_connection_workflow=False`.
        :param pika.adapters.utils.nbio_interface.AbstractIOServices nbio:

        N)_conn_factory_nbio_addr_record_conn_params_on_done_tcp_timeout_ref_stack_timeout_ref	_task_ref_sock
_amqp_conn_STATE_INIT_state)r   conn_factorynbios      r   r   zAMQPConnector.__init__w   s_     *
   !%"&
&r   c                    | j         | j        k    r't          d                    | j                             || _        || _        || _        | j        | _         t          j        | j        dd          | _	        | j	        
                    t          j        j        t          j        d           t          j                            | j        j        | j	                   | j	                            d           | j        d         }t&                              dt*          |           | j                            | j	        || j                  | _        d| _        | j        j        /| j                            | j        j        | j                  | _        d| _        | j        j        1| j                            | j        j        | j                   | _        dS dS )	a  Asynchronously perform a single TCP/[SSL]/AMQP connection attempt.

        :param tuple addr_record: a single resolved address record compatible
            with `socket.getaddrinfo()` format.
        :param pika.connection.Parameters conn_params:
        :param callable on_done: Function to call upon completion of the
            workflow: `on_done(pika.connection.Connection | BaseException)`. If
            exception, it's going to be one of the following:
                `AMQPConnectorSocketConnectError`
                `AMQPConnectorTransportSetupError`
                `AMQPConnectorAMQPHandshakeError`
                `AMQPConnectorAborted`

        )Already in progress or finished; state={}Nr;   r6   Fr<   z Pika version %s connecting to %r)on_done)!rK   rJ   r   r   rB   rC   rD   
_STATE_TCPsocketrH   
setsockoptpikacompatSOL_TCPTCP_NODELAYtcp_socket_optsset_sock_optstcp_optionssetblocking_LOGinfor   rA   connect_socket_on_tcp_connection_donerG   rE   socket_timeout
call_later_on_tcp_connection_timeoutrF   stack_timeout_on_overall_timeout)r   addr_recordconn_paramsrP   addrs        r   startzAMQPConnector.start   s    ;$***);BB4;OOQ Q Q (' o]D$5bqb$9:

dk163EqIII**4+<+H+/:	7 	7 	7
u%%% #		4k4HHH22Jd&B 3 D D !%+7$(J$9$9!0/%1 %1D!
 #'*6&*j&;&;!/1I'K 'KD### 76r   c                    | j         | j        k    rt          d          | j         | j        k    rt          d          | j        | _         |                                  t                              d| j        j	        | j
                   | j        Zt                              d           | j                            t          j        | j        t%                                           dS | j        j        s7t                              d           | j                            dd           dS t                              d	           | j         | j        k    s"J d
                    | j                               dS )a  Abort the workflow asynchronously. The completion callback will be
        called with an instance of AMQPConnectorAborted.

        NOTE: we can't cancel/close synchronously because aborting pika
        Connection and its transport requires an asynchronous operation.

        :raises AMQPConnectorWrongState: If called after completion has been
            reported or the workflow not started yet.
        Cannot abort before starting.*Cannot abort after completion was reportedzCAMQPConnector: beginning client-initiated asynchronous abort; %r/%sNzXAMQPConnector.abort(): no connection, so just scheduling completion report via I/O loop.z*AMQPConnector.abort(): closing Connection.@  z3Client-initiated abort of AMQP Connection Workflow.zCAMQPConnector.abort(): closing of Connection was already initiated.z9Connection is closing, but not in TIMEOUT state; state={})rK   rJ   r   _STATE_DONE_STATE_ABORTING_deactivater\   r]   rC   hostrB   rI   debugrA   add_callback_threadsafe	functoolspartial_report_completion_and_cleanupr   
is_closingclose_STATE_TIMEOUTr   r    s    r   abortzAMQPConnector.abort   s    ;$***)*IJJJ;$***)*VWWW*		 -2D4E	G 	G 	G ?"JJ D E E EJ..!$"E"6"8"8: :; ; ; ; ; ?- + 

GHHH%%NP P P P P
 

 0 1 1 1{d&9999PfT[)) :9999r   c                     |                                   | j         | j                                         d| _        d| _        d| _        d| _        d| _        | j        | _        dS )zqCancel asynchronous tasks and clean up to assist garbage collection.

        Transition to STATE_DONE.

        N)	ro   rH   rw   r@   rA   rB   rD   rm   rK   r    s    r   _closezAMQPConnector._close   se     	:!JDJ!
 &r   c                 F   | j         "J d                    | j                              | j         | j                                         d| _        | j         | j                                         d| _        | j        "| j                                         d| _        dS dS )$Cancel asynchronous tasks.

        Nz:_deactivate called with self._amqp_conn not None; state={})rI   r   rK   rE   cancelrF   rG   r    s    r   ro   zAMQPConnector._deactivate  s     &&HOO  '&&  ,!((***$(D!".#**,,,&*D#>%N!!###!DNNN &%r   c                     t          |t                    rt                              d|           nt                              d|           | j        }|                                   ||           dS )zClean up and invoke client's `on_done` callback.

        :param pika.connection.Connection | BaseException result: value to pass
            to user's `on_done` callback.
        z%AMQPConnector - reporting failure: %rz%AMQPConnector - reporting success: %rN
isinstanceBaseExceptionr\   errorr]   rD   r{   r   resultrP   s      r   ru   z,AMQPConnector._report_completion_and_cleanup  si     fm,, 	GJJ>GGGGII=vFFF-r   c                     d| _         t          t          j        d                    | j        j        | j                                      }|                     |           dS )ztHandle TCP connection timeout

        Reports AMQPConnectorSocketConnectError with socket.timeout inside.

        Nz)TCP connection attempt timed out: {!r}/{})	rE   r&   rR   timeoutr   rC   rp   rB   ru   )r   r   s     r   rb   z(AMQPConnector._on_tcp_connection_timeout,  sg     !%/NFMM!&(9; ; < <= = 	++E22222r   c                 4   d| _         | j        }| j        | _        || j        k    rd                    | j        j        | j        t          | j        j	                            }t                              |           | j        j        r"J d                    | j                              | j        j        s| j                            d|           dS || j        k    r@t#          t%          d                    | j        j        | j                                      }nd|| j        k    sJ t)          t%          d                    | j        j        | j        t          | j        j	                                                }|                     |           dS )a  Handle overall TCP/[SSL]/AMQP connection attempt timeout by reporting
        `Timeout` error to the client.

        Reports AMQPConnectorSocketConnectError if timeout occurred during
            socket TCP connection attempt.
        Reports AMQPConnectorTransportSetupError if timeout occurred during
            tramsport [SSL] setup attempt.
        Reports AMQPConnectorAMQPHandshakeError if timeout occurred during
            AMQP handshake.

        Nz0Timeout while setting up AMQP to {!r}/{}; ssl={}zUnexpected open state of {!r}rl   z*Timeout while connecting socket to {!r}/{}z5Timeout while setting up transport to {!r}/{}; ssl={})rF   rK   rx   _STATE_AMQPr   rC   rp   rB   boolssl_optionsr\   r   rI   is_openrv   rw   rQ   r&   r   _STATE_TRANSPORTr(   ru   )r   
prev_statemsgr   s       r   rd   z!AMQPConnector._on_overall_timeout9  s    #'[
))))ELL!&(9T&2335 5C JJsOOO . H H/66tGGH H H?- 0%%c3///F((3)@GG).0AC CD DE EEE
 !666664)KF4,143D 1 =>>@ @A AB BE 	++E22222r   c                 l   d| _         | j         | j                                         d| _        |Et                              d|| j                   |                     t          |                     dS t                              d| j	                   | j
        | _        dx}}| j        j        0| j        j        j        }| j        j        j        }|| j        j        }| j                            t'          j        | j        | j                  | j	        ||| j                  | _         d| _	        dS )a  Handle completion of asynchronous socket connection attempt.

        Reports AMQPConnectorSocketConnectError if TCP socket connection
            failed.

        :param None|BaseException exc: None on success; exception object on
            failure

        Nz*TCP Connection attempt failed: %r; dest=%rz)TCP connection to broker established: %r.)protocol_factorysockssl_contextserver_hostnamerP   )rG   rE   r~   r\   r   rB   ru   r&   rq   rH   r   rK   rC   r   contextr   rp   rA   create_streaming_connectionrs   rt   r@    _on_transport_establishment_done)r   excr   r   s       r   r_   z%AMQPConnector._on_tcp_connection_doneg  sA     ,!((***$(D!?JJCS(* * *///446 6 6F 	

>
KKK +(,,o(4+7?K"/;KO&"&"3"8??&.t/A/3/@B B#+9 @ ; ; 


r   c           	         d| _         t          |t                    rht                              d|| j        j        | j        t          | j        j	                             | 
                    t          |                     dS t                              d|           |\  }| _        | j        | _        | j                            | j        d           | j                            | j                   dS )aQ  Handle asynchronous completion of
        `AbstractIOServices.create_streaming_connection()`

        Reports AMQPConnectorTransportSetupError if transport ([SSL]) setup
            failed.

        :param sequence|BaseException result: On success, a two-tuple
            (transport, protocol); on failure, exception instance.

        NzCAttempt to create the streaming transport failed: %r; %r/%s; ssl=%sz"Streaming transport linked up: %r.T)remove_default)rG   r   r   r\   r   rC   rp   rB   r   r   ru   r(   r]   rI   r   rK   add_on_open_error_callback_on_amqp_handshake_doneadd_on_open_callback)r   r   
_transports      r   r   z.AMQPConnector._on_transport_establishment_done  s     fm,, 	JJ !'):)?!4(9(E#F#FH H H //088: : :F 			6???&,#
DO &22( 	3 	? 	? 	?,,T-IJJJJJr   Nc                    t                               d| j        || j        j        | j                   d| _        | j        | j        k    rt                      }n	| j        | j	        k    rXt          t          d                    | j        j        | j        t          | j        j                                                }n| j        | j        k    rm|/t                               d| j        j        | j        |           |}n`t                               d| j        j        | j        |           t          |          }n$t                               d| j        ||           dS |                     |           dS )a  Handle completion of AMQP connection handshake attempt.

        NOTE: we handle two types of callbacks - success with just connection
        arg as well as the open-error callback with connection and error

        Reports AMQPConnectorAMQPHandshakeError if AMQP handshake failed.

        :param pika.connection.Connection connection:
        :param BaseException | None error: None on success, otherwise
            failure

        zJAMQPConnector: AMQP handshake attempt completed; state=%s; error=%r; %r/%sNz,Timeout during AMQP handshake{!r}/{}; ssl={}z8AMQPConnector: AMQP connection established for %r/%s: %rz=AMQPConnector: AMQP connection handshake failed for %r/%s: %rzgAMQPConnector: Ignoring AMQP handshake completion notification due to wrong state=%s; error=%r; conn=%r)r\   rq   rK   rC   rp   rB   rI   rn   r   rx   r*   r   r   r   r   r   ru   )r   
connectionr   r   s       r   r   z%AMQPConnector._on_amqp_handshake_done  s    	

#{E43D3I	 	 	 ;$...)++FF[D///4)BII).0AT.:;;= => >? ?FF
 [D,,,}

N%*D,=zK K K $

 !%!2!79J   9?? JJHUJ0 0 0 F++F33333r   )N)r   r   r	   r
   rJ   rQ   r   r   rx   rn   rm   r   rh   ry   r{   ro   ru   rb   rd   r_   r   r   r   r   r   r9   r9   j   s          KJKNOK' ' '8/K /K /Kb,+ ,+ ,+\' ' '&" " ".   3 3 3,3 ,3 ,3\+ + +Z K  K  KD34 34 34 34 34 34r   r9   c                       e Zd ZdZd Zd ZdS )AbstractAMQPConnectionWorkflowzMInterface for implementing a custom TCP/[SSL]/AMQP connection workflow.

    c                     t           )a  Asynchronously perform the workflow until success or all retries
        are exhausted. Called by the adapter.

        :param sequence connection_configs: A sequence of one or more
            `pika.connection.Parameters`-based objects. Will attempt to connect
            using each config in the given order.
        :param callable connector_factory: call it without args to obtain a new
            instance of `AMQPConnector` for each connection attempt.
            See `AMQPConnector` for details.
        :param native_loop: Native I/O loop passed by app to the adapter or
            obtained by the adapter by default.
        :param callable on_done: Function to call upon completion of the
            workflow:
            `on_done(pika.connection.Connection |
                     AMQPConnectionWorkflowFailed |
                     AMQPConnectionWorkflowAborted)`.
            `Connection`-based adapter on success,
            `AMQPConnectionWorkflowFailed` on failure,
            `AMQPConnectionWorkflowAborted` if workflow was aborted.

        :raises AMQPConnectionWorkflowWrongState: If called in wrong state, such
            as after starting the workflow.
        NotImplementedError)r   connection_configsconnector_factorynative_looprP   s        r   rh   z$AbstractAMQPConnectionWorkflow.start  s
    2 "!r   c                     t           )a  Abort the workflow asynchronously. The completion callback will be
        called with an instance of AMQPConnectionWorkflowAborted.

        NOTE: we can't cancel/close synchronously because aborting pika
        Connection and its transport requires an asynchronous operation.

        :raises AMQPConnectionWorkflowWrongState: If called in wrong state, such
            as before starting or after completion has been reported.
        r   r    s    r   ry   z$AbstractAMQPConnectionWorkflow.abort  s
     "!r   N)r   r   r	   r
   rh   ry   r   r   r   r   r     s<         " " "6
" 
" 
" 
" 
"r   r   c                       e Zd ZdZej        Zej        ZdZ	dZ
dZdZddZd Zd	 Zd
 Zd Zd Zd Zd Zd Zd Zd Zd ZdS )AMQPConnectionWorkflowa  Implements Pika's default workflow for performing multiple TCP/[SSL]/AMQP
    connection attempts with timeouts and retries until one succeeds or all
    attempts fail.

    The workflow:
        while not success and retries remain:
            1. For each given config (pika.connection.Parameters object):
                A. Perform DNS resolution of the config's host.
                B. Attempt to establish TCP/[SSL]/AMQP for each resolved address
                   until one succeeds, in which case we're done.
            2. If all configs failed but retries remain, resume from beginning
               after the given retry pause. NOTE: failure of DNS resolution
               is equivalent to one cycle and will be retried after the pause
               if retries remain.

    r   r6   r:   r;   Fc                     d| _         d| _        || _        d| _        d| _        d| _        d| _        d| _        d| _        d| _	        d| _
        g | _        | j        | _        dS )a  
        :param int | float retry_pause: Non-negative number of seconds to wait
            before retrying the config sequence. Meaningful only if retries is
            greater than 0. Defaults to 2 seconds.
        :param bool _until_first_amqp_attempt: INTERNAL USE ONLY; ends workflow
            after first AMQP handshake attempt, regardless of outcome (success
            or failure). The automatic connection logic in
            `pika.connection.Connection` enables this because it's not
            designed/tested to reset all state properly to handle more than one
            AMQP handshake attempt.

        TODO: Do we need getaddrinfo timeout?
        TODO: Would it be useful to implement exponential back-off?

        N)_attempts_remaining_retry_pause_until_first_amqp_attemptrA   _current_config_index_connection_configs_connector_factoryrD   
_connectorrG   _addrinfo_iter_connection_errorsrJ   rK   )r   r   s     r   r   zAMQPConnectionWorkflow.__init__3  st      $(  )B& 
 &*"#' "&" #%&r   c                     || _         dS )a  Called by the conneciton adapter only on pika's
        `AMQPConnectionWorkflow` instance to provide it the adapter-specific
        `AbstractIOServices` object before calling the `start()` method.

        NOTE: Custom workflow implementations should use the native I/O loop
        directly because `AbstractIOServices` is private to Pika
        implementation and its interface may change without notice.

        :param pika.adapters.utils.nbio_interface.AbstractIOServices nbio:

        N)rA   )r   rM   s     r   set_io_servicesz&AMQPConnectionWorkflow.set_io_services]  s     


r   c                 v   | j         | j        k    r't          d                    | j                             	 t	          |           n4# t
          $ r'}t          d                    |                    d}~ww xY w|s"t          d                    |                    || _        || _	        || _
        |d         j        | _        |d         j        | _        | j        | _         t                               d           | j                            dt)          j        | j        d	                    | _        dS )
af  Override `AbstractAMQPConnectionWorkflow.start()`.

        NOTE: This implementation uses `connection_attempts` and `retry_delay`
        values from the last element of the given `connection_configs` sequence
        as the overall number of connection attempts of the entire
        `connection_configs` sequence and pause between each sequence.

        rO   z3connection_configs does not support iteration: {!r}Nz"connection_configs is empty: {!r}.r5   z1Starting AMQP Connection workflow asynchronously.r   Tfirst)rK   rJ   r   r   iter	Exception	TypeError
ValueErrorr   r   rD   connection_attemptsr   retry_delayr   _STATE_ACTIVEr\   rq   rA   ra   rs   rt   _start_new_cycle_asyncrG   )r   r   r   r   rP   r   s         r   rh   zAMQPConnectionWorkflow.startk  s[    ;$***);BB4;OOQ Q Q	#$$$$ 	 	 	ELL   	 " 	Q4;;<NOOQ Q Q $6 "3#5b#9#M .r2>(

FGGG ..y !<DIIIK Ks   A	 	
A:"A55A:c                    | j         | j        k    rt          d          | j         | j        k    rt          d          | j        | _         |                                  t                              d           | j        Zt          	                    d           | j
                            t          j        | j        t                                           dS t          	                    d           | j                                         dS )z<Override `AbstractAMQPConnectionWorkflow.abort()`.

        rj   rk   zFAMQPConnectionWorkflow: beginning client-initiated asynchronous abort.Nz`AMQPConnectionWorkflow.abort(): no connector, so just scheduling completion report via I/O loop.z=AMQPConnectionWorkflow.abort(): requesting connector.abort().)rK   rJ   r   rm   rn   ro   r\   r]   r   rq   rA   rr   rs   rt   ru   r,   ry   r    s    r   ry   zAMQPConnectionWorkflow.abort  s*    ;$***)*IJJJ[D,,,)<> > > *		 ( 	) 	) 	) ?"JJ D E E EJ..!$"E"?"A"AC CD D D D D JJ , - - -O!!#####r   c                     |                                   d| _        d| _        d| _        d| _        d| _        d| _        d| _        | j        | _	        dS )zrCancel asynchronous tasks and clean up to assist garbage collection.

        Transition to _STATE_DONE.

        N)
ro   r   rA   r   rD   r   r   r   rm   rK   r    s    r   r{   zAMQPConnectionWorkflow._close  sY     	#' 
"&""&&r   c                 X    | j         "| j                                          d| _         dS dS )r}   N)rG   r~   r    s    r   ro   z"AMQPConnectionWorkflow._deactivate  s3     >%N!!###!DNNN &%r   c                     t          |t                    rt                              d|           nt                              d|           | j        }|                                   ||           dS )zClean up and invoke client's `on_done` callback.

        :param pika.connection.Connection | AMQPConnectionWorkflowFailed result:
            value to pass to user's `on_done` callback.
        z.AMQPConnectionWorkflow - reporting failure: %rz.AMQPConnectionWorkflow - reporting success: %rNr   r   s      r   ru   z5AMQPConnectionWorkflow._report_completion_and_cleanup  si     fm,, 	PJJGPPPPIIFOOO-r   c                    d| _         | j        dk    sJ | j                    | j        dk    rFt          | j                  }t                              d|           |                     |           dS | xj        dz  c_        t                              d| j                   d| _        | j	        
                    |rdn| j        | j                  | _         dS )aQ  Start a new workflow cycle (if any more attempts are left) beginning
        with the first Parameters object in self._connection_configs. If out of
        attempts, report `AMQPConnectionWorkflowFailed`.

        :param bool first: if True, don't delay; otherwise delay next attempt by
            `self._retry_pause` seconds.
        Nr   z$AMQP connection workflow failed: %r.r6   zQBeginning a new AMQP connection workflow cycle; attempts remaining after this: %s)rG   r   r0   r   r\   r   ru   rq   r   rA   ra   r   _try_next_config_async)r   r   r   s      r   r   z-AMQPConnectionWorkflow._start_new_cycle_async  s     '1,,,d.F,,,#q((01HIIEJJ=uEEE//666F  A%  

'(,(@	B 	B 	B &*"..-AAD-t/JL Lr   c                    d| _         | j        d| _        n| xj        dz  c_        | j        t          | j                  k    r2t                              d           |                     d           dS | j        | j                 }t                              d|j        |j                   | j         J | j	        
                    |j        |j        | j        | j        | j                  | _         dS )	zwAttempt to connect using the next Parameters config. If there are no
        more configs, start a new cycle.

        Nr   r6   z-_try_next_config_async: starting a new cycle.Fr   z_try_next_config_async: %r:%s)rp   portsocktypeprotorP   )rG   r   r7   r   r\   rq   r   rp   r   rA   getaddrinfo
_SOCK_TYPE_IPPROTO_on_getaddrinfo_async_done)r   paramss     r   r   z-AMQPConnectionWorkflow._try_next_config_async  s    
 %-)*D&&&&!+&&%T-E)F)FFFJJFGGG''e'444F)$*DE

2FKMMM ~%%%//_-3 0 5 5r   c                 x   d| _         t          |t                    rMt                              d|           | j                            |           |                     d           dS t                              dt          |                     t          |          | _        |                                  dS )zHandles completion callback from asynchronous `getaddrinfo()`.

        :param list | BaseException addrinfos_or_exc: resolved address records
            returned by `getaddrinfo()` or an exception object from failure.
        Nzgetaddrinfo failed: %r.Fr   zgetaddrinfo returned %s records)rG   r   r   r\   r   r   appendr   rq   r7   r   r   _try_next_resolved_address)r   addrinfos_or_excs     r   r   z1AMQPConnectionWorkflow._on_getaddrinfo_async_done  s     &66 	JJ02BCCC#**+;<<<''e'444F

4c:J6K6KLLL"#344'')))))r   c                 |   	 t          | j                  }n?# t          $ r2 t                              d           |                                  Y dS w xY wt                              d|           |                                 | _        | j                            || j	        | j
                 | j                   dS )z}Try connecting using next resolved address. If there aren't any left,
        continue with next Parameters config.

        z8_try_next_resolved_address: continuing with next config.Nz-Attempting to connect using address record %r)re   rf   rP   )nextr   StopIterationr\   rq   r   r   r   rh   r   r   _on_connector_done)r   re   s     r   r   z1AMQPConnectionWorkflow._try_next_resolved_address)  s    
	t233KK 	 	 	JJJL L L'')))FF		 	

BKPPP1133#01KL+ 	 	- 	- 	- 	- 	-s    8AAc                    d| _         t                              d|           t          |t                    r'| j                            |           t          |t                    rU| j        | j	        k    s"J d
                    | j                              |                     t                                 dS | j        rt          |t                    rqt                              d           t          |j        t           j        j                  rt          }nt'          | j                  }|                     |           dS |                                  dS |                     |           dS )zHandle completion of connection attempt by `AMQPConnector`.

        :param pika.connection.Connection | BaseException conn_or_exc: See
            `AMQPConnector.start()` for exception details.

        Nz$Connection attempt completed with %rz&Expected _STATE_ABORTING, but got {!r}zcEnding AMQP connection workflow after first failed AMQP handshake due to _until_first_amqp_attempt.)r   r\   rq   r   r   r   r   r   rK   rn   r   ru   r,   r   r*   r   rT   r3   ConnectionOpenAbortedr0   r   )r   conn_or_excr   s      r   r   z)AMQPConnectionWorkflow._on_connector_done?  s    

9;GGGk=11 	=#**;777+';<< 2{d&::::<CCDKPP ;:: 331335 5 5 5 50 2[*IJJ2

 N O O Ok3"oCE E 19EE8/1 1E 33E::::://11111 //<<<<<r   N)F)r   r   r	   r
   rR   SOCK_STREAMr   IPPROTO_TCPr   rJ   r   rn   rm   r   r   rh   ry   r{   ro   ru   r   r   r   r   r   r   r   r   r   r     s        " #J!HKMOK(' (' (' ('T  *K *K *KX$ $ $6' ' '$" " "   L L L85 5 5<* * *&- - -,#= #= #= #= #=r   r   )r
   rs   loggingrR   pika.compatrT   pika.exceptionspika.tcp_socket_optsr   	getLoggerr   r\   r   r   r   r   r   r   r&   r(   r*   r,   r.   r0   objectr9   rU   AbstractBaser   r   r   r   r   <module>r      s	                           w"") ) ) ) )Y ) ) )E E E E E 6 E E E+ + + + +1 + + +    4   J J J J J"8 J J J&5 5 5 5 5&A 5 5 5    'B   & & & & &&A & & &0 0 0 0 0$: 0 0 0    '=   N N N N N#9 N N N.4 4 4 4 4F 4 4 4D*" *" *" *" *"T[%= *" *" *"ZI= I= I= I= I=; I= I= I= I= I=r   