FreeRTOS学习笔记(五) 资源管理

为什么要进行资源管理

在freertos中, 高优先级的任务会抢占低优先级的任务, 中断又会抢占高优先级的任务

试想以下这个情景:

任务A正在访问变量X

这时任务B由于更高的优先级, 抢占了任务A, 同时也访问变量X(例如做了一个++操作)

任务B结束, 任务A对变量X做一个*10操作

这时问题就出现了, 往往我们使任务A进入就绪态目标是为了使现在的X变为原来的10倍, 可现在X变为了原来的10倍还要多10

这明显和我们的预期不符

那么有什么办法可以避免这种情况呢?

第一种方法是使用临界区

通过taskENTER_CRITICAL()taskEXIT_CRITICAL()划定临界区

执行里面的代码使任何其他任务和优先级相同或更低的中断都会失效

临界区所屏蔽的优先级在FreeRTOSConfig.hconfigMAX_SYSCALL_INTERRUPT_PRIORITY设置

该操作是可以嵌套的, 由内核记录嵌套的深度, 深度归0时退出临界区

用例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
void vPrintString( const char *pcString )
{
/* Write the string to stdout, using a critical section as a crude method of
mutual exclusion. */
taskENTER_CRITICAL();
{
printf( "%s", pcString );
fflush( stdout );
}
taskEXIT_CRITICAL();
}

void vAnInterruptServiceRoutine( void ) //中断安全版本
{
/* Declare a variable in which the return value from
taskENTER_CRITICAL_FROM_ISR() will be saved. */
UBaseType_t uxSavedInterruptStatus;
/* This part of the ISR can be interrupted by any higher priority
interrupt. */
/* Use taskENTER_CRITICAL_FROM_ISR() to protect a region of this ISR.
Save the value returned from taskENTER_CRITICAL_FROM_ISR() so it can
be passed into the matching call to taskEXIT_CRITICAL_FROM_ISR(). */
uxSavedInterruptStatus = taskENTER_CRITICAL_FROM_ISR();
/* This part of the ISR is between the call to
taskENTER_CRITICAL_FROM_ISR() and taskEXIT_CRITICAL_FROM_ISR(), so can
only be interrupted by interrupts that have a priority above that set
by the configMAX_SYSCALL_INTERRUPT_PRIORITY constant. */
/* Exit the critical section again by calling taskEXIT_CRITICAL_FROM_ISR(),
passing in the value returned by the matching call to
taskENTER_CRITICAL_FROM_ISR(). */
taskEXIT_CRITICAL_FROM_ISR( uxSavedInterruptStatus );
/* This part of the ISR can be interrupted by any higher priority
interrupt. */
}

但是缺点也很明显, 临界区里的代码屏蔽了所有优先级相同或更低的中断

可能导致必要的中断无法执行

所以要求临界区要尽量的短, 上面这个例子明显不符合这个要求

第二种方法是通过暂停调度器

通过 vTaskSuspendAll()xTaskResumeAll()

执行完vTaskSuspendAll()会暂停调度器的执行, 但是中断不受影响

调度器暂停了就不会有更高优先级的任务来抢占了

这个操作是可以嵌套的

内核会记录嵌套深度, 只有嵌套深度归0时, 才会恢复调度器

缺点就是终端仍有可能带来干扰, 同时恢复调度器通常是一个较为耗费时间的过程

示例:

1
2
3
4
5
6
7
8
9
10
11
void vPrintString( const char *pcString )
{
/* Write the string to stdout, suspending the scheduler as a method of
mutual exclusion. */
vTaskSuspendScheduler();
{
printf( "%s", pcString );
fflush( stdout );
}
xTaskResumeScheduler();
}

第三种方法则是使用互斥量

通过将FreeRTOSConfig.hconfigUSE_MUTEXES设置为1以启用互斥锁

互斥量可以看作是一种特殊的信号量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static void prvNewPrintString( const char *pcString )
{
/* The mutex is created before the scheduler is started, so already exists
by the time this task executes.
Attempt to take the mutex, blocking indefinitely to wait for the mutex
if it is not available straight away. The call to xSemaphoreTake() will
only return when the mutex has been successfully obtained, so there is
no need to check the function return value. If any other delay period
was used then the code must check that xSemaphoreTake() returns pdTRUE
before accessing the shared resource (which in this case is standard
out). As noted earlier in this book, indefinite time outs are not
recommended for production code. */
xSemaphoreTake( xMutex, portMAX_DELAY );
{
/* The following line will only execute once the mutex has been
successfully obtained. Standard out can be accessed freely now as
only one task can have the mutex at any one time. */
printf( "%s", pcString );
fflush( stdout );
/* The mutex MUST be given back! */
}
xSemaphoreGive( xMutex );
}

不同的点在于其具有所有权和优先级继承机制

优先级继承机制可以用于解决优先级反转问题

那么什么是优先级反转呢?

先从互斥量机制说起, 两个任务在访问同一资源时, 需要先take互斥量

此时另一个任务再take该信号量时就会因为无法得到互斥量陷入阻塞状态

待互斥量被give后另一个任务又重新进入就绪态

13

试想以下情景, 现在有三个任务, LP最低优先级任务 MP中等优先级任务 HP最高优先级任务

任务LP和HP使用互斥量来控制对同一资源的使用

现在LP先进入运行态, take互斥量

紧跟着HP被触发进入就绪态, 因为高优先级原因抢占LP

HP在尝试take互斥量的时候, 因为此时互斥量在LP任务手里, 于是进入阻塞态

回到LP任务, 此时MP任务进入就绪态, 抢占LP任务

知道MP任务运行完毕回到LP任务, LP任务运行完毕才轮到HP任务

问题就出现在这里: HP任务明明是最高优先级的任务, 却到了最后才执行

15

为了缓解这种问题(无法被根除), 互斥量引入了优先级继承机制

优先级继承的工作原理是: 暂时将互斥锁的持有者的优先级提升至尝试获取同一互斥锁的最高优先级任务的优先级

持有互斥锁的低优先级任务继承了该优先级

在这种情况下上述情景将会变为:

LP任务先进入运行态

HP任务进入就绪态, 抢占LP

HP任务获取互斥锁失败, LP任务的优先级提升至HP优先级同一水平

MP任务进入就绪态, 优先级低于LP 故无法进行抢占

LP任务执行完毕, HP任务进入运行态

HP任务执行完毕, MP任务进入运行态

16

当有多个任务使用同一互斥量的时候, 继承优先级的规则是如何呢?

  • 如果任务在未释放其已持有的互斥锁的情况下获取了新的互斥锁, 则该任务的继承优先级可能进一步提升
  • 任务会保持其继承的最高优先级, 直到其释放所有的互斥锁, 且与互斥锁释放的顺序无关
  • 当一个任务同时持有多个互斥量时,只要其中任意一个互斥量曾导致它继承过更高的优先级,那么该任务会一直保持“最高的继承优先级”,即使等待这些互斥量的其他任务后来超时放弃等待,也不会立即恢复原优先级。

这意味着, 继承的任务总是保持在一个较高的优先级, 很好的缓解了优先级反转的问题

互斥锁的另一个缺陷是死锁(Deadlock, Deadly Embarce)

形成原因是由于两个任务都在等待对方归还互斥量, 导致两个任务都进入阻塞态

试想以下场景:

有任务A, 任务B, 任务A先take互斥量X, 再take互斥量Y 任务B先take互斥量Y再take互斥量X

任务A先进入运行态, take互斥量X

一个时间片过去, 任务B进入运行态, take互斥量Y

又一个时间片过去, 任务A take 互斥量Y失败, 进入阻塞态

任务B 进入运行态, take互斥量X, 进入阻塞态

这时两个任务均在等待对方归还互斥量

但我们知道, 如果take函数的等待时间均为无限等待的话, 这两个任务将永远无法执行

同样, 死锁也是无法完全消除的, 只能在设计时尽量避免

最简单的避免方法就是给take函数设置一个比预期等待时间稍长的超时时间, 让其take失败继续执行剩下的部分

对于单个任务也有可能发生死锁

情景如下:

任务A获取互斥锁X

之后任务A再获取互斥锁X, 此时X已经被占用了, 无法获取, 任务A进入阻塞态

形成死锁

解决方法是使用递归互斥锁(Recursive Mutexes)

使用方法与标准互斥锁类似, 不过递归互斥锁和它名称一样支持递归调用

take多少次就需要give多少次才能解除互斥锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/* Recursive mutexes are variables of type SemaphoreHandle_t. */
SemaphoreHandle_t xRecursiveMutex;
/* The implementation of a task that creates and uses a recursive mutex. */
void vTaskFunction( void *pvParameters )
{
const TickType_t xMaxBlock20ms = pdMS_TO_TICKS( 20 );
/* Before a recursive mutex is used it must be explicitly created. */
xRecursiveMutex = xSemaphoreCreateRecursiveMutex();
/* Check the semaphore was created successfully. configASSERT() is
described in section 11.2. */
configASSERT( xRecursiveMutex );
/* As per most tasks, this task is implemented as an infinite loop. */
for( ;; )
{
/* ... */
/* Take the recursive mutex. */
if( xSemaphoreTakeRecursive( xRecursiveMutex, xMaxBlock20ms ) == pdPASS )
{
/* The recursive mutex was successfully obtained. The task can now
access the resource the mutex is protecting. At this point the
recursive call count (which is the number of nested calls to
xSemaphoreTakeRecursive()) is 1, as the recursive mutex has
only been taken once. */
/* While it already holds the recursive mutex, the task takes the
mutex again. In a real application, this is only likely to occur
inside a sub-function called by this task, as there is no
practical reason to knowingly take the same mutex more than
once. The calling task is already the mutex holder, so the
second call to xSemaphoreTakeRecursive() does nothing more than
increment the recursive call count to 2. */
xSemaphoreTakeRecursive( xRecursiveMutex, xMaxBlock20ms );
/* ... */
/* The task returns the mutex after it has finished accessing the
resource the mutex is protecting. At this point the recursive
call count is 2, so the first call to xSemaphoreGiveRecursive()
does not return the mutex. Instead, it simply decrements the
recursive call count back to 1. */
xSemaphoreGiveRecursive( xRecursiveMutex );
/* The next call to xSemaphoreGiveRecursive() decrements the
recursive call count to 0, so this time the recursive mutex is
returned. */
xSemaphoreGiveRecursive( xRecursiveMutex );
/* Now one call to xSemaphoreGiveRecursive() has been executed for
every proceeding call to xSemaphoreTakeRecursive(), so the task
is no longer the mutex holder. */
}
}
}

相同优先级任务使用互斥锁 处理时间不均问题

情景:

任务A,B两个相同优先级任务, 两个任务都来自同一个模板, 该模板如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/* The implementation of a task that uses a mutex in a tight loop. The task
creates a text string in a local buffer, then writes the string to a display.
Access to the display is protected by a mutex. */
void vATask( void *pvParameter )
{
extern SemaphoreHandle_t xMutex;
char cTextBuffer[ 128 ];
for( ;; )
{
/* Generate the text string – this is a fast operation. */
vGenerateTextInALocalBuffer( cTextBuffer ); //快操作
/* Obtain the mutex that is protecting access to the display. */
xSemaphoreTake( xMutex, portMAX_DELAY );
/* Write the generated text to the display–this is a slow operation. */
vCopyTextToFrameBuffer( cTextBuffer ); //慢操作
/* The text has been written to the display, so return the mutex. */
xSemaphoreGive( xMutex );
}
}

我们使用快操作和慢操作区分该模板中的两个惭怍

任务A先执行快操作, 之后take互斥量

一个时间片过去, 切换到任务B

任务B执行完快操作, take互斥量失败, 进入阻塞态, 在时间片结束之前切换到任务A

任务A执行完慢操作且give互斥量, 任务B回到就绪态

由于优先级相同, 任务B无法抢占任务A

任务A继续执行完快操作然后take互斥量, 时间片结束切换到任务B

任务B take互斥量失败, 又回到阻塞态

14

任务A的占用时间远大于任务B, 这是不合理的

解决方法是在合适的时机使用taskYIELD()主动切换上下文

手册给出的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void vFunction( void *pvParameter )
{
extern SemaphoreHandle_t xMutex;
char cTextBuffer[ 128 ];
TickType_t xTimeAtWhichMutexWasTaken;
for( ;; )
{
/* Generate the text string – this is a fast operation. */
vGenerateTextInALocalBuffer( cTextBuffer );
/* Obtain the mutex that is protecting access to the display. */
xSemaphoreTake( xMutex, portMAX_DELAY );
/* Record the time at which the mutex was taken. */
xTimeAtWhichMutexWasTaken = xTaskGetTickCount();
/* Write the generated text to the display–this is a slow operation. */
vCopyTextToFrameBuffer( cTextBuffer );
/* The text has been written to the display, so return the mutex. */
xSemaphoreGive( xMutex );
/* If taskYIELD() was called on each iteration then this task would
only ever remain in the Running state for a short period of time,
and processing time would be wasted by rapidly switching between
tasks. Therefore, only call taskYIELD() if the tick count changed
while the mutex was held. */
if( xTaskGetTickCount() != xTimeAtWhichMutexWasTaken ) //如果结束时和获取互斥量时不位于同一时间片
{
taskYIELD();
}
}
}

当然这个示例并不能完全解决这个问题, 通过时间片来进行判断往往不够精确

但确实能在一定程度上缓解该问题

第四种方式则是使用gatekeeper任务

核心是通过唯一的一个任务来使用这个共享的资源, 任务间通过队列向GateKeeper任务传送数据

由GateKeeper任务来对共享的资源进行操作

有时为了更快的对数据进行处理通常会给GateKeeper任务一个较高的优先级, 这会导致GateKeeper任务延迟处理优先级较低的任务