Table of Contents
- 一、环境准备:开启 Python 插件功能
- 二、什么是"混合编程"?
- 三、插件入口文件 mk_plugin.py
- 四、常用事件回调详解
- 4.1 on_start — 启动初始化
- 4.2 on_publish — 推流鉴权
- 4.3 on_play — 播放鉴权
- 4.4 on_stream_not_found — 按需拉流
- 4.5 on_stream_none_reader — 无人观看
- 4.6 on_record_mp4 / on_record_ts — 录制完成
- 4.7 on_player_proxy_failed — 拉流失败自动切换备用地址
- 4.8 on_http_access — HTTP 文件访问鉴权
- 五、用 Python 扩展 ZLM 的 HTTP 接口
- 六、异步与线程安全
- 七、完整插件文件骨架
- 八、总结
本文面向对流媒体开发感兴趣的工程师,介绍 ZLMediaKit 的 Python 混合编程模式——用Python监听ZLM内核事件、用 Python 编写HTTP API,两件事只需一个
.py文件,开箱即用。
一、环境准备:开启 Python 插件功能
在写代码之前,有三件事需要先确认。
1.1 使用支持 Python 的 ZLM 二进制包
ZLM 的 Python 插件功能需要在编译时开启 -DENABLE_PYTHON=ON,普通官方发布包默认不含此功能。
有两种方式获取支持 Python 的二进制包:
方式一(推荐):下载预编译包
前往 ZLMediaKit 的 Issue #483 页面,下载文件名中带 Python 后缀的二进制包,例如:
Linux_Python_master_2026-04-01.zip
Windows_Python_master_2026-04-01.zip
方式二:自行编译
git clone https://github.com/ZLMediaKit/ZLMediaKit.git
git submodule update --init --recursive
cd ZLMediaKit
mkdir build && cd build
cmake .. -DENABLE_PYTHON=ON
make -j$(nproc)
1.2 配置 config.ini
在 ZLM 的配置文件 config.ini 中添加 [python] 节,指定插件入口模块名(不含 .py 后缀):
[python]
plugin=mk_plugin
plugin的值是 Python 模块名,ZLM 启动时会执行import mk_plugin,因此该模块必须在 Python 的导入路径中可见。
1.3 设置 PYTHONPATH
ZLM 需要知道去哪里找 mk_plugin.py,启动前设置环境变量 PYTHONPATH 指向插件文件所在目录:
# 假设 mk_plugin.py 位于 /opt/pymkui/backend/
export PYTHONPATH=/opt/pymkui/backend
# 然后启动 ZLM
./MediaServer -c config.ini
也可以写成一行:
PYTHONPATH=/opt/pymkui/backend ./MediaServer -c config.ini
如果使用 systemd 管理服务,在 [Service] 段加入:
[Service]
Environment="PYTHONPATH=/opt/pymkui/backend"
ExecStart=/opt/zlm/MediaServer -c /opt/zlm/config.ini
1.4 验证是否生效
ZLM 启动日志中出现以下内容,说明 Python 插件加载成功:
2026-04-10 16:45:38.811 I [MediaServer] [39041-8660253376] pyinvoker.cpp:560 set_python_path | PYTHONPATH is already set to: /opt/pymkui/backend
2026-04-10 16:45:39.161 I [MediaServer] [39041-8660253376] mk_plugin.py:100 on_start | on_start, secret: xxxxxxxxxxxxxxx
若出现 ModuleNotFoundError: No module named 'mk_plugin',请检查 PYTHONPATH 是否设置正确。
二、什么是"混合编程"?
ZLMediaKit(简称 ZLM)的核心是高性能 C++ 引擎,但它内置了一套 Python 插件机制(mk_loader):
┌───────────────────────────────────┐
│ ZLMediaKit C++ 内核 │
│ │
│ 推流 / 拉流 / 转码 / RTP / ... │
│ │
│ ┌──────────────────────────────┐ │
│ │ Python 插件层(mk_loader) │ │
│ │ │ │
│ │ 事件回调 ←→ mk_plugin.py │ │
│ │ HTTP 接口 ←→ FastAPI app │ │
│ └──────────────────────────────┘ │
└───────────────────────────────────┘
- 事件回调:推流、播放、录制、流无人观看……C++ 内核触发事件后自动调用 Python 函数,你可以在这里写鉴权、写数据库、发通知。
- HTTP 接口扩展:把一个标准的
FastAPI应用传给 ZLM,ZLM的HTTP服务器会把请求"转发"给你的Python路由,完全复用ZLM的HTTP端口,无需另起进程。
三、插件入口文件 mk_plugin.py
ZLM 启动时会加载 mk_plugin.py,约定以下几个生命周期函数:
| 函数名 | 触发时机 |
|---|---|
on_start() |
ZLM 启动完成后调用一次 |
on_exit() |
ZLM 退出前调用一次 |
on_reload_config() |
配置热重载时触发 |
所有事件回调也定义在同一个文件里,返回 True 代表 Python 已接管该事件,ZLM 不再做默认处理;返回 False 则继续走 ZLM默认WebHook逻辑。
四、常用事件回调详解
4.1 on_start — 启动初始化
import mk_loader
import mk_logger
def on_start():
# 读取/修改 ZLM 配置项
mk_loader.set_config('http.rootPath', '/var/www/frontend')
mk_loader.update_config() # 使配置生效
# 把 FastAPI 路由注册到 ZLM HTTP 服务器
mk_loader.set_fastapi(check_route, submit_coro)
mk_logger.log_info("插件启动完成")
set_fastapi传入两个函数:check_route(判断路径是否由 Python 处理)和submit_coro(把 ASGI 请求投递到异步事件循环)。这两个函数是固定写法,下文会给出完整示例。
4.2 on_publish — 推流鉴权
每当有客户端推流时触发,可在此做 Token 校验、写入流记录等。
def on_publish(type: str, args: dict, invoker, sender: dict) -> bool:
stream = args.get("stream", "")
app = args.get("app", "")
token = args.get("params", "") # URL 参数,如 ?token=xxx
# 简单示例:校验 token
if not is_valid_token(token, stream):
# 第二个参数非空字符串 = 拒绝推流,内容作为错误信息返回给推流端
mk_loader.publish_auth_invoker_do(invoker, "Token 无效,推流拒绝")
return True
# 允许推流,同时指定转协议选项
opt = {
"enable_hls": "1", # 开启 HLS
"enable_mp4": "0", # 不自动录制
}
mk_loader.publish_auth_invoker_do(invoker, "", opt)
mk_logger.log_info(f"推流已允许: {app}/{stream}")
return True
4.3 on_play — 播放鉴权
def on_play(args: dict, invoker, sender: dict) -> bool:
stream = args.get("stream", "")
ip = sender.get("peer_ip", "")
# 白名单校验示例
if ip not in ALLOWED_IPS:
mk_loader.play_auth_invoker_do(invoker, "IP 不在白名单")
return True
mk_loader.play_auth_invoker_do(invoker, "") # 空字符串 = 允许
return True
4.4 on_stream_not_found — 按需拉流
这是混合编程最典型的使用场景:播放器请求一条不存在的流时,ZLM 触发此事件,Python 可以在这里动态启动拉流代理。
def on_stream_not_found(args: dict, sender: dict, invoker) -> bool:
vhost = args.get("vhost", "__defaultVhost__")
app = args.get("app", "")
stream = args.get("stream", "")
# 查数据库:是否存在该流的"按需拉流"配置
proxy = db.get_proxy(vhost, app, stream, on_demand=True)
if not proxy:
return False # 不处理,ZLM 返回 404
url = proxy["url"]
def cb(err, key):
if err:
mk_logger.log_warn(f"按需拉流失败: {err}")
else:
mk_logger.log_info(f"按需拉流成功: {key}")
mk_loader.add_stream_proxy(
vhost, app, stream, url,
cb,
retry_count=3,
force=True,
timeout_sec=10.0,
opt={"auto_close": True}, # 无人观看后自动停止
)
return True # 告诉 ZLM:Python 已接管,等待流上线再分发给播放器
4.5 on_stream_none_reader — 无人观看
def on_stream_none_reader(sender: mk_loader.MediaSource) -> bool:
url = sender.getUrl()
mk_logger.log_info(f"无人观看: {url}")
# 可以在这里选择关闭流,释放带宽
# sender.close(False)
return True # 返回 True 阻止 ZLM 默认行为(默认也是关流)
4.6 on_record_mp4 / on_record_ts — 录制完成
def on_record_mp4(info: dict) -> bool:
"""
info 包含:
file_path — 录制文件本地路径
start_time — 录制开始时间戳
time_len — 时长(秒)
file_size — 文件大小(字节)
vhost / app / stream
"""
mk_logger.log_info(f"录制完成: {info['file_path']} ({info['time_len']}s)")
# 异步上传到 OSS / 发送 Webhook 通知等
asyncio.run_coroutine_threadsafe(
upload_to_oss(info["file_path"]),
SharedLoop.get_loop()
)
return True
4.7 on_player_proxy_failed — 拉流失败自动切换备用地址
def on_player_proxy_failed(url, media_tuple, ex) -> bool:
vhost = media_tuple.vhost
app = media_tuple.app
stream = media_tuple.stream
# 从数据库取多条备用地址,切换到下一个
next_url = db.get_next_proxy_url(vhost, app, stream, failed_url=url)
if not next_url:
return False
mk_logger.log_info(f"切换备用地址: {url} → {next_url}")
mk_loader.update_stream_proxy(vhost, app, stream, next_url, {})
return True
4.8 on_http_access — HTTP 文件访问鉴权
def on_http_access(parser, path, file_path, is_dir, invoker, sender) -> bool:
frontend_root = "/var/www/frontend"
if not file_path.startswith(frontend_root):
# 禁止访问前端目录之外的文件,防止路径穿越
mk_loader.http_access_invoker_do(invoker, "Access denied", path, 3600)
return True
mk_loader.http_access_invoker_do(invoker, "", path, 3600)
return True
五、用 Python 扩展 ZLM 的 HTTP 接口
5.1 核心原理
ZLM的HTTP服务器收到请求后,先查询Python是否注册了对应路由;如果有,则把请求以 ASGI 格式投递到Python的 FastAPI 应用。整个流程在同一进程内完成,没有网络开销。
5.2 固定胶水代码(mk_plugin.py)
import asyncio
from starlette.routing import Match
from shared_loop import SharedLoop
from py_http_api import app # 你的 FastAPI 实例
def submit_coro(scope, body, send):
"""把 ZLM 传来的 ASGI 请求投递到 Python 异步事件循环"""
async def run():
async def async_send(message):
result = send(message)
if result is not None:
await result
async def receive():
return {"type": "http.request", "body": body, "more_body": False}
try:
await app(scope, receive, async_send)
except Exception as e:
mk_logger.log_warn(f"FastAPI 处理失败: {e}")
await async_send({"type": "http.response.start", "status": 500,
"headers": [(b"content-type", b"text/plain")]})
await async_send({"type": "http.response.body", "body": b"Internal Server Error"})
return asyncio.run_coroutine_threadsafe(run(), SharedLoop.get_loop())
def check_route(scope) -> bool:
"""告诉 ZLM 这个路径是否由 Python 处理"""
for route in app.routes:
if hasattr(route, "matches"):
match, _ = route.matches(scope)
if match == Match.FULL:
return True
return False
def on_start():
mk_loader.set_fastapi(check_route, submit_coro)
5.3 编写 FastAPI 接口(py_http_api.py)
接口与普通 FastAPI 项目完全一致,支持 GET/POST、路径参数、Body 解析等:
import json
import mk_logger
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
app = FastAPI(title="我的流媒体接口")
# ── 示例 1:查询在线流列表 ──────────────────────────────────────────
@app.get("/index/pyapi/streams")
async def list_streams():
streams = db.get_all_streams()
return {"code": 0, "data": streams}
# ── 示例 2:添加拉流代理(POST,支持 JSON / Form 两种请求体) ────────
@app.post("/index/pyapi/addPullProxy")
async def add_pull_proxy(request: Request):
# 解析请求体
body = await request.body()
content_type = request.headers.get("content-type", "")
if "application/json" in content_type:
data = json.loads(body)
else:
import urllib.parse
parsed = urllib.parse.parse_qs(body.decode(), keep_blank_values=True)
data = {k: v[0] for k, v in parsed.items()}
app_name = data.get("app", "")
stream = data.get("stream", "")
url = data.get("url", "")
if not all([app_name, stream, url]):
return {"code": -1, "msg": "app/stream/url 不能为空"}
proxy_id = db.add_pull_proxy({
"vhost": "__defaultVhost__",
"app": app_name,
"stream": stream,
})
db.set_proxy_urls(proxy_id, [{"url": url, "params": {}}])
mk_logger.log_info(f"添加拉流代理: {app_name}/{stream} → {url}")
return {"code": 0, "msg": "添加成功", "data": {"id": proxy_id}}
# ── 示例 3:调用 ZLM 原生 API(反向代理模式) ────────────────────────
import httpx
_client = httpx.AsyncClient(timeout=10.0)
@app.get("/index/pyapi/zlm/mediaList")
async def get_media_list(request: Request):
"""把请求透传给 ZLM 原生接口,做一层 Python 加工后返回"""
secret = mk_loader.get_config("api.secret")
zlm_url = f"http://127.0.0.1:{mk_loader.get_config('http.port')}/index/api/getMediaList"
resp = await _client.get(zlm_url, params={"secret": secret})
data = resp.json()
# 在这里可以对数据做过滤、脱敏等处理
return {"code": 0, "total": len(data.get("data", [])), "data": data.get("data", [])}
5.4 接口路径约定
| 前缀 | 说明 |
|---|---|
/index/api/... |
ZLM 原生接口,直接调用 |
/index/pyapi/... |
Python 扩展接口(推荐命名规范) |
六、异步与线程安全
ZLM 的 C++ 回调运行在 C++ 线程里,而 FastAPI 运行在 Python asyncio 事件循环里。两者之间的桥梁是 SharedLoop:
# shared_loop.py — 全局共享的 asyncio 事件循环
import asyncio
import threading
class SharedLoop:
_loop: asyncio.AbstractEventLoop = None
_thread: threading.Thread = None
@classmethod
def get_loop(cls) -> asyncio.AbstractEventLoop:
if cls._loop is None:
cls._loop = asyncio.new_event_loop()
cls._thread = threading.Thread(
target=cls._loop.run_forever, daemon=True
)
cls._thread.start()
return cls._loop
在 C++ 回调函数里投递异步任务:
import asyncio
from shared_loop import SharedLoop
def on_record_mp4(info: dict) -> bool:
# ✅ 正确:通过 run_coroutine_threadsafe 跨线程投递协程
asyncio.run_coroutine_threadsafe(
notify_webhook(info),
SharedLoop.get_loop()
)
return True
async def notify_webhook(info: dict):
async with httpx.AsyncClient() as client:
await client.post("https://your-server.com/hook", json=info)
⚠️ 不要在 C++ 回调里直接
await,也不要直接调用asyncio.run(),否则会发生死锁或运行时错误。
七、完整插件文件骨架
# mk_plugin.py — ZLMediaKit Python 插件完整骨架
import asyncio
import mk_loader
import mk_logger
from starlette.routing import Match
from shared_loop import SharedLoop
from py_http_api import app # FastAPI 实例
# ── ASGI 桥接(固定写法) ─────────────────────────────────────────
def submit_coro(scope, body, send):
async def run():
async def async_send(msg):
r = send(msg)
if r is not None:
await r
async def receive():
return {"type": "http.request", "body": body, "more_body": False}
try:
await app(scope, receive, async_send)
except Exception as e:
mk_logger.log_warn(f"FastAPI error: {e}")
return asyncio.run_coroutine_threadsafe(run(), SharedLoop.get_loop())
def check_route(scope) -> bool:
for route in app.routes:
if hasattr(route, "matches"):
match, _ = route.matches(scope)
if match == Match.FULL:
return True
return False
# ── 生命周期 ──────────────────────────────────────────────────────
def on_start():
mk_loader.set_config("http.rootPath", "/var/www/frontend")
mk_loader.update_config()
mk_loader.set_fastapi(check_route, submit_coro)
mk_logger.log_info("Python 插件已启动")
def on_exit():
mk_logger.log_info("Python 插件退出")
def on_reload_config():
mk_logger.log_info("配置已热重载")
# ── 推流鉴权 ──────────────────────────────────────────────────────
def on_publish(type: str, args: dict, invoker, sender: dict) -> bool:
mk_loader.publish_auth_invoker_do(invoker, "", {})
return True
# ── 播放鉴权 ──────────────────────────────────────────────────────
def on_play(args: dict, invoker, sender: dict) -> bool:
mk_loader.play_auth_invoker_do(invoker, "")
return True
# ── 按需拉流 ──────────────────────────────────────────────────────
def on_stream_not_found(args: dict, sender: dict, invoker) -> bool:
# 查询数据库 → 动态启动拉流代理
return False # 没有匹配时返回 False
# ── 录制完成 ──────────────────────────────────────────────────────
def on_record_mp4(info: dict) -> bool:
mk_logger.log_info(f"录制: {info.get('file_path')} {info.get('time_len')}s")
return True
# ── 无人观看 ──────────────────────────────────────────────────────
def on_stream_none_reader(sender: mk_loader.MediaSource) -> bool:
return True
# ── HTTP 文件访问鉴权 ─────────────────────────────────────────────
def on_http_access(parser, path, file_path, is_dir, invoker, sender) -> bool:
mk_loader.http_access_invoker_do(invoker, "", path, 3600)
return True
八、总结
| 能力 | 实现方式 |
|---|---|
| 推流 / 播放鉴权 | on_publish / on_play + invoker_do |
| 按需拉流 | on_stream_not_found + mk_loader.add_stream_proxy |
| 拉流多地址切换 | on_player_proxy_failed + mk_loader.update_stream_proxy |
| 录制后处理 | on_record_mp4 + 异步投递到 SharedLoop |
| 扩展 HTTP API | FastAPI 路由 +set_fastapi 注册 |
| 调用 ZLM 原生 API | httpx.AsyncClient → http://127.0.0.1:{port}/index/api/... |
| 跨线程异步 | asyncio.run_coroutine_threadsafe(..., SharedLoop.get_loop()) |
ZLMediaKit的Python插件机制把 C++ 的高性能和 Python 的开发效率结合在一起:核心推拉流由 C++ 保证吞吐量,业务逻辑、数据库操作、HTTP 接口全用 Python 快速迭代,是流媒体服务开发的一种高效范式。
本文代码均来自开源项目 PyMKUI,欢迎 Star ⭐
测试文档
使用教程
- 代码依赖与版权声明
- 快速开始
- vcpkg安装zlmediakit
- 服务器的启动与关闭
- GB28181教程
- 推流播放测试
- RESTful 接口
- RESTful 接口 postman自动生成
- Web Hook 接口
- Python混合编程
- 配置文件详解
- 播放URL规则
- 按需拉流
- 按需推流
- 播放鉴权
- 推流鉴权
- 怎样创建直播流
- webrtc编译与使用
- webrtc信令交互格式
- webrtc重磅更新
- 怎么开启https相关功能
相关文档和资源
- zlmediakit独家特性
- zlmediakit的hls高性能之旅
- 高并发实现原理
- RTSP推流流程
- 流媒体相关技术介绍
- 直播延时的本质
- rtmp对H265/opus的支持
- ssl自签名证书测试
- 视频会议相关资源