中级项目_bos lab

源码和相关资料在github上获取https://github.com/Gintoki-jpg/BOS_lab

绪论

1.bos lab简介

bos主要是实现一个用户态线程管理器和内存管理器,用户态线程管理器由三个部分组成,用户态线程创建,多种调度策略,同步机制;内存管理器由内存管理构成。bos lab的情况如下:

  • Lab 1: 用户态线程创建 - 2022/10/14
  • Lab 2: 多种调度策略 - 2022/11/04
  • Lab 3: 同步机制 - 2022/11/25
  • Lab 4: 内存管理 - 2022/12/16

每一个lab中包含多个需要实现的功能点,以及一个自测的程序证明各个功能点可以正常执行;

每一个lab最后需要提交一个压缩包,压缩包由一个pdf格式的实验报告和一个代码文件夹组成,压缩包命名为【学号+姓名+bos+labN.zip】:

  • 实验报告的命名方式是【学号+姓名+bos+labN.pdf】,实验报告讲解这个lab中各个功能点的实现情况,以及测试结果,实验报告不需要卷格式,只需要有内容;
  • lab的代码放在一个文件夹里面,每次lab的代码在需要在上一次代码基础上依次递增(推荐使用git管理代码,每次完成一个lab,打一个LabN的tag),代码文件夹命名为【学号+姓名+bos+labN】;

所谓用户态线程就是把内核态的线程在用户态实现了一遍而已(有没有可能意味着我们可以参考内核态线程的实现),目的是更轻量化(更少的内存占用、更少的隔离、更快的调度)和更高的可控性(可以自己控制调度器)。用户态所有东西内核态都「看得见」,只是对于内核而言「用户态线程」只是一堆内存数据而已,这就意味着用户级线程不是真正的线程,所以有时候我们也说,“对操作系统而言,用户态线程具有透明性”。

本次实验应该包含两个部分,一个部分是按内核代码原则设计用户态的线程库,由一系列的函数和以线程控制块TCB为核心的一系列数据结构组成;另一个部分是演示系统,调用线程库创建多线程的程序,使其并发执行,以展示系统的运行状态,显示系统的关键数据结构的内容。

2.开发环境

开发用户态的程序对开发环境要求不高,主要需要配置编译工具链,在虚拟机和裸机上开发体验基本一致。我们提供一个docker镜像作为基础开发环境,大家根据需求取用,包含了gcc(9.4.0)和gdb(9.2),主目录为/data/bos;(关于docker怎么玩的参考Docker - Tintoki_blog (gintoki-jpg.github.io)

但是,说实话Docker我们根本就没用熟练,更别说如何在命令行的模式下进行编程调试了,所以要么选择虚拟机要么就直接用Windows环境编程;

参考如下环境配置

Lab_1

文章参考:操作系统基础: C 语言实现用户态线程_网易订阅 (163.com)

(29条消息) 课程设计—–C语言实现用户态线程_山有木兮°的博客-CSDN博客_thread_create

(29条消息) 用户态线程库原理、设计与实现_扫地僧ninja的博客-CSDN博客_用户级线程库

https://mp.weixin.qq.com/s/Z27-SvfFntXltC7_yiM-Lg

1.实验要求

用户态线程创建需要实现的功能及说明如下

  1. 提供创建用户态线程的机制
    • 栈的空间大小自己定义
    • 线程的数量上线可以设置成一个常数
    • 注意要清理执行完的线程所占用的空间
  2. 上下文切换
    • 了解上下文的概念,搞清楚包含哪些寄存器
    • 明白如何切换栈
    • 这个部分需要手写,不能借用外部函数
  3. 线程状态的管理
    • 线程可以在多种状态之间来回切换,状态的数量最少有3个
  4. random调度策略
    • 目前调度器不需要tick,上下文切换可以是显式调用
    • 调度策略随意发挥,没有任何要求,主要在lab2中实现
  5. 测试程序
    • 测试程序需要同时创建多个线程,体现出用户态线程的并发能力
    • 测试时打印出上下文切换过程中信息,主要是提示切换的时机、切换前后线程的id、两个线程的状态

其他注意事项:

  • 我们lab1实现的架构统一为x86
  • 代码量<1000行,工程量不大,难点主要在于上下文切换
  • 用户态线程模型不定,可以自由选择
  • 用户态debug很方便,debug过程中善用gdb,注意编译时优化不要开的太高,学习基本的gdb命令,掌握gdb中reverse这个神器

2.开发日志

这个专栏用来记录学习过程中的每一点一滴的心得体会过程,以及从完全找不到资料到写线程的一系列流程;

bos这个lab在网上没有一模一样的,我们只能根据实验要求找到类似的项目作为参考;

然后就是必须要吐槽的一点,从关键词“用户态线程管理器”、“用户态线程的实现”到最后的“user level thread”,网上的参考资料真的非常非常少,而且感觉有用更是凤毛麟角,中文的文章现在已经基本上放弃了(中文的那些文章要么就是打广告说一半,要么就是毫无逻辑讲的晕头晕脑),现在就只能寄希望于英文的资料了(或者先跟着中文的某个稍微有用一点的教程实现一遍,再做修改,因为每次其实你检索之后都没怎么仔细看,说不定人家的思想非常棒);

现在我们找到的唯一相关的资料和源码已经准备的差不多了,接下来就是静下心来好好跟着敲代码理解(这很可能需要我们在Linux系统环境下);


2022/9/30 16:47 现在首要任务有两个,第一个是看收集的四个资料(因为源码用的都是同一份所以可以选择比较容易懂得文章好好琢磨),可以不着急敲代码;第二个工作我不知道有没有必要,就是在Linux环境下写代码以及运行代码(大概率是不行的,因为教程使用的代码都是Linux下的函数库)

2022/10/2 9:40 今天大概看了一下参考资料,然后差不多明白了在讲什么以及我们需要实现什么,然后惊奇的发现这需要使用到《汇编语言》和《操作系统原理、实现与实践》,关于这几篇博客,的确是需要好好参考一下的,但是还是感觉的出来不是专业的,所以有些地方不要较真,还是需要查看专业书、专业资料;

2022/10/2 10:24 刚看完(29条消息) 用户态线程库原理、设计与实现_扫地僧ninja的博客-CSDN博客_用户级线程库这篇博客,只能说写的中规中矩,确实也算是入门引导手册,而且给的参考代码也一言难尽;况且关于时间片调度那个地方,哈工大的参考书说的是优先级越低时间片越大(哈工大意思是时间片不一样大,数量都一样),此处说的是时间片大小相同但是优先级高的时间片数量多(可能是为了方便编程?);我们现在的打算就是好好先把网络参考资料看完,然后再看哈工大的内核项目的实现寻找类似点补充,网络参考资料的优先级一定是最高的!

2022/10/2 16:31 网易订阅和微信文章是一样的,都是从宏观上介绍了结构体的一些定义以及线程切换的注意事项,当然代码部分乱的很就不要参考了,代码部分主要还是得看山有木兮的;

2022/10/3 1:15 山有木兮的代码部分整理得很好,但是没有细讲每个地方为什么这么写,所以还是得回过头看网易订阅的讲解;

2022/10/3 11:21 刚刚把整体的代码和讲解看完了,只能说写的确实中规中矩,然后很多功能其实根本就没实现,比如栈清空,EXIT状态清除等,这些还需要自己看书实现,下一步就是看哈工大的书加深理解了;

2022/10/4 22:51 现在只能说已经基本理解了代码,但是还不知道怎么使用gcc和gdb进行调试,所以新编写的程序出现了挺多问题,而且也不知道怎么解决,所以下一步就是熟悉这两个工具,注意不要再乱改windows的源文件了,做一个备份;

2022/10/9 9:07 源码和实验数据、测试程序等均完成,只是源码的话可能需要加上一些自己的东西上去 会更好,这部分在写完实验报告之后完成;

2022/10/9 11:07 到时候魔改代码的时候直接备份一份然后魔改即可,注意加上注释(注释不要太详细),将魔改的代码作为需要提交的代码整理;

2022/10/9 16:40 今天上github和gitee搜刮了一下,发现其实还是有很多合适的代码(只是没有注释),所以接下来还是筛选一下这些代码进行学习整理,不要无脑魔改,注意吸收借鉴别人的思想,一个正常的项目学习流程应该是:看项目运行实际效果(很重要,不要你可能根本不知道自己在做什么)->分模块理解(这部分当然有注释最好不过,这部分也是最耗费经历的)->整体结构梳理,形成思维导图或模块依赖图(从main函数开始)->找其他相似项目代码借鉴吸收,加入自己的思想整理融合,修改代码,写注释文档

3.前置知识点

3.1 函数传参

在汇编层面,C语言如何处理函数调用过程中的参数传递?主要有如下几点:

  • __cdecl:C Declaration的缩写,即C语言默认的函数调用方法。所有参数从右到左依次入栈,这些参数由调用者清除,称为手动清栈。被调用函数不会要求调用者传递多少参数,调用者传递过多或者过少的参数,甚至完全不同的参数都不会产生编译阶段的错误;
  • __stdcall:Standard Call 的缩写,是 C++ 的标准调用方式。所有参数从右到左依次入栈,如果是调用类成员的话,最后一个入栈的是 this 指针。这些堆栈中的参数由被调用的函数在返回后清除,使用的指令是 retnX,X 表示参数占用的字节数,CPU 在 ret 之后自动弹出 X 个字节的堆栈空间,称为自动清栈。函数在编译的时候就必须确定参数个数,并且调用者必须严格的控制参数的生成,不能多,不能少,否则返回后会出错;
  • __pascal:Pascal 语言(Delphi)的函数调用方式,也可以在 C/C++ 中使用,参数压栈顺序与前两者相反。返回时的清栈方式与 _stdcall 相同;
  • __fastcall:编译器指定的快速调用方式。由于大多数的函数参数个数很少,使用堆栈传递比较费时,故通常规定将前两个(或若干个)参数由寄存器传递,其余参数还是通过堆栈传递。不同编译器编译的程序规定的寄存器不同,返回方式和 _stdcall 相同;
  • __thiscall:为了解决类成员调用中 this 指针传递而规定,要求把 this 指针放在特定寄存器中,该寄存器由编译器决定,VC 使用 ecx,Borland 的 C++ 编译器使用 eax,返回方式和 _stdcall 相同;

_fastcall 和 _thiscall 涉及的寄存器由编译器决定,因此不能用作跨编译器的接口,故 Windows 上的 COM 对象接口(平台无关、语言中立、位置透明、支持网络的中间件技术)都定义为 _stdcall 调用方式;

C 语言中不加说明默认函数为 _cdecl 方式(C中也只能用这种方式),C++ 也一样,但是默认的调用方式可以在 IDE 环境中修改;

3.2 堆栈变化

在_cdecl的函数调用规范下,CPU目前执行的函数为father,该函数即将调用函数child,函数栈空间如何变化呢?

1
2
3
void father(){
child(120);
}
1
2
3
void child(int a){
int b=a;
}

上述两个函数经过编译后产生如下汇编代码

1
2
3
4
5
6
7
father proc:
push eax ;将寄存器eax中的数据送入栈中
mov eax,120 ;将120送入eax寄存器
push eax ;将寄存器eax中的数据送入栈中,这一步等价于将参数传递给child函数
call child ;调用child函数
pop eax ;从栈顶取出数据送入eax
ret ;汇编ret指令含义是子程序的返回指令
1
2
3
4
5
child proc:
push ebp ;寄存器ebp中的数据送入栈中
mov ebp,esp ;将esp中的值送入ebp中
mov eax,[ebp+8] ;取出参数到eax寄存器中,因为ebp指向当前栈顶,而120数据在栈中father函数的pop指令的返回地址之后,因此需要偏移8字节
ret

3.3 线程切换

线程的切换有以下几个要点:

  • 我们需要为每一个线程设立一个独立的,互相不干扰的栈空间;

  • 当线程发生切换的时候,当前线程被切换之前,需要把自己的现场进行完好的保留,同时记录下一条需要执行指令的指令地址;

  • 把CPU的栈顶指针寄存器esp切换到即将被调入的线程的堆栈的栈顶地址,完成线程栈空间的切换;

程序执行实际是汇编指令在CPU上逐条执行,对于单核CPU(也就是我们当前的实验环境)来说,永远只有一条控制流(这意味着咱们模拟出来的多线程本质上还是单线程);

汇编指令执行的时候依赖它所在的CPU环境:

  • 通用寄存器:eax、edx、ecx、ebx、esp、ebp、esi、edi
  • 标志寄存器:eflags
  • 指令寄存器:eip (eip用来保存下一条要被指令的地址)
  • 栈:用于保存指令序列执行过程中的临时数据

上述通用寄存器中esp保存的是栈顶指针内存地址的值,下面几条指令都依赖于esp工作:

- call指令把它后面的指令地址保存到esp指向的内存单元,同时修改eip。如call 0x2000,先把call 0x2000的下一条指令地址压栈,同时修改eip为0x2000;

- ret指令把esp指向的内存单元中的值保存到eip;

- push指令把值保存到esp指向的内存单元;

- pop把esp指向的内存单元的值取出;

为啥要介绍上面的知识点?因为这涉及控制流的切换,实际上就是更改esp的栈顶指针顺带把通用寄存器中保存的环境进行修改以适应新指令的执行环境(通常新的指令的也保存在栈中),通常有多种手段来保存寄存器的环境,最简单的一种就是将其保存到定义好的结构体中,当然我们实际开发中使用的是将当前寄存器的环境保存到当前使用的栈中;

当然线程切换不仅仅只是简单的控制流切换,同时还应该保存当前执行的环境(当前寄存器环境以及当前栈顶位置等),我们可以将当前寄存器的环境保存在当前线程的运行栈中,在切换控制流的同时切换寄存器环境,这就是所谓的上下文切换;

在栈中进行线程以及上下文的切换流程图如下

  • 线程 0 准备切换时,将当前 CPU 中的寄存器环境一个一个压入到自己的栈中,最后一个压栈的是 eflags 寄存器;

  • 线程 0 将自己的栈顶指针(保存 eflags 的那个位置)保存到全局数组 task[0] 中;

  • 线程 0 从全局数据 task 中取出下一个线程的栈顶,假设下一个要运行的线程是 1 号线程,则从 task[1] 中取出线程 1 的栈顶指针保存到 CPU 的 esp 寄存器中。此时意味着栈已经被切换。栈切换完成后,本质上已经在线程 1 中了;

  • 线程 1 将自己栈中的寄存器环境 pop 到对应的 CPU 寄存器中,比如第一个 pop 到 eflags 中,最后一个是 pop ebp;

用户级线程切换不同于内核级线程切换,在多个用户级线程之间实现切换只需要在切换位置(俗称调度点)调用一个普通的调度切换函数schedule即可,这个函数主要实现的功能就是从线程队列中选择一个合适的线程,接着将当前线程切换为被选择的线程,对应我们分别会设计的线程调度函数pick以及上下文切换函数switch_to_next,需要注意的是pick函数和switch_to_next函数一定是一起出现的,单独出现没有意义,因此我们会将这两个函数封装在schedule函数中;

pick函数实现了从一系列可调度线程中选择一个线程进行切换;swith_to_next函数实现的是被选中线程和当前线程的上下文的切换,这部分涉及一些汇编的知识,但整体上来说还是能够理解,在函数封装章节中我们会详细介绍;

所以接下来的问题是如何确定调度点也就是线程需要进行切换的位置,首先我们先声明本实验基于抢占式线程调度的背景(至于线程调度是什么我们后面会介绍),因此线程的切换可以分为主动切换和被动切换,主动切换是指线程主动让出CPU,被动切换是指线程因为被操作系统强制结束执行让出CPU;

实现主动切换非常简单,我们只需要在进行主动切换的地方手动添加schedule实现线程切换,当然这只是基本思想,实际上我们在进行主动切换之后还需要知道线程什么时候能够被切回来,这就引出了my_sleep函数,我们将在后面进行详细解析;

那应该如何实现被动切换呢?基于时间片的基本原理,为每个线程分配一定数量的时间片,线程在执行的过程中会逐渐消耗时间片数量,当它消耗掉自己所有所有的时间片后就需要调用schedule函数进行切换(无论此时它是否执行完成自己的任务);

3.4 线程调度

线程切换中我们是否会思考这样一个问题 —— 如何在一系列可供选择的就绪线程中选择下一个线程(其目的是将CPU分配给这个线程)是良好的,这就引出我们下面要讨论的线程调度,这也是pick函数要做的事;

线程调度的两种方式:

  • 非抢占方式:这种方式下一旦把CPU分给一个线程,该线程就会保持CPU直到终止或转换到等待状态,这种方式不能用于分时系统和多数实时系统(无法处理紧急任务);
  • 抢占方式:若有某个更加重要的线程需要使用处理机的时候,立即暂停正在执行的线程,将处理机分配给这个更加重要的线程,这种方式可以处理更高优先级的线程,也可以实现时间片的轮流处理;

本次实验采用抢占方式进行线程调度,便于实现之后的时间片轮转调度;

现在我们回到刚开始的问题,如何选择一个线程是良好的?最简单的方式当然是直接在就绪线程队列中选择第一个线程调度执行,这也是FCFS先来先服务调度算法的基本思想,特性是公平,缺点是很可能导致任务的平均周转时间较长,我们用下面这个例子举例

根据FCFS的基本思想我们可以得到其算法实例如下

则其平均周转时间为(10+39+42+49+61)/5=40.2;

FCFS调度算法是一种非交互式的调度算法,在进行某些交互任务的时候表现并不会很好,非常影响响应时间;假如我们在一段时间内让所有的任务都有机会向前推进(基于时间片的调度)而不是呆呆的让抢占到CPU的任务执行完毕(非交互式调度)才让出CPU,是否可以优化响应时间呢?这就引出时间片轮转RR调度:将一段时间等分(执行时间片)的分割给每个任务,当前任务的时间片用完后就会切换到下一个任务;

在RR算法中核心是选择合适的时间片大小和数量:时间片设得太短会导致过多的线程切换,降低了CPU效率;而设得太长又可能引起对短的交互请求的响应变差;

4.数据结构设计

4.1 线程控制块

TCB与PCB相似,只是TCB中保存的线程状态少于PCB中保存的,通过控制TCB中的数据,可以对线程的状态等一系列事务进行操作

1
2
3
4
5
6
7
8
9
10
struct task_struct {
int id; //线程的标识符,即线程id
void (*thread_func)(); //指向线程函数的函数指针,我们传入该参数的时候,需要把函数的地址以整数的形式传入
int esp; //用来在发生线程切换时保存线程的栈顶地址,简单来说esp记录了栈顶指针
unsigned int wakeuptime; // 线程唤醒时间;当线程调用sleep函数之后, Wakeuptime便被设置为当前的时间加上线程需要休眠的时间,同时线程的状态被设置为thread_sleep。当线程发生调度的时候,调度函数会检查每一个处于睡眠状态的线程,如果当前的时间大于wakeuptime则将其状态设置为THREAD_RUNNING,重新参与现场的调度
int status; // 线程状态,包括RUNNING,SLEEP,EXIT,READY等
int counter; // 我们为每一个线程设定了一定数量的时间片,counter记录了线程当前还有多少时间片,每一个时间片都是一个单位的时间,比如说10毫秒。每一个时间片结束都会发生一次调度中断,这个中断的中断服务子程序会检查当前线程的counter是否小于0,如果小于零则代表当前线程的时间片用完了而需要进行线程的调度,调度算法会从线程队列中寻找另外一个可调度的而且counter大于0且counter最大的线程进行调度,否则的话直接结束中断。
int priority; // 线程优先级;当所有的线程时间片都已经用完的时候,需要重新为每一个线程分配时间片。每一个线程分配多少的时间变得这个就由priority来决定。优先级高的线程能够分配到更多的时间片,而优先级低的线程分配到时间片就相对的少,这样便实现了线程之间的优先级,让优先级高的线程能够得到更多的CPU处理时间,而线程优先级低的线程CPU处理时间则相对较少。
int stack[STACK_SIZE]; //线程运行栈
};

注意,初始化的时候stack栈预先保存了现场的初始状态便于线程进行调度,esp初始时指向的就是stack的栈顶,因此在初始化线程的时候需要在stack中设置好线程上下文的初始环境,同时传入start函数的地址作为启动函数的地址;

4.2 线程队列

TCB仅仅只是针对每一个线程进行设计,我们还需要一个数据结构将所有线程集合起来让调度算法对其进行统一操作,这个数据结构就是线程队列,我们使用task数组来保存线程结构体指针

1
2
3
4
#define THR_TASKS 32 
static struct thread_struct init_task = {0, NULL, THREAD_STATUS_RUNNING, 0, 0, 15, 15, {0}};
struct thread_struct *current = &init_task;
struct thread_struct *task[THR_TASKS] = {&init_task,};

5.函数封装

整个程序的主要函数及其功能如下:

  1. 通过调用thread_create函数(这个是自己写的函数,不是pthread_create)创建线程;
  2. 通过两种方式启动线程:
    • 调用函数detach实现线程的分离式启动(父线程不必等待子线程执行结束,可以继续执行);
    • 调用函数thread_join实现阻塞式启动(父线程等待该子线程结束后才能继续执行);
  3. 线程的状态转换:通过一系列的线程状态函数实现线程的状态转换
  4. 线程切换:线程的切换方式有两种:
    • 主动切换my_sleep:调用schedule切换到指定线程,并在一段时间后wakeup可调度;
    • 时钟中断切换do_timer:通过设置时钟中断,中断后执行schedule函数来完成;
  5. 线程调度pick:采用时间片轮转调度算法,根据线程优先级为每个线程设置时间片;

5.1 线程创建函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//线程创建函数
int thread_create(int *tid, void (*start_routine)()){
int id = -1; //初始化线程id
struct thread_struct *tsk = (struct thread_struct *)malloc(sizeof(struct thread_struct)); //为线程分配一个结构体
while (++id < THR_TASKS && task[id]); //在线程队列中寻找位置
if (id == THR_TASKS) //如果没有位置了则创建失败
return -1;
task[id] = tsk; //将线程结构体放在task线程队列中
if (tid) *tid = id; //将线程队列的索引号作为id号传给tid,便于之后传出

//初始化线程结构体
tsk->id = id; //设置线程id
tsk->thread_func = start_routine; //线程过程函数,对应之后会编写的fun1,fun2...
int *stack = tsk->stack; //线程运行栈
tsk->esp = (int)(stack + STACK_SIZE - 11); //获取esp栈顶指针
tsk->status = THREAD_RUNNING; //线程状态设置为RUNNING
tsk->wakeuptime = 0; //设置Wakeuptime
tsk->counter= 15; //时间片的单位不是纳秒、微秒或者毫秒,而是嘀嗒数,此处初始化为15个嘀嗒数
tsk->priority = 15; //设置线程优先级

//初始化线程运行栈,地址从低到高
stack[STACK_SIZE - 11] = 0; // eflags
stack[STACK_SIZE - 10] = 0; // eax
stack[STACK_SIZE - 9] = 0; // edx
stack[STACK_SIZE - 8] = 0; // ecx
stack[STACK_SIZE - 7] = 0; // ebx
stack[STACK_SIZE - 6] = 0; // esi
stack[STACK_SIZE - 5] = 0; // edi
stack[STACK_SIZE - 4] = 0; // old ebp
stack[STACK_SIZE - 3] = (int)start; // ret to start,线程第一次被调度的时候会在此启动
//start函数栈,刚开始进入start函数的运行栈环境
stack[STACK_SIZE - 2] = 100; // ret to unknown
stack[STACK_SIZE - 1] = (int)tsk; // start函数的参数,是一个线程结构体指针
return 0;
}

5.2 线程启动函数

1
2
3
4
5
6
7
8
9
//线程阻塞式启动
int thread_join(int tid){
while(task[tid]->status != THREAD_EXIT)//只要不是exit状态的线程,持续schedule
{
schedule();
}
free(task[tid]); //释放task队列相应位置内存
task[tid] = NULL; //task队列相应位置设置为NULL
}
1
2
3
4
5
6
7
//线程分离式启动
void detach(int tid){
if(task[tid]!=NULL && task[tid]->status==THREAD_STOP&& task[tid]->status!=THREAD_EXIT){
task[tid]->status=THREAD_RUNNING;
schedule();
}
}

5.3 线程调度函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
//pick函数的任务是从task数组中挑选一个合适的线程并返回其线程结构体指针(这部分是参照的别人的写法)
//每次执行pick函数都会挑选一个时间片值最大的线程并返回
static struct thread_struct *pick()
{
int i, next, c;
for (i = 0; i < THR_TASKS; ++i) //首先需要检查每个线程的唤醒时间是否小于当前时间,如果是,则将该线程调整为可调度状态
{
if (task[i] && task[i]->status != THREAD_EXIT && getmstime() > task[i]->wakeuptime)
{
task[i]->status = THREAD_RUNNING;
}
}
while (1) //基于优先级的时间片轮转
{
c = -1;
next = 0;
for (i = 0; i < THR_TASKS; ++i)
{
if (!task[i])
continue;
if (task[i]->status == THREAD_RUNNING && task[i]->counter > c)
{
c = task[i]->counter;
next = i;
}
}
if (c)
break;
if (c == 0) // 如果所有的THREAD_RUNNING任务的时间片的值都是0,则重新调整时间片的值
{
for (i = 0; i < THR_TASKS; ++i)
{
if (task[i])
{
task[i]->counter = task[i]->priority + (task[i]->counter >> 1);
}
}
}
}
return task[next]; //返回选择的线程结构体
}

5.4 上下文切换函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
.section .text
.global switch_to_next
switch_to_next:
call closealarm
push %ebp
mov %esp,%ebp
push %edi
push %esi
push %ebx
push %edx
push %ecx
push %eax
pushfl

mov current,%eax
mov %esp,8(%eax)
mov 8(%ebp),%eax
mov %eax,current
mov 8(%eax),%esp

popfl
popl %eax
popl %edx
popl %ecx
popl %ebx
popl %esi
popl %edi

popl %ebp
call openalarm
ret

5.5 主动切换函数

1
2
3
4
5
6
7
//sleep函数,用于设置wakeuptime以及线程状态,主动将该线程切换(可以看作是对schedule主动切换的优化)
void mysleep(int seconds)
{
current->wakeuptime = getmstime() + 1000 * seconds; //设置当前线程的唤醒时间
current->status = THREAD_STATUS_SLEEP; //将线程置于休眠状态
schedule(); //调用schedule函数准备调度
}

5.6 时间中断切换函数

1
2
3
4
5
6
7
8
9
//基于时间中断的调度
//基本思想是系统在每个嘀嗒中产生一个时钟中断并进入时钟中断处理函数do_timer,在do_timer中让线程的时间片值减1直到0,执行schedule函数
static void do_timer() //do_timer在每个嘀嗒数中自动运行一次
{
if (--current->counter > 0)
return;
current->counter = 0;
schedule();
}

6.模块设计

6.1 main.h

头文件中主要包含了整个程序需要使用的外部头文件,宏定义了一些常量以及线程状态、线程结构体,同时声明了程序的一系列函数;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#include <sys/time.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>

#define STACK_SIZE 1024 //最大栈空间
#define THR_TASKS 32 //最大线程个数

#define THREAD_STATUS_RUNNING 0 //线程处于可调度队列,暂时还没得到cpu资源
#define THREAD_STATUS_SLEEP 1 //线程睡眠,此时线程不可被cpu调度
#define THREAD_STATUS_READY 2 //线程正在运行
#define THREAD_STATUS_EXIT 3 //线程退出
#define THREAD_STATUS_STOP 4 //线程受到调控,停止运行
#define THREAD_STATUS_BLOCK 5 //线程因为等待某个时间阻塞,此时不可被cpu调度

struct thread_struct {
int id; //线程标识符
void (*thread_func)(); //指向线程过程函数的函数指针,用来记录线程执行的函数
int esp; //stack的栈顶指针
unsigned int wakeuptime; //线程唤醒时间
int status; //线程状态
int counter; //时间片数量
int priority; //线程优先级
int stack[STACK_SIZE]; //线程运行栈
};

void start(struct thread_struct *tsk); //start函数用于统一管理线程过程函数
int thread_create(int *tid, void (*start_routine)()); //线程创建函数
int thread_join(int tid); //线程阻塞式启动,父线程等待该子线程结束后才能继续执行
void detach(int tid); //线程分离式启动,父线程不必等待子线程执行结束,可以继续执行
static unsigned int getmstime(); //获取毫秒精度的时间
static struct thread_struct *pick(); //线程调度函数,基于时间片的调度
void closealarm(); //关中断函数
void openalarm(); //开中断函数
void schedule(); //调度切换函数,封装了pick以及switch_to_next函数
void mysleep(int seconds); //主动切换函数
static void do_timer(); //时间片中断切换函数
void switch_to_next(); //上下文切换函数
void wait_thread(); //线程阻塞函数,令线程进入阻塞状态 THREAD_STATUS_BLOCK

6.2 thread.c

线程源文件,主要包含线程创建、线程启动、线程状态切换等函数;

6.3 schedule.c

线程切换源文件,主要包含主动切换、时间片中断切换、线程调度等函数;

6.4 concurrency_test.c

线程并发性测试程序,同时创建多个线程,测试用户态线程的并发能力;

6.5 debug_test.c

线程切换测试程序,测试时打印出上下文切换过程中信息,主要提示切换的时机、切换前后线程的id、两个线程的状态;

6.6 switch.s

上下文切换函数,这部分使用汇编语言进行编写,主要涉及寄存器之间的值传递;

6.7 Makefile

Makefile文件,用于Linux下多文件编译命令make;

7.程序调试

7.1 前期准备

在Linux环境下我们不能简单地使用IDE来Build以及Debug代码,需要使用到make、gcc、gdb等工具,我们先简单介绍一下这几个工具;(注意咱们都是讨论的工具!不是命令,命令只是调用工具的方式)

gcc

关于gcc的介绍参考链接C语言 - Tintoki_blog (gintoki-jpg.github.io)

make

文章参考自Linux学习-自动化编译工具make_顾城沐心的博客-CSDN博客_make工具C语言 - Tintoki_blog (gintoki-jpg.github.io)

make工具通过一个称为makefile的文件来自动完成编译工作,使用make工具编译有如下好处:

  • 如果仅修改了某几个源文件,则只重新编译这几个源文件;

  • 如果某个头文件被修改了,则重新编译所有包含该头文件的源文件;

利用这种自动编译可大大简化开发工作,避免不必要的重新编译;

make的工作步骤如下:

  1. 在当前目录下寻找名称为Makefile的文件;
  2. 如果找到,它会找文件中的第一个目标文件(target)并把这个文件作为最终的目标文件;
  3. 如果main文件不存在,或是main所依赖的后面的 .o 文件的文件修改时间要比main这个文件新,那么,make就会执行后面所定义的命令来生成main这个文件;
  4. 如果main所依赖的.o文件也存在,那么make会在当前文件中找目标为.o文件的依赖性,如果找到则再根据那一个规则生成.o文件;
  5. make会生成 .o 文件,然后执行文件main(执行这一步应该需要编写了run命令才行);

gdb

文章参考自GDB是什么?_CodeAllen嵌入式编程的博客-CSDN博客_gdb是什么

程序错误主要有两种:

  • 语法错误
  • 逻辑错误

程序的语法错误可以借助编译器直接找出来,但是逻辑错误(编译可以通过但运行结果是错误的)只能靠自己纠正和发现,所以这就用到gdb调试工具了;

所谓调试(Debug),就是让代码一步一步慢慢执行,跟踪程序的运行过程。比如,可以让程序停在某个地方,查看当前所有变量的值,或者内存中的数据;也可以让程序一次只执行一条或者几条语句,看看程序到底执行了哪些代码;(很多人可能觉得说,我直接print或者cout不就好了?在有些情况下还真不能这么玩,况且gdb可以控制程序一个间隔一个间隔执行下去,cout你还能让程序停下来?)


gdb很好理解,关于gcc和make的区别到底有些什么,我们具体给出答案(文章参考自深入讲解GCC和Make的区别_cleverln的博客-CSDN博客_gcc make

gcc make
编译器 解释Makefile中指令的命令工具,依赖gcc(或别的编译器)来编译程序
编译一个文件 编译多个文件
简单来说gcc就是一个非常基本的、纯净的编译器 make可以从某个角度上来说包含了gcc

7.2 测试数据

7.2.1 并发性测试

首先编译得到main可执行文件

接着./main执行

执行过程部分图片

完整执行结果如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
thread_1's origin stack[11] is 0
thread_1's origin stack[10] is 0
thread_1's origin stack[9] is 0
thread_1's origin stack[8] is 0
thread_1's origin stack[7] is 0
thread_1's origin stack[6] is 0
thread_1's origin stack[5] is 0
thread_1's origin stack[4] is 0
thread_1's origin stack[3] is 134515133
thread_1's origin stack[2] is 100
thread_1's origin stack[1] is 138399752
<=======================>
Thread_ 1 has been created
thread_2's origin stack[11] is 0
thread_2's origin stack[10] is 0
thread_2's origin stack[9] is 0
thread_2's origin stack[8] is 0
thread_2's origin stack[7] is 0
thread_2's origin stack[6] is 0
thread_2's origin stack[5] is 0
thread_2's origin stack[4] is 0
thread_2's origin stack[3] is 134515133
thread_2's origin stack[2] is 100
thread_2's origin stack[1] is 138404912
<=======================>
Thread_ 2 has been created
thread_3's origin stack[11] is 0
thread_3's origin stack[10] is 0
thread_3's origin stack[9] is 0
thread_3's origin stack[8] is 0
thread_3's origin stack[7] is 0
thread_3's origin stack[6] is 0
thread_3's origin stack[5] is 0
thread_3's origin stack[4] is 0
thread_3's origin stack[3] is 134515133
thread_3's origin stack[2] is 100
thread_3's origin stack[1] is 138409040
<=======================>
Thread_ 3 has been created
thread_4's origin stack[11] is 0
thread_4's origin stack[10] is 0
thread_4's origin stack[9] is 0
thread_4's origin stack[8] is 0
thread_4's origin stack[7] is 0
thread_4's origin stack[6] is 0
thread_4's origin stack[5] is 0
thread_4's origin stack[4] is 0
thread_4's origin stack[3] is 134515133
thread_4's origin stack[2] is 100
thread_4's origin stack[1] is 138413168
<=======================>
Thread_ 4 has been created
<====================================>
Main thread is running
thread_1 is running----Time clock = 0.00
thread_2 is running----Time clock = 0.00
thread_3 is running----Time clock = 0.00
thread_4 is running----Time clock = 0.00
thread_4 is running----Time clock = 0.00
thread_4 is running----Time clock = 0.00
thread_4 is running----Time clock = 0.00
thread_4 is running----Time clock = 0.00
thread_4 is running----Time clock = 0.00
thread_4 is running----Time clock = 0.00
thread_4 is running----Time clock = 0.00
thread_4 is running----Time clock = 0.00
thread_4 is running----Time clock = 0.00
thread_4 is running----Time clock = 0.00
thread_4 is running----Time clock = 0.00
thread_4 is running----Time clock = 0.00
thread_4 is running----Time clock = 0.00
thread_4 is running----Time clock = 0.00
Thread_4 excited
Main thread is running
thread_2 is running----Time clock = 0.95
thread_1 is running----Time clock = 1.88
thread_2 is running----Time clock = 1.89
thread_2 is running----Time clock = 2.85
thread_1 is running----Time clock = 3.68
thread_2 is running----Time clock = 3.68
thread_3 is running----Time clock = 4.61
thread_2 is running----Time clock = 4.61
thread_1 is running----Time clock = 5.44
thread_2 is running----Time clock = 5.44
thread_2 is running----Time clock = 6.34
thread_1 is running----Time clock = 7.30
thread_2 is running----Time clock = 7.30
thread_2 is running----Time clock = 8.11
Thread_3 excited
thread_1 is running----Time clock = 9.09
Thread_2 excited
thread_1 is running----Time clock = 11.00
thread_1 is running----Time clock = 12.94
thread_1 is running----Time clock = 14.90
thread_1 is running----Time clock = 16.87
Thread_1 excited
Main thread is running

在concurrency_test.c源文件中我们分别创建了四个线程过程函数-fun1、fun2、fun3以及fun4,这四个函数分别赋予了不同的睡眠时间以及循环次数;

其中fun1要求循环输出10次running并在每个循环进入THREAD_STATUS_SLEEP,2s后wakeup并接受调度;

fun2要求循环输出10次running并在每个循环进入THREAD_STATUS_SLEEP,1s后wakeup接受调度;

fun3要求循环输出2次running并在每个循环进入THREAD_STATUS_SLEEP,5s后wakeup接受调度;

fun4要求循环输出15次running,每个循环并不要求进入睡眠状态;

main函数也就是我们的主线程在启动子线程的之前会输出一次running并进入3s的sleep,在子线程执行过程中main还会输出一次running(因为3s后main被唤醒),最后当所有子线程都执行完毕后,main执行最后一次printf(“Main thread is running\n”),接着return 0结束整个程序;

我们观察实验结果,首先四个线程都被成功创建,其stack运行栈中的数据除了stack[3]和stack[1]外都是默认初始化的数据,其中stack[1]是start函数的参数,是一个线程结构体指针,我们可以看到这四个指针地址均不相同,而stack[3]中存储的是start函数入口,因为线程第一次被启动的时候都会找到start并进入,图中四个线程的start函数入口均相同;至此,四个线程创建成功;

接着执行主线程中第一次while循环,输出”Main thread is running”,之后陷入THREAD_STATUS_SLEEP模式,接着我们采用的是线程阻塞启动子线程,这意味着主线程需要在子线程执行完毕后才能继续执行(这一点我们将在下面验证),四个子线程一次启动并各自执行自己的第一次printf,此时认为clock=0时刻,接着因为线程4没有任何措施,所以会直接将while循环中的所有printf执行完毕,随后将自己的STATUS设置为EXIT后退出主线程;此时恰好主线程wakeup(注意主线程是在四个子线程join之前sleep的,所以计时起点并不相同),pick函数选择了mian并schedule,输出主线程的第二次”Main thread is running”;随后thread_2被唤醒,对应clock=0.95,pick函数选择并将cpu切换给thread_2;当thread_2执行完毕后再次进入sleep,此时clock=1.88时刻,thread_1唤醒并接受调度…如此循环执行,根据线程执行时间(这里输出的只是小数点后两位,实际上可以更精确一点,至于为什么不是精确的相差1s或2s是因为在调用clock函数本身以及printf会耗费额外时间),我们可以粗略估算出下一个被唤醒并将接受调度的thread是哪个,这也印证了线程的并发性;当所有子线程执行完毕后,main最后一次输出”Main thread is running”并return 0结束程序;

7.2.2 线程切换测试

首先make得到main可执行文件

接着./main执行文件

执行过程部分图片

完整执行结果如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
thread_1's origin stack[11] is 0
thread_1's origin stack[10] is 0
thread_1's origin stack[9] is 0
thread_1's origin stack[8] is 0
thread_1's origin stack[7] is 0
thread_1's origin stack[6] is 0
thread_1's origin stack[5] is 0
thread_1's origin stack[4] is 0
thread_1's origin stack[3] is 134514653
thread_1's origin stack[2] is 100
thread_1's origin stack[1] is 134701064
<=======================>
Thread_ 1 has been created
thread_2's origin stack[11] is 0
thread_2's origin stack[10] is 0
thread_2's origin stack[9] is 0
thread_2's origin stack[8] is 0
thread_2's origin stack[7] is 0
thread_2's origin stack[6] is 0
thread_2's origin stack[5] is 0
thread_2's origin stack[4] is 0
thread_2's origin stack[3] is 134514653
thread_2's origin stack[2] is 100
thread_2's origin stack[1] is 134706224
<=======================>
Thread_ 2 has been created
<====================================>
<===There is schedule===>
the next thread is 1,the next esp is 134705144,it's counter is 15
stack[11] is 0
stack[10] is 0
stack[9] is 0
stack[8] is 0
stack[7] is 0
stack[6] is 0
stack[5] is 0
stack[4] is 0
stack[3] is 134514653
stack[2] is 100
stack[1] is 134701064
<=======================>
thread_ 1 is running----it's status is 0
<===There is schedule===>
the next thread is 2,the next esp is 134710304,it's counter is 15
stack[11] is 0
stack[10] is 0
stack[9] is 0
stack[8] is 0
stack[7] is 0
stack[6] is 0
stack[5] is 0
stack[4] is 0
stack[3] is 134514653
stack[2] is 100
stack[1] is 134706224
<=======================>
thread_ 2 is running----it's status is 0
<===There is schedule===>
the next thread is 1,the next esp is 134705020,it's counter is 29
stack[11] is 0
stack[10] is 1
stack[9] is 0
stack[8] is -474548736
stack[7] is 134705176
stack[6] is 134514667
stack[5] is 0
stack[4] is 134516475
stack[3] is 0
stack[2] is 100
stack[1] is 134701064
<=======================>
thread_ 1 is running----it's status is 0
<===There is schedule===>
the next thread is 2,the next esp is 134710180,it's counter is 29
stack[11] is 0
stack[10] is 2
stack[9] is 0
stack[8] is -474548736
stack[7] is 134710336
stack[6] is 134514667
stack[5] is 0
stack[4] is 134516475
stack[3] is 0
stack[2] is 100
stack[1] is 134706224
<=======================>
thread_ 2 is running----it's status is 0
<===There is schedule===>
the next thread is 1,the next esp is 134705020,it's counter is 29
stack[11] is 0
stack[10] is 0
stack[9] is 0
stack[8] is -474548736
stack[7] is 134705176
stack[6] is 134514667
stack[5] is 0
stack[4] is 134516475
stack[3] is 0
stack[2] is 100
stack[1] is 134701064
<=======================>
Thread_1 excited
<===There is schedule===>
the next thread is 2,the next esp is 134710180,it's counter is 29
stack[11] is 0
stack[10] is 1
stack[9] is 0
stack[8] is -474548736
stack[7] is 134710336
stack[6] is 134514667
stack[5] is 0
stack[4] is 134516475
stack[3] is 0
stack[2] is 100
stack[1] is 134706224
<=======================>
thread_ 2 is running----it's status is 0
<===There is schedule===>
the next thread is 2,the next esp is 134710180,it's counter is 29
stack[11] is 0
stack[10] is 0
stack[9] is 0
stack[8] is -474548736
stack[7] is 134710336
stack[6] is 134514667
stack[5] is 0
stack[4] is 134516475
stack[3] is 0
stack[2] is 100
stack[1] is 134706224
<=======================>
Thread_2 excited

在debug_test.c中我们只创建了两个线程(避免线程过多后期验证困难),我们直接来看输出结果;

首先thread_1和thread_2分别被创建成功,stack运行栈也初始化完成,需要记住的特殊寄存器数值是”stack[3] is 134514653”以及”thread_1’s origin stack[1] is 134701064”和”thread_2’s origin stack[1] is 134706224”;

我们重新复习一下stack各个位存放的寄存器是什么

1
2
3
4
5
6
7
8
9
10
11
stack[STACK_SIZE - 11] = 0;         // eflags
stack[STACK_SIZE - 10] = 0; // eax
stack[STACK_SIZE - 9] = 0; // edx
stack[STACK_SIZE - 8] = 0; // ecx
stack[STACK_SIZE - 7] = 0; // ebx
stack[STACK_SIZE - 6] = 0; // esi
stack[STACK_SIZE - 5] = 0; // edi
stack[STACK_SIZE - 4] = 0; // old ebp
stack[STACK_SIZE - 3] = (int)start; // ret to start
stack[STACK_SIZE - 2] = 100; // ret to unknown
stack[STACK_SIZE - 1] = (int)tsk; // arg of start

从debug中可以获取的信息是thread_1的stack指针地址为”134705020”,thread_2的stack指针地址为”134710180”(stack栈顶指针保存在esp中,我们直接看esp的存放的内容即可);

首先调度的是thread_1,第一次调度thread_1可以看到它的stack中仍然是初始化的样子,接着将esp指向thread_2的stack栈顶,thread_2开始调度执行;

第二次调度thread_1可以看到

1
2
3
4
5
6
7
8
9
10
11
stack[11] is 0
stack[10] is 1
stack[9] is 0
stack[8] is -474548736
stack[7] is 134705176
stack[6] is 134514667
stack[5] is 0
stack[4] is 134516475
stack[3] is 0
stack[2] is 100
stack[1] is 134701064

我们观察到stack[3]中的值也就是start函数的入口被赋值为0,这是因为start只会在线程第一次调用的时候进入;当thread_1执行完毕并exit之后,thread_2独自按照自己的节奏执行并结束;

现在我们解释为什么运行过程中会出现寄存器的值改变的情况,如下是switch_to_next的汇编指令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
push %ebp
mov %esp, %ebp /* 更改栈帧,以便寻参 */
/* 保存现场 */
push %edi
push %esi
push %ebx
push %edx
push %ecx
push %eax
pushfl
/* 准备切换栈 */
mov current, %eax /* 保存当前 esp, eax 是 current 基址 */
mov %esp, 8(%eax)
mov 8(%ebp), %eax /* 取下一个线程结构体基址*/
mov %eax, current /* 更新 current */
mov 8(%eax), %esp /* 切换到下一个线程的栈 */
/* 恢复现场, 到这里,已经进入另一个线程环境了,本质是 esp 改变 */
popfl
popl %eax
popl %edx
popl %ecx
popl %ebx
popl %esi
popl %edi
popl %ebp

最后要说明一点,一开始我们设置的counter都是15,为什么经过睡眠以后两个thread的counter都变成了29?这是因为在pick函数中我们重置counter值的时候,counter=priority+priority/2+priority/4+…+priority/2n+…=2p(约等于),也就是说陷入睡眠状态的线程在wakeup后优先级会变得很高,进而被pick优先选择;

7.2.3 分离式启动测试

前面一直用的阻塞式启动子线程,很多人可能不理解为什么要这样,假如换成分离式启动会有什么现象呢?我们将concurrency_test.c中的join改为detach,接着重新make并执行main文件

我们发现当主线程醒来并执行第二次printf之后,直接就跳到最后一次printf后结束整个程序,这就是detach的思想:主线程不必等待子线程执行完毕再向下执行,因此如果我们使用分离式启动的话会导致不能将所有的子线程执行信息输出,无法观察到完整的现象;

Lab_2

文章参考:

1.实验要求

用户态内存管理需要实现一个显式分配的动态内存分配器

  1. 实现内存管理基本机制,拥有malloc和free,remalloc三个api
    • 开局向操作系统申请一片内存,之后手动管理这片内存区域
    • malloc
      • 申请一块指定大小的内存区域,这个api参数有一个,表示内存区域大小
      • 如果分配失败了,返回null
      • 如果成功了,就返回内存区域的地址
      • 如果当前分配器的内存用完了,不需要向操作系统申请新的内存
    • free
      • 销毁指定大小的内存区域,这个api的参数只有一个,表示内存区域位置
      • free的时候需要完成空闲块的合并
    • realloc
      • 改变一块已经申请内存区域的大小,这个api的参数有两个,第一个是需要改变的内存区域的地址,第二个是新的内存区域需要的大小
      • 如果分配失败了,返回null,旧的内存区域不发生改变
      • 如果成功了,就返回内存区域的地址
    • 只需要单线程支持就可以
  2. 内存管理策略(以下三选一)
    • first fit,best fit,worst fit三种方法任选其一。此时采用的是链表的方法组织内存
    • 类似于Slab allocation,此时采用列表的方式组织内存
    • 类似于Buddy allocation,此时采用伙伴树的方法组织内存
  3. 和lab1配合
    • 在lab1生成的一个线程中申请一块内存区域,并free掉。注意,此时只要求单线程。
    • 用户态内存库向操作系统申请128KB大小的内存,在用户态线程中用循环依次申请1,2,4,8,16…字节大小的内存,直到没有空余内存,内存库应该处理这种情况,并打出“run out of memory”的log

2.开发日志

2022/10/20 20:24 首先说一下,关于内存管理这一块我始终还是没有理解上课教的(虚拟内存、分页分段等)和要实现的这个动态内存分配器有什么关系,一个是纯操作系统内部甚至更加底层关于硬件的解释,一个是让我们简单的管理虚拟内存,所以真的就是一开始其实我们不知道怎么下手是很正常的;简单来说Lab_2分为三个步骤,分别是手动实现三个基本内存管理API、三选一实现一个内存管理策略、与lab1配合;

3.前置知识点

malloc并不是操作系统提供的系统调用或C的关键字,malloc仅仅只是C标准库中提供的一个普通函数;

要实现一个类malloc函数函数并不难,但是相应的效率也比不上现有的C标准库(如glibc)提供的malloc函数;

3.1 动态内存管理

因为我们无法提前知道程序到底需要使用多少内存,只有在程序真正运行起来的时候才知道,因此我们需要使用动态内存分配(注意malloc只能开辟,调整大小是realloc的工作);

3.1.1 malloc函数

  • malloc函数会在内存的堆区申请一块空间给我们使用;

  • 因此我们需要传一个int类型的值给malloc函数,这个值代表我们需要的字节个数;

  • 同时malloc函数会返回一个void*类型的指针变量,我们需要将它强制类型转换成我们需要的指针变量;

例如我们需要一个int类型的长度为10的数组

1
int *pa=(int *)malloc(10 * sizeof(int));//当然可以直接写成40

开辟空间不一定是成功的,失败则malloc函数返回一个空指针,我们需要对返回指针做判断

1
2
3
4
if(NULL == pa){
perror(" Null pointer! ");//报错
return;
}
  • malloc分配的内存大小至少为size参数所指定的字节数;
  • malloc的返回值是一个指针,指向一段可用内存的起始地址;
  • 多次调用malloc所分配的地址不能有重叠部分,除非某次malloc所分配的地址被释放掉;
  • malloc应该尽快完成内存分配并返回(不能使用NP-hard的内存分配算法);
  • 实现malloc时应同时实现内存大小调整和内存释放函数(即realloc和free);

3.1.2 realloc函数

如果在使用堆区的空间时,申请的空间大小是固定的,则这些空间实际上和栈区的空间没什么区别,realloc函数是整个动态内存管理的关键;

  • realloc函数可以对我们之前用malloc函数或calloc函数申请的空间进行扩大或缩小;

  • realloc函数需要两个参数,第一个是之前指向空间的指针变量(注:这个指针变量只能指向空间的首地址,如果传入其他指针,如数组第2个元素的地址,就会报错,因此,之前用来接收的指针变量不能轻易修改),第二个变量是修改后的空间大小;

  • realloc函数返回的也是void*类型的变量,需要进行强制类型转换;

注意,当使用realloc修改完空间大小之后,数据可能就不在原来的区域上(因为很可能在原始地址上扩容会与其他空间发生冲突),因此我们需要对指向这块空间的指针变量pa进行重新赋值;

当然realloc函数也可能返回空指针,因此我们需要创建一个临时变量来存储返回的地址,先对其进行判断,如果不是空指针则将它交付给指针变量;

1
2
3
4
5
6
int* tmp = (int*)realloc(pa, 20 * sizeodf(int));//将上述空间扩容为20个
if(NULL = tmp){
perror(" ");
return;
}
pa = tmp;

realloc函数第一个参数可以传空指针,如果这么做,realloc函数作用与malloc函数相同;

3.1.3 free函数

free函数用于释放我们申请的空间,只需要将空间的首地址传给free函数即可完成释放;

1
free(pa);

注意,这里我们只是将这块空间释放了(简单理解就是解除了和pa指针的绑定),然而此时我们的pa指针仍然指向一块未知的区域,这是非常危险的,我们需要将该指针也设置为空

1
pa=NULL;

1.free函数可以传空指针,如果传空指针,free函数就什么都不做;

2.free函数只能传由malloc和calloc开辟出的空间的首地址,传其他地址会报错;


Q:到这里可能很多人会和我有一样的疑惑,C++中的new和delete为什么没有对应realloc的函数呢?意思是C++不能给申请的自由存储区扩容?

A:参考回答(1条消息) new的空间能否用realloc扩容?_SseakomS_HK的博客-CSDN博客怎么样扩大new分配的内存?-CSDN社区

首先,千万不要尝试用realloc给new的空间扩容,对于内置类型来说可能不会出错,但是自定义类型一定会出问题;

在C++中假如尝试扩容可以选择重新new一块空间然后将数据复制过去,也可以使用指针直接在原有基础上new一部分空间,最好的方法还是使用容器这种可以自动管理内存的类;


3.2 原理剖析

我们可以认为程序运行起来后在内存中是这样的,内存的动态申请和释放都发生在堆区;

从内存分配器的角度看,不论是整数、浮点数或者链表、结构体等,通通被看作是存放在内存块中的数据,这些内存块中用于存放原生的字节序列;申请者拿到内存块之后可以塑造成各种各样的数据结构以及对象,但这与内存分配器毫无关系;

Lab_2实际上要实现的就是对堆区进行管理,主要提供三个API:malloc、free以及realloc函数;

内存从申请到释放中有以下细节需要注意;

细节1:申请内存时我们如何知道哪些内存块是空闲的?

这意味着我们需要将内存块使用某种方式组织起来,这样才能追踪到每一块内存的分配状态;

细节2:在一次内存申请时可能有多个空闲内存块满足要求,应该选择哪一个空闲内存块分配给用户呢?

细节3:假如我们选择了32字节的空闲内存块,而用户实际只需要16个字节,剩下的字节该如何处理?

细节4:当分配给用户的内存使用完毕并归还给我们后,应当如何处理用户归还的内存?

3.2.1 管理空闲内存块

(1)空闲内存块结构

这部分核心是需要某种方法区分空闲内存块和已分配内存块;

因为无法将空闲内存块的信息保存在其他地方(因为将这些信息保存在其他数据结构中意味着首先就要申请内存,这就成了一个鸡生蛋的问题),所以只能将维护内存块的分配信息保存在内存块本身中;

为了维护内存块分配状态,我们需要以下信息:

  • 一个标记,用于标识该内存块是否空闲;
  • 一个数字,用于记录该内存块的大小;

注意,在接下来的讨论中我们不可避免的会加一些限制,这些限制是为了方便理解,实际上malloc并不会存在这些限制;

方便起见,假设我们的内存分配器不对内存对齐有要求,且允许一次内存申请的最大内存块为2G,这意味着可以使用31个bit记录内存块的大小,剩下一个bit标识该内存块空闲/已分配;

我们将上述32bit作为header,用于存储块信息,剩下的真正可以用于分配给用户的内存部分被称作负载payload,调用malloc返回的内存起始地址就是负载的起始地址;

(2)跟踪内存分配状态

我们知道每一个内存块的结构后,就可以组织整块堆区进行内存的分配与释放

上图中,每个方框代表4字节,红色区域表示已经被分配出去,灰色区域表示该内存块空闲;

最后一个内存块比较特殊,0/1这个方框的作用就是作为一个特殊标记告知内存分配器已经到达堆区末尾;

3.2.2 选择空闲内存块

当有多个空闲内存块满足要求,这就涉及分配策略;

(1)First Fit

最简单的就是每次从头开始找起,找到第一个满足要求的就返回;

这种方法的优势在于简单,但该策略总是从前面的空闲块找起,因此很容易在堆区前半部分因分配出内存留下很多小的内存块,因此下一次内存申请搜索的空闲块数量将会越来越多;

(2)Next Fit

从上一次找到合适的空闲内存块的位置找起,因为上一次找到某个合适的内存块的地方很有可能剩下的内存块能满足接下来的内存分配请求;

由于不需要从头开始搜索,因此Next Fit将远快于First Fit;

然而也有研究表明Next Fit方法内存使用率不及First Fit,也就是同样的停车场面积,First Fit方法能停更多的车;

(3)Best Fit

Best Fit算法会找到所有的空闲内存块,然后将所有满足要求的并且大小为最小的那个空闲内存块返回,这样的空闲内存块才是最Best的,因此被称为Best Fit;

Best Fit最大的缺点就是分配内存时需要遍历堆上所有的空闲内存块,在速度上显然不及前面两种方法;

3.2.3 分配内存

现在我们来讨论这样的情况,用户申请12字节的内存,我们选择了32字节的空闲内存块,我们是将整块32字节的空闲内存块标记为已分配?这样当然速度很快,但是会造成内存碎片(指该内存块剩下的空间将无法被利用);

一种解决方法是将空闲内存块进行划分,将前部分设置为已划分并返回给内存申请者使用,后部分划分为一块新的空闲内存块;

3.2.4 释放内存

释放内存可不像free看上去那么简单,要考虑到的关键一点就在于,与被释放的内存块相邻的内存块可能也是空闲的。如果释放一块内存后我们仅仅简单的将其标志位置为空闲,那么可能会出现下面的场景:

从图中我们可以看到,被释放内存的下一个内存块也是空闲的,如果我们仅仅将这16个字节的内存块标记为空闲的话,那么当下一次申请20字节时图中的这两个内存块都不能满足要求,尽管这两个空闲内存块的总数要超过20字节;

一种更好的方法是当应用程序向我们的malloc释放内存时,我们查看一下相邻的内存块是否是空闲的,如果是空闲的话我们需要合并空闲内存块;

在这里我们又面临一个新的决策,那就是释放内存时我们要立即去检查能否够合并相邻空闲内存块吗?还是说我们可以推迟一段时间,推迟到下一次分配内存找不到满足要的空闲内存块时再合并相邻空闲内存块:

  • 释放内存时立即合并空闲内存块相对简单,但每次释放内存时将引入合并内存块的开销,However,这种策略最为简单;
  • 实际使用的内存分配器都会有某种推迟合并空闲内存块的策略;

当然合并内存块的故事并没有结束,考虑如下情况:

使用的内存块其前和其后都是空闲的,在当前的设计中我们可以很容易的知道后一个内存块是空闲的,因为我们只需要从当前位置向下移动16字节就是下一个内存块,但我们怎么能知道上一个内存块是不是空闲的呢?

我们之所以能向后跳是因为当前内存块的大小是知道的,那么我们该怎么向前跳找到上一个内存块呢?

思路很简单,类比于header,在内存块的末尾加上一个footer,header和footer的内容是一样的,因为上一内存块的footer和下一个内存块的header是相邻的,这样当我们释放内存时就可以快速的进行相邻空闲内存块的合并;

上面的实现思路都是基于数组,经过讨论我们也发现了数组并不是很适合,所以下面我们会将block的组织方式变为链表实现;

3.3 Linux内存简介

3.3.1 heap内存模型

OS课程中已经介绍过,malloc申请的内存主要是从heap区域分配,而进程面对的是虚拟内存地址空间,这意味着只有将虚拟内存地址/页映射到真实的物理内存地址/页框才能真正使用该虚拟内存;然而受到实际物理存储容量的限制,整个堆虚拟空间是不可能全部映射到实际的物理内存的,Linux对堆虚拟空间的管理如下

简单来说,Linux维护着一个break指针,这个指针指向堆空间的某个地址:

  • 从堆起始地址到break之间的虚拟地址空间为映射好的,可以供进程访问;

  • 从break往上,是未映射的虚拟地址空间,如果访问这段空间则程序会报错;

Linux通过brk和sbrk系统调用来操作break指针,这两个系统调用的原型如下(如果是严格按照实验要求来实现,这两个系统调用也需要重写):

1
2
int brk(void *addr);
void *sbrk(intptr_t increment);

brk将break指针直接设置为某个地址,brk在执行成功时返回0,否则返回-1并设置errno为ENOMEM;

sbrk将break从当前位置移动increment所指定的增量,sbrk成功时返回break移动之前所指向的地址,否则返回(void *)-1;(如果将increment设置为0,则可以获得当前break的地址)

PS:由于Linux是按页进行内存映射的,所以如果break被设置为没有按页大小对齐,系统实际上会在最后映射一个完整的页以实现页对齐,这将导致实际已映射的内存空间比break指向的地方要大一些,有人可能有疑惑说那这样break之后的一小部分不就合法可以使用了吗?理论上来说的确是可以的,但是使用break之后的地址是很危险的一件事,不要尝试!

3.3.2 资源限制

进程是系统资源分配最小的单位,但系统对进程分配的资源并不是无限的,每个进程都有一个rlimit表示当前进程可用的资源的上限,rlimit是一个结构体

1
2
3
4
5
6
struct rlimit {
rlim_t rlim_cur; /* Soft limit 软限制*/
rlim_t rlim_max; /* Hard limit 硬限制*/
};
//可以通过setrlimit对rlimit进行有条件设置;
//硬限制作为软限制的上限,非特权进程只能设置软限制并且不能超过硬限制;

可以通过系统调用getrlimit得到

1
2
3
4
5
6
//获取当前进程虚拟内存空间的rlimit
int main() {
struct rlimit *limit = (struct rlimit *)malloc(sizeof(struct rlimit));
getrlimit(RLIMIT_AS, limit);
printf("soft limit: %ld, hard limit: %ld\n", limit->rlim_cur, limit->rlim_max);
}

4.数据结构设计

4.1 block块

将堆区内存空间以block的方式组织,每个block由meta区和data区组成:

  • meta区域记录了block的元信息(数据区大小、空闲标志位、指针等);
  • data区是真实分配的内存区域,data区的第一个字节地址为malloc返回的地址;

还有一点需要注意的是无论是data区还是meta区其长度都需要是8字节的整数倍,因为计算机每次读取数据都按照8字节读取;

双向链表的设计是为了实现在free的时候便于和相邻的空闲block合并;

下面给出block对应的数据结构图示和代码

1
2
3
4
5
6
7
8
9
struct s_block{
size_t data_size;//data区域大小,32位系统中size_t是4字节
int is_free; //block是否空闲标记,大小为4字节,0表示已分配,1表示空闲
t_block pre; //采用双向链表组织block,每个block分别有指向前一个block和指向后一个block的指针,32位的编译器指针为4字节
t_block next;
void *malloc_ptr;//该指针指向malloc函数返回的地址,void*在32位系统占4字节
int meta_fill; //填充meta区域使之长度为8的倍数
char data[1]; //虚拟字段,表示data区的第一个字节,虚拟字段不占用字节
};

4.2 block链表

我们将多个block块以双向链表的方式组织,便于管理和创建,下面是图示

5.函数封装

5.1 malloc函数

5.1.1 First Fit算法

我们将block组织成链表的形式,现在考虑如何在block链中查找并选择合适的空闲block,这里我们采用First Fit算法

1
2
3
4
5
6
7
8
9
10
11
12
13
t_block first_find_block(t_block *last_block,size_t data_size){
t_block current_block = (t_block)first_block; //temp指针从first_block开始寻找
while(current_block && !(current_block->is_free && current_block->data_size >= data_size)){ //终止条件为找到free块且data_size大于等于申请的data_size
*last_block = current_block; //last_block指向block链表中的最后一个block,只有在开辟新block的时候会使用
if(current_block->next == NULL){
return NULL; //假如遍历完所有block后还是未能找到合适的block,则返回NULL
}
else{
current_block = current_block->next;
}
}
return current_block;
}

5.1.2 开辟block

当现有的空闲block都不满足用户申请的size要求,就需要在block链最后开辟一个新的block,这里借助了系统调用sbrk

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
t_block add_block(t_block last_block,size_t data_size){
t_block new_block;
t_block break_add = (t_block)heap_sbrk(0); //获取当前break指针的地址
if(heap_sbrk((size_t)(BLOCK_SIZE+data_size))==NULL){
return NULL; //使用sbrk开辟新块,并判断是否开辟成功,失败则返回NULL
}
new_block = break_add; //配置新块的信息
new_block->data_size = data_size;
new_block->next = NULL;
new_block->is_free = 1;
if(last_block){ //last_block实际上可能是NULL,需要先判断
last_block->next = new_block;
new_block->pre = last_block;
}
return new_block;
}

5.1.3 分裂block

为了提高payload,应该在剩余data区足够大的情况下(我们规定剩余空间至少有BLOCK_SIZE + 8才执行分裂操作,因为默认8字节对齐)将其分裂为新的block;

1
2
3
4
5
6
7
8
9
void break_block(t_block current_block,size_t data_size){
t_block new_block = (t_block)(current_block->data+data_size); //block指针跨过前一个块的整个data区
new_block->data_size = current_block->data_size-data_size-BLOCK_SIZE; //设置新的data区大小(注意size是不包含meta区的)
new_block->is_free = 1;
new_block->pre = current_block;
new_block->next = current_block ->next;
current_block->next = new_block;
current_block->data_size = data_size;
}

5.1.4 字节对齐

我们希望malloc分配的数据区是按8字节对齐,所以在size不为8的倍数时,我们需要将size调整为大于size的最小的8的倍数;

1
2
3
4
5
size_t align8(size_t s) {
if(s & 0x7 == 0)
return s;
return ((s >> 3) + 1) << 3;
}

5.1.5 实现malloc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void *malloc(size_t data_size){
t_block current_block,last_block;
size_t align_size = align_8(data_size);
if(first_block){ //1.如果链表不为空,也就是已经在堆区开辟了部分空间,等价于firstblock不为空
last_block = (t_block)first_block; //初始化last_block指针
current_block = first_find_block(&last_block,align_size);//使用first fit算法
if(current_block){ //1.1 假如找到合适的block块,判断是否可以进行分裂
if((current_block->data_size-align_size)>=(BLOCK_SIZE+8)){
break_block(current_block,align_size);
}
current_block->is_free = 0;
}
else{ //1.2 假如没有找到合适的block块则需要申请
current_block = add_block(last_block,align_size);
if(current_block==0){ //假如申请失败则返回NULL
return NULL;
}
}
}
else{ //2.如果链表是空的,那么就把该block作为链表的头部
current_block = add_block(NULL,align_size);
if(current_block==0){
return NULL;
}
first_block = current_block;
}
current_block->malloc_ptr = current_block->data;
return current_block->data; //返回data区的首地址
}

5.2 free函数

free需要解决两个关键的问题:

  1. 如何验证所传入的地址是有效地址,即确实是通过malloc方式分配的数据区首地址
    • 地址应该在之前malloc所分配的区域内,即在first_block和当前break指针范围内 —— 进行地址比较
    • 这个地址确实是之前通过我们自己的malloc分配的 —— magic pointer(s_block->data指向的是data区首地址,magic pointer指向的也是data区首地址??在一般情况下两者是等价的,但是在中途如果分裂了block则会有两个s_block->data但是仍然只有一个magic pointer)
  2. 如何解决碎片问题

5.2.1 获取meta地址

该函数的作用是通过传入的data区首地址反向获取对应的meta去首地址,进而获得其malloc_ptr

1
2
3
4
5
6
t_block get_meta_add(void *data_add){
data_add = data_add - BLOCK_SIZE;
//printf("%p\n",data_add);
return (t_block)data_add;
//return (data_add = temp -= BLOCK_SIZE);
}

5.2.2 判断data地址

通过判断用户输入的data区首地址和实际malloc分配的data区首地址是否相等来决定是否free

1
2
3
4
5
6
7
8
9
10
11
12
int judge_add(void *data_add){
if(first_block){
if(data_add>first_block && data_add<heap_sbrk(0)){ //如果传递进来的地址在堆的可映射合法空间
//printf("com suc");
//printf("%p\n",data_add);
//printf("%p\n",get_meta_add(data_add)->malloc_ptr);
return data_add == get_meta_add(data_add)->malloc_ptr; //查看传进来的地址和malloc申请空间时设置的数据区地址是否相等
}
}
//printf("com fail");
return 0;
}

5.2.3 合并block

free的block块其前后若有同样free的块,我们进行合并操作,这样可以避免无法满足一些较大的申请块的申请;

1
2
3
4
5
6
7
8
9
10
t_block merge_block(t_block data_add){
if(data_add->next && data_add->next->is_free){ //如果要释放的一块的后面有一个已经分配过的块,但还是空闲的(我们这里实现的是向后合并)
data_add->data_size = data_add->data_size+BLOCK_SIZE+data_add->next->data_size;
data_add->next = data_add->next->next;
if(data_add->next){ //此时的data_add->next已经是第三个块了
data_add->next->pre = data_add;
}
}
return data_add;
}

5.2.4 实现free

free的思路如下:

  • 首先检查参数地址的合法性;如果不合法则不做任何事;否则,将此block的free标为1,并且在可以的情况下与后面的block进行合并;
    • 如果当前是链表的最后一个block,则回退break指针释放进程内存(修改break的值);
    • 如果当前block是最后一个block,则回退break指针并设置first_block为NULL;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void free(void *data_add){
t_block ptr;
if(judge_add(data_add)){
//printf("free judge suc\n");
ptr = get_meta_add(data_add);
ptr->is_free = 1;
if(ptr->pre && ptr->pre->is_free){ //1.假如前一个block存在,且是free的,则将前一个block和当前block合并
ptr = merge_block(ptr->pre);
}
else if(ptr->next){ //2.假如后一个block存在,则使用merge_block判断并合并
merge_block(ptr);
}
else{
if(ptr->pre){ //3.这种情况是尽管前一个block存在,但是并不是free的
ptr->pre->next = NULL; //将指针置NULL即可,不要合并
}
else{ //4.这种情况是这个block是整个堆空间最后一个还未被free的block
first_block = NULL;
}
heap_brk(ptr); //之所以将3和4情况合并是因为这两种情况下都需要将最后的break指针重新定位
}
}
else{
printf("内存释放失败!");
}
}

5.3 realloc函数

5.3.1 复制数据

为了实现realloc,首先要实现一个内存复制方法,为了效率,以8字节为单位进行复制(通过新建一个size_t指针,将内存区域强制看做size_t类型来实现);

之所以要实现这个函数是为了应对现有的block不能满足扩容需求而新申请block的需求;

1
2
3
4
5
6
7
8
void copy_data(t_block old_block,t_block new_block){
size_t *old_data, *new_data;
size_t i;
old_data = (size_t *)old_block->malloc_ptr;
new_data = (size_t *)new_block->malloc_ptr;
for(i = 0; (i * 8) < old_block->data_size && (i * 8) < new_block->data_size; i++)
new_data[i] = old_data[i];
}

5.3.2 实现realloc

实现realloc可以直接无脑malloc新的内存再将数据复制过去(类似于C++中的一种扩容方法),更加高效的方法是考虑如下情况:

  • 如果当前block的数据区大于等于realloc所要求的size,则不做任何操作
  • 如果新的size变小了,考虑split
  • 如果当前block的数据区不能满足size,但是其后继block是free的,并且合并后可以满足,则考虑做合并
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
void *realloc(void *data_add,size_t data_size){
size_t align_size;
t_block new_block,old_block;
void *new_ptr;
if(!data_add){ //1.如果首地址都是空的直接使用malloc申请一个新的block
return malloc(data_size);
}
if(judge_add(data_add)){ //与free同理,relloc只能在malloc返回的地址上进行操作
//printf("judge suc");
align_size = align_8(data_size);
old_block = get_meta_add(data_add);
if(old_block->data_size > align_size){ //2.如果原有的空间已经大于需要重新分配的空间,意味着需要缩小空间
if(old_block->data_size-align_size >= (BLOCK_SIZE+8)){
break_block(old_block,align_size);
}
}

else{ //3.扩容
if(old_block->next && old_block->is_free){ //3.1 检查后面的block是否能满足要求
if((old_block->data_size+BLOCK_SIZE+old_block->next->data_size)>=align_size){//如果加上后面的空间满足要求大小
merge_block(old_block);
if(old_block->data_size-align_size>=(BLOCK_SIZE+8)){ //在满足自身要求的同时还有可以分裂的空间
break_block(old_block,align_size);
}
}
}
else{ //3.2 后面的block不能满足要求,则只能重新malloc区域然后赋值
new_ptr = malloc(data_size);
if(!new_ptr){
//printf("first fail");
return NULL;
}
new_block = get_meta_add(new_ptr);
copy_data(old_block,new_block);
free(old_block);
return new_ptr;
}
}
return data_add;
}
//printf("last fail");
return NULL;
}

5.4 系统调用

Lab_2是有一个限制在题干中的,要求开局向操作系统申请一片内存,之后手动管理这片内存区域,这就意味着实际上是要求我们做出如下模型的

相信很多人也发现了在上面我们实现malloc、free以及realloc函数的时候使用的并不是C库提供的sbrk函数和brk函数,而是自定义的heap_sbrk和heap_brk,该模型的实现原理也很简单,因为是在mapped region中操作,因此不需要额外的系统调用,我们的目的主要分为三个:

  • 开局申请一块mapped region;
  • 实现heap_brk函数,功能是直接设置heap_break指针为某个合法地址;
  • 实现heap_sbrk函数,功能是将当前heap_break从当前位置移动increment的增量并返回原有heap_break地址;

根据C对brk以及sbrk系统调用的描述,以及我们自定义的heap_break指针,我们可以较容易的实现,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void set_heap(size_t heap_size){        //开局申请一块内存用于我们自由管理
printf("set_heap is running...\n");
heap_start = (t_block)sbrk(0);
printf("heap_start is %p\n",heap_start);
heap_break = heap_start; //初始化全局变量heap_break
if(sbrk(heap_size)==(void*)-1){
printf("Error:set_heap fail!\n");
return;
}
else{
printf("set_heap suc!\n");
heap_end = (t_block)sbrk(0);
printf("heap_end is %p\n",heap_end);
}
}
1
2
3
4
5
6
7
8
9
int heap_brk(void *heap_addr){    //类brk函数实现
if(heap_start<=(t_block)heap_addr&&(t_block)heap_addr<=(t_block)heap_end){
heap_break = heap_addr;
return 0;
}
else{
return -1;
}
}
1
2
3
4
5
6
7
8
9
void *heap_sbrk(size_t heap_inc){ //类sbrk函数实现
if(heap_start<=(t_block)((size_t *)heap_break+heap_inc)&&(t_block)((size_t *)heap_break+heap_inc)<=(t_block)heap_end){
heap_break = (t_block)((size_t *)heap_break+heap_inc);
return (t_block)((size_t*)heap_break-heap_inc);
}
else{
return NULL;
}
}

6.模块设计

除了原有Lab_1的文件,我们新增了两个源文件和一个头文件,整体文件树结构如下

Dmm.c中包含了malloc函数、free函数以及realloc函数的实现;

Sys_call.c包含了类系统调用三个函数的实现;

Dmm.h中声明了所需头文件以及block数据结构以及函数声明;

7.程序测试

Lab_2主要分为两个测试:

  • 在lab1生成的一个线程中申请一块内存区域,并free掉。注意,此时只要求单线程。
  • 用户态内存库向操作系统申请128KB大小的内存,在用户态线程中用循环依次申请1,2,4,8,16…字节大小的内存,直到没有空余内存

7.1 基本函数测试

首先我们单线程测试malloc、realloc以及free函数的正确性,编译test2.c以及相关文件

其中test2.c内容如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include"Dmm.h"

int main()
{
set_heap(100*sizeof(int));
//printf("main is running...\n"); //if we add this annotate,our program will fail
int* p = (int*)malloc(10 * sizeof(int));
if (p == NULL)
{
printf("内存开辟失败!\n");
}
else
{
printf("内存开辟成功,请依次输入数组数据:\n");
for(int i = 0; i <= 4; i++){
scanf("%d",&p[i]);}
for(int i = 0; i <= 4; i++){
printf("p[%d] = %d\n", i,p[i]);}

int *ptr = (int*)realloc(p,5*sizeof(int));
if(ptr!=NULL){
ptr=p;
p=NULL;
printf("内存扩容成功!\n");
for(int i = 0; i <= 4; i++){
printf("ptr[%d] = %d\n", i,ptr[i]);}

}
else{
printf("内存扩容失败!\n");
}
free(ptr);
ptr=NULL;
printf("内存释放成功!\n");
}
return 0;

}

原理很简单,当然我们也可以使用C库提供的malloc、realloc以及free函数验证;

首先我们全局申请一块较大的内存用于自由管理,接着使用malloc为长度为10的数组申请一块空间,分别输入数组元素,接着我们将数组的长度realloc为5(注意realloc并不是必须要扩大容量,缩小容量同样是可以的),此时我们为了避免指针原本指向的区域变动引起错误,将ptr指针指向的地址赋值给p指针并释放ptr指针,最后释放p指针指向的区域并令p指针为NULL;

程序运行结果如下,我们的数组申请成功后分别输入2,4,5,67以及8,在realloc之后发现数据仍然正确,接着我们将其free掉;

7.2 线程申请测试

我们申请128KB大小的内存,使用一个循环依次申请内存直到没有空余内存,test1.c源文件如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#include"Dmm.h"
#include"main.h"
extern struct thread_struct *current;
extern struct thread_struct *task[THR_TASKS];
//In this program,we test thread rather than scanf(beacause it has something wrong...)

void fun1()
{
int i = 1;
while (1)
{
int* p = (int*)malloc(i * sizeof(char));
if (p == NULL)
{
printf("ran out of memory!\n");
return;
}
else
{
printf("malloc %d Byte succeed!\n",i);
}
i=i*2;
mysleep(1);
}
}

int main()
{
set_heap(32768*sizeof(int));//we should note that our thread also need heap memory,so we can't calculate memory presicely
//printf("main is running...\n"); //if we add this annotate,our program will fail

int tid1;
thread_create(&tid1, fun1);
printf("Thread_ %d has been created\n", tid1);
thread_join(tid1);

return 0;

}

基本原理非常简单,首先在main主函数中申请一块内存,接着创建线程1并使用while循环一直申请内存直到没有剩余内存,运行结果如下

  • 需要注意的是我们创建的线程同样是占用了开局申请的内存中的空间的;
  • 同时开局申请的内存大小是以4Byte为单位,与128KB进行换算的时候需要注意;

Lab_3

1.实验要求

在lab1中,我们拥有了用户态线程库,但是驱动我们进行线程管理的动力是我们在线程里面显示调用schedule函数,而操作系统往往是采用时钟tick来管理线程,在这个lab中,我们会让大家首先实现tick机制,然后继承多种调度策略

  1. 实现定时器的机制,作为tick和给线程提供定时能力的基础;
    • 定时器结构体需要自己手写,不能采用现成的定时器,表示时间相关的结构体可以采用现成的库;
    • 定时器的相关接口最少需要有create_timer、start_timer、stop_timer、delete_timer的api接口;
    • 需要借助外部机制通知定时器到期,但是不能借助外部机制完成处理定时器handler、组织管理定时器的任务;
    • 组织定时器队列的数据结构、定时器的类型、定时器的时钟源不做要求;
    • 需要考虑定时器过期的情况;
  2. 在定时器的基础上产生tick作为驱动线程调度的动力,并实现相关调度策略
    • 采用tick机制改写lab1中的多线程管理,不在线程里面调用schedule机制;
    • tick的间隔时长自定义;
    • 实现FIFO,RR,彩票调度算法;
    • 注意在关键的代码段,防止时钟进入;

2.开发日志

2022/11/14 22:58 本质上我们的Lab_1早就实现了tick,所以我们这里只需要简单的复习一下然后再补充一点相关的还没有实现的功能进去即可;

2022/11/15 10:47 经过一上午的头脑风暴和资料搜索,我差不多是明白了我们这个实验应该做什么,首先这个实验并不是看起来的那么简单,主要分为两部分,实现定时器和实现三个调度算法,实现定时器的完整代码和demo我们已经找到并且能够成功运行,实现FIFO以及RR的算法是内置已经设计好了的,我们只需要手动参考彩票算法的讲解简单实现彩票算法即可;

2022/11/23 19:01 现在的问题就差如何实现FIFO和彩票算法了,我是真的不想花时间搞这个了,所以到时候直接参考一下同学的代码吧;

3.前置知识点

3.1 定时器的实现

文章参考:

3.1.1 setitimer概述

在linux c编程中,setitimer是一个比较常用的函数,可用来实现延时和定时的功能,因此我们这里考虑使用setitimer函数来实现一个简单的定时器;

首先我们简单介绍一下setitimer这个函数,在使用的时候需要引入头文件

1
#include <sys/time.h>

setitimer函数原型如下

1
int setitimer(int which, const struct itimerval *new_value,struct itimerval *old_value);

其中which参数表示类型,可选的值有:

  • ITIMER_REAL:以系统真实的时间来计算,它送出SIGALRM信号;

  • ITIMER_VIRTUAL:以该进程在用户态下花费的时间来计算,它送出SIGVTALRM信号;

  • ITIMER_PROF:以该进程在用户态下和内核态下所费的时间来计算,它送出SIGPROF信号;

new_value和old_value均为itimerval结构体,itimerval结构体定义如下,itimeval由两个timeval结构体组成

1
2
3
4
struct itimerval {
struct timeval it_interval; /* next value */
struct timeval it_value; /* current value */
};

timeval结构体包含tv_sec和tv_usec两部分,其中tv_se为秒,tv_usec为微秒

1
2
3
4
struct timeval {
time_t tv_sec; /* seconds */
suseconds_t tv_usec; /* microseconds */
};

参数总结如下:

  • new_value参数用来对计时器进行设置;
  • it_interval为计时间隔;
  • it_value为延时时长;
  • old_value参数,通常用不上,设置为NULL,它是用来存储上一次setitimer调用时设置的new_value值;

settimer工作机制是,先对it_value倒计时,当it_value为零时触发信号,然后重置为it_interval,继续对it_value倒计时,一直这样循环下去;基于此机制,setitimer既可以用来延时执行,也可定时执行;

需要注意的是若it_value为0是不会触发信号的,所以要能触发信号,it_value需要设置为大于0;如果it_interval为零,只会延时,不会定时(也就是说只会触发一次信号);

3.1.2 定时器概述

无论是用户态程序还是内核态程序的开发,大多数时候都需要有定时器的支持,定时器属于程序开发中最基本的组件之一,一般按照使用场景分为如下两种类型:

1.Single-Shot Timer:从注册到终止只执行一次;

2.Repeating Timer:在每次执行完以后,自动重新开始;

本质上,可以认为 Repeating Timer 是在 Single-Shot Timer 终止之后,再次注册到定时器系统里的 Single-Shot Timer;

Linux定时器主要有两种,第一种是2.4内核版本的,也就是我们上面介绍的itimerval结构体和setitimer api,setitimer 能够在 timer 到期之后,自动再次启动自己,因此,用它来解决 Single-Shot Timer 和 Repeating Timer 的问题较简单;

另一种是2.6内核版本的,新增了POSIX timer的API,这也是我们要模拟自定义的版本(需要注意的是,POSIX timer 接口支持在一个进程中同时拥有多个定时器实例,但POSIX timer 接口只在进程环境下才有意义 ,并不适合多线程环境)

1
2
3
4
5
int timer_create(clockid_t clockid, struct sigevent *evp,timer_t *timerid); 									//创建了一个定时器
int timer_settime(timer_t timerid, int flags, const struct itimerspec *new_value, struct itimerspec * old_value); //启动或者停止一个定时器
int timer_gettime(timer_t timerid, struct itimerspec *curr_value); //返回到下一次到期的剩余时间值和定时器定义的时间间隔
int timer_getoverrun(timer_t timerid); //返回上次定时器到期时超限值
int timer_delete(timer_t timerid); //停止并删除一个定时器

最重要的接口是timer_create,其参数介绍如下:

  • clockid 表明了要使用的时钟类型,在 POSIX 中要求必须实现 CLOCK_REALTIME 类型的时钟;
  • evp 参数指明了在定时到期后,调用者被通知的方式;
  • sigev_notify 指明了通知的方式:
    • SIGEV_NONE:当定时器到期时,不发送异步通知,但该定时器的运行进度可以使用 timer_gettime监测;
    • SIGEV_SIGNAL:当定时器到期时,发送 sigev_signo 指定的信号;
    • SIGEV_THREAD:当定时器到期时,以 sigev_notify_function 开始一个新的线程。该函数使用 sigev_value 作为其参数,当 sigev_notify_attributes 非空,则指定该线程的属性。注意,由于 Linux 上线程的特殊性,这个功能实际上是由 glibc 和内核一起实现的;
    • SIGEV_THREAD_ID (Linux-specific):仅推荐在实现线程库时候使用,如果 evp 为空的话,则该函数的行为等效于:sigev_notify = SIGEV_SIGNAL,sigev_signo = SIGVTALRM,sigev_value.sival_int = timer ID

3.2 彩票调度算法

因为我们对FIFO以及RR算法都比较熟悉,所以这里我们只介绍彩票调度算法;

彩票调度的基本思想是:一开始的时候给每个进程发彩票(优先级越高,发的彩票越多),然后每隔一段时间(一个时间片),举行一次彩票抽奖,抽出来的号是谁的,谁就能运行;

假如有两个进程A和B,调度器想让A占用80%的 CPU 时间,B占用20%的CPU时间,调度器就给A发80张彩票,给B发20张彩票;这样,每次抽奖的时候,A就有80%的概率占用CPU,从数学期望上讲,1秒钟之内,A能运行800ms;

实际上彩票调度并没有在CPU调度程序里广泛使用,一个原因是不能很好的适合I/O,另一个原因是票数分配问题没有确定的解决方式,比如新打开了一个浏览器进程,那该给它分配多少票?票数少了,响应跟不上,票数多了,又会浪费 CPU时间;

彩票调度的实现非常简单,只需要使用一个随机数生成器选择中奖彩票、一个记录系统中所有进程的数据结构、所有彩票的总数;

这里我们简单举个例子来说明,假设我们使用列表记录进程,下面进程A、B、C分别持有一定数量的彩票

在做出调度决策之前需要先从彩票总数400中选择一个随机数,假设是300,然后遍历链表,借助一个计数器帮助我们找到该数字;

从前向后遍历进程列表,将每张票的值加到couter上,直到所有的值超过winner,此时当前列表对应的进程就是中奖进程;

本例中,中奖彩票是300。首先,计A的票后,counter增加到100。因为100小于300,继续遍历。然后counter会增加到150(B的彩票),仍然小于300,继续遍历。最后,counter增加到400(显然大于300),因此退出遍历,current指向C(中奖者);

4.数据结构设计

4.1 线程结构体

在Lab_1中我们大致介绍过,整个程序的主要函数及其功能如下:

  1. 通过调用thread_create函数(这个是自己写的函数,不是pthread_create)创建线程;
  2. 通过两种方式启动线程:
    • 调用函数detach实现线程的分离式启动(父线程不必等待子线程执行结束,可以继续执行);
    • 调用函数thread_join实现阻塞式启动(父线程等待该子线程结束后才能继续执行);
  3. 线程的状态转换:通过一系列的线程状态函数实现线程的状态转换
  4. 线程切换:线程的切换方式有两种:
    • 主动切换my_sleep:调用schedule切换到指定线程,并在一段时间后wakeup可调度;
    • 时钟中断切换do_timer:通过设置时钟中断,中断后执行schedule函数来完成;
  5. 线程调度pick:采用时间片轮转调度算法,根据线程优先级为每个线程设置时间片;

本次实验我们详细介绍如何实现tick机制以及实现FIFO、RR和彩票调度算法(即线程调度部分);

可以将tick理解为嘀嗒,也就是对时间的粗粒度化,我们可以任意规定一个tick的大小,此处我们假设一个tick为10ms,之所以要介绍tick是因为它作为时间片的基本单位,比如我们规定我们程序中使用的时间片的大小为15个tick,这就意味着线程的一个时间片换算为时间为150ms;

因为我们引入了时间片的概念,所以需要将时间片引入线程结构体,后面我们还引入了彩票调度算法,所以这里还引入了彩票数量

1
2
3
4
5
6
7
8
9
10
11
struct thread_struct {
int id; //线程标识符
void (*thread_func)(); //指向线程过程函数的函数指针,用来记录线程执行的函数
int esp; //stack的栈顶指针
unsigned int wakeuptime; //线程唤醒时间
int status; //线程状态
int counter; //时间片数量
int priority; //线程优先级
int stack[STACK_SIZE]; //线程运行栈
int tickets; //彩票数量
};

要实现时间中断的原理非常简单,系统在每个tick中产生一个时钟中断并进入时钟中断处理函数do_timer,在do_timer中让线程的时间片值减1直到0,执行schedule函数;

难点在于如何使do_timer时间中断函数在每个嘀嗒数知道自己应当自动执行,这里借助了Linux中的signal机制,使用函数 setitimer 每隔 10 ms 发送一次信号 SIGALRM,然后捕捉此信号;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//基于时间中断的调度
//基本思想是系统在每个嘀嗒中产生一个时钟中断并进入时钟中断处理函数do_timer,在do_timer中让线程的时间片值减1直到0,执行schedule函数
//此处可以使用linux的信号机制,使用函数setitimer每间隔10ms(此处设一个嘀嗒为10ms)发送一次信号SIGALRM然后捕捉该信号即可
__attribute__((constructor)) //修饰,使得该函数能够在main函数之前执行
static void init()
{
struct itimerval value;
value.it_value.tv_sec = 0; //it_value为延时时长,1000us等于1ms
value.it_value.tv_usec = 1000;
value.it_interval.tv_sec = 0; //it_interval为计时间隔.1000*10us等于10ms
value.it_interval.tv_usec = 1000 * 10;
if (setitimer(ITIMER_REAL, &value, NULL) < 0)
{
perror("setitimer");
}
signal(SIGALRM, do_timer);
}

当然上述想法只是最简单的一种方式,我们这个实验要求的是具备至少有create_timer、start_timer、stop_timer和delete_timer的定时器,所以我们需要基于系统提供的settimer函数对定时器进行设计和实现;

至于我们要实现的FIFO、RR以及彩票调度算法都是基于tick机制(也就是上面实现的定时器)对pick函数的修改;

4.2 定时器队列

该队列主要用于装入定时器,被装入定时器队列中的定时器会start且开始执行传入的回调函数

1
2
3
4
5
6
7
8
9
struct timer_list{
struct list_head head;
int num;
int size;
void (*sighandler_old)(int);
void (*sighandler)(int);
struct itimerval ovalue;
struct itimerval value;
};

4.3 定时器结构体

主要设计如下

1
2
3
4
5
6
7
8
9
typedef struct timer{
struct list_head node;
int interval; /*timer interval(second)*/
int elapse; /*timer count*/
callback cb; /*call back function*/
void *user; /*user data*/
int len;
int id; /*timerid*/
}timer_node_t;

5.函数封装

5.1 定时器的实现

主要定义了如下结构体和函数指针

1
2
typedef void (*callback)(int id, void *data, int len);  //给void(*)(int id, void *data, int len)函数指针定义别名callback
typedef void (*SIG_FUNC)(int signo); //给void (*)(int signo)函数指针定义别名SIG_FUNC
1
2
3
4
5
6
7
8
9
10
11
//timer_node_t作为struct timer的别名
typedef struct timer{
struct list_head node;
int interval; /*timer interval(second)*/
int elapse; /*timer count*/

callback cb; /*call back function*/
void *user; /*user data*/
int len;
int id; /*timerid*/
}timer_node_t;
1
2
3
4
5
6
7
8
9
struct timer_list{
struct list_head head;
int num;
int size;
void (*sighandler_old)(int);
void (*sighandler)(int);
struct itimerval ovalue;
struct itimerval value;
};

接着定义如下函数

1
static void sig_func(int signo) ;

timer队列创建函数,等同于create_timer

1
2
3
4
5
/**
*Create timer list
*@param
*/
struct timer_list *create_timer_list(int count) ;

添加timer计时器进入timer队列,等同于start_timer

1
2
3
4
5
6
7
8
9
10
11
/*
* Add a timer to timer list.
*
* @param interval The timer interval(second).
* @param cb When cb!= NULL and timer expiry, call it.
* @param user_data Callback's param.
* @param len The length of the user_data.
*
* @return if == -1, add timer fail.
*/
int add_timer(int interval, callback cb, void *user_data, int len) ;

将timer队列中的指定timer_id的计时器删除,停止该timer工作,等同于stop_timer

1
int  del_timer(int timer_id);

将整个timer队列删除,等同于delete_timer

1
2
3
4
5
6
/*
* Destroy the timer list.
*
* @return 0 means ok, the other means fail.
*/
int destroy_timer(struct timer_list *list);

使用定时器的方式也很简单,我们只需要依次调用create_timer_list创建timer队列、使用add_timer加并启动入计时器,计时器使用完毕后调用del_timer删除计时器,最后销毁整个timer队列并结束整个进程;

主要的难点在于将定时器提供的API接口和调度算法结合,我们将在下面讨论;

5.2 调度算法实现

5.2.1 FIFO调度算法

该调度算法也被称为先进先出算法,需要注意的是和先来先服务算法FCFS做区别:

  • FIFO:按照时间片轮转方式运行,分给每个进程的时间一样,若正在运行的进程在一个时间片内已经完成,则激活调度程序,此时调度就绪队列排在队首的进程运行;若在一个时间片内进程没有运行完,就将它送到就绪队列的末尾,等待下一个时间片再执行;
  • FCFS:对每个进程都比较公平FCFS算法就是调度最先进入就绪队列的进程,不考虑时间长短,比如A进程执行30min,B进程执行10s,此时B进程就必须等30min;

很多人可能会有疑问,FIFO明明是页面置换算法,和进程调度八竿子打不着…其实这里是因为在某些教材中将FCFS也称作FIFO,简单来说就是让我们实现FCFS(稍微动动脑子也知道Lab_3怎么可能让我们去做内存相关);

对FIFO调度的实现如下,因为线程都是按照顺序进入线程队列中的,所以只需要考虑从当前线程往后寻找一个非空的线程即可;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static struct thread_struct *FIFO_pick() {    
int current_id = current->id;
int i;
struct task_struct *next = NULL;
repeat:
for (i = 0; i < THR_TASKS; ++i) {
if (task[i] && task[i]->status == THREAD_STATUS_SLEEP) {
if (getmstime() > task[i]->wakeuptime)
task[i]->status = THREAD_STATUS_RUNNING;
}
}
i = current_id;
while(1) { // 寻找下一个不空的线程
i = (i + 1) % THR_TASKS;
if (i == current_id) {
// 循环了一圈说明没找到可被调度的线程,重新计算线程是否可被唤醒
goto repeat;
}
if (task[i] && task[i]->status == THREAD_STATUS_RUNNING) {
next = task[i];
break;
}
}
return next;
}

5.2.2 RR调度算法

RR调度算法的基本思想就是每次执行调度函数的时候都会在线程队列中挑选一个时间片counter最大的线程运行,也就是遍历所有THREAD_STATUS_RUNNING的线程并找到其中couter值最大的线程进行返回;

存在的一个问题是假如所有的THREAD_STATUS_RUNNING的线程的时间片都是0,则该调度函数会重新为所有的线程设置couter;

下面的RR调度算法对于THREAD_SLEEP的任务优先级设置很高,这意味着THREAD_SLEEP的线程一旦进入THREAD_RUNNING状态就会被优先投入运行;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
static struct thread_struct *RR_pick()
{
int i, next, c;
for (i = 0; i < THR_TASKS; ++i)
{
if (task[i] && task[i]->status != THREAD_STATUS_EXIT && getmstime() > task[i]->wakeuptime)
{
task[i]->status = THREAD_STATUS_RUNNING;
}
}
while (1)
{
c = -1;
next = 0;
for (i = 0; i < THR_TASKS; ++i)
{
if (!task[i])
continue;
if (task[i]->status == THREAD_STATUS_RUNNING && task[i]->counter > c)
{
c = task[i]->counter;
next = i;
}
}
if (c)
break;
if (c == 0)
{
for (i = 0; i < THR_TASKS; ++i)
{
if (task[i])
{
task[i]->counter = task[i]->priority + (task[i]->counter >> 1);
}
}
}
}
return task[next];
}

5.2.3 彩票调度算法

彩票调度算法的基本思想和原理也不难,我们在前置知识点部分已经介绍过,一个实现的伪代码如下,且给出了注释

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// counter: used to track if we've found the winner yet
int counter = 0;

// winner: use some call to a random number generator to get a value, between 0 and the total # of tickets
int winner = getrandom(0, totaltickets);

// current: use this to walk through the list of jobs
node_t *current = head;

// loop until the sum of ticket values is &gt; the winner
while (current) {
counter = counter + current->tickets;
if (counter < winner) break; // found the winner
current = current->next;
} // 'current' is the winner: schedule it...

因为我们为线程分配了彩票所以需要拓展结构体

1
2
3
4
5
6
7
8
9
10
11
struct thread_struct {
int id; //线程标识符
void (*thread_func)(); //指向线程过程函数的函数指针,用来记录线程执行的函数
int esp; //stack的栈顶指针
unsigned int wakeuptime; //线程唤醒时间
int status; //线程状态
int counter; //时间片数量
int priority; //线程优先级
int stack[STACK_SIZE]; //线程运行栈
int tickets; //彩票数量
};

在初始化线程的时候随机分配彩票数量(这里我们做了简化,固定分配彩票数量)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
int thread_create(int *tid, void (*start_routine)())
{
int id = -1; //初始化线程id
struct thread_struct *tsk = (struct thread_struct *)malloc(sizeof(struct thread_struct)); //为线程分配一个结构体
while (++id < THR_TASKS && task[id]); //在线程队列中寻找位置
if (id == THR_TASKS) //如果没有位置了则创建失败
return -1;
task[id] = tsk; //将线程结构体放在task线程队列中
if (tid) *tid = id; //将线程队列的索引号作为id号传给tid,便于之后传出

//初始化线程结构体
tsk->id = id; //设置线程id
tsk->thread_func = start_routine; //线程过程函数,对应之后会编写的fun1,fun2...
int *stack = tsk->stack; //线程运行栈
tsk->esp = (int)(stack + STACK_SIZE - 11); //获取esp栈顶指针
tsk->status = THREAD_STATUS_RUNNING; //线程状态设置为RUNNING
tsk->wakeuptime = 0; //设置Wakeuptime
tsk->counter= 15; //时间片的单位不是纳秒、微秒或者毫秒,而是嘀嗒数,此处初始化为15个嘀嗒数
tsk->priority = 15; //设置线程优先级
tsk->tickets=200; //分配彩票数量200

make_context(stack,tsk,start_routine);
return 0;
}

接着在pick算法中根据伪代码实现了彩票算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
static struct thread_struct *Lottery_pick(){
int counter_ticket=0;
int winner=50; //彩票winner
int current_id = current->id;
int i;
struct task_struct *next = NULL;
repeat:
for (i = 0; i < THR_TASKS; ++i) {
if (task[i] && task[i]->status == THREAD_STATUS_SLEEP) {
if (getmstime() > task[i]->wakeuptime)
task[i]->status = THREAD_STATUS_RUNNING;
}
}
i = current_id;
while(1) {
i = (i + 1) % THR_TASKS;
if (i == current_id) {
goto repeat;
}
if (task[i] && task[i]->status == THREAD_STATUS_RUNNING) {
counter_ticket += task[i]->tickets;
if(counter_ticket>=winner){
next = task[i];
break;}
break;
}
}
return next;
}

6.模块设计

我们在Lab_1的基础上增加了一个timer.c源文件和list.h头文件,文件目录树的结构如下

list.h就是常规的链表头文件,主要用于实现timer定时器的timer_list;timer.c源文件实现了timer_list和timer定时器的主要接口;

定时器的使用方式非常简单,只需要依次调用create_timer、add_timer、del_timer和destory_timer四个API,下面是我们的实现,定时器的时间间隔为10ms

1
2
3
4
5
6
7
8
9
10
void timer_func(){
int timer_id = -1;
int count = 0;
timer_list = create_timer_list(10);
timer_id = add_timer(10, do_timer, &count, 50);
while(count++ < 20)
sleep(1);
del_timer(timer_id);
destroy_timer(timer_list);
}

三个调度函数的实现我们都包含在schedule.c文件中实现

需要使用哪个调度函数就直接在schedule函数中调用即可,形式如下

1
2
3
4
5
6
7
8
9
10
void schedule()
{
struct thread_struct *next = Lottery_pick();
//struct thread_struct *next = FCFS_pick();
//struct thread_struct *next = RR_pick();
if (next)
{
switch_to_next(next);
}
}

7.程序测试

7.1 定时器测试

首先我们先测试基本的定时器是否正常工作,测试文件如下

基本原理也非常简单,我们在回调函数中分别输出timer_id、data以及一个提示信息,告知我们每隔5s输出一次(这个在实际调试过程中能够清楚的感知到);

接着在main函数中一次调用了create_timer_list、add_timer以及del_timer和destroy_timer函数,如果定时器正常工作的话我们可以看到屏幕上有正常输出并在一段时间后定时器自动停止工作结束整个main函数;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void timer_test_callback(int id, void *data, int len)
{
int *user = (int *)data;
printf("timer id is :%d\n",id);
printf("data is : %d\n", *user);
printf("print this for every 5 sec!!!\n");
user[0]++;
}

int main()
{
int timer_id = -1;
int count = 0;
timer_list = create_timer_list(10);
timer_id = add_timer(5, timer_test_callback, &count, 5);
while(count++ < 30)
sleep(1);
del_timer(timer_id);
destroy_timer(timer_list);
}

这里因为是单独测试timer,所以我们只需要使用gcc编译timer.c,将得到的a.out文件执行即可;

7.2 调度函数测试

关于这三个调度函数,我们使用的都是concurrency_test.c文件,也就是lab_1中我们用于测试线程同步的文件,只需要修改schedule.c中的schedule函数的pick函数的选择即可;

FIFO调度函数测试如下

RR调度函数测试如下

彩票调度函数测试如下

由此可见,我们实现的调度函数能够和定时器正确的搭配并正常工作,实现线程的并发调度切换,从实验结果来看三种调度算法对应的线程调度的次序是不同的,侧面证明了这三种调度算法的区别;

Lab_4

1.实验要求

虽然我们可以通过关闭中断来实现UP(Uni-Processor)下临界区的互斥,但是这会引发安全、效率等诸多问题。在lab4中我们需要实现一个用户态的同步机制,这是完成多线程相互协作使用的最后一环。

  1. 利用test&set或者Compare&Swap构建spinlock、mutex、condition variable
    • spinlock
      • spinlock的要求是如果锁正在占用,就需要一直忙等待;
    • mutex
      • mutex是睡眠锁,如果当前的线程被上锁了,就不需要等待了,直接让出当前的CPU,需要维护一个等待队列,然后把自己放在等待队列里面,等到下一次唤醒;
      • mutex实现时需要注意让出CPU和guard锁解锁这两个操作的原子性;
    • condition variable
      • 实现cv的wait,signal,Broadcast三个接口;
      • 注意wait使用时需要持锁;
  2. 测试mutex/spinlock锁的正确性,以下二选一:
    • lab2配合
      • 在之前的lab2中,我们拥有了内存管理机制,但是当时我们不能支持多个线程同时使用这个内存分配器,现在我们可以拥有这个feature;
      • 现在我们可以用spinlock和mutex分别给内存分配器加上同步的机制,支持多个线程同时使用malloc;
        • 使用200个线程,每个线程分配1kB内存,在200个线程里面将申请到的内存每个字节依次填写为0-199;
        • 最后检查这些内存区域是否和分配预取的一致,重复以上实验50次;
        • 注:如果你自己写的用户库没有实现lab3中的tick调度功能,需要使用pthread线程库来创建线程;
    • lab3配合
      • 在之前的lab3中,我们拥有了具有“中断”功能的用户态线程库,但是多线程之间无法协作,在lab4中我们将其升级为具有同步机制的用户态线程库;
      • 分别使用spinlock/mutex,计算把0自加加100000次的结果是否正确;
  3. 测试condition variable的正确性:
    • 搭建一个消费生产者模型,生产者在每次被唤醒时向一个全局变量依次写入1-100000,两个消费者线程获取这个全局变量的值,并打印;
    • 需要保证结果的正确性:在生产者写入和消费者读值后,需要对一个flag分别设0和1;在写入和读值前,需要在assert里面假设这个flag分别为1和0;

2.开发日志

2022/11/14 20:33 今天稍微看了以下lab4的简介,差不多是能够理解一些意思,初步打算是先看这篇文章(三种经典的同步工具):https://blog.csdn.net/weixin_36073895/article/details/112678185,理解这三个工具的作用和差异以及怎么使用(具体是为了之后main中测试使用),然后怎么实现这三个工具网上直接搜索即可(注意不要直接搜索三个一起的,要分开搜索,咱们这个实验网上没有,类似lab2需要自己实现然后进行测试),接着分别针对每一个工具进行实现即可;

2022/11/14 22:55 刚刚看了一下,lab4完全没办法做,真的是一点实现办法都没有的,因为没有人会无聊甚至虐待自己到这种地步去使用C语言实现一个自旋锁、互斥锁甚至条件变量,这是非常无脑的行为,还是好好准备实现lab3吧;

3.前置知识点

3.1 synchronization概述

并发编程如果各自处理自己的任务互不干涉,这是最理想的情况,但在实际开发过程中我们需要在各个执行体之间进行通信或资源共享,此时就需要保证对共享资源操作的安全性,因此引入了synchronization(同步),synchronization发生在一个执行体与另外的执行体有交互依赖的场景,通常有两种交互需要synchronization:competition和cooperation

  • competition经常是发生在多个执行体需要对共享资源执行指令,但是必须只能同时有一个执行体执行指令;比较典型的场景,多个线程执行计数,对一个共享的全局计数变量执行自加操作,那么这个自加操作指令就是会存在competition,需要synchronization工具保证正确性和安全性;
  • cooperation发生在多个执行体需要依赖对方的执行结果的时候,很典型的场景就是生产者消费者模型,为了保证生产者和消费者问题中的特性,需要一种在生产者和消费者之间的synchronization工具来保证生产者与消费者之间的cooperation

无论是competition还是cooperation一般都会涉及到共享资源的操作,为了保护共享资源通常使用锁;

这里还需要介绍一些零碎的概念:

  • critical section:操作共享资源的代码片段称之为临界区,这个临界区就是我们在并发编程中需要保护的地方;
  • race condition:多个执行体(线程或进程)同时进入临界区的时候,同时进行共享资源修改,这时候就产生了race condition(竞态条件);
  • indeterminate:因为多个执行体同时进入临界区,可能带来的执行结果是不能预料的,所以对于一个并发的执行来说,其结果是indeterminate的;
  • mutual exclusive:mutex(互斥量,也称为互斥锁)的命名也是来源于这个词,它代表着一种互斥机制,用来保证只有一个线程可以进入临界区,对于临界区的操作保证其不存在race condition并且执行结果是deterministric;关于mutex和spinlock的区别可以参考【转】自旋锁spin和互斥量mutex的区别 - 腾讯云开发者社区-腾讯云 (tencent.com)

3.2 锁概述

这里我们简单介绍一下为什么锁能够保证只有一个线程进入临界区;

在涉及并发编程中共享资源的保护首先我们想到的就是使用锁,锁可以保证在多并发环境下只有一个线程进入临界区,但是锁也是一个共享资源,为什么锁不会有并发问题呢?

从内部看,无论是mutex还是spinlock都依赖于test&set或者compare&swap或者compare&exchange的原子性;换句话说,保证只有一个线程进入临界区的真正原因在于OS对原子性保证;

3.3 常见synchronization工具

常见的synchronization工具可以按照行为(抢锁的时候没有抢到)分为两类:

  • spin是指不停地去查询状态,占用了CPU;
  • blocking是指在没有抢到锁的时候切换出去,然后当前执行体处于blocking状态;

我们主要介绍mutex、spinlock、condition variable三种synchronization工具,因为这三种比较典型:

  • spinlock是spin类型的synchronization;

  • mutex和condition variable是blocking类型的synchronization,且condition variable是一种monitor 类型的bocking synchronization工具;

3.3.1 spinlock

spinlock是一种spin类型的lock,在抢不到锁的时候会保持spin状态,spin状态会消耗CPU资源,但这并非一定是一件不好的事情,在临界区运行时间非常短且不会出现block的场景下,spinlock就比较合适,因为如果不在用户态spin,而是进入内核等待,内核的切换也会带来一定的时间开销,假设临界区执行时间是1us,内核切换是4us,那么陷入内核就有点得不偿失了;
spinlock是纯用户态锁,当然也是基于TAS的原子性来保证只有一个线程会进入临界区;

用户可以自己实现spinlock,只是要注意两个点:1. 保证进入临界区的原子性,2. 保证lock的acquire语意和unlock的release语意;

spinlock比较适合的场景:临界区内执行时间较短,不会出现block,比如单纯的查表操作可以使用spinlock;

3.3.2 mutex

spinlock的特点就是用户态完成同步操作,但是对于临界区很长或者会存在block的场景下,spinlock显然是一种浪费CPU的行为,因此对于那种临界区可能会block或者花很久事件的场景来说,需要让那些没有抢到锁的线程放弃掉CPU,主动进入睡眠,因此就需要mutex,mutex可以使没有抢到锁的线程放弃CPU进入睡眠;

既然要放弃CPU且需要唤醒,就需要OS的支持,内核态会将sleep的线程切换出去,Linux内核通过futex提供了mutex所需要的陷入内核,放弃CPU的功能;

futex提供了完整的线程间同步机制,由用户态和内核态两部分组成,在非竞争状态下,是在用户态运行的,当遇到竞争的时候,没抢到锁会陷入内核;futex提供了两个主要的接口,futex_wait和futex_wake;futex有一个内核对象waiting queue,用于提供队列和调度交互;

mutex的使用场景:在划分锁的时候,有时候会以用户态和内核态来区分,mutex通常被划分为内核态的锁,所以会被认为性能比较差,因为它牵涉到了内核的上下文切换,上面已经提到,mutex只会在竞争的时候陷入内核,通常情况下mutex并不会造成性能影响,常规的在没有竞争情况下mutex lock和unlock的时间大概是在ns级别的且只是在用户态切换,所以在竞争不激烈的情况下mutex不会是瓶颈;临界区的耗时相对较长,或者用户不希望当前线程在抢锁失败的时候仍然占用CPU的时候;

3.3.3 condition variable

条件变量是一种monitor synchronization工具,通常用于cooperation场景,比较典型的应用场景是在producer和consumer。condition variable与mutex都是一种blocking类型的synchronization,也就是说condition variable在没有满足条件的时候也会陷入内核睡眠,前提是他先抢占了锁。因为也需要陷入内核睡眠,因此很容易想到,condition variable在内核中也是基于futex实现;

条件变量通常需要与一把锁联合使用,来提供synchronization功能,condition variable提供wait方法,在线程抢到锁的时候如果不满足响应的条件调用wait的时候,会执行两个操作:release lock,陷入内核等待,同时condition variable提供了notify方法,notify方法会将wait的thread从内核唤醒,调用wait的线程在唤醒的时候会重新acquire lock;


中级项目_bos lab
https://gintoki-jpg.github.io/2022/09/29/项目_bos/
作者
杨再俨
发布于
2022年9月29日
许可协议