TCP_Socketプラグインと外部アプリケーションの通信方法

画像検査用ノーコードツール MERLICのプラグイン “TCP_Socket” の接続方法の詳細(困りそうなポイント)を説明します。

簡単な説明はMERLIC販売代理店のLINX様のサイトから資料をダウンロードできますが、実際に設定しようとすると困りそうなポイントがありますので、補足説明を致します。

MERLICとは

MERLICはMVTec社が開発した、画像検査ライブラリHALCONの基本機能を実装したノーコード画像処理ツールです。
プログラミングは一切不要、直感的な操作だけで画像処理システムを構築できます。

MERLIC設定画像

MERLIC販売代理店 LINX様サイト


今回はMERLICのサンプルプログラム “read_lot_number_and_ECC200” を使用して外部機器から検査アプリを制御する方法を説明します。

1.結果出力パラメーターの登録 ーーMERLIC Creator

サンプルプログラムで印字されたテキストを二次元コードを読み取り、読み取りの成否、読み込んだテキスト(文字列)、二次元コード(文字列)を外部機器に出力します。

read_lot_number_and_ECC200 サンプルProgram

TCP_Socketで出力したい項目は、出力したい項目があるツールのページで、出力変数の上にマウスを置き右クリックで表示されるウィンドウから[MVApp結果…]を追加を選択。

上記を設定後レシピの保存をします。

出力結果の登録

2-1. レシピの設定 ーーMERLIC RTE Setup

MERLIC RTEを立ち上げ、レシピのタブを選択します。
[インポート]ボタンをクリックし、レシピを読み込みます。

レシピが1つの場合は下部にあるトグルスイッチをデフォルトにしてください。

レシピのインポート

2-2. プラグイン ‘TCP_Socket’ の設定 ーーMERLIC RTE Setup

MERLIC RTEを立ち上げ、通信のタブを選択します。
[プラグインインスタンスの追加]をクリックし、表示されたリストから“TCP_Socket”を追加します

プラグインの追加

“TCP_Socket”を選択することで各パラメーターの設定を変更することができます。

  • 変更する項目
    • Terminator for incoming message … LineFeed 入力文字列の終端文字を’\n’とする
    • ResultReady — Separetor for result context … ”(空白)外部機器側での文字列解析の為

その他項目も必要に応じ変更してください。
変更後は ‘変更を保存’ ボタンを押してパラメーターを保存します。

TCP_Socket パラメーターの変更
TCP_Socket パラメーターの登録

保存後は’TCP_Socket’のプラグインを起動しておいてください。

プラグインの起動

3. 外部機器からの制御アプリケーション作成

今回はMERLICと同一PC(ローカル環境)から制御するPythonコード作成について紹介します。

下記は(単一)検査開始を行う為のメッセージを送信するコードを示します。
単一検査開始には文字列‘StartSingleJob’にMERLICで設定した終端文字 ‘\n’ を追加しUTF-8でエンコードしたデータを送信します。

import socket
import time

# ソケットの作成
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    # タイムアウト時間の設定
    s.settimeout(0.5)
    try:
        # MERLICとの接続
        s.connect((ip_adress, port))
    
        # 検査開始メッセージの作成(utf-8エンコード)
        buffer = ("StartSingleJob\n").encode("utf-8")
        # メッセージの送信
        s.sendall(buffer)  

        time.sleep(0.1)
        # ソケットの解除
        s.close()

    except (TimeoutError, ConnectionRefusedError):
        print("Connection Error")

データを送信すると各種イベントが発生するため、メッセージを受け取ります。

  # レスポンスメッセージの受信
  rcv_data: bytes = s.recv(512)[0:]
  # メッセージをUTF-8でデコード
  messages = rcv_data.decode("utf-8")
  print(messages)

受信メッセージは複数のメッセージを結合された文字列を受信するので、解析するには終端文字'\n'で分割する必要があります。
また分割されたメッセージを' '(空白)で分割すると各要素を取得することができます。

検査結果のデータを取得するには’ResultReady’イベントをキャッチしそのメッセージから各データを取得する必要があります。

 # メッセージリストから1メッセージを取得しループ処理
        for message in messages_list:
            # メッセージから各項目を取得
            items = message.split(" ")
            if len(items) > 0:
                # イベント名を取得
                event_name = items[0]
                # 検査結果出力イベントをキャッチし検査結果を取得する
                if event_name == "ResultReady:":
                    # インデックスはプラグインパラメーター設定画面を参照
                    judge = items[6]
                    text = items[7]
                    data = items[8]
                    print(f"Result: Judge = {judge}, Text = {text}, Data = {data}")

ResultReadyイベントを取得する前にソケットを閉じてしまうと二度と取得できなくなってしまう為、必ず取得してからソケットを閉じてください。

最後にコードの全文を記載します。

import socket
import time

ip_adress = "127.0.0.1"
port = 65433

# TCPクライアントの作成
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    # タイムアウト時間の設定
    s.settimeout(0.5)
    try:
        # MERLICとの接続
        s.connect((ip_adress, port))
    
        # 検査開始メッセージの作成(utf-8エンコード)
        buffer = ("StartSingleJob\n").encode("utf-8")
        # メッセージの送信
        s.sendall(buffer)  

        time.sleep(0.2)
        
        # レスポンスメッセージの受信
        rcv_data: bytes = s.recv(512)[0:]
        # メッセージをUTF-8でデコード
        messages = rcv_data.decode("utf-8")
        # メッセージを分割(リスト化)
        messages_list = messages.split("\n")
        
        # メッセージリストから1メッセージを取得しループ処理
        for message in messages_list:
            # メッセージから各項目を取得
            items = message.split(" ")
            if len(items) > 0:
                # イベント名を取得
                event_name = items[0]
                # 検査結果出力イベントをキャッチし検査結果を取得する
                if event_name == "ResultReady:":
                    # インデックスはプラグインパラメーター設定画面を参照
                    judge = items[6]
                    text = items[7]
                    data = items[8]
                    print(f"Result: Judge = {judge}, Text = {text}, Data = {data}")

    except (ConnectionRefusedError, TimeoutError):
        print("Connection Error")
    # ソケットを閉じる
    s.close()