跳转到内容

通过 Connect 批量生成 PDF 并跟踪进度

通过 NextPDF Connect(此引擎的独立 HTTP 服务发行版),在单个客户端进程中把一份文档清单推进到全部完成。这份 recipe(示例)会把每项渲染请求提交到异步工作 endpoint(端点)POST /api/v1/jobs,以 GET /api/v1/jobs/{id} 轮询每个工作,直到其进入终止状态;同时读取服务器返回的每个工作 statusprogress 字段,并从 GET /api/v1/jobs/{id}/result 下载每个已完成的 PDF。

工作的生命周期固定且简短。一个工作先是 pending,接着变为 running,然后只会进入一个终止状态:completedfailed,或 cancelled。当服务器跟踪进度时,状态响应会带有一个 0 到 100 的 progress 整数;每次针对非终止工作的轮询响应都会带有 Retry-After 标头,告诉你下一次请求前应该等待多久。为每次提交附上 Idempotency-Key,这样重试提交会返回同一个工作,而不会启动第二次渲染。

这份 recipe 采用贴近传输层、直面 wire(连接层)的方式。它直接使用 REST 接口,不假设存在任何特定语言的 software development kit(SDK,软件开发工具包),因此同一套流程可以移植到任何 HTTP 客户端。

服务器端使用标准的 Connect 发行版:

Terminal window
composer require nextpdf/server

下方生产环境示例中的 PHP 客户端使用符合 PSR-18 与 PSR-17 的 Hypertext Transfer Protocol(HTTP,超文本传输协议)客户端与消息工厂。安装你的项目已标准采用的任一实现,例如:

Terminal window
composer require psr/http-client psr/http-factory

异步工作接口将 提交取回 分离。你不需要为每份文档分别维持一条长时间打开的 HTTP 连接。做法是提交一个工作、取得一组标识符,再轮询一个成本低廉的状态 endpoint,直到工作完成。正是这种形态让批量处理变得可控:客户端可以同时跟踪 N 个独立工作,而不必维持 N 条被阻塞的连接。

整个流程由三个 endpoint 承载:

  • POST /api/v1/jobs 接受与同步的 /api/v1/render endpoint 相同的渲染请求主体:一个 page_size、一个 orientation,以及一个有序的 operations 数组。对于新工作,它会返回 201 Created;当某个 Idempotency-Key 对应到你已提交过的工作时,则返回 200 OK
  • GET /api/v1/jobs/{id} 会返回当前的工作记录。对于非终止工作,它还会设置一个 Retry-After 标头(服务器采用 2 秒间隔)以及一个 poll_url 字段。请遵循这个标头,而不要在紧密循环中持续轮询。
  • GET /api/v1/jobs/{id}/result 会以 application/pdf 流式返回已完成的 PDF。只要工作尚未抵达 completed,它就会返回 409 Conflict;因此只在状态轮询确认进入终止状态后才调用它。

每个成功响应都共用同一个封装结构:一个带有工作字段的 data 对象,以及一个 meta 对象,其中含有 request_idtimestampduration_msapi_version。你会读取的工作字段都位于 data 下:data.statusdata.progressdata.job_id,以及已完成工作中的 data.result_url

关于当前版本,有一点需要说明。服务器会在响应 POST 之前先以内联方式处理提交的工作,所以实际提交响应可能已经带有终止的 status,结果也可能在第一次轮询时就已就绪。此处记录的轮询与进度契约就是稳定的 API 形态。当处理 backend(后端)转为基于队列的 worker 池时,服务器仍会维持这个契约不变,因此一个实现了轮询的客户端现在是正确的,在那项变更之后依然正确。请把轮询循环写出来。不要假设第一个响应是非终止状态,也不要假设它是终止状态。

如下是服务器 OpenAPI 文档与 JobHandler 路由所定义的 Connect 异步工作 REST 接口:

  • POST /api/v1/jobs:提交一个渲染工作。可选的 Idempotency-Key 请求标头。主体是一个渲染请求(operations 为必填,且至少需含一个操作)。响应:201 表示新工作、200 表示幂等重播、422 表示主体无效、409 表示幂等冲突、429 表示触发速率限制。
  • GET /api/v1/jobs/{id}:轮询状态。返回 200 并附上工作记录;非终止期间会带有 Retry-After 标头;若工作不存在或属于另一个客户端,则返回 404
  • GET /api/v1/jobs/{id}/result:下载 PDF。当工作为 completed 时,返回 200 application/pdf;尚未完成时返回 409;未知时返回 404
  • DELETE /api/v1/jobs/{id}:取消一个 pendingrunning 工作,或删除一个 completed 工作(204)。

工作记录在 data 下包含以下字段,字段形式与服务器序列化结果保持一致。

  • job_id:标识符(job_ 前缀加上 24 个十六进制字符)。
  • status:取值为 pendingrunningcompletedfailedcancelled 之一。前两者为非终止;后三者为终止。
  • created_at,以及设置后的 started_atcompleted_at:ISO-8601 时间戳。
  • progress:一个 0 到 100 的整数,仅在服务器为该工作跟踪进度时才存在;否则不存在(视为未知)。
  • error:消息字符串,仅在 failed 工作上才存在。
  • result_url:仅在 completed 工作上才存在;指向结果下载路径。
  • poll_url:仅在工作为非终止时才存在。

身份验证使用 Authorization 标头中的 bearer token(持有者令牌):Authorization: Bearer npk_live_{kid}_{secret}

这个示例会在 wire 层把单个工作从头到尾执行一遍,让你看清楚这三次调用以及它们返回的字段。它会提交、轮询一次,然后下载。下方的生产环境示例加上了批量循环、Retry-After 等待,以及完整的错误处理。

Terminal window
# 1. Submit an async render job. Capture the job_id from data.job_id.
curl -sS -X POST "$NEXTPDF_CONNECT_URL/api/v1/jobs" \
-H "Authorization: Bearer $NEXTPDF_CONNECT_TOKEN" \
-H 'Content-Type: application/json' \
-H "Idempotency-Key: invoice-2026-04-0001" \
-d '{"page_size":"A4","orientation":"portrait","operations":[{"type":"add_text","text":"Invoice 0001"}]}'
# 2. Poll status. Read data.status and data.progress; honour Retry-After.
curl -sS "$NEXTPDF_CONNECT_URL/api/v1/jobs/job_0123456789abcdef01234567" \
-H "Authorization: Bearer $NEXTPDF_CONNECT_TOKEN"
# 3. Once data.status is "completed", download the PDF binary.
curl -sS "$NEXTPDF_CONNECT_URL/api/v1/jobs/job_0123456789abcdef01234567/result" \
-H "Authorization: Bearer $NEXTPDF_CONNECT_TOKEN" \
-o invoice-0001.pdf

这是一个自包含的客户端。它会提交一批渲染请求,限制同时进行的工作数量,按照服务器通过 Retry-After 要求的节奏轮询每个工作,报告服务器返回的 progress,下载每个已完成的 PDF,并记录各项失败。它使用 PSR-18 HTTP 客户端与 PSR-17 工厂(Connect recipe 标准采用的传输契约),并为每次调用可能抛出的异常捕获最具体的类型:以 Psr\Http\Client\ClientExceptionInterface 对应传输失败,并以带类型的 BatchJobException 对应批量无法继续处理的服务器响应。没有任何 catch 块是空的。每个块都会记录并重新抛出,或记下一个明确定义的结果。

请把内联的 $documents 列表换成你自己的输入。在构造函数需要 PSR 接口的位置,注入你项目的具体 HTTP 客户端与工厂。

<?php
declare(strict_types=1);
require_once __DIR__ . '/vendor/autoload.php';
use Psr\Http\Client\ClientExceptionInterface;
use Psr\Http\Client\ClientInterface;
use Psr\Http\Message\RequestFactoryInterface;
use Psr\Http\Message\StreamFactoryInterface;
/**
* Raised when a Connect job response prevents the batch from proceeding.
*
* Distinct from the PSR-18 transport exception: this means the request was
* delivered and the server answered, but the answer is one the batch
* cannot act on (a non-success status code, or a job that ended in a
* terminal failure).
*/
final class BatchJobException extends RuntimeException
{
}
/**
* Drives a batch of async render jobs over the NextPDF Connect REST surface.
*
* The client submits each render request, polls every job on the cadence
* the server requests through Retry-After, and downloads each completed
* PDF. It enforces bounded concurrency so a large batch never opens more
* in-flight jobs than the host should track at once.
*/
final readonly class ConnectBatchRunner
{
/**
* @param non-empty-string $baseUrl Connect base URL, no trailing slash
* @param non-empty-string $bearerToken Connect API key (npk_live_...)
* @param positive-int $maxInFlight Concurrent jobs cap
* @param positive-int $maxPolls Per-job poll attempts before giving up
*/
public function __construct(
private ClientInterface $httpClient,
private RequestFactoryInterface $requestFactory,
private StreamFactoryInterface $streamFactory,
private string $baseUrl,
private string $bearerToken,
private int $maxInFlight = 8,
private int $maxPolls = 150,
) {}
/**
* Render every document in the batch and write each completed PDF.
*
* @param array<non-empty-string, array<string, mixed>> $documents
* Map of stable document key to render request body. The key
* doubles as the Idempotency-Key, so a re-run of the same batch
* does not duplicate server-side work.
* @param non-empty-string $outputDir Directory for the written PDFs
*
* @throws BatchJobException When the batch cannot proceed at all
* @throws ClientExceptionInterface When the transport cannot send a request
*
* @return array<non-empty-string, string> Map of document key to a
* human-readable outcome line
*/
public function run(array $documents, string $outputDir): array
{
$this->assertWritableDir($outputDir);
$outcomes = [];
// Process in bounded windows so the in-flight job count never
// exceeds the configured cap, regardless of batch size.
foreach (array_chunk($documents, $this->maxInFlight, preserve_keys: true) as $window) {
$jobIds = [];
foreach ($window as $key => $body) {
$jobIds[$key] = $this->submit($key, $body);
}
foreach ($jobIds as $key => $jobId) {
$record = $this->pollToTerminal($jobId);
$outcomes[$key] = $this->finish($key, $record, $outputDir);
}
}
return $outcomes;
}
/**
* Submit one render job and return its identifier.
*
* @param non-empty-string $idempotencyKey Stable per-document key
* @param array<string, mixed> $body Render request body
*
* @throws BatchJobException
* @throws ClientExceptionInterface
*
* @return non-empty-string The job_id from data.job_id
*/
private function submit(string $idempotencyKey, array $body): string
{
$request = $this->requestFactory
->createRequest('POST', $this->baseUrl . '/api/v1/jobs')
->withHeader('Authorization', 'Bearer ' . $this->bearerToken)
->withHeader('Content-Type', 'application/json')
->withHeader('Idempotency-Key', $idempotencyKey)
->withBody($this->streamFactory->createStream($this->encode($body)));
$response = $this->httpClient->sendRequest($request);
$status = $response->getStatusCode();
// 201 new job; 200 idempotent replay. Anything else stops the batch.
if ($status !== 201 && $status !== 200) {
throw new BatchJobException(
sprintf('Submit for "%s" returned HTTP %d.', $idempotencyKey, $status),
);
}
$data = $this->decodeData($response->getBody()->__toString());
$jobId = $data['job_id'] ?? null;
if (!is_string($jobId) || $jobId === '') {
throw new BatchJobException(
sprintf('Submit for "%s" returned no job_id.', $idempotencyKey),
);
}
return $jobId;
}
/**
* Poll one job until it reaches a terminal state.
*
* Honours the Retry-After header on every non-terminal poll. Gives up
* after maxPolls attempts and reports the wait as a failure so the
* batch records it rather than blocking forever.
*
* @param non-empty-string $jobId
*
* @throws BatchJobException
* @throws ClientExceptionInterface
*
* @return array<string, mixed> The terminal job record (data object)
*/
private function pollToTerminal(string $jobId): array
{
$url = $this->baseUrl . '/api/v1/jobs/' . rawurlencode($jobId);
for ($attempt = 0; $attempt < $this->maxPolls; $attempt++) {
$request = $this->requestFactory
->createRequest('GET', $url)
->withHeader('Authorization', 'Bearer ' . $this->bearerToken);
$response = $this->httpClient->sendRequest($request);
$status = $response->getStatusCode();
if ($status !== 200) {
throw new BatchJobException(
sprintf('Poll for job "%s" returned HTTP %d.', $jobId, $status),
);
}
$data = $this->decodeData($response->getBody()->__toString());
$jobStatus = is_string($data['status'] ?? null) ? $data['status'] : 'unknown';
$progress = is_int($data['progress'] ?? null) ? $data['progress'] : null;
$this->logProgress($jobId, $jobStatus, $progress);
// Terminal states: completed, failed, cancelled.
if (in_array($jobStatus, ['completed', 'failed', 'cancelled'], strict: true)) {
return $data;
}
// Non-terminal: wait the interval the server asked for.
$this->waitRetryAfter($response->getHeaderLine('Retry-After'));
}
throw new BatchJobException(
sprintf('Job "%s" did not finish within %d polls.', $jobId, $this->maxPolls),
);
}
/**
* Act on a terminal job record: download a completed PDF, or report.
*
* @param non-empty-string $key Document key
* @param array<string, mixed> $record Terminal job record (data object)
* @param non-empty-string $outputDir Where to write the PDF
*
* @throws BatchJobException
* @throws ClientExceptionInterface
*
* @return string A human-readable outcome line
*/
private function finish(string $key, array $record, string $outputDir): string
{
$jobStatus = is_string($record['status'] ?? null) ? $record['status'] : 'unknown';
$jobId = is_string($record['job_id'] ?? null) ? $record['job_id'] : '';
if ($jobStatus !== 'completed') {
// A failed job carries an error message; surface it, do not swallow.
$error = is_string($record['error'] ?? null) ? $record['error'] : 'no detail';
return sprintf('%s -> %s (%s)', $key, $jobStatus, $error);
}
$path = rtrim($outputDir, '/\\') . DIRECTORY_SEPARATOR . $key . '.pdf';
$this->download($jobId, $path);
return sprintf('%s -> completed, written to %s', $key, $path);
}
/**
* Download a completed job result and write it to a server-derived path.
*
* @param non-empty-string $jobId
* @param non-empty-string $path Caller-controlled output path
*
* @throws BatchJobException
* @throws ClientExceptionInterface
*/
private function download(string $jobId, string $path): void
{
$request = $this->requestFactory
->createRequest('GET', $this->baseUrl . '/api/v1/jobs/' . rawurlencode($jobId) . '/result')
->withHeader('Authorization', 'Bearer ' . $this->bearerToken);
$response = $this->httpClient->sendRequest($request);
if ($response->getStatusCode() !== 200) {
throw new BatchJobException(
sprintf('Result download for job "%s" returned HTTP %d.', $jobId, $response->getStatusCode()),
);
}
$bytes = $response->getBody()->__toString();
if (!str_starts_with($bytes, '%PDF')) {
throw new BatchJobException(
sprintf('Result for job "%s" is not a PDF.', $jobId),
);
}
if (file_put_contents($path, $bytes) === false) {
throw new BatchJobException(sprintf('Could not write result to "%s".', $path));
}
}
/**
* Sleep for the server-requested interval, with a safe floor and ceiling.
*/
private function waitRetryAfter(string $retryAfter): void
{
$seconds = ctype_digit($retryAfter) ? (int) $retryAfter : 2;
// Clamp to a sane band so a hostile header cannot stall or busy-loop.
$seconds = max(1, min(30, $seconds));
sleep($seconds);
}
/**
* Emit a progress line. Replace with your logger.
*/
private function logProgress(string $jobId, string $jobStatus, ?int $progress): void
{
$pct = $progress === null ? 'n/a' : $progress . '%';
fwrite(STDERR, sprintf("[%s] status=%s progress=%s\n", $jobId, $jobStatus, $pct));
}
/**
* Decode a response envelope and return its data object.
*
* @throws BatchJobException When the body is not the expected envelope
*
* @return array<string, mixed>
*/
private function decodeData(string $json): array
{
try {
/** @var mixed $decoded */
$decoded = json_decode($json, true, 32, JSON_THROW_ON_ERROR);
} catch (JsonException $e) {
throw new BatchJobException('Response body is not valid JSON.', previous: $e);
}
if (!is_array($decoded) || !isset($decoded['data']) || !is_array($decoded['data'])) {
throw new BatchJobException('Response is missing the data envelope.');
}
/** @var array<string, mixed> $data */
$data = $decoded['data'];
return $data;
}
/**
* @param array<string, mixed> $body
*
* @throws BatchJobException
*/
private function encode(array $body): string
{
try {
return json_encode($body, JSON_THROW_ON_ERROR | JSON_UNESCAPED_SLASHES);
} catch (JsonException $e) {
throw new BatchJobException('Render request body is not encodable.', previous: $e);
}
}
/**
* @param non-empty-string $dir
*
* @throws BatchJobException
*/
private function assertWritableDir(string $dir): void
{
if (!is_dir($dir) || !is_writable($dir)) {
throw new BatchJobException(sprintf('Output directory "%s" is not writable.', $dir));
}
}
}
// ---------------------------------------------------------------------------
// Wiring. Provide your project's concrete PSR-18 client and PSR-17 factories.
// ---------------------------------------------------------------------------
/** @var ClientInterface $httpClient */
/** @var RequestFactoryInterface $requestFactory */
/** @var StreamFactoryInterface $streamFactory */
$baseUrl = getenv('NEXTPDF_CONNECT_URL');
$token = getenv('NEXTPDF_CONNECT_TOKEN');
if ($baseUrl === false || $baseUrl === '' || $token === false || $token === '') {
fwrite(STDERR, "Set NEXTPDF_CONNECT_URL and NEXTPDF_CONNECT_TOKEN.\n");
exit(2);
}
/** @var array<non-empty-string, array<string, mixed>> $documents */
$documents = [
'invoice-0001' => [
'page_size' => 'A4',
'orientation' => 'portrait',
'operations' => [
['type' => 'add_text', 'text' => 'Invoice 0001'],
],
],
'invoice-0002' => [
'page_size' => 'A4',
'orientation' => 'portrait',
'operations' => [
['type' => 'add_text', 'text' => 'Invoice 0002'],
],
],
];
$runner = new ConnectBatchRunner(
httpClient: $httpClient,
requestFactory: $requestFactory,
streamFactory: $streamFactory,
baseUrl: rtrim($baseUrl, '/'),
bearerToken: $token,
maxInFlight: 8,
);
try {
$outcomes = $runner->run($documents, getenv('NEXTPDF_COOKBOOK_OUTPUT') ?: sys_get_temp_dir());
} catch (BatchJobException $e) {
fwrite(STDERR, 'Batch stopped: ' . $e->getMessage() . "\n");
exit(1);
} catch (ClientExceptionInterface $e) {
fwrite(STDERR, 'Transport failure: ' . $e->getMessage() . "\n");
exit(1);
}
foreach ($outcomes as $line) {
echo $line, "\n";
}

预期 STDOUT 为每份文档一行;路径取决于你的输出目录:

invoice-0001 -> completed, written to /tmp/invoice-0001.pdf
invoice-0002 -> completed, written to /tmp/invoice-0002.pdf
  • 读取工作字段时,应从 data 下读取,而不是顶层。 每个成功响应都包在一个 { "data": ..., "meta": ... } 封装结构中。data.statusdata.progress 是你会据以行动的字段;meta 则带有用于关联排查的 request_id
  • progress 可能不存在。 只有在服务器为该工作跟踪进度时,才会包含 progress。请把缺失的字段视为「未知」而非零,并以一定会存在的 status 来驱动你的循环。
  • 提交可能已经是终止状态。 在当前版本中,服务器会在响应 POST 之前就以内联方式渲染,所以提交响应可能带有 status: completed,且结果可能在第一次轮询时就已就绪。你的轮询循环必须能在第零次尝试就接受终止状态,而不是强制先看到 pending
  • 遵循 Retry-After 非终止的状态响应会设置 Retry-After(2 秒间隔)。更频繁地轮询只会浪费请求并触发 429。请把这个值限制在合理区间,而不要盲目信任它。
  • 在完成前调用 /result 会得到 409 只在状态轮询显示 completed 之后才调用结果 endpoint。一个 409 Conflict 代表工作尚未完成;它并非传输错误。
  • Idempotency-Key 可防止重复作业。 用相同密钥重试提交会返回原本的工作(返回 200 而非 201)。请为每份文档使用稳定的密钥,这样网络重试永远不会启动第二次渲染。用相同密钥但搭配 不同 主体会构成 409 冲突。
  • 工作按拥有者限定范围。 用某把 API 密钥提交的工作对另一把密钥不可见;跨拥有者的 GET 会返回 404 而非 403。请用提交时所用的同一组凭证来轮询。
  • failed 工作会带有一条 error 消息。 读取 data.error——在终止的 failed 状态上——并记录下来。不要盲目重试。

一次批量的成本,是各项渲染加上轮询额外开销的总和。客户端侧有两个可调节项。第一,限制并发:maxInFlight 上限固定了同时跟踪多少个工作,这让客户端的打开请求数与内存用量不论批量大小都保持平稳。请把它设成与服务器的 worker 数量相符,而不要更高;进行中的工作数多于 worker 只会拉长每个工作的队列等待时间。第二,尊重轮询间隔:每次轮询都是一次成本低廉的状态读取,但紧密循环会放大请求量并触发速率限制器。服务器的 2 秒 Retry-After 是恰当的默认值,而 runner 会将其限制在 1 到 30 秒的区间,这样一个缓慢的工作既不会忙等空转,也不会卡住整个窗口。

对于超大批量,请分窗口处理(runner 使用 array_chunk),而不要一开始就全部提交出去。这会同时限制客户端跟踪的状态数量与服务器的队列深度,因此格式错误或规模过大的批量会在某个窗口内失败,而不是在数千次提交之后才失败。

  • 不要让 bearer token 出现在日志与 URL 中。 API 密钥只应在 Authorization 标头中传递。绝不可把它放进查询字符串、日志行,或任何写出的产物中。runner 只会记录 job_idstatus,绝不记录凭证。
  • 不要从服务器可控的值派生输出路径。 runner 会把你的代码所选的文档键拼接到固定输出目录下,用来构造每条输出路径,绝不使用服务器响应中的值来构造路径。不要把工作字段插入文件系统路径中,那会打开路径遍历漏洞。
  • 验证下载到的字节。 runner 会在写入文件之前,先检查来自 /result200 响应开头是否为 %PDF 标头。成功的下载状态本身并不能证明主体就是 PDF。
  • 在检查之前,把结果视为不可信。 工作已完成只代表服务器渲染出了字节,不代表那些字节可以安全地转发。在把结果交给客户端或下游系统之前,先让它通过一道结构检查步骤。
  • 使用最小权限的密钥。 异步工作接口属于 core 层的渲染。请为这个批量发放一把仅覆盖其所需操作范围的密钥,并按照你的密钥管理政策定期轮换它。
  • 限制轮询预算。 maxPolls 可阻止卡住的工作永远占住客户端。批量会把超时记录成一个结果,而不是阻塞,这让一个故障工作不至于对其余工作造成拒绝服务。

这份 recipe 不提出任何规范性标准主张。它会使用 NextPDF Connect 的异步工作 REST endpoints(POST /api/v1/jobsGET /api/v1/jobs/{id}GET /api/v1/jobs/{id}/result),并读取服务器所定义的工作记录字段(statusprogresserrorresult_urlpoll_url)。对已下载结果做的 %PDF 标头检查,只确认响应开头带有 PDF 标记;它并非有效性或符合性的判定。若要对一组文档做标准检查,请使用 Enterprise 批量符合性工具。请参阅 通过 Connect 进行批量标准检查,那与本页涵盖的渲染工作属于不同接口。