Skip to main content

基于 OpenTwins 的数字孪生系统设计

一、项目概述

本项目基于开源数字孪生平台 OpenTwins 构建企业级数字孪生应用。OpenTwins 底层使用 Eclipse Ditto 作为数字孪生核心引擎,提供完整的设备管理、数据采集、可视化和分析能力。

1.1 什么是 OpenTwins?

OpenTwins 是一个开源的数字孪生平台,集成了以下核心组件:

  • 🎯 Eclipse Ditto - 数字孪生核心引擎
  • 📊 Grafana - 数据可视化和监控
  • 💾 InfluxDB - 时序数据存储
  • 📡 Mosquitto - MQTT 消息代理
  • 📈 Telegraf - 数据采集和处理
  • 🗄️ MongoDB - Things 状态存储

1.2 应用场景

🏭 智能制造

  • 生产设备实时监控
  • 设备预测性维护
  • 生产线数字孪生
  • OEE(设备综合效率)分析

🏢 智慧园区

  • 园区 3D 可视化监控
  • 能耗监测和优化
  • 环境监测(温湿度、空气质量)
  • 设备集中管理

⚙️ 设备健康管理

  • 设备状态实时监控
  • 故障预警和诊断
  • 维护计划优化
  • 设备生命周期管理

1.3 技术架构

1.4 技术指标

指标目标值说明
设备连接数10,000+同时在线设备
数据采集频率1-10 Hz根据设备类型调整
端到端延迟< 1s从设备到可视化展示
API 响应时间< 200msDitto HTTP API
数据保留期30天/1年原始数据/聚合数据
系统可用性99.9%年度目标

二、OpenTwins 平台部署

2.1 环境准备

在开始开发之前,请先完成 OpenTwins 平台的部署。详细步骤请参考:

👉 OpenTwins Windows 本地部署实战指南

快速检查清单

bash
1# 1. 检查 Minikube 集群状态
2kubectl get nodes
3
4# 2. 检查 OpenTwins 所有 Pod 状态
5kubectl get pods
6
7# 3. 确认所有服务正常运行
8kubectl get services
9
10# 4. 启动端口转发
11powershell -ExecutionPolicy Bypass -File .\start-opentwins-portforward.ps1

访问地址

服务地址用途
Grafanahttp://localhost:3002数据可视化(admin/admin)
Ditto APIhttp://localhost:8080数字孪生核心 API
Extended APIhttp://localhost:8081OpenTwins 扩展 API
InfluxDBhttp://localhost:8086时序数据库管理
MQTTlocalhost:1883设备消息接入

2.2 OpenTwins 组件架构


三、Eclipse Ditto 核心概念

3.1 Things 数据模型

Eclipse Ditto 使用 Thing 作为核心概念,一个 Thing 代表一个数字孪生实体。

Thing 的完整结构

json
1{
2 "thingId": "org.example:temperature-sensor-001",
3 "policyId": "org.example:my-policy",
4 "definition": "org.example:TemperatureSensor:1.0.0",
5 "attributes": {
6 "manufacturer": "Acme Inc.",
7 "model": "TempSensor-2000",
8 "serialNumber": "SN-12345",
9 "location": {
10 "building": "Building A",
11 "floor": "2",
12 "room": "201",
13 "coordinates": {
14 "latitude": 31.2304,
15 "longitude": 121.4737
16 }
17 },
18 "installDate": "2024-01-15",
19 "warrantyDate": "2026-01-15"
20 },
21 "features": {
22 "temperature": {
23 "properties": {
24 "value": 23.5,
25 "unit": "°C",
26 "quality": "good",
27 "timestamp": "2024-01-15T10:30:00Z"
28 }
29 },
30 "humidity": {
31 "properties": {
32 "value": 65.2,
33 "unit": "%",
34 "timestamp": "2024-01-15T10:30:00Z"
35 }
36 },
37 "battery": {
38 "properties": {
39 "level": 85,
40 "voltage": 3.7,
41 "status": "charging"
42 }
43 }
44 }
45}

关键概念说明

  • thingId: 全局唯一标识符,格式:namespace:name
  • policyId: 关联的权限策略 ID
  • definition: Thing 的类型定义(可选)
  • attributes: 静态属性(设备元数据,不常变化)
  • features: 动态属性(传感器数据,经常变化)

3.2 Attributes vs Features

特性AttributesFeatures
用途静态元数据动态数据
更新频率低(很少变化)高(频繁变化)
示例制造商、型号、位置温度、湿度、状态
搜索支持全文搜索支持属性搜索
典型大小较小可能较大

3.3 Policy 权限模型

Ditto 使用 Policy 来控制对 Thing 的访问权限。

Policy 示例

json
1{
2 "policyId": "org.example:my-policy",
3 "entries": {
4 "owner": {
5 "subjects": {
6 "nginx:admin": {
7 "type": "nginx basic auth user"
8 }
9 },
10 "resources": {
11 "thing:/": {
12 "grant": ["READ", "WRITE"],
13 "revoke": []
14 },
15 "policy:/": {
16 "grant": ["READ", "WRITE"],
17 "revoke": []
18 },
19 "message:/": {
20 "grant": ["READ", "WRITE"],
21 "revoke": []
22 }
23 }
24 },
25 "observer": {
26 "subjects": {
27 "nginx:observer": {
28 "type": "nginx basic auth user"
29 }
30 },
31 "resources": {
32 "thing:/": {
33 "grant": ["READ"],
34 "revoke": []
35 }
36 }
37 }
38 }
39}

四、Ditto HTTP API 使用指南

4.1 创建数字孪生 (Thing)

API 端点: PUT /api/2/things/{thingId}

bash
1# 设置 Ditto API 基础地址
2DITTO_API="http://localhost:8080/api/2"
3
4# 创建一个温度传感器数字孪生
5curl -X PUT "$DITTO_API/things/org.example:temp-sensor-001" \
6 -H "Content-Type: application/json" \
7 -d '{
8 "attributes": {
9 "manufacturer": "Acme Inc.",
10 "model": "TempSensor-2000",
11 "location": {
12 "building": "A",
13 "floor": "2",
14 "room": "201"
15 }
16 },
17 "features": {
18 "temperature": {
19 "properties": {
20 "value": 0,
21 "unit": "°C"
22 }
23 },
24 "humidity": {
25 "properties": {
26 "value": 0,
27 "unit": "%"
28 }
29 }
30 }
31 }'

PowerShell 版本

powershell
1$dittoApi = "http://localhost:8080/api/2"
2$thingId = "org.example:temp-sensor-001"
3
4$body = @{
5 attributes = @{
6 manufacturer = "Acme Inc."
7 model = "TempSensor-2000"
8 location = @{
9 building = "A"
10 floor = "2"
11 room = "201"
12 }
13 }
14 features = @{
15 temperature = @{
16 properties = @{
17 value = 0
18 unit = "°C"
19 }
20 }
21 humidity = @{
22 properties = @{
23 value = 0
24 unit = "%"
25 }
26 }
27 }
28} | ConvertTo-Json -Depth 10
29
30Invoke-RestMethod -Uri "$dittoApi/things/$thingId" `
31 -Method Put `
32 -ContentType "application/json" `
33 -Body $body

4.2 查询数字孪生

获取单个 Thing

bash
1curl -X GET "$DITTO_API/things/org.example:temp-sensor-001"

搜索 Things(使用 RQL 查询语言):

bash
1# 查询所有 Acme 制造商的设备
2curl -X GET "$DITTO_API/search/things?filter=eq(attributes/manufacturer,\"Acme Inc.\")"
3
4# 查询特定楼层的所有设备
5curl -X GET "$DITTO_API/search/things?filter=eq(attributes/location/floor,\"2\")"
6
7# 查询温度大于 25°C 的设备
8curl -X GET "$DITTO_API/search/things?filter=gt(features/temperature/properties/value,25)"
9
10# 复杂查询:特定位置且温度异常
11curl -X GET "$DITTO_API/search/things?filter=and(eq(attributes/location/building,\"A\"),gt(features/temperature/properties/value,30))"

4.3 更新设备属性

更新单个 Feature 属性

bash
1# 更新温度值
2curl -X PUT "$DITTO_API/things/org.example:temp-sensor-001/features/temperature/properties/value" \
3 -H "Content-Type: application/json" \
4 -d '25.6'
5
6# 更新湿度值
7curl -X PUT "$DITTO_API/things/org.example:temp-sensor-001/features/humidity/properties/value" \
8 -H "Content-Type: application/json" \
9 -d '62.3'

批量更新整个 Feature

bash
1curl -X PUT "$DITTO_API/things/org.example:temp-sensor-001/features/temperature" \
2 -H "Content-Type: application/json" \
3 -d '{
4 "properties": {
5 "value": 25.6,
6 "unit": "°C",
7 "quality": "good",
8 "timestamp": "2024-01-15T10:30:00Z"
9 }
10 }'

4.4 删除数字孪生

bash
1curl -X DELETE "$DITTO_API/things/org.example:temp-sensor-001"

五、设备接入实现

5.1 MQTT 设备接入

OpenTwins 使用 Mosquitto 作为 MQTT Broker,设备通过 MQTT 协议上报数据。

MQTT 主题规范

1# Ditto 标准主题格式
2{namespace}/{thingId}/things/twin/commands/modify
3
4# 示例
5org.example/temp-sensor-001/things/twin/commands/modify

设备上报数据示例(Python):

python
1import paho.mqtt.client as mqtt
2import json
3import time
4from datetime import datetime
5
6# MQTT 配置
7MQTT_BROKER = "localhost"
8MQTT_PORT = 1883
9THING_ID = "org.example:temp-sensor-001"
10NAMESPACE = "org.example"
11THING_NAME = "temp-sensor-001"
12
13# MQTT 主题
14TOPIC = f"{NAMESPACE}/{THING_NAME}/things/twin/commands/modify"
15
16def on_connect(client, userdata, flags, rc):
17 print(f"Connected to MQTT Broker! Code: {rc}")
18
19def publish_sensor_data(client, temperature, humidity):
20 """发布传感器数据到 Ditto"""
21
22 # 构建 Ditto 消息
23 message = {
24 "topic": f"{NAMESPACE}/{THING_NAME}/things/twin/commands/modify",
25 "path": "/features",
26 "value": {
27 "temperature": {
28 "properties": {
29 "value": temperature,
30 "unit": "°C",
31 "timestamp": datetime.utcnow().isoformat() + "Z"
32 }
33 },
34 "humidity": {
35 "properties": {
36 "value": humidity,
37 "unit": "%",
38 "timestamp": datetime.utcnow().isoformat() + "Z"
39 }
40 }
41 }
42 }
43
44 # 发布消息
45 result = client.publish(TOPIC, json.dumps(message))
46 status = result[0]
47
48 if status == 0:
49 print(f"✓ Data sent: Temp={temperature}°C, Humidity={humidity}%")
50 else:
51 print(f"✗ Failed to send data to topic {TOPIC}")
52
53# 创建 MQTT 客户端
54client = mqtt.Client()
55client.on_connect = on_connect
56
57# 连接到 Broker
58client.connect(MQTT_BROKER, MQTT_PORT, 60)
59client.loop_start()
60
61# 模拟传感器数据上报
62try:
63 while True:
64 # 模拟读取传感器数据
65 temperature = 20 + (time.time() % 10) # 20-30°C
66 humidity = 50 + (time.time() % 20) # 50-70%
67
68 # 发布数据
69 publish_sensor_data(client, round(temperature, 2), round(humidity, 2))
70
71 # 每 5 秒上报一次
72 time.sleep(5)
73
74except KeyboardInterrupt:
75 print("\n停止数据上报")
76 client.loop_stop()
77 client.disconnect()

5.2 配置 Ditto MQTT 连接

需要在 Ditto 中配置 MQTT 连接,以便接收设备消息。

创建 MQTT 连接配置

bash
1curl -X POST "http://localhost:8080/api/2/connections" \
2 -H "Content-Type: application/json" \
3 -d '{
4 "id": "mqtt-connection-mosquitto",
5 "connectionType": "mqtt",
6 "connectionStatus": "open",
7 "uri": "tcp://opentwins-mosquitto:1883",
8 "sources": [{
9 "addresses": ["org.example/+/things/twin/commands/modify"],
10 "authorizationContext": ["nginx:ditto"],
11 "qos": 1,
12 "filters": []
13 }],
14 "targets": [{
15 "address": "org.example/{{ thing:id }}/things/twin/events",
16 "topics": ["_/_/things/twin/events"],
17 "authorizationContext": ["nginx:ditto"],
18 "qos": 1
19 }]
20 }'

5.3 HTTP API 设备接入

对于不支持 MQTT 的设备,可以直接使用 HTTP API 上报数据。

HTTP 上报示例(Python):

python
1import requests
2import time
3from datetime import datetime
4
5DITTO_API = "http://localhost:8080/api/2"
6THING_ID = "org.example:temp-sensor-002"
7
8def create_thing():
9 """创建 Thing"""
10 url = f"{DITTO_API}/things/{THING_ID}"
11 data = {
12 "attributes": {
13 "manufacturer": "Acme Inc.",
14 "model": "TempSensor-HTTP"
15 },
16 "features": {
17 "temperature": {
18 "properties": {
19 "value": 0,
20 "unit": "°C"
21 }
22 }
23 }
24 }
25
26 response = requests.put(url, json=data)
27 print(f"Thing created: {response.status_code}")
28
29def update_temperature(value):
30 """更新温度值"""
31 url = f"{DITTO_API}/things/{THING_ID}/features/temperature/properties"
32 data = {
33 "value": value,
34 "unit": "°C",
35 "timestamp": datetime.utcnow().isoformat() + "Z"
36 }
37
38 response = requests.put(url, json=data)
39 if response.status_code == 204:
40 print(f"✓ Temperature updated: {value}°C")
41 else:
42 print(f"✗ Failed: {response.status_code}")
43
44# 创建 Thing(首次运行)
45# create_thing()
46
47# 持续上报数据
48while True:
49 temp = 20 + (time.time() % 10)
50 update_temperature(round(temp, 2))
51 time.sleep(5)

六、数据可视化实现

6.1 Grafana 数据源配置

  1. 访问 Grafana: http://localhost:3002
  2. 登录(admin/admin)
  3. 配置 InfluxDB 数据源

添加 InfluxDB 数据源

  • SettingsData SourcesAdd data source
  • 选择 InfluxDB
  • 配置参数:
    • URL: http://opentwins-influxdb2:80
    • Query Language: Flux
    • Organization: iot
    • Token: (从 InfluxDB 获取)
    • Default Bucket: telegraf

6.2 创建监控仪表板

创建温度监控面板

  1. CreateDashboardAdd new panel
  2. 选择数据源:InfluxDB
  3. 编写 Flux 查询:
flux
1from(bucket: "telegraf")
2 |> range(start: -1h)
3 |> filter(fn: (r) => r["_measurement"] == "temperature")
4 |> filter(fn: (r) => r["device"] == "temp-sensor-001")
5 |> filter(fn: (r) => r["_field"] == "value")
6 |> aggregateWindow(every: 1m, fn: mean, createEmpty: false)
7 |> yield(name: "mean")
  1. 配置可视化选项:
    • Panel Title: "设备温度实时监控"
    • Visualization: Time series(时间序列图)
    • Unit: Celsius (°C)
    • Thresholds: 设置告警阈值(如 >30°C)

6.3 完整监控大屏示例

仪表板布局

创建设备状态统计面板

flux
1from(bucket: "telegraf")
2 |> range(start: -5m)
3 |> filter(fn: (r) => r["_measurement"] == "device_status")
4 |> last()
5 |> group(columns: ["status"])
6 |> count()

七、业务应用开发

7.1 前端应用架构

技术栈选择

json
1{
2 "dependencies": {
3 "vue": "^3.4.0",
4 "vue-router": "^4.2.0",
5 "pinia": "^2.1.0",
6 "axios": "^1.6.0",
7 "element-plus": "^2.5.0",
8 "echarts": "^5.4.0",
9 "mqtt": "^5.3.0",
10 "@vueuse/core": "^10.7.0"
11 }
12}

项目结构

1digital-twin-web/
2├── src/
3│ ├── api/ # API 接口
4│ │ ├── ditto.js # Ditto API 封装
5│ │ └── influxdb.js # InfluxDB API 封装
6│ ├── components/ # 公共组件
7│ │ ├── DeviceCard.vue # 设备卡片
8│ │ ├── TemperatureChart.vue # 温度图表
9│ │ └── DeviceList.vue # 设备列表
10│ ├── views/ # 页面
11│ │ ├── Dashboard.vue # 监控大屏
12│ │ ├── DeviceManage.vue # 设备管理
13│ │ ├── DataAnalysis.vue # 数据分析
14│ │ └── AlarmCenter.vue # 告警中心
15│ ├── stores/ # Pinia 状态管理
16│ │ ├── device.js # 设备状态
17│ │ └── mqtt.js # MQTT 连接
18│ ├── utils/ # 工具函数
19│ │ ├── request.js # HTTP 请求封装
20│ │ └── mqtt.js # MQTT 工具
21│ ├── App.vue
22│ └── main.js
23└── package.json

7.2 Ditto API 封装

创建 src/api/ditto.js

javascript
1import axios from 'axios'
2
3const DITTO_BASE_URL = 'http://localhost:8080/api/2'
4
5// 创建 axios 实例
6const dittoApi = axios.create({
7 baseURL: DITTO_BASE_URL,
8 timeout: 10000,
9 headers: {
10 'Content-Type': 'application/json'
11 }
12})
13
14/**
15 * Ditto API 封装
16 */
17export default {
18 /**
19 * 获取所有 Things
20 */
21 async getAllThings() {
22 const response = await dittoApi.get('/things')
23 return response.data
24 },
25
26 /**
27 * 获取单个 Thing
28 */
29 async getThing(thingId) {
30 const response = await dittoApi.get(`/things/${thingId}`)
31 return response.data
32 },
33
34 /**
35 * 创建 Thing
36 */
37 async createThing(thingId, data) {
38 const response = await dittoApi.put(`/things/${thingId}`, data)
39 return response.data
40 },
41
42 /**
43 * 更新 Thing 属性
44 */
45 async updateThingAttribute(thingId, attributePath, value) {
46 const response = await dittoApi.put(
47 `/things/${thingId}/attributes/${attributePath}`,
48 value
49 )
50 return response.data
51 },
52
53 /**
54 * 更新 Feature 属性
55 */
56 async updateFeatureProperty(thingId, featureId, propertyPath, value) {
57 const response = await dittoApi.put(
58 `/things/${thingId}/features/${featureId}/properties/${propertyPath}`,
59 value
60 )
61 return response.data
62 },
63
64 /**
65 * 搜索 Things(RQL 查询)
66 */
67 async searchThings(filter, options = {}) {
68 const params = {
69 filter,
70 ...options // 支持 option、namespaces、fields 等参数
71 }
72 const response = await dittoApi.get('/search/things', { params })
73 return response.data
74 },
75
76 /**
77 * 删除 Thing
78 */
79 async deleteThing(thingId) {
80 const response = await dittoApi.delete(`/things/${thingId}`)
81 return response.data
82 },
83
84 /**
85 * 发送消息给 Thing
86 */
87 async sendMessage(thingId, messageSubject, payload) {
88 const response = await dittoApi.post(
89 `/things/${thingId}/inbox/messages/${messageSubject}`,
90 payload
91 )
92 return response.data
93 }
94}

7.3 设备列表页面实现

创建 src/views/DeviceManage.vue

vue
1<template>
2 <div class="device-manage">
3 <el-card class="header-card">
4 <div class="header-actions">
5 <el-input
6 v-model="searchText"
7 placeholder="搜索设备..."
8 style="width: 300px"
9 clearable
10 @input="handleSearch"
11 >
12 <template #prefix>
13 <el-icon><Search /></el-icon>
14 </template>
15 </el-input>
16
17 <el-button type="primary" @click="showCreateDialog = true">
18 <el-icon><Plus /></el-icon>
19 添加设备
20 </el-button>
21 </div>
22 </el-card>
23
24 <el-card class="device-list-card">
25 <el-table :data="filteredDevices" style="width: 100%" v-loading="loading">
26 <el-table-column prop="thingId" label="设备 ID" width="250" />
27
28 <el-table-column label="制造商">
29 <template #default="{ row }">
30 {{ row.attributes?.manufacturer || '-' }}
31 </template>
32 </el-table-column>
33
34 <el-table-column label="型号">
35 <template #default="{ row }">
36 {{ row.attributes?.model || '-' }}
37 </template>
38 </el-table-column>
39
40 <el-table-column label="位置">
41 <template #default="{ row }">
42 {{ formatLocation(row.attributes?.location) }}
43 </template>
44 </el-table-column>
45
46 <el-table-column label="温度">
47 <template #default="{ row }">
48 <span :class="getTemperatureClass(row.features?.temperature?.properties?.value)">
49 {{ row.features?.temperature?.properties?.value || '-' }}
50 {{ row.features?.temperature?.properties?.unit }}
51 </span>
52 </template>
53 </el-table-column>
54
55 <el-table-column label="湿度">
56 <template #default="{ row }">
57 {{ row.features?.humidity?.properties?.value || '-' }}
58 {{ row.features?.humidity?.properties?.unit }}
59 </template>
60 </el-table-column>
61
62 <el-table-column label="操作" width="200">
63 <template #default="{ row }">
64 <el-button size="small" @click="viewDevice(row)">详情</el-button>
65 <el-button size="small" type="danger" @click="deleteDevice(row)">
66 删除
67 </el-button>
68 </template>
69 </el-table-column>
70 </el-table>
71 </el-card>
72
73 <!-- 创建设备对话框 -->
74 <el-dialog v-model="showCreateDialog" title="添加设备" width="600px">
75 <el-form :model="newDevice" label-width="100px">
76 <el-form-item label="设备 ID">
77 <el-input v-model="newDevice.thingId" placeholder="org.example:device-001" />
78 </el-form-item>
79
80 <el-form-item label="制造商">
81 <el-input v-model="newDevice.manufacturer" />
82 </el-form-item>
83
84 <el-form-item label="型号">
85 <el-input v-model="newDevice.model" />
86 </el-form-item>
87
88 <el-form-item label="楼栋">
89 <el-input v-model="newDevice.building" />
90 </el-form-item>
91
92 <el-form-item label="楼层">
93 <el-input v-model="newDevice.floor" />
94 </el-form-item>
95
96 <el-form-item label="房间">
97 <el-input v-model="newDevice.room" />
98 </el-form-item>
99 </el-form>
100
101 <template #footer>
102 <el-button @click="showCreateDialog = false">取消</el-button>
103 <el-button type="primary" @click="createDevice" :loading="creating">
104 创建
105 </el-button>
106 </template>
107 </el-dialog>
108 </div>
109</template>
110
111<script setup>
112import { ref, computed, onMounted } from 'vue'
113import { ElMessage, ElMessageBox } from 'element-plus'
114import { Search, Plus } from '@element-plus/icons-vue'
115import dittoApi from '@/api/ditto'
116
117const devices = ref([])
118const loading = ref(false)
119const searchText = ref('')
120const showCreateDialog = ref(false)
121const creating = ref(false)
122
123const newDevice = ref({
124 thingId: '',
125 manufacturer: '',
126 model: '',
127 building: '',
128 floor: '',
129 room: ''
130})
131
132// 过滤后的设备列表
133const filteredDevices = computed(() => {
134 if (!searchText.value) return devices.value
135
136 const search = searchText.value.toLowerCase()
137 return devices.value.filter(device =>
138 device.thingId.toLowerCase().includes(search) ||
139 device.attributes?.manufacturer?.toLowerCase().includes(search) ||
140 device.attributes?.model?.toLowerCase().includes(search)
141 )
142})
143
144// 加载设备列表
145const loadDevices = async () => {
146 loading.value = true
147 try {
148 const response = await dittoApi.getAllThings()
149 devices.value = response.items || []
150 } catch (error) {
151 ElMessage.error('加载设备列表失败')
152 console.error(error)
153 } finally {
154 loading.value = false
155 }
156}
157
158// 创建设备
159const createDevice = async () => {
160 if (!newDevice.value.thingId) {
161 ElMessage.warning('请输入设备 ID')
162 return
163 }
164
165 creating.value = true
166 try {
167 const data = {
168 attributes: {
169 manufacturer: newDevice.value.manufacturer,
170 model: newDevice.value.model,
171 location: {
172 building: newDevice.value.building,
173 floor: newDevice.value.floor,
174 room: newDevice.value.room
175 }
176 },
177 features: {
178 temperature: {
179 properties: {
180 value: 0,
181 unit: '°C'
182 }
183 },
184 humidity: {
185 properties: {
186 value: 0,
187 unit: '%'
188 }
189 }
190 }
191 }
192
193 await dittoApi.createThing(newDevice.value.thingId, data)
194 ElMessage.success('设备创建成功')
195 showCreateDialog.value = false
196 loadDevices()
197
198 // 重置表单
199 newDevice.value = {
200 thingId: '',
201 manufacturer: '',
202 model: '',
203 building: '',
204 floor: '',
205 room: ''
206 }
207 } catch (error) {
208 ElMessage.error('创建设备失败')
209 console.error(error)
210 } finally {
211 creating.value = false
212 }
213}
214
215// 删除设备
216const deleteDevice = async (device) => {
217 try {
218 await ElMessageBox.confirm(
219 `确定要删除设备 ${device.thingId} 吗?`,
220 '确认删除',
221 {
222 confirmButtonText: '删除',
223 cancelButtonText: '取消',
224 type: 'warning'
225 }
226 )
227
228 await dittoApi.deleteThing(device.thingId)
229 ElMessage.success('设备删除成功')
230 loadDevices()
231 } catch (error) {
232 if (error !== 'cancel') {
233 ElMessage.error('删除设备失败')
234 console.error(error)
235 }
236 }
237}
238
239// 查看设备详情
240const viewDevice = (device) => {
241 // 跳转到设备详情页
242 console.log('View device:', device)
243}
244
245// 格式化位置
246const formatLocation = (location) => {
247 if (!location) return '-'
248 return `${location.building || ''}-${location.floor || ''}F-${location.room || ''}`
249}
250
251// 获取温度样式类
252const getTemperatureClass = (temp) => {
253 if (!temp) return ''
254 if (temp > 30) return 'temp-high'
255 if (temp < 15) return 'temp-low'
256 return 'temp-normal'
257}
258
259const handleSearch = () => {
260 // 搜索逻辑已在 computed 中实现
261}
262
263onMounted(() => {
264 loadDevices()
265})
266</script>
267
268<style scoped>
269.device-manage {
270 padding: 20px;
271}
272
273.header-card {
274 margin-bottom: 20px;
275}
276
277.header-actions {
278 display: flex;
279 justify-content: space-between;
280 align-items: center;
281}
282
283.temp-high {
284 color: #f56c6c;
285 font-weight: bold;
286}
287
288.temp-low {
289 color: #409eff;
290 font-weight: bold;
291}
292
293.temp-normal {
294 color: #67c23a;
295}
296</style>

八、实时通信 WebSocket

8.1 WebSocket 连接

Ditto 支持通过 WebSocket 订阅 Thing 的实时变化。

创建 WebSocket 连接(JavaScript):

javascript
1// src/utils/dittoWebSocket.js
2export class DittoWebSocket {
3 constructor(wsUrl = 'ws://localhost:8080/ws/2') {
4 this.wsUrl = wsUrl
5 this.ws = null
6 this.handlers = new Map()
7 this.reconnectInterval = 5000
8 }
9
10 connect() {
11 this.ws = new WebSocket(this.wsUrl)
12
13 this.ws.onopen = () => {
14 console.log('✓ WebSocket connected')
15
16 // 订阅所有 Things 的变化
17 this.subscribe()
18 }
19
20 this.ws.onmessage = (event) => {
21 try {
22 const message = JSON.parse(event.data)
23 this.handleMessage(message)
24 } catch (error) {
25 console.error('Failed to parse WebSocket message:', error)
26 }
27 }
28
29 this.ws.onerror = (error) => {
30 console.error('WebSocket error:', error)
31 }
32
33 this.ws.onclose = () => {
34 console.log('WebSocket closed. Reconnecting...')
35 setTimeout(() => this.connect(), this.reconnectInterval)
36 }
37 }
38
39 subscribe(filter = null) {
40 const subscribeMessage = {
41 topic: 'org.example/org.example:temp-sensor-001/things/twin/commands/subscribe',
42 path: '/',
43 value: {
44 filter: filter || 'true'
45 }
46 }
47
48 this.send(subscribeMessage)
49 }
50
51 send(message) {
52 if (this.ws && this.ws.readyState === WebSocket.OPEN) {
53 this.ws.send(JSON.stringify(message))
54 }
55 }
56
57 handleMessage(message) {
58 console.log('WebSocket message:', message)
59
60 // 触发注册的处理函数
61 this.handlers.forEach(handler => handler(message))
62 }
63
64 onMessage(handler) {
65 const id = Symbol()
66 this.handlers.set(id, handler)
67
68 return () => {
69 this.handlers.delete(id)
70 }
71 }
72
73 disconnect() {
74 if (this.ws) {
75 this.ws.close()
76 this.ws = null
77 }
78 }
79}

在 Vue 组件中使用

vue
1<script setup>
2import { onMounted, onUnmounted } from 'vue'
3import { DittoWebSocket } from '@/utils/dittoWebSocket'
4
5let dittoWs = null
6
7onMounted(() => {
8 dittoWs = new DittoWebSocket()
9 dittoWs.connect()
10
11 // 订阅消息
12 const unsubscribe = dittoWs.onMessage((message) => {
13 console.log('Thing updated:', message)
14 // 更新 UI
15 })
16
17 onUnmounted(() => {
18 unsubscribe()
19 dittoWs.disconnect()
20 })
21})
22</script>

九、项目实战案例

9.1 智能工厂温度监控系统

场景描述

  • 工厂有 100+ 个温度传感器
  • 需要实时监控所有设备温度
  • 温度超过 30°C 时触发告警
  • 提供历史数据分析和趋势预测

实现步骤

1. 创建设备批量导入脚本

python
1# scripts/import_devices.py
2import requests
3
4DITTO_API = "http://localhost:8080/api/2"
5
6# 设备配置
7devices = [
8 {"id": f"org.factory:temp-sensor-{i:03d}",
9 "building": "A",
10 "floor": str(i // 10 + 1),
11 "room": str(i % 10 + 101)}
12 for i in range(1, 101)
13]
14
15def create_device(device_id, building, floor, room):
16 """创建设备"""
17 url = f"{DITTO_API}/things/{device_id}"
18 data = {
19 "attributes": {
20 "manufacturer": "Acme Inc.",
21 "model": "TempSensor-Industrial",
22 "location": {
23 "building": building,
24 "floor": floor,
25 "room": room
26 }
27 },
28 "features": {
29 "temperature": {
30 "properties": {
31 "value": 25.0,
32 "unit": "°C",
33 "status": "normal"
34 }
35 }
36 }
37 }
38
39 response = requests.put(url, json=data)
40 return response.status_code
41
42# 批量创建设备
43for device in devices:
44 status = create_device(
45 device["id"],
46 device["building"],
47 device["floor"],
48 device["room"]
49 )
50 print(f"✓ Created {device['id']}: {status}")

2. 设备数据模拟器

python
1# scripts/device_simulator.py
2import paho.mqtt.client as mqtt
3import json
4import time
5import random
6from datetime import datetime
7
8MQTT_BROKER = "localhost"
9MQTT_PORT = 1883
10
11def simulate_temperature():
12 """模拟温度值(正态分布,偶尔出现异常)"""
13 if random.random() < 0.05: # 5% 概率出现高温
14 return round(random.uniform(30, 35), 2)
15 else:
16 return round(random.gauss(25, 3), 2) # 正常范围 20-30°C
17
18def publish_data(client, device_id):
19 """发布设备数据"""
20 namespace, name = device_id.split(":")
21 topic = f"{namespace}/{name}/things/twin/commands/modify"
22
23 temperature = simulate_temperature()
24
25 message = {
26 "topic": f"{namespace}/{name}/things/twin/commands/modify",
27 "path": "/features/temperature/properties",
28 "value": {
29 "value": temperature,
30 "unit": "°C",
31 "status": "high" if temperature > 30 else "normal",
32 "timestamp": datetime.utcnow().isoformat() + "Z"
33 }
34 }
35
36 client.publish(topic, json.dumps(message))
37 return temperature
38
39# MQTT 客户端
40client = mqtt.Client()
41client.connect(MQTT_BROKER, MQTT_PORT, 60)
42client.loop_start()
43
44# 模拟 100 个设备
45devices = [f"org.factory:temp-sensor-{i:03d}" for i in range(1, 101)]
46
47print("开始模拟设备数据上报...")
48try:
49 while True:
50 for device_id in devices:
51 temp = publish_data(client, device_id)
52 print(f"📡 {device_id}: {temp}°C")
53
54 time.sleep(60) # 每分钟更新一次
55
56except KeyboardInterrupt:
57 print("\n停止模拟")
58 client.loop_stop()
59 client.disconnect()

9.2 数据分析仪表板

Grafana 仪表板配置

  1. 温度分布热力图
flux
1from(bucket: "telegraf")
2 |> range(start: -1h)
3 |> filter(fn: (r) => r["_measurement"] == "temperature")
4 |> filter(fn: (r) => r["_field"] == "value")
5 |> aggregateWindow(every: 5m, fn: mean)
  1. 异常设备列表
flux
1from(bucket: "telegraf")
2 |> range(start: -5m)
3 |> filter(fn: (r) => r["_measurement"] == "temperature")
4 |> filter(fn: (r) => r["_field"] == "value")
5 |> filter(fn: (r) => r._value > 30.0)
6 |> last()
  1. 温度趋势预测(使用 Holt-Winters 算法)
flux
1from(bucket: "telegraf")
2 |> range(start: -7d)
3 |> filter(fn: (r) => r["_measurement"] == "temperature")
4 |> aggregateWindow(every: 1h, fn: mean)
5 |> holtWinters(n: 24, interval: 1h)

十、最佳实践

10.1 Thing ID 命名规范

推荐格式{namespace}:{resource-type}-{identifier}

1org.factory:temp-sensor-001
2org.factory:pump-motor-A12
3com.warehouse:conveyor-belt-B03

避免

  • ❌ 过长的 ID:org.company.department.factory.building-a.floor-2.room-201.temp-sensor-001
  • ❌ 特殊字符:sensor#001device@abc
  • ❌ 中文字符:温度传感器-001

10.2 数据更新策略

高频数据(> 1 Hz)

javascript
1// 使用批量更新减少 API 调用
2const batchUpdate = {
3 "temperature": { "properties": { "value": 25.5 } },
4 "humidity": { "properties": { "value": 65.2 } },
5 "pressure": { "properties": { "value": 101.3 } }
6}
7
8await dittoApi.put(`/things/${thingId}/features`, batchUpdate)

低频数据(< 1 次/分钟)

javascript
1// 单独更新每个属性
2await dittoApi.put(
3 `/things/${thingId}/features/battery/properties/level`,
4 85
5)

10.3 权限管理

为不同角色创建 Policy

json
1{
2 "policyId": "org.factory:device-policy",
3 "entries": {
4 "admin": {
5 "subjects": {
6 "nginx:admin": { "type": "admin" }
7 },
8 "resources": {
9 "thing:/": { "grant": ["READ", "WRITE"], "revoke": [] },
10 "policy:/": { "grant": ["READ", "WRITE"], "revoke": [] },
11 "message:/": { "grant": ["READ", "WRITE"], "revoke": [] }
12 }
13 },
14 "operator": {
15 "subjects": {
16 "nginx:operator": { "type": "operator" }
17 },
18 "resources": {
19 "thing:/": { "grant": ["READ", "WRITE"], "revoke": [] },
20 "policy:/": { "grant": ["READ"], "revoke": ["WRITE"] },
21 "message:/": { "grant": ["READ", "WRITE"], "revoke": [] }
22 }
23 },
24 "viewer": {
25 "subjects": {
26 "nginx:viewer": { "type": "viewer" }
27 },
28 "resources": {
29 "thing:/": { "grant": ["READ"], "revoke": ["WRITE"] },
30 "policy:/": { "grant": [], "revoke": ["READ", "WRITE"] },
31 "message:/": { "grant": ["READ"], "revoke": ["WRITE"] }
32 }
33 }
34 }
35}

10.4 性能优化

1. 使用搜索 API 代替轮询

不推荐(轮询所有设备):

javascript
1setInterval(async () => {
2 const things = await dittoApi.getAllThings()
3 // 处理数据...
4}, 5000)

推荐(使用 WebSocket 订阅):

javascript
1const ws = new DittoWebSocket()
2ws.onMessage((message) => {
3 // 实时处理变化
4})

2. 批量操作

javascript
1// 批量删除
2const thingIds = ['org.example:device-001', 'org.example:device-002']
3await Promise.all(thingIds.map(id => dittoApi.deleteThing(id)))

3. 分页查询

javascript
1// 使用 option 参数进行分页
2const page1 = await dittoApi.searchThings('eq(attributes/manufacturer,"Acme")', {
3 option: 'size(50),cursor(0)'
4})

十一、故障排查

11.1 常见问题

问题 1:无法创建 Thing

1HTTP 400 Bad Request: Thing ID must match pattern namespace:name

解决方案:检查 Thing ID 格式,必须包含命名空间和名称,用冒号分隔。

javascript
1// ❌ 错误
2"my-device"
3
4// ✅ 正确
5"org.example:my-device"

问题 2:MQTT 消息未生效

排查步骤

  1. 检查 MQTT 连接是否配置
  2. 验证主题格式
  3. 查看 Ditto connectivity 日志
bash
1# 查看 connectivity 服务日志
2kubectl logs -f opentwins-ditto-connectivity-xxx

问题 3:WebSocket 连接失败

解决方案:检查端口转发是否正常

bash
1# 重新启动端口转发
2kubectl port-forward svc/opentwins-ditto-nginx 8080:8080

11.2 监控和日志

查看 OpenTwins 组件日志

bash
1# 查看所有 Pod
2kubectl get pods
3
4# 查看特定服务日志
5kubectl logs -f opentwins-ditto-gateway-xxx
6kubectl logs -f opentwins-ditto-things-xxx
7kubectl logs -f opentwins-influxdb2-0
8
9# 查看最近的事件
10kubectl get events --sort-by='.lastTimestamp'

十二、总结与展望

12.1 项目总结

本文档详细介绍了基于 OpenTwins 平台构建数字孪生应用的完整流程:

平台部署:OpenTwins 在 Windows 本地环境的安装和配置 ✅ 核心概念:Eclipse Ditto 的 Things、Features、Policies 数据模型 ✅ API 使用:完整的 Ditto HTTP API 操作示例 ✅ 设备接入:MQTT 和 HTTP 两种设备接入方式 ✅ 数据可视化:Grafana 监控仪表板配置 ✅ 应用开发:Vue3 前端应用开发实践 ✅ 实时通信:WebSocket 实时数据订阅 ✅ 最佳实践:命名规范、性能优化、权限管理

12.2 技术优势

使用 OpenTwins 的优势:

优势说明
🚀 快速部署一键部署完整数字孪生平台
🎯 标准化基于 Eclipse Ditto,符合工业标准
💰 成本低开源免费,无授权费用
🔧 易扩展微服务架构,支持水平扩展
📊 可视化内置 Grafana,开箱即用
🔒 安全性完善的权限控制机制

12.3 下一步计划

功能扩展

  1. 3D 可视化

    • 集成 Three.js 实现 3D 场景
    • 设备在 3D 空间中的实时定位
    • 点击设备查看详细信息
  2. AI 预测

    • 设备故障预测模型
    • 温度趋势预测
    • 异常检测算法
  3. 告警系统

    • 多级告警规则
    • 短信/邮件通知
    • 告警升级机制
  4. 移动端应用

    • UniApp 跨平台开发
    • 实时推送通知
    • 离线数据缓存

生产环境部署

  1. Kubernetes 集群

    • 使用生产级 Kubernetes 集群
    • 配置高可用性
    • 设置资源限制
  2. 监控和运维

    • Prometheus + Grafana 监控
    • ELK 日志收集
    • 自动备份和恢复
  3. 安全加固

    • HTTPS/TLS 加密
    • OAuth2 认证
    • API 访问限流

12.4 参考资源

官方文档

社区资源

学习路径

  1. 完成 OpenTwins 本地部署
  2. 学习 Ditto Things 数据模型
  3. 开发简单的设备接入程序
  4. 创建 Grafana 监控仪表板
  5. 开发 Vue 前端应用
  6. 集成实时 WebSocket 通信
  7. 部署到生产环境

文档版本:1.0.0 最后更新:2025-01-15 作者:Laby 许可证:MIT

相关文档


🎉 恭喜! 您已经掌握了基于 OpenTwins 构建数字孪生应用的全部知识。现在开始动手实践吧!

参与讨论