用 Python 實現快速 Ping 一個 IP 網段地址!
原文鏈接:www.yjsec.com/2020/11/07
ping 命令是我們檢查網絡中最常用的命令,作為網絡人員,基本上每天都會用到,可以很好地幫助我們分析和判定網絡故障。
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import os
import argparse
import socket
import struct
import select
import time
ICMP_ECHO_REQUEST = 8  # ICMP message type of an echo request
ICMP_ECHO_REPLY = 0  # ICMP message type of an echo reply
DEFAULT_TIMEOUT = 0.1  # seconds to wait for a reply to one echo request
DEFAULT_COUNT = 4  # number of echo requests sent by Pinger.ping()


class Pinger(object):
    """Send ICMP echo requests to a host over a raw socket and time the reply.

    Opening the raw ICMP socket normally requires administrator/root
    privileges.
    """

    def __init__(self, target_host, count=DEFAULT_COUNT, timeout=DEFAULT_TIMEOUT):
        """Store the ping parameters; no network activity happens here.

        Args:
            target_host: Host name or IPv4 address to ping.
            count: Number of echo requests sent by ``ping()``.
            timeout: Seconds to wait for the reply to each request.
        """
        self.target_host = target_host
        self.count = count
        self.timeout = timeout

    def do_checksum(self, source_string):
        """Compute the 16-bit one's-complement checksum of a packet.

        Args:
            source_string: Bytes-like object (indexing must yield ints).

        Returns:
            The checksum as an int in ``0..0xffff``. Bytes are combined into
            little-endian words and the final result is byte-swapped, so the
            returned value is the same on every host byte order.
        """
        total = 0
        # Floor division: only whole 16-bit words are handled by the loop.
        max_count = (len(source_string) // 2) * 2
        count = 0
        while count < max_count:
            val = source_string[count + 1] * 256 + source_string[count]
            total = (total + val) & 0xffffffff
            count += 2
        if max_count < len(source_string):
            # Odd length: the trailing byte is the low byte of a final word
            # padded with zero.
            total = (total + source_string[len(source_string) - 1]) & 0xffffffff
        # Fold the carries back into the low 16 bits.
        total = (total >> 16) + (total & 0xffff)
        total = total + (total >> 16)
        answer = ~total & 0xffff
        # Swap the two bytes of the result.
        answer = answer >> 8 | (answer << 8 & 0xff00)
        return answer

    def receive_pong(self, sock, ID, timeout):
        """Wait for the echo reply that carries our identifier.

        Args:
            sock: Socket the echo request was sent on.
            ID: Identifier placed in the request header by ``send_ping``.
            timeout: Maximum number of seconds to wait.

        Returns:
            The round-trip delay in seconds (receive time minus the timestamp
            stored in the payload), or ``None`` if no matching reply arrived
            within ``timeout``.
        """
        bytes_in_double = struct.calcsize("d")
        # Like the original code, this assumes a 20-byte IP header in front
        # of the 8-byte ICMP header (i.e. no IP options).
        min_length = 28 + bytes_in_double
        deadline = time.monotonic() + timeout
        time_remaining = timeout
        while True:
            readable = select.select([sock], [], [], time_remaining)
            if not readable[0]:  # Timeout
                return None
            time_received = time.time()
            recv_packet, _addr = sock.recvfrom(1024)
            # Ignore packets too short to hold the header and the timestamp.
            if len(recv_packet) >= min_length:
                icmp_type, _code, _checksum, packet_ID, _sequence = struct.unpack_from(
                    "bbHHh", recv_packet, 20
                )
                # Require an echo *reply*: on loopback the raw socket also
                # sees our own echo request, which carries the same ID.
                if icmp_type == ICMP_ECHO_REPLY and packet_ID == ID:
                    time_sent = struct.unpack_from("d", recv_packet, 28)[0]
                    return time_received - time_sent
            # Charge the whole iteration (waiting and parsing) to the budget
            # so a stream of unrelated packets cannot extend the wait.
            time_remaining = deadline - time.monotonic()
            if time_remaining <= 0:
                return None

    def send_ping(self, sock, ID):
        """Build one ICMP echo request and send it to the target host.

        The payload starts with the current ``time.time()`` packed as a
        double, which ``receive_pong`` reads back to compute the delay, and is
        padded with ``Q`` bytes to 192 bytes in total.

        Args:
            sock: Raw ICMP socket to send on.
            ID: 16-bit identifier stored in the ICMP header.

        Raises:
            socket.gaierror: If the target host name cannot be resolved.
        """
        target_addr = socket.gethostbyname(self.target_host)
        # Dummy header with a zero checksum, used only to compute the real one.
        header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, 0, ID, 1)
        bytes_in_double = struct.calcsize("d")
        data = struct.pack("d", time.time()) + b"Q" * (192 - bytes_in_double)
        my_checksum = self.do_checksum(header + data)
        header = struct.pack(
            "bbHHh", ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, 1
        )
        packet = header + data
        sock.sendto(packet, (target_addr, 1))

    def ping_once(self):
        """Send a single echo request and wait for its reply.

        Returns:
            The delay in seconds, or ``None`` on timeout.

        Raises:
            socket.error: If the raw socket cannot be created. A permission
                failure is re-raised with a hint that root is required.
        """
        icmp = socket.getprotobyname("icmp")
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
        except PermissionError as e:
            # Not superuser, so operation not permitted.
            raise socket.error(
                e.errno,
                "%s - ICMP messages can only be sent from root user processes"
                % e.strerror,
            ) from e
        # The context manager closes the socket even if sending or receiving
        # raises.
        with sock:
            my_ID = os.getpid() & 0xFFFF
            self.send_ping(sock, my_ID)
            return self.receive_pong(sock, my_ID, self.timeout)

    def ping(self):
        """Ping the target ``self.count`` times, printing each result.

        Stops early if the host name cannot be resolved.
        """
        for _ in range(self.count):
            print("Ping to %s..." % self.target_host)
            try:
                delay = self.ping_once()
            except socket.gaierror as e:
                # gaierror args are normally (code, message).
                reason = e.args[1] if len(e.args) > 1 else e
                print("Ping failed. (socket error: '%s')" % reason)
                break
            if delay is None:
                print("Ping failed. (timeout within %ssec.)" % self.timeout)
            else:
                delay = delay * 1000
                print("Get pong in %0.4fms" % delay)
if __name__ == '__main__':
    # Sweep every host address of one /24 network, pinging each once, and
    # collect the hosts that answered.
    alive = []
    host_prefix = '192.168.242.'
    for i in range(1, 255):
        host = host_prefix + str(i)
        pinger = Pinger(target_host=host)
        delay = pinger.ping_once()
        if delay is None:
            # Report the timeout actually used instead of a hard-coded value.
            print("Ping %s 失敗,超時 %s 秒" % (host, pinger.timeout))
        else:
            print("ping %s = %s ms" % (host, round(delay * 1000, 4)))
            alive.append(host)
    # Summarise the result of the sweep.
    print("alive hosts (%d): %s" % (len(alive), ", ".join(alive)))
測試如下:

- END -
推薦閱讀 31天拿下Kubernetes含金量最高的CKA+CKS證書! 最新Kubernetes實戰指南:從零到架構師的進階之路 Linux 這些工具堪稱神器! 下一代Docker鏡像構建神器 BuildKit 面試數十家Linux運維工程師,總結了這些面試題(含答案) 記一次 Linux 被入侵,服務器變 “礦機”~ 七年老運維實戰中的 Shell 開發經驗總結 快速入門 Ansible 自動化運維工具 | 16張圖 最強整理!常用正則表達式速查手冊 搭建一套完整的企業級 K8s 集群(v1.20,kubeadm方式) 12年資深運維老司機的成長感悟
點亮,服務器三年不宕機
評論
圖片
表情


