o
    a                     @   s   d Z ddlZddlZddlZddlZddlZejdddkZ	ejddZ
e
dkZdZG d	d
 d
ejZedkr?e  dS dS )ab  
test_app.py
websocket - WebSocket client library for Python

Copyright 2021 engn33r

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
    NTEST_WITH_INTERNET01LOCAL_WS_SERVER_PORTz-1Tc                   @   s   e Zd ZG dd dZdd Zdd Zeeddd	 Z	ee
d
dd Zee
d
dd Zee
d
dd Zee
d
dd Zee
d
dd Zee
d
dd Zee
d
dd Zee
d
dd ZdS )WebSocketAppTestc                   @   s   e Zd ZdZdS )zWebSocketAppTest.NotSetYetzI A marker class for signalling that a value hasn't been set yet.
        N)__name__
__module____qualname____doc__ r   r   :/usr/lib/python3/dist-packages/websocket/tests/test_app.py	NotSetYet&   s    r   c                 C   s,   t t t t_t t_t t_d S N)wsZenableTrace	TRACEABLEr   r   keep_running_openkeep_running_closeget_mask_key_idselfr   r   r   setUp*   s   


zWebSocketAppTest.setUpc                 C   s"   t  t _t  t _t  t _d S r   )r   r   r   r   r   r   r   r   r   tearDown1   s   

zWebSocketAppTest.tearDownz/Tests using local websocket server are disabledc                    s>   dd } fdd}dd }t jdt |||d}|  d	S )
z| A WebSocketApp should keep running as long as its self.keep_running
        is not False (in the boolean context).
        c                 _   s   |  d | jt_d| _dS )zn Set the keep_running flag for later inspection and immediately
            close the connection.
            zhello!FN)sendkeep_runningr   r   r   argskwargsr   r   r   on_open<   s   

z1WebSocketAppTest.testKeepRunning.<locals>.on_openc                    s   t |    d S r   printclose)wsappmessager   r   r   
on_messageD      z4WebSocketAppTest.testKeepRunning.<locals>.on_messagec                 _   s   | j t_dS )z< Set the keep_running flag for the test to use.
            N)r   r   r   r   r   r   r   on_closeH   s   z2WebSocketAppTest.testKeepRunning.<locals>.on_closezws://127.0.0.1:)r   r%   r#   N)r   WebSocketAppr   run_forever)r   r   r#   r%   appr   r   r   testKeepRunning6   s
   z WebSocketAppTest.testKeepRunningz%Internet-requiring tests are disabledc                 C   s0   dd }t jd|d}| t|jt| dS )zi A WebSocketApp should forward the received mask_key function down
        to the actual socket.
        c                   S   s   dS )Nz    r   r   r   r   r   my_mask_key_funcV   s   z:WebSocketAppTest.testSockMaskKey.<locals>.my_mask_key_funczwss://stream.meetup.com/2/rsvps)get_mask_keyN)r   r&   assertEqualidr+   )r   r*   r(   r   r   r   testSockMaskKeyP   s   z WebSocketAppTest.testSockMaskKeyc                 C   sB   dd }dd }t jd||d}| jt j|jddd	tjid
 dS )zA Test exception handling if ping_interval < ping_timeout
        c                 S      t d |   d S NzGot a ping!r   r(   msgr   r   r   on_pingd   r$   zDWebSocketAppTest.testInvalidPingIntervalPingTimeout.<locals>.on_pingc                 S   r/   NzGot a pong! No need to respondr   r1   r   r   r   on_pongh   r$   zDWebSocketAppTest.testInvalidPingIntervalPingTimeout.<locals>.on_pongwss://api-pub.bitfinex.com/ws/1r3   r5         	cert_reqsping_intervalping_timeoutssloptNr   r&   assertRaisesZWebSocketExceptionr'   ssl	CERT_NONEr   r3   r5   r(   r   r   r   "testInvalidPingIntervalPingTimeout_   s   "z3WebSocketAppTest.testInvalidPingIntervalPingTimeoutc                 C   s:   dd }dd }t jd||d}|jddd	tjid
 dS )z5 Test WebSocketApp proper ping functionality
        c                 S   r/   r0   r   r1   r   r   r   r3   t   r$   z2WebSocketAppTest.testPingInterval.<locals>.on_pingc                 S   r/   r4   r   r1   r   r   r   r5   x   r$   z2WebSocketAppTest.testPingInterval.<locals>.on_pongr6   r7   r9   r8   r:   r;   N)r   r&   r'   rA   rB   rC   r   r   r   testPingIntervalo   s   z!WebSocketAppTest.testPingIntervalc                 C      t d}|jdddd dS )z( Test WebSocketApp close opcode
        'wss://tsock.us1.twilio.com/v3/wsconnectr9   r8   Ping payloadr<   r=   Zping_payloadNr   r&   r'   r   r(   r   r   r   testOpcodeClose      
z WebSocketAppTest.testOpcodeClosec                 C   rF   )z) Test WebSocketApp binary opcode
        z'streaming.vn.teslamotors.com/streaming/r9   r8   rH   rI   NrJ   rK   r   r   r   testOpcodeBinary   rM   z!WebSocketAppTest.testOpcodeBinaryc                 C   *   t d}| jt j|jddtjid dS )z; A WebSocketApp handling of negative ping_interval
        r6   r:   )r<   r>   Nr?   rK   r   r   r   testBadPingInterval      
 z$WebSocketAppTest.testBadPingIntervalc                 C   rO   )z: A WebSocketApp handling of negative ping_timeout
        r6   r:   )r=   r>   Nr?   rK   r   r   r   testBadPingTimeout   rR   z#WebSocketAppTest.testBadPingTimeoutc                 C   s   dd }t jd|d}t jt jjdd}| ddg|| t jt jjd	d}| d
d
g|| t d}t jt jjd	d}| d
d
g|| | jt j|jdd d
S )zU Test extraction of close frame status code and close reason in WebSocketApp
        c                 S   s   t d d S )Nzon_close reached)r   )r!   Zclose_status_codeZ	close_msgr   r   r   r%      s   z6WebSocketAppTest.testCloseStatusCode.<locals>.on_closerG   )r%   s   no-init-from-client)Zopcodedatai  zno-init-from-client    Nztest if connection is closed)rU   )	r   r&   ZABNFZOPCODE_CLOSEr,   Z_get_close_argsr@   Z"WebSocketConnectionClosedExceptionr   )r   r%   r(   Z
closeframeZapp2r   r   r   testCloseStatusCode   s   
z$WebSocketAppTest.testCloseStatusCodeN)r   r   r	   r   r   r   unittestZ
skipUnlessTEST_WITH_LOCAL_SERVERr)   r   r.   rD   rE   rL   rN   rQ   rT   rW   r   r   r   r   r   $   s,    
















r   __main__)r
   osos.pathZ	websocketr   rA   rX   environgetr   r   rY   r   ZTestCaser   r   mainr   r   r   r   <module>   s    