Teleport_
をテンプレートにして作成
[
トップ
] [
新規
|
一覧
|
単語検索
|
最終更新
|
ヘルプ
|
ログイン
]
開始行:
#code(Python){{
# coding: utf-8
#
#
# +---------+ +--------------+
# Wiki <->| fwb4pi |<---->|tcp_server_ex1|
# +---------+ +--------------+
# ^
# |
# v
# +------------------+
# |RemoteCommandReder|
# | | |
# | v |
# | handler |
# | | |
# | v |
# | parser |
# | | |
# +----|-------------+
# |
# | put
# v
# +------------------+
# |TeleportDresser |
# | | |
# | v |
# | handler |
# +------------------+
#
import cv2
import numpy as np
from moviepy.editor import ImageSequenceClip
# coding: utf-8
from PIL import Image
import smbus
import time
import sys
import requests
import os
import socket
import threading
from collections import deque
import subprocess
#from PIL import Image
#import smbus
#import time
#import sys
#import requests
class RemoteCommandReader:
sock = socket.socket(socket.AF_INET, socket.SOCK_STRE...
HOST = 'localhost'
PORT = 9998
def __init__(self):
print("start RemoteCommandReader.__init__")
self.client_start()
self.teleport_dress=Teleport_Dress()
print(self.teleport_dress)
while True:
try:
time.sleep(0.01)
except:
print("error")
def python2fwb(self,py2x_message):
msg=py2x_message+'\n'
self.sock.send(msg.encode('utf-8'))
def parse(self,line):
print("parsing "+line)
words=line.split()
print(words)
if words[0]=='show':
fx=words[1]
print(fx)
print(self.teleport_dress)
self.teleport_dress.putImage(fx)
elif words[0]=='clear':
self.teleport_dress.clearImage()
def client_start(self):
"""クライアントのスタート"""
self.sock.connect((self.HOST, self.PORT))
handle_thread = threading.Thread(target=self.handle...
handle_thread.start()
def handler(self,sock):
"""サーバからメッセージを受信し、表示する"""
while True:
data = sock.recv(1024)
print("[受信]{}".format(data.decode("utf-8")))
line=data.decode("utf-8")
self.parse(line)
class Teleport_Dress:
off_x=0
off_y=0
imax=75
jmax=16
# dx=img_width/jmax
# dy=img_height/imax
def __init__(self):
print("start Teleport_Dress.__init__")
self.pixels=[[(0,0,0) for j in range(self.jmax)] for ...
self.addrs = [0x30,0x31,0x32,0x33]
self.fourpix=[0x00, 0x00, 0x04, 0,0,0, 0,0,0, 0,0,0, ...
self.i2c = smbus.SMBus(1) # 注:ラズパイのI2Cポート
self.queue=deque([])
self.show_loop_start()
def cv2pil(self,image):
''' OpenCV型 -> PIL型 '''
new_image = image.copy()
if new_image.ndim == 2: # モノクロ
pass
elif new_image.shape[2] == 3: # カラー
new_image = cv2.cvtColor(new_image, cv2.COLOR_BGR...
elif new_image.shape[2] == 4: # 透過
new_image = cv2.cvtColor(new_image, cv2.COLOR_BGR...
new_image = Image.fromarray(new_image)
return new_image
def gif2img(self,file_name):
try:
gif = cv2.VideoCapture(gif_name)
except:
gif = cv2.VideoCapture('../Pictures/giphy-girl-ex1....
fps = gif.get(cv2.CAP_PROP_FPS) # fpsは1秒あたり...
images = []
i = 0
while True:
is_success, img = gif.read()
if not is_success:
break
images.append(img)
i+=1
ex=('anime',images)
return ex
def clearImage(self):
ex=('clear','x')
print("clear")
self.queue.append(ex)
def putImage(self,file_name):
print("putImage..."+file_name)
if file_name.startswith('http://') or file_name.start...
subprocess.call(["cd","~/Pictures"])
subprocess.call(["wget", file_name])
path=file_name.split("/")
plen=len(path)
xname=path[plen-1]
print("xname="+xname)
if xname.endswith('.gif') or xname.endswith('.GIF'):
ex=gif2img(xname)
self.queue.append(ex)
elif xname.endswith('.jpg') or xname.endswith('.JPG...
ex=('img',xname)
self.queue.append(ex)
elif xname.endswith('.png') or xname.endswith('.PNG...
ex=('img',xname)
self.queue.append(ex)
else:
path=file_name.split("/")
plen=len(path)
xname=path[plen-1]
print("xname="+xname);
if xname.endswith('.gif') or xname.endswith('.GIF'):
ex=gif2img(xname)
self.queue.append(ex)
elif xname.endswith('.jpg') or xname.endswith('.JPG...
ex=('img',xname)
self.queue.append(ex)
elif xname.endswith('.png') or xname.endswith('.PNG...
ex=('img',xname)
self.queue.append(ex)
print("queue="+str(len(self.queue)))
def show_loop_start(self):
"""show_loop_start"""
handle_thread = threading.Thread(target=self.handle...
handle_thread.start()
def handler(self):
"""queueからメッセージを受信し、表示する"""
last_anime=[]
while True:
try:
qlen = len(self.queue)
if qlen>0:
print("handler, qlen="+str(qlen))
kimg=self.queue.popleft()
print("handler, kimg[0]="+kimg[0]+",kimg[1]...
if kimg[0]=='anime':
last_anime=kimg[1]
self.show_animegif(last_anime)
elif kimg[0]=='img':
last_anime=[]
self.show_jpg_png(kimg[1])
elif kimg[0]=='clear':
last_anime=[]
self.clear_pic()
else:
if last_anime!=[]:
self.show_animegif(last_anime)
else:
time.sleep(0.1)
except:
if last_anime!=[]:
self.show_animegif(last_anime)
def show_jpg_png(self,file_name):
print("show_jpg_png("+file_name+")")
#subprocess.call(["cd","~/Pictures"])
img = Image.open("../Pictures/"+file_name)
self.show_one_pic(img)
def show_animegif(self,gif_name):
try:
gif = cv2.VideoCapture(gif_name)
except:
gif = cv2.VideoCapture('../Pictures/giphy-girl-ex...
fps = gif.get(cv2.CAP_PROP_FPS) # fpsは1秒あたりの...
images = []
i = 0
while True:
is_success, img = gif.read()
if not is_success:
break
images.append(img)
i+=1
#cv2.namedWindow('test', cv2.WINDOW_AUTOSIZE)
#for it in range(round(times,0)):
show_one_anime(images)
def show_one_anime(self,images):
try:
for t in range(len(images)):
#cv2.imshow('test', images[t])
#cv2.waitKey(int(1000/fps))
show_one_pic(cv2pil(images[t]))
except KeyboardInterrupt:
print('stop by ctrl-c')
#
# command:
# clear... 0x00
# show ... 0x01
# set1 ix,iy,r,g,b
# ... 0x02 *,*, *,*,*
# setn ix,iy,n, r0,g0,b0, ..r(n-1),g(n-1),b(n-1) n<=8
# ... 0x03 *,*, *, *,*,*, ..., *,*,*
#
def show_one_pic(self,img):
img_width, img_height = img.size
#print('width:',img_width)
#print('height:',img_height)
#print("show_one_pic")
dx=img_width/self.jmax
dy=img_height/self.imax
#print("dx=",dx)
#print("dy=",dy)
for i in range(self.imax):
for j in range(self.jmax):
x=self.off_x+j*dx
y=self.off_y+i*dy
rgb=img.getpixel((x,y))
#print("rgb[",j,",",i,"]=",rgb)
self.pixels[i][j]=rgb
self.i2c.write_byte(self.addrs[0],0x00)
self.i2c.write_byte(self.addrs[1],0x00)
self.i2c.write_byte(self.addrs[2],0x00)
self.i2c.write_byte(self.addrs[3],0x00)
for i in range(self.imax):
for j in range(4):
for k in range(4):
p=self.pixels[i][4*j+k]
self.fourpix[3+k*3+0]=p[0]
self.fourpix[3+k*3+1]=p[1]
self.fourpix[3+k*3+2]=p[2]
self.fourpix[0]=i;
self.fourpix[1]=0;
i2c_addr=self.addrs[j]
try:
self.i2c.write_i2c_block_data(i2c_addr,0x03,sel...
except:
print('i2c write error at:(',i,',',j,')')
#time.sleep(0.00001)
self.i2c.write_byte(self.addrs[0],0x01)
self.i2c.write_byte(self.addrs[1],0x01)
self.i2c.write_byte(self.addrs[2],0x01)
self.i2c.write_byte(self.addrs[3],0x01)
def clear_pic(self):
print("clear_pic")
self.i2c.write_byte(self.addrs[0],0x00)
self.i2c.write_byte(self.addrs[1],0x00)
self.i2c.write_byte(self.addrs[2],0x00)
self.i2c.write_byte(self.addrs[3],0x00)
time.sleep(0.001)
self.i2c.write_byte(self.addrs[0],0x01)
self.i2c.write_byte(self.addrs[1],0x01)
self.i2c.write_byte(self.addrs[2],0x01)
self.i2c.write_byte(self.addrs[3],0x01)
def main():
RemoteCommandReader()
if __name__ == "__main__":
main()
}}
終了行:
#code(Python){{
# coding: utf-8
#
#
# +---------+ +--------------+
# Wiki <->| fwb4pi |<---->|tcp_server_ex1|
# +---------+ +--------------+
# ^
# |
# v
# +------------------+
# |RemoteCommandReder|
# | | |
# | v |
# | handler |
# | | |
# | v |
# | parser |
# | | |
# +----|-------------+
# |
# | put
# v
# +------------------+
# |TeleportDresser |
# | | |
# | v |
# | handler |
# +------------------+
#
import cv2
import numpy as np
from moviepy.editor import ImageSequenceClip
# coding: utf-8
from PIL import Image
import smbus
import time
import sys
import requests
import os
import socket
import threading
from collections import deque
import subprocess
#from PIL import Image
#import smbus
#import time
#import sys
#import requests
class RemoteCommandReader:
sock = socket.socket(socket.AF_INET, socket.SOCK_STRE...
HOST = 'localhost'
PORT = 9998
def __init__(self):
print("start RemoteCommandReader.__init__")
self.client_start()
self.teleport_dress=Teleport_Dress()
print(self.teleport_dress)
while True:
try:
time.sleep(0.01)
except:
print("error")
def python2fwb(self,py2x_message):
msg=py2x_message+'\n'
self.sock.send(msg.encode('utf-8'))
def parse(self,line):
print("parsing "+line)
words=line.split()
print(words)
if words[0]=='show':
fx=words[1]
print(fx)
print(self.teleport_dress)
self.teleport_dress.putImage(fx)
elif words[0]=='clear':
self.teleport_dress.clearImage()
def client_start(self):
"""クライアントのスタート"""
self.sock.connect((self.HOST, self.PORT))
handle_thread = threading.Thread(target=self.handle...
handle_thread.start()
def handler(self,sock):
"""サーバからメッセージを受信し、表示する"""
while True:
data = sock.recv(1024)
print("[受信]{}".format(data.decode("utf-8")))
line=data.decode("utf-8")
self.parse(line)
class Teleport_Dress:
off_x=0
off_y=0
imax=75
jmax=16
# dx=img_width/jmax
# dy=img_height/imax
def __init__(self):
print("start Teleport_Dress.__init__")
self.pixels=[[(0,0,0) for j in range(self.jmax)] for ...
self.addrs = [0x30,0x31,0x32,0x33]
self.fourpix=[0x00, 0x00, 0x04, 0,0,0, 0,0,0, 0,0,0, ...
self.i2c = smbus.SMBus(1) # 注:ラズパイのI2Cポート
self.queue=deque([])
self.show_loop_start()
def cv2pil(self,image):
''' OpenCV型 -> PIL型 '''
new_image = image.copy()
if new_image.ndim == 2: # モノクロ
pass
elif new_image.shape[2] == 3: # カラー
new_image = cv2.cvtColor(new_image, cv2.COLOR_BGR...
elif new_image.shape[2] == 4: # 透過
new_image = cv2.cvtColor(new_image, cv2.COLOR_BGR...
new_image = Image.fromarray(new_image)
return new_image
def gif2img(self,file_name):
try:
gif = cv2.VideoCapture(gif_name)
except:
gif = cv2.VideoCapture('../Pictures/giphy-girl-ex1....
fps = gif.get(cv2.CAP_PROP_FPS) # fpsは1秒あたり...
images = []
i = 0
while True:
is_success, img = gif.read()
if not is_success:
break
images.append(img)
i+=1
ex=('anime',images)
return ex
def clearImage(self):
ex=('clear','x')
print("clear")
self.queue.append(ex)
def putImage(self,file_name):
print("putImage..."+file_name)
if file_name.startswith('http://') or file_name.start...
subprocess.call(["cd","~/Pictures"])
subprocess.call(["wget", file_name])
path=file_name.split("/")
plen=len(path)
xname=path[plen-1]
print("xname="+xname)
if xname.endswith('.gif') or xname.endswith('.GIF'):
ex=gif2img(xname)
self.queue.append(ex)
elif xname.endswith('.jpg') or xname.endswith('.JPG...
ex=('img',xname)
self.queue.append(ex)
elif xname.endswith('.png') or xname.endswith('.PNG...
ex=('img',xname)
self.queue.append(ex)
else:
path=file_name.split("/")
plen=len(path)
xname=path[plen-1]
print("xname="+xname);
if xname.endswith('.gif') or xname.endswith('.GIF'):
ex=gif2img(xname)
self.queue.append(ex)
elif xname.endswith('.jpg') or xname.endswith('.JPG...
ex=('img',xname)
self.queue.append(ex)
elif xname.endswith('.png') or xname.endswith('.PNG...
ex=('img',xname)
self.queue.append(ex)
print("queue="+str(len(self.queue)))
def show_loop_start(self):
"""show_loop_start"""
handle_thread = threading.Thread(target=self.handle...
handle_thread.start()
def handler(self):
"""queueからメッセージを受信し、表示する"""
last_anime=[]
while True:
try:
qlen = len(self.queue)
if qlen>0:
print("handler, qlen="+str(qlen))
kimg=self.queue.popleft()
print("handler, kimg[0]="+kimg[0]+",kimg[1]...
if kimg[0]=='anime':
last_anime=kimg[1]
self.show_animegif(last_anime)
elif kimg[0]=='img':
last_anime=[]
self.show_jpg_png(kimg[1])
elif kimg[0]=='clear':
last_anime=[]
self.clear_pic()
else:
if last_anime!=[]:
self.show_animegif(last_anime)
else:
time.sleep(0.1)
except:
if last_anime!=[]:
self.show_animegif(last_anime)
def show_jpg_png(self,file_name):
print("show_jpg_png("+file_name+")")
#subprocess.call(["cd","~/Pictures"])
img = Image.open("../Pictures/"+file_name)
self.show_one_pic(img)
def show_animegif(self,gif_name):
try:
gif = cv2.VideoCapture(gif_name)
except:
gif = cv2.VideoCapture('../Pictures/giphy-girl-ex...
fps = gif.get(cv2.CAP_PROP_FPS) # fpsは1秒あたりの...
images = []
i = 0
while True:
is_success, img = gif.read()
if not is_success:
break
images.append(img)
i+=1
#cv2.namedWindow('test', cv2.WINDOW_AUTOSIZE)
#for it in range(round(times,0)):
show_one_anime(images)
def show_one_anime(self,images):
try:
for t in range(len(images)):
#cv2.imshow('test', images[t])
#cv2.waitKey(int(1000/fps))
show_one_pic(cv2pil(images[t]))
except KeyboardInterrupt:
print('stop by ctrl-c')
#
# command:
# clear... 0x00
# show ... 0x01
# set1 ix,iy,r,g,b
# ... 0x02 *,*, *,*,*
# setn ix,iy,n, r0,g0,b0, ..r(n-1),g(n-1),b(n-1) n<=8
# ... 0x03 *,*, *, *,*,*, ..., *,*,*
#
def show_one_pic(self,img):
img_width, img_height = img.size
#print('width:',img_width)
#print('height:',img_height)
#print("show_one_pic")
dx=img_width/self.jmax
dy=img_height/self.imax
#print("dx=",dx)
#print("dy=",dy)
for i in range(self.imax):
for j in range(self.jmax):
x=self.off_x+j*dx
y=self.off_y+i*dy
rgb=img.getpixel((x,y))
#print("rgb[",j,",",i,"]=",rgb)
self.pixels[i][j]=rgb
self.i2c.write_byte(self.addrs[0],0x00)
self.i2c.write_byte(self.addrs[1],0x00)
self.i2c.write_byte(self.addrs[2],0x00)
self.i2c.write_byte(self.addrs[3],0x00)
for i in range(self.imax):
for j in range(4):
for k in range(4):
p=self.pixels[i][4*j+k]
self.fourpix[3+k*3+0]=p[0]
self.fourpix[3+k*3+1]=p[1]
self.fourpix[3+k*3+2]=p[2]
self.fourpix[0]=i;
self.fourpix[1]=0;
i2c_addr=self.addrs[j]
try:
self.i2c.write_i2c_block_data(i2c_addr,0x03,sel...
except:
print('i2c write error at:(',i,',',j,')')
#time.sleep(0.00001)
self.i2c.write_byte(self.addrs[0],0x01)
self.i2c.write_byte(self.addrs[1],0x01)
self.i2c.write_byte(self.addrs[2],0x01)
self.i2c.write_byte(self.addrs[3],0x01)
def clear_pic(self):
print("clear_pic")
self.i2c.write_byte(self.addrs[0],0x00)
self.i2c.write_byte(self.addrs[1],0x00)
self.i2c.write_byte(self.addrs[2],0x00)
self.i2c.write_byte(self.addrs[3],0x00)
time.sleep(0.001)
self.i2c.write_byte(self.addrs[0],0x01)
self.i2c.write_byte(self.addrs[1],0x01)
self.i2c.write_byte(self.addrs[2],0x01)
self.i2c.write_byte(self.addrs[3],0x01)
def main():
RemoteCommandReader()
if __name__ == "__main__":
main()
}}
ページ名: