
    )Jf_A                     |   d Z ddlZddlZddlZddlZddlZ	 ddlZn# e$ r ddlZY nw xY wddl	Z	ddl
Z	ddlZ	ddlZddlmZ ddlmZ ddlmZmZ ddlmZmZ  ej        e          Z G d de          Z G d	 d
e          Z G d de          Z G d de          Z G d de          Z  G d de          Z!dS )z Use pika with the Gevent IOLoop.    N)BaseConnection)check_callback_arg)AbstractIOReferenceAbstractIOServices)AbstractSelectorIOLoopSelectorIOServicesAdapterc                   L     e Zd ZdZ	 	 	 	 	 	 d fd	Ze	 	 dd            Z xZS )GeventConnectionzwImplementation of pika's ``BaseConnection``.

    An async selector-based connection which integrates with Gevent.
    NTc                 6   t           j        j        rt          d          |pt	          t          j                              }t          |t                    r|}nt          |          }t          t          |                               ||||||           dS )a  Create a new GeventConnection instance and connect to RabbitMQ on
        Gevent's event-loop.

        :param pika.connection.Parameters|None parameters: The connection
            parameters
        :param callable|None on_open_callback: The method to call when the
            connection is open
        :param callable|None on_open_error_callback: Called if the connection
            can't be established or connection establishment is interrupted by
            `Connection.close()`:
            on_open_error_callback(Connection, exception)
        :param callable|None on_close_callback: Called when a previously fully
            open connection is closed:
            `on_close_callback(Connection, exception)`, where `exception` is
            either an instance of `exceptions.ConnectionClosed` if closed by
            user or broker or exception of another type that describes the
            cause of connection failure
        :param gevent._interfaces.ILoop|nbio_interface.AbstractIOServices|None
            custom_ioloop: Use a custom Gevent ILoop.
        :param bool internal_connection_workflow: True for autonomous connection
            establishment which is default; False for externally-managed
            connection workflow via the `create_connection()` factory
        z-GeventConnection is not supported on Windows.)internal_connection_workflowN)pikacompat
ON_WINDOWSRuntimeError_GeventSelectorIOLoopgeventget_hub
isinstancer    _GeventSelectorIOServicesAdaptersuperr
   __init__)	self
parameterson_open_callbackon_open_error_callbackon_close_callbackcustom_ioloopr   nbio	__class__s	           b/home/alex/cs2snipeproduction/venv/lib/python3.11/site-packages/pika/adapters/gevent_connection.pyr   zGeventConnection.__init__'   s    < ;! 	PNOOO& A.v~/?/?@@ 	 m%788 	C DD3MBBD%%..")E 	/ 	G 	G 	G 	G 	G    c                      |pt          t          j                              }t          |           fd}                     ||||          S )z_Implement
        :py:classmethod::`pika.adapters.BaseConnection.create_connection()`.
        c                 B    | t          d           | d          S )zConnection factory.NzIExpected pika.connection.Parameters instance, but got None in params arg.F)r   r   r   )
ValueError)paramsclsr   s    r    connection_factoryz>GeventConnection.create_connection.<locals>.connection_factoryf   sD    ~  "I J J J3&%)49; ; ; ;r!   )connection_configsr'   r   workflowon_done)r   r   r   r   _start_connection_workflow)r&   r(   r*   r   r)   r'   r   s   `     @r    create_connectionz"GeventConnection.create_connectionX   s     ' A.v~/?/?@@ 	 0>>	; 	; 	; 	; 	; 	; --11 .   	r!   )NNNNNT)NN)__name__
__module____qualname____doc__r   classmethodr,   __classcell__)r   s   @r    r
   r
   !   s          !"&(,#'#.2/G /G /G /G /G /Gb  )-#'	   [    r!   r
   c                   :    e Zd ZdZd Zed             Zd Zd ZdS )_TSafeCallbackQueueziDispatch callbacks from any thread to be executed in the main thread
    efficiently with IO events.
    c                     t          j                    | _        t          j                    \  | _        | _        t          j                    | _	        dS )zQ
        :param _GeventSelectorIOLoop loop: IO loop to add callbacks to.
        N)
queueQueue_queueospipe_read_fd	_write_fd	threadingRLock_write_lockr   s    r    r   z_TSafeCallbackQueue.__init__|   s=    
 kmm(*		%t~ %?,,r!   c                     | j         S )z?The file-descriptor to register for READ events in the IO loop.)r;   r@   s    r    fdz_TSafeCallbackQueue.fd   s     }r!   c                     | j                             |           | j        5  t          j        | j        d           ddd           dS # 1 swxY w Y   dS )zAdd an item to the queue from any thread. The configured handler
        will be invoked with the item in the main thread.

        :param item: Object to add to the queue.
           N)r8   putr?   r9   writer<   r   callbacks     r    add_callback_threadsafez+_TSafeCallbackQueue.add_callback_threadsafe   s     	!!! 	. 	.HT^W---	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	.s   A

AAc                     	 | j                                         }t          j        | j        d            |             dS # t
          j        $ r t                              d           Y dS w xY w)a  Invoke the next callback from the queue.

        MUST run in the main thread. If no callback was added to the queue,
        this will block the IO loop.

        Performs a blocking READ on the pipe so must only be called when the
        pipe is ready for reading.
           zCallback queue was empty.N)	r8   
get_nowaitr9   readr;   r6   EmptyLOGGERwarningrG   s     r    run_next_callbackz%_TSafeCallbackQueue.run_next_callback   sz    	{--//H GDM1%%%HJJJJJ { 	8 	8 	8NN6777777	8s   A )A.-A.N)	r-   r.   r/   r0   r   propertyrB   rI   rQ    r!   r    r4   r4   w   sf         
- 
- 
-   X	. 	. 	.    r!   r4   c                   \    e Zd ZdZdZdZdZddZd Zd Z	d	 Z
d
 Zd Zd Zd Zd Zd ZdS )r   zImplementation of `AbstractSelectorIOLoop` using the Gevent event loop.

    Required by implementations of `SelectorIOServicesAdapter`.
    rK      r   Nc                     |pt          j                     _        i  _        t           j                                         _        t                       _         fd} 	                     j        j
        | j                   dS )z>
        :param gevent._interfaces.ILoop gevent_loop:
        c                 >    ~ ~j                                          dS )z$Swallow the fd and events arguments.N)_callback_queuerQ   )rB   eventsr   s     r    run_callback_in_main_threadzC_GeventSelectorIOLoop.__init__.<locals>.run_callback_in_main_thread   s$     2244444r!   N)r   r   _hub_io_watchers_by_fdhubWaiter_waiterr4   rX   add_handlerrB   READ)r   
gevent_hubrZ   s   `  r    r   z_GeventSelectorIOLoop.__init__   s     2&."2"2	"$z((**  344	5 	5 	5 	5 	5 	-02M	$ 	$ 	$ 	$ 	$r!   c                 P    | j         j                                         d| _         dS )zRelease the loop's resources.N)r[   loopdestroyr@   s    r    closez_GeventSelectorIOLoop.close   s#    	   			r!   c                     t                               d           | j                                         t                               d           | j                                         dS )zNRun the I/O loop. It will loop until requested to exit. See `stop()`.
        z"Passing control to Gevent's IOLoopz,Control was passed back from Gevent's IOLoopN)rO   debugr_   getclearr@   s    r    startz_GeventSelectorIOLoop.start   sX     	9:::CDDDr!   c                 :    | j                             d           dS )a$  Request exit from the ioloop. The loop is NOT guaranteed to
        stop before this method returns.

        To invoke `stop()` safely from a thread other than this IOLoop's thread,
        call it via `add_callback_threadsafe`; e.g.,

            `ioloop.add_callback(ioloop.stop)`
        N)r_   switchr@   s    r    stopz_GeventSelectorIOLoop.stop   s      	D!!!!!r!   c                 d   t          j                    | j        k    r;t                              d           | j        j                            |           dS t                              d           t          j        | j        j        j        |          }| j	        
                    |           dS )a  Requests a call to the given function as soon as possible in the
        context of this IOLoop's thread.

        NOTE: This is the only thread-safe method in IOLoop. All other
        manipulations of IOLoop must be performed from the IOLoop's thread.

        For example, a thread may request a call to the `stop` method of an
        ioloop that is running in a different thread via
        `ioloop.add_callback_threadsafe(ioloop.stop)`

        :param callable callback: The callback method
        z Adding callback from main threadz#Adding callback from another threadN)r   r   r[   rO   rh   rd   run_callback	functoolspartialrX   rI   rG   s     r    add_callbackz"_GeventSelectorIOLoop.add_callback   s     >ty((LL;<<<IN''11111
 LL>??? ()DhOOH 88BBBBBr!   c                 n    | j         j                            |          }|                    |           |S )a  Add the callback to the IOLoop timer to be called after delay seconds
        from the time of call on best-effort basis. Returns a handle to the
        timeout.

        :param float delay: The number of seconds to wait to call callback
        :param callable callback: The callback method
        :returns: handle to the created timeout that may be passed to
            `remove_timeout()`
        :rtype: object
        )r[   rd   timerrk   )r   delayrH   ru   s       r    
call_laterz _GeventSelectorIOLoop.call_later   s2     	$$U++Hr!   c                 .    |                                  dS )zURemove a timeout

        :param timeout_handle: Handle of timeout to remove
        N)rf   )r   timeout_handles     r    remove_timeoutz$_GeventSelectorIOLoop.remove_timeout  s    
 	r!   c                     | j         j                            ||          }|| j        |<   |                    |||           dS )a  Start watching the given file descriptor for events

        :param int fd: The file descriptor
        :param callable handler: When requested event(s) occur,
            `handler(fd, events)` will be called.
        :param int events: The event mask (READ|WRITE)
        N)r[   rd   ior\   rk   )r   rB   handlerrY   
io_watchers        r    r`   z!_GeventSelectorIOLoop.add_handler  sH     Y^&&r622
&0#"f-----r!   c                     | j         |         }|j        }|                                 | j         |= |                     |||           dS )zChange the events being watched for.

        :param int fd: The file descriptor
        :param int events: The new event mask (READ|WRITE)
        N)r\   rH   rf   r`   )r   rB   rY   r~   rH   s        r    update_handlerz$_GeventSelectorIOLoop.update_handler!  sV     ,R0
 &#B'Xv.....r!   c                 X    | j         |         }|                                 | j         |= dS )zgStop watching the given file descriptor for events

        :param int fd: The file descriptor
        N)r\   rf   )r   rB   r~   s      r    remove_handlerz$_GeventSelectorIOLoop.remove_handler/  s4    
 ,R0
#B'''r!   )N)r-   r.   r/   r0   ra   WRITEERRORr   rf   rk   rn   rs   rw   rz   r`   r   r   rS   r!   r    r   r      s          DEE$ $ $ $*  
  	" 	" 	"C C C2    
. 
. 
./ / /( ( ( ( (r!   r   c                   "    e Zd ZdZ	 	 	 	 ddZdS )r   zESelectorIOServicesAdapter implementation using Gevent's DNS resolver.r   c           
          t          | j        |||||||          }|                                 t          |          S )zOImplement :py:meth:`.nbio_interface.AbstractIOServices.getaddrinfo()`.
        )native_loophostportfamilysocktypeprotoflagsr*   )_GeventAddressResolver_looprk   _GeventIOLoopIOHandle)	r   r   r   r*   r   r   r   r   resolvers	            r    getaddrinfoz,_GeventSelectorIOServicesAdapter.getaddrinfo<  sQ     *dj/3/3173;050529; ; ; 	$X...r!   N)r   r   r   r   )r-   r.   r/   r0   r   rS   r!   r    r   r   9  s<        OO / / / / / /r!   r   c                       e Zd ZdZd Zd ZdS )r   zXImplement `AbstractIOReference`.

    Only used to wrap the _GeventAddressResolver.
    c                     |j         | _        dS )zY
        :param subject: subject of the reference containing a `cancel()` method
        N)cancel_cancel)r   subjects     r    r   z_GeventIOLoopIOHandle.__init__Y  s     ~r!   c                 *    |                                  S )zCancel pending operation

        :returns: False if was already done or cancelled; True otherwise
        :rtype: bool
        )r   r@   s    r    r   z_GeventIOLoopIOHandle.cancel_  s     ||~~r!   N)r-   r.   r/   r0   r   r   rS   r!   r    r   r   S  s<         
& & &    r!   r   c                   @    e Zd ZdZdZd Zd Zd Zd Zd Z	d Z
d	 Zd
S )r   zPerforms getaddrinfo asynchronously Gevent's configured resolver in a
    separate greenlet and invoking the provided callback with the result.

    See: http://www.gevent.org/dns.html
    )	r   _on_done	_greenlet_ga_host_ga_port
_ga_family_ga_socktype	_ga_proto	_ga_flagsc	                     t          |d           || _        || _        d| _        || _        || _        || _        || _        || _        || _	        dS )a  Initialize the `_GeventAddressResolver`.

        :param AbstractSelectorIOLoop native_loop:
        :param host: `see socket.getaddrinfo()`
        :param port: `see socket.getaddrinfo()`
        :param family: `see socket.getaddrinfo()`
        :param socktype: `see socket.getaddrinfo()`
        :param proto: `see socket.getaddrinfo()`
        :param flags: `see socket.getaddrinfo()`
        :param on_done: on_done(records|BaseException) callback for reporting
            result from the given I/O loop. The single arg will be either an
            exception object (check for `BaseException`) in case of failure or
            the result returned by `socket.getaddrinfo()`.
        r*   N)
r   r   r   r   r   r   r   r   r   r   )	r   r   r   r   r   r   r   r   r*   s	            r    r   z_GeventAddressResolver.__init__z  sX      	7I... 
 $r!   c                     | j          t          j        | j                  | _         dS t                              d           dS )z-Start an asynchronous getaddrinfo invocation.Nz&_GeventAddressResolver already started)r   r   	spawn_raw_resolverO   rP   r@   s    r    rk   z_GeventAddressResolver.start  s;    >!#-dm<<DNNNNNCDDDDDr!   c                 l    d}| j         d}|                                  |                                  |S )zCancel the pending resolver.FNT)r   _stop_greenlet_cleanup)r   changeds     r    r   z_GeventAddressResolver.cancel  s8    >%G!!!r!   c                 J    |                                   d| _        d| _        dS )z,Stop the resolver and release any resources.N)r   r   r   r@   s    r    r   z_GeventAddressResolver._cleanup  s&    
r!   c                 X    | j         "t          j        | j                    d| _         dS dS )zbStop the greenlet performing getaddrinfo if running.

        Otherwise, this is a no-op.
        N)r   r   killr@   s    r    r   z%_GeventAddressResolver._stop_greenlet  s1    
 >%K'''!DNNN &%r!   c                 ^   	 t           j                            | j        | j        | j        | j        | j        | j                  }n4# t          $ r'}t                              d|           |}Y d}~nd}~ww xY wt          j        | j        |          }| j                            |           dS )zoCall `getaddrinfo()` and return result via user's callback
        function on the configured IO loop.
        zAddress resolution failed: %rN)r   socketr   r   r   r   r   r   r   	ExceptionrO   errorrq   rr   _dispatch_callbackr   rs   )r   resultexcrH   s       r    r   z_GeventAddressResolver._resolve  s    	]..t}dm/3/3/@/3~t~O OFF  	 	 	LL8#>>>FFFFFF	 $T%<fEE
)))))s   AA 
A6A11A6c                     	 t                               d| j                   |                     |           |                                  dS # |                                  w xY w)zInvoke the configured completion callback and any subsequent cleanup.

        :param result: result from getaddrinfo, or the exception if raised.
        z9Invoking async getaddrinfo() completion callback; host=%rN)rO   rh   r   r   r   )r   r   s     r    r   z)_GeventAddressResolver._dispatch_callback  s\    
	LLK   MM&!!!MMOOOOODMMOOOOs   5A A#N)r-   r.   r/   r0   	__slots__r   rk   r   r   r   r   r   rS   r!   r    r   r   h  s         

I  <E E E	 	 	  " " "* * *"    r!   r   )"r0   rq   loggingr9   r=   weakrefr6   ImportErrorr7   r   
gevent.hubgevent.socketpika.compatr   pika.adapters.base_connectionr   %pika.adapters.utils.io_services_utilsr   "pika.adapters.utils.nbio_interfacer   r   +pika.adapters.utils.selector_ioloop_adapterr   r   	getLoggerr-   rO   r
   objectr4   r   r   r   r   rS   r!   r    <module>r      sX   & &      				     LLLL                 8 8 8 8 8 8 D D D D D D              
 
	8	$	$S S S S S~ S S Sl2 2 2 2 2& 2 2 2jJ( J( J( J( J(2 J( J( J(Z/ / / / /'@ / / /4    /   *m m m m mV m m m m ms    	))