central10.py
をテンプレートにして作成
[
トップ
] [
新規
|
一覧
|
検索
|
最終更新
|
ヘルプ
|
ログイン
]
開始行:
[[peripheral5.py]]
#code(Python){{
# -*- coding: utf-8 -*-
import asyncio
from bleak import BleakClient, BleakScanner
import aioconsole
ADDRESS = "B8:27:EB:ED:CD:2A"
CHAR_UUID = "87654321-4321-8765-4321-876543210000"
def notification_handler(sender, data):
print(f"\n[Zeroから受信]: {data.decode('utf-8')}")
print("自分(Pi4)の入力: ", end="", flush=True)
async def send_task(client):
try:
# 最初のプロンプトを一度だけ出す
print("自分(Pi4)の入力: ", end="", flush=True)
while client.is_connected:
try:
# 引数を空文字 "" にして、タイムアウト後...
msg = await asyncio.wait_for(aioconsole.a...
if msg and client.is_connected:
await client.write_gatt_char(CHAR_UUI...
# 入力・送信が終わったら次のプロンプトを...
print("自分(Pi4)の入力: ", end="", flush=...
except asyncio.TimeoutError:
# タイムアウト時は何もしない(プロンプト...
continue
except Exception:
pass
async def run_ble_client():
print(f"\n--- Scanning for {ADDRESS} ---")
device = await BleakScanner.find_device_by_address(AD...
if not device:
print("デバイスが見つかりません。")
return
disconnect_event = asyncio.Event()
def on_disconnect(_client):
print("\n[通知] 接続が切れました。")
disconnect_event.set()
# --- central10.py の修正箇所 ---
try:
async with BleakClient(device, timeout=30.0, disc...
print(f"Connected: {client.is_connected}")
print("Discovering services...")
# get_services() ではなく、プロパティにアクセ...
# これでBleakが内部でサービス検索を実行します
_ = client.services
await asyncio.sleep(2.0)
await client.start_notify(CHAR_UUID, notifica...
# ... (以下同じ)
print("通信準備OK。文字を入力できます(1秒ご...
send_worker = asyncio.create_task(send_task(c...
# 切断されるまで待機
await disconnect_event.wait()
send_worker.cancel()
except Exception as e:
print(f"通信確立エラー: {e}")
finally:
print("Cleanup finished.")
async def main():
while True:
try:
print("\n=== 新しい接続試行を開始します ===")
await run_ble_client()
except Exception as e:
print(f"メインループ内エラー: {e}")
print("5秒後に再試行します... (Ctrl+Cで終了)")
await asyncio.sleep(5.0)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n終了します")
}}
----
- https://note.com/nobu_hand_maker/n/n17f284cc8c26
----
#counter
終了行:
[[peripheral5.py]]
#code(Python){{
# -*- coding: utf-8 -*-
import asyncio
from bleak import BleakClient, BleakScanner
import aioconsole
ADDRESS = "B8:27:EB:ED:CD:2A"
CHAR_UUID = "87654321-4321-8765-4321-876543210000"
def notification_handler(sender, data):
print(f"\n[Zeroから受信]: {data.decode('utf-8')}")
print("自分(Pi4)の入力: ", end="", flush=True)
async def send_task(client):
try:
# 最初のプロンプトを一度だけ出す
print("自分(Pi4)の入力: ", end="", flush=True)
while client.is_connected:
try:
# 引数を空文字 "" にして、タイムアウト後...
msg = await asyncio.wait_for(aioconsole.a...
if msg and client.is_connected:
await client.write_gatt_char(CHAR_UUI...
# 入力・送信が終わったら次のプロンプトを...
print("自分(Pi4)の入力: ", end="", flush=...
except asyncio.TimeoutError:
# タイムアウト時は何もしない(プロンプト...
continue
except Exception:
pass
async def run_ble_client():
print(f"\n--- Scanning for {ADDRESS} ---")
device = await BleakScanner.find_device_by_address(AD...
if not device:
print("デバイスが見つかりません。")
return
disconnect_event = asyncio.Event()
def on_disconnect(_client):
print("\n[通知] 接続が切れました。")
disconnect_event.set()
# --- central10.py の修正箇所 ---
try:
async with BleakClient(device, timeout=30.0, disc...
print(f"Connected: {client.is_connected}")
print("Discovering services...")
# get_services() ではなく、プロパティにアクセ...
# これでBleakが内部でサービス検索を実行します
_ = client.services
await asyncio.sleep(2.0)
await client.start_notify(CHAR_UUID, notifica...
# ... (以下同じ)
print("通信準備OK。文字を入力できます(1秒ご...
send_worker = asyncio.create_task(send_task(c...
# 切断されるまで待機
await disconnect_event.wait()
send_worker.cancel()
except Exception as e:
print(f"通信確立エラー: {e}")
finally:
print("Cleanup finished.")
async def main():
while True:
try:
print("\n=== 新しい接続試行を開始します ===")
await run_ble_client()
except Exception as e:
print(f"メインループ内エラー: {e}")
print("5秒後に再試行します... (Ctrl+Cで終了)")
await asyncio.sleep(5.0)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n終了します")
}}
----
- https://note.com/nobu_hand_maker/n/n17f284cc8c26
----
#counter
ページ名: