IPC-消息隊(duì)列


消息隊(duì)列是IPC(進(jìn)程間通信,inter process communication)中常用的一種方式,相比與其他的通信方式消息隊(duì)列具有在短消息處理和消息類別上有突出的表現(xiàn)。在理解消息隊(duì)列前,需要了解一點(diǎn)關(guān)于Linux內(nèi)核的知識(shí)。
![]() | ![]() |
如圖所示,Linux/unix的體系架構(gòu)可以抽像成三層結(jié)構(gòu):應(yīng)用程序(用戶程序,軟件等),內(nèi)核(用于操作底層硬件)以及硬件。所以用戶編寫的程序是不能夠直接操縱底層硬件的,需要通過系統(tǒng)內(nèi)核。因此這就可以解釋為什么共享內(nèi)存的方式是IPC最快的方式,因?yàn)楣蚕韮?nèi)存沒有在內(nèi)核中,而其他的包括消息隊(duì)列是在內(nèi)核中開辟空間,所以在訪問速度上直接訪問用戶內(nèi)存比內(nèi)核空間要快。
1.1 消息隊(duì)列結(jié)構(gòu)
消息隊(duì)列雖然不能夠進(jìn)程大量數(shù)據(jù)通信,但是卻有一個(gè)很明顯的優(yōu)點(diǎn),那就是在同一個(gè)消息隊(duì)列中可以包含不同類型的消息,接收端可以根據(jù)自己的情況接受相應(yīng)的消息,那么消息隊(duì)列是怎樣來保證這樣的特點(diǎn)的呢?先看一下它的結(jié)構(gòu):

如圖在消息隊(duì)列其實(shí)就是一個(gè)鏈表,其中可以存在不同的type,每一個(gè)type可能會(huì)有多條消息,接受端根據(jù)自己需要的type,一次從鏈表中獲取第一個(gè),第二個(gè)消息。
1.2 如何使用消息隊(duì)列
消息隊(duì)列的使用很簡(jiǎn)單,使用msgget( ), msgsnd( ),msgrcv( )以及msgctl( )就完全搞定。
key:是內(nèi)核中消息隊(duì)列的標(biāo)識(shí),一般使用ftok來生成。
flag:msgget(key, IPC_CREAT|0666); 創(chuàng)建由key指定的消息隊(duì)列,操作權(quán)限為0666。
IPC_CREAT:用來創(chuàng)建一個(gè)消息隊(duì)列
IPC_EXCL:查詢消息隊(duì)列是否存在,IPC_CREAT同時(shí)使用,存在則報(bào)錯(cuò)。
IPC_NOWAIT:之后的消息隊(duì)列操作都為非阻塞


msqid:msgget創(chuàng)建隊(duì)列后返回的消息隊(duì)列唯一的標(biāo)識(shí)。
ptr:用于接受或發(fā)送消息的內(nèi)容,其結(jié)構(gòu)固定,
struct msgbuf {long mtype;//消息類型char mtext[1024];//消息數(shù)據(jù)};
nbytes:消息長(zhǎng)度。
flag:標(biāo)志選項(xiàng)
0 阻塞等待
IPC_NOWAIT 使操作不阻塞,沒有消息返回-1,error設(shè)置為ENOMSG
消息隊(duì)列如何使用?一個(gè)簡(jiǎn)單的例子:
發(fā)送端:

接受端:

在使用消息隊(duì)列的時(shí)候,你是否會(huì)有這樣的疑問:如果多個(gè)進(jìn)程同時(shí)向一個(gè)消息隊(duì)列里面同一個(gè)Type發(fā)送消息,那么是否會(huì)出現(xiàn)資源競(jìng)爭(zhēng)同步的問題呢?

是否會(huì)出現(xiàn)上面圖中的局面呢,也就是說多個(gè)進(jìn)程同時(shí)往消息隊(duì)列里面寫,那么就會(huì)同時(shí)認(rèn)為msg1是鏈表的末尾,于是都把自己的消息添加到msg1的后面,導(dǎo)致了上面的現(xiàn)象,在接收端取數(shù)據(jù)的時(shí)候就會(huì)出現(xiàn)問題了。
正確答案是,系統(tǒng)并沒有這么傻,對(duì)于消息隊(duì)列而言,一定要保證單鏈表,在內(nèi)核代碼中其實(shí)是做了同步的,也就是說當(dāng)多個(gè)進(jìn)程同時(shí)寫的話,內(nèi)核里面也會(huì)一個(gè)一個(gè)的進(jìn)行鏈表的連接。我們大可放心,且不像共享內(nèi)存一樣需要用戶自己同步,在使用消息隊(duì)列的時(shí)候用戶不需要進(jìn)行同步。
內(nèi)核代碼里面有一個(gè)ipc_lock_check()來進(jìn)行同步,保證資源競(jìng)爭(zhēng)。
// 先看msgsnd()函數(shù),它通過系統(tǒng)調(diào)用接口界面,進(jìn)入內(nèi)核執(zhí)行,代碼如下:SYSCALL_DEFINE4(msgsnd, int, msqid, struct msgbuf __user *, msgp, size_t, msgsz,int, msgflg){long mtype;if (get_user(mtype, &msgp->mtype))return -EFAULT;return do_msgsnd(msqid, mtype, msgp->mtext, msgsz, msgflg);}// 接下來看do_msgsnd()部分的代碼,如下:long do_msgsnd(int msqid, long mtype, void __user *mtext,size_t msgsz, int msgflg){struct msg_queue *msq;struct msg_msg *msg;int err;struct ipc_namespace *ns;ns = current->nsproxy->ipc_ns;if (msgsz > ns->msg_ctlmax || (long) msgsz < 0 || msqid < 0)return -EINVAL;if (mtype < 1)return -EINVAL;msg = load_msg(mtext, msgsz);if (IS_ERR(msg))return PTR_ERR(msg);msg->m_type = mtype;msg->m_ts = msgsz;msq = msg_lock_check(ns, msqid);if (IS_ERR(msq)) {err = PTR_ERR(msq);goto out_free;}for (;;) {struct msg_sender s;err = -EACCES;if (ipcperms(&msq->q_perm, S_IWUGO))goto out_unlock_free;err = security_msg_queue_msgsnd(msq, msg, msgflg);if (err)goto out_unlock_free;if (msgsz + msq->q_cbytes <= msq->q_qbytes &&1 + msq->q_qnum <= msq->q_qbytes) {break;}/* queue full, wait: */if (msgflg & IPC_NOWAIT) {err = -EAGAIN;goto out_unlock_free;}ss_add(msq, &s);ipc_rcu_getref(msq);msg_unlock(msq);schedule();ipc_lock_by_ptr(&msq->q_perm);ipc_rcu_putref(msq);if (msq->q_perm.deleted) {err = -EIDRM;goto out_unlock_free;}ss_del(&s);if (signal_pending(current)) {err = -ERESTARTNOHAND;goto out_unlock_free;}}msq->q_lspid = task_tgid_vnr(current);msq->q_stime = get_seconds();if (!pipelined_send(msq, msg)) {/* noone is waiting for this message, enqueue it */list_add_tail(&msg->m_list, &msq->q_messages);msq->q_cbytes += msgsz;msq->q_qnum++;atomic_add(msgsz, &ns->msg_bytes);atomic_inc(&ns->msg_hdrs);}err = 0;msg = NULL;out_unlock_free:msg_unlock(msq);out_free:if (msg != NULL)free_msg(msg);return err;}// 在這段代碼中,請(qǐng)注意臨近入口位置的這個(gè)函數(shù)msg_lock_check(),我們跟進(jìn),看一下這個(gè)lock是如何check// 的,代碼如下:static inline struct msg_queue *msg_lock_check(struct ipc_namespace *ns,int id){struct kern_ipc_perm *ipcp = ipc_lock_check(&msg_ids(ns), id);if (IS_ERR(ipcp))return (struct msg_queue *)ipcp;return container_of(ipcp, struct msg_queue, q_perm);}// ipc_lock_check()是一個(gè)能夠check所有IPC object同步信息的函數(shù),它的定義如下:struct kern_ipc_perm *ipc_lock_check(struct ipc_ids *ids, int id){struct kern_ipc_perm *out;out = ipc_lock(ids, id);if (IS_ERR(out))return out;if (ipc_checkid(out, id)) {ipc_unlock(out);return ERR_PTR(-EIDRM);}return out;}// 這里的ipc_lock()是至關(guān)重要的地方!通過這個(gè)函數(shù)的注釋,也能明白它的作用了:/*** ipc_lock - Lock an ipc structure without rw_mutex held* @ids: IPC identifier set* @id: ipc id to look for** Look for an id in the ipc ids idr and lock the associated ipc object.** The ipc object is locked on exit.*/struct kern_ipc_perm *ipc_lock(struct ipc_ids *ids, int id){struct kern_ipc_perm *out;int lid = ipcid_to_idx(id);rcu_read_lock();out = idr_find(&ids->ipcs_idr, lid);if (out == NULL) {rcu_read_unlock();return ERR_PTR(-EINVAL);}spin_lock(&out->lock);/* ipc_rmid() may have already freed the ID while ipc_lock* was spinning: here verify that the structure is still valid*/if (out->deleted) {spin_unlock(&out->lock);rcu_read_unlock();return ERR_PTR(-EINVAL);}return out;}
end

排序-冒泡排序
網(wǎng)絡(luò)流媒體-RTP與RTCP
專注音視頻技術(shù)、
編程語(yǔ)言學(xué)習(xí)筆記以及互聯(lián)網(wǎng)信息分享與交流,
掃碼關(guān)注


