o
    U˟i                     @   s   d dl Z d dl mZ d dl mZ d dlmZ d dlmZ G dd deZG dd deej	Z
ed	G d
d dej	ZedkrFe  dS dS )    N)socket)	selectors)timingc                   @   s.   e Zd Zed
ddZdZdd Zdd Zd	S )SelectorTestMixin   c                 C   s6   | j |d}|D ]\}}|| |j| t  qd S Ntimeout)selectdatafileobjgeventsleep)selr	   eventskeymask r   d/var/www/apps/myagent/mysuperagent/venv/lib/python3.10/site-packages/gevent/tests/test__selectors.pyrun_selector_once   s
   
z#SelectorTestMixin.run_selector_onceTc                 C   s8   | d}|r|| | jr|| |  d S d S )Nd   )recvsendunregister_after_send
unregisterclose)selfselectorconn_eventsr   r   r   r    read_from_ready_socket_and_reply   s   


z2SelectorTestMixin.read_from_ready_socket_and_replyc              	   C   s   t  \}}d }z9||tj| j t| j|}d}|	| |
d}| || W |  |  |  |d urA|d n|  |  |  |d urX|d w w | |d uob|  d S )Ns   abcdef2   
   )r   
socketpairregisterr   
EVENT_READr    r   spawnr   r   r   assertEqualr   join
assertTrueready)r   r   serverclientgletDATAr   r   r   r   _check_selector"   s,   


z!SelectorTestMixin._check_selectorN)r   )__name__
__module____qualname__staticmethodr   r   r    r/   r   r   r   r   r      s    
r   c                   @   s   e Zd Zdd Zdd ZdS )GeventSelectorTestc                 C   s8   t  }| | W d    d S 1 sw   Y  d S N)r   GeventSelectorr/   )r   r   r   r   r   test_select_using_socketpair8   s   
"z/GeventSelectorTest.test_select_using_socketpairc              
   C   s  zt j}W n ty   d }Y nw dd tdD }zt }t }t|D ]%\}}|\}}||tj| j	 ||tj| t
|d}	||	 q't  | j||d d j|kr_dndd d}
|jddD ]\}}t
|jd}|jd	}	| |	| |
d
7 }
qk| |
t| W |  |  |D ]}|D ]}|  qqd S |  |  |D ]}|D ]}|  qqw )Nc                 S   s   g | ]}t  qS r   )r   r#   ).0_r   r   r   
<listcomp>C   s    z?GeventSelectorTest.test_select_many_sockets.<locals>.<listcomp>r"   asciir   r   r   r!      )r   AF_UNIXAttributeErrorranger   r6   	enumerater$   r%   r    strencoder   r   idler   familyr
   r   r   r   r'   lenr   )r   r>   pairs
server_sel
client_selipairr+   r,   r   foundr   r9   expectedsr   r   r   test_select_many_sockets=   sV   




z+GeventSelectorTest.test_select_many_socketsN)r0   r1   r2   r7   rO   r   r   r   r   r4   5   s    r4   z*Things like os.close don't work on Windowsc                   @   s0   e Zd ZdZdd Zdd Zdd Zdd	 Zd
S )TestPossibleCrashesa  
    Tests for the crashes and unexpected exceptions
    that happen when we try to use or create (depending on
    loop implementation) a IO watcher for a closed/invalid file descriptor.

    See https://github.com/gevent/gevent/issues/2100
    See test__select.py
    c                 C   sn   t   }| |j t|j t }||tj | |j	t
jd W d    d S 1 s0w   Y  d S r   )r   
addCleanupr   r   r&   r   r6   r$   r%   r
   r   SMALLEST_RELIABLE_DELAY)r   sockr   r   r   r   #test_closing_object_while_selectingy   s   "z7TestPossibleCrashes.test_closing_object_while_selectingc                 C   s$   z|   W d S  ty   Y d S w r5   )r   OSError)r   rS   r   r   r   _close_invalid_sock   s
   z'TestPossibleCrashes._close_invalid_sockc                 C   s   ddl m} t }| | j| t |j|  t	 }|
|tj | |jtjd W d    d S 1 s:w   Y  d S )Nr   )osr   )r   rW   r   rQ   rV   r&   r   filenor   r6   r$   r%   r
   r   rR   r   rW   rS   r   r   r   r   test_closing_fd_while_selecting   s   "z3TestPossibleCrashes.test_closing_fd_while_selectingc                 C   st   dd l }t }| | j| ||  t }| t	d |
|tj W d    d S 1 s3w   Y  d S )Nr   zInvalid file)rW   r   rQ   rV   r   rX   r   r6   assertRaisesRegex
ValueErrorr$   r%   rY   r   r   r    test_closing_fd_before_selecting   s   "z4TestPossibleCrashes.test_closing_fd_before_selectingN)r0   r1   r2   __doc__rT   rV   rZ   r]   r   r   r   r   rP   n   s    	rP   __main__)r   r   r   gevent.testingtesting	greentestr   objectr   TestCaser4   skipOnWindowsrP   r0   mainr   r   r   r   <module>   s   *9;