使用 pygame 构建碰撞模拟器:碰撞检测

本文使用 pygame 中的 Group 类构建网格容器,使用网格算法实现在 30 FPS 下检测上万粒子之间的相互碰撞。

碰撞模拟器 - 碰撞检测 https://github.com/Newverse-Wiki/Code-for-Blog/tree/main/Pygame-On-Web/Collision-Simulator/GridGroup

总览

项目代码可从上述 GitHub - GridGroup 链接下载。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
# 项目结构
$ tree GridGroup
GridGroup
├── build
│   └── web
│       ├── favicon.png
│       ├── gridgroup.apk
│       └── index.html
├── debug.py
├── game.py
├── grid.py
├── main.py
└── particle.py

# python 运行
$ python GridGroup/main.py

# pygbag 打包
$ pygbag GridGroup

build/web 目录下为 pygbag 打包好的网页项目,将该目录下的所有内容上传至网站服务器即可完成部署。

其中,debug.py 内容与该系列上一篇 使用 pygame 构建碰撞模拟器:粒子碰撞 中的内容一致。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# 引入 pygame 模块
import pygame
# 引入 asyncio 模块,game.py 中仅需 2 行与之相关的代码
import asyncio
import random
import math

# 引入 debug 模块,方便在游戏界面上输出调试信息
from debug import Debug
from particle import Particle
from grid import GridGroup

class Game:

    """Game 类承载游戏主循环

    Game(dims, FPS)

    定义游戏界面的尺寸 dims,游戏帧数 FPS,控制游戏流程

    """

    def __init__(self, dims, FPS = 60):
        self.dims = dims
        self.FPS  = FPS

        # 初始化pygame,预定义各种常量
        pygame.init()

    def generate(self, max_radius, groups, grid):

        """ 生成一个随机粒子

        generate(groups)

        生成粒子的 位置、速度、半径、密度均随机。

        """

        radius = random.randint(2, max_radius)
        x = random.randint(radius, self.dims[0] - radius)
        y = random.randint(radius, self.dims[1] - radius)

        speed = random.randint(100, 200)
        # 速度方向随机
        angle = random.random() * 2.0 * math.pi
        velocity = (speed * math.cos(angle), speed * math.sin(angle))

        density = random.randint(1, 20)

        grid.add2grid(Particle((x, y), velocity, radius, density, groups))

    # 游戏主循环所在函数需要由 async 定义
    async def start(self):
        # 初始化游戏界面(screen):尺寸、背景色等
        screen = pygame.display.set_mode(self.dims)
        screen_width, screen_height = self.dims
        screen_color = 'Black'

        debug = Debug(screen)

        # 初始化游戏时钟(clock),由于控制游戏帧率
        clock = pygame.time.Clock()

        max_radius = 5

        particles = pygame.sprite.Group()
        grid = GridGroup(max_radius * 2)
        self.generate(max_radius, [particles, grid], grid)
        # 记录并显示游戏中粒子总数
        total_num = 1

        # 游戏运行控制变量(gamen_running)
        # True:游戏运行
        # False:游戏结束
        game_running = True
        gridding = False
        # 游戏主循环
        while game_running:
            # 按照给定的 FPS 刷新游戏
            # clock.tick() 函数返回上一次调用该函数后经历的时间,单位为毫秒 ms
            # dt 记录上一帧接受之后经历的时间,单位为秒 m
            # 使用 dt 控制物体运动可以使游戏物理过程与帧率无关
            dt = clock.tick(self.FPS) / 1000.0
            # 使用 asyncio 同步
            # 此外游戏主体代码中不需要再考虑 asyncio
            await asyncio.sleep(0)

            # 游戏事件处理
            # 包括键盘、鼠标输入等
            for event in pygame.event.get():
                # 点击关闭窗口按钮或关闭网页
                if event.type == pygame.QUIT:
                    game_running = False
                if event.type == pygame.KEYDOWN:
                    # 按 P 键添加随机粒子
                    if event.key == pygame.K_p:
                        total_num += 100
                        for i in range(100):
                            self.generate(max_radius, [particles, grid], grid)
                    # 按 G 键切换碰撞检测算法
                    if event.key == pygame.K_g:
                        gridding = not gridding

            # 以背景色覆盖刷新游戏界面
            screen.fill(screen_color)

            if gridding or total_num > 801:
                # 调用 GridGroup 类的 update() 函数,更新粒子状态
                grid.update(dt)
                # 调用 GridGroup 类的 draw() 函数,绘制粒子
                grid.draw(screen)
                # 调用 debug 函数在游戏界面右上角显示碰撞检测算法
                debug.debug("Gridding", 'red', 'topright')
            else:
                list_particles = particles.sprites()
                # 每对粒子都进行碰撞检测
                for i, p1 in enumerate(list_particles):
                    # 每对粒子仅进行一次碰撞检测
                    for p2 in list_particles[i+1:]:
                        p1.collide(p2)

                # 调用 Group 类的 update() 函数,更新粒子状态
                particles.update(dt)
                # 调用 Group 类的 draw() 函数,绘制粒子
                particles.draw(screen)

            # 调用 debug 函数在游戏界面上方中间显示粒子个数
            debug.debug(total_num, 'white', 'midtop')
            # 调用 debug 函数在游戏界面左上角显示游戏帧率
            debug.debug(f"{clock.get_fps():.1f}", 'green')

            # 将游戏界面内容输出至屏幕
            pygame.display.update()

        # 当 game_running 为 False 时,
        # 跳出游戏主循环,退出游戏
        pygame.quit()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import pygame

class GridGroup(pygame.sprite.Group):
    # 初始化网格群组,网格大小为 box_size
    def __init__(self, box_size, *sprites):
        super().__init__(*sprites)

        self.width, self.height = pygame.display.get_window_size()
        self.size = box_size
        # 计算网格行列数
        self.Nrow = int(self.width  / self.size) + 1
        self.Ncol = int(self.height / self.size) + 1

        self.grid = []
        # 二维网格,每个网格为一个存放粒子的列表
        for row in range(self.Nrow):
            self.grid.append([])
            for col in range(self.Ncol):
                self.grid[row].append([])

    def add2grid(self, sprite):
        # 根据粒子的位置,将粒子放入相应的网格中
        row = int(sprite.position.x / self.size)
        col = int(sprite.position.y / self.size)
        self.grid[row][col].append(sprite)

    def in_grid(self, row, col):
        # 判断行列坐标是否超出网格上下限
        if (row < 0) or (col < 0) or (row >= self.Nrow) or (col >= self.Ncol):
            return False
        else:
            return True

    def regrid(self):
        # 粒子移动之后,进行碰撞检测之前,更新粒子所处的网格
        for row in range(self.Nrow):
            for col in range(self.Ncol):
                box = self.grid[row][col]
                # outside 列表存放离开网格的粒子
                outside = []
                for sprite in box:
                    now_row = int(sprite.position.x / self.size)
                    now_col = int(sprite.position.y / self.size)
                    # 判断粒子是否离开原先的网格
                    if (now_row != row) or (now_col != col):
                        # 不要在 for 循环中增删循环列表中的元素
                        outside.append(sprite)
                        # 更新粒子所处的网格
                        self.grid[now_row][now_col].append(sprite)
                # for 循环结束之后删除离开网格的粒子
                for sprite in outside:
                    box.remove(sprite)

    def collide(self):
        # 只需检测右下方向网格中的粒子
        adjoin = ((1, -1), (1, 0), (1, 1), (0, 1))
        for row in range(self.Nrow):
            for col in range(self.Ncol):
                box = self.grid[row][col]

                for i, p1 in enumerate(box):
                    # 检测同一网格中的粒子
                    for p2 in box[i+1:]:
                        p1.collide(p2)

                    # 检测相邻网格中的粒子
                    for (drow, dcol) in adjoin:
                        next_row = row + drow
                        next_col = col + dcol
                        # 判断行列坐标是否超出网格上下限
                        if self.in_grid(next_row, next_col):
                            next_box = self.grid[next_row][next_col]
                            for p2 in next_box:
                                p1.collide(p2)

    def update(self, dt):
        # 更新粒子位置
        for sprite in self.sprites():
            sprite.update(dt)
        # 更新粒子所处的网格
        self.regrid()
        # 进行碰撞检测和处理
        self.collide()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
"""pygbag 打包游戏内容时要求游戏以 main.py 为入口

将涉及到与 asyncio 相关的代码均放置在 main.py 中,
后续代码中无序考虑 asyncio 异步处理、等待等问题。

"""

# 网页运行 python 需要引入 asyncio
import asyncio

# 游戏主代码存放在 game.py 中,作为模块引入
from game import Game

# main() 主函数需要由 async 定义
async def main():
    # 游戏主体为 Game 类的实例
    # 初始化时确定游戏界面的尺寸、刷新率
    game = Game((1200, 800), FPS = 0)
    # 游戏的主循环(while True)所在函数的调用需有 await
    # 相应的游戏主循环所在函数也需要由 async 定义
    await game.start()

if __name__ == "__main__":
    # 经由 asyncio.run() 调用 main() 主函数
    asyncio.run(main())
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import pygame

class Particle(pygame.sprite.Sprite):
    def __init__(self, position, velocity, radius, density, groups):
        super().__init__(groups)

        self.position = pygame.Vector2(position)
        self.velocity = pygame.Vector2(velocity)

        self.radius  = radius
        self.density = density
        self.mass    = density * radius ** 2
        
        self.elasticity = 0.95

        # 密度越大,蓝色越深
        rg_value = 200 - density * 10
        self.color = pygame.Color(rg_value, rg_value, 255)

        self.image = pygame.Surface((2 * radius, 2 * radius))
        self.image.fill('black')
        self.image.set_colorkey('black')
        pygame.draw.circle(self.image, self.color, (radius, radius), radius)
        self.rect = self.image.get_rect(center = self.position)

    def bounce(self):
        screen_width, screen_height = pygame.display.get_window_size()

        bound_left   = self.radius
        bound_right  = screen_width - self.radius
        bound_top    = self.radius
        bound_bottom = screen_height - self.radius

        if self.position.x < bound_left:
            self.position.x = 2.0 * bound_left - self.position.x
            if self.velocity.x < 0:
                self.velocity.x *= -1.0
        elif self.position.x > bound_right:
            self.position.x = 2.0 * bound_right - self.position.x
            if self.velocity.x > 0:
                self.velocity.x *= -1.0

        if self.position.y < bound_top:
            self.position.y = 2.0 * bound_top - self.position.y
            if self.velocity.y < 0:
                self.velocity.y *= -1.0
        elif self.position.y > bound_bottom:
            self.position.y = 2.0 * bound_bottom - self.position.y
            if self.velocity.y > 0:
                self.velocity.y *= -1.0

    def collide(self, p2):
        p1 = self

        separation  = p1.position - p2.position
        overlap = p1.radius + p2.radius - separation.length()

        if overlap > 0:
            m1 = p1.mass
            m2 = p2.mass
            r1 = p1.position
            r2 = p2.position
            v1 = p1.velocity
            v2 = p2.velocity

            v1n = v1 - 2 * m2 / (m1 + m2) * (v1 - v2).dot(r1 - r2) / (r1 - r2).length_squared() * (r1 - r2)
            v2n = v2 - 2 * m1 / (m1 + m2) * (v2 - v1).dot(r2 - r1) / (r2 - r1).length_squared() * (r2 - r1)

            p1.velocity = v1n * self.elasticity
            p2.velocity = v2n * self.elasticity

            separation.scale_to_length(overlap)
            p1.position += 0.5 * separation
            p2.position -= 0.5 * separation

    def update(self, dt):
        self.position += self.velocity * dt
        self.bounce()
        self.rect.center = self.position

参数调整

为容纳更多粒子,将游戏界面尺寸扩大为 1200x800,同时将 FPS 设置为 0,即不限制帧率:

18
    game = Game((1200, 800), FPS = 0)

同时为了避免因粒子数过多,系统总能量过大从而导致小质量粒子的速度过大,在两帧之间粒子互相穿过而错过碰撞检测。

我们在粒子碰撞过程中引入弹性系数 elasticity,使碰撞后粒子的速度有小幅损失,从而使粒子以及系统的能量随碰撞次数上升而降低,保证粒子速度满足碰撞检测限制。

14
        self.elasticity = 0.95

弹性系数 elasticity 直接影响碰撞后粒子的速度:

69
70
            p1.velocity = v1n * self.elasticity
            p2.velocity = v2n * self.elasticity

网格算法

由于粒子只有可能跟与自己距离很近的其它粒子发生碰撞,碰撞过程属于近距相互作用。 在进行粒子间碰撞检测时,只需判断和比较空间上距离较近的粒子的间距,因此适合用网格法对粒子进行定位。

将粒子置于网格之中

将整个屏幕分成若干大小相等的正方形网格,网格边长尺寸为粒子的最大直径,即保证在某一网格中的粒子只可能与相邻网格内的粒子发生碰撞。 因此在进行碰撞检测的时候只需考虑和比较相邻网格内的粒子,无需比较所有粒子,将碰撞检测的时间复杂度由 $O(n^2)$ 降低为 $O(n)$。

考虑到每对粒子之间仅需要进行一次碰撞处理,给定网格中的粒子无需对相邻的 8 个网格都进行检测,仅需检测其中 4 个网格中的粒子。

相邻网格检测

GridGroup 类

GridGroup 类继承自 pygame.sprite.group 类,可以使用 group 基类提供的增删、绘制 sprites 的函数,同时根据网格碰撞检测的算法需求自定义更新粒子状态的 update() 函数。

初始化

使用模拟中粒子的最大直径作为网格的尺寸,将整个游戏界面分隔为二维网格,每个网格为一个列表,用于存放其中的粒子。

理想模拟
最理想的模拟中所有粒子大小相等,网格大小为粒子直径,每个网格中最多只能容纳一个粒子。 此时网格中粒子处理和碰撞检测会最大程度的简化。

这里每个网格中可以存放大小不同的多个粒子。

 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
class GridGroup(pygame.sprite.Group):
    # 初始化网格群组,网格大小为 box_size
    def __init__(self, box_size, *sprites):
        super().__init__(*sprites)

        self.width, self.height = pygame.display.get_window_size()
        self.size = box_size
        # 计算网格行列数
        self.Nrow = int(self.width  / self.size) + 1
        self.Ncol = int(self.height / self.size) + 1

        self.grid = []
        # 二维网格,每个网格为一个存放粒子的列表
        for row in range(self.Nrow):
            self.grid.append([])
            for col in range(self.Ncol):
                self.grid[row].append([])

生成

为直观比较两种碰撞检测算法,我们将粒子加入到两个 Group 类的实例中,其中 particles 采用本系列上一篇采用的碰撞检测方法,grid 采用网格碰撞检测算法。

67
68
69
        particles = pygame.sprite.Group()
        grid = GridGroup(max_radius * 2)
        self.generate(max_radius, [particles, grid], grid)

相应的 generate 函数也做了修改:

30
31
32
33
34
35
36
37
38
39
40
41
42
    def generate(self, max_radius, groups, grid):
        radius = random.randint(2, max_radius)
        x = random.randint(radius, self.dims[0] - radius)
        y = random.randint(radius, self.dims[1] - radius)

        speed = random.randint(100, 200)
        # 速度方向随机
        angle = random.random() * 2.0 * math.pi
        velocity = (speed * math.cos(angle), speed * math.sin(angle))

        density = random.randint(1, 20)

        grid.add2grid(Particle((x, y), velocity, radius, density, groups))
  • grid.add2grid(Particle()) 将 Particle 类生成的实例加入到 grid.grid[][] 网格中。
  • Particle(groups = [particles, grid]) 将 Particle 类生成的实例以默认方式加入到 grid.sprites() 列表中。

添加粒子

根据粒子坐标位置将粒子放入相应坐标的网格列表中。

21
22
23
24
25
    def add2grid(self, sprite):
        # 根据粒子的位置,将粒子放入相应的网格中
        row = int(sprite.position.x / self.size)
        col = int(sprite.position.y / self.size)
        self.grid[row][col].append(sprite)

更新网格

粒子移动位置之后,需要先根据粒子目前的位置更新粒子所处的网格,再进行碰撞检测。

34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
    def regrid(self):
        # 粒子移动之后,进行碰撞检测之前,更新粒子所处的网格
        for row in range(self.Nrow):
            for col in range(self.Ncol):
                box = self.grid[row][col]
                # outside 列表存放离开网格的粒子
                outside = []
                for sprite in box:
                    now_row = int(sprite.position.x / self.size)
                    now_col = int(sprite.position.y / self.size)
                    # 判断粒子是否离开原先的网格
                    if (now_row != row) or (now_col != col):
                        # 不要在 for 循环中增删循环列表中的元素
                        outside.append(sprite)
                        # 更新粒子所处的网格
                        self.grid[now_row][now_col].append(sprite)
                # for 循环结束之后删除离开网格的粒子
                for sprite in outside:
                    box.remove(sprite)

碰撞检测

只需检测粒子所在网格及其右下方向相邻网格中的粒子即可。 调用 Particle.collide(particle) 函数判断两个粒子是否发生碰撞并进行碰撞之后的处理。

54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
    def collide(self):
        # 只需检测右下方向网格中的粒子
        adjoin = ((1, -1), (1, 0), (1, 1), (0, 1))
        for row in range(self.Nrow):
            for col in range(self.Ncol):
                box = self.grid[row][col]

                for i, p1 in enumerate(box):
                    # 检测同一网格中的粒子
                    for p2 in box[i+1:]:
                        p1.collide(p2)

                    # 检测相邻网格中的粒子
                    for (drow, dcol) in adjoin:
                        next_row = row + drow
                        next_col = col + dcol
                        # 判断行列坐标是否超出网格上下限
                        if self.in_grid(next_row, next_col):
                            next_box = self.grid[next_row][next_col]
                            for p2 in next_box:
                                p1.collide(p2)

在检测相邻网格时,需注意网格坐标是否超出网格上下限:

27
28
29
30
31
32
    def in_grid(self, row, col):
        # 判断行列坐标是否超出网格上下限
        if (row < 0) or (col < 0) or (row >= self.Nrow) or (col >= self.Ncol):
            return False
        else:
            return True

update 函数

在 Group 类的 update 函数中,调用所有 particle 的 update 函数更新粒子坐标位置之后,先更新粒子所处的网格,再进行碰撞检测。

以本系列上一篇 使用 pygame 构建碰撞模拟器:粒子膨胀 # Group 类 中提及的三种常规方式将 Sprite 加入 Group 后,可调用 Group.sprites() 函数获取 Group 中存放的所有 Sprite 的列表。

76
77
78
79
80
81
82
83
    def update(self, dt):
        # 更新粒子位置
        for sprite in self.sprites():
            sprite.update(dt)
        # 更新粒子所处的网格
        self.regrid()
        # 进行碰撞检测和处理
        self.collide()

添加粒子

P 键在游戏中添加 100 个随机粒子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
        # 记录并显示游戏中粒子总数
        total_num = 1

        while game_running:
            # 游戏事件处理
            # 包括键盘、鼠标输入等
            for event in pygame.event.get():
                if event.type == pygame.KEYDOWN:
                    # 按 P 键添加随机粒子
                    if event.key == pygame.K_p:
                        total_num += 100
                        for i in range(100):
                            self.generate(max_radius, [particles, grid], grid)

            # 调用 debug 函数在游戏界面上方中间显示粒子个数
            debug.debug(total_num, 'white', 'midtop')

切换检测算法

G 键切换游戏中粒子碰撞检测的算法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
        gridding = False

        while game_running:
            # 游戏事件处理
            # 包括键盘、鼠标输入等
            for event in pygame.event.get():
                if event.type == pygame.KEYDOWN:
                    # 按 G 键切换碰撞检测算法
                    if event.key == pygame.K_g:
                        gridding = not gridding

当游戏中粒子数过多时,一般算法帧率过低,将自动启用网格算法:

108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
            if gridding or total_num > 801:
                # 调用 GridGroup 类的 update() 函数,更新粒子状态
                grid.update(dt)
                # 调用 GridGroup 类的 draw() 函数,绘制粒子
                grid.draw(screen)
                # 调用 debug 函数在游戏界面右上角显示碰撞检测算法
                debug.debug("Gridding", 'red', 'topright')
            else:
                list_particles = particles.sprites()
                # 每对粒子都进行碰撞检测
                for i, p1 in enumerate(list_particles):
                    # 每对粒子仅进行一次碰撞检测
                    for p2 in list_particles[i+1:]:
                        p1.collide(p2)

                # 调用 Group 类的 update() 函数,更新粒子状态
                particles.update(dt)
                # 调用 Group 类的 draw() 函数,绘制粒子
                particles.draw(screen)

算法帧率比较

  • 无任何优化的一般算法的帧率随粒子数 $n$ 增多迅速降低,帧率与粒子数的平方成反比,拟合曲线为:
    $$ \rm{FPS} = \frac{10^7}{(n + 40)^2} $$
  • 网格算法的帧率随粒子数 $n$ 增多线性降低,拟合曲线为:
    $$ \rm{FPS} = 450 - 0.17n $$

游戏帧率表现基本符合算法时间复杂度预期。

上万粒子碰撞检测
上万粒子碰撞检测

【参考】:

0%