流处理模型 |
流数据服务(Streaming Service)以流处理模型作为服务来源,其中指定了运行服务所需的信息。
流数据处理流程包含:
依据上述处理流程,流数据的处理模型包含四个部分:Receiver(接收器)、Filter(过滤器)、Mapper(转换器)和Sender(发送器)。每个部分作为一个节点,可以进行连接和合并,构建成实时数据处理流 Stream。除了处理流 Stream 以外,还有一些辅助参数作为整个服务的运行条件,一并存储在启动参数类型 Startup 中。处理模型如下图:
流处理模型采用 JSON 格式定义,您可以参考下文介绍的参数及相应的 JSON 示例,编写一个流处理模型文件并发布为流数据服务。您也可以使用流处理模型编辑器构建模型,查看参数说明即可。
用于设置 Spark Streaming 的运行参数。包括:
Stream 中包含了实时数据处理运行流的参数。
继承自StreamNode,作为流数据处理的入口,接收各种来源的数据,包括Socket、WebSocket、Http、文件系统等。Receiver中需要设置接收信息的元数据,即metadata。Receiver节点包括三个部分组成:自身的描述信息如name、source等;消息的元数据metadata;消息的读取格式reader。
流数据服务支持以下接收方式:
SocketReceiver:继承自Receiver,接收Socket消息的节点。需指定的参数有:
ipAddress——String 类型。接收的Socket服务的IP地址
port——int 类型,接收的Socket服务的端口号
示例:
{ "ipAddress" : "127.0.0.1", "port" : 9527, "name" : "socketReceiver", "source" : "Socket Receiver", "description" : "Receive some message from socketServer", "prevNodes" : [], "nextNodes" : [], "className": "com.supermap.bdt.streaming.receiver.SocketReceiver " }
MultiSocketReceiver:继承自Receiver,同时接收多个Socket消息的节点,接收的消息内容必须是相同的。需指定的参数有:
servers——Array[String] 类型。需要接收的多个服务地址,每个数组对象为一个地址,地址与端口用冒号隔开。
示例:
{ "servers": [ "192.168.1.1:9527", "192.168.1.1:9528", "192.168.1.2:9527" ], "name": "multiSocketReceiver", "source": "MultiSource Socket Receiver", "description": "Receive message from multi socket server", "prevNodes": [], "nextNodes": [], "className": "com.supermap.bdt.streaming.receiver.MultiSocketReceiver" }
SocketServerReceiver:继承自Receiver,Socket服务端接收节点,用于作为服务端接收其他Socket客户的发送的消息。需指定的参数有:
port:int型。启动的Socket服务端监听端口。
{ "port": 9527, "name": "socketServerReceiver", "source": "SocketServer Receiver", "description": "Receive message from socket client", "prevNodes": [], "nextNodes": [], "className": "com.supermap.bdt.streaming.receiver.SocketServerReceiver" }
WebSocketReceiver:继承自Receiver,接收WebSocket消息的节点。需指定的参数有:
url:String 类型。WebSocket服务地址。
{ "url": "ws://192.168.1.1:9527/websocket ", "name": "webSocketReceiver", "source": "WebSocket Receiver", "description": "Receive message from websocket server", "prevNodes": [], "nextNodes": [], "className": "com.supermap.bdt.streaming.receiver.WebSocketReceiver" }
TextFileReceiver:继承自Receiver,监控指定目录,读取新增文件的内容。需指定的参数有:
directoryPath:监控的文件目录,如HDFS目录hdfs:///data/;Linux系统中的目录 /user/share/data;Windows系统中的目录C:/data。
{ "directoryPath": "'hdfs:///data/'", "name": "textFileReceiver", "source": "Text File Receiver", "description": "Listen new file in folder", "prevNodes": [], "nextNodes": [], "className": "com.supermap.bdt.streaming.receiver.TextFileReceiver" }
SingleTextFileReceiver:单文本文件接收器,继承自Receiver,根据设置读取监控文件的内容,支持读取 Json、GeoJSON 和 CSV格式的文件。需指定的参数有:
readInterva:读取时间的间隔。
rowsOneTime:每次读取的行数。
filePath:需要发送的文件的路径,注意,此处应填写绝对路径。
{ "version": 9000, "sparkParameter": { "checkPointDir": "tmp", "interval": 5000 }, "stream": { "nodeDic": { "TextFileReceiver": { "filePath": "G:\\QQRev\\test.json", "readInterva": 1000, "rowsOneTime": 100, "reader": { "isJsonArray": false, "arrayExpression": "", "className": "com.supermap.bdt.streaming.formatter.JsonFormatter" }, "metadata": { "title": "", "epsg": 3857, "fieldInfos": [ { "name": "X", "source": "lon", "nType": "DOUBLE" }, { "name": "Y", "source": "lat", "nType": "DOUBLE" }, { "name": "mbbh", "source": "mbbh", "nType": "TEXT" } ], "featureType": "POINT", "idFieldName": "mbbh", "dateTimeFormat": "yyyy-MM-dd HH:mm:ss" }, "name": "TextFileReceiver", "caption": "", "description": "", "prevNodes": [], "nextNodes": [ "ConsoleSender" ], "className": "com.supermap.bdt.streaming.receiver.SingleTextFileReceiver" }, "ConsoleSender": { "formatter": { "separator": ",", "className": "com.supermap.bdt.streaming.formatter.CSVFormatter" }, "name": "ConsoleSender", "caption": "", "description": "", "prevNodes": [ "TextFileReceiver" ], "nextNodes": [], "className": "com.supermap.bdt.streaming.sender.ConsoleSender" } } } }
KafkaReceiver:继承自Receiver,接收kafka消息的节点。需指定的参数有:
{ "servers": "192.168.1.1:9092, 192.168.1.2:9092, 192.168.1.3:9092 ,192.168.1.4:9092", "topics": [ "topic1", "topic2" ], "groupid": "groupId", "offset": "latest", "name": "kafkaReceiver", "source": "Kafka Receiver", "description": "Receive message from Kafka", "prevNodes": [], "nextNodes": [], "className": "com.supermap.bdt.streaming.receiver.KafkaReceiver" }
HttpReceiver: 继承自Receiver,接收 HTTP 的消息节点,目前只支持HTTP的Get方法。
url:String类型。Http服务地址。
{ "url": "https://api.wheretheiss.at/v1/satellites/25544", "name": "httpReceiver", "source": "HTTP Receiver", "description": "Get message from web", "prevNodes": [], "nextNodes": [], "className": "com.supermap.bdt.streaming.receiver.HttpReceiver" }
JMSReceiver:继承自Receiver,接收JMS标准协议消息的节点,用于接收ActiveMQ、RabbitMQ等消息中间件的消息。
url——JMS消息服务地址
port——int类型。消息服务端口
queueName ——String类型。 消息队列名称
jdniName——String类型。对应消息中间件的JDNI名称,需要到中间件官网查询
username——String类型。用户名
password——String类型。密码
{ "url": "192.168.1.1", "port": 9527, "queueName": "data", "jdniName": "org.apache.activemq.jndi.ActiveMQInitialContextFactory", "userName": "user", "password": "password", "name": "jmsReceiver", "source": "JMS Receiver", "description": "Receive message from JMS(Java Message Service) for ActiveMQ", "prevNodes": [], "nextNodes": [], "className": "com.supermap.bdt.streaming.receiver.JMSReceiver" }
metadata 写在 Receiver 参数中,是接收消息的元数据,用于描述消息的格式定义。需指定以下信息:
接收的消息的内容格式,包括CSV格式(CSVFormatter)、JSON格式(JsonFormatter)或者GeoJSON格式(GeoJsonFormatter)。
CSVFormatter:表示接收的消息的内容格式为CSV格式。需指定:
separator:指定分隔符,默认为逗号
"reader": { "separator": ",", "className": "com.supermap.bdt.streaming.formatter.CSVFormatter" }
JsonFormatter:接收的消息内容格式为 JSON。示例如下:
"reader": { "className": "com.supermap.bdt.streaming.formatter.JsonFormatter" }
GeoJsonFormatter:接收的消息内容格式为 GeoJSON。示例如下:
"reader": { "className": "com.supermap.bdt.streaming.formatter.JsonFormatter" }
继承于StreamNode,用于过滤当前数据,进行数据的清洗与整理。
逻辑运算式过滤
String filter——过滤器内容,为一个逻辑运算式,将会保留该运算式结果为 true 的对象。
如果需要获取字段值进行逻辑运算,使用关键词[],如[ID] > 10。多个运算式直接可以使用&&(与)或者 ||(或)进行连接,并使用括号()进行优先顺序调整,如[ID] > 10 && ([X] >= 10 || [Y] <= 65.32)。注意,使用关键词IN、MATCHES、EXISTS、ISNULL与其他运算式一起进行运算时,必须使用括号包括起来,如 ([ID] IN 1,3,5,7,9) && [X] > 100。
表1 filter参数支持的逻辑运算符列表
运算符 |
描述 |
== | 等于 (==) 该运算符保留属性值等于指定值的对象。例如,[ID] == 3。 注意:double类型慎用,对比精度为10E-10. |
!= | 不等于 (!=) 该运算符保留属性值不等于指定值的对象。例如,[Name] != “A”。 注意:double类型慎用,对比精度为10E-10. |
> | 大于 (>) 该运算符保留属性值大于指定值的对象。例如,[Speed] > 50 |
>= | 大于或等于 (>=) 该运算符保留属性值大于或等于指定值的对象。例如,[Speed] >= 50 |
< | 小于 (<) 该运算符保留属性值小于指定值的对象。例如,[X] < 10.231 |
<= | 小于或等于 (<=) 该运算符保留属性值小于或等于指定值的对象。例如,[Y] <= 40 |
IN | IN 在指定列表中 当在逗号分隔的值列表中存在指定字段的值时,该运算符保留对象。例如,[Code] IN HK1,HK3,HK5 |
MATCHES | MATCHES 正则表达式匹配 当指定字段的值与正则表达式相匹配时,该运算符保留对象。例如,[Code] MATCHES “^HK[135]” 注意:需要匹配的正则表达式需要用“”引号包含起来 |
EXISTS | EXISTS 字段是否存在 当已接收的事件方案中存在指定字段时,该运算符保留对象。例如,EXISTS [X]。 |
ISNULL | ISNULL 是否为空 当指定字段包含空值时,该运算符保留对象。例如, [X] ISNULL。 |
重要说明:filter 指定的逻辑运算式,其中使用的字符串必须使用双引号(“”)或者单引号(‘’)将字符串扩起来,并且双引号(“”)和单引号(‘’)本身要使用\进行转义,例如:字符串“string”在表达式中要写成:\”string\”或者\’string\’。另外,对于 MATCHES 正则表达式匹配,其中用于匹配的字符“\\”需要对每一个“\”进行转义,所以“\\”字符要写成“\\\\”。示例:
{ "filter": "([X] > [Y] && [X] > 20) || ([ID] IN 1,2,3,4,5)", "name": "Filter", "caption": "Attribute Filter", "description": "Filter feature by expression", "prevNodes": [], "nextNodes": [], "className": "com.supermap.bdt.streaming.filter.FeatureFilter" }
地理过滤
地理过滤是通过空间位置关系过滤地理对象的过滤方式。
示例:
"geoFilter": { "connection": { "type": "udb", "info": [ { "server": "Z:\\airport.udb",, "datasetNames": [ "airports_40" ] } ] }, "mode": "inside", "name": "geoFilter", "caption": "GeoFencing Filter", "description": "Filter feature with geofencing", "prevNodes": [], "nextNodes": [], "className": "com.supermap.bdt.streaming.filter.GeoFilter" }
继承于StreamNode,用于建立字段映射以及对字段进行管理,主要包括:字段映射、添加字段、删除字段、字段运算以及地理围栏。
添加字段
"insertMapper": { "insertIndex": 1, "fieldName": "XX", "nType": "DOUBLE", "expression": "[X] * 2", "name": "insertMapper", "source": "Insert Field", "description": "Insert Field by X * 2", "prevNodes": [], "nextNodes": [], "className": "com.supermap.bdt.streaming.map.FeatureInsertMapper" }
删除字段
"deleteMapper": { "deleteFieldNames": [ "F1", "F2" ], "name": "deleteMapper", "source": "delete Field", "description": "delete Field F1和F2 , "prevNodes": [], "nextNodes": [], "className": "com.supermap.bdt.streaming.map.FeatureDeleteMapper" }
字段映射
srcToDesNamePair:字段名称与新名称的对照关系
"mapMaper": { "srcToDesNamePair": { "ID": "newID_Name", "Y": "newY_Name", "X": "newX_Name" }, "srcToDesIndexPair": { "ID": 0, "Y": 2, "X": 1 }, "name": "mapMaper", "source": "Map Fields", "description": "Map Fields with new name and index", "prevNodes": [], "nextNodes": [], "className": "com.supermap.bdt.streaming.map.FeatureMapMapper" }
字段运算
"calculateMapper": { "fieldName ": Fcal, "expression": "[X] * 2", "name": "calculateMapper", "source": "calculate Field", "description": "calculate Field by X * 2", "prevNodes": [], "nextNodes": [], "className": "com.supermap.bdt.streaming.map.FeatureCalculateMapper " }
对于字段运算,除了支持+、-、x、/等数学运算外,还支持数学函数和字符串处理函数。
函数 |
说明 |
ABS(value) | 返回参数的绝对值 |
FLOOR(double) | 返回指定双精度值的下限(接近的最小整数) |
MAX(value a, value b) | 返回 2 个指定参数值中较大的值 |
MIN(value a, value b) | 返回 2 个指定参数值中较小的值 |
ROUND(value) | 返回最接近参数的长整数(假设参数为双精度值) |
NOW() | 返回当前系统时间 |
UPPERCASE |
字符串处理函数,返回字符串的大写形式,例如: "\"xyz\".UPPERCASE,表示将字符串“xyz”转换为大写 字母返回,处理结果为“XYZ” |
LOWERCASE |
字符串处理函数,返回字符串的小写形式,例如: "\"ABC\".UPPERCASE,表示将字符串“ABC”转换为小 写字母返回,处理结果为“abc” |
REPLACE(string1,string2) |
字符串处理函数,将原字符串中的 string1 部分替换为 string2,例如: "\"ABCxyz\".REPLACE(\"AB\",\"MM\"),表示将字符串 中的 “AB”替换为“MM”,处理结果为:MMCxyz |
SUBSTRING(location1, strcount) |
字符串处理函数,将原字符串中 location1 指定位置开 始,取出 strcount 个字符返回。例如: \"ABCxyz\".SUBSTRING(0, 3),表示将目标字符串从第一 个字符开始取出 3 个字符返回,处理结果为:ABC |
注意:expression 指定的运算表达式,其中使用的字符串必须使用双引号(“”)或者单引号(‘’)将字符串扩起来,并且双引号(“”)和单引号(‘’)本身要使用\ 进行转义,例如:字符串“string”在表达式中要写成:\”string\”或者\’string\’。
地理围栏
type——String类型。数据源类型
Info——Array[DsInfo]类型。数据源连接信息。
"GeoFenceMapper": { "connection": { "type": "udb", "info": [ { "server": "Z: \\airport.udb", "datasetNames": [ "airports_40" ] } ] }, "fenceName": "NAME", "fenceID": "SmID", "withinFieldName": "geoWithin", "statusFieldName": "geoStatus", "name": "GeoFenceMapper", "source": "地理标记转换", "description": "", "prevNodes": [ "SocketReceiver" ], "nextNodes": [ "GeoJsonSocketSender", "FenceWithinFilterOut", "FenceWithinFilterIn" ], "className": "com.supermap.bdt.streaming.map.GeoTaggerMapper" }
静态资源扩展
"StaticRDDJoinMapper": { "className": "com.supermap.bdt.streaming.map.StaticRDDJoinMapper", "caption": "StaticRDDJoinMapper", "name": "StaticRDDJoinMapper", "nextNodes": [], "prevNodes": [], "description": "StaticRDDJoinMapper", "csvFile": "D:\\supermap\\soft\\esriunittype.csv", "idFields": ["yard"] }
继承于StreamNode,作为流数据处理的出口,向外发送数据。包含:
WebSocket 发送节点,用于将消息发送到 WebSocket
String path——WebSocket 服务地址。
注:通过 iServer 订阅功能接收数据,仅支持 GeoJsonFormatter。
示例:
"webSocketClientSender": {
"path": "ws://127.0.0.1/data",
"formatter": {
"separator": ",",
"className": "com.supermap.bdt.streaming.formatter.CSVFormatter"
},
"name": "webSocketClientSender",
"caption": "WebSocket Sender",
"description": "Send message by WebSocket",
"prevNodes": [],
"nextNodes": [],
"className":
"com.supermap.bdt.streaming.sender.WebSocketClientSender"
}
EsAppendSender 向 Elasticsearch 引擎新增数据的节点。可保存传入的所有数据,可在需要保存 streaming 的历史数据时使用。
String url——ES 服务地址加端口
String queueName——ES 节点名称
String directoryPath——ES 类型名称
示例:
"ESAppendSender": {
"url": "127.0.0.1:9200",
"queueName": "aircondition",
"directoryPath": "test1",
"formatter": {
"className": "com.supermap.bdt.streaming.formatter.GeoJsonFormatter"
},
"name": "ESAppendSender",
"caption": "ES 发送器",
"description": "",
"prevNodes": [],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.sender.EsAppendSender"
}
向 Elasticsearch 引擎新增与更新数据的节点。需要设置发送消息的 ID 字段,如果发送的消息内容中 ID 字段的值已经在 Elasticsearch 引擎中存在,则将更新该记录;如果 Elasticsearch 引擎中没有对应的 ID 值,则会新增一条记录。
String url——ES 服务地址
String port——ES 服务端口
String index——ES 节点名称
String typ——ES 类型名称
String idFieldName——唯一标识字段名称,用于查找需要更新的记录
示例:
"ESUpdateSender":
"url":"127.0.0.1",
" port":"9200",
" index":" aircondition ",
"typ":" test1",
"idFieldName":"id",
"name":"ESUpdateSender",
"caption":"ESUpdateSender",
"description":"Send message to Elasticsearch",
"prevNodes":[],
"nextNodes":[],
"className":"com.supermap.bdt.streaming.sender.ESUpdateSender"
}
文件型发送节点,用于将消息保存到指定文件中。
String filePath —— 输出文件路径
示例:
"fileSender": {
"filePath": "C:\\result.csv",
"formatter": {
"separator": ",",
"className": "com.supermap.bdt.streaming.formatter.CSVFormatter"
},
"name": "fileSender",
"caption": "",
"description": "",
"prevNodes": [
"filter"
],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.sender.FileSender"
}
发送 JMS 标准协议消息的节点,用于将消息发送到 ActiveMQ、RabbitMQ 等消息中间件。
String url——JMS 消息服务地址
Int port——消息服务端口
String queueName ——消息队列名称
String jdniName——对应消息中间件的 JDNI 名称,需要到中间件官网查询
String username——用户名
String password——密码
示例:
"JMSSender": {
"url": "192.168.168.33",
"port": 61616,
"queueName": "myTestJDNI",
"jdniName": "org.apache.activemq.jndi.ActiveMQInitialContextFactory",
"userName": "admin",
"password": "admin",
"formatter": {
"className": "com.supermap.bdt.streaming.formatter.GeoJsonFormatter"
},
"name": "JMSSender",
"caption": "",
"description": "",
"prevNodes": [
"TextFileReceiver"
],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.sender.JMSSender"
}
短信消息发送节点,SMSSender 使用中国网建(http://www.webchinese.com.cn/)提供的 API 接口发送短信消息,注册中国网建用户后获得用户名和接口安全秘钥,即可发送短信消息。
注意:发送的内容审核时间大概 30 分钟以内,所以接收消息会有延迟。
String user——webchinese 用户名
String apiKey——webchinese 接口安全秘钥
java.util.List[String] phoneNumbers——发送的手机号码列表
int sendLimit——发送数量限制,避免消息频繁消耗短信次数,可以设置本次运行最多发送的条数。
示例:
"smsSender": {
"user": "user",
"apiKey": "apiKey",
"phoneNumbers": [
"13800000000"
],
"sendLimit": 100,
"formatter": {
"separator": ",",
"className": "com.supermap.bdt.streaming.formatter.CSVFormatter"
},
"name": "smsSender",
"caption": "SMS Sender",
"description": "Send message by SMS",
"prevNodes": [],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.sender.SMSSender"
}
Socket 客户端发送节点,通过 Socket 客户端连接将消息发送到 Socket 服务端。
String ip——接收的 Socket 服务的 IP 地址;
Int port——接收的 Socket 服务的端口号。
示例:
"socketClientSender": {
"ip": "127.0.0.1",
"port": 9527,
"formatter": {
"separator": ",",
"className": "com.supermap.bdt.streaming.formatter.CSVFormatter"
},
"name": "socketClientSender",
"caption": "SocketClient Sender",
"description": "Send message by Socket",
"prevNodes": [],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.sender.SocketClientSender"
}
Socket 服务端发送节点,启动 Socket 服务端,将消息发送到连接的 Socket 客户端。
Int port——启动 Socket 服务的端口号。
示例:
"socketServerSender": {
"port": 9527,
"formatter": {
"separator": ",",
"className": "com.supermap.bdt.streaming.formatter.CSVFormatter"
},
"name": "socketServerSender",
"caption": "SocketServer Sender",
"description": "Send message by Socket",
"prevNodes": [],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.sender.SocketServerSender"
}
参照上述示例设置,可以将数据的分析处理结果输出到 iServer DataStore 创建的时空数据库中。
根据上述参数说明,编写一个完整的流处理模型文件,您可以将文件另存为后缀为.streaming的文件,用于快速发布流数据服务,也可以将流处理模型文件的内容直接写入“配置信息”中进行发布。示例如下:
{ "version": 9000, "sparkParameter": { "checkPointDir": "tmp", "interval": 10000 }, "stream": { "nodeDic": { "AQIReceiver": { "url": "http://www.supermapol.com/iserver/services/aqi/restjsr/aqi/pm2_5.json?bounds=-113.90625001585,-52.029966847235,113.90625001585,69.175579762077&to=910111", "reader": { "isJsonArray": true, "arrayExpression": "airQualityList", "className": "com.supermap.bdt.streaming.formatter.JsonFormatter" }, "metadata": { "title": "", "epsg": 3857, "fieldInfos": [ { "name": "X", "source": "location.x", "nType": "DOUBLE" }, { "name": "Y", "source": "location.y", "nType": "DOUBLE" }, { "name": "positionName", "source": "positionName", "nType": "TEXT" }, { "name": "aqi", "source": "aqi", "nType": "DOUBLE" } ], "featureType": "POINT" }, "name": "AQIReceiver", "caption": "", "description": "", "prevNodes": [], "nextNodes": [ "WebSocketClientSender" ], "className": "com.supermap.bdt.streaming.receiver.HttpReceiver" }, "WebSocketClientSender": { "path": "ws://127.0.0.1:8800/iserver/services/dataflow/dataflow/broadcast", "formatter": { "separator": ",", "className": "com.supermap.bdt.streaming.formatter.GeoJsonFormatter" }, "name": "WebSocketClientSender", "caption": "", "description": "", "prevNodes": [ "AQIReceiver" ], "nextNodes": [], "className": "com.supermap.bdt.streaming.sender.WebSocketClientSender" } } } }