<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          用 Python 實(shí)現(xiàn)快速 Ping 一個(gè) IP 網(wǎng)段地址!

          共 12516字,需瀏覽 26分鐘

           ·

          2021-07-10 07:28

          原文鏈接:www.yjsec.com/2020/11/07

          ping 命令是我們檢查網(wǎng)絡(luò)中最常用的命令,作為網(wǎng)絡(luò)人員,基本上每天都會(huì)用到,可以很好地幫助我們分析和判定網(wǎng)絡(luò)故障;

          如果有 10 設(shè)備,100 臺(tái)設(shè)備,1000 臺(tái)設(shè)備怎么辦?一個(gè)個(gè) ping 過(guò)去人都要瘋掉了,這種情況在大型網(wǎng)絡(luò)中我們有可能遇到,那怎么辦呢?我們今天來(lái)看下如何用 python 來(lái)實(shí)現(xiàn)批量 ping 測(cè)試主機(jī)。
          直接上代碼:
          #!/usr/bin/python3
          # -*- coding: utf-8 -*-

          import os
          import argparse
          import socket
          import struct
          import select
          import time


          ICMP_ECHO_REQUEST = 8 # Platform specific
          DEFAULT_TIMEOUT = 0.1
          DEFAULT_COUNT = 4


          class Pinger(object):
              """ Pings to a host -- the Pythonic way"""

              def __init__(self, target_host, count=DEFAULT_COUNT, timeout=DEFAULT_TIMEOUT):
                  self.target_host = target_host
                  self.count = count
                  self.timeout = timeout


              def do_checksum(self, source_string):
                  """  Verify the packet integritity """
                  sum = 0
                  max_count = (len(source_string)/2)*2
                  count = 0
                  while count < max_count:

                      val = source_string[count + 1]*256 + source_string[count]
                      sum = sum + val
                      sum = sum & 0xffffffff
                      count = count + 2

                  if max_count<len(source_string):
                      sum = sum + ord(source_string[len(source_string) - 1])
                      sum = sum & 0xffffffff

                  sum = (sum >> 16)  +  (sum & 0xffff)
                  sum = sum + (sum >> 16)
                  answer = ~sum
                  answer = answer & 0xffff
                  answer = answer >> 8 | (answer << 8 & 0xff00)
                  return answer

              def receive_pong(self, sock, ID, timeout):
                  """
                  Receive ping from the socket.
                  """

                  time_remaining = timeout
                  while True:
                      start_time = time.time()
                      readable = select.select([sock], [], [], time_remaining)
                      time_spent = (time.time() - start_time)
                      if readable[0] == []: # Timeout
                          return

                      time_received = time.time()
                      recv_packet, addr = sock.recvfrom(1024)
                      icmp_header = recv_packet[20:28]
                      type, code, checksum, packet_ID, sequence = struct.unpack(
                 "bbHHh", icmp_header
             )
                      if packet_ID == ID:
                          bytes_In_double = struct.calcsize("d")
                          time_sent = struct.unpack("d", recv_packet[28:28 + bytes_In_double])[0]
                          return time_received - time_sent

                      time_remaining = time_remaining - time_spent
                      if time_remaining <= 0:
                          return


              def send_ping(self, sock,  ID):
                  """
                  Send ping to the target host
                  """

                  target_addr  =  socket.gethostbyname(self.target_host)

                  my_checksum = 0

                  # Create a dummy heder with a 0 checksum.
                  header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, my_checksum, ID, 1)
                  bytes_In_double = struct.calcsize("d")
                  data = (192 - bytes_In_double) * "Q"
                  data = struct.pack("d", time.time()) + bytes(data.encode('utf-8'))

                  # Get the checksum on the data and the dummy header.
                  my_checksum = self.do_checksum(header + data)
                  header = struct.pack(
                "bbHHh", ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, 1
            )
                  packet = header + data
                  sock.sendto(packet, (target_addr, 1))


              def ping_once(self):
                  """
                  Returns the delay (in seconds) or none on timeout.
                  """

                  icmp = socket.getprotobyname("icmp")
                  try:
                      sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
                  except socket.error as e:
                      if e.errno == 1:
                          # Not superuser, so operation not permitted
                          e.msg +=  "ICMP messages can only be sent from root user processes"
                          raise socket.error(e.msg)
                  except Exception as e:
                      print("Exception: %s" %(e))

                  my_ID = os.getpid() & 0xFFFF

                  self.send_ping(sock, my_ID)
                  delay = self.receive_pong(sock, my_ID, self.timeout)
                  sock.close()
                  return delay


              def ping(self):
                  """
                  Run the ping process
                  """

                  for i in range(self.count):
                      print ("Ping to %s..." % self.target_host,)
                      try:
                          delay  =  self.ping_once()
                      except socket.gaierror as e:
                          print ("Ping failed. (socket error: '%s')" % e[1])
                          break

                      if delay  ==  None:
                          print ("Ping failed. (timeout within %ssec.)" % self.timeout)
                      else:
                          delay = delay * 1000
                          print("Get pong in %0.4fms" % delay)


          if __name__ == '__main__':
              alive = []
              host_prefix = '192.168.242.'
              for i in range(1255):
                  host = host_prefix + str(i)
                  pinger = Pinger(target_host=host)

                  delay = pinger.ping_once()
                  if delay == None:
                      print("Ping %s 失敗,超時(shí)2秒" % host)
                  else:
                      print("ping %s = %s ms" % (host, round(delay * 10004)))
                      alive.append(host)
                  # time.sleep(0.5)

          測(cè)試如下:

          - END -

           推薦閱讀 

          31天拿下Kubernetes含金量最高的CKA+CKS證書(shū)! 
          最新Kubernetes實(shí)戰(zhàn)指南:從零到架構(gòu)師的進(jìn)階之路
          Linux 這些工具堪稱(chēng)神器!
          下一代Docker鏡像構(gòu)建神器 BuildKit
          面試數(shù)十家Linux運(yùn)維工程師,總結(jié)了這些面試題(含答案)
          記一次 Linux 被入侵,服務(wù)器變 “礦機(jī)”~
          七年老運(yùn)維實(shí)戰(zhàn)中的 Shell 開(kāi)發(fā)經(jīng)驗(yàn)總結(jié)
          快速入門(mén) Ansible 自動(dòng)化運(yùn)維工具 | 16張圖
          最強(qiáng)整理!常用正則表達(dá)式速查手冊(cè)
          搭建一套完整的企業(yè)級(jí) K8s 集群(v1.20,kubeadm方式)
          12年資深運(yùn)維老司機(jī)的成長(zhǎng)感悟



          點(diǎn)亮,服務(wù)器三年不宕機(jī)

          瀏覽 67
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产农村乱婬片A片AAA图片 | www.俺来也.com | 成人免费视频网 | 国产91乱码一区二区三区 | 操批视频网站 |