o
    ` :                     @   s  d dl Z d dlmZ d dlZd dlmZ d dlZd dlmZ zd dlm	Z	 W n e
y5   d dlm	Z	 Y nw d dlmZmZ d dlZd dlmZ d dlmZmZ d d	lmZ d d
lmZmZ d dlmZmZmZ d dlmZm Z  d dl!m"Z" ddl#m$Z$m%Z%m&Z&m'Z'm(Z( e)e*Z+g dZ,edd Z-G dd deZ.d%ddde.fddZ/G dd de%Z0G dd dZ1G dd  d Z2G d!d" d"eZ3e	d%ddd#d$Z4dS )&    N)contextmanager)count)Optional)asynccontextmanager)ValueError)Channel)AuthenticatorBEGIN)get_bus)FileDescriptorfds_buf_size)ParserMessageTypeMessage)	ProxyBase
unwrap_msg)message_bus   )MessageFiltersFilterHandleReplyMatcherRouterClosedcheck_replyable)open_dbus_connectionopen_dbus_routerProxyc               
   c   sX    zd V  W d S  t y+ }  z| jtjtjhv rtdd td| | d } ~ ww )Nzthis socket was already closedzsocket connection broken: {})OSErrorerrnoZEBADFENOTSOCKtrioClosedResourceErrorZBrokenResourceErrorformat)exc r$   1/usr/lib/python3/dist-packages/jeepney/io/trio.py)_translate_socket_errors_to_stream_errors0   s   r&   c                   @   s~   e Zd ZdZdddZdddefdd	Zd
efddZdd
e	fddZ
defddZdd Zdd Zdd Zedd ZdS )DBusConnectiona  A plain D-Bus connection with no matching of replies.

    This doesn't run any separate tasks: sending and receiving are done in
    the task that calls those methods. It's suitable for implementing servers:
    several worker tasks can receive requests and send replies.
    For a typical client pattern, see :class:`DBusRouter`.

    Implements trio's channel interface for Message objects.
    Fc                 C   sD   || _ || _t | _tdd| _d | _t | _	t | _
d | _d S )Nr   )start)socket
enable_fdsr   parserr   outgoing_serialunique_namer    ZLock	send_lock	recv_lock_leftover_to_send)selfr)   r*   r$   r$   r%   __init__I   s   


zDBusConnection.__init__Nserialmessagec             	      s   | j 4 I dH / |du rt| j}| jrtdnd}|j||d}| ||I dH  W d  I dH  dS 1 I dH s=w   Y  dS )z.Serialise and send a :class:`~.Message` objectNi)fds)r.   nextr,   r*   arrayZ	serialise
_send_data)r1   r5   r4   r7   datar$   r$   r%   sendS   s   
.zDBusConnection.sendr;   c              	      s   | j jr
tdt Y | jr| | jI d H  t|0}|r5| j |gtj j	tj j
|fgI d H }n	| j |I d H }| ||I d H  W d    n1 sQw   Y  W d    d S W d    d S 1 siw   Y  d S )Nz!can't send data after sending EOF)r)   Zdid_shutdown_SHUT_WRr    r!   r&   r0   _send_remainder
memoryviewZsendmsgZ
SOL_SOCKETZ
SCM_RIGHTSr<   )r1   r;   r7   sentr$   r$   r%   r:   ^   s"   


"zDBusConnection._send_datar   c                    s   z5|t |k r1||d  }| j|I d H }W d    n1 s"w   Y  ||7 }|t |k sd | _W d S  tjyF   ||d  | _ w N)lenr)   r<   r0   r    Z	Cancelled)r1   r;   Zalready_sentZ	remainingr?   r$   r$   r%   r=   q   s   zDBusConnection._send_remainderreturnc              	      s   | j 4 I dH / 	 | j }|dur|W  d  I dH  S |  I dH \}}|s/td| j|| q
1 I dH s=w   Y  dS )z5Return the next available message from the connectionNTzSocket closed at the other end)r/   r+   Zget_next_message
_read_datar    ZEndOfChannelZadd_data)r1   msgbr7   r$   r$   r%   receive   s   

zDBusConnection.receivec                    s   | j rC| j }t  | j|t I d H \}}}}W d    n1 s&w   Y  |ttjdd@ r<| 	  t
d|t|fS t  | jdI d H }W d    |g fS 1 s]w   Y  |g fS )NZ
MSG_CTRUNCr   z&Unable to receive all file descriptorsi   )r*   r+   Zbytes_desiredr&   r)   Zrecvmsgr   getattrr    _closeRuntimeErrorr   Zfrom_ancdataZrecv)r1   nbytesr;   Zancdataflags_r$   r$   r%   rC      s$   

zDBusConnection._read_datac                 C   s   | j   d | _d S r@   )r)   closer0   r1   r$   r$   r%   rH      s   

zDBusConnection._closec                    s   |    dS )zClose the D-Bus connectionN)rH   rN   r$   r$   r%   aclose   s   zDBusConnection.aclosec              	   C  s   t  4 I dH -}t| }||I dH  z|V  W | I dH  n| I dH  w W d  I dH  dS 1 I dH s<w   Y  dS )aY  Temporarily wrap this connection as a :class:`DBusRouter`

        To be used like::

            async with conn.router() as req:
                reply = await req.send_and_get_reply(msg)

        While the router is running, you shouldn't use :meth:`receive`.
        Once the router is closed, you can use the plain connection again.
        N)r    Zopen_nursery
DBusRouterr(   rO   )r1   nurseryrouterr$   r$   r%   rR      s   ".zDBusConnection.router)F)r   )__name__
__module____qualname____doc__r2   r   r<   bytesr:   r>   r=   rF   rC   rH   rO   r   rR   r$   r$   r$   r%   r'   ?   s    
	
r'   SESSIONFr*   rB   c          	   	      s   t | }t|I dH }t|d}|D ]}||I dH  || I dH  q|tI dH  t|j	|d}|
 4 I dH }|t I dH }|jd |_W d  I dH  |S 1 I dH sbw   Y  |S )zHOpen a plain D-Bus connection

    :return: :class:`DBusConnection`
    NrY   r   )r   r    Zopen_unix_socketr	   Zsend_allZfeedZreceive_somer
   r'   r)   rR   send_and_get_replyr   ZHellobodyr-   )	busr*   Zbus_addrZsockZauthrZreq_dataconnrR   replyr$   r$   r%   r      s    
r   c                       sF   e Zd Zdef fddZedd Zdd Zdd	 Zd
d Z	  Z
S )TrioFilterHandlefiltersc                    s   t  ||| || _d S r@   )superr2   send_channel)r1   r`   ruleZsend_chnZrecv_chn	__class__r$   r%   r2      s   
zTrioFilterHandle.__init__c                 C   s   | j S r@   queuerN   r$   r$   r%   receive_channel   s   z TrioFilterHandle.receive_channelc                    s   |    | j I d H  d S r@   )rM   rb   rO   rN   r$   r$   r%   rO      s   zTrioFilterHandle.aclosec                    s   | j S r@   rf   rN   r$   r$   r%   
__aenter__   s   zTrioFilterHandle.__aenter__c                    s   |   I d H  d S r@   )rO   )r1   exc_typeZexc_valexc_tbr$   r$   r%   	__aexit__   s   zTrioFilterHandle.__aexit__)rS   rT   rU   r   r2   propertyrh   rO   ri   rl   __classcell__r$   r$   rd   r%   r_      s    
r_   c                   @   s0   e Zd ZdZdd Zdd Zdd Zdd	 Zd
S )Futurez4A very simple Future for trio based on `trio.Event`.c                 C   s   d | _ t | _d S r@   )_outcomer    ZEvent_eventrN   r$   r$   r%   r2      s   zFuture.__init__c                 C      t || _| j  d S r@   )r   rp   rq   set)r1   resultr$   r$   r%   
set_result      
zFuture.set_resultc                 C   rr   r@   )r   rp   rq   rs   )r1   r#   r$   r$   r%   set_exception   rv   zFuture.set_exceptionc                    s   | j  I d H  | j S r@   )rq   waitrp   ZunwraprN   r$   r$   r%   get   s   
z
Future.getN)rS   rT   rU   rV   r2   ru   rw   ry   r$   r$   r$   r%   ro      s    ro   c                   @   s   e Zd ZdZdZdZdefddZedd Z	ddd	d
Z
defddZddddeej fddZdejfddZdd ZdefddZejfddZdS )rP   zA client D-Bus connection which can wait for replies.

    This runs a separate receiver task and dispatches received messages.
    Nr]   c                 C   s   || _ t | _t | _d S r@   )_connr   _repliesr   _filters)r1   r]   r$   r$   r%   r2     s   zDBusRouter.__init__c                 C   s   | j jS r@   )rz   r-   rN   r$   r$   r%   r-     s   zDBusRouter.unique_namer3   c                   s   | j j||dI dH  dS )z/Send a message, don't wait for a reply
        r3   N)rz   r<   )r1   r5   r4   r$   r$   r%   r<     s   zDBusRouter.sendrB   c                    s~   t | | jdu rtdt| jj}| j|t }| j	||dI dH  |
 I dH W  d   S 1 s8w   Y  dS )zSend a method call message and wait for the reply

        Returns the reply message (method return or error message type).
        NzThis DBusRouter has stoppedr3   )r   _rcv_cancel_scoper   r8   rz   r,   r{   Zcatchro   r<   ry   )r1   r5   r4   Z	reply_futr$   r$   r%   rZ     s   
$zDBusRouter.send_and_get_replyr   )channelbufsizer~   c                C   s,   |du rt |\}}nd}t| j|||S )a  Create a filter for incoming messages

        Usage::

            async with router.filter(rule) as receive_channel:
                matching_msg = await receive_channel.receive()

            # OR:
            send_chan, recv_chan = trio.open_memory_channel(1)
            async with router.filter(rule, channel=send_chan):
                matching_msg = await recv_chan.receive()

        If the channel fills up,
        The sending end of the channel is closed when leaving the ``async with``
        block, whether or not it was passed in.

        :param jeepney.MatchRule rule: Catch messages matching this rule
        :param trio.MemorySendChannel channel: Send matching messages here
        :param int bufsize: If no channel is passed in, create one with this size
        N)r    Zopen_memory_channelr_   r|   )r1   rc   r~   r   Zrecv_channelr$   r$   r%   filter#  s   zDBusRouter.filterrQ   c                    s,   | j d ur
td|| jI d H | _ d S )Nz+DBusRouter receiver task is already running)r}   rI   r(   	_receiver)r1   rQ   r$   r$   r%   r(   @  s   
zDBusRouter.startc                    s0   | j dur| j   d| _ tdI dH  dS )z Stop the sender & receiver tasksNr   )r}   Zcancelr    sleeprN   r$   r$   r%   rO   E  s
   

zDBusRouter.acloserD   c              	   C   sJ   | j |rdS | j|D ]}z|j| W q tjy"   Y qw dS )zHandle one received messageN)r{   dispatchr|   matchesrb   Zsend_nowaitr    Z
WouldBlock)r1   rD   r   r$   r$   r%   	_dispatchR  s   zDBusRouter._dispatchc                    s   t  K}d| _|| z	 | j I dH }| | qd| _| j  t 	d}| j
j D ]}d|_|j I dH  q2W d   w 1 sJw   Y  w 1 sSw   Y  dS )z'Receiver loop - runs in a separate taskTNF   )r    ZCancelScopeZ
is_runningZstartedrz   rF   r   r{   Zdrop_allZmove_on_afterr|   r`   valuesZshieldrb   rO   )r1   Ztask_statusZcscoperD   Zcleanup_scoper   r$   r$   r%   r   ]  s$   



zDBusRouter._receiver)rS   rT   rU   rV   Z_nursery_mgrr}   r'   r2   rm   r-   r<   r   rZ   r   r    ZMemorySendChannelr   ZNurseryr(   rO   r   ZTASK_STATUS_IGNOREDr   r$   r$   r$   r%   rP      s    
rP   c                       s(   e Zd ZdZ fddZdd Z  ZS )r   a  A trio proxy for calling D-Bus methods

    You can call methods on the proxy object, such as ``await bus_proxy.Hello()``
    to make a method call over D-Bus and wait for a reply. It will either
    return a tuple of returned data, or raise :exc:`.DBusErrorResponse`.
    The methods available are defined by the message generator you wrap.

    :param msggen: A message generator object.
    :param ~trio.DBusRouter router: Router to send and receive messages.
    c                    s(   t  | t|tstd|| _d S )Nz)Proxy can only be used with DBusRequester)ra   r2   
isinstancerP   	TypeError_router)r1   ZmsggenrR   rd   r$   r%   r2   ~  s   

zProxy.__init__c                    s    fdd}|S )Nc                     s<    | i |}|j jtju sJ j|I d H }t|S r@   )headerZmessage_typer   Zmethod_callr   rZ   r   )argskwargsrD   r^   make_msgr1   r$   r%   inner  s
   z!Proxy._method_call.<locals>.innerr$   )r1   r   r   r$   r   r%   _method_call  s   zProxy._method_call)rS   rT   rU   rV   r2   r   rn   r$   r$   rd   r%   r   s  s    
r   c             
   C  s   t | |dI dH }|4 I dH - | 4 I dH }|V  W d  I dH  n1 I dH s-w   Y  W d  I dH  dS 1 I dH sCw   Y  dS )a  Open a D-Bus 'router' to send and receive messages.

    Use as an async context manager::

        async with open_dbus_router() as req:
            ...

    :param str bus: 'SESSION' or 'SYSTEM' or a supported address.
    :return: :class:`DBusRouter`

    This is a shortcut for::

        conn = await open_dbus_connection()
        async with conn:
            async with conn.router() as req:
                ...
    rY   N)r   rR   )r\   r*   r]   Zrtrr$   r$   r%   r     s   *.r   )rX   )5r9   
contextlibr   r   	itertoolsr   Zloggingtypingr   r   ImportErrorasync_generatorZoutcomer   r   r    Ztrio.abcr   Zjeepney.authr	   r
   Zjeepney.busr   Zjeepney.fdsr   r   Zjeepney.low_levelr   r   r   Zjeepney.wrappersr   r   Zjeepney.bus_messagesr   commonr   r   r   r   r   Z	getLoggerrS   log__all__r&   r'   r   r_   ro   rP   r   r   r$   r$   r$   r%   <module>   sB    

~u