<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          用 Python 實(shí)現(xiàn)快速 Ping 一個(gè) IP 網(wǎng)段地址!

          共 12378字,需瀏覽 25分鐘

           ·

          2021-06-17 19:23

          大家好,我是杰哥。

          ping 命令是我們檢查網(wǎng)絡(luò)中最常用的命令,作為網(wǎng)絡(luò)人員,基本上每天都會(huì)用到,可以很好地幫助我們分析和判定網(wǎng)絡(luò)故障;

          更多可查看:ping 命令的七種用法,你真的會(huì)了嗎?

          如果有 10 設(shè)備,100 臺(tái)設(shè)備,1000 臺(tái)設(shè)備怎么辦?一個(gè)個(gè) ping 過(guò)去人都要瘋掉了,這種情況在大型網(wǎng)絡(luò)中我們有可能遇到,那怎么辦呢?我們今天來(lái)看下如何用 python 來(lái)實(shí)現(xiàn)批量 ping 測(cè)試主機(jī)。

          代碼如下:

          #!/usr/bin/python3
          # -*- coding: utf-8 -*-

          import os
          import argparse
          import socket
          import struct
          import select
          import time


          ICMP_ECHO_REQUEST = 8 # Platform specific
          DEFAULT_TIMEOUT = 0.1
          DEFAULT_COUNT = 4


          class Pinger(object):
              """ Pings to a host -- the Pythonic way"""

              def __init__(self, target_host, count=DEFAULT_COUNT, timeout=DEFAULT_TIMEOUT):
                  self.target_host = target_host
                  self.count = count
                  self.timeout = timeout


              def do_checksum(self, source_string):
                  """  Verify the packet integritity """
                  sum = 0
                  max_count = (len(source_string)/2)*2
                  count = 0
                  while count < max_count:

                      val = source_string[count + 1]*256 + source_string[count]
                      sum = sum + val
                      sum = sum & 0xffffffff
                      count = count + 2

                  if max_count<len(source_string):
                      sum = sum + ord(source_string[len(source_string) - 1])
                      sum = sum & 0xffffffff

                  sum = (sum >> 16)  +  (sum & 0xffff)
                  sum = sum + (sum >> 16)
                  answer = ~sum
                  answer = answer & 0xffff
                  answer = answer >> 8 | (answer << 8 & 0xff00)
                  return answer

              def receive_pong(self, sock, ID, timeout):
                  """
                  Receive ping from the socket.
                  """

                  time_remaining = timeout
                  while True:
                      start_time = time.time()
                      readable = select.select([sock], [], [], time_remaining)
                      time_spent = (time.time() - start_time)
                      if readable[0] == []: # Timeout
                          return

                      time_received = time.time()
                      recv_packet, addr = sock.recvfrom(1024)
                      icmp_header = recv_packet[20:28]
                      type, code, checksum, packet_ID, sequence = struct.unpack(
                 "bbHHh", icmp_header
             )
                      if packet_ID == ID:
                          bytes_In_double = struct.calcsize("d")
                          time_sent = struct.unpack("d", recv_packet[28:28 + bytes_In_double])[0]
                          return time_received - time_sent

                      time_remaining = time_remaining - time_spent
                      if time_remaining <= 0:
                          return


              def send_ping(self, sock,  ID):
                  """
                  Send ping to the target host
                  """

                  target_addr  =  socket.gethostbyname(self.target_host)

                  my_checksum = 0

                  # Create a dummy heder with a 0 checksum.
                  header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, my_checksum, ID, 1)
                  bytes_In_double = struct.calcsize("d")
                  data = (192 - bytes_In_double) * "Q"
                  data = struct.pack("d", time.time()) + bytes(data.encode('utf-8'))

                  # Get the checksum on the data and the dummy header.
                  my_checksum = self.do_checksum(header + data)
                  header = struct.pack(
                "bbHHh", ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, 1
            )
                  packet = header + data
                  sock.sendto(packet, (target_addr, 1))


              def ping_once(self):
                  """
                  Returns the delay (in seconds) or none on timeout.
                  """

                  icmp = socket.getprotobyname("icmp")
                  try:
                      sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
                  except socket.error as e:
                      if e.errno == 1:
                          # Not superuser, so operation not permitted
                          e.msg +=  "ICMP messages can only be sent from root user processes"
                          raise socket.error(e.msg)
                  except Exception as e:
                      print("Exception: %s" %(e))

                  my_ID = os.getpid() & 0xFFFF

                  self.send_ping(sock, my_ID)
                  delay = self.receive_pong(sock, my_ID, self.timeout)
                  sock.close()
                  return delay


              def ping(self):
                  """
                  Run the ping process
                  """

                  for i in range(self.count):
                      print ("Ping to %s..." % self.target_host,)
                      try:
                          delay  =  self.ping_once()
                      except socket.gaierror as e:
                          print ("Ping failed. (socket error: '%s')" % e[1])
                          break

                      if delay  ==  None:
                          print ("Ping failed. (timeout within %ssec.)" % self.timeout)
                      else:
                          delay = delay * 1000
                          print("Get pong in %0.4fms" % delay)


          if __name__ == '__main__':
              alive = []
              host_prefix = '192.168.242.'
              for i in range(1255):
                  host = host_prefix + str(i)
                  pinger = Pinger(target_host=host)

                  delay = pinger.ping_once()
                  if delay == None:
                      print("Ping %s 失敗,超時(shí)2秒" % host)
                  else:
                      print("ping %s = %s ms" % (host, round(delay * 10004)))
                      alive.append(host)
                  # time.sleep(0.5)

          測(cè)試如下:

          來(lái)源:www.yjsec.com/2020/11/07

          推薦閱讀

          7 個(gè)日常實(shí)用的 Shell 拿來(lái)就用腳本實(shí)例!

          原創(chuàng) | 7 個(gè)非常實(shí)用的 Shell 拿來(lái)就用腳本實(shí)例!

          原創(chuàng) | 超硬核!11 個(gè)非常實(shí)用的 Python 和 Shell 拿來(lái)就用腳本實(shí)例!

          杰哥的另一個(gè)公眾號(hào),主要分享關(guān)于個(gè)人成長(zhǎng)經(jīng)歷的那點(diǎn)事,歡迎您的關(guān)注。

          瀏覽 73
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲第97页 | 亚洲搞清视频日本 | av先锋成人网站 h片免费在线观看 | 亚洲高清视频在线观看免费 | 美女免费一级操逼视频 |