nicegui
1 通用信号槽更新
from nicegui import ui, run, app
from loguru import logger
import janus
import asyncio
class SimpleSignal:
"""最简单的信号:子线程发,主线程收,更新UI"""
def __init__(self):
self.queue = janus.Queue(maxsize=100) # 队列最大长度100
self.handlers = [] # 执行函数列表
self.running = True # 添加:控制worker退出的标志
async def start(self):
"""在事件循环启动后调用"""
self.worker_task = asyncio.create_task(self.worker())
logger.success("队列信号启动完成")
async def worker(self):
logger.info("worker开始运行")
while self.running: # 修改:用running标志控制循环
try:
# 等信号(不会卡界面)
signal = await self.queue.async_q.get()
logger.debug(f"worker接收到信号: {signal}")
# 把信号分给每一个执行函数
for handler in self.handlers:
try:
if asyncio.iscoroutinefunction(handler):
# 如果是异步函数,等待执行
await handler(signal)
logger.debug("异步处理函数执行完成")
else:
# 如果是同步函数,直接执行
handler(signal)
logger.debug("同步处理函数执行完成")
except Exception as e:
logger.error(f"处理函数执行失败: {e}")
except Exception as e:
logger.error(f"worker异常: {e}")
def on(self, handler):
"""注册:注册执行函数"""
self.handlers.append(handler)
def emit(self, data):
"""发信:从子线程发信号到队列,使用保存的主线程事件循环引用"""
try:
logger.debug(f"emit信号: {data}")
self.queue.sync_q.put(data)
except Exception as e:
logger.error(f"emit信号失败: {e}")
async def cleanup(self):
"""清理函数:程序结束前调用"""
logger.info("开始清理信号总线")
# 1. 停止worker循环
self.running = False
# 清空队列(防止残留信号)
self.queue.close()
await self.queue.wait_closed()
# 4. 清理handler列表
self.handlers.clear()
logger.success("信号总线清理完成")
ui_queue = SimpleSignal()
class Class_ng_uis:
def __init__(self):
self.uiinit()
ui_queue.on(self.ui_updater)
async def ui_updater(self, data):
try:
logger.debug(f"获得子线程更新信号:{data}")
match data.get("name", ""):
case "log更新":
self.ui_消息log.push(f"{data['data']['文字']}", classes=data["data"]["颜色"])
except Exception as e:
ui.notify("队列信号异常")
logger.error(f"队列信号异常:{e}")
def ...():
ui_queue.emit({"name": "log更新", "data": {"文字": "开始设置标题txt", "颜色": "text-green"}})
async def clean():
"""异步清理函数"""
logger.info("开始清理...")
await ui_queue.cleanup()
logger.success("清理完成")
app.shutdown()
@ui.page("/")
async def main():
Class_ui_uis()
await ui_queue.start() # ← 启动队列
app.on_disconnect(clean)
ui.run(title="aria2", native=True, reload=False, window_size=(1000, 600), storage_secret="1")
2 通用ui
class Class_ng_uis:
def __init__(self):
self.uiinit()
def uiinit(self):
pass
async def clean():
"""异步清理函数"""
logger.info("开始清理...")
await ui_queue.cleanup() # 清理信号
app.shutdown() # 停止nicegui
logger.success("清理完成")
@ui.page("/")
async def main():
Class_ui_uis()
await ui_queue.start() # ← 启动队列
app.on_disconnect(clean)
ui.run(title="123", native=True, reload=False, window_size=(1000, 600), storage_secret="1")
本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 程序员小航
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果