mirror of https://github.com/jumpserver/jumpserver
64 lines
1.9 KiB
Python
64 lines
1.9 KiB
Python
# -*- coding: utf-8 -*-
|
|
#
|
|
import asyncio
|
|
|
|
from common.utils import lookup_domain
|
|
|
|
PROMPT_REGEX = r'[\<|\[](.*)[\>|\]]'
|
|
|
|
|
|
async def telnet(dest_addr, port_number=23, timeout=10):
|
|
try:
|
|
reader, writer = await asyncio.wait_for(
|
|
asyncio.open_connection(dest_addr, port_number), timeout
|
|
)
|
|
except asyncio.TimeoutError:
|
|
return False, 'Timeout'
|
|
except (ConnectionRefusedError, OSError) as e:
|
|
return False, str(e)
|
|
try:
|
|
# 发送命令
|
|
writer.write(b"command\r\n")
|
|
await writer.drain()
|
|
# 读取响应
|
|
response = await reader.readuntil()
|
|
except asyncio.TimeoutError:
|
|
writer.close()
|
|
await writer.wait_closed()
|
|
return False, 'Timeout'
|
|
writer.close()
|
|
await writer.wait_closed()
|
|
return True, response.decode('utf-8', 'ignore')
|
|
|
|
|
|
async def verbose_telnet(dest_ip, dest_port=23, timeout=10, display=None):
|
|
if not display:
|
|
return
|
|
|
|
ip, err = lookup_domain(dest_ip)
|
|
if not ip:
|
|
await display(err)
|
|
return
|
|
|
|
await display(f'Trying {dest_ip} ({ip}:{dest_port})')
|
|
try:
|
|
is_connective, resp = await telnet(dest_ip, dest_port, timeout)
|
|
if is_connective:
|
|
msg = f'Connected to {dest_ip} {dest_port} {resp}.\r\n' \
|
|
f'Connection closed by foreign host.'
|
|
else:
|
|
msg = f'Unable to connect to remote host\r\n' \
|
|
f'Reason: {resp}'
|
|
except Exception as e:
|
|
msg = 'Error: %s' % e
|
|
await display(msg)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
print(verbose_telnet(dest_addr='1.1.1.1', port_number=2222))
|
|
print(verbose_telnet(dest_addr='baidu.com', port_number=80))
|
|
print(verbose_telnet(dest_addr='baidu.com', port_number=8080))
|
|
print(verbose_telnet(dest_addr='192.168.4.1', port_number=2222))
|
|
print(verbose_telnet(dest_addr='192.168.4.1', port_number=2223))
|
|
print(verbose_telnet(dest_addr='ssssss', port_number=-1))
|