
    )JfF                        d Z ddlZddlZddlZddlZddlZddlZddlZddlZddl	Z	ddl
Z
ddlmZmZ ddlZddlZej        ej        fZej        ej        fZ ej        e          Zej                            e          Zd Zd Zd Z G d de          Z  G d	 d
e          Z! G d de          Z" G d de          Z# G d de          Z$ G d de          Z% G d de%          Z& G d de%          Z'dS )z^Utilities for implementing `nbio_interface.AbstractIOServices` for
pika connection adapters.

    N)AbstractIOReferenceAbstractStreamTransportc                 j    t          |           s#t          d                    ||                     dS )zRaise TypeError if callback is not callable

    :param callback: callback to check
    :param name: Name to include in exception text
    :raises TypeError:

    z!{} must be callable, but got {!r}N)callable	TypeErrorformat)callbacknames     h/home/alex/cs2snipeproduction/venv/lib/python3.11/site-packages/pika/adapters/utils/io_services_utils.pycheck_callback_argr   ,   sH     H ;BB(    	     c                 ~    t          | t          j                  s"t          d                    |                     dS )zqRaise TypeError if file descriptor is not an integer

    :param fd: file descriptor
    :raises TypeError:

    z0Paramter must be a file descriptor, but got {!r}N)
isinstancenumbersIntegralr   r   )fds    r   check_fd_argr   9   sL     b'*++ K>EEbIIK K 	KK Kr   c                 F     t          j                    fd            }|S )z0Function decorator for retrying on SIGINT.

    c                      	 	  | i |S # t           j        j        $ r }|j        t          j        k    rY d}~8 d}~ww xY w)zWrapper for decorated functionTN)pikacompatSOCKET_ERRORerrnoEINTR)argskwargserrorfuncs      r   retry_sigint_wrapz+_retry_on_sigint.<locals>.retry_sigint_wrapJ   sc    	tT,V,,,;+   ;%+--HHHH	s    A ;;A )	functoolswraps)r   r   s   ` r   _retry_on_sigintr"   E   s;    
 _T	 	 	 	 	 r   c                       e Zd ZdZd ZdS )SocketConnectionMixinzImplements
    `pika.adapters.utils.nbio_interface.AbstractIOServices.connect_socket()`
    on top of
    `pika.adapters.utils.nbio_interface.AbstractFileDescriptorServices` and
    basic `pika.adapters.utils.nbio_interface.AbstractIOServices`.

    c                 L    t          | |||                                          S )z[Implement
        :py:meth:`.nbio_interface.AbstractIOServices.connect_socket()`.

        )nbiosockresolved_addron_done)_AsyncSocketConnectorstart)selfr'   r(   r)   s       r   connect_socketz$SocketConnectionMixin.connect_socketb   s.    
 %D  "UWW	%r   N)__name__
__module____qualname____doc__r-    r   r   r$   r$   Y   s-         % % % % %r   r$   c                       e Zd ZdZ	 	 ddZdS )StreamingConnectionMixinzImplements
    `.nbio_interface.AbstractIOServices.create_streaming_connection()` on
    top of `.nbio_interface.AbstractFileDescriptorServices` and basic
    `nbio_interface.AbstractIOServices` services.

    Nc                 B   	 t          | |||||                                          S # t          $ rk}t                              d||           	 |                                 n3# t          $ r&}t                              d||           Y d}~nd}~ww xY w d}~ww xY w)zhImplement
        :py:meth:`.nbio_interface.AbstractIOServices.create_streaming_connection()`.

        )r&   protocol_factoryr'   ssl_contextserver_hostnamer)   z*create_streaming_connection(%s) failed: %rz%s.close() failed: %rN)_AsyncStreamConnectorr+   	Exception_LOGGERr   close)r,   r6   r'   r)   r7   r8   r   s          r   create_streaming_connectionz4StreamingConnectionMixin.create_streaming_connectiont   s    	(!1' /! ! ! "')  	 	 	MMF! ! !D

 D D D 5tUCCCCCCCC	D 	s>   &) 
BBA%$B%
B/BBBBB)NN)r.   r/   r0   r1   r=   r2   r   r   r4   r4   l   s:          1548     r   r4   c                       e Zd ZdZd Zd ZdS )_AsyncServiceAsyncHandlezGThis module's adaptation of `.nbio_interface.AbstractIOReference`

    c                     |j         | _        dS )zZ
        :param subject: subject of the reference containing a `cancel()` method

        N)cancel_cancel)r,   subjects     r   __init__z!_AsyncServiceAsyncHandle.__init__   s    
 ~r   c                 *    |                                  S )zCancel pending operation

        :returns: False if was already done or cancelled; True otherwise
        :rtype: bool

        )rB   r,   s    r   rA   z_AsyncServiceAsyncHandle.cancel   s     ||~~r   N)r.   r/   r0   r1   rD   rA   r2   r   r   r?   r?      s<         & & &    r   r?   c                       e Zd ZdZdZdZdZdZd Ze	d             Z
d Zd	 Ze	d
             Ze	d             Ze	d             ZdS )r*   zConnects the given non-blocking socket asynchronously using
    `.nbio_interface.AbstractFileDescriptorServices` and basic
    `.nbio_interface.AbstractIOServices`. Used for implementing
    `.nbio_interface.AbstractIOServices.connect_socket()`.
    r            c                    t          |d           	 t          j        |j        |d                    n# t          $ rz}t          t          d          st                              d           n@d                    |||          }t          	                    |           t          |          Y d}~nd}~ww xY w|| _        || _        || _        || _        | j        | _        d| _        dS )a  
        :param AbstractIOServices | AbstractFileDescriptorServices nbio:
        :param socket.socket sock: non-blocking socket that needs to be
            connected via `socket.socket.connect()`
        :param tuple resolved_addr: resolved destination address/port two-tuple
            which is compatible with the given's socket's address family
        :param callable on_done: user callback that takes None upon successful
            completion or exception upon error (check for `BaseException`) as
            its only arg. It will not be called if the operation was cancelled.
        :raises ValueError: if host portion of `resolved_addr` is not an IP
            address or is inconsistent with the socket's address family as
            validated via `socket.inet_pton()`
        r)   r   	inet_ptonz8Unable to check resolved address: no socket.inet_pton().z9Invalid or unresolved IP address {!r} for socket {}: {!r}NF)r   socketrL   familyr:   hasattrr;   debugr   r   
ValueError_nbio_sock_addr_on_done_STATE_NOT_STARTED_state_watching_socket_events)r,   r&   r'   r(   r)   r   msgs          r   rD   z_AsyncSocketConnector.__init__   s    	7I...	&T[-*:;;;; 		& 		& 		&6;// &NP P P P2396($47 47  c""" oo%P P P P P		& 

"
-',$$$s    3 
B7A0B22B7c                     | j         r:d| _         | j                            | j                                                   dS dS )z'Remove socket watcher, if any

        FN)rX   rR   remove_writerrS   filenorF   s    r   _cleanupz_AsyncSocketConnector._cleanup   sK    
 ' 	:+0D(J$$TZ%6%6%8%899999	: 	:r   c                     | j         | j        k    sJ d| j         f            | j        | _         | j                            | j                   t          |           S )zZStart asynchronous connection establishment.

        :rtype: AbstractIOReference
        z:_AsyncSocketConnector.start(): expected _STATE_NOT_STARTED)rW   rV   _STATE_ACTIVErR   add_callback_threadsafe_start_asyncr?   rF   s    r   r+   z_AsyncSocketConnector.start   sb    
 {d5555HK8555 ( 	
**4+<==='---r   c                    | j         | j        k    rH| j        | _         t                              d| j        | j                   |                                  dS t                              d| j         | j                   dS )Cancel pending connection request without calling user's completion
        callback.

        :returns: False if was already done or cancelled; True otherwise
        :rtype: bool

        z-User canceled connection request for %s to %sTzD_AsyncSocketConnector cancel requested when not ACTIVE: state=%s; %sF)rW   r_   _STATE_CANCELEDr;   rP   rS   rT   r]   rF   s    r   rA   z_AsyncSocketConnector.cancel   s}     ;$,,,.DKMMI*dj2 2 2MMOOO4 K	5 	5 	5 ur   c                 T   t                               d|| j                   t          |t          t          d          f          sJ d|f            | j        | j        k    sJ d| j        f            | j        | _        | 	                                 | 
                    |           dS )zAdvance to COMPLETED state, remove socket watcher, and invoke user's
        completion callback.

        :param BaseException | None result: value to pass in user's callback

        z0_AsyncSocketConnector._report_completion(%r); %sNzP_AsyncSocketConnector._report_completion() expected exception or None as result.zF_AsyncSocketConnector._report_completion() expected _STATE_NOT_STARTED)r;   rP   rS   r   BaseExceptiontyperW   r_   _STATE_COMPLETEDr]   rU   r,   results     r   _report_completionz(_AsyncSocketConnector._report_completion	  s     	Hdj	* 	* 	* &=$t**"=>> 	' 	'%A' 	' 	' 	' {d0000!"&+3/000 +fr   c                 N   | j         | j        k    r.t                              d| j        | j        | j                    dS 	 | j                            | j                   n# t          t          j	        j
        f$ rz}t          |t          j	        j
                  r|j        t          v rnBt                              d| j        | j        |           |                     |           Y d}~dS Y d}~nd}~ww xY w	 | j                            | j                                        | j                   d| _        t                              d| j                   dS # t          $ rA}t                              d| j        |           |                     |           Y d}~dS d}~ww xY w)zCalled as callback from I/O loop to kick-start the workflow, so it's
        safe to call user's completion callback from here, if needed

        zJAbandoning sock=%s connection establishment to %s due to inactive state=%sNz%s.connect(%s) failed: %rTz/Connection-establishment is in progress for %s.zasync.set_writer(%s) failed: %r)rW   r_   r;   rP   rS   rT   connectr:   r   r   r   r   r   (_CONNECTION_IN_PROGRESS_SOCK_ERROR_CODESr   rk   rR   
set_writerr\   _on_writablerX   	exceptionr,   r   s     r   ra   z"_AsyncSocketConnector._start_async   s    ;$,,,MM+,0J
DKQ Q Q F	Jtz****4;34 		 		 		5$+":;; K#KKK94:"j%1 1 1''... 			
	&J!!$*"3"3"5"5t7HIII ,0D(MMK*& & & & &  	 	 	?#% % %##E***FFFFF		s1    A   C5;A*C00C597E 
F$#6FF$c                    | j         | j        k    r(t                              d| j        | j                    dS | j                            t          j        t          j                  }|s#t          	                    d| j                   d}nVt          j        |          }t                              d| j        ||           t          j                            ||          }|                     |           dS )zwCalled when socket connects or fails to. Check for predicament and
        invoke user's completion callback.

        z_Socket connection-establishment event watcher called in inactive state (ignoring): %s; state=%sNzSocket connected: %sz+Socket failed to connect: %s; error=%s (%s))rW   r_   r;   r   rS   
getsockoptrM   
SOL_SOCKETSO_ERRORinfoosstrerrorr   r   r   rk   )r,   
error_coderj   	error_msgs       r   rp   z"_AsyncSocketConnector._on_writableG  s     ;$,,, MMDEIZ   F Z**6+<foNN
 	ELL/<<<FFJ//IMMG*j)= = =[--j)DDF'''''r   N)r.   r/   r0   r1   rV   r_   rd   rh   rD   _log_exceptionsr]   r+   rA   rk   ra   rp   r2   r   r   r*   r*      s          MO"- "- "-H : : _:. . ."  (   _, $& $& _$&L ( ( _( ( (r   r*   c                       e Zd ZdZdZdZdZdZd Ze	d             Z
d Zd	 Ze	d
             Ze	d             Ze	d             Ze	d             ZdS )r9   zPerforms asynchronous SSL session establishment, if requested, on the
    already-connected socket and links the streaming transport to protocol.
    Used for implementing
    `.nbio_interface.AbstractIOServices.create_streaming_connection()`.

    r   rH   rI   rJ   c                    t          |d           t          |d           t          |t          d          t          j        f          s"t          d                    |                    ||t          d          	 |                                 n5# t          $ r(}t          d                    ||                    d}~ww xY w|| _	        || _
        || _        || _        || _        || _        | j        | _        d| _        dS )a  
        NOTE: We take ownership of the given socket upon successful completion
        of the constructor.

        See `AbstractIOServices.create_streaming_connection()` for detailed
        documentation of the corresponding args.

        :param AbstractIOServices | AbstractFileDescriptorServices nbio:
        :param callable protocol_factory:
        :param socket.socket sock:
        :param ssl.SSLContext | None ssl_context:
        :param str | None server_hostname:
        :param callable on_done:

        r6   r)   Nz8Expected ssl_context=None | ssl.SSLContext, but got {!r}z?Non-None server_hostname must not be passed without ssl contextzEExpected connected socket, but getpeername() failed: error={!r}; {}; F)r   r   rg   ssl
SSLContextrQ   r   getpeernamer:   rR   _protocol_factoryrS   _ssl_context_server_hostnamerU   rV   rW   _watching_socket)r,   r&   r6   r'   r7   r8   r)   r   s           r   rD   z_AsyncStreamConnector.__init__p  s3   " 	+-?@@@7I...+T

CN'CDD 	= ((.{(;(;= = = &;+> 3 4 4 4	8 	8 	8 	8##)6%#6#68 8 8	8
 
!1
' /- %s    B 
C#CCc                    t                               d|           | j        rt                               d|| j                   d| _        | j                            | j                                                   | j                            | j                                                   	 |rpt                               d|| j                   	 | j                                         n4# t          $ r'}t           
                    d|| j                    d}~ww xY wd| _        d| _        d| _        d| _        d| _        d| _        dS # d| _        d| _        d| _        d| _        d| _        d| _        w xY w)zeCancel pending async operations, if any

        :param bool close: close the socket if true
        z"_AsyncStreamConnector._cleanup(%r)z5_AsyncStreamConnector._cleanup(%r): removing RdWr; %sFz6_AsyncStreamConnector._cleanup(%r): closing socket; %sz"_sock.close() failed: error=%r; %sN)r;   rP   r   rS   rR   remove_readerr\   r[   r<   r:   rq   r   r   r   rU   )r,   r<   r   s      r   r]   z_AsyncStreamConnector._cleanup  s    	:EBBB  	:MMG
   %*D!J$$TZ%6%6%8%8999J$$TZ%6%6%8%8999	! 	L4:' ' 'J$$&&&&    %%&J&+TZ9 9 9
 DJDJ%)D" $D$(D! DMMM DJDJ%)D" $D$(D! DM    s0   .#E C, +E ,
D6"DDE ,E8c                     t                               d| j                   | j        | j        k    sJ d| j        f            | j        | _        | j                            | j                   t          |           S )zCKick off the workflow

        :rtype: AbstractIOReference
        z!_AsyncStreamConnector.start(); %sz9_AsyncStreamConnector.start() expected _STATE_NOT_STARTED)
r;   rP   rS   rW   rV   r_   rR   r`   ra   r?   rF   s    r   r+   z_AsyncStreamConnector.start  s{    
 	94:FFF{d5555!"&+8/555 ( 	
**4+<==='---r   c                     | j         | j        k    rD| j        | _         t                              d| j                   |                     d           dS t                              d| j         | j                   dS )rc   z%User canceled streaming linkup for %sTr<   zD_AsyncStreamConnector cancel requested when not ACTIVE: state=%s; %sF)rW   r_   rd   r;   rP   rS   r]   rF   s    r   rA   z_AsyncStreamConnector.cancel  sy     ;$,,,.DKMMA4:NNNMMM%%%4 K	5 	5 	5 ur   c                 <   t                               d|| j                   t          |t          t
          f          sJ d|| j        f            | j        | j        k    sJ d| j        f            | j        | _        	 | 	                    |           n0# t          $ r# t                               d| j        |            w xY w	 |                     t          |t                               dS # |                     t          |t                               w xY w)a  Advance to COMPLETED state, cancel async operation(s), and invoke
        user's completion callback.

        :param BaseException | tuple result: value to pass in user's callback.
            `tuple(transport, protocol)` on success, exception on error

        z0_AsyncStreamConnector._report_completion(%r); %szQ_AsyncStreamConnector._report_completion() expected exception or tuple as result.zA_AsyncStreamConnector._report_completion() expected _STATE_ACTIVEz%r: _on_done(%r) failed.r   N)r;   rP   rS   r   rf   tuplerW   r_   rh   rU   r:   rq   rk   r]   ri   s     r   rk   z(_AsyncStreamConnector._report_completion  sF    	Hdj	* 	* 	* &=%"899 	5 	5 &<5 	5 	5 	5 {d0000![3*000 +	CMM&!!!! 	 	 	8"5v? ? ?	 " MM
6= A AMBBBBBDMM
6= A AMBBBBs   >B C0 -CC0 0+Dc                 T   t                               d| j                   | j        | j        k    r(t                               d| j        | j                   dS | j        |                                  dS t                               d| j                   	 | j                            | j        ddd| j                  | _        nN# t          $ rA}t           
                    d| j        |           |                     |           Y d}~dS d}~ww xY w|                                  dS )zCalled as callback from I/O loop to kick-start the workflow, so it's
        safe to call user's completion callback from here if needed

        z(_AsyncStreamConnector._start_async(); %szMAbandoning streaming linkup due to inactive state transition; state=%s; %s; .NzStarting SSL handshake on %sF)server_sidedo_handshake_on_connectsuppress_ragged_eofsr8   zSSL wrap_socket(%s) failed: %r)r;   rP   rS   rW   r_   r   _linkupwrap_socketr   r:   rq   rk   _do_ssl_handshakerr   s     r   ra   z"_AsyncStreamConnector._start_async
  sD    	@$*MMM;$,,,MM./3{DJH H H F $LLNNNNNMM8$*EEE!.::J %,1).$($9 ; ; ;

    !!"BDJ"') ) )''...	 ""$$$$$s   .C 
D6DDc                    t                               d           d}	 	 |                                 }n4# t          $ r'}t                               d|| j                    d}~ww xY w| j        Q	 t          | j        || j                  }n# t          $ r'}t                               d|| j                    d}~ww xY w	 t          | j        || j                  }n4# t          $ r'}t                               d|| j                    d}~ww xY wt                               d|           	 |
                    |           n5# t          $ r(}t                               d||| j                    d}~ww xY wt                               d||           ||f}n# t          $ r}|}Y d}~nd}~ww xY w|                     |           dS )	z}Connection is ready: instantiate and link up transport and protocol,
        and invoke user's completion callback.

        z_AsyncStreamConnector._linkup()Nz'protocol_factory() failed: error=%r; %sz%PlainTransport() failed: error=%r; %sz#SSLTransport() failed: error=%r; %sz_linkup(): created transport %rz1protocol.connection_made(%r) failed: error=%r; %sz2_linkup(): introduced transport to protocol %r; %r)r;   rP   r   r:   rq   rS   r   _AsyncPlaintextTransportrR   _AsyncSSLTransportconnection_maderk   )r,   	transportprotocolr   rj   s        r   r   z_AsyncStreamConnector._linkup0  sG    	7888	,	+1133   !!"K"'5 5 5
  ( 8
Hdj!: !:II    %%&M&+TZ9 9 9 24:x37:!? !?II    %%&K&+TZ9 9 9
 MM;YGGG((3333   !!Gudj2 2 2 	 MMN#X/ / /
  *FF  	 	 	FFFFFF	
 	'''''s   4 F 
A%"A  A%%
F 0B F 
B="B88B==F C F 
D'"D		DF -E F 
E5#E00E55F 
F/#F**F/c                    t                               d           | j        | j        k    r(t                               d| j        | j                   dS d}	 	 | j                                         d}t                               d| j                   nh# t          j        $ rU}|j	        t          j
        k    rt                               d| j                   d| _        | j                            | j                                        | j                   | j                            | j                                                   n|j	        t          j        k    rt                               d| j                   d| _        | j                            | j                                        | j                   | j                            | j                                                   n Y d}~nd}~ww xY wnN# t(          $ rA}t                               d	|| j                   |                     |           Y d}~dS d}~ww xY w|rt                               d
| j                   | j                            | j                                                   | j                            | j                                                   d| _        t                               d| j                   |                                  dS dS )zJPerform asynchronous SSL handshake on the already wrapped socket

        z)_AsyncStreamConnector._do_ssl_handshake()z`_do_ssl_handshake: Abandoning streaming linkup due to inactive state transition; state=%s; %s; .NFTz(SSL handshake completed successfully: %szSSL handshake wants read; %s.zSSL handshake wants write. %sz%SSL do_handshake failed: error=%r; %sz8_do_ssl_handshake: removing watchers ahead of linkup: %sz=_do_ssl_handshake: pre-linkup removal of watchers is done; %s)r;   rP   rW   r_   rS   do_handshakerw   r   SSLErrorr   SSL_ERROR_WANT_READr   rR   
set_readerr\   r   r[   SSL_ERROR_WANT_WRITEro   r   r:   rq   rk   r   )r,   doner   s      r   r   z'_AsyncStreamConnector._do_ssl_handshakej  s   
 	ABBB;$,,,MM@AE
   F	)
'')))$ G!Z) ) ) )% <   ;#"999MM"A4:NNN,0D)J))$**;*;*=*=*.*@B B BJ,,TZ->->-@-@AAAA[C$<<<MM"A4:NNN,0D)J))$**;*;*=*=*.*@B B BJ,,TZ->->-@-@AAAA (  	 	 	Eu"j* * *##E***FFFFF		  	MMJ
   J$$TZ%6%6%8%8999J$$TZ%6%6%8%8999 %*D!MMO
   LLNNNNN	 	s=   B 0$G= G9$EG4/G= 4G99G= =
I6IIN)r.   r/   r0   r1   rV   r_   rd   rh   rD   r|   r]   r+   rA   rk   ra   r   r   r2   r   r   r9   r9   d  s          MO.& .& .&`  !  ! _ !D. . .&  ( C C _C> #% #% _#%J 7( 7( _7(r : : _: : :r   r9   c                      e Zd ZdZdZdZdZdZdZdZ	 G d d	e
          Zd
 Zd Zd Zd Zd Zd Zd Zeed                         Zeed                         Zed             Zed             Zed             Zed             ZdS )_AsyncTransportBasezIBase class for `_AsyncPlaintextTransport` and `_AsyncSSLTransport`.

    rH   rI   rJ      i   i  c                   "     e Zd ZdZ fdZ xZS )_AsyncTransportBase.RxEndOfFilezNWe raise this internally when EOF (empty read) is detected on input.

        c                 d    t          t          j        |                               dd           d S )NzEnd of input stream (EOF))superr   RxEndOfFilerD   )r,   	__class__s    r   rD   z(_AsyncTransportBase.RxEndOfFile.__init__  s9    %1488AA/1 1 1 1 1r   )r.   r/   r0   r1   rD   __classcell__r   s   @r   r   r     sB        	 		1 	1 	1 	1 	1 	1 	1 	1 	1r   r   c                     t                               d|           || _        || _        || _        | j        | _        t          j                    | _	        d| _
        dS )a~  

        :param socket.socket | ssl.SSLSocket sock: connected socket
        :param pika.adapters.utils.nbio_interface.AbstractStreamProtocol protocol:
            corresponding protocol in this transport/protocol pairing; the
            protocol already had its `connection_made()` method called.
        :param AbstractIOServices | AbstractFileDescriptorServices nbio:

        z _AsyncTransportBase.__init__: %sr   N)r;   rP   rS   	_protocolrR   r_   rW   collectionsdeque_tx_buffers_tx_buffered_byte_count)r,   r'   r   r&   s       r   rD   z_AsyncTransportBase.__init__  sW     	8$???
!
(&,..'($$$r   c                 |    t                               d| j        | j                   |                     d           dS )a  Close connection abruptly without waiting for pending I/O to
        complete. Will invoke the corresponding protocol's `connection_lost()`
        method asynchronously (not in context of the abort() call).

        :raises Exception: Exception-based exception on error
        z+Aborting transport connection: state=%s; %sN)r;   rw   rW   rS   _initiate_abortrF   s    r   abortz_AsyncTransportBase.abort  sB     	BDKZ	! 	! 	! 	T"""""r   c                     | j         S )zReturn the protocol linked to this transport.

        :rtype: pika.adapters.utils.nbio_interface.AbstractStreamProtocol
        )r   rF   s    r   get_protocolz _AsyncTransportBase.get_protocol  s    
 ~r   c                     | j         S )ze
        :returns: Current size of output data buffered by the transport
        :rtype: int
        )r   rF   s    r   get_write_buffer_sizez)_AsyncTransportBase.get_write_buffer_size  s    
 ++r   c                 x   |sHt                               d| j        | j                   t	          d                    |                    | j        | j        k    r(t                               d| j        | j                   dS | j        	                    |           | xj
        t          |          z  c_
        dS )Buffer the given data until it can be sent asynchronously.

        :param bytes data:
        :raises ValueError: if called with empty data

        z,write() called with empty data: state=%s; %sz#write() called with empty data {!r};Ignoring write() called during inactive state: state=%s; %sN)r;   r   rW   rS   rQ   r   r_   rP   r   appendr   len)r,   datas     r   _buffer_tx_dataz#_AsyncTransportBase._buffer_tx_data  s      	QMMH+tz3 3 3BII$OOPPP;$,,,MM $TZ9 9 9 F%%%$$D		1$$$$r   c                    d}| j         | j        k    r|| j        k     r|                     | j        | j                  }|t          |          z  }|s4t                              d| j                   | 	                                	 | j
                            |           n4# t          $ r'}t                              d|| j                    d}~ww xY w| j         | j        k    r|| j        k     dS dS dS dS )a  Utility method for use by subclasses to ingest data from socket and
        dispatch it to protocol's `data_received()` method socket-specific
        "try again" exception, per-event data consumption limit is reached,
        transport becomes inactive, or a fatal failure.

        Consumes up to `self._MAX_CONSUME_BYTES` to prevent event starvation or
        until state becomes inactive (e.g., `protocol.data_received()` callback
        aborts the transport)

        :raises: Whatever the corresponding `sock.recv()` raises except the
                 socket error with errno.EINTR
        :raises: Whatever the `protocol.data_received()` callback raises
        :raises _AsyncTransportBase.RxEndOfFile: upon shutdown of input stream

        r   zSocket EOF; %sz-protocol.data_received() failed: error=%r; %sN)rW   r_   _MAX_CONSUME_BYTES_sigint_safe_recvrS   _MAX_RECV_BYTESr   r;   r   r   r   data_receivedr:   rq   )r,   bytes_consumedr   r   s       r   _consumez_AsyncTransportBase._consume  s$     {d000 777))$*d6JKKDc$ii'N  ).
;;;&&(((,,T2222   !!CUJ      	 {d000 77777 1077 10s   B" "
C,"CCc                    | j         r|                     | j        | j         d                   }| j                                         }|t	          |          k     rKt
                              d|t	          |                     | j                             ||d                    | xj        |z  c_        | j        dk    sJ d| j        | j	        f            | j         dS dS )a  Utility method for use by subclasses to emit data from tx_buffers.
        This method sends chunks from `tx_buffers` until all chunks are
        exhausted or sending is interrupted by an exception. Maintains integrity
        of `self.tx_buffers`.

        :raises: whatever the corresponding `sock.send()` raises except the
                 socket error with errno.EINTR

        r   z/Partial send, requeing remaining data; %s of %sNz7_AsyncTransportBase._produce() tx buffer size underflow)
r   _sigint_safe_sendrS   popleftr   r;   rP   
appendleftr   rW   )r,   num_bytes_sentchunks      r   _producez_AsyncTransportBase._produce(  s     	;!33DJ484DQ4GI IN $,,..EE

**O,c%jj: : : ++E.//,BCCC((N:((/1444I,dk7;444  	; 	; 	; 	; 	;r   c                 ,    |                      |          S )am  Receive data from socket, retrying on SIGINT.

        :param sock: stream or SSL socket
        :param max_bytes: maximum number of bytes to receive
        :returns: received data or empty bytes uppon end of file
        :rtype: bytes
        :raises: whatever the corresponding `sock.recv()` raises except socket
                 error with errno.EINTR

        )recv)r'   	max_bytess     r   r   z%_AsyncTransportBase._sigint_safe_recvA  s     yy###r   c                 ,    |                      |          S )a@  Send data to socket, retrying on SIGINT.

        :param sock: stream or SSL socket
        :param data: data bytes to send
        :returns: number of bytes actually sent
        :rtype: int
        :raises: whatever the corresponding `sock.send()` raises except socket
                 error with errno.EINTR

        )send)r'   r   s     r   r   z%_AsyncTransportBase._sigint_safe_sendP  s     yyr   c                 l   | j         | j        k    rt                              d| j         | j                   | j                            | j                                                   | j                            | j                                                   | j	        
                                 dS dS )z2Unregister the transport from I/O events

        z$Deactivating transport: state=%s; %sN)rW   r_   r;   rw   rS   rR   r   r\   r[   r   clearrF   s    r   _deactivatez_AsyncTransportBase._deactivate_  s    
 ;$,,,LL?% % %J$$TZ%6%6%8%8999J$$TZ%6%6%8%8999""$$$$$ -,r   c                 j   | j         | j        k    rt                              d| j         | j                   	 | j                            t          j                   n# t          j	        j
        $ r Y nw xY w| j                                         d| _        d| _        d| _        | j        | _         dS dS )z{Close the transport's socket and unlink the transport it from
        references to other assets (protocol, etc.)

        z4Closing transport socket and unlinking: state=%s; %sN)rW   rh   r;   rw   rS   shutdownrM   	SHUT_RDWRr   r   r   r<   r   rR   rF   s    r   _close_and_finalizez'_AsyncTransportBase._close_and_finalizek  s     ;$///LLOdj2 2 2
##F$45555;+   JDJ!DNDJ/DKKK 0/s   $A A43A4c                 :   t                               d| j        || j                   | j        | j        k    sJ d| j        f            | j        | j        k    rdS |                                  |9| j        | j        k    rt                               d           dS | j        | _        n?| j        | j        k    r#| j        | j        k    sJ d| j        f            dS | j	        | _        | j
                            t          j        | j        |                     dS )a  Initiate asynchronous abort of the transport that concludes with a
        call to the protocol's `connection_lost()` method. No flushing of
        output buffers will take place.

        :param BaseException | None error: None if being canceled by user,
            including via falsie return value from protocol.eof_received;
            otherwise the exception corresponding to the the failed connection.
        zo_AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=%s; error=%r; %szB_AsyncTransportBase._initate_abort() expected non-_STATE_COMPLETEDNzM_AsyncTransportBase._initiate_abort(): ignoring - user-abort already pending.zD_AsyncTransportBase._initate_abort() expected _STATE_ABORTED_BY_USER)r;   rw   rW   rS   rh   r   _STATE_ABORTED_BY_USERrP   r_   _STATE_FAILEDrR   r`   r    partial_connection_lost_notify_asyncrr   s     r   r   z#_AsyncTransportBase._initiate_abort~  sY    	FK
	, 	, 	,
 {d3333#$(K61333 ;$///F = {d999 G H H H 5DKK {d000{d&AAAA-.2kD;AAA ,DK 	
**d@%HH	J 	J 	J 	J 	Jr   c                    t                               d| j        |           | j        | j        k    rdS |3| j        | j        k    r#| j        | j        k    sJ d| j        f            dS 	 | j                            |           n5# t          $ r(}t           	                    d||| j
                    d}~ww xY w	 |                                  dS # |                                  w xY w)a  Handle aborting of transport either due to socket error or user-
        initiated `abort()` call. Must be called from an I/O loop callback owned
        by us in order to avoid reentry into user code from user's API call into
        the transport.

        :param BaseException | None error: None if being canceled by user;
            otherwise the exception corresponding to the the failed connection.
        z1Concluding transport shutdown: state=%s; error=%rNzS_AsyncTransportBase._connection_lost_notify_async() expected _STATE_ABORTED_BY_USERz/protocol.connection_lost(%r) failed: exc=%r; %s)r;   rP   rW   rh   r   r   r   connection_lostr:   rq   rS   r   )r,   r   excs      r   r   z1_AsyncTransportBase._connection_lost_notify_async  s!    	Ik5	* 	* 	* ;$///F0B!B!B;$"====237;@@=== F		'N**51111 	 	 	O#S$*6 6 6 	 2 $$&&&&&D$$&&&&s*   *B C 
B7#B22B77C C'N)r.   r/   r0   r1   r_   r   r   rh   r   r   OSErrorr   rD   r   r   r   r   r   r   staticmethodr"   r   r   r|   r   r   r   r   r2   r   r   r   r     s         MMO $1 1 1 1 1g 1 1 1) ) )&
# 
# 
#  , , ,2 2 2*# # #J; ; ;2 $ $  \$    \ 	% 	% _	% 0 0 _0$ 2J 2J _2Jh  '  ' _ '  '  'r   r   c                   T     e Zd ZdZ fdZd Zed             Zed             Z xZ	S )r   z`Implementation of `nbio_interface.AbstractStreamTransport` for a
    plaintext connection.

    c                     t          t          |                               |||           | j                            | j                                        | j                   dS )a{  

        :param socket.socket sock: non-blocking connected socket
        :param pika.adapters.utils.nbio_interface.AbstractStreamProtocol protocol:
            corresponding protocol in this transport/protocol pairing; the
            protocol already had its `connection_made()` method called.
        :param AbstractIOServices | AbstractFileDescriptorServices nbio:

        N)r   r   rD   rR   r   rS   r\   _on_socket_readabler,   r'   r   r&   r   s       r   rD   z!_AsyncPlaintextTransport.__init__  sY     	&--66tXtLLL 	
dj//1143KLLLLLr   c                    | j         | j        k    r(t                              d| j         | j                   dS |sJ d|| j         f            |                                 dk    }|                     |           |rY| j                            | j        	                                | j
                   t                              d| j                   dS dS )r   r   Nz7_AsyncPlaintextTransport.write(): empty data from user.r   !Turned on writability watcher: %s)rW   r_   r;   rP   rS   r   r   rR   ro   r\   _on_socket_writabler,   r   tx_buffer_was_emptys      r   writez_AsyncPlaintextTransport.write  s     ;$,,,MM $TZ9 9 9 F 	) 	)ODK) 	) 	) 	) #88::a?T""" 	KJ!!$*"3"3"5"5t7OPPPMM=tzJJJJJ	K 	Kr   c                    | j         | j        k    r(t                              d| j         | j                   dS 	 |                                  | j         | j        k    r(t                              d| j         | j                   dS dS # | j        $ r 	 | j                                        }|rTt          	                    d| j                   | j
                            | j                                                   Y dS t          	                    d| j                   |                     d           Y dS # t          $ rB}t                              d|| j                   |                     |           Y d}~Y dS d}~ww xY wt          t           j        j        f$ r}t'          |t           j        j                  r/|j        t*          v r!t                              d| j                   nmt                              d|| j        d	                    t/          j        t3          j                                          |                     |           Y d}~dS Y d}~dS d}~ww xY w)
zIngest data from socket and dispatch it to protocol until exception
        occurs (typically EAGAIN or EWOULDBLOCK), per-event data consumption
        limit is reached, transport becomes inactive, or failure.

        EIgnoring readability notification due to inactive state: state=%s; %sNz>Leaving Plaintext consumer due to inactive state: state=%s; %sz0protocol.eof_received() elected to keep open: %sz,protocol.eof_received() elected to close: %sz,protocol.eof_received() failed: error=%r; %szRecv would block on %sa_AsyncBaseTransport._consume() failed, aborting connection: error=%r; sock=%s; Caller's stack:
%s )rW   r_   r;   rP   rS   r   r   r   eof_receivedrw   rR   r   r\   r   r:   rq   r   r   r   r   r   _TRY_IO_AGAIN_SOCK_ERROR_CODESjoin	tracebackformat_exceptionsysexc_info)r,   	keep_openr   s      r   r   z,_AsyncPlaintextTransport._on_socket_readable	  s    ;$,,,MM&'+{DJ@ @ @ F%	DMMOOO> {d000 *+/;
D D D D D 10=  	/ 	/ 	// N7799	  /LLJ
$ $ $ J,,TZ->->-@-@AAAAAALL!O!%- - -((......  , , ,!!BEJ      $$U++++++++++	, 4;34 
	, 
	, 
	,5$+":;; 	,K#AAA6
CCCC!!J4:rww!2CLNNC(E (EF F F
 $$U+++++++++ DCCCCC
	,sI   B 
I)D:,AI)5I):
F6F:I)FI)#B5I$$I)c                 L   | j         | j        k    r(t                              d| j         | j                   dS | j        sJ d| j         f            	 |                                  | j        sS| j                            | j        	                                           t                              d| j                   dS dS # t          t          j        j        f$ r}t          |t          j        j                  r/|j        t           v r!t                              d| j                   nmt                              d|| j        d                    t'          j        t+          j                                          |                     |           Y d}~dS Y d}~dS d}~ww xY w)-Handle writable socket notification

        EIgnoring writability notification due to inactive state: state=%s; %sNzP_AsyncPlaintextTransport._on_socket_writable() called, but _tx_buffers is empty.z"Turned off writability watcher: %szSend would block on %sa_AsyncBaseTransport._produce() failed, aborting connection: error=%r; sock=%s; Caller's stack:
%sr   )rW   r_   r;   rP   rS   r   r   rR   r[   r\   r:   r   r   r   r   r   r   rq   r   r   r   r   r   r   rr   s     r   r   z,_AsyncPlaintextTransport._on_socket_writable=  s   
 ;$,,,MM&'+{DJ@ @ @ F  	6 	6()-"6 	6 	6 	6	PMMOOO # P
(():):)<)<===BDJOOOOOP P 4;34 
	, 
	, 
	,5$+":;; 	,K#AAA6
CCCC!!J4:rww!2CLNNC(E (EF F F
 $$U+++++++++ DCCCCC
	,s   C F#B5FF#)
r.   r/   r0   r1   rD   r   r|   r   r   r   r   s   @r   r   r     s         
M M M M M K K K8 1D 1D _1Df  P  P _ P  P  P  P  Pr   r   c                        e Zd ZdZ fdZd Zed             Zed             Ze fd            Z	e fd            Z
 xZS )r   z\Implementation of `.nbio_interface.AbstractStreamTransport` for an SSL
    connection.

    c                 .   t          t          |                               |||           | j        | _        d| _        | j                            | j        	                                | j
                   | j                            | j
                   dS )a{  

        :param ssl.SSLSocket sock: non-blocking connected socket
        :param pika.adapters.utils.nbio_interface.AbstractStreamProtocol protocol:
            corresponding protocol in this transport/protocol pairing; the
            protocol already had its `connection_made()` method called.
        :param AbstractIOServices | AbstractFileDescriptorServices nbio:

        N)r   r   rD   r   _ssl_readable_action_ssl_writable_actionrR   r   rS   r\   r   r`   r   s       r   rD   z_AsyncSSLTransport.__init__g  s     	 $''00xFFF$(M!$(! 	
dj//1143KLLL
**4+CDDDDDr   c                    | j         | j        k    r(t                              d| j         | j                   dS |sJ d|| j         f            |                                 dk    }|                     |           |rl| j        g| j        | _        | j	        
                    | j                                        | j                   t                              d| j                   dS dS dS )r   r   Nz1_AsyncSSLTransport.write(): empty data from user.r   r   )rW   r_   r;   rP   rS   r   r   r  r   rR   ro   r\   r   r   s      r   r   z_AsyncSSLTransport.write{  s
    ;$,,,MM $TZ9 9 9 F 	) 	)IDK) 	) 	) 	) #88::a?T""" 	K4#<#D(,D%J!!$*"3"3"5"5t7OPPPMM=tzJJJJJ	K 	K#D#Dr   c                 X   | j         | j        k    r(t                              d| j         | j                   dS | j        rD	 |                                  dS # t          $ r }|                     |           Y d}~dS d}~ww xY wt                              d| j        | j                   dS )z+Handle readable socket indication

        r   Nz>SSL readable action was suppressed: ssl_writable_action=%r; %s)	rW   r_   r;   rP   rS   r  r:   r   r  rr   s     r   r   z&_AsyncSSLTransport._on_socket_readable      
 ;$,,,MM&'+{DJ@ @ @ F$ 		,))+++++ , , ,$$U+++++++++, MM-.2.G
       A 
B!A<<Bc                 X   | j         | j        k    r(t                              d| j         | j                   dS | j        rD	 |                                  dS # t          $ r }|                     |           Y d}~dS d}~ww xY wt                              d| j        | j                   dS )r   r   Nz>SSL writable action was suppressed: ssl_readable_action=%r; %s)	rW   r_   r;   rP   rS   r  r:   r   r  rr   s     r   r   z&_AsyncSSLTransport._on_socket_writable  r  r  c                    d}	 t          t          |                                            | j        | j        k    r(t
                              d| j        | j                   dS | j        	                    | j
                   n# t          j        $ r}|j        t          j        k    r!t
                              d| j                   n|j        t          j        k    r#t
                              d| j                   d}nSt
                              d|| j        d                    t%          j        t)          j                                           Y d}~nd}~ww xY w|r| j        s7| j                            | j                                        | j
                   | j        | _        | j        | j        k    r8| j                            | j                                                   d| _        n| j        s7| j                            | j                                        | j                   | j        | _        | j        r8| j                            | j                                                   d| _        | j        rL| j        sG| j        | _        | j                            | j                                        | j                   dS dS dS )	a  [override] Ingest data from socket and dispatch it to protocol until
        exception occurs (typically ssl.SSLError with
        SSL_ERROR_WANT_READ/WRITE), per-event data consumption limit is reached,
        transport becomes inactive, or failure.

        Update consumer/producer registration.

        :raises Exception: error that signals that connection needs to be
            aborted
        Tz8Leaving SSL consumer due to inactive state: state=%s; %sNzSSL ingester wants read: %szSSL ingester wants write: %sFr   r   ) r   r   r   rW   r_   r;   rP   rS   rR   r`   r   r   r   r   r   r   rq   r   r   r   r   r   r  r   r\   r  r[   ro   r   r   r   r   )r,   next_consume_on_readabler   r   s      r   r   z_AsyncSSLTransport._consume  s    $( 	I$d++44666  {d000 *+/;
D D D  J..t/GHHHH9 | 	 	 	{c555;TZHHHH 888<djIII+0((!!J4:rww!2CLNNC(E (EF F F
 	> $ 	1, @
%%dj&7&7&9&9&*&>@ @ @(,D% (DM99
(():):)<)<===,0) , @
%%dj&7&7&9&9&*&>@ @ @(,D%( 1
(():):)<)<===,0)  	QD$= 	Q(,D%J!!$*"3"3"5"5t7OPPPPP	Q 	Q 	Q 	Qs   'B ECEEc                    d}	 t          t          |                                            | j        rJ dt	          | j                  f            n# t
          j        $ r}|j        t
          j        k    r#t          
                    d| j                   d}n|j        t
          j        k    r#t          
                    d| j                   d}nSt                              d|| j        d                    t          j        t#          j                                           Y d}~nd}~ww xY w| j        r3|J d	| j        f            |r| j        s7| j                            | j                                        | j                   | j        | _        | j        | j        k    r8| j                            | j                                                   d| _        n\| j        s7| j                            | j                                        | j                   | j        | _        | j        r8| j                            | j                                                   d| _        n| j        | j        k    rZ| j                            | j                                                   d| _        | j        | j        k    sJ d
| j        f            nh| j        | j        k    s J dd| j        d| j        d| j        f            d| _        | j                            | j                                                   | j        sd| j        | _        | j                            | j                                        | j                   | j                            | j                   dS | j                                         r!| j                            | j                   dS dS )aI  [override] Emit data from tx_buffers all chunks are exhausted or
        sending is interrupted by an exception (typically ssl.SSLError with
        SSL_ERROR_WANT_READ/WRITE).

        Update consumer/producer registration.

        :raises Exception: error that signals that connection needs to be
            aborted

        Nz__AsyncSSLTransport._produce(): no exception from parent class, but data remains in _tx_buffers.zSSL emitter wants read: %sFzSSL emitter wants write: %sTr   r   zE_AsyncSSLTransport._produce(): next_produce_on_writable is still Nonezr_AsyncSSLTransport._produce(): with empty tx_buffers, writable_action cannot be _produce when readable is _producez_AsyncSSLTransport._produce(): with empty tx_buffers, expected writable_action as _produce when readable_action is not _producezwritable_action:zreadable_action:zstate:)!r   r   r   r   r   r   r   r   r   r;   rP   rS   r   rq   r   r   r   r   r   rW   r  rR   ro   r\   r   r  r   r   r   r[   r   r`   pending)r,   next_produce_on_writabler   r   s      r   r   z_AsyncSSLTransport._produce  s
    $( 	'$d++44666$ ' ' ':;>$<& <&*' ' ' ' '# | 	 	 	{c555:DJGGG+0(( 888;TZHHH+/((!!J4:rww!2CLNNC(E (EF F F
 	.  +	>+77"k:+777 ( 50 DJ))$**;*;*=*=*.*BD D D,0M) ,==J,,TZ->->-@-@AAA04D- 0 DJ))$**;*;*=*=*.*BD D D,0M), 5J,,TZ->->-@-@AAA04D- (DM99
(():):)<)<===,0)0DMAAA $D-AAAA 0DMAAA&'9-/A-xDFAAA -1)
(():):)<)<=== ( 	I(,D%J!!$*"3"3"5"5t7OPPPJ..t/GHHHHHZ!! 	IJ..t/GHHHHH	I 	Is   'A D.!CD))D.)r.   r/   r0   r1   rD   r   r|   r   r   r   r   r   r   s   @r   r   r   a  s         
E E E E E(K K K:   _*   _* FQ FQ FQ FQ _FQP ZI ZI ZI ZI _ZI ZI ZI ZI ZIr   r   )(r1   r   r   r    loggingr   rx   rM   r   r   r   "pika.adapters.utils.nbio_interfacer   r   pika.compatr   pika.diagnostic_utilsEAGAINEWOULDBLOCKr   EINPROGRESSrn   	getLoggerr.   r;   diagnostic_utilscreate_log_exception_decoratorr|   r   r   r"   objectr$   r4   r?   r*   r9   r   r   r   r2   r   r   <module>r     s   
            				  



 



    I I I I I I I I         
L	"  
	, (
 '
H
%
% 'FFwOO
 
 
	K 	K 	K  (% % % % %F % % %&& & & & &v & & &R    2   ,v( v( v( v( v(F v( v( v(rA A A A AF A A AH
l' l' l' l' l'l' l' l'^	GP GP GP GP GP2 GP GP GPTGI GI GI GI GI, GI GI GI GI GIr   