1 通用信号槽更新

from nicegui import ui, run, app
from loguru import logger
import janus
import asyncio


class SimpleSignal:
	"""最简单的信号:子线程发,主线程收,更新UI"""

	def __init__(self):
		self.queue = janus.Queue(maxsize=100)  # 队列最大长度100
		self.handlers = []  # 执行函数列表
		self.running = True  # 添加:控制worker退出的标志

	async def start(self):
		"""在事件循环启动后调用"""
		self.worker_task = asyncio.create_task(self.worker())
		logger.success("队列信号启动完成")

	async def worker(self):
		logger.info("worker开始运行")
		while self.running:  # 修改:用running标志控制循环
			try:
				# 等信号(不会卡界面)
				signal = await self.queue.async_q.get()
				logger.debug(f"worker接收到信号: {signal}")
				# 把信号分给每一个执行函数
				for handler in self.handlers:
					try:
						if asyncio.iscoroutinefunction(handler):
							# 如果是异步函数,等待执行
							await handler(signal)
							logger.debug("异步处理函数执行完成")
						else:
							# 如果是同步函数,直接执行
							handler(signal)
							logger.debug("同步处理函数执行完成")
					except Exception as e:
						logger.error(f"处理函数执行失败: {e}")
			except Exception as e:
				logger.error(f"worker异常: {e}")

	def on(self, handler):
		"""注册:注册执行函数"""
		self.handlers.append(handler)

	def emit(self, data):
		"""发信:从子线程发信号到队列,使用保存的主线程事件循环引用"""
		try:
			logger.debug(f"emit信号: {data}")
			self.queue.sync_q.put(data)
		except Exception as e:
			logger.error(f"emit信号失败: {e}")

	async def cleanup(self):
		"""清理函数:程序结束前调用"""
		logger.info("开始清理信号总线")

		# 1. 停止worker循环
		self.running = False

		# 清空队列(防止残留信号)
		self.queue.close()
		await self.queue.wait_closed()
		# 4. 清理handler列表
		self.handlers.clear()

		logger.success("信号总线清理完成")


ui_queue = SimpleSignal()
class Class_ng_uis:
	def __init__(self):
		self.uiinit()
			ui_queue.on(self.ui_updater)
			
	async def ui_updater(self, data):
		try:
			logger.debug(f"获得子线程更新信号:{data}")
			match data.get("name", ""):
				case "log更新":
					self.ui_消息log.push(f"{data['data']['文字']}", classes=data["data"]["颜色"])
		except Exception as e:
			ui.notify("队列信号异常")
			logger.error(f"队列信号异常:{e}")
			
	def ...():
		ui_queue.emit({"name": "log更新", "data": {"文字": "开始设置标题txt", "颜色": "text-green"}})
	
	

async def clean():
	"""异步清理函数"""
	logger.info("开始清理...")
	await ui_queue.cleanup()
	logger.success("清理完成")
	app.shutdown()


@ui.page("/")
async def main():
	Class_ui_uis()
	await ui_queue.start()  # ← 启动队列


app.on_disconnect(clean)
ui.run(title="aria2", native=True, reload=False, window_size=(1000, 600), storage_secret="1")

2 通用ui

class Class_ng_uis:
	def __init__(self):
		self.uiinit()
	def uiinit(self):
		pass

async def clean():
	"""异步清理函数"""
	logger.info("开始清理...")
	await ui_queue.cleanup() # 清理信号
	app.shutdown() # 停止nicegui
	logger.success("清理完成")

@ui.page("/")
async def main():
	Class_ui_uis()
	await ui_queue.start()  # ← 启动队列


app.on_disconnect(clean)
ui.run(title="123", native=True, reload=False, window_size=(1000, 600), storage_secret="1")