题目:
当程序运行时,父进程fork出4个子进程。父进程负责产生消息(每1s产生一个消息),4个子进程负责处理消息。父子进程之间通过消息队列来传递消息。
父进程需要维护一个本地数据库(格式与共享数据库相同),当生成一个消息时,父进程要同步更新本地数据库。子进程在处理消息时,根据消息的内容来对共享数据库进行更新(比如,当一个子进程收到一个[index=2, op=increase]的消息时,需要将共享数据库中index为2的条目的count值递增1),并延迟500ms。
父进程需要处理SIGTERM信号。父进程接收到SIGTERM信号意味着要求结束该程序。在结束程序之前,父进程要结束所有子进程、显示并对比本地数据库和共享数据库中的信息(两者应该相同)。
父进程终止子进程的方法可以自行确定。当子进程结束前,需要打印该子进程已经处理的消息数量。
在程序退出之后,不能有程序相关的信息遗留在系统中(比如僵尸进程、未删除的消息队列等)。
要求
要求使用POSIX标准的消息队列、信号量、共享内存
需要结束时,可以使用kill命令来给指定父进程发送结束信号,比如kill -SIGTERM 4502
0.创建本地数据库和共享数据库
- 使用fork创建子进程
- 父进程每秒通过消息结构体msg生产一条消息
- 更新本地数据库
- 父进程发送msg到子进程(消息队列)
- 子进程处理消息:更新共享数据库,并统计处理消息的次数
- 父进程接收到SIGTERM信号时,杀死所有的子进程,并打印出本地数据库和共享数据库内容
- 父进程确保所有子进程被杀死
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <sys/msg.h>
#include <mqueue.h>
#include <errno.h>
#include <time.h>
#include <signal.h>
#include <sys/shm.h>
#include <sys/sem.h>
#include<sys/wait.h>
#include <iostream>
using namespace std;
enum OP{increase,reduce};
/*本身存在于内核的联合体,不过随着版本变化后来注释掉了,需要自己添加*/
union semun
{
int val; /*信号的值*/
struct semid_ds *buf; /*IPC_STAT、IPC_SET 使用缓存区*/
unsigned short *array; /*GETALL,、SETALL 使用的数组 */
struct seminfo *__buf; /*IPC_INFO(Linux特有) 使用缓存区*/
};
int count=0; /*每个子进程的运算次数*/
int pid[4]; /*存储进程描述符的数组*/
int localDatabaseArray[10] = {0}; /*本地数据库的大小*/
mqd_t msgQueueId; /*消息队列描述符*/
int shareMemoryId,semDescId; /*定义共享内存描述符,信号量描述符*/
char *pShareMemoryAddr; /*共享内存地址*/
/************************************************************************
* 函数功能:通过信号量的P操作进行加锁,且信号量值为1
* 参 数:*semId:共享内存Id
* 返 回 值:void
* ************************************************************************/
void P(int semId)
{
/*struct sembuf:sem_op =-1执行P操作:信号量值减一*/
struct sembuf semStructArray[1]= {0,-1, SEM_UNDO};
/*在 Linux 下,PV 操作通过调用semop函数来实现*/
semop(semId, semStructArray, 1);
}
/************************************************************************
* 函数功能:信号量V操作释放锁
* 参 数:*semId:共享内存Id
* 返 回 值:void
* ************************************************************************/
void V(int semId)
{
/*struct sembuf:sem_op =1执行V操作:信号量值加1*/
struct sembuf semStructArray[1] = {0, 1, SEM_UNDO};
/*在 Linux 下,PV 操作通过调用semop函数来实现*/
semop(semId, semStructArray, 1);
}
/************************************************************************
* 函数功能:杀掉子进程,并打印出处理了多少消息
* 参 数:*signNum:为了匹配函数库里面的函数指针
* 返 回 值:void
* ************************************************************************/
void onKillChild(int sigNum){
cout << "子进程 pid = " <<getpid()<<", 执行次数count = "<<count <<endl;
exit(1);
}
/************************************************************************
* 函数功能:删除信号量
* 参 数:无
* 返 回 值:void
* ************************************************************************/
void onDeleteSemaphore(){
if(semctl(semDescId,0, IPC_RMID, 0)==-1){
cout<<"删除信号量错误"<<endl;
}
else
{
cout<<"删除信号量成功"<<endl;
}
}
/************************************************************************
* 函数功能:父进程发送消息到消息队列,并更新本地数据库
* 参 数:*msgQueueId:消息队列id
* 返 回 值:void
* ************************************************************************/
void onSendMsg(mqd_t msgQueueId){
srand((int)time(0));
int index=rand()%10;
int op=rand()%2; //0-1
/*待发送的消息*/
char msg[]={index+48,op+48};
/*如果op为1则更新本地数据库进行减少操作*/
if(op==reduce){
localDatabaseArray[index]--;
cout<<"index = "<<index<<" <------> op = "<<"reduce(减1)"<<endl;
}
/*如果op为0则更新本地数据库进行增加操作*/
if(op == increase){
localDatabaseArray[index]++;
cout<<"index = "<<index<<" <------> op = "<<"increase(加1)"<<endl;
}
else{}
/*写入消息msg到消息队列msgQueueId里面*/
if(mq_send(msgQueueId,msg,sizeof(msg),0) < 0){
cout<< "父进程往消息队列写入数据失败";
}
}
/************************************************************************
* 函数功能:子进程从消息队列取数据
* 参 数:*msgQueueId:待读取的消息队列,
* *pShareMemoryAddr:带写入的共享内存,*semid:控制写入加锁的信号量
* 返 回 值:void
* ************************************************************************/
void onRecvMsg(mqd_t msgQueueId,char *pShareMemoryAddr,int semid){
struct mq_attr mqattr;
/*获取消息队列msgQueueId的属性*/
mq_getattr(msgQueueId,&mqattr);
int len = mqattr.mq_msgsize;
char *msgbuff = new(std::nothrow) char[len];
if(NULL == msgbuff)
{
cout<<"分配失败"<<endl;
}
/*从消息队列读取数据之前上锁,因为只有一个信号量一次只让一个进程来读,不管它是谁来读*/
P(semid);
/*从msgQueueId指向的队列中读取数据存放在msgbuff中*/
if((mq_receive(msgQueueId,msgbuff,len,NULL))!=-1){
/*intIndex代表index,intOP代表op*/
int intOP = msgbuff[1]-48;
int intIndex = msgbuff[0] - 48;
/*子线程取出的消息是一个字符数组,数组下标为1的值经过转换即为intOP,再通过intOP操作共享内存*/
if(intOP == reduce){
pShareMemoryAddr[intIndex]--;
}
else if(intOP == increase){
pShareMemoryAddr[intIndex]++;
}
else{}
}
else{
cout<<"读取错误"<<endl;
}
count++;
V(semid); /*释放锁*/
}
/************************************************************************
* 函数功能:处理僵尸进程
* 参 数:*signum
* 返 回 值:void
* ************************************************************************/
void dealCorpse(int signum){
int stat;
int pid;
/*僵尸进程由父进程捕获杀死*/
while((pid=wait(NULL))>0){
cout<<"pid = "<<pid <<"被杀死"<<endl;
}
}
/************************************************************************
* 函数功能:删除共享内存
* 参 数:*:无
* 返 回 值:void
* ************************************************************************/
void onDeleteShareMemory(){
/*使用shmctl删除共享内存*/
if(shmctl(shareMemoryId,0,IPC_RMID)==-1){
cout<<"删除共享内存错误"<<endl;
}
else
{
cout<<"删除共享内存成功"<<endl;
}
}
/************************************************************************
* 函数功能:删除消息队列
* 参 数:*:无
* 返 回 值:void
* ************************************************************************/
void onDeleteMsgQueue(){
if(mq_unlink("/msgQueue")==-1){
cout<<"删除消息队列错误"<<endl;
}
else{
cout<<"删除消息队列成功"<<endl;
}
}
/************************************************************************
* 函数功能:kill父进程
* 参 数:*signum
* 返 回 值:void
*************************************************************************/
void onKillFather(int signum){
int i=0;
for(;i<4;i++){
kill(pid[i],SIGTERM); /*当触发SIGTERM时,杀死各个子进程*/
}
/*显示本地数据库和共享数据库的数据*/
sleep(2); /*睡两秒防止前面打印的消息和接下来的输出粘在一起*/
cout<<"----------------------------localDatabase--------------------------------"<<endl;
for(i=0;i<10;i++){
cout<<"index = "<<i<<", value = "<<localDatabaseArray[i]<<endl;
}
cout<<"----------------------------shareDatabase--------------------------------"<<endl;
for(i=0;i<10;i++){
cout<<"index = "<<i<<", value = "<<(int)pShareMemoryAddr[i]<<endl;
}
/*删除消息队列,删除信号量,删除共享内存*/
onDeleteMsgQueue();
onDeleteShareMemory();
onDeleteSemaphore();
exit(1);
}
int main(){
/*1.创建消息队列*/
msgQueueId=mq_open("/msgQueue",O_RDWR | O_CREAT, 0666, NULL);
/*等于-1则创建失败并退出,否则返回描述符*/
if(msgQueueId==-1){
cout<<"消息队列创建失败,退出进程!"<<endl;
exit(1);
}
else{
cout<<"消息队列创建成功"<<endl;
}
/*2.父进程创建共享内存,大小和本地数据库大小相同*/
shareMemoryId=shmget(0,sizeof(localDatabaseArray),IPC_PRIVATE|0666);
/*返回-1则出错*/
if(shareMemoryId==-1){
cout<<"创建失败,退出进程!"<<endl;
exit(1);
}
else{
cout<<"共享内存创建成功"<<endl;
}
/*3.把共享内存区对象映射到调用进程的地址空间*/
pShareMemoryAddr= (char*)shmat(shareMemoryId,NULL,0);
/*初始化共享内存*/
for(int i = 0; i<10; i++)
{
pShareMemoryAddr[i] = 0;
}
/*4.创建信号量。nsems参数 = 1即只创建一个信号量*/
semDescId=semget(1111,1,IPC_CREAT|0644);
if(semDescId == -1){
cout<<"信号量创建失败"<<endl;
}
else{
cout<<"信号量创建成功"<<endl;
}
/*5.初始化信号量*/
union semun semUion;
semUion.val = 1;
/*第三个参数:cmd = SETVAL 命令为设置信号量的值su*/
semctl(semDescId,0,SETVAL,semUion);
/*6.创建四个子进程*/
int i,forkRet;
for(i=0;i<4;i++){ /*fork四个子进程*/
forkRet=fork();
if(forkRet==0){
cout<<"子进程"<<i+1<<", pid = "<<getpid()<<", ppid = "<<getppid()<<endl;
break;//将产生的子进程扔出循环,不让子进程执行循环
}
else if(forkRet > 0 )
{
cout<<"父进程"<<", pid = "<<getpid()<<endl;
}
else
{
cout<<"错误"<<endl;
}
pid[i]=forkRet;
}
/*全体休息两秒*/
sleep(2);
/*8.子进程从消息队列中取出消息存到pShareMemoryAddr共享内存中并处理更新共享内存*/
if(forkRet==0){
while(1){
onRecvMsg(msgQueueId,pShareMemoryAddr,semDescId);
sleep(0.5); /*子进程接收消息,延迟500ms,因为还需要0.5s子进程更新共享队列*/
signal(SIGTERM,onKillChild); /*杀掉子进程,同时再onKillChild中统计子进程执行次数*/
}
}
/*7.父进程发送消息到消息队列*/
else if(forkRet>0){
cout<<"---------父进程发送消息Msg、子进程死亡及执行次数统计,消息格式(index<--->op)--------------------------------"<<endl;
while(1){
onSendMsg(msgQueueId);
sleep(1); /*父进程每隔1s发送一条消息*/
signal(SIGTERM,onKillFather);
signal(SIGCHLD,dealCorpse); /*使用SIGCHILD宏处理僵尸进程*/
}
}
else{}
}
消息队列,信号量,共享内存使用如xmind:
版权声明:本文为qq_41961154原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。