基于 OpenTwins 的数字孪生系统设计
一、项目概述
本项目基于开源数字孪生平台 OpenTwins 构建企业级数字孪生应用。OpenTwins 底层使用 Eclipse Ditto 作为数字孪生核心引擎,提供完整的设备管理、数据采集、可视化和分析能力。
1.1 什么是 OpenTwins?
OpenTwins 是一个开源的数字孪生平台,集成了以下核心组件:
- 🎯 Eclipse Ditto - 数字孪生核心引擎
- 📊 Grafana - 数据可视化和监控
- 💾 InfluxDB - 时序数据存储
- 📡 Mosquitto - MQTT 消息代理
- 📈 Telegraf - 数据采集和处理
- 🗄️ MongoDB - Things 状态存储
1.2 应用场景
🏭 智能制造
- 生产设备实时监控
- 设备预测性维护
- 生产线数字孪生
- OEE(设备综合效率)分析
🏢 智慧园区
- 园区 3D 可视化监控
- 能耗监测和优化
- 环境监测(温湿度、空气质量)
- 设备集中管理
⚙️ 设备健康管理
- 设备状态实时监控
- 故障预警和诊断
- 维护计划优化
- 设备生命周期管理
1.3 技术架构
1.4 技术指标
| 指标 | 目标值 | 说明 |
|---|---|---|
| 设备连接数 | 10,000+ | 同时在线设备 |
| 数据采集频率 | 1-10 Hz | 根据设备类型调整 |
| 端到端延迟 | < 1s | 从设备到可视化展示 |
| API 响应时间 | < 200ms | Ditto HTTP API |
| 数据保留期 | 30天/1年 | 原始数据/聚合数据 |
| 系统可用性 | 99.9% | 年度目标 |
二、OpenTwins 平台部署
2.1 环境准备
在开始开发之前,请先完成 OpenTwins 平台的部署。详细步骤请参考:
快速检查清单:
1# 1. 检查 Minikube 集群状态2kubectl get nodes34# 2. 检查 OpenTwins 所有 Pod 状态5kubectl get pods67# 3. 确认所有服务正常运行8kubectl get services910# 4. 启动端口转发11powershell -ExecutionPolicy Bypass -File .\start-opentwins-portforward.ps1访问地址:
| 服务 | 地址 | 用途 |
|---|---|---|
| Grafana | http://localhost:3002 | 数据可视化(admin/admin) |
| Ditto API | http://localhost:8080 | 数字孪生核心 API |
| Extended API | http://localhost:8081 | OpenTwins 扩展 API |
| InfluxDB | http://localhost:8086 | 时序数据库管理 |
| MQTT | localhost:1883 | 设备消息接入 |
2.2 OpenTwins 组件架构
三、Eclipse Ditto 核心概念
3.1 Things 数据模型
Eclipse Ditto 使用 Thing 作为核心概念,一个 Thing 代表一个数字孪生实体。
Thing 的完整结构:
1{2 "thingId": "org.example:temperature-sensor-001",3 "policyId": "org.example:my-policy",4 "definition": "org.example:TemperatureSensor:1.0.0",5 "attributes": {6 "manufacturer": "Acme Inc.",7 "model": "TempSensor-2000",8 "serialNumber": "SN-12345",9 "location": {10 "building": "Building A",11 "floor": "2",12 "room": "201",13 "coordinates": {14 "latitude": 31.2304,15 "longitude": 121.473716 }17 },18 "installDate": "2024-01-15",19 "warrantyDate": "2026-01-15"20 },21 "features": {22 "temperature": {23 "properties": {24 "value": 23.5,25 "unit": "°C",26 "quality": "good",27 "timestamp": "2024-01-15T10:30:00Z"28 }29 },30 "humidity": {31 "properties": {32 "value": 65.2,33 "unit": "%",34 "timestamp": "2024-01-15T10:30:00Z"35 }36 },37 "battery": {38 "properties": {39 "level": 85,40 "voltage": 3.7,41 "status": "charging"42 }43 }44 }45}关键概念说明:
- thingId: 全局唯一标识符,格式:
namespace:name - policyId: 关联的权限策略 ID
- definition: Thing 的类型定义(可选)
- attributes: 静态属性(设备元数据,不常变化)
- features: 动态属性(传感器数据,经常变化)
3.2 Attributes vs Features
| 特性 | Attributes | Features |
|---|---|---|
| 用途 | 静态元数据 | 动态数据 |
| 更新频率 | 低(很少变化) | 高(频繁变化) |
| 示例 | 制造商、型号、位置 | 温度、湿度、状态 |
| 搜索 | 支持全文搜索 | 支持属性搜索 |
| 典型大小 | 较小 | 可能较大 |
3.3 Policy 权限模型
Ditto 使用 Policy 来控制对 Thing 的访问权限。
Policy 示例:
1{2 "policyId": "org.example:my-policy",3 "entries": {4 "owner": {5 "subjects": {6 "nginx:admin": {7 "type": "nginx basic auth user"8 }9 },10 "resources": {11 "thing:/": {12 "grant": ["READ", "WRITE"],13 "revoke": []14 },15 "policy:/": {16 "grant": ["READ", "WRITE"],17 "revoke": []18 },19 "message:/": {20 "grant": ["READ", "WRITE"],21 "revoke": []22 }23 }24 },25 "observer": {26 "subjects": {27 "nginx:observer": {28 "type": "nginx basic auth user"29 }30 },31 "resources": {32 "thing:/": {33 "grant": ["READ"],34 "revoke": []35 }36 }37 }38 }39}四、Ditto HTTP API 使用指南
4.1 创建数字孪生 (Thing)
API 端点: PUT /api/2/things/{thingId}
1# 设置 Ditto API 基础地址2DITTO_API="http://localhost:8080/api/2"34# 创建一个温度传感器数字孪生5curl -X PUT "$DITTO_API/things/org.example:temp-sensor-001" \6 -H "Content-Type: application/json" \7 -d '{8 "attributes": {9 "manufacturer": "Acme Inc.",10 "model": "TempSensor-2000",11 "location": {12 "building": "A",13 "floor": "2",14 "room": "201"15 }16 },17 "features": {18 "temperature": {19 "properties": {20 "value": 0,21 "unit": "°C"22 }23 },24 "humidity": {25 "properties": {26 "value": 0,27 "unit": "%"28 }29 }30 }31 }'PowerShell 版本:
1$dittoApi = "http://localhost:8080/api/2"2$thingId = "org.example:temp-sensor-001"34$body = @{5 attributes = @{6 manufacturer = "Acme Inc."7 model = "TempSensor-2000"8 location = @{9 building = "A"10 floor = "2"11 room = "201"12 }13 }14 features = @{15 temperature = @{16 properties = @{17 value = 018 unit = "°C"19 }20 }21 humidity = @{22 properties = @{23 value = 024 unit = "%"25 }26 }27 }28} | ConvertTo-Json -Depth 102930Invoke-RestMethod -Uri "$dittoApi/things/$thingId" `31 -Method Put `32 -ContentType "application/json" `33 -Body $body4.2 查询数字孪生
获取单个 Thing:
1curl -X GET "$DITTO_API/things/org.example:temp-sensor-001"搜索 Things(使用 RQL 查询语言):
1# 查询所有 Acme 制造商的设备2curl -X GET "$DITTO_API/search/things?filter=eq(attributes/manufacturer,\"Acme Inc.\")"34# 查询特定楼层的所有设备5curl -X GET "$DITTO_API/search/things?filter=eq(attributes/location/floor,\"2\")"67# 查询温度大于 25°C 的设备8curl -X GET "$DITTO_API/search/things?filter=gt(features/temperature/properties/value,25)"910# 复杂查询:特定位置且温度异常11curl -X GET "$DITTO_API/search/things?filter=and(eq(attributes/location/building,\"A\"),gt(features/temperature/properties/value,30))"4.3 更新设备属性
更新单个 Feature 属性:
1# 更新温度值2curl -X PUT "$DITTO_API/things/org.example:temp-sensor-001/features/temperature/properties/value" \3 -H "Content-Type: application/json" \4 -d '25.6'56# 更新湿度值7curl -X PUT "$DITTO_API/things/org.example:temp-sensor-001/features/humidity/properties/value" \8 -H "Content-Type: application/json" \9 -d '62.3'批量更新整个 Feature:
1curl -X PUT "$DITTO_API/things/org.example:temp-sensor-001/features/temperature" \2 -H "Content-Type: application/json" \3 -d '{4 "properties": {5 "value": 25.6,6 "unit": "°C",7 "quality": "good",8 "timestamp": "2024-01-15T10:30:00Z"9 }10 }'4.4 删除数字孪生
1curl -X DELETE "$DITTO_API/things/org.example:temp-sensor-001"五、设备接入实现
5.1 MQTT 设备接入
OpenTwins 使用 Mosquitto 作为 MQTT Broker,设备通过 MQTT 协议上报数据。
MQTT 主题规范:
1# Ditto 标准主题格式2{namespace}/{thingId}/things/twin/commands/modify34# 示例5org.example/temp-sensor-001/things/twin/commands/modify设备上报数据示例(Python):
1import paho.mqtt.client as mqtt2import json3import time4from datetime import datetime56# MQTT 配置7MQTT_BROKER = "localhost"8MQTT_PORT = 18839THING_ID = "org.example:temp-sensor-001"10NAMESPACE = "org.example"11THING_NAME = "temp-sensor-001"1213# MQTT 主题14TOPIC = f"{NAMESPACE}/{THING_NAME}/things/twin/commands/modify"1516def on_connect(client, userdata, flags, rc):17 print(f"Connected to MQTT Broker! Code: {rc}")1819def publish_sensor_data(client, temperature, humidity):20 """发布传感器数据到 Ditto"""21 22 # 构建 Ditto 消息23 message = {24 "topic": f"{NAMESPACE}/{THING_NAME}/things/twin/commands/modify",25 "path": "/features",26 "value": {27 "temperature": {28 "properties": {29 "value": temperature,30 "unit": "°C",31 "timestamp": datetime.utcnow().isoformat() + "Z"32 }33 },34 "humidity": {35 "properties": {36 "value": humidity,37 "unit": "%",38 "timestamp": datetime.utcnow().isoformat() + "Z"39 }40 }41 }42 }43 44 # 发布消息45 result = client.publish(TOPIC, json.dumps(message))46 status = result[0]47 48 if status == 0:49 print(f"✓ Data sent: Temp={temperature}°C, Humidity={humidity}%")50 else:51 print(f"✗ Failed to send data to topic {TOPIC}")5253# 创建 MQTT 客户端54client = mqtt.Client()55client.on_connect = on_connect5657# 连接到 Broker58client.connect(MQTT_BROKER, MQTT_PORT, 60)59client.loop_start()6061# 模拟传感器数据上报62try:63 while True:64 # 模拟读取传感器数据65 temperature = 20 + (time.time() % 10) # 20-30°C66 humidity = 50 + (time.time() % 20) # 50-70%67 68 # 发布数据69 publish_sensor_data(client, round(temperature, 2), round(humidity, 2))70 71 # 每 5 秒上报一次72 time.sleep(5)73 74except KeyboardInterrupt:75 print("\n停止数据上报")76 client.loop_stop()77 client.disconnect()5.2 配置 Ditto MQTT 连接
需要在 Ditto 中配置 MQTT 连接,以便接收设备消息。
创建 MQTT 连接配置:
1curl -X POST "http://localhost:8080/api/2/connections" \2 -H "Content-Type: application/json" \3 -d '{4 "id": "mqtt-connection-mosquitto",5 "connectionType": "mqtt",6 "connectionStatus": "open",7 "uri": "tcp://opentwins-mosquitto:1883",8 "sources": [{9 "addresses": ["org.example/+/things/twin/commands/modify"],10 "authorizationContext": ["nginx:ditto"],11 "qos": 1,12 "filters": []13 }],14 "targets": [{15 "address": "org.example/{{ thing:id }}/things/twin/events",16 "topics": ["_/_/things/twin/events"],17 "authorizationContext": ["nginx:ditto"],18 "qos": 119 }]20 }'5.3 HTTP API 设备接入
对于不支持 MQTT 的设备,可以直接使用 HTTP API 上报数据。
HTTP 上报示例(Python):
1import requests2import time3from datetime import datetime45DITTO_API = "http://localhost:8080/api/2"6THING_ID = "org.example:temp-sensor-002"78def create_thing():9 """创建 Thing"""10 url = f"{DITTO_API}/things/{THING_ID}"11 data = {12 "attributes": {13 "manufacturer": "Acme Inc.",14 "model": "TempSensor-HTTP"15 },16 "features": {17 "temperature": {18 "properties": {19 "value": 0,20 "unit": "°C"21 }22 }23 }24 }25 26 response = requests.put(url, json=data)27 print(f"Thing created: {response.status_code}")2829def update_temperature(value):30 """更新温度值"""31 url = f"{DITTO_API}/things/{THING_ID}/features/temperature/properties"32 data = {33 "value": value,34 "unit": "°C",35 "timestamp": datetime.utcnow().isoformat() + "Z"36 }37 38 response = requests.put(url, json=data)39 if response.status_code == 204:40 print(f"✓ Temperature updated: {value}°C")41 else:42 print(f"✗ Failed: {response.status_code}")4344# 创建 Thing(首次运行)45# create_thing()4647# 持续上报数据48while True:49 temp = 20 + (time.time() % 10)50 update_temperature(round(temp, 2))51 time.sleep(5)六、数据可视化实现
6.1 Grafana 数据源配置
- 访问 Grafana: http://localhost:3002
- 登录(admin/admin)
- 配置 InfluxDB 数据源
添加 InfluxDB 数据源:
- Settings → Data Sources → Add data source
- 选择 InfluxDB
- 配置参数:
- URL:
http://opentwins-influxdb2:80 - Query Language:
Flux - Organization:
iot - Token: (从 InfluxDB 获取)
- Default Bucket:
telegraf
- URL:
6.2 创建监控仪表板
创建温度监控面板:
- Create → Dashboard → Add new panel
- 选择数据源:InfluxDB
- 编写 Flux 查询:
1from(bucket: "telegraf")2 |> range(start: -1h)3 |> filter(fn: (r) => r["_measurement"] == "temperature")4 |> filter(fn: (r) => r["device"] == "temp-sensor-001")5 |> filter(fn: (r) => r["_field"] == "value")6 |> aggregateWindow(every: 1m, fn: mean, createEmpty: false)7 |> yield(name: "mean")- 配置可视化选项:
- Panel Title: "设备温度实时监控"
- Visualization: Time series(时间序列图)
- Unit: Celsius (°C)
- Thresholds: 设置告警阈值(如 >30°C)
6.3 完整监控大屏示例
仪表板布局:
创建设备状态统计面板:
1from(bucket: "telegraf")2 |> range(start: -5m)3 |> filter(fn: (r) => r["_measurement"] == "device_status")4 |> last()5 |> group(columns: ["status"])6 |> count()七、业务应用开发
7.1 前端应用架构
技术栈选择:
1{2 "dependencies": {3 "vue": "^3.4.0",4 "vue-router": "^4.2.0",5 "pinia": "^2.1.0",6 "axios": "^1.6.0",7 "element-plus": "^2.5.0",8 "echarts": "^5.4.0",9 "mqtt": "^5.3.0",10 "@vueuse/core": "^10.7.0"11 }12}项目结构:
1digital-twin-web/2├── src/3│ ├── api/ # API 接口4│ │ ├── ditto.js # Ditto API 封装5│ │ └── influxdb.js # InfluxDB API 封装6│ ├── components/ # 公共组件7│ │ ├── DeviceCard.vue # 设备卡片8│ │ ├── TemperatureChart.vue # 温度图表9│ │ └── DeviceList.vue # 设备列表10│ ├── views/ # 页面11│ │ ├── Dashboard.vue # 监控大屏12│ │ ├── DeviceManage.vue # 设备管理13│ │ ├── DataAnalysis.vue # 数据分析14│ │ └── AlarmCenter.vue # 告警中心15│ ├── stores/ # Pinia 状态管理16│ │ ├── device.js # 设备状态17│ │ └── mqtt.js # MQTT 连接18│ ├── utils/ # 工具函数19│ │ ├── request.js # HTTP 请求封装20│ │ └── mqtt.js # MQTT 工具21│ ├── App.vue22│ └── main.js23└── package.json7.2 Ditto API 封装
创建 src/api/ditto.js:
1import axios from 'axios'23const DITTO_BASE_URL = 'http://localhost:8080/api/2'45// 创建 axios 实例6const dittoApi = axios.create({7 baseURL: DITTO_BASE_URL,8 timeout: 10000,9 headers: {10 'Content-Type': 'application/json'11 }12})1314/**15 * Ditto API 封装16 */17export default {18 /**19 * 获取所有 Things20 */21 async getAllThings() {22 const response = await dittoApi.get('/things')23 return response.data24 },2526 /**27 * 获取单个 Thing28 */29 async getThing(thingId) {30 const response = await dittoApi.get(`/things/${thingId}`)31 return response.data32 },3334 /**35 * 创建 Thing36 */37 async createThing(thingId, data) {38 const response = await dittoApi.put(`/things/${thingId}`, data)39 return response.data40 },4142 /**43 * 更新 Thing 属性44 */45 async updateThingAttribute(thingId, attributePath, value) {46 const response = await dittoApi.put(47 `/things/${thingId}/attributes/${attributePath}`,48 value49 )50 return response.data51 },5253 /**54 * 更新 Feature 属性55 */56 async updateFeatureProperty(thingId, featureId, propertyPath, value) {57 const response = await dittoApi.put(58 `/things/${thingId}/features/${featureId}/properties/${propertyPath}`,59 value60 )61 return response.data62 },6364 /**65 * 搜索 Things(RQL 查询)66 */67 async searchThings(filter, options = {}) {68 const params = {69 filter,70 ...options // 支持 option、namespaces、fields 等参数71 }72 const response = await dittoApi.get('/search/things', { params })73 return response.data74 },7576 /**77 * 删除 Thing78 */79 async deleteThing(thingId) {80 const response = await dittoApi.delete(`/things/${thingId}`)81 return response.data82 },8384 /**85 * 发送消息给 Thing86 */87 async sendMessage(thingId, messageSubject, payload) {88 const response = await dittoApi.post(89 `/things/${thingId}/inbox/messages/${messageSubject}`,90 payload91 )92 return response.data93 }94}7.3 设备列表页面实现
创建 src/views/DeviceManage.vue:
1<template>2 <div class="device-manage">3 <el-card class="header-card">4 <div class="header-actions">5 <el-input6 v-model="searchText"7 placeholder="搜索设备..."8 style="width: 300px"9 clearable10 @input="handleSearch"11 >12 <template #prefix>13 <el-icon><Search /></el-icon>14 </template>15 </el-input>16 17 <el-button type="primary" @click="showCreateDialog = true">18 <el-icon><Plus /></el-icon>19 添加设备20 </el-button>21 </div>22 </el-card>2324 <el-card class="device-list-card">25 <el-table :data="filteredDevices" style="width: 100%" v-loading="loading">26 <el-table-column prop="thingId" label="设备 ID" width="250" />27 28 <el-table-column label="制造商">29 <template #default="{ row }">30 {{ row.attributes?.manufacturer || '-' }}31 </template>32 </el-table-column>33 34 <el-table-column label="型号">35 <template #default="{ row }">36 {{ row.attributes?.model || '-' }}37 </template>38 </el-table-column>39 40 <el-table-column label="位置">41 <template #default="{ row }">42 {{ formatLocation(row.attributes?.location) }}43 </template>44 </el-table-column>45 46 <el-table-column label="温度">47 <template #default="{ row }">48 <span :class="getTemperatureClass(row.features?.temperature?.properties?.value)">49 {{ row.features?.temperature?.properties?.value || '-' }}50 {{ row.features?.temperature?.properties?.unit }}51 </span>52 </template>53 </el-table-column>54 55 <el-table-column label="湿度">56 <template #default="{ row }">57 {{ row.features?.humidity?.properties?.value || '-' }}58 {{ row.features?.humidity?.properties?.unit }}59 </template>60 </el-table-column>61 62 <el-table-column label="操作" width="200">63 <template #default="{ row }">64 <el-button size="small" @click="viewDevice(row)">详情</el-button>65 <el-button size="small" type="danger" @click="deleteDevice(row)">66 删除67 </el-button>68 </template>69 </el-table-column>70 </el-table>71 </el-card>7273 <!-- 创建设备对话框 -->74 <el-dialog v-model="showCreateDialog" title="添加设备" width="600px">75 <el-form :model="newDevice" label-width="100px">76 <el-form-item label="设备 ID">77 <el-input v-model="newDevice.thingId" placeholder="org.example:device-001" />78 </el-form-item>79 80 <el-form-item label="制造商">81 <el-input v-model="newDevice.manufacturer" />82 </el-form-item>83 84 <el-form-item label="型号">85 <el-input v-model="newDevice.model" />86 </el-form-item>87 88 <el-form-item label="楼栋">89 <el-input v-model="newDevice.building" />90 </el-form-item>91 92 <el-form-item label="楼层">93 <el-input v-model="newDevice.floor" />94 </el-form-item>95 96 <el-form-item label="房间">97 <el-input v-model="newDevice.room" />98 </el-form-item>99 </el-form>100 101 <template #footer>102 <el-button @click="showCreateDialog = false">取消</el-button>103 <el-button type="primary" @click="createDevice" :loading="creating">104 创建105 </el-button>106 </template>107 </el-dialog>108 </div>109</template>110111<script setup>112import { ref, computed, onMounted } from 'vue'113import { ElMessage, ElMessageBox } from 'element-plus'114import { Search, Plus } from '@element-plus/icons-vue'115import dittoApi from '@/api/ditto'116117const devices = ref([])118const loading = ref(false)119const searchText = ref('')120const showCreateDialog = ref(false)121const creating = ref(false)122123const newDevice = ref({124 thingId: '',125 manufacturer: '',126 model: '',127 building: '',128 floor: '',129 room: ''130})131132// 过滤后的设备列表133const filteredDevices = computed(() => {134 if (!searchText.value) return devices.value135 136 const search = searchText.value.toLowerCase()137 return devices.value.filter(device => 138 device.thingId.toLowerCase().includes(search) ||139 device.attributes?.manufacturer?.toLowerCase().includes(search) ||140 device.attributes?.model?.toLowerCase().includes(search)141 )142})143144// 加载设备列表145const loadDevices = async () => {146 loading.value = true147 try {148 const response = await dittoApi.getAllThings()149 devices.value = response.items || []150 } catch (error) {151 ElMessage.error('加载设备列表失败')152 console.error(error)153 } finally {154 loading.value = false155 }156}157158// 创建设备159const createDevice = async () => {160 if (!newDevice.value.thingId) {161 ElMessage.warning('请输入设备 ID')162 return163 }164 165 creating.value = true166 try {167 const data = {168 attributes: {169 manufacturer: newDevice.value.manufacturer,170 model: newDevice.value.model,171 location: {172 building: newDevice.value.building,173 floor: newDevice.value.floor,174 room: newDevice.value.room175 }176 },177 features: {178 temperature: {179 properties: {180 value: 0,181 unit: '°C'182 }183 },184 humidity: {185 properties: {186 value: 0,187 unit: '%'188 }189 }190 }191 }192 193 await dittoApi.createThing(newDevice.value.thingId, data)194 ElMessage.success('设备创建成功')195 showCreateDialog.value = false196 loadDevices()197 198 // 重置表单199 newDevice.value = {200 thingId: '',201 manufacturer: '',202 model: '',203 building: '',204 floor: '',205 room: ''206 }207 } catch (error) {208 ElMessage.error('创建设备失败')209 console.error(error)210 } finally {211 creating.value = false212 }213}214215// 删除设备216const deleteDevice = async (device) => {217 try {218 await ElMessageBox.confirm(219 `确定要删除设备 ${device.thingId} 吗?`,220 '确认删除',221 {222 confirmButtonText: '删除',223 cancelButtonText: '取消',224 type: 'warning'225 }226 )227 228 await dittoApi.deleteThing(device.thingId)229 ElMessage.success('设备删除成功')230 loadDevices()231 } catch (error) {232 if (error !== 'cancel') {233 ElMessage.error('删除设备失败')234 console.error(error)235 }236 }237}238239// 查看设备详情240const viewDevice = (device) => {241 // 跳转到设备详情页242 console.log('View device:', device)243}244245// 格式化位置246const formatLocation = (location) => {247 if (!location) return '-'248 return `${location.building || ''}-${location.floor || ''}F-${location.room || ''}`249}250251// 获取温度样式类252const getTemperatureClass = (temp) => {253 if (!temp) return ''254 if (temp > 30) return 'temp-high'255 if (temp < 15) return 'temp-low'256 return 'temp-normal'257}258259const handleSearch = () => {260 // 搜索逻辑已在 computed 中实现261}262263onMounted(() => {264 loadDevices()265})266</script>267268<style scoped>269.device-manage {270 padding: 20px;271}272273.header-card {274 margin-bottom: 20px;275}276277.header-actions {278 display: flex;279 justify-content: space-between;280 align-items: center;281}282283.temp-high {284 color: #f56c6c;285 font-weight: bold;286}287288.temp-low {289 color: #409eff;290 font-weight: bold;291}292293.temp-normal {294 color: #67c23a;295}296</style>八、实时通信 WebSocket
8.1 WebSocket 连接
Ditto 支持通过 WebSocket 订阅 Thing 的实时变化。
创建 WebSocket 连接(JavaScript):
1// src/utils/dittoWebSocket.js2export class DittoWebSocket {3 constructor(wsUrl = 'ws://localhost:8080/ws/2') {4 this.wsUrl = wsUrl5 this.ws = null6 this.handlers = new Map()7 this.reconnectInterval = 50008 }910 connect() {11 this.ws = new WebSocket(this.wsUrl)1213 this.ws.onopen = () => {14 console.log('✓ WebSocket connected')15 16 // 订阅所有 Things 的变化17 this.subscribe()18 }1920 this.ws.onmessage = (event) => {21 try {22 const message = JSON.parse(event.data)23 this.handleMessage(message)24 } catch (error) {25 console.error('Failed to parse WebSocket message:', error)26 }27 }2829 this.ws.onerror = (error) => {30 console.error('WebSocket error:', error)31 }3233 this.ws.onclose = () => {34 console.log('WebSocket closed. Reconnecting...')35 setTimeout(() => this.connect(), this.reconnectInterval)36 }37 }3839 subscribe(filter = null) {40 const subscribeMessage = {41 topic: 'org.example/org.example:temp-sensor-001/things/twin/commands/subscribe',42 path: '/',43 value: {44 filter: filter || 'true'45 }46 }47 48 this.send(subscribeMessage)49 }5051 send(message) {52 if (this.ws && this.ws.readyState === WebSocket.OPEN) {53 this.ws.send(JSON.stringify(message))54 }55 }5657 handleMessage(message) {58 console.log('WebSocket message:', message)59 60 // 触发注册的处理函数61 this.handlers.forEach(handler => handler(message))62 }6364 onMessage(handler) {65 const id = Symbol()66 this.handlers.set(id, handler)67 68 return () => {69 this.handlers.delete(id)70 }71 }7273 disconnect() {74 if (this.ws) {75 this.ws.close()76 this.ws = null77 }78 }79}在 Vue 组件中使用:
1<script setup>2import { onMounted, onUnmounted } from 'vue'3import { DittoWebSocket } from '@/utils/dittoWebSocket'45let dittoWs = null67onMounted(() => {8 dittoWs = new DittoWebSocket()9 dittoWs.connect()10 11 // 订阅消息12 const unsubscribe = dittoWs.onMessage((message) => {13 console.log('Thing updated:', message)14 // 更新 UI15 })16 17 onUnmounted(() => {18 unsubscribe()19 dittoWs.disconnect()20 })21})22</script>九、项目实战案例
9.1 智能工厂温度监控系统
场景描述:
- 工厂有 100+ 个温度传感器
- 需要实时监控所有设备温度
- 温度超过 30°C 时触发告警
- 提供历史数据分析和趋势预测
实现步骤:
1. 创建设备批量导入脚本
1# scripts/import_devices.py2import requests34DITTO_API = "http://localhost:8080/api/2"56# 设备配置7devices = [8 {"id": f"org.factory:temp-sensor-{i:03d}", 9 "building": "A", 10 "floor": str(i // 10 + 1), 11 "room": str(i % 10 + 101)} 12 for i in range(1, 101)13]1415def create_device(device_id, building, floor, room):16 """创建设备"""17 url = f"{DITTO_API}/things/{device_id}"18 data = {19 "attributes": {20 "manufacturer": "Acme Inc.",21 "model": "TempSensor-Industrial",22 "location": {23 "building": building,24 "floor": floor,25 "room": room26 }27 },28 "features": {29 "temperature": {30 "properties": {31 "value": 25.0,32 "unit": "°C",33 "status": "normal"34 }35 }36 }37 }38 39 response = requests.put(url, json=data)40 return response.status_code4142# 批量创建设备43for device in devices:44 status = create_device(45 device["id"], 46 device["building"], 47 device["floor"], 48 device["room"]49 )50 print(f"✓ Created {device['id']}: {status}")2. 设备数据模拟器
1# scripts/device_simulator.py2import paho.mqtt.client as mqtt3import json4import time5import random6from datetime import datetime78MQTT_BROKER = "localhost"9MQTT_PORT = 18831011def simulate_temperature():12 """模拟温度值(正态分布,偶尔出现异常)"""13 if random.random() < 0.05: # 5% 概率出现高温14 return round(random.uniform(30, 35), 2)15 else:16 return round(random.gauss(25, 3), 2) # 正常范围 20-30°C1718def publish_data(client, device_id):19 """发布设备数据"""20 namespace, name = device_id.split(":")21 topic = f"{namespace}/{name}/things/twin/commands/modify"22 23 temperature = simulate_temperature()24 25 message = {26 "topic": f"{namespace}/{name}/things/twin/commands/modify",27 "path": "/features/temperature/properties",28 "value": {29 "value": temperature,30 "unit": "°C",31 "status": "high" if temperature > 30 else "normal",32 "timestamp": datetime.utcnow().isoformat() + "Z"33 }34 }35 36 client.publish(topic, json.dumps(message))37 return temperature3839# MQTT 客户端40client = mqtt.Client()41client.connect(MQTT_BROKER, MQTT_PORT, 60)42client.loop_start()4344# 模拟 100 个设备45devices = [f"org.factory:temp-sensor-{i:03d}" for i in range(1, 101)]4647print("开始模拟设备数据上报...")48try:49 while True:50 for device_id in devices:51 temp = publish_data(client, device_id)52 print(f"📡 {device_id}: {temp}°C")53 54 time.sleep(60) # 每分钟更新一次55 56except KeyboardInterrupt:57 print("\n停止模拟")58 client.loop_stop()59 client.disconnect()9.2 数据分析仪表板
Grafana 仪表板配置:
- 温度分布热力图
1from(bucket: "telegraf")2 |> range(start: -1h)3 |> filter(fn: (r) => r["_measurement"] == "temperature")4 |> filter(fn: (r) => r["_field"] == "value")5 |> aggregateWindow(every: 5m, fn: mean)- 异常设备列表
1from(bucket: "telegraf")2 |> range(start: -5m)3 |> filter(fn: (r) => r["_measurement"] == "temperature")4 |> filter(fn: (r) => r["_field"] == "value")5 |> filter(fn: (r) => r._value > 30.0)6 |> last()- 温度趋势预测(使用 Holt-Winters 算法)
1from(bucket: "telegraf")2 |> range(start: -7d)3 |> filter(fn: (r) => r["_measurement"] == "temperature")4 |> aggregateWindow(every: 1h, fn: mean)5 |> holtWinters(n: 24, interval: 1h)十、最佳实践
10.1 Thing ID 命名规范
推荐格式:{namespace}:{resource-type}-{identifier}
1org.factory:temp-sensor-0012org.factory:pump-motor-A123com.warehouse:conveyor-belt-B03避免:
- ❌ 过长的 ID:
org.company.department.factory.building-a.floor-2.room-201.temp-sensor-001 - ❌ 特殊字符:
sensor#001、device@abc - ❌ 中文字符:
温度传感器-001
10.2 数据更新策略
高频数据(> 1 Hz):
1// 使用批量更新减少 API 调用2const batchUpdate = {3 "temperature": { "properties": { "value": 25.5 } },4 "humidity": { "properties": { "value": 65.2 } },5 "pressure": { "properties": { "value": 101.3 } }6}78await dittoApi.put(`/things/${thingId}/features`, batchUpdate)低频数据(< 1 次/分钟):
1// 单独更新每个属性2await dittoApi.put(3 `/things/${thingId}/features/battery/properties/level`,4 855)10.3 权限管理
为不同角色创建 Policy:
1{2 "policyId": "org.factory:device-policy",3 "entries": {4 "admin": {5 "subjects": {6 "nginx:admin": { "type": "admin" }7 },8 "resources": {9 "thing:/": { "grant": ["READ", "WRITE"], "revoke": [] },10 "policy:/": { "grant": ["READ", "WRITE"], "revoke": [] },11 "message:/": { "grant": ["READ", "WRITE"], "revoke": [] }12 }13 },14 "operator": {15 "subjects": {16 "nginx:operator": { "type": "operator" }17 },18 "resources": {19 "thing:/": { "grant": ["READ", "WRITE"], "revoke": [] },20 "policy:/": { "grant": ["READ"], "revoke": ["WRITE"] },21 "message:/": { "grant": ["READ", "WRITE"], "revoke": [] }22 }23 },24 "viewer": {25 "subjects": {26 "nginx:viewer": { "type": "viewer" }27 },28 "resources": {29 "thing:/": { "grant": ["READ"], "revoke": ["WRITE"] },30 "policy:/": { "grant": [], "revoke": ["READ", "WRITE"] },31 "message:/": { "grant": ["READ"], "revoke": ["WRITE"] }32 }33 }34 }35}10.4 性能优化
1. 使用搜索 API 代替轮询
❌ 不推荐(轮询所有设备):
1setInterval(async () => {2 const things = await dittoApi.getAllThings()3 // 处理数据...4}, 5000)✅ 推荐(使用 WebSocket 订阅):
1const ws = new DittoWebSocket()2ws.onMessage((message) => {3 // 实时处理变化4})2. 批量操作
1// 批量删除2const thingIds = ['org.example:device-001', 'org.example:device-002']3await Promise.all(thingIds.map(id => dittoApi.deleteThing(id)))3. 分页查询
1// 使用 option 参数进行分页2const page1 = await dittoApi.searchThings('eq(attributes/manufacturer,"Acme")', {3 option: 'size(50),cursor(0)'4})十一、故障排查
11.1 常见问题
问题 1:无法创建 Thing
1HTTP 400 Bad Request: Thing ID must match pattern namespace:name解决方案:检查 Thing ID 格式,必须包含命名空间和名称,用冒号分隔。
1// ❌ 错误2"my-device"34// ✅ 正确5"org.example:my-device"问题 2:MQTT 消息未生效
排查步骤:
- 检查 MQTT 连接是否配置
- 验证主题格式
- 查看 Ditto connectivity 日志
1# 查看 connectivity 服务日志2kubectl logs -f opentwins-ditto-connectivity-xxx问题 3:WebSocket 连接失败
解决方案:检查端口转发是否正常
1# 重新启动端口转发2kubectl port-forward svc/opentwins-ditto-nginx 8080:808011.2 监控和日志
查看 OpenTwins 组件日志:
1# 查看所有 Pod2kubectl get pods34# 查看特定服务日志5kubectl logs -f opentwins-ditto-gateway-xxx6kubectl logs -f opentwins-ditto-things-xxx7kubectl logs -f opentwins-influxdb2-089# 查看最近的事件10kubectl get events --sort-by='.lastTimestamp'十二、总结与展望
12.1 项目总结
本文档详细介绍了基于 OpenTwins 平台构建数字孪生应用的完整流程:
✅ 平台部署:OpenTwins 在 Windows 本地环境的安装和配置 ✅ 核心概念:Eclipse Ditto 的 Things、Features、Policies 数据模型 ✅ API 使用:完整的 Ditto HTTP API 操作示例 ✅ 设备接入:MQTT 和 HTTP 两种设备接入方式 ✅ 数据可视化:Grafana 监控仪表板配置 ✅ 应用开发:Vue3 前端应用开发实践 ✅ 实时通信:WebSocket 实时数据订阅 ✅ 最佳实践:命名规范、性能优化、权限管理
12.2 技术优势
使用 OpenTwins 的优势:
| 优势 | 说明 |
|---|---|
| 🚀 快速部署 | 一键部署完整数字孪生平台 |
| 🎯 标准化 | 基于 Eclipse Ditto,符合工业标准 |
| 💰 成本低 | 开源免费,无授权费用 |
| 🔧 易扩展 | 微服务架构,支持水平扩展 |
| 📊 可视化 | 内置 Grafana,开箱即用 |
| 🔒 安全性 | 完善的权限控制机制 |
12.3 下一步计划
功能扩展:
-
3D 可视化
- 集成 Three.js 实现 3D 场景
- 设备在 3D 空间中的实时定位
- 点击设备查看详细信息
-
AI 预测
- 设备故障预测模型
- 温度趋势预测
- 异常检测算法
-
告警系统
- 多级告警规则
- 短信/邮件通知
- 告警升级机制
-
移动端应用
- UniApp 跨平台开发
- 实时推送通知
- 离线数据缓存
生产环境部署:
-
Kubernetes 集群
- 使用生产级 Kubernetes 集群
- 配置高可用性
- 设置资源限制
-
监控和运维
- Prometheus + Grafana 监控
- ELK 日志收集
- 自动备份和恢复
-
安全加固
- HTTPS/TLS 加密
- OAuth2 认证
- API 访问限流
12.4 参考资源
官方文档:
社区资源:
学习路径:
- 完成 OpenTwins 本地部署
- 学习 Ditto Things 数据模型
- 开发简单的设备接入程序
- 创建 Grafana 监控仪表板
- 开发 Vue 前端应用
- 集成实时 WebSocket 通信
- 部署到生产环境
文档版本:1.0.0 最后更新:2025-01-15 作者:Laby 许可证:MIT
相关文档:
🎉 恭喜! 您已经掌握了基于 OpenTwins 构建数字孪生应用的全部知识。现在开始动手实践吧!
评论