快速入门

更新时间:2024-03-22 07:59:14下载pdf

本文描述如何启用设备的局域网通信功能,并为第三方应用提供授权,帮助您快速入门。

第一步:启用局域网通信功能

设备对外的局域网通信功能默认是关闭的,您需要在 App 面板上开启。

  1. 单击 App 面板的 高级功能
  2. 选择 局域网控制,单击开关图标,可以开启或关闭该功能。

启用局域网通信功能后,App 面板上展示设备的密钥,该密钥用于身份认证的数据加密。请保存该密钥,防止泄露。

第二步:接入鉴权

网关局域网通信采用 WebSocket 协议作为传输协议,数据使用 AES-128-ECB 加密算法加密,保证数据的安全性。建立 WebSocket 连接后,先进行身份认证,身份认证成功后才能进行双向通信,并且传输的数据用身份认证过程协商得到的会话密钥进行加密。

身份认证流程

  1. 网关生成 16 字节随机数 nonce S,将其 Base64 编码。

  2. 网关向第三方应用发送类型为 auth_required 的消息,数据是 Base64(nonce S)。

  3. 第三方应用接收数据,通过 Base64 解码出 nonce S,并使用 SHA-256 计算 HMAC 值,将其 Base64 编码。

  4. 第三方应用生成 16 字节随机数 nonce C,将其 Base64 编码。

  5. 第三方应用向网关发送类型为 auth 的消息,数据是 Base64(nonce C)和 Base64(HMAC)。

  6. 网关接收数据,通过 Base64 解密出 nonce C,并使用 SHA-256 计算 HMAC 值,将其 Base64 编码。

  7. 网关校验第三方应用发过来的 HMAC 值,校验成功则向第三方应用发送类型为 auth_ok 的消息,数据是 Base64(HMAC)。

  8. 同样地,第三方应用也校验网关发过来的 HMAC 值,验证失败则断开连接。

    以上的传输数据使用 AES-128-ECB 加密算法加密,密钥是 App 面板上展示的密钥。

网关第三方应用生成随机数 nonce Stype: auth_required, data: base64(nonce S)生成随机数 nonce Ctype: auth, data: base64(nonce C), base64(SHA-256(nonce S))合法性校验type: auth_ok, data: base64(SHA-256(nonce C))断开 WebSocket 连接alt[认证成功][认证失败]合法性校验断开 WebSocket 连接alt[认证失败]网关第三方应用

会话密钥协商流程

  1. 网关将 nonce S 和 nonce C 进行异或运算,得到会话密钥。
  2. 第三方应用将 nonce S 和 nonce C 进行异或运算,得到会话密钥。
  3. 后续的通信数据使用会话密钥加密和解密。
网关第三方应用Session Key = nonce C ^ nonce SSession Key = nonce C ^ nonce SSession Key 加密的数据Session Key 加密的数据网关第三方应用

第三步:示例

以下的示例 Python 代码演示了 WebSocket 客户端通过局域网连接到网关,完成身份认证过程,并且获取当前网关的设备列表。

在示例代码中,将 YOUR_IP_ADDRESS 替换成您的网关 IP 地址,YOUR_SECRET_KEY 替换成您的网关密钥(App 面板上展示的密钥)。

示例代码

import websocket
import json
import os

from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
import hashlib
import hmac
import base64

class LocalApi(object):
    def __init__(self, url: str, key: bytes):
        self.nonce_c = os.urandom(16)
        self.nonce_s = None
        self.key = key
        self.url = url
        self.ws = websocket.WebSocket()
        self.cipher = AES.new(self.key, AES.MODE_ECB)

        self.ws.connect(self.url)

    def enc_msg(self, msg: bytes) -> bytes:
        padded_msg = pad(msg, AES.block_size)
        return self.cipher.encrypt(padded_msg)

    def dec_msg(self, msg: bytes) -> bytes:
        return unpad(self.cipher.decrypt(msg), AES.block_size)

    def _auth_required(self, data):
        self.nonce_s = base64.b64decode(data['data']['nonce'])

        hmac_sha256 = hmac.new(self.key, self.nonce_s, hashlib.sha256)
        hmac_hex = hmac_sha256.digest()
        hmac_str = base64.b64encode(hmac_hex).decode('utf-8')

        msg = json.dumps({
            "type": "auth",
            "data": {
                "nonce": base64.b64encode(self.nonce_c).decode('utf-8'),
                "hmac": hmac_str
            }
        })

        self.ws.send(self.enc_msg(msg.encode()))

    def _auth_ok(self, data):
        self.key = bytes([a ^ b for a, b in zip(self.nonce_c, self.nonce_s)])
        self.cipher = AES.new(self.key, AES.MODE_ECB)
        self._get_devices()

    def _get_devices(self):
        msg = json.dumps({
            "id": 1,
            "type": "get_devices"
        })

        self.ws.send(self.enc_msg(msg.encode()))

    def run(self):
        while True:
            ws_data = self.ws.recv()
            data = json.loads(self.dec_msg(ws_data))
            print(f"recv data: {data}")
            if data['type'] == 'auth_required':
                self._auth_required(data)
            elif data['type'] == 'auth_ok':
                self._auth_ok(data)
            elif data['type'] == 'event':
                pass

        self.ws.close()

url = "ws://<YOUR_IP_ADDRESS>:8080"
key = b'<YOUR_SECRET_KEY>'

client = LocalApi(url, key)
client.run()

输出日志

recv data: {'type': 'auth_required', 'data': {'nonce': 'nQ+UcvlKWSbPSoXic1o1Tg=='}}
recv data: {'type': 'auth_ok', 'data': {'hmac': '5uulS0qaM1aSE5ISvE4puEJ2rZ3kK1YIW/1yw+szTVw='}}
recv data: {'id': 1, 'type': 'result', 'success': True, 'result': ['000d6ffffe67e2ca-1', '000d6ffffe67e2ca']}