
公司业务部门经常会需要导出各种数据进行分析,因为需要的数据模板多变,产品一直没有将其规划成系统功能,每次需要的数据都是手写 SQL 语句进行导出。在写了上万行 SQL 语句以后,我对这种体力劳动感觉到了厌烦,于是和领导申请,自己利用空闲时间规划开发了一个灵活配置、适用性强的导表系统。



为了尽快实现功能,导出 Excel 功能是做的同步导出,但在我的设计中应该是异步导出,因此在后续的优化中引入了 RabbitMQ 消息队列实现异步导出功能。

因为不能将 Db 对象投递到消息队列中,所以投递过去的是 SQL 的预处理语句以及对应的变量,使用Db::select($sql, $bindings);执行 SQL 。然后尝试查询百万条数据,直接就内存溢出了。

vendor/hyperf/database/src/Connection.php 271 行

 * Run a select statement against the database.
public function select(string $query, array $bindings = [], bool $useReadPdo = true): array
    return $this->run($query, $bindings, function ($query, $bindings) use ($useReadPdo) {
        if ($this->pretending()) {
            return [];

        // For select statements, we'll simply execute the query and return an array
        // of the database result set. Each element in the array will be a single
        // row from the database table, and will either be an array or objects.
        $statement = $this->prepared($this->getPdoForSelect($useReadPdo)

        $this->bindValues($statement, $this->prepareBindings($bindings));


        return $statement->fetchAll();



vendor/hyperf/database/src/Connection.php 295 行

 * Run a select statement against the database and returns a generator.
public function cursor(string $query, array $bindings = [], bool $useReadPdo = true): \Generator
    $statement = $this->run($query, $bindings, function ($query, $bindings) use ($useReadPdo) {
        if ($this->pretending()) {
            return [];

        // First we will create a statement for the query. Then, we will set the fetch
        // mode and prepare the bindings for the query. Once that's done we will be
        // ready to execute the query against the database and return the cursor.
        $statement = $this->prepared($this->getPdoForSelect($useReadPdo)


        // Next, we'll execute the query against the database and return the statement
        // so we can return the cursor. The cursor will use a PHP generator to give
        // back one row at a time without using a bunch of memory to render them.

        return $statement;

    while ($record = $statement->fetch()) {
        yield $record;



使用此方法配合 xlsWriter 的固定内存模式导出的话,应该就可以完成百万数据的导出了。代码如下:

$config = [
    'path' => BASE_PATH . "/public/export" // xlsx文件保存路径
$excel = new Excel($config);
$name = md5(Carbon::now()->toDateTimeString() . Str::random());
$excel = $excel->constMemory("$name.xlsx", "Sheet1", false);
$header = [];
$row = 0;
$sheet = 2;
foreach (Db::connection('export')->cursor($sql, $bindings) as $item) {
    $item = get_object_vars($item);
    $header = $header ?: array_keys($item);
    if ($row === 1048576) { // 超过最大行 新增工作簿
        $excel = $excel->addSheet("Sheet$sheet");
        $row = 0;
    if ($row === 0) { // 写入标题
        $excel = $excel->header($header);
    $column = 0;
    foreach ($item as $v) {
        $excel = $excel->insertText($row, $column, $v);


尝试导出 140w 数据,发现代码报错了,但是 Excel 文件成功导出了。消费者成功回复了 ack ,但是 MQ 却又一次推送了消费消息,就这样一直死循环的导出文件。在根据报错信息,调试查看源码半天之后,我的 C 盘空间被干满了😢,猜测应该是 Docker 日志太多造成的,在将所有容器和镜像删除之后,C 盘占用就下去了,并将镜像文件移至 D 盘保存。


再次构建好环境复现以后,去查看了一下 MQ 的日志输出,发现很明显的提示了报错信息。原因就是 MQ 没有如期收到心跳包,消费者错过了心跳,导致 MQ 认为消费者已经不可用了,断开了链接并重新推送了消费消息


vendor/hyperf/amqp/src/AMQPConnection.php 308 行

protected function heartbeat(): void
    if (! $this->enableHeartbeat && $this->getHeartbeat() > 0) {
        $this->enableHeartbeat = true;

        Coroutine::create(function () {
            while (true) {
                if (CoordinatorManager::until(Constants::WORKER_EXIT)->yield($this->getHeartbeat())) {
                    $this->exited = true;

                try {
                    // PING
                    if ($this->isConnected() && $this->chan->isEmpty()) {
                        $pkt = new AMQPWriter();
                        $this->chan->push($pkt->getvalue(), 0.001);
                } catch (\Throwable $exception) {
                    $this->logger && $this->logger->error((string) $exception);

虽然 Swoole 的一键协程化 hook 掉了文件相关的函数,可以将 PHP 的文件操作函数从同步 IO 转化为异步 IO ,但是我们导出使用的 xlsWriter 是 C 扩展实现的,所以产生的文件 IO 并不能被 Swoole 给 hook 掉,依然是同步 IO ,这将会阻塞起心跳协程,使其不能按时发送心跳数据包,导致 MQ 端口链接。




$db = Db::connection('export');
$sql = $data['sql'];
$bindings = $data['bindings'];
$pageSize = 1000000;
$total = $db->select("select count(*) as total from (" . $sql . ") as s", $bindings)[0]->total;
$pageNum = ceil($total / $pageSize);

$name = md5(Carbon::now()->toDateTimeString() . Str::random());
$excel = $excel->constMemory("$name.xlsx", "Sheet1", false);
$header = [];
$row = 0;
$sheet = 2;
for ($page = 0; $page < $pageNum; $page++) {
    foreach ($db->cursor("$sql limit " . $page * $pageSize . ",$pageSize", $bindings) as $item) {
        $item = get_object_vars($item);
        $header = $header ?: array_keys($item);
        if ($row === 1048576) {
            $excel = $excel->addSheet("Sheet$sheet");
            $row = 0;
        if ($row === 0) {
            $excel = $excel->header($header);
        $column = 0;
        foreach ($item as $v) {
            $excel = $excel->insertText($row, $column, $v);

if ($excel->addSheet("Sheet$sheet")
    ->output()) {
    $export->status = 1;
} else {
    $export->status = -1;

再次尝试导出成功了,不出所料 MQ 又断开了,不得不说 xlsWriter 的固定内存模式导出真的厉害,一千万条数据也可以导出来👍。