准备

MySQL内核版本: 8.0.19

simulated-AIO

simulated-AIO 是一套由 InnoDB 早先实现的异步 I/O 模型. 在 MySQL 的存储引擎 InnoDB 中分别实现了同步IO以及异步IO, Redo Log 的写入方式采用同步IO, 而数据页的写入由于 Redo Log 的保护则采用异步 IO 的写入方式. 在 Linux AIO 引入之前, InnoDB 实现了一套异步 IO 框架, 即 simulated-AIO. simulated-AIO 的原理类似于 libaio, 原理实现都较为简单.

在Linux平台, 假如安装了 libaio, MySQL 是默认使用 libaio, 只有在设置了innodb_use_native_aio = 0的情况下才会使用 simulated-AIO.

InnoDB的异步IO主要是用来处理预读和数据Page的写请求,对于正常Page的数据读取则是通过同步 IO 进行.

simulated-AIO 原理

数据结构

simulated-AIO 预分配 n 个大小 slot 数组, 每个用户的读写请求通过申请数组中的 slot, 构造对应的 IO 类型、写入 offset 等等. 而 simulated-AIO 的工作线程则根据slot的内容来完成对应的 IO 请求.

/** The asynchronous I/O context */
/** 异步 IO 请求单元 */
struct Slot {
  /** 在 array 中的下标 */
  uint16_t pos{0};

  /** 是否已被申请分配 */
  bool is_reserved{false};

  /** 已被分配的时间长度 */
  ib_time_monotonic_t reservation_time{0};

  /** buffer used in i/o */
  byte *buf{nullptr};

  /** Buffer pointer used for actual IO. We advance this
  when partial IO is required and not buf */
  byte *ptr{nullptr};

  /** IO 类型 OS_FILE_READ or OS_FILE_WRITE */
  IORequest type{IORequest::UNSET};

  /** 在文件中的偏移量 */
  os_offset_t offset{0};

  /** 文件描述符 */
  pfs_os_file_t file{
#ifdef UNIV_PFS_IO
      nullptr,  // m_psi
#endif
      0  // m_file
  };

  /** 文件名 */
  const char *name{nullptr};

  /** IO 是否已经完成 */
  bool io_already_done{false};

  /** fil_node_t 节点 参考 Fil_system */
  fil_node_t *m1{nullptr};

  /** the requester of an aio operation and which can be used
  to identify which pending aio operation was completed */
  void *m2{nullptr};

  /** AIO 状态 */
  dberr_t err{DB_ERROR_UNSET};

  /** ... */

  /** 读写的 block 长度 */
  ulint len{0};

  /** 读写字节数 */
  ulint n_bytes{0};

  /** 读写的 block 压缩前的长度 */
  uint32 original_len{0};

  /** block */
  Block *buf_block{nullptr};

  /** ... */
};

simulated-AIO 原理非常简单,可以理解为一个生产者-消费者模型, 示意图如下:

simulated_aio

生产者(用户读写流程)

  • buf_page_get_gen()(预读):
 /* 获取数据页 */
 --------------------
| buf_page_get_gen() |
 --------------------
  |
  |    /* ... */
  |    ---------------------------------
  --> | Buf_fetch_normal::single_page() |
       ---------------------------------
         |
         |    /* 调用线性预读 */
         |    -------------------------
         --> | buf_read_ahead_linear() |
              -------------------------
               |
               |   /* 读Page */
               |   ---------------------
               -> | buf_read_page_low() |
                   ---------------------
                    |
                    |    /* 文件读写操作 */
                    |    ----------
                    --> | fil_io() |
                         ----------
                          |
                          |    ----------------
                          --> | shard->do_io() |
                               ----------------
                                |
                                |   /* 异步 IO 接口 */
                                |    ----------
                                --> | os_aio() |
                                     ----------
  • buf_flush_page()(写):
 /* 刷 Page 至文件 */
 ------------------
| buf_flush_page() |
 ------------------
  |
  |    /* 刷 Page */
  |    -----------------------------
  --> | buf_flush_write_block_low() |
       -----------------------------
        |
        |    ----------
        --> | fil_io() |
             ----------
              |
              |    ----------------
              --> | shard->do_io() |
                   ----------------
                    |
                    |    /* 异步IO接口 */
                    |    ----------
                    --> | os_aio() |
                         ----------

无论是读操作还是写操作,都要交由os_aio()处理, os_aio是一个通用的接口, 在Linux平台封装了 libaio 和 simulated AIO. 具体的处理逻辑如下:

 ----------
| os_aio() |
 ----------
  |
  |
  |    /* 申请 slot */
  |    ---------------------
  --> | AIO::reserve_slot() |
  |     --------------------
  |
  |    /* 唤醒 simulated-AIO 后台处理线程 */
  |    --------------------------------------
  --> | AIO::wake_simulated_handler_thread() |
       --------------------------------------
  • 根据IO类型选择对应的 I/O slot 数组(select_slot_array()).

  • 向 I/O slot 数组申请 slot (reserve_slot()).

  • 唤醒对应的异步IO线程处理IO请求(AIO::wake_simulated_handler_thread()).

消费者(异步I/O处理流程)

在MySQL启动时,会分别创建1个ibuf处理线程, 1个log处理线程, n个(srv_n_read_io_threads)读处理线程, n个(srv_n_write_io_threads)写处理线程.


 /* DB启动 */
 -------------
| srv_start() |
 -------------
     |
     |    /* 根据 srv_n_file_io_threads 参数创建 IO 处理线程 */
     |    ---------------------
     --> | io_handler_thread() |
          ---------------------
            |
            |   /* 监控异步 IO 请求 */
            |   ----------------
            -> | fil_aio_wait() |
                ----------------
                  |
                  |    /* 根据设定的 AIO mode 选择不同的AIO处理函数 */
                  |    ------------------
                  --> | os_aio_handler() |
                       ------------------
                         |
                         |    /* simulated-AIO 负责处理异步IO的函数 */
                         |    ----------------------------
                         --> | os_aio_simulated_handler() |
                         |     ----------------------------
                         |
                         |    /* 异步 IO 完成后的清理工作 */
                         |    ------------------------
                         --> | buf_page_io_complete() |
                              ------------------------

io_handler_thread()会持续监控 IO 请求,直到 MySQL shutdown:

/* storage/innobase/srv/srv0start.cc */

static void io_handler_thread(ulint segment) {
    while (srv_shutdown_state.load() != SRV_SHUTDOWN_EXIT_THREADS ||
                    buf_flush_page_cleaner_is_active() || !os_aio_all_slots_free()) {
          fil_aio_wait(segment);
    }
}

fil_aio_wait()会调用os_aio_handler()根据不同的IO模型选择不同的函数处理IO请求, simulated AIO 的处理函数是os_aio_simulated_handler():

  1. 根据 global segment id 选择对应I/O工作线程的event, 计算在该array的segment id.

  2. 检查是否有已经完成但状态尚未更新的IO请求:

  • 假如存在已经完成但状态尚未更新的IO请求, 则调用AIO::release()更新slot状态.
  1. 需要判断是否MySQL准备shutdown, 假如需要shutdown则立即返回.

  2. 否则从AIO::m_slots选择等待的IO请求:

  • 选择策略是先选择一个等待时间超过2s的IO请求, 防止等待时间过长.

  • 否则选择写入偏移量最小的一个slot.

  1. 假如目前没有待处理的IO请求,则进入wait状态.

  2. 处理选中的IO请求前,会调用merge()进行IO合并, 选择文件偏移量offset连续的IO请求进行合并.

  3. 调用 simulated-AIO 封装的同步IO接口(pwrite()/pread())完成IO操作.

源码分析

核心处理函数os_aio_simulated_handler():

/* storage/innobase/os/os0file.cc */

/* 参数解释: 
  global_segment:
  m1:
  m2:
  type: 
*/
static dberr_t os_aio_simulated_handler(ulint global_segment, fil_node_t **m1,
                                        void **m2, IORequest *type) {
  Slot *slot;
  AIO *array;
  ulint segment;
  os_event_t event = os_aio_segment_wait_events[global_segment];

  /* 计算对应的子segment */
  segment = AIO::get_array_and_local_segment(&array, global_segment);

  /* 构造 simulated-AIO 的 handler */
  SimulatedAIOHandler handler(array, segment);

  for (;;) {
    srv_set_io_thread_op_info(global_segment, "looking for i/o requests (a)");

    /* 检查目前的 slots 数量 */
    ulint n_slots = handler.check_pending(global_segment, event);

    if (n_slots == 0) {
      continue;
    }

    /* 初始化 handler */
    handler.init(n_slots);

    srv_set_io_thread_op_info(global_segment, "looking for i/o requests (b)");

    array->acquire();

    ulint n_reserved;

    /* 检查是否有已经完成但状态尚未更新的IO请求 */
    slot = handler.check_completed(&n_reserved);

    if (slot != NULL) {
      /* 存在已完成但状态未更新的slot */
      break;

    } else if (n_reserved == 0
#ifndef UNIV_HOTBACKUP
               && !buf_flush_page_cleaner_is_active() &&
               srv_shutdown_state.load() == SRV_SHUTDOWN_EXIT_THREADS
#endif /* !UNIV_HOTBACKUP */
    ) {

      /* 目前没有待处理的 IO 请求,并且 MySQL 准备 shutdown, 则返回 */
      array->release();

      *m1 = NULL;

      *m2 = NULL;

      return (DB_SUCCESS);

    } else if (handler.select()) {
      /* 否则根据 slot 选择策略,选择对应的 slot */
      break;
    }

    /* 假如目前没有待处理的IO请求,则进入wait状态 */

    srv_set_io_thread_op_info(global_segment, "resetting wait event");

    /* We wait here until tbere are more IO requests
    for this segment. */

    os_event_reset(event);

    array->release();

    srv_set_io_thread_op_info(global_segment, "waiting for i/o request");

    os_event_wait(event);
  }

  /** Found a slot that has already completed its IO */

  if (slot == NULL) {
    /* slot == NULL 代表所有已完成的 slot 状态都已经更新,并且我们通过
     * select() 选择了合适的 slot 需要完成 I/O 处理 */

    /* 合并 I/O 操作 */
    handler.merge();

    srv_set_io_thread_op_info(global_segment, "consecutive i/o requests");

    array->release();

    srv_set_io_thread_op_info(global_segment, "doing file i/o");

    /* IO 操作(pwrite()/pread()) */
    handler.io();

    srv_set_io_thread_op_info(global_segment, "file i/o done");

    /* simulated-AIO 中 io_complete() 为空实现 */
    handler.io_complete();

    array->acquire();

    /* 设置 slot->io_already_done = true 即表示已完成,但其他状态尚未更新, 交由下次
     * 循环更新其他状态 */
    handler.done();

    /* 返回 handler 的第一个 slot */
    slot = handler.first_slot();
  }

  /* 更新 slot 的状态 */
  ut_ad(slot->is_reserved);

  *m1 = slot->m1;
  *m2 = slot->m2;

  *type = slot->type;

  array->release(slot);

  array->release();

  return (DB_SUCCESS);
}

Q & A

  • 关于 simulated AIO 多个线程同时写入一个文件的问题?

simulated AIO 不能保证多线程同时写一个文件, 但 simulated AIO 底层调用的文件接口是 pwrite(), 通过指定参数 offset, 以及每次写的时候加上 Page 锁, 就能保证不写在同一个 offset.

总结

综上所述,通过源码分析我们详细的了解 MySQL 实现的模拟异步 I/O 的框架, 原理非常简单,由用户线程获取 slot 并记录相关的 I/O 信息,而 simulated-AIO 的后台工作线程则通过一定的策略来逐一处理 I/O 请求, 并且通过合并 I/O 的策略来对 I/O 读写做了一些优化.