1
ZLMediaKit Python混合编程模式
夏楚 edited this page 2026-04-10 17:00:44 +08:00
This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

本文面向对流媒体开发感兴趣的工程师,介绍 ZLMediaKit 的 Python 混合编程模式——用Python监听ZLM内核事件、用 Python 编写HTTP API两件事只需一个 .py 文件,开箱即用。


一、环境准备:开启 Python 插件功能

在写代码之前,有三件事需要先确认。

1.1 使用支持 Python 的 ZLM 二进制包

ZLM 的 Python 插件功能需要在编译时开启 -DENABLE_PYTHON=ON,普通官方发布包默认不含此功能。

有两种方式获取支持 Python 的二进制包:

方式一(推荐):下载预编译包

前往 ZLMediaKit 的 Issue #483 页面,下载文件名中带 Python 后缀的二进制包,例如:

Linux_Python_master_2026-04-01.zip
Windows_Python_master_2026-04-01.zip

方式二:自行编译

git clone https://github.com/ZLMediaKit/ZLMediaKit.git
git submodule update --init --recursive
cd ZLMediaKit
mkdir build && cd build
cmake .. -DENABLE_PYTHON=ON
make -j$(nproc)

1.2 配置 config.ini

在 ZLM 的配置文件 config.ini 中添加 [python] 节,指定插件入口模块名(不含 .py 后缀):

[python]
plugin=mk_plugin

plugin 的值是 Python 模块名ZLM 启动时会执行 import mk_plugin,因此该模块必须在 Python 的导入路径中可见。


1.3 设置 PYTHONPATH

ZLM 需要知道去哪里找 mk_plugin.py,启动前设置环境变量 PYTHONPATH 指向插件文件所在目录:

# 假设 mk_plugin.py 位于 /opt/pymkui/backend/
export PYTHONPATH=/opt/pymkui/backend

# 然后启动 ZLM
./MediaServer -c config.ini

也可以写成一行:

PYTHONPATH=/opt/pymkui/backend ./MediaServer -c config.ini

如果使用 systemd 管理服务,在 [Service] 段加入:

[Service]
Environment="PYTHONPATH=/opt/pymkui/backend"
ExecStart=/opt/zlm/MediaServer -c /opt/zlm/config.ini

1.4 验证是否生效

ZLM 启动日志中出现以下内容,说明 Python 插件加载成功:

2026-04-10 16:45:38.811 I [MediaServer] [39041-8660253376] pyinvoker.cpp:560 set_python_path | PYTHONPATH is already set to: /opt/pymkui/backend
2026-04-10 16:45:39.161 I [MediaServer] [39041-8660253376] mk_plugin.py:100 on_start | on_start, secret: xxxxxxxxxxxxxxx

若出现 ModuleNotFoundError: No module named 'mk_plugin',请检查 PYTHONPATH 是否设置正确。


二、什么是"混合编程"

ZLMediaKit简称 ZLM的核心是高性能 C++ 引擎,但它内置了一套 Python 插件机制(mk_loader

┌───────────────────────────────────┐
│        ZLMediaKit C++ 内核         │
│                                   │
│  推流 / 拉流 / 转码 / RTP / ...     │
│                                   │
│  ┌──────────────────────────────┐ │
│  │   Python 插件层mk_loader    │ │
│  │                               │ │
│  │  事件回调  ←→  mk_plugin.py   │ │
│  │  HTTP 接口 ←→  FastAPI app    │ │
│  └──────────────────────────────┘ │
└───────────────────────────────────┘
  • 事件回调推流、播放、录制、流无人观看……C++ 内核触发事件后自动调用 Python 函数,你可以在这里写鉴权、写数据库、发通知。
  • HTTP 接口扩展:把一个标准的 FastAPI 应用传给 ZLMZLM的HTTP服务器会把请求"转发"给你的Python路由完全复用ZLM的HTTP端口无需另起进程。

三、插件入口文件 mk_plugin.py

ZLM 启动时会加载 mk_plugin.py,约定以下几个生命周期函数:

函数名 触发时机
on_start() ZLM 启动完成后调用一次
on_exit() ZLM 退出前调用一次
on_reload_config() 配置热重载时触发

所有事件回调也定义在同一个文件里,返回 True 代表 Python 已接管该事件ZLM 不再做默认处理;返回 False 则继续走 ZLM默认WebHook逻辑。


四、常用事件回调详解

4.1 on_start — 启动初始化

import mk_loader
import mk_logger

def on_start():
    # 读取/修改 ZLM 配置项
    mk_loader.set_config('http.rootPath', '/var/www/frontend')
    mk_loader.update_config()   # 使配置生效

    # 把 FastAPI 路由注册到 ZLM HTTP 服务器
    mk_loader.set_fastapi(check_route, submit_coro)

    mk_logger.log_info("插件启动完成")

set_fastapi 传入两个函数:check_route(判断路径是否由 Python 处理)和 submit_coro(把 ASGI 请求投递到异步事件循环)。这两个函数是固定写法,下文会给出完整示例。


4.2 on_publish — 推流鉴权

每当有客户端推流时触发,可在此做 Token 校验、写入流记录等。

def on_publish(type: str, args: dict, invoker, sender: dict) -> bool:
    stream = args.get("stream", "")
    app    = args.get("app", "")
    token  = args.get("params", "")   # URL 参数,如 ?token=xxx

    # 简单示例:校验 token
    if not is_valid_token(token, stream):
        # 第二个参数非空字符串 = 拒绝推流,内容作为错误信息返回给推流端
        mk_loader.publish_auth_invoker_do(invoker, "Token 无效,推流拒绝")
        return True

    # 允许推流,同时指定转协议选项
    opt = {
        "enable_hls": "1",       # 开启 HLS
        "enable_mp4": "0",       # 不自动录制
    }
    mk_loader.publish_auth_invoker_do(invoker, "", opt)
    mk_logger.log_info(f"推流已允许: {app}/{stream}")
    return True

4.3 on_play — 播放鉴权

def on_play(args: dict, invoker, sender: dict) -> bool:
    stream = args.get("stream", "")
    ip     = sender.get("peer_ip", "")

    # 白名单校验示例
    if ip not in ALLOWED_IPS:
        mk_loader.play_auth_invoker_do(invoker, "IP 不在白名单")
        return True

    mk_loader.play_auth_invoker_do(invoker, "")   # 空字符串 = 允许
    return True

4.4 on_stream_not_found — 按需拉流

这是混合编程最典型的使用场景播放器请求一条不存在的流时ZLM 触发此事件Python 可以在这里动态启动拉流代理。

def on_stream_not_found(args: dict, sender: dict, invoker) -> bool:
    vhost  = args.get("vhost", "__defaultVhost__")
    app    = args.get("app", "")
    stream = args.get("stream", "")

    # 查数据库:是否存在该流的"按需拉流"配置
    proxy = db.get_proxy(vhost, app, stream, on_demand=True)
    if not proxy:
        return False   # 不处理ZLM 返回 404

    url = proxy["url"]

    def cb(err, key):
        if err:
            mk_logger.log_warn(f"按需拉流失败: {err}")
        else:
            mk_logger.log_info(f"按需拉流成功: {key}")

    mk_loader.add_stream_proxy(
        vhost, app, stream, url,
        cb,
        retry_count=3,
        force=True,
        timeout_sec=10.0,
        opt={"auto_close": True},   # 无人观看后自动停止
    )
    return True   # 告诉 ZLMPython 已接管,等待流上线再分发给播放器

4.5 on_stream_none_reader — 无人观看

def on_stream_none_reader(sender: mk_loader.MediaSource) -> bool:
    url = sender.getUrl()
    mk_logger.log_info(f"无人观看: {url}")

    # 可以在这里选择关闭流,释放带宽
    # sender.close(False)

    return True   # 返回 True 阻止 ZLM 默认行为(默认也是关流)

4.6 on_record_mp4 / on_record_ts — 录制完成

def on_record_mp4(info: dict) -> bool:
    """
    info 包含:
      file_path  — 录制文件本地路径
      start_time — 录制开始时间戳
      time_len   — 时长(秒)
      file_size  — 文件大小(字节)
      vhost / app / stream
    """
    mk_logger.log_info(f"录制完成: {info['file_path']} ({info['time_len']}s)")

    # 异步上传到 OSS / 发送 Webhook 通知等
    asyncio.run_coroutine_threadsafe(
        upload_to_oss(info["file_path"]),
        SharedLoop.get_loop()
    )
    return True

4.7 on_player_proxy_failed — 拉流失败自动切换备用地址

def on_player_proxy_failed(url, media_tuple, ex) -> bool:
    vhost  = media_tuple.vhost
    app    = media_tuple.app
    stream = media_tuple.stream

    # 从数据库取多条备用地址,切换到下一个
    next_url = db.get_next_proxy_url(vhost, app, stream, failed_url=url)
    if not next_url:
        return False

    mk_logger.log_info(f"切换备用地址: {url}{next_url}")
    mk_loader.update_stream_proxy(vhost, app, stream, next_url, {})
    return True

4.8 on_http_access — HTTP 文件访问鉴权

def on_http_access(parser, path, file_path, is_dir, invoker, sender) -> bool:
    frontend_root = "/var/www/frontend"

    if not file_path.startswith(frontend_root):
        # 禁止访问前端目录之外的文件,防止路径穿越
        mk_loader.http_access_invoker_do(invoker, "Access denied", path, 3600)
        return True

    mk_loader.http_access_invoker_do(invoker, "", path, 3600)
    return True

五、用 Python 扩展 ZLM 的 HTTP 接口

5.1 核心原理

ZLM的HTTP服务器收到请求后先查询Python是否注册了对应路由如果有则把请求以 ASGI 格式投递到Python的 FastAPI 应用。整个流程在同一进程内完成,没有网络开销。

5.2 固定胶水代码(mk_plugin.py

import asyncio
from starlette.routing import Match
from shared_loop import SharedLoop
from py_http_api import app   # 你的 FastAPI 实例

def submit_coro(scope, body, send):
    """把 ZLM 传来的 ASGI 请求投递到 Python 异步事件循环"""
    async def run():
        async def async_send(message):
            result = send(message)
            if result is not None:
                await result

        async def receive():
            return {"type": "http.request", "body": body, "more_body": False}

        try:
            await app(scope, receive, async_send)
        except Exception as e:
            mk_logger.log_warn(f"FastAPI 处理失败: {e}")
            await async_send({"type": "http.response.start", "status": 500,
                               "headers": [(b"content-type", b"text/plain")]})
            await async_send({"type": "http.response.body", "body": b"Internal Server Error"})

    return asyncio.run_coroutine_threadsafe(run(), SharedLoop.get_loop())


def check_route(scope) -> bool:
    """告诉 ZLM 这个路径是否由 Python 处理"""
    for route in app.routes:
        if hasattr(route, "matches"):
            match, _ = route.matches(scope)
            if match == Match.FULL:
                return True
    return False


def on_start():
    mk_loader.set_fastapi(check_route, submit_coro)

5.3 编写 FastAPI 接口(py_http_api.py

接口与普通 FastAPI 项目完全一致,支持 GET/POST、路径参数、Body 解析等:

import json
import mk_logger
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI(title="我的流媒体接口")

# ── 示例 1查询在线流列表 ──────────────────────────────────────────
@app.get("/index/pyapi/streams")
async def list_streams():
    streams = db.get_all_streams()
    return {"code": 0, "data": streams}


# ── 示例 2添加拉流代理POST支持 JSON / Form 两种请求体) ────────
@app.post("/index/pyapi/addPullProxy")
async def add_pull_proxy(request: Request):
    # 解析请求体
    body = await request.body()
    content_type = request.headers.get("content-type", "")
    if "application/json" in content_type:
        data = json.loads(body)
    else:
        import urllib.parse
        parsed = urllib.parse.parse_qs(body.decode(), keep_blank_values=True)
        data = {k: v[0] for k, v in parsed.items()}

    app_name = data.get("app", "")
    stream   = data.get("stream", "")
    url      = data.get("url", "")

    if not all([app_name, stream, url]):
        return {"code": -1, "msg": "app/stream/url 不能为空"}

    proxy_id = db.add_pull_proxy({
        "vhost": "__defaultVhost__",
        "app": app_name,
        "stream": stream,
    })
    db.set_proxy_urls(proxy_id, [{"url": url, "params": {}}])

    mk_logger.log_info(f"添加拉流代理: {app_name}/{stream}{url}")
    return {"code": 0, "msg": "添加成功", "data": {"id": proxy_id}}


# ── 示例 3调用 ZLM 原生 API反向代理模式 ────────────────────────
import httpx

_client = httpx.AsyncClient(timeout=10.0)

@app.get("/index/pyapi/zlm/mediaList")
async def get_media_list(request: Request):
    """把请求透传给 ZLM 原生接口,做一层 Python 加工后返回"""
    secret = mk_loader.get_config("api.secret")
    zlm_url = f"http://127.0.0.1:{mk_loader.get_config('http.port')}/index/api/getMediaList"
    resp = await _client.get(zlm_url, params={"secret": secret})
    data = resp.json()

    # 在这里可以对数据做过滤、脱敏等处理
    return {"code": 0, "total": len(data.get("data", [])), "data": data.get("data", [])}

5.4 接口路径约定

前缀 说明
/index/api/... ZLM 原生接口,直接调用
/index/pyapi/... Python 扩展接口(推荐命名规范)

六、异步与线程安全

ZLM 的 C++ 回调运行在 C++ 线程里,而 FastAPI 运行在 Python asyncio 事件循环里。两者之间的桥梁是 SharedLoop

# shared_loop.py — 全局共享的 asyncio 事件循环
import asyncio
import threading

class SharedLoop:
    _loop: asyncio.AbstractEventLoop = None
    _thread: threading.Thread = None

    @classmethod
    def get_loop(cls) -> asyncio.AbstractEventLoop:
        if cls._loop is None:
            cls._loop = asyncio.new_event_loop()
            cls._thread = threading.Thread(
                target=cls._loop.run_forever, daemon=True
            )
            cls._thread.start()
        return cls._loop

在 C++ 回调函数里投递异步任务:

import asyncio
from shared_loop import SharedLoop

def on_record_mp4(info: dict) -> bool:
    # ✅ 正确:通过 run_coroutine_threadsafe 跨线程投递协程
    asyncio.run_coroutine_threadsafe(
        notify_webhook(info),
        SharedLoop.get_loop()
    )
    return True

async def notify_webhook(info: dict):
    async with httpx.AsyncClient() as client:
        await client.post("https://your-server.com/hook", json=info)

⚠️ 不要在 C++ 回调里直接 await,也不要直接调用 asyncio.run(),否则会发生死锁或运行时错误。


七、完整插件文件骨架

# mk_plugin.py — ZLMediaKit Python 插件完整骨架

import asyncio
import mk_loader
import mk_logger
from starlette.routing import Match
from shared_loop import SharedLoop
from py_http_api import app   # FastAPI 实例

# ── ASGI 桥接(固定写法) ─────────────────────────────────────────
def submit_coro(scope, body, send):
    async def run():
        async def async_send(msg):
            r = send(msg)
            if r is not None:
                await r
        async def receive():
            return {"type": "http.request", "body": body, "more_body": False}
        try:
            await app(scope, receive, async_send)
        except Exception as e:
            mk_logger.log_warn(f"FastAPI error: {e}")
    return asyncio.run_coroutine_threadsafe(run(), SharedLoop.get_loop())

def check_route(scope) -> bool:
    for route in app.routes:
        if hasattr(route, "matches"):
            match, _ = route.matches(scope)
            if match == Match.FULL:
                return True
    return False

# ── 生命周期 ──────────────────────────────────────────────────────
def on_start():
    mk_loader.set_config("http.rootPath", "/var/www/frontend")
    mk_loader.update_config()
    mk_loader.set_fastapi(check_route, submit_coro)
    mk_logger.log_info("Python 插件已启动")

def on_exit():
    mk_logger.log_info("Python 插件退出")

def on_reload_config():
    mk_logger.log_info("配置已热重载")

# ── 推流鉴权 ──────────────────────────────────────────────────────
def on_publish(type: str, args: dict, invoker, sender: dict) -> bool:
    mk_loader.publish_auth_invoker_do(invoker, "", {})
    return True

# ── 播放鉴权 ──────────────────────────────────────────────────────
def on_play(args: dict, invoker, sender: dict) -> bool:
    mk_loader.play_auth_invoker_do(invoker, "")
    return True

# ── 按需拉流 ──────────────────────────────────────────────────────
def on_stream_not_found(args: dict, sender: dict, invoker) -> bool:
    # 查询数据库 → 动态启动拉流代理
    return False  # 没有匹配时返回 False

# ── 录制完成 ──────────────────────────────────────────────────────
def on_record_mp4(info: dict) -> bool:
    mk_logger.log_info(f"录制: {info.get('file_path')} {info.get('time_len')}s")
    return True

# ── 无人观看 ──────────────────────────────────────────────────────
def on_stream_none_reader(sender: mk_loader.MediaSource) -> bool:
    return True

# ── HTTP 文件访问鉴权 ─────────────────────────────────────────────
def on_http_access(parser, path, file_path, is_dir, invoker, sender) -> bool:
    mk_loader.http_access_invoker_do(invoker, "", path, 3600)
    return True

八、总结

能力 实现方式
推流 / 播放鉴权 on_publish / on_play + invoker_do
按需拉流 on_stream_not_found + mk_loader.add_stream_proxy
拉流多地址切换 on_player_proxy_failed + mk_loader.update_stream_proxy
录制后处理 on_record_mp4 + 异步投递到 SharedLoop
扩展 HTTP API FastAPI 路由 +set_fastapi 注册
调用 ZLM 原生 API httpx.AsyncClienthttp://127.0.0.1:{port}/index/api/...
跨线程异步 asyncio.run_coroutine_threadsafe(..., SharedLoop.get_loop())

ZLMediaKit的Python插件机制把 C++ 的高性能Python 的开发效率结合在一起:核心推拉流由 C++ 保证吞吐量业务逻辑、数据库操作、HTTP 接口全用 Python 快速迭代,是流媒体服务开发的一种高效范式。


本文代码均来自开源项目 PyMKUI,欢迎 Star