#author("2026-05-05T19:55:13+09:00","default:TESLA2","TESLA2") #author("2026-05-06T06:14:08+09:00","default:TESLA2","TESLA2") [[peripheral5.py]] #code(Python){{ # -*- coding: utf-8 -*- import asyncio from bleak import BleakClient, BleakScanner import aioconsole ADDRESS = "B8:27:EB:ED:CD:2A" CHAR_UUID = "87654321-4321-8765-4321-876543210000" def notification_handler(sender, data): print(f"\n[Zeroから受信]: {data.decode('utf-8')}") print("自分(Pi4)の入力: ", end="", flush=True) async def send_task(client): try: # 最初のプロンプトを一度だけ出す print("自分(Pi4)の入力: ", end="", flush=True) while client.is_connected: try: # 引数を空文字 "" にして、タイムアウト後の再表示を防ぐ msg = await asyncio.wait_for(aioconsole.ainput(""), timeout=1.0) if msg and client.is_connected: await client.write_gatt_char(CHAR_UUID, msg.encode('utf-8'), response=False) # 入力・送信が終わったら次のプロンプトを出す print("自分(Pi4)の入力: ", end="", flush=True) except asyncio.TimeoutError: # タイムアウト時は何もしない(プロンプトを再表示しない) continue except Exception: pass async def run_ble_client(): print(f"\n--- Scanning for {ADDRESS} ---") device = await BleakScanner.find_device_by_address(ADDRESS, timeout=10.0) if not device: print("デバイスが見つかりません。") return disconnect_event = asyncio.Event() def on_disconnect(_client): print("\n[通知] 接続が切れました。") disconnect_event.set() # --- central10.py の修正箇所 --- try: async with BleakClient(device, timeout=30.0, disconnected_callback=on_disconnect) as client: print(f"Connected: {client.is_connected}") print("Discovering services...") # get_services() ではなく、プロパティにアクセスするだけでOK # これでBleakが内部でサービス検索を実行します _ = client.services await asyncio.sleep(2.0) await client.start_notify(CHAR_UUID, notification_handler) # ... (以下同じ) print("通信準備OK。文字を入力できます(1秒ごとに入力待機を更新します)") send_worker = asyncio.create_task(send_task(client)) # 切断されるまで待機 await disconnect_event.wait() send_worker.cancel() except Exception as e: print(f"通信確立エラー: {e}") finally: print("Cleanup finished.") async def main(): while True: try: print("\n=== 新しい接続試行を開始します ===") await run_ble_client() except Exception as e: print(f"メインループ内エラー: {e}") print("5秒後に再試行します... (Ctrl+Cで終了)") await asyncio.sleep(5.0) if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: print("\n終了します") }} ---- - https://note.com/nobu_hand_maker/n/n17f284cc8c26 ---- #counter