手寫了個(gè) BOSS 來了的摸魚神器!
作者:小小明
原文鏈接:
https://blog.csdn.net/as604049322/article/details/120110098
本文為讀者投稿
大家好,我是小小明。
前段時(shí)間,我寫了篇水文《獲取當(dāng)前局域網(wǎng)下所有連接設(shè)備的ip地址和mac地址》鏈接:https://blog.csdn.net/as604049322/article/details/118442299,但是沒有想到的是居然上了熱榜,也是我個(gè)人第一篇上熱榜的文章,閱讀量瞬間飆升??。然而我的硬核技術(shù)文卻幾乎沒有人看到。既然有很多人對這個(gè)話題感興趣,那么我們就繼續(xù)對相關(guān)原理深挖,最好能自己實(shí)現(xiàn),理解透徹。
首先我們回顧一下前文,在前文中我介紹了windows下獲取ip地址和arp映射表的命令,通過分析最新arp映射表知道當(dāng)前網(wǎng)段下在線或下線的設(shè)備?。
文章使用的技術(shù)是通過python調(diào)用系統(tǒng)ping命令,實(shí)現(xiàn)arp表的更新。然而系統(tǒng)自帶的ping命令訪問整個(gè)網(wǎng)段的ip時(shí),耗時(shí)達(dá)到了2分鐘,后面通過多線程加速,最終也只能提速到最快25秒。這個(gè)速度實(shí)在延時(shí)過大,無法應(yīng)用于更高級的應(yīng)用??。
今天我們的目標(biāo)是就是將Ping整個(gè)網(wǎng)段IP的總耗時(shí)降低到5秒以內(nèi),這樣我們就能夠在5秒內(nèi)知道指定mac地址設(shè)備的上下線,例如開發(fā)一個(gè)BOSS來了的摸魚神器,只要老板的手機(jī)一連上wifi,這邊在5秒內(nèi)收到通知,立馬停止摸魚,就保證了平時(shí)放心大膽的摸魚?。

那么如何提速呢?經(jīng)過我?guī)滋斓目嗨稼は耄⒃趯W(xué)習(xí)了一些網(wǎng)絡(luò)知識后,自己實(shí)現(xiàn)了PING命令,成功的實(shí)現(xiàn)了放心大膽的摸魚。于是,在我看了幾本書,寫了幾千行代碼,踩了幾百個(gè)坑后,終于把相關(guān)知識理解透了。下面是我將涉及到的核心知識點(diǎn)總結(jié)成了這篇文章,所以這篇文章都是非常精簡的干貨,強(qiáng)烈??建議收藏??。
學(xué)完本文,你的力量將不僅僅止于此,還能夠底層化開發(fā)任何基于IP協(xié)議的自定義協(xié)議,當(dāng)然這要看你自己是否具有舉一反三的能力。甚至你還能繼續(xù)自己深挖,去研究開發(fā)比IP協(xié)議更底層的協(xié)議。

渴望嗎?渴望那就學(xué)起來吧??下面是本文的知識點(diǎn)目錄:

01
socket 套接字核心知識
socket 簡介
進(jìn)程間通信指運(yùn)行的程序之間的數(shù)據(jù)共享,在1臺電腦上可以通過進(jìn)程號(PID)來唯一標(biāo)識一個(gè)進(jìn)程進(jìn)行通信。
在網(wǎng)絡(luò)中,TCP/IP協(xié)議族網(wǎng)絡(luò)層的“ip地址”可以唯一標(biāo)識網(wǎng)絡(luò)中的主機(jī),而傳輸層的“協(xié)議+端口”可以唯一標(biāo)識主機(jī)中的應(yīng)用進(jìn)程(進(jìn)程)。網(wǎng)絡(luò)中的進(jìn)程通信就可以通過ip地址,協(xié)議,端口這個(gè)標(biāo)志與其它進(jìn)程進(jìn)行交互。
socket(簡稱 套接字) 就是實(shí)現(xiàn)網(wǎng)絡(luò)進(jìn)程間通信的一種方式,網(wǎng)絡(luò)上各種各樣的服務(wù)大多都是基于 Socket 來完成通信的。為了建立通信通道,網(wǎng)絡(luò)通信的每個(gè)端點(diǎn)擁有一個(gè)socket套接字對象,它們允許程序接受并進(jìn)行連接,如發(fā)送和接受數(shù)據(jù)。
socket 鏈接
在 Python 中 使用socket 模塊的函數(shù) socket 就可以完成:
import socket
socket.socket(family=-1, type=-1, proto=-1, fileno=None)
參數(shù)說明:
family為指定的地址族,主要有三種:
socket.AF_UNIX :用于同一臺機(jī)器進(jìn)程間通信
socket.AF_INET :基于ipv4協(xié)議的Internet 進(jìn)程間通信
socket.AF_INET6 :基于ipv6協(xié)議的Internet 進(jìn)程間通信
更多的地址族還包括,socket.AF_BLUETOOTH藍(lán)牙相關(guān)、socket.AF_VSOCK虛擬機(jī)通信、socket.AF_PACKET直連網(wǎng)絡(luò)設(shè)備底層接口等。
type為指定的套接字類型,主要有三種:
socket.SOCK_STREAM :流式套接字,使用面向連接的TCP協(xié)議實(shí)現(xiàn)字節(jié)流的傳輸
socket.SOCK_DGRAM :數(shù)據(jù)報(bào)套接字,使用面向非連接的UDP實(shí)現(xiàn)數(shù)據(jù)報(bào)套接字
socket.SOCK_RAW:原始套接字,該套接字允許對較低層協(xié)議(如 IP或 ICMP)進(jìn)行直接訪問
更多套接字類型還包括socket.SOCK_RDM和socket.SOCK_SEQPACKET等。
TCP 與 UDP 通信模型
對于tcp或udp套接字可以直接使用以下方式進(jìn)行創(chuàng)建:
import socket
# 創(chuàng)建tcp的套接字
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 創(chuàng)建udp的套接字
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 不用的時(shí)候,關(guān)閉套接字
s.close()
UDP通信模型:在通信開始之前,不需要建立相關(guān)的鏈接,只需要發(fā)送數(shù)據(jù)即可,類似于寫信:

UDP服務(wù)端示例代碼:
from socket import *
# 創(chuàng)建套接字
udp_socket = socket(AF_INET, SOCK_DGRAM)
# 綁定本地的相關(guān)信息,不綁定系統(tǒng)會隨機(jī)分配
udp_socket.bind(('0.0.0.0', 8080))
# 等待接收對方發(fā)送的數(shù)據(jù)
recv_data = udp_socket.recvfrom(1024) # 1024表示本次接收的最大字節(jié)數(shù)
# 顯示接收到的數(shù)據(jù),第1個(gè)元素是對方發(fā)送的數(shù)據(jù),第2個(gè)元素是對方的ip和端口
print(recv_data[0].decode('u8'))
# 關(guān)閉套接字
udp_socket.close()
UDP客戶端示例代碼:
from socket import *
# 創(chuàng)建udp套接字
udp_socket = socket(AF_INET, SOCK_DGRAM)
# 發(fā)送數(shù)據(jù)到指定的電腦上的指定程序中
udp_socket.sendto("你好,服務(wù)器~".encode('u8'), ('192.168.1.103', 8080))
# 關(guān)閉套接字
udp_socket.close()
TCP通信模型:在通信開始之前,一定要先建立相關(guān)的鏈接,才能發(fā)送數(shù)據(jù),類似于打電話:

TCP服務(wù)端示例代碼:
from socket import *
# 創(chuàng)建socket
tcp_server_socket = socket(AF_INET, SOCK_STREAM)
# 服務(wù)器綁定本機(jī)ip和端口
tcp_server_socket.bind(('0.0.0.0', 8080))
# 監(jiān)聽端口,128表示最大同時(shí)接收128個(gè)客戶端鏈接
tcp_server_socket.listen(128)
# 如果有新的客戶端來鏈接服務(wù)器,那么就產(chǎn)生一個(gè)新的套接字專門為這個(gè)客戶端服務(wù)
client_socket, clientAddr = tcp_server_socket.accept()
# 接收對方發(fā)送過來的數(shù)據(jù)
recv_data = client_socket.recv(1024) # 接收1024個(gè)字節(jié)
print('接收到的數(shù)據(jù)為:', recv_data.decode('u8'))
# 發(fā)送一些數(shù)據(jù)到客戶端
client_socket.send("你好客戶端!".encode('u8'))
# 關(guān)閉為這個(gè)客戶端服務(wù)的套接字
client_socket.close()
TCP客戶端示例代碼:
from socket import *
# 創(chuàng)建socket
tcp_client_socket = socket(AF_INET, SOCK_STREAM)
# 鏈接服務(wù)器
tcp_client_socket.connect(('192.168.3.31', 8080))
tcp_client_socket.send("測試發(fā)送的內(nèi)容".encode("u8"))
# 接收對方發(fā)送過來的數(shù)據(jù),最大接收1024個(gè)字節(jié)
recvData = tcp_client_socket.recv(1024)
print('接收到的數(shù)據(jù)為:', recvData.decode('u8'))
# 關(guān)閉套接字
tcp_client_socket.close()
SOCK_RAW 原始套接字
上述兩種套接字是常規(guī)的套接字模式,第三個(gè)參數(shù)省略或?yàn)榱?IP協(xié)議)會自動選擇正確的協(xié)議(TCP協(xié)議和UDP協(xié)議)。
當(dāng)我們指定套接字類型為socket.SOCK_RAW原始套接字時(shí),第三個(gè)參數(shù)就需要指定proto協(xié)議號。
python的socket庫預(yù)定義的協(xié)議號有:
socket.IPPROTO_TCP:TCP傳輸協(xié)議,值為6
socket.IPPROTO_UDP:UDP傳輸協(xié)議,值為17
socket.IPPROTO_ICMP:ICMP協(xié)議,值為1
socket.IPPROTO_IP:IP協(xié)議,值為0
socket.IPPROTO_RAW:可自行構(gòu)建IP頭部構(gòu)建更底層的協(xié)議,值為1
也可以通過協(xié)議名稱獲取協(xié)議號常量:
import socket
print(socket.IPPROTO_ICMP, socket.getprotobyname("icmp"),
socket.IPPROTO_ICMP == socket.getprotobyname("icmp"))
1 1 True
可以看到兩種方式獲取協(xié)議號均可。
通過原始套接字我們可以使用ICMP或更底層的協(xié)議進(jìn)行通訊從而實(shí)現(xiàn)更高級的功能。
我們需要使用ICMP協(xié)議進(jìn)行網(wǎng)絡(luò)通信就可以使用SOCK_RAW原始套接字:
icmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
socket 模塊和對象的其他常用方法
socket模塊的其他常用方法:
socket.gethostbyname:將主機(jī)名轉(zhuǎn)換為IPv4地址格式。IPv4地址以字符串形式返回
socket.gethostname:返回包含Python解釋器當(dāng)前正在執(zhí)行的機(jī)器的主機(jī)名的字符串
socket.gethostbyaddr:根據(jù)IP地址獲取主機(jī)名
socket.getprotobyname:將Internet協(xié)議名稱轉(zhuǎn)換為協(xié)議號常量
在主機(jī)字節(jié)順序與網(wǎng)絡(luò)字節(jié)順序不相同的機(jī)器上,使用以下方法轉(zhuǎn)換:
| 網(wǎng)絡(luò)順序轉(zhuǎn)換為主機(jī)字節(jié)順序 | 主機(jī)順序轉(zhuǎn)換為網(wǎng)絡(luò)字節(jié)順序 | |
|---|---|---|
| 32位正整數(shù) 4字節(jié)的交換操作 | socket.ntohl | socket.htonl |
| 16位正整數(shù) 2字節(jié)的交換操作 | socket.ntohs | socket.htons |
在主機(jī)字節(jié)順序與網(wǎng)絡(luò)字節(jié)順序相同的機(jī)器上,執(zhí)行以上方法是無操作的。
socket.inet_aton:將字符串格式的IPv4地址打包為32位4字節(jié)的字節(jié)對象
獲取本機(jī)ip地址方法1:先獲取本機(jī)主機(jī)名,再通過主機(jī)名獲取ip
import socket
ip = socket.gethostbyname(socket.gethostname())
print(ip)
192.168.3.31
獲取本機(jī)所有網(wǎng)卡的IP:
ips = socket.gethostbyname_ex(socket.gethostname())[-1]
print(ips)
['192.168.3.31']
??注意:如果本機(jī)沒有正確設(shè)置主機(jī)名時(shí)可能無法獲取本機(jī)ip地址。
socket套接字對象的公用函數(shù)套接字函數(shù):
s.getpeername() :返回連接套接字的遠(yuǎn)程地址。返回值通常是元組(ipaddr,port)
s.getsockname() :返回套接字自己的地址。通常是一個(gè)元組(ipaddr,port)
s.setsockopt(level,optname,value) :設(shè)置給定套接字選項(xiàng)的值。
s.getsockopt(level,optname[.buflen]) :返回套接字選項(xiàng)的值。
s.settimeout(timeout) :設(shè)置套接字操作的超時(shí)期,timeout是一個(gè)浮點(diǎn)數(shù),單位是秒。值為None表示沒有超時(shí)期。一般,超時(shí)期應(yīng)該在剛創(chuàng)建套接字時(shí)設(shè)置,因?yàn)樗鼈兛赡苡糜谶B接的操作(如connect)
s.gettimeout() :返回當(dāng)前超時(shí)期的值,單位是秒,如果沒有設(shè)置超時(shí)期,則返回None
s.fileno() :返回套接字的文件描述符
s.setblocking(flag) :如果flag為0,則將套接字設(shè)為非阻塞模式,否則將套接字設(shè)為阻塞模式(默認(rèn)值)。非阻塞模式下,如果調(diào)用recv()沒有發(fā)現(xiàn)任何數(shù)據(jù),或send()調(diào)用無法立即發(fā)送數(shù)據(jù),那么將引起socket.error異常。
s.makefile() :創(chuàng)建一個(gè)與該套接字相關(guān)聯(lián)的文件。
獲取本機(jī)ip地址方法2:向任意網(wǎng)絡(luò)地址發(fā)送一個(gè)無狀態(tài)的UDP請求后,再通過套接字對象獲取自己的地址從而獲取本機(jī)地址
import socket
def get_local_ip():
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.connect(('1.1.1.1', 80))
ip, port = s.getsockname()
return ip
# 獲取本機(jī)IP
ip = get_local_ip()
print(ip)
192.168.3.31
?即使無法連接Internet目標(biāo)地址無法訪問(發(fā)出報(bào)文會丟失),也可以使用該方法獲取本機(jī)ip地址。
struct 二進(jìn)制數(shù)據(jù)的轉(zhuǎn)換
Python提供了一個(gè)struct模塊來解決bytes和其他二進(jìn)制數(shù)據(jù)類型的轉(zhuǎn)換。
struct的pack函數(shù)把任意數(shù)據(jù)類型變成bytes。
import struct
print(struct.pack('>I', 10240099))
b'\x00\x9c@c'
pack 的第一個(gè)參數(shù)是處理指令:
>:表示字節(jié)順序是 big-endian,也就是網(wǎng)絡(luò)序I:表示 4 字節(jié)無符號整數(shù)H:2 字節(jié)無符號整數(shù)。
后面的參數(shù)字節(jié)個(gè)數(shù)要和處理指令一致。
unpack 把 bytes 變成相應(yīng)的數(shù)據(jù)類型:
>>> struct.unpack('>IH', b'\xf0\xf0\xf0\xf0\x80\x80')
(4042322160, 32896)
struct模塊定義的數(shù)據(jù)類型可以參考Python官方文檔:
https://docs.python.org/zh-cn/3/library/struct.html#format-characters
| 格式 | C 類型 | Python 類型 | 標(biāo)準(zhǔn)大小 | 注釋 |
|---|---|---|---|---|
| x | 填充字節(jié) | 無 | ||
| c | char | 長度為 1 的字節(jié)串 | 1 | |
| b | signed char | 整數(shù) | 1 | (1), (2) |
| B | unsigned char | 整數(shù) | 1 | -2 |
| ? | _Bool | bool | 1 | -1 |
| h | short | 整數(shù) | 2 | -2 |
| H | unsigned short | 整數(shù) | 2 | -2 |
| i | int | 整數(shù) | 4 | -2 |
| I | unsigned int | 整數(shù) | 4 | -2 |
| l | long | 整數(shù) | 4 | -2 |
| L | unsigned long | 整數(shù) | 4 | -2 |
| q | long long | 整數(shù) | 8 | -2 |
| Q | unsigned long long | 整數(shù) | 8 | -2 |
| n | ssize_t | 整數(shù) | -3 | |
| N | size_t | 整數(shù) | -3 | |
| e | -6 | 浮點(diǎn)數(shù) | 2 | -4 |
| f | float | 浮點(diǎn)數(shù) | 4 | -4 |
| d | double | 浮點(diǎn)數(shù) | 8 | -4 |
| s | char [] | 字節(jié)串 | ||
| p | char [] | 字節(jié)串 | ||
| P | void * | 整數(shù) | -5 |
02
Ping 的工作原理
ping 基于 ICMP 協(xié)議工作的,ICMP 全稱是 Internet Control Message Protocol,也就是互聯(lián)網(wǎng)控制報(bào)文協(xié)議。ping 發(fā)出的ICMP 報(bào)文實(shí)際上是以偵察網(wǎng)絡(luò)狀態(tài)的形式實(shí)現(xiàn)了控制,反饋網(wǎng)絡(luò)狀態(tài),從而調(diào)整傳輸策略以此控制整個(gè)局面。
ICMP 主要的功能包括:確認(rèn) IP 包是否成功送達(dá)目標(biāo)地址、報(bào)告發(fā)送過程中 IP 包被廢棄的原因和改善網(wǎng)絡(luò)設(shè)置等。ICMP 協(xié)議主要負(fù)責(zé)在 IP 通信中通知某個(gè) IP 包未能達(dá)到目標(biāo)地址的原因。
ICMP 報(bào)文格式
Ping命令發(fā)出的ICMP 報(bào)文封裝在 IP 包里面的,結(jié)構(gòu)如下:

上述報(bào)文格式中,左邊的IP頭部分不需要太關(guān)心,因?yàn)槲覀兪褂胹ocket的原始套接字模式會自動幫我們封裝IP頭部分,右邊的ICMP報(bào)文才是我們需要關(guān)心的部分。
??注意:相比原生的 ICMP,Ping命令發(fā)出的ICMP報(bào)文多出了標(biāo)識符和序號兩個(gè)字段。
對于ICMP報(bào)文的類型,有兩大類:
查詢報(bào)文類型:用于診斷的查詢消息
差錯報(bào)文類型:通知出錯原因的錯誤消息
不過咱們使用的PING只需要使用查詢報(bào)文類型中的回送應(yīng)答和回送請求。
常見的 ICMP 類型包括:

ICMP 查詢報(bào)文類型
回送消息:0表示回送應(yīng)答,8表示回送請求。用于進(jìn)行通信的主機(jī)或路由器之間,判斷所發(fā)送的數(shù)據(jù)包是否已經(jīng)成功到達(dá)對端的一種消息。
ping 命令是通過ICMP協(xié)議的回送消息實(shí)現(xiàn)的:

發(fā)送端主機(jī)向接收端主機(jī)發(fā)送一個(gè)回送請求(ICMP Echo Request Message,類型 8),只要正常接收到接收端返回的回送響應(yīng)(ICMP Echo Reply Message,類型 0),則代表發(fā)送端主機(jī)到接收端主機(jī)可達(dá)。
ICMP 差錯報(bào)文類型
對于差錯報(bào)文類型,在本次編碼中不會用到,無需深究,簡單了解一下即可。
ICMP 常見差錯報(bào)文:
目標(biāo)不可達(dá)消息 —— 類型 為
3原點(diǎn)抑制消息 —— 類型
4重定向消息 —— 類型
5超時(shí)消息 —— 類型
11
目標(biāo)不可達(dá)消息(Destination Unreachable Message):
IP 路由器無法將 IP 數(shù)據(jù)包發(fā)送給目標(biāo)地址時(shí),會給發(fā)送端主機(jī)返回一個(gè)目標(biāo)不可達(dá)的 ICMP 消息,并在這個(gè)消息中顯示不可達(dá)的具體原因,原因記錄在 ICMP 包頭的代碼字段。
由此,根據(jù) ICMP 不可達(dá)的具體消息,發(fā)送端主機(jī)也就可以了解此次發(fā)送不可達(dá)的具體原因。
目標(biāo)不可達(dá)的原因有:
網(wǎng)絡(luò)不可達(dá)代碼為
0主機(jī)不可達(dá)代碼為
1協(xié)議不可達(dá)代碼為
2端口不可達(dá)代碼為
3需要進(jìn)行分片但設(shè)置了不分片位代碼為
4
原點(diǎn)抑制消息(ICMP Source Quench Message):
ICMP 原點(diǎn)抑制消息的目是為了緩和網(wǎng)絡(luò)擁堵的問題,當(dāng)路由器向低速線路發(fā)送數(shù)據(jù)時(shí),其發(fā)送隊(duì)列的緩存變?yōu)榱愣鵁o法發(fā)送出去時(shí),可以向 IP 包的源地址發(fā)送一個(gè) ICMP 原點(diǎn)抑制消息。
但是收到這種 ICMP 消息的主機(jī)并不見得真的會增大 IP 包的傳輸間隔,還可能會引起不公平的網(wǎng)絡(luò)通信,所以一般不被使用。
重定向消息(ICMP Redirect Message):
在路由器持有更好的路由信息時(shí),發(fā)現(xiàn)發(fā)送端主機(jī)使用了不是最優(yōu)的路徑發(fā)送數(shù)據(jù),那么路由器會返回一個(gè) ICMP 重定向消息給這個(gè)主機(jī)。這個(gè)消息中包含了最合適的路由信息和源數(shù)據(jù),發(fā)送端下次可以發(fā)給另外一個(gè)更近的路由器。
超時(shí)消息(ICMP Time Exceeded Message):
IP 包中有一個(gè)8位的字段叫做 TTL (Time To Live,生存周期),它的值隨著每經(jīng)過一次路由器就會減 1,直到減到 0 時(shí)該 IP 包會被丟棄。
此時(shí),IP 路由器將會發(fā)送一個(gè)ICMP超時(shí)消息給發(fā)送端主機(jī),并通知該包已被丟棄。設(shè)置 IP 包生存周期的主要目的是為了在路由控制遇到問題發(fā)生循環(huán)狀況時(shí),避免 IP 包無休止地在網(wǎng)絡(luò)上被轉(zhuǎn)發(fā)。
也可以通過設(shè)置一個(gè)較小的 TTL 值 控制包的到達(dá)范圍。
03
socket 原始套接字實(shí)現(xiàn) ping 命令
學(xué)了這么多基礎(chǔ)的網(wǎng)絡(luò)知識,我們最終為了什么?就是為了能夠自己實(shí)現(xiàn)PING命令。相關(guān)的網(wǎng)絡(luò)知識還有很多,但對于我們實(shí)現(xiàn)PING命令并沒有太大關(guān)系,就暫不做深究。
下面我們從實(shí)戰(zhàn)出現(xiàn),一步步調(diào)試?yán)^續(xù)深挖PING命令的實(shí)現(xiàn)原理。
首先我們創(chuàng)建ICMP協(xié)議的原始套接字鏈接:
import socket
icmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
發(fā)送回送請求
然后需要向目標(biāo)發(fā)送一個(gè)回送請求,結(jié)構(gòu)如下:

下面開始組織報(bào)文數(shù)據(jù)(對于系列號,我們可以自行決定要發(fā)送的值):
import os
import time
import struct
# 校驗(yàn)需要后面再計(jì)算,這里先設(shè)置為0
ICMP_ECHO_REQUEST, code, checksum, identifier, serial_num = 8, 0, 0, os.getpid() & 0xFFFF, 0
# 初步打包ICMP頭部
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
checksum, identifier, serial_num)
# 打包選項(xiàng)數(shù)據(jù),包含當(dāng)前時(shí)間戳,后面用Q補(bǔ)齊到192位
data = struct.pack("d", time.time()).ljust(192, b"Q")
計(jì)算校驗(yàn)和的規(guī)則這里我已經(jīng)寫成代碼,大家可以直接看代碼:
def calc_checksum(src_bytes):
"""用于計(jì)算ICMP報(bào)文的校驗(yàn)和"""
total = 0
max_count = len(src_bytes)
count = 0
while count < max_count:
val = src_bytes[count + 1]*256 + src_bytes[count]
total = total + val
total = total & 0xffffffff
count = count + 2
if max_count < len(src_bytes):
total = total + ord(src_bytes[len(src_bytes) - 1])
total = total & 0xffffffff
total = (total >> 16) + (total & 0xffff)
total = total + (total >> 16)
answer = ~total
answer = answer & 0xffff
answer = answer >> 8 | (answer << 8 & 0xff00)
return socket.htons(answer)
??注意:最終返回時(shí)通過socket.htons方法將數(shù)據(jù)從主機(jī)序轉(zhuǎn)換為網(wǎng)絡(luò)序。
然后就可以計(jì)算出校驗(yàn)和重新打包header:
checksum = calc_checksum(header + data)
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
checksum, identifier, serial_num)
然后就可以發(fā)送了:
# 發(fā)送給目標(biāo)地址,ICMP協(xié)議沒有端口的概念端口可以隨便填
target_addr = "192.168.3.31"
icmp_socket.sendto(header + data, (target_addr, 1))
??注意:雖然發(fā)送給了1號端口,但其實(shí)發(fā)送給任意端口都可以。
接收回送響應(yīng)
回送響應(yīng)與回送請求結(jié)構(gòu)一致:

發(fā)送完消息后,我們就可以接收回送相應(yīng):
# 接收回送請求
recv_packet, addr = icmp_socket.recvfrom(1024)
# 前20字節(jié)是ip協(xié)議的ip頭
icmp_header = recv_packet[20:28]
data = recv_packet[28:]
ICMP_Echo_Reply, code, checksum, identifier, serial_num = struct.unpack(
"bbHHh", icmp_header
)
time_sent, = struct.unpack("d", data[:struct.calcsize("d")])
??注意:我們接收的回送請求中包含了前20自己的IP頭。
從選項(xiàng)數(shù)據(jù)中可解析出了這個(gè)包發(fā)送的時(shí)間(之前發(fā)出時(shí)寫入的時(shí)間)。
完善 ping 命令的開發(fā)
雖然標(biāo)準(zhǔn)的PING命令是用以上協(xié)議規(guī)則實(shí)現(xiàn)的,但我們并不需要完全按照上述規(guī)范,例如標(biāo)識符可以發(fā)送任何16位的值,序號可以從任意數(shù)值開始,選項(xiàng)數(shù)據(jù)192位的空間也可以用來存放任何數(shù)據(jù)。
我們在接收回送響應(yīng)時(shí)需要檢查包的標(biāo)識符,確定是自己發(fā)出的包才接收。
最終封裝出如下方法:
import struct
import time
import os
import socket
import select
def calc_checksum(src_bytes):
"""用于計(jì)算ICMP報(bào)文的校驗(yàn)和"""
total = 0
max_count = len(src_bytes)
count = 0
while count < max_count:
val = src_bytes[count + 1]*256 + src_bytes[count]
total = total + val
total = total & 0xffffffff
count = count + 2
if max_count < len(src_bytes):
total = total + ord(src_bytes[len(src_bytes) - 1])
total = total & 0xffffffff
total = (total >> 16) + (total & 0xffff)
total = total + (total >> 16)
answer = ~total
answer = answer & 0xffff
answer = answer >> 8 | (answer << 8 & 0xff00)
return socket.htons(answer)
def sent_ping(icmp_socket, target_addr, identifier=os.getpid() & 0xFFFF,
serial_num=0, data=None):
# 校驗(yàn)需要后面再計(jì)算,這里先設(shè)置為0
ICMP_ECHO_REQUEST, code, checksum = 8, 0, 0
# 初步打包ICMP頭部
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
checksum, identifier, serial_num)
# 打包選項(xiàng)數(shù)據(jù)
if data:
data = data.ljust(192, b"Q")
else:
data = struct.pack("d", time.time()).ljust(192, b"Q")
checksum = calc_checksum(header + data)
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
checksum, identifier, serial_num)
# 發(fā)送給目標(biāo)地址,ICMP協(xié)議沒有端口的概念端口可以隨便填
icmp_socket.sendto(header + data, (target_addr, 1))
def receive_pong(icmp_socket, identifier=os.getpid() & 0xFFFF, serial_num=0, timeout=2):
icmp_socket.settimeout(timeout)
time_remaining = timeout
while True:
start_time = time.time()
# 接收回送請求
recv_packet, (ip, port) = icmp_socket.recvfrom(1024)
time_received = time.time()
time_spent = time_received-start_time
# 前20字節(jié)是ip協(xié)議的ip頭
icmp_header = recv_packet[20:28]
data = recv_packet[28:]
ICMP_Echo_Reply, code, checksum, identifier_reciver, serial_num_reciver = struct.unpack(
"bbHHh", icmp_header
)
if identifier_reciver != identifier or serial_num != serial_num_reciver:
# 不是當(dāng)前自己發(fā)的包則忽略
time_remaining -= time_spent
if time_remaining <= 0:
raise socket.timeout
continue
time_sent, = struct.unpack("d", data[:struct.calcsize("d")])
return int((time_received - time_sent)*1000), ip
192.168.3.31是我當(dāng)前本機(jī)的局域網(wǎng)IP地址,測試一下:
icmp_socket = socket.socket(
socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
ip = '192.168.3.31'
sent_ping(icmp_socket, ip)
try:
delay, ip_received = receive_pong(icmp_socket, timeout=2)
print(f"延遲:{delay}ms,對方ip:{ip_received}")
except socket.timeout as e:
print("超時(shí)")
延遲:0ms,對方ip:192.168.3.31
然后再批量ping一下指定當(dāng)前網(wǎng)段的所有IP:
def get_local_ip():
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.connect(('1.1.1.1', 80))
ip, port = s.getsockname()
return ip
icmp_socket = socket.socket(
socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
local_ip = get_local_ip()
net_segment = local_ip[:local_ip.rfind(".")]
ips = []
for i in range(1, 255):
ip = f"{net_segment}.{i}"
sent_ping(icmp_socket, ip)
print("ping", ip, end=" ")
try:
delay, ip_received = receive_pong(icmp_socket, timeout=0.1)
print(f"延遲:{delay}ms,對方ip:{ip_received}")
ips.append(ip)
except socket.timeout as e:
print("超時(shí)")
print(ips)
icmp_socket.close()
超時(shí)時(shí)間0.1秒時(shí),總耗時(shí)30秒:

超時(shí)時(shí)間設(shè)置為0.01秒時(shí),總耗時(shí)則為2.59秒。
借助 arp 表獲取當(dāng)前網(wǎng)段在線設(shè)備
如何盡量快的獲取到當(dāng)前在線的設(shè)備?經(jīng)過測試發(fā)現(xiàn),被ping后,ping不通的機(jī)器,arp表能夠自動刪除對應(yīng)的條目,那么思路1就是快速的向全網(wǎng)段發(fā)送回送請求不等待回送響應(yīng),然后2秒后去查arp表,即可看到最新的在線設(shè)備。
實(shí)現(xiàn)思路1:
import struct
import time
import os
import re
import socket
import pandas as pd
def calc_checksum(src_bytes):
"""用于計(jì)算ICMP報(bào)文的校驗(yàn)和"""
total = 0
max_count = len(src_bytes)
count = 0
while count < max_count:
val = src_bytes[count + 1]*256 + src_bytes[count]
total = total + val
total = total & 0xffffffff
count = count + 2
if max_count < len(src_bytes):
total = total + ord(src_bytes[len(src_bytes) - 1])
total = total & 0xffffffff
total = (total >> 16) + (total & 0xffff)
total = total + (total >> 16)
answer = ~total
answer = answer & 0xffff
answer = answer >> 8 | (answer << 8 & 0xff00)
return socket.htons(answer)
def sent_ping(icmp_socket, target_addr, identifier=os.getpid() & 0xFFFF,
serial_num=0, data=None):
# 校驗(yàn)需要后面再計(jì)算,這里先設(shè)置為0
ICMP_ECHO_REQUEST, code, checksum = 8, 0, 0
# 初步打包ICMP頭部
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
checksum, identifier, serial_num)
# 打包選項(xiàng)數(shù)據(jù)
if data:
data = data.ljust(192, b"Q")
else:
data = struct.pack("d", time.time()).ljust(192, b"Q")
checksum = calc_checksum(header + data)
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
checksum, identifier, serial_num)
# 發(fā)送給目標(biāo)地址,ICMP協(xié)議沒有端口的概念端口可以隨便填
icmp_socket.sendto(header + data, (target_addr, 1))
def get_arp_ip_mac():
header = None
with os.popen("arp -a") as res:
for line in res:
line = line.strip()
if not line or line.startswith("接口"):
continue
if header is None:
header = re.split(" {2,}", line.strip())
break
df = pd.read_csv(res, sep=" {2,}",
names=header, header=0, engine='python')
return df
def ping_net_segment_all(net_segment):
with socket.socket(
socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP) as icmp_socket:
for i in range(1, 255):
ip = f"{net_segment}.{i}"
sent_ping(icmp_socket, ip)
net_segment = "192.168.3"
ping_net_segment_all(net_segment)
# 等待回送響應(yīng)的到來,預(yù)計(jì)1秒之內(nèi)
time.sleep(1)
# 讀取最新的arp表
df = get_arp_ip_mac()
df
于是我們獲取到了當(dāng)前網(wǎng)段在線的設(shè)備列表:

雙線程獲取指定網(wǎng)段的在線設(shè)備
不過使用arp表查看有個(gè)缺陷,只能查看當(dāng)前網(wǎng)段的,跨網(wǎng)段的在線設(shè)備似乎看不到。經(jīng)分析我使用的臺式機(jī)通過有線連接到3網(wǎng)段,而手機(jī)通過WiFi連接到2網(wǎng)段,所以必須能夠分析2網(wǎng)段設(shè)備的在線設(shè)備才有意義。
思路2:用兩個(gè)線程一個(gè)線程專門發(fā)回送請求,一個(gè)線程專門接收回送響應(yīng),可以通過回送響應(yīng)獲取IP地址,于是就可以得到指定網(wǎng)段的當(dāng)前在線的設(shè)備的ip。
先完成獲取在線設(shè)備列表:
from concurrent.futures import ThreadPoolExecutor
import _thread
import struct
import time
import os
import re
import socket
import pandas as pd
def calc_checksum(src_bytes):
"""用于計(jì)算ICMP報(bào)文的校驗(yàn)和"""
total = 0
max_count = len(src_bytes)
count = 0
while count < max_count:
val = src_bytes[count + 1]*256 + src_bytes[count]
total = total + val
total = total & 0xffffffff
count = count + 2
if max_count < len(src_bytes):
total = total + ord(src_bytes[len(src_bytes) - 1])
total = total & 0xffffffff
total = (total >> 16) + (total & 0xffff)
total = total + (total >> 16)
answer = ~total
answer = answer & 0xffff
answer = answer >> 8 | (answer << 8 & 0xff00)
return socket.htons(answer)
def sent_ping(icmp_socket, target_addr, identifier=os.getpid() & 0xFFFF,
serial_num=0, data=None):
# 校驗(yàn)需要后面再計(jì)算,這里先設(shè)置為0
ICMP_ECHO_REQUEST, code, checksum = 8, 0, 0
# 初步打包ICMP頭部
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
checksum, identifier, serial_num)
# 打包選項(xiàng)數(shù)據(jù)
if data:
data = data.ljust(192, b"Q")
else:
data = struct.pack("d", time.time()).ljust(192, b"Q")
checksum = calc_checksum(header + data)
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
checksum, identifier, serial_num)
# 發(fā)送給目標(biāo)地址,ICMP協(xié)議沒有端口的概念端口可以隨便填
icmp_socket.sendto(header + data, (target_addr, 1))
def receive_pong(icmp_socket, net_segment, timeout=2):
icmp_socket.settimeout(timeout)
ips = set()
while True:
start_time = time.time()
try:
recv_packet, (ip, port) = icmp_socket.recvfrom(1024)
if ip.startswith(net_segment):
ips.add(ip)
except socket.timeout as e:
break
return ips
def ping_net_segment_all(icmp_socket, net_segment):
for i in range(1, 255):
ip = f"{net_segment}.{i}"
sent_ping(icmp_socket, ip)
icmp_socket = socket.socket(
socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
with ThreadPoolExecutor() as p:
p.submit(ping_net_segment_all, icmp_socket, "192.168.2")
future = p.submit(receive_pong, icmp_socket, "192.168.2", 3)
ips = future.result()
ips
運(yùn)行結(jié)果,目前我的手機(jī)ip為192.168.2.122,運(yùn)行后被順利檢測到:
{'192.168.2.1',
'192.168.2.122',
'192.168.2.17',
'192.168.2.18',
'192.168.2.19',
'192.168.2.20',
'192.168.2.21',
'192.168.2.22',
'192.168.2.23',
'192.168.2.49'}
關(guān)閉手機(jī)WiFi后,再次運(yùn)行,順利看到該IP的下線。
完成 BOSS 來了的摸魚神器
在已經(jīng)將更新時(shí)間縮短到5秒以內(nèi)時(shí),咱們就可以PING指定網(wǎng)段,最后完成分析設(shè)備上下線的功能,從而達(dá)到最終的目的完成BOSS來了的摸魚神器。
from concurrent.futures import ThreadPoolExecutor
import _thread
import struct
import time
import os
import re
import socket
import pandas as pd
def calc_checksum(src_bytes):
"""用于計(jì)算ICMP報(bào)文的校驗(yàn)和"""
total = 0
max_count = len(src_bytes)
count = 0
while count < max_count:
val = src_bytes[count + 1]*256 + src_bytes[count]
total = total + val
total = total & 0xffffffff
count = count + 2
if max_count < len(src_bytes):
total = total + ord(src_bytes[len(src_bytes) - 1])
total = total & 0xffffffff
total = (total >> 16) + (total & 0xffff)
total = total + (total >> 16)
answer = ~total
answer = answer & 0xffff
answer = answer >> 8 | (answer << 8 & 0xff00)
return socket.htons(answer)
def sent_ping(icmp_socket, target_addr, identifier=os.getpid() & 0xFFFF,
serial_num=0, data=None):
# 校驗(yàn)需要后面再計(jì)算,這里先設(shè)置為0
ICMP_ECHO_REQUEST, code, checksum = 8, 0, 0
# 初步打包ICMP頭部
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
checksum, identifier, serial_num)
# 打包選項(xiàng)數(shù)據(jù)
if data:
data = data.ljust(192, b"Q")
else:
data = struct.pack("d", time.time()).ljust(192, b"Q")
checksum = calc_checksum(header + data)
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
checksum, identifier, serial_num)
# 發(fā)送給目標(biāo)地址,ICMP協(xié)議沒有端口的概念端口可以隨便填
icmp_socket.sendto(header + data, (target_addr, 1))
def receive_pong(icmp_socket, net_segment, timeout=2):
icmp_socket.settimeout(timeout)
ips = set()
while True:
start_time = time.time()
try:
recv_packet, (ip, port) = icmp_socket.recvfrom(1024)
if ip.startswith(net_segment):
ips.add(ip)
except socket.timeout as e:
break
return ips
def ping_net_segment_all(icmp_socket, net_segment):
for i in range(1, 255):
ip = f"{net_segment}.{i}"
sent_ping(icmp_socket, ip)
last = None
while 1:
icmp_socket = socket.socket(
socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
with ThreadPoolExecutor() as p:
p.submit(ping_net_segment_all, icmp_socket, "192.168.2")
future = p.submit(receive_pong, icmp_socket, "192.168.2")
ips = future.result()
if last is None:
print("當(dāng)前在線設(shè)備:", ips)
if last:
up = ips-last
if up:
print("\r新上線設(shè)備:", up, end=" "*100)
down = last-ips
if down:
print("\r剛下線設(shè)備:", down, end=" "*100)
last = ips
time.sleep(3)
結(jié)果示例:
當(dāng)前在線設(shè)備: {'192.168.2.122', '192.168.2.18', '192.168.2.20', '192.168.2.1', '192.168.2.23', '192.168.2.49', '192.168.2.21', '192.168.2.17', '192.168.2.22', '192.168.2.19'}
剛下線設(shè)備: {'192.168.2.122'}
經(jīng)測試,手工關(guān)閉或打開手機(jī)WiFi能夠順利看到設(shè)備IP的打印信息。這種方法雖然無法獲取MAC地址,但是經(jīng)測試,同一臺機(jī)器都會被分配同一個(gè)IP,在我當(dāng)前的網(wǎng)絡(luò)下是滿足要求的,只需要知道老板手機(jī)連接的IP就行了?;蛘哂^察一下,老板走之后,到底哪個(gè)IP下線了,專門去監(jiān)控這個(gè)IP。
更安全的做法就是每看到有新的IP上線都額外警惕一點(diǎn),如果你是win10系統(tǒng)可以使用如下方法實(shí)現(xiàn)系統(tǒng)通知:
from win10toast import ToastNotifier
toaster = ToastNotifier()
toaster.show_toast("通知標(biāo)題", "通知內(nèi)容!", duration=10)
上述三個(gè)參數(shù)分別是通知標(biāo)題,通知的內(nèi)容和通知持續(xù)的時(shí)間,對于摸魚這種事持續(xù)時(shí)間可以調(diào)大掉,再手工關(guān)閉通知,通過pip install win10toast安裝。
04
總結(jié)
總算做成了這個(gè)摸魚神器,不過雖然我上面一本正經(jīng)的講的津津有味,但不會真有人打算拿這個(gè)代碼去應(yīng)用于實(shí)際去對付老板吧??不會吧,不會吧??
真打算做摸魚神器的童鞋,我個(gè)人推薦搞個(gè)網(wǎng)絡(luò)攝像頭,寫個(gè)人物圖像識別的代碼,發(fā)現(xiàn)有人進(jìn)來了都自動提醒,這樣才可以更放心的摸魚。萬一老板沒連wifi就過來了,這就有點(diǎn)坑。
開發(fā)摸魚神器不是本文本身的目的,學(xué)習(xí)網(wǎng)絡(luò)知識自主實(shí)現(xiàn)網(wǎng)絡(luò)協(xié)議,從通過實(shí)際例子理解網(wǎng)絡(luò)協(xié)議才是本文真正的目的。為了構(gòu)思本文,我也是苦思冥想了幾天幾夜了,小小明在這里在線求大家一個(gè)3連可以嗎???
我是小小明,咱們下期再見~別忘了點(diǎn)亮小紅心噢~
推薦閱讀

