o
    `                     @   sR  d dl mZmZmZ d dlmZ d dlZd dlmZ d dl	m
Z
 d dlmZ d dlmZ d dlmZ d d	lmZmZ d d
lmZmZmZmZ d dlmZ d dlmZmZmZmZ d dl m!Z!m"Z" d dl#m$Z$ d dl%m&Z& ddl'm(Z(m)Z)m*Z*m+Z+m,Z, e
ddd G dd dZ-d"ddZ.G dd dZ/G dd de!Z0G dd dZ1d"d d!Z2dS )#    )as_completedFuturewait_for)countN)Optional)warn)IOLoop)IOStream)Event)Queue	QueueFull)
SASLParsermake_auth_externalBEGINAuthenticationError)get_bus)ParserMessageTypeMessageMessageFlag)	ProxyBase
unwrap_msg)Router)message_bus   )MessageFiltersFilterHandleReplyMatcherRouterClosedcheck_replyablezsjeepney.io.tornado is deprecated. Tornado is now built on top of asyncio, so please use jeepney.io.asyncio instead.   )
stacklevelc                   @   sD   e Zd ZdefddZdddefddZd	efd
dZdd ZdS )DBusConnectionstreamc                 C   s$   || _ t | _tdd| _d | _d S )Nr   )start)r#   r   parserr   outgoing_serialunique_name)selfr#    r)   4/usr/lib/python3/dist-packages/jeepney/io/tornado.py__init__   s   
zDBusConnection.__init__Nserialmessagec                   s0   |d u r
t | j}| j||I d H  d S N)nextr&   r#   writeZ	serialiser(   r.   r-   r)   r)   r*   send!   s   
zDBusConnection.sendreturnc                    s>   	 | j  }|d ur|S | jjdddI d H }| j | q)NTi   partial)r%   Zget_next_messager#   
read_bytesZadd_data)r(   msgbr)   r)   r*   receive'   s   
zDBusConnection.receivec                 C      | j   d S r/   )r#   closer(   r)   r)   r*   r<   0      zDBusConnection.close)	__name__
__module____qualname__r	   r+   r   r3   r:   r<   r)   r)   r)   r*   r"      s
    	r"   SESSIONc                    s   t | }ttjtjd}||I d H  |dt  I d H  t }|js?|	|j
dddI d H  |jr<t|j|jr'|tI d H  t|}t|}ttt| dI d H }|d |_W d    |S 1 smw   Y  |S )N)Zfamily    i   Tr5   
   r   )r   r	   socketZAF_UNIXZconnectr1   r   r   ZauthenticatedZfeedr7   errorr   r   r"   
DBusRouterr   Proxyr   ZHellor'   )busZbus_addrr#   Zauth_parserconnrouterZ
reply_bodyr)   r)   r*   open_dbus_connection4   s(   


rL   c                   @   s   e Zd ZdefddZddddZdd	 Zdd
ddee fddZ	dd Z
dd Zdd Zedd ZdefddZdefddZdd ZdS )rG   rJ   c                 C   s<   || _ t | _t | _t | _t 	| j
 tt| _d S r/   )rJ   r   _repliesr   _filtersr
   _stop_receivingr   ZcurrentZadd_callback	_receiverr   r   rK   )r(   rJ   r)   r)   r*   r+   L   s   zDBusRouter.__init__Nr,   c                   s   | j j||dI d H  d S )Nr,   )rJ   r3   r2   r)   r)   r*   r3   V   s   zDBusRouter.sendc                    sz   t | | j rtdt| jj}| j|t	 }| j
||dI d H  |I d H W  d    S 1 s6w   Y  d S )NzThis DBusRouter has stoppedr,   )r   rO   Zis_setr   r0   rJ   r&   rM   Zcatchr   r3   )r(   r.   r-   Z	reply_futr)   r)   r*   send_and_get_replyY   s   
$zDBusRouter.send_and_get_replyr   )queuebufsizerR   c                C   s   t | j||p	t|S )a  Create a filter for incoming messages

        Usage::

            with router.filter(rule) as queue:
                matching_msg = await queue.get()

        :param jeepney.MatchRule rule: Catch messages matching this rule
        :param tornado.queues.Queue queue: Matched messages will be added to this
        :param int bufsize: If no queue is passed in, create one with this size
        )r   rN   r   )r(   ZrulerR   rS   r)   r)   r*   filterd   s   zDBusRouter.filterc                 C   r;   r/   )rO   setr=   r)   r)   r*   stopr   r>   zDBusRouter.stopc                 C   s   | S r/   r)   r=   r)   r)   r*   	__enter__u   s   zDBusRouter.__enter__c                 C   s   |    dS )NF)rV   r(   exc_typeZexc_valexc_tbr)   r)   r*   __exit__x   s   zDBusRouter.__exit__c                 C   s   | j jS r/   )rJ   r'   r=   r)   r)   r*   r'   ~   s   zDBusRouter.unique_namer.   c                    sF   |j jtjkr|j jtj@ st| |I d H S | 	|I d H  d S r/   )
headermessage_typer   Zmethod_returnflagsr   Zno_reply_expectedr   rQ   r3   )r(   r.   r)   r)   r*   send_message   s   zDBusRouter.send_messager8   c              	   C   sH   | j |rdS | j|D ]}z|j| W q ty!   Y qw dS )zHandle one received messageN)rM   dispatchrN   matchesrR   Z
put_nowaitr   )r(   r8   rT   r)   r)   r*   	_dispatch   s   zDBusRouter._dispatchc                    sx   z1	 t | j | j gD ]"}|I dH }|du r& W d| _| j  dS | | | j	
| qqd| _| j  w )z'Receiver loop - runs in a separate taskTNF)r   rJ   r:   rO   waitZ
is_runningrM   Zdrop_allrb   rK   Zincoming)r(   coror8   r)   r)   r*   rP      s   

zDBusRouter._receiver)r?   r@   rA   r"   r+   r3   rQ   r   r   rT   rV   rW   r[   propertyr'   r   r_   rb   rP   r)   r)   r)   r*   rG   K   s    

rG   c                       s2   e Zd Zdef fddZdd Zdd Z  ZS )rH   rK   c                    s   t  | || _d S r/   )superr+   _router)r(   ZmsggenrK   	__class__r)   r*   r+      s   
zProxy.__init__c                 C   s   d | j| jS )NzProxy({}, {}))formatZ_msggenrg   r=   r)   r)   r*   __repr__   s   zProxy.__repr__c                    s    fdd}|S )Nc                     s8    | i |}|j jtju sJ tj|I d H S r/   )r\   r]   r   Zmethod_callr   rg   rQ   )argskwargsr8   make_msgr(   r)   r*   inner   s   z!Proxy._method_call.<locals>.innerr)   )r(   ro   rp   r)   rn   r*   _method_call   s   zProxy._method_call)r?   r@   rA   rG   r+   rk   rq   __classcell__r)   r)   rh   r*   rH      s    rH   c                   @   s.   e Zd ZdZdZd	ddZdd Zdd ZdS )
_RouterContextNrB   c                 C   s
   || _ d S r/   rI   )r(   rI   r)   r)   r*   r+      s   
z_RouterContext.__init__c                    s&   t | jI d H | _t| j| _| jS r/   )rL   rI   rJ   rG   rK   r=   r)   r)   r*   
__aenter__   s   z_RouterContext.__aenter__c                    s   | j   | j  d S r/   )rK   rV   rJ   r<   rX   r)   r)   r*   	__aexit__   s   
z_RouterContext.__aexit__rB   )r?   r@   rA   rJ   rK   r+   ru   rv   r)   r)   r)   r*   rs      s    
rs   c                 C   s   t | S )a  Open a D-Bus 'router' to send and receive messages.

    Use as an async context manager::

        async with open_dbus_router() as req:
            ...

    :param str bus: 'SESSION' or 'SYSTEM' or a supported address.
    :return: :class:`DBusRouter`

    This is a shortcut for::

        conn = await open_dbus_connection()
        async with conn:
            async with conn.router() as req:
                ...
    )rs   rt   r)   r)   r*   open_dbus_router   s   rx   rw   )3Zasyncior   r   r   	itertoolsr   rE   typingr   warningsr   Ztornado.ioloopr   Ztornado.iostreamr	   Ztornado.locksr
   Ztornado.queuesr   r   Zjeepney.authr   r   r   r   Zjeepney.busr   Zjeepney.low_levelr   r   r   r   Zjeepney.wrappersr   r   Zjeepney.routingr   Zjeepney.bus_messagesr   commonr   r   r   r   r   r"   rL   rG   rH   rs   rx   r)   r)   r)   r*   <module>   s2    
^