一、使用场景

有大量IP地址的同一端口需要Telnet,简化操作。可以进行端口的判断判断。
写此脚本原因是需要在大量IP中查找出那些是(大华:37777、海康:8000)的摄像头。

二、使用方法

使用时需要对应环境有Python环境。该脚本适应Python3 和Python2 环境,都可正常使用。
将所有IP地址复制到txt文件,命名为ip.txt 一个IP地址一行。
将telnet_ipc.py文件与ip.txt文件放在任意同一目录下。执行黄色背景中的命令:
[root@dh-db-HMxSrM5x ~]# python telnet_ipc.py 端口号
例如: python telnet_ipc.py 37777
脚本将会开始批量telnet 指定IP的指定端口。

三、结果

运行结束后将会生成对应的through.txt文件,所有telnet通的IP都记录在该文件,按行区分,不记录未telnet通的端口。

四、具体代码如下:

#!/usr/bin/python
# -*- coding: utf-8 -*-

"""
@version: V1.5
@author: sea-whales
@site: www.sea-whales.cn
@contact: sea-whales@sea-whales.cn
@fun: 读取ip.txt文件telnet端口找到大华、海康IPC。
        未进行上限控制,建议一次不多于500个。
        建议在linux下使用。
@time: 2020年11月11日
"""
import telnetlib
import threading
import os,sys

brands = {'through': 'through.txt', "ip":'ip.txt'}



PATH=os.path.split(os.path.realpath(__file__))[0]
class MyThread(threading.Thread):
    """
    重写多线程,使其能够返回值
    """

    def __init__(self, target=None, args=()):
        super(MyThread, self).__init__()
        self.func = target
        self.args = args

    def run(self):
        self.result = self.func(*self.args)

    def get_result(self):
        try:
            return self.result  # 子线程使用join方法,否则此处可能会报没有self.result的错误
        except Exception:
            return None

def convert_path(path):
    '''
        任意系统的路径转换成当前系统的格式
    '''
    return path.replace(r'\/'.replace(os.sep, ''), os.sep)

def readIP(filename):
    '''
        打开txt文件读取设备IP
    '''
    ip_list=[]
    with open(filename, 'r') as f:
        for f in f.readlines():
            if f.strip() != '':
                ip_list.append(f.strip())
    return ip_list


def telnetPort(host, port, timeout=5):
    """
        telnet 端口,判断指定端口是否通。设置超时时间为5秒
    """
    try:
        telnetlib.Telnet(host, port, timeout=timeout)
        return host, True, port
    except:
        return host, False, port


def WriteJudgment(retvals, brand=None):
    '''
        判断设备品牌,写入txt文件
    '''
    for retval in retvals:
        if retval[1]:
            with open(convert_path('%s/%s' % (PATH,brands['through'])), 'a+') as f:
                f.writelines('%s\n' % retval[0])


def threadsJudgeBrand(iplist, port):
    '''
        开线程
    '''
    retval, threads, _Threads = [], [], []
    for ip in iplist:
        t = MyThread(target=telnetPort, args=(ip, port))
        threads.append(t)
        t.start()
        _Threads.append(t)
    for t in _Threads:
        t.join()
        retval.append(t.get_result())
    return retval


if __name__ == "__main__":
    # brand = {"@hua": [], "HIK": [], "other": []}
    PORT = sys.argv[1]
    WriteJudgment(retvals=threadsJudgeBrand(readIP(convert_path('%s/%s' % (PATH,brands['ip']))), PORT))

批量Telnet脚本使用方法.docx
ip.txt