peripheral5.py

٤Ƴ٤Ĥ
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
-
!
 
 
 
 
 
 
 
 
 
 
 
 
-
!
 
 
-
!
 
 
 
 
-
!
 
-
!
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
-
!
 
 
 
 
-
|
!
 
 
 
-
!
 
 
 
-
!
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
# -*- coding: utf-8 -*-
import asyncio
from bleak import BleakClient, BleakScanner
import aioconsole
 
ADDRESS = "B8:27:EB:ED:CD:2A"
CHAR_UUID = "87654321-4321-8765-4321-876543210000"
 
def notification_handler(sender, data):
    print(f"\n[Zeroから受信]: {data.decode('utf-8')}")
    print("自分(Pi4)の入力: ", end="", flush=True)
 
async def send_task(client):
    try:
        # 最初のプロンプトを一度だけ出す
        print("自分(Pi4)の入力: ", end="", flush=True)
        while client.is_connected:
            try:
                # 引数を空文字 "" にして、タイムアウト後の再表示を防ぐ
                msg = await asyncio.wait_for(aioconsole.ainput(""), timeout=1.0)
                
                if msg and client.is_connected:
                    await client.write_gatt_char(CHAR_UUID, msg.encode('utf-8'), response=False)
                
                # 入力・送信が終わったら次のプロンプトを出す
                print("自分(Pi4)の入力: ", end="", flush=True)
            except asyncio.TimeoutError:
                # タイムアウト時は何もしない(プロンプトを再表示しない)
                continue
    except Exception:
        pass
 
async def run_ble_client():
    print(f"\n--- Scanning for {ADDRESS} ---")
    device = await BleakScanner.find_device_by_address(ADDRESS, timeout=10.0)
    if not device: 
        print("デバイスが見つかりません。")
        return
 
    disconnect_event = asyncio.Event()
 
    def on_disconnect(_client): 
        print("\n[通知] 接続が切れました。")
        disconnect_event.set()
 
# --- central10.py の修正箇所 ---
    try:
        async with BleakClient(device, timeout=30.0, disconnected_callback=on_disconnect) as client:
            print(f"Connected: {client.is_connected}")
            
            print("Discovering services...")
            # get_services() ではなく、プロパティにアクセスするだけでOK
            # これでBleakが内部でサービス検索を実行します
            _ = client.services 
            await asyncio.sleep(2.0)
            
            await client.start_notify(CHAR_UUID, notification_handler)
            # ... (以下同じ)
            print("通信準備OK。文字を入力できます(1秒ごとに入力待機を更新します)")
            
            send_worker = asyncio.create_task(send_task(client))
            
            # 切断されるまで待機
            await disconnect_event.wait()
            
            send_worker.cancel()
    except Exception as e:
        print(f"通信確立エラー: {e}")
    finally:
        print("Cleanup finished.")
 
async def main():
    while True:
        try:
            print("\n=== 新しい接続試行を開始します ===")
            await run_ble_client()
        except Exception as e:
            print(f"メインループ内エラー: {e}")
        
        print("5秒後に再試行します... (Ctrl+Cで終了)")
        await asyncio.sleep(5.0)
 
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n終了します")
        

Counter: 6, today: 2, yesterday: 0

トップ   新規 一覧 検索 最終更新   ヘルプ   最終更新のRSS