#author("2025-02-27T22:01:41+09:00","default:TeleportDresser","TeleportDresser")
#author("2025-02-27T22:03:55+09:00","default:TeleportDresser","TeleportDresser")
[[TeleportDresser]]

#code(Python){{
# coding: utf-8
#
#   
#          +---------+      +--------------+
#  Wiki <->| fwb4pi  |<---->|tcp_server_ex1|
#          +---------+      +--------------+
#                                 ^
#                                 |
#                                 v
#                           +------------------+
#                           |RemoteCommandReder|
#                           |    |             |
#                           |    v             |
#                           |   handler        |
#                           |    |             |
#                           |    v             |
#                           |   parser         |
#                           |    |             |
#                           +----|-------------+
#                                |
#                                | put
#                                v
#                           +------------------+
#                           |TeleportDresser   |
#                           |    |             |
#                           |    v             |
#                           |  handler         |
#                           +------------------+
#                          
 
import cv2
import numpy as np
from moviepy.editor import ImageSequenceClip
# coding: utf-8
from PIL import Image, ImageFont, ImageDraw
#import smbus
from smbus2 import SMBus
import time
import sys
import requests
import os
import socket
import threading
from collections import deque
import subprocess
import copy
from subprocess import check_output
 
 
#from PIL import Image
#import smbus
#import time
#import sys
#import requests
 
 
class RemoteCommandReader:
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    HOST = 'localhost'
    PORT = 9999
    def __init__(self):
        print("start RemoteCommandReader.__init__")
        self.client_start()
        self.teleport_dress=Teleport_Dress()
        print(self.teleport_dress)
        self.teleport_dress.setRemoteCommandReader(self)
        while True:
          try:
            time.sleep(0.01)
          except:
            print("error")
 
    def python2fwbln(self,py2x_message):
      msg=py2x_message+'\n'
      self.sock.sendall(msg.encode('utf-8'))
      
    def python2fwb(self,py2x_message):
      msg=py2x_message
      self.sock.sendall(msg.encode('utf-8'))
 
    def parse(self,line):
      self.python2fwb(">"+line)
      words=line.split()
      print(words)
      if words[0]=='show':
          fx=words[1]
          print(fx)
          print(self.teleport_dress)
          self.teleport_dress.putImage(fx)
      elif words[0]=='text':
          if len(words)>=2:
             words.pop(0)
             fx  = ' '.join(words)
          else:
             fx=""
          print("text "+fx)
          self.teleport_dress.putText(fx)
      elif words[0]=='txtColor':
          fx=words[1]
          print("txtColor "+fx)
          self.teleport_dress.setTextColor(fx)
      elif words[0]=='clear':
          self.teleport_dress.clearImage()
      elif words[0]=='clearAll':
          self.teleport_dress.putText('')
          self.teleport_dress.clearImage()
      elif words[0]=='ls':
          self.returnFileList()
      #elif words[0]=='ip_addr':
          #self.returnIpAddr()
      elif words[0]=='shutdown':
          self.shutdown()
 
    def returnIpAddr(self):
        out = check_output(["hostname","-I"])
        self.python2fwbln(out)
 
    def returnFileList(self):
        cpath=os.getcwd()
        os.chdir("/home/pi/Pictures")
        filenames = [f.name for f in os.scandir()]
        filenames.sort()
        for fn in filenames:
            self.python2fwbln(fn)
        os.chdir(cpath)
 
    def shutdown(self):
        os.system("sudo shutdown -h now")
      
    def client_start(self):
      """繧ッ繝ゥ繧、繧「繝ウ繝医・繧ケ繧ソ繝シ繝・""
      """クライアントのスタート"""
      self.sock.connect((self.HOST, self.PORT))
      handle_thread = threading.Thread(target=self.handler, args=(self.sock,), daemon=True)
      handle_thread.start()
 
    def handler(self,sock):
      """繧オ繝シ繝舌°繧峨Γ繝・そ繝シ繧ク繧貞女菫。縺励€∬。ィ遉コ縺吶k"""
      """サーバからメッセージを受信し、表示する"""
 
      while True:
        data = sock.recv(1024)
        print("[蜿嶺ソ。]{}".format(data.decode("utf-8")))
        print("[受信]{}".format(data.decode("utf-8")))
        received=data.decode("utf-8")
        lines=received.splitlines()
        for line in lines:
            self.parse(line)
 
class Teleport_Dress:
 
  off_x=0
  off_y=0
 
  imax=75   #display height pixel size
  jmax=16   #display width pixel size
 
#  dx=img_width/jmax
#  dy=img_height/imax
 
  impose_text=""
  text_color=(0,0,0)
 
 
  def __init__(self):
    print("start Teleport_Dress.__init__")
    #self.font_path = './font/font_jb004_running_brush_wi.ttf'
    self.font_path = './font/GenShinGothic-Medium.ttf'
    self.pixels=[[(0,0,0) for j in range(self.jmax)] for i in range(self.imax)]
    self.addrs = [0x30,0x31,0x32,0x33]
    self.fourpix=[0x00, 0x00, 0x04, 0,0,0, 0,0,0, 0,0,0, 0,0,0]
    #self.i2c = smbus.SMBus(1) # 豕ィ:繝ゥ繧コ繝代う縺ョI2C繝昴・繝・    self.i2c=SMBus(1)
    #self.i2c = smbus.SMBus(1) # 注:ラズパイのI2Cポート
    self.i2c=SMBus(1)
    self.queue=deque([])
    self.show_loop_start()
    self.text_x=0
 
  def setTextColor(self,x):
      if x=="white":
          self.text_color=(255,255,255)
      elif x=="black":
          self.text_color=(0,0,0)
      elif x=="red":
          self.text_color=(255,0,0)
      elif x=="green":
          self.text_color=(0,255,0)
      elif x=="blue":
          self.text_color=(0,0,255)
      elif x=="yellow":
          self.text_color=(255,255,0)
      elif x=="cyan":
          self.text_color=(0,255,255)
      elif x=="magenta":
          self.text_color=(255,0,255)
      elif x=="orange":
          self.text_color=(255,128,0)
      elif x=="pink":
          self.text_color=(200,80,80)
 
  def setRemoteCommandReader(self, x):
    self.rcReader=x
 
  def cv2pil(self,image):
    ''' OpenCV蝙・-> PIL蝙・'''
    ''' OpenCV型 -> PIL型 '''
    new_image = image.copy()
    if new_image.ndim == 2:  # 繝「繝弱け繝ュ
    if new_image.ndim == 2:  # モノクロ
        pass
    elif new_image.shape[2] == 3:  # 繧ォ繝ゥ繝シ
    elif new_image.shape[2] == 3:  # カラー
        new_image = cv2.cvtColor(new_image, cv2.COLOR_BGR2RGB)
    elif new_image.shape[2] == 4:  # 騾城℃
    elif new_image.shape[2] == 4:  # 透過
        new_image = cv2.cvtColor(new_image, cv2.COLOR_BGRA2RGBA)
    new_image = Image.fromarray(new_image)
    return new_image
 
  def gif2img(self,file_name):
    #print("gif2img.."+file_name)
    try:
      path="/home/pi/Pictures/"+file_name
      gif = cv2.VideoCapture(path)
      print("gif=",gif)
    except:
      import traceback
      traceback.print_exc()
      rtn="error: show ...no "+file_name
      self.rcReader.pythohn2fwb(rtn)
      gif = cv2.VideoCapture('/home/pi/Pictures/giphy-girl-ex1.gif')
    fps = gif.get(cv2.CAP_PROP_FPS)  # fps縺ッ・醍ァ偵≠縺溘j縺ョ繧ウ繝樊焚
    fps = gif.get(cv2.CAP_PROP_FPS)  # fpsは1秒あたりのコマ数
    images = []
    i = 0
    while True:
      is_success, img = gif.read()
      if not is_success:
          #print("gif.read() is not success")
          break
      #print("gif read i=",i)
      images.append(img)
      i+=1
    ex=('anime',images)
    return ex
 
  def pic2imgs(self,file_name):
    print("pic2imgs("+file_name+")")
    img=[]
    #subprocess.call(["cd","~/Pictures"])
    try:
      img = Image.open("/home/pi/Pictures/"+file_name)
      print("img open success.");
      #self.show_one_pic(img)
    except:
      import traceback
      traceback.print_exc()
      rtn="error: show "+file_name
      self.rcReader.python2fwbln(rtn)
    #fps = gif.get(cv2.CAP_PROP_FPS)  # fps縺ッ・醍ァ偵≠縺溘j縺ョ繧ウ繝樊焚
    #fps = gif.get(cv2.CAP_PROP_FPS)  # fpsは1秒あたりのコマ数
    #
    images = []
    #
    # when the width of the img was
    #     larger than the width of the teleport dresser,
    #     make an animation
    #
    imgw,imgh=img.size
    print("imgw="+str(imgw)+",imgh="+str(imgh))
    if imgw >int(imgh/1.2) :
        print("imgw>imgh/1.2")
        wd=int(imgh/2)
        dx=int(wd/self.jmax)
        nx=int(imgw/dx-wd/dx-1)
        #print("imgw="+str(imgw)+",wd="+str(wd)+",dx="+str(dx)+",nx="+str(nx))
        #transform right
        try:
          np_img=np.array(img)
          #imgc=cv2.hconcat([np_img,np_img])
          #print("cv2.hconcat")
          #print(type(imgc))
        except:
          import traceback
          traceback.print_exc()
          rtn="error: np.array(img)"
          self.rcReader.python2fwbln(rtn)
        off_x=0
        off_y=0
        for i in range(nx):
            #print("i="+str(i))
            #print("off_x+i*dx="+str(off_x+i*dx))
            try:
              #subimg=imgc[off_y:imgh,off_x+i*dx:off_x+i*dx+wd]
              subimg=np_img[off_y:imgh,off_x+i*dx:off_x+i*dx+wd]
              img2=Image.fromarray(subimg)
              images.append(copy.copy(img2))
            except:
              import traceback
              traceback.print_exc()
              rtn="error: subimg"
              self.rcReader.python2fwbln(rtn)
    else:
        print("imgw<=imgh/1.2")
        #
        #
        #
        i = 0
        for i in range(5):
            img2=copy.copy(img)
            #self.show_one_pic(img2)
            images.append(img2)
            print(i)
        print("images-len="+str(len(images)))
    rtn=('img',images)
    return rtn
 
  def clearImage(self):
    ex=('clear','x')
    print("clear")
    self.queue.append(ex)
 
  def putImage(self,file_name):
    print("putImage..."+file_name)
    if file_name.startswith('http://') or file_name.startswith('https://'):
      #subprocess.call(["cd ~/Pictures"],shell=True)
      cpath=os.getcwd()
      os.chdir("/home/pi/Pictures")
      wgetx = "wget -N "+file_name
      subprocess.call([wgetx],shell=True)
      #subprocess.call(["cd ~/python"],shell=True)
      os.chdir(cpath)
      path=file_name.split("/")
      plen=len(path)
      xname=path[plen-1]
      print("xname="+xname)
      if xname.endswith('.gif') or xname.endswith('.GIF'):
        ex=self.gif2img(xname)
        self.queue.append(ex)
      elif xname.endswith('.jpg') or xname.endswith('.JPG') or \
           xname.endswith('.JPEG') or xname.endswith('.jpeg'):
        ex=('img',xname)
        #ex=self.pic2imgs(xname)
        self.queue.append(ex)
      elif xname.endswith('.png') or xname.endswith('.PNG'):
        ex=('img',xname)
        self.queue.append(ex)
    else:
      path=file_name.split("/")
      plen=len(path)
      xname=path[plen-1]
      print("xname="+xname);
      if xname.endswith('.gif') or xname.endswith('.GIF'):
        ex=self.gif2img(xname)
        self.queue.append(ex)
      elif xname.endswith('.jpg') or xname.endswith('.JPG') or \
           xname.endswith('.JPEG') or xname.endswith('.jpeg'):
        #ex=self.pic2imgs(xname)
        ex=('img',xname)
        self.queue.append(ex)
      elif xname.endswith('.png') or xname.endswith('.PNG'):
        ex=('img',xname)
        #ex=self.pic2imgs(xname)
        self.queue.append(ex)
      print("queue="+str(len(self.queue)))
      
  def putText(self,txt):
      self.impose_text=txt
 
  def show_loop_start(self):
      """show_loop_start"""
      self.handle_thread_01 = threading.Thread(target=self.handler, daemon=True)
      self.handle_thread_01.start()
 
  def handler(self):
      """queue縺九i繝。繝・そ繝シ繧ク繧貞女菫。縺励€∬。ィ遉コ縺吶k"""
      """queueからメッセージを受信し、表示する"""
      last_anime=[]
      while True:
        try:
          qlen = len(self.queue)
          if qlen>0:
              print("handler, qlen="+str(qlen))
              kimg=self.queue.popleft()
              print("handler, kimg[0]="+kimg[0])
              if kimg[0]=='anime':
                last_anime=kimg
                self.show_one_anime_x(kimg)
              elif kimg[0]=='img':
                print("handler, kimg[0]="+kimg[0]+" kimg[1]="+kimg[1])
                x=self.pic2imgs(kimg[1])
                last_anime=x
                self.show_one_anime_x(x)
              elif kimg[0]=='clear':
                last_anime=[]
                self.clear_pic()
          else:
             if last_anime!=[]:
                 self.show_one_anime_x(last_anime)
             else:
                time.sleep(0.1)
        except:
          if last_anime!=[]:
            self.show_one_anime_x(last_anime)
  def show_jpg_png(self,file_name):
    print("show_jpg_png("+file_name+")")
    #subprocess.call(["cd","~/Pictures"])
    try:
      img = Image.open("/home/pi/Pictures/"+file_name)
      self.show_one_pic(img)
    except:
      import traceback
      traceback.print_exc()
      rtn="error: show "+file_name
      self.rcReader.python2fwbln(rtn)
  def show_one_anime_x(self,images):
      if images[0]=='anime':
          self.show_one_anime(images[1])
      elif images[0]=='img':
          self.show_one_anime2(images[1])
 
  def show_one_anime(self,images):
      #print("show_one_anime() images-len="+str(len(images)))
      try:
        for t in range(len(images)):
          #cv2.imshow('test', images[t])
          #cv2.waitKey(int(1000/fps))
          self.show_one_pic(self.cv2pil(images[t]))
      except KeyboardInterrupt:
        print('stop by ctrl-c')
      except:
        import traceback
        traceback.print_exc()
        rtn="error: show gif"
        self.rcReader.python2fwbln(rtn)
        #self.handle_thread_01.stop()
        self.handle_thread_01.clear()
                
  def show_one_anime2(self,images):
      #print("show_one_anime2() images-len="+str(len(images)))
      try:
        for t in range(len(images)):
          #cv2.imshow('test', images[t])
          #cv2.waitKey(int(1000/fps))
          self.show_one_pic(images[t])
      except KeyboardInterrupt:
        print('stop by ctrl-c')
      except:
        import traceback
        traceback.print_exc()
        rtn="error: show anime2(jpg/png)"
        self.rcReader.python2fwbln(rtn)
        #self.handle_thread_01.stop()
        self.handle_thread_01.clear()
 
#
# command:
#   clear... 0x00
#   show ... 0x01
#   set1 ix,iy,r,g,b
#         ... 0x02  *,*,  *,*,*
#   setn ix,iy,n, r0,g0,b0, ..r(n-1),g(n-1),b(n-1)   n<=8
#         ... 0x03 *,*, *, *,*,*, ..., *,*,*
#
          
  def show_one_pic(self,img_in):
    #print('show_one_pic width:',img_width)
    #print('show_one_pic height:',img_height)
    #print("show_one_pic")
    wwx,wwy=img_in.size
    fs=int(wwy/2)
    px=self.text_x
    py=int(wwx/4)
    if self.impose_text!="":
        img = img_in.copy()
        font = ImageFont.truetype(self.font_path, fs)
        draw = ImageDraw.Draw(img)
        draw.text((px,py), self.impose_text, fill=self.text_color, font=font)
        #img=self.puttext(img_in, self.impose_text, (px,py), self.font_path, fs, (0,0,0))
        #size = draw.textsize(self.impose_text, font=font)
        text_w=draw.textlength(self.impose_text, font=font)
        #text_h=font.getsize(self.impose_text[0])
        if self.text_x+text_w<(wwx-wwx/3):
            self.text_x=wwx-wwx/3
        else:
            #self.text_x=self.text_x-int(wwx/27)
            self.text_x=self.text_x-int(wwx/13)
        #print("text_size=("+str(size[0])+",",str(size[1])+")")
    else:
       img=img_in
    img_width, img_height = img.size
    dx=img_width/self.jmax
    dy=img_height/self.imax
    #print("dx=",dx)
    #print("dy=",dy)
 
    for i in range(self.imax):
      for j in range(self.jmax):
        x=self.off_x+j*dx
        y=self.off_y+i*dy
        rgb=img.getpixel((x,y))
        #print("rgb[",j,",",i,"]=",rgb)
        self.pixels[i][j]=rgb
 
    self.i2c.write_byte(self.addrs[0],0x00)
    self.i2c.write_byte(self.addrs[1],0x00)
    self.i2c.write_byte(self.addrs[2],0x00)
    self.i2c.write_byte(self.addrs[3],0x00)
    for i in range(self.imax):
      for j in range(4):
        for k in range(4):
          p=self.pixels[i][4*j+k]
          self.fourpix[3+k*3+0]=p[0]
          self.fourpix[3+k*3+1]=p[1]
          self.fourpix[3+k*3+2]=p[2]
        self.fourpix[0]=i;
        self.fourpix[1]=0;
        i2c_addr=self.addrs[j]
        try:
          self.i2c.write_i2c_block_data(i2c_addr,0x03,self.fourpix)
        except:
          print('i2c write error at:(',i,',',j,')')
    #time.sleep(0.00001)
    self.i2c.write_byte(self.addrs[0],0x01)
    self.i2c.write_byte(self.addrs[1],0x01)
    self.i2c.write_byte(self.addrs[2],0x01)
    self.i2c.write_byte(self.addrs[3],0x01)
 
  def clear_pic(self):
    print("clear_pic")
    self.i2c.write_byte(self.addrs[0],0x00)
    self.i2c.write_byte(self.addrs[1],0x00)
    self.i2c.write_byte(self.addrs[2],0x00)
    self.i2c.write_byte(self.addrs[3],0x00)
    time.sleep(0.001)
    self.i2c.write_byte(self.addrs[0],0x01)
    self.i2c.write_byte(self.addrs[1],0x01)
    self.i2c.write_byte(self.addrs[2],0x01)
    self.i2c.write_byte(self.addrs[3],0x01)
 
def main():
    print("start teleport_dress_ex1.py")
    RemoteCommandReader()
 
if __name__ == "__main__":
    main()


}}

トップ   編集 差分 バックアップ 添付 複製 名前変更 リロード   新規 一覧 単語検索 最終更新   ヘルプ   最終更新のRSS