o
    U˟i'                     @   s.  d dl Z d dlZd dlZd dlmZ d dlmZ d dlZ	d dl
mZ d dl	mZ d dl	mZ d dl	mZ d dl	mZ d dl
mZ G d	d
 d
e	jjjZedG dd de	jjjZeeeddedG dd de	jjjZG dd dejZedG dd dejZedkre  dS dS )    N)contextmanager)patch)get_hub)osselect)socket)timingc                   @   s   e Zd Zdd ZdS )
TestSelectc                 C   s   t  g g g | d S Nr   )selftimeout r   a/var/www/apps/myagent/mysuperagent/venv/lib/python3.10/site-packages/gevent/tests/test__select.pywait   s   zTestSelect.waitN)__name__
__module____qualname__r   r   r   r   r   r
      s    r
   zCant select on filesc                   @   s0   e Zd Zdd Zeejdddd Z	dS )TestSelectReadc              	   C   sP   t  \}}zt|gg g | W t | t | d S t | t | w r   )r   piper   close)r   r   rwr   r   r   r      s   

zTestSelectRead.waitfreebsdz*skip because of a FreeBSD bug: kern/155606c                 C   s   t tdD}| }|  zt|gg g d W n ty3 } z| |jtj W Y d }~nd }~ww | 	d W d    d S W d    d S 1 sLw   Y  d S )Nrbr   zexception not raised)
open__file__filenor   r   OSErrorassertEqualerrnoEBADFfail)r   fpfderrr   r   r   
test_errno#   s   "zTestSelectRead.test_errnoN)
r   r   r   r   unittestskipIfsysplatform
startswithr&   r   r   r   r   r      s    	r   pollz
Needs pollzCant poll on filesc                   @   $   e Zd Zdd Zdd Zdd ZdS )TestPollReadc              	   C   sv   t  \}}z$t }||tj ||d  W || t | t | d S || t | t | w )Ni  )r   r   r   r,   registerPOLLIN
unregisterr   )r   r   r   r   r,   r   r   r   r   6   s   



zTestPollRead.waitc                 C   s   t  }| t|jd d S )N   )r   r,   assertRaisesKeyErrorr1   )r   r,   r   r   r    test_unregister_never_registeredD   s   z-TestPollRead.test_unregister_never_registeredc                 C   s~   |  d ttd+}| }t }||tj |  |d}| 	||tj
fg W d    d S 1 s8w   Y  d S )Nzlibev >= 4.27 aborts the process if built with EV_VERIFY >= 2. For libuv, depending on whether the fileno is reused or not this either crashes or does nothing.r   r   )skipTestr   r   r   r   r,   r/   r0   r   r   POLLNVAL)r   r#   r$   r,   resultr   r   r   test_poll_invalidK   s   
"zTestPollRead.test_poll_invalidN)r   r   r   r   r5   r9   r   r   r   r   r.   3   s    r.   c                   @   r-   )TestSelectTypesc                 C   s<   t   }ztt| gg g d W |  d S |  w )NMbP?)r   r   intr   r   r   sockr   r   r   test_int^   s   zTestSelectTypes.test_intc                    s@   t     fdd}zt| g g d W    d S    w )Nc                   3   s    t   V  d S r   )r<   r   r   r>   r   r   fileno_iterh   s   z2TestSelectTypes.test_iterable.<locals>.fileno_iterr;   )r   r   r   )r   rA   r   r@   r   test_iterablee   s
   zTestSelectTypes.test_iterablec                 C   s"   d| _ | ttjdgg g d d S )NFhellor;   )switch_expectedr3   	TypeErrorr   r   r   r   r   test_stringp   s   zTestSelectTypes.test_stringN)r   r   r   r?   rB   rG   r   r   r   r   r:   \   s    r:   z*Things like os.close don't work on Windowsc                   @   sL   e Zd ZdZdd Zdd Zdd Zdd	 Zd
d Ze	dd Z
dd ZdS )TestPossibleCrashesa  
    Tests for the crashes and unexpected exceptions
    that happen when we try to use or create (depending on
    loop implementation) a IO watcher for a closed/invalid file descriptor.

    See https://github.com/gevent/gevent/issues/2100
    See test__selectors.py
    c                 C   sj   t   }| |j t|j tjtddd t|gddtj	 W d    d S 1 s.w   Y  d S N_original_selectr   r   r   return_valuer   )
r   
addCleanupr   geventspawnPatchobjectr   r	   SMALLEST_RELIABLE_DELAYr=   r   r   r   #test_closing_object_while_selecting   s   "z7TestPossibleCrashes.test_closing_object_while_selectingc                 C   s$   z|   W d S  ty   Y d S w r   )r   r   r=   r   r   r   _close_invalid_sock   s
   z'TestPossibleCrashes._close_invalid_sockc                 C   s&   zt | W d S  ty   Y d S w r   )r   r   r   )r   r$   r   r   r   _close_invalid_fd   s
   z%TestPossibleCrashes._close_invalid_fdc                 C   sr   t   }| | j| t| j|  tjt	ddd t		|gddt
j W d    d S 1 s2w   Y  d S rI   )r   rN   rU   rO   rP   rV   r   rQ   rR   r   r	   rS   r=   r   r   r   test_closing_fd_while_selecting   s   	"z3TestPossibleCrashes.test_closing_fd_while_selectingc              	   C   s   t   }| | j| t|  tjtddd/ | 	  t|gddt
j W d    n1 s4w   Y  W d    d S W d    d S 1 sLw   Y  d S rI   )r   rN   rU   r   r   r   rQ   rR   r   _check_os_error_on_libuvr	   rS   r=   r   r   r    test_closing_fd_before_selecting   s   
"z4TestPossibleCrashes.test_closing_fd_before_selectingc                 c   s:    zd V  W d S  t y   | dtt jj Y d S w )Nzgevent.libuv)r   assertIntyper   loopr   rF   r   r   r   rX      s   z,TestPossibleCrashes._check_os_error_on_libuvc              	   C   sh  t   }| |j | }t|j t }||tj	 t
jtddd d }|   |tj}W d    n1 s?w   Y  |d urh| t|dk dd |D \\}}| || | |tj	 d }|   |tj}W d    n1 sw   Y  |d ur|\\}}| || | |tj W d    d S W d    d S 1 sw   Y  d S )NrJ   rK   rL      c                 S   s   g | ]}|d  t jkr|qS )r]   )r   r0   ).0xr   r   r   
<listcomp>   s    zITestPossibleCrashes.test_closing_object_while_polling.<locals>.<listcomp>)r   rN   r   r   rO   rP   r   r,   r/   r0   rQ   rR   rX   r	   rS   
assertTruelenr   r7   )r   r>   orig_filenopollerfds_and_eventsr$   eventr   r   r   !test_closing_object_while_polling   s6   


"z5TestPossibleCrashes.test_closing_object_while_pollingN)r   r   r   __doc__rT   rU   rV   rW   rY   r   rX   rg   r   r   r   r   rH   u   s    	&
rH   __main__)r    r)   r'   
contextlibr   unittest.mockr   rQ   gevent.corerO   gevent.testingtesting	greentestr   r   r   r   r	   AbstractGenericWaitTestCaser
   skipOnWindowsr   
skipUnlesshasattrr.   TestCaser:   rH   r   mainr   r   r   r   <module>   s2    ' 