1 通用信号槽更新

from nicegui import ui, run, app
from loguru import logger
import janus
import asyncio



class SimpleSignal:
	"""
	统一信号机制:
	- 子线程或协程里都可以 emit(data)
	- 在主线程调用 handlers 更新 UI
	"""

	def __init__(self):
		self.queue = janus.Queue(maxsize=100)
		self.handlers = []
		self.running = False
		self.loop = None
		self.worker_task = None

	async def start(self):
		"""启动"""
		self.loop = asyncio.get_running_loop()
		self.running = True
		self.worker_task = asyncio.create_task(self._worker())
		logger.info("信号启动")

	async def _worker(self):
		"""处理器:在主线程调用 handlers"""
		while self.running:
			try:
				data = await self.queue.async_q.get()
				for handler in self.handlers:
					if asyncio.iscoroutinefunction(handler):
						asyncio.run_coroutine_threadsafe(handler(data), self.loop)
					else:
						self.loop.call_soon_threadsafe(handler, data)
			except asyncio.CancelledError:
				break
			except Exception as e:
				logger.error(e)

	def on(self, handler):
		"""注册处理函数"""

		if handler not in self.handlers:
			self.handlers.append(handler)
		return handler

	def emit(self, data):
		"""
		统一发信号:子线程或协程里都可以用
		"""
		self.queue.sync_q.put(data)  # 队列满会阻塞,不报错

	async def cleanup(self):
		"""清理"""
		self.running = False
		if not self.queue.closed:
			self.queue.close()
			await self.queue.wait_closed()
		self.handlers.clear()
		logger.info("信号清理完成")

# ========== 全局实例 ==========
ui_queue = SimpleSignal()

# ========== UI 结构 ==========
class Class_ng_uis:
	def __init__(self):
		self.uiinit()
		ui_queue.on(self.ui_updater)
			
	async def ui_updater(self, data):
		try:
			logger.debug(f"获得子线程更新信号:{data}")
			match data.get("name", ""):
				case "log更新":
					self.ui_消息log.push(f"{data['data']['文字']}", classes=data["data"]["颜色"])
		except Exception as e:
			ui.notify("队列信号异常")
			logger.error(f"队列信号异常:{e}")
			
	def ...():
		ui_queue.emit({"name": "log更新", "data": {"文字": "开始设置标题txt", "颜色": "text-green"}})
	
	

async def clean():
	"""异步清理函数"""
	logger.info("开始清理...")
	await ui_queue.cleanup()
	logger.success("清理完成")
	app.shutdown()


@ui.page("/")
async def main():
	Class_ui_uis()
	await ui_queue.start()  # ← 启动队列


app.on_disconnect(clean)
ui.run(title="1123", native=True, reload=False, window_size=(1000, 600), storage_secret="1")

2 通用ui

class Class_ng_uis:
	def __init__(self):
		self.uiinit()
	def uiinit(self):
		pass

async def clean():
	"""异步清理函数"""
	logger.info("开始清理...")
	await ui_queue.cleanup() # 清理信号
	app.shutdown() # 停止nicegui
	logger.success("清理完成")

@ui.page("/")
async def main():
	Class_ui_uis()
	await ui_queue.start()  # ← 启动队列


app.on_disconnect(clean)
ui.run(title="123", native=True, reload=False, window_size=(1000, 600), storage_secret="1")

3.bf旧的

from nicegui import ui, run, app
from loguru import logger
import janus
import asyncio


class SimpleSignal:
	"""最简单的信号:子线程发,主线程收,更新UI"""

	def __init__(self):
		self.queue = janus.Queue(maxsize=100)  # 队列最大长度100
		self.handlers = []  # 执行函数列表
		self.running = True  # 添加:控制worker退出的标志

	async def start(self):
		"""在事件循环启动后调用"""
		self.worker_task = asyncio.create_task(self.worker())
		logger.success("队列信号启动完成")

	async def worker(self):
		logger.info("worker开始运行")
		while self.running:  # 修改:用running标志控制循环
			try:
				# 等信号(不会卡界面)
				signal = await self.queue.async_q.get()
				logger.debug(f"worker接收到信号: {signal}")
				# 把信号分给每一个执行函数
				for handler in self.handlers:
					try:
						if asyncio.iscoroutinefunction(handler):
							# 如果是异步函数,等待执行
							await handler(signal)
							logger.debug("异步处理函数执行完成")
						else:
							# 如果是同步函数,直接执行
							handler(signal)
							logger.debug("同步处理函数执行完成")
					except Exception as e:
						logger.error(f"处理函数执行失败: {e}")
			except Exception as e:
				logger.error(f"worker异常: {e}")

	def on(self, handler):
		"""注册:注册执行函数"""
		self.handlers.append(handler)

	def emit(self, data):
		"""发信:从子线程发信号到队列,使用保存的主线程事件循环引用"""
		try:
			logger.debug(f"emit信号: {data}")
			self.queue.sync_q.put(data)
		except Exception as e:
			logger.error(f"emit信号失败: {e}")

	async def cleanup(self):
		"""清理函数:程序结束前调用"""
		logger.info("开始清理信号总线")

		# 1. 停止worker循环
		self.running = False

		# 清空队列(防止残留信号)
		self.queue.close()
		await self.queue.wait_closed()
		# 4. 清理handler列表
		self.handlers.clear()

		logger.success("信号总线清理完成")


ui_queue = SimpleSignal()
class Class_ng_uis:
	def __init__(self):
		self.uiinit()
		ui_queue.on(self.ui_updater)
			
	async def ui_updater(self, data):
		try:
			logger.debug(f"获得子线程更新信号:{data}")
			match data.get("name", ""):
				case "log更新":
					self.ui_消息log.push(f"{data['data']['文字']}", classes=data["data"]["颜色"])
		except Exception as e:
			ui.notify("队列信号异常")
			logger.error(f"队列信号异常:{e}")
			
	def ...():
		ui_queue.emit({"name": "log更新", "data": {"文字": "开始设置标题txt", "颜色": "text-green"}})
	
	

async def clean():
	"""异步清理函数"""
	logger.info("开始清理...")
	await ui_queue.cleanup()
	logger.success("清理完成")
	app.shutdown()


@ui.page("/")
async def main():
	Class_ui_uis()
	await ui_queue.start()  # ← 启动队列


app.on_disconnect(clean)
ui.run(title="aria2", native=True, reload=False, window_size=(1000, 600), storage_secret="1")