o
    ¯bš  ã                   @   s–   d Z ddlmZ ddlmZ ddlmZ ddlmZ ddl	m
Z
 zddlmZ W n ey3   dZY nw G d	d
„ d
ƒZee dƒG dd„ de
ƒƒZdS )z-
Tests for L{twisted.internet.epollreactor}.
é    )ÚskipIf©ÚConnectionDone)Ú_ContinuousPolling)ÚClock)ÚTestCase)ÚepollreactorNc                   @   s8   e Zd ZdZdd„ Zdd„ Zdd„ Zdd	„ Zd
d„ ZdS )Ú
DescriptorzF
    Records reads and writes, as if it were a C{FileDescriptor}.
    c                 C   s
   g | _ d S ©N)Úevents©Úself© r   úI/usr/lib/python3/dist-packages/twisted/internet/test/test_epollreactor.pyÚ__init__   s   
zDescriptor.__init__c                 C   s   dS )Né   r   r   r   r   r   Úfileno   s   zDescriptor.filenoc                 C   ó   | j  d¡ d S )NÚread©r   Úappendr   r   r   r   ÚdoRead    ó   zDescriptor.doReadc                 C   r   )NÚwriter   r   r   r   r   ÚdoWrite#   r   zDescriptor.doWritec                 C   s   |  t¡ | j d¡ d S )NÚlost)Útrapr   r   r   )r   Úreasonr   r   r   ÚconnectionLost&   s   
zDescriptor.connectionLostN)	Ú__name__Ú
__module__Ú__qualname__Ú__doc__r   r   r   r   r   r   r   r   r   r	      s    r	   z(epoll not supported in this environment.c                   @   sx   e Zd ZdZdd„ Zdd„ Zdd„ Zdd	„ Zd
d„ Zdd„ Z	dd„ Z
dd„ Zdd„ Zdd„ Zdd„ Zdd„ Zdd„ ZdS )ÚContinuousPollingTestsza
    L{_ContinuousPolling} can be used to read and write from C{FileDescriptor}
    objects.
    c                 C   óv   t tƒ ƒ}|  |j¡ tƒ }|  | |¡¡ | |¡ |  |j¡ |  	|jj
¡ |  |jj|j¡ |  	| |¡¡ dS )zi
        Adding a reader when there was previously no reader starts up a
        C{LoopingCall}.
        N)r   r   ÚassertIsNoneÚ_loopÚobjectÚassertFalseÚ	isReadingÚ	addReaderÚassertIsNotNoneÚ
assertTrueÚrunningÚassertIsÚclockÚ_reactor©r   ÚpollerÚreaderr   r   r   Útest_addReader2   ó   

z%ContinuousPollingTests.test_addReaderc                 C   r$   )zi
        Adding a writer when there was previously no writer starts up a
        C{LoopingCall}.
        N)r   r   r%   r&   r'   r(   Ú	isWritingÚ	addWriterr+   r,   r-   r.   r/   r0   ©r   r2   Úwriterr   r   r   Útest_addWriterA   r5   z%ContinuousPollingTests.test_addWriterc                 C   óV   t tƒ ƒ}tƒ }| |¡ | |¡ |  |j¡ |  |j 	¡ g ¡ |  
| |¡¡ dS )z=
        Removing a reader stops the C{LoopingCall}.
        N)r   r   r'   r*   ÚremoveReaderr%   r&   ÚassertEqualr0   ÚgetDelayedCallsr(   r)   r1   r   r   r   Útest_removeReaderP   ó   


z(ContinuousPollingTests.test_removeReaderc                 C   r;   )z=
        Removing a writer stops the C{LoopingCall}.
        N)r   r   r'   r7   ÚremoveWriterr%   r&   r=   r0   r>   r(   r6   r8   r   r   r   Útest_removeWriter\   r@   z(ContinuousPollingTests.test_removeWriterc                 C   s&   t tƒ ƒ}| tƒ ¡ | tƒ ¡ dS )zM
        Removing unknown readers and writers silently does nothing.
        N)r   r   rA   r'   r<   )r   r2   r   r   r   Útest_removeUnknownh   s   
z)ContinuousPollingTests.test_removeUnknownc                 C   s    t tƒ ƒ}tƒ }| |¡ |  |j¡ | tƒ ¡ |  |j¡ | tƒ ¡ |  |j¡ | tƒ ¡ | |¡ |  |j¡ |  |jj	¡ |  
t|j ¡ ƒd¡ dS )za
        Adding multiple readers and writers results in a single
        C{LoopingCall}.
        r   N)r   r   r'   r7   r+   r&   r*   rA   r,   r-   r=   Úlenr0   r>   r8   r   r   r   Útest_multipleReadersAndWritersp   s   


z5ContinuousPollingTests.test_multipleReadersAndWritersc                 C   ó‚   t ƒ }t|ƒ}tƒ }| |¡ |  |jg ¡ | d¡ |  |jdg¡ | d¡ |  |jddg¡ | d¡ |  |jg d¢¡ dS )za
        Adding a reader causes its C{doRead} to be called every 1
        milliseconds.
        gñhãˆµøä>r   )r   r   r   N)r   r   r	   r*   r=   r   Úadvance©r   Úreactorr2   Údescr   r   r   Útest_readerPollingƒ   ó   



z)ContinuousPollingTests.test_readerPollingc                 C   rF   )zb
        Adding a writer causes its C{doWrite} to be called every 1
        milliseconds.
        çü©ñÒMbP?r   )r   r   r   N)r   r   r	   r7   r=   r   rG   rH   r   r   r   Útest_writerPolling”   rL   z)ContinuousPollingTests.test_writerPollingc                 C   óT   t ƒ }t|ƒ}tƒ }dd„ |_| |¡ |  |jg ¡ | d¡ |  |jdg¡ dS )zu
        If a C{doRead} returns a value indicating disconnection,
        C{connectionLost} is called on it.
        c                   S   ó   t ƒ S r
   r   r   r   r   r   Ú<lambda>­   ó    zBContinuousPollingTests.test_connectionLostOnRead.<locals>.<lambda>rM   r   N)r   r   r	   r   r*   r=   r   rG   rH   r   r   r   Útest_connectionLostOnRead¥   ó   


z0ContinuousPollingTests.test_connectionLostOnReadc                 C   rO   )zv
        If a C{doWrite} returns a value indicating disconnection,
        C{connectionLost} is called on it.
        c                   S   rP   r
   r   r   r   r   r   rQ   »   rR   zCContinuousPollingTests.test_connectionLostOnWrite.<locals>.<lambda>rM   r   N)r   r   r	   r   r7   r=   r   rG   rH   r   r   r   Útest_connectionLostOnWrite³   rT   z1ContinuousPollingTests.test_connectionLostOnWritec                 C   s–   t tƒ ƒ}tƒ }tƒ }tƒ }| |¡ | |¡ | |¡ | |¡ | ¡ }|  | ¡ g ¡ |  | ¡ g ¡ |  t	|ƒd¡ |  t
|ƒ|||h¡ dS )zv
        L{_ContinuousPolling.removeAll} removes all descriptors and returns
        the readers and writers.
        é   N)r   r   r'   r*   r7   Ú	removeAllr=   Ú
getReadersÚ
getWritersrD   Úset)r   r2   r3   r9   ÚbothÚremovedr   r   r   Útest_removeAllÁ   s   




z%ContinuousPollingTests.test_removeAllc                 C   ó.   t tƒ ƒ}tƒ }| |¡ |  || ¡ ¡ dS )zb
        L{_ContinuousPolling.getReaders} returns a list of the read
        descriptors.
        N)r   r   r'   r*   ÚassertInrX   r1   r   r   r   Útest_getReadersÔ   ó   

z&ContinuousPollingTests.test_getReadersc                 C   r^   )zc
        L{_ContinuousPolling.getWriters} returns a list of the write
        descriptors.
        N)r   r   r'   r7   r_   rY   r8   r   r   r   Útest_getWritersÞ   ra   z&ContinuousPollingTests.test_getWritersN)r   r    r!   r"   r4   r:   r?   rB   rC   rE   rK   rN   rS   rU   r]   r`   rb   r   r   r   r   r#   +   s    
r#   )r"   Úunittestr   Útwisted.internet.errorr   Útwisted.internet.posixbaser   Útwisted.internet.taskr   Útwisted.trial.unittestr   Útwisted.internetr   ÚImportErrorr	   r#   r   r   r   r   Ú<module>   s   ÿ
