麻豆小视频在线观看_中文黄色一级片_久久久成人精品_成片免费观看视频大全_午夜精品久久久久久久99热浪潮_成人一区二区三区四区

首頁(yè) > 數(shù)據(jù)庫(kù) > MongoDB > 正文

基于Morphia實(shí)現(xiàn)MongoDB按小時(shí)、按天聚合操作方法

2020-10-29 18:41:30
字體:
供稿:網(wǎng)友

MongoDB按照天數(shù)或小時(shí)聚合

需求

最近接到需求,需要對(duì)用戶賬戶下的設(shè)備狀態(tài),分別按照天以及小時(shí)進(jìn)行聚合,以此為基礎(chǔ)繪制設(shè)備狀態(tài)趨勢(shì)圖.
實(shí)現(xiàn)思路是啟動(dòng)定時(shí)任務(wù),對(duì)各用戶的設(shè)備狀態(tài)數(shù)據(jù)分別按照小時(shí)以及天進(jìn)行聚合,并存儲(chǔ)進(jìn)數(shù)據(jù)庫(kù)中供用戶后續(xù)查詢.
涉及到的技術(shù)棧分別為:Spring Boot,MongoDB,Morphia.

數(shù)據(jù)模型

@Data@Builder@Entity(value = "rawDevStatus", noClassnameStored = true)// 設(shè)備狀態(tài)索引@Indexes({    // 設(shè)置數(shù)據(jù)超時(shí)時(shí)間(TTL,MongoDB根據(jù)TTL在后臺(tái)進(jìn)行數(shù)據(jù)刪除操作)    @Index(fields = @Field("time"), options = @IndexOptions(expireAfterSeconds = 3600 * 24 * 72)),    @Index(fields = {@Field("userId"), @Field(value = "time", type = IndexType.DESC)})})public class RawDevStatus {  @Id  @JsonProperty(access = JsonProperty.Access.WRITE_ONLY)  private ObjectId objectId;  private String userId;  private Instant time;  @Embedded("points")  List<Point> protocolPoints;  @Data  @AllArgsConstructor  public static class Point {    /**     * 協(xié)議類型     */    private Protocol protocol;    /**     * 設(shè)備總數(shù)     */    private Integer total;    /**     * 設(shè)備在線數(shù)目     */    private Integer onlineNum;    /**     * 處于啟用狀態(tài)設(shè)備數(shù)目     */    private Integer enableNum;  }}

上述代碼是設(shè)備狀態(tài)實(shí)體類,其中設(shè)備狀態(tài)數(shù)據(jù)是按照設(shè)備所屬協(xié)議進(jìn)行區(qū)分的.

@Data@Builder@Entity(value = "aggregationDevStatus", noClassnameStored = true)@Indexes({    @Index(fields = @Field("expireAt"), options = @IndexOptions(expireAfterSeconds = 0)),    @Index(fields = {@Field("userId"), @Field(value = "time", type = IndexType.DESC)})})public class AggregationDevStatus {  @Id  @JsonProperty(access = JsonProperty.Access.WRITE_ONLY)  private ObjectId objectId;  /**   * 用戶ID   */  private String userId;  /**   * 設(shè)備總數(shù)   */  private Double total;  /**   * 設(shè)備在線數(shù)目   */  private Double onlineNum;  /**   * 處于啟用狀態(tài)設(shè)備數(shù)目   */  private Double enableNum;  /**   * 聚合類型(按照小時(shí)還是按照天聚合)   */  @Property("aggDuration")  private AggregationDuration aggregationDuration;  private Instant time;  /**   * 動(dòng)態(tài)設(shè)置文檔過期時(shí)間   */  private Instant expireAt;}

上述代碼是期待的聚合結(jié)果,其中構(gòu)建兩個(gè)索引:(1)超時(shí)索引;(2)復(fù)合索引,程序會(huì)根據(jù)用戶名以及時(shí)間查詢?cè)O(shè)備狀態(tài)聚合結(jié)果.

聚合操作符介紹

聚合操作類似于管道,管道中的每一步操作產(chǎn)生的中間結(jié)果作為下一步的輸入源,最終輸出聚合結(jié)果.

此次聚合主要涉及以下操作:

•$project:指定輸出文檔中的字段.
•$unwind:拆分?jǐn)?shù)據(jù)中的數(shù)組;
•match:選擇要處理的文檔數(shù)據(jù);
•group:根據(jù)key分組聚合結(jié)果.

原始聚合語句

db.getCollection('raw_dev_status').aggregate([  {$match:    {      time:{$gte: ISODate("2019-06-27T00:00:00Z")},    }  },  {$unwind: "$points"},  {$project:    {      userId:1,points:1,      tmp: {$dateToString: { format: "%Y:%m:%dT%H:00:00Z", date: "$time" } }    }  },  {$project:    {      userId:1,points:1,      groupTime: {$dateFromString: { dateString: "$tmp", format: "%Y:%m:%dT%H:%M:%SZ", } }    }  },  {$group:    {      _id:{user_id:'$userId', cal_time:'$groupTime'},      devTotal:{'$avg':'$points.total'},      onlineTotal:{'$avg':'$points.onlineNum'},      enableTotal:{'$avg':'$points.enableNum'}    }  },])

上述代碼是按小時(shí)聚合數(shù)據(jù),以下來逐步介紹處理思路:

(1) $match

根據(jù)小時(shí)聚合數(shù)據(jù),因?yàn)橹恍枰@取近24小時(shí)的聚合結(jié)果,所以對(duì)數(shù)據(jù)進(jìn)行初步篩選.

(2) $unwind

raw_dev_status中的設(shè)備狀態(tài)是按照協(xié)議區(qū)分的數(shù)組,因此需要對(duì)其進(jìn)行展開,以便下一步進(jìn)行篩選;

(3) $project

  {$project:    {      userId:1,points:1,      tmp: {$dateToString: { format: "%Y:%m:%dT%H:00:00Z", date: "$time" } }    }  }

選擇需要輸出的數(shù)據(jù),分別為:userId,points以及tmp.

需要注意,為了按照時(shí)間聚合,對(duì)$time屬性進(jìn)行操作,提取%Y:%m:%dT%H時(shí)信息至$tmp作為下一步的聚合依據(jù).

如果需要按天聚合,則format數(shù)據(jù)可修改為:%Y:%m:%dT00:00:00Z即可滿足要求.

(4) $project

  {$project:    {      userId:1,points:1,      groupTime: {$dateFromString: { dateString: "$tmp", format: "%Y:%m:%dT%H:%M:%SZ", } }    }  }

因?yàn)樯弦徊絧roject操作中,tmp為字符串?dāng)?shù)據(jù),最終的聚合結(jié)果需要時(shí)間戳(主要懶,不想在程序中進(jìn)行轉(zhuǎn)換操作).
因此,此處對(duì)$tmp進(jìn)行操作,轉(zhuǎn)換為時(shí)間類型數(shù)據(jù),即groupTime.

(5) $group

對(duì)聚合結(jié)果進(jìn)行分類操作,并生成最終輸出結(jié)果.

 {$group:    {      # 根據(jù)_id進(jìn)行分組操作,依據(jù)是`user_id`以及`$groupTime`      _id:{user_id:'$userId', cal_time:'$groupTime'},      # 求設(shè)備總數(shù)平均值      devTotal:{'$avg':'$points.total'},      # 求設(shè)備在線數(shù)平均值      onlineTotal:{'$avg':'$points.onlineNum'},      # ...      enableTotal:{'$avg':'$points.enableNum'}    }  }

代碼編寫

此處ODM選擇Morphia,亦可以使用MongoTemplate,原理類似.

 /**   * 創(chuàng)建聚合條件   *   * @param pastTime   過去時(shí)間段   * @param dateToString 格式化字符串(%Y:%m:%dT%H:00:00Z或%Y:%m:%dT00:00:00Z)   * @return 聚合條件   */  private AggregationPipeline createAggregationPipeline(Instant pastTime, String dateToString, String stringToDate) {    Query<RawDevStatus> query = datastore.createQuery(RawDevStatus.class);    return datastore.createAggregation(RawDevStatus.class)        .match(query.field("time").greaterThanOrEq(pastTime))        .unwind("points", new UnwindOptions().preserveNullAndEmptyArrays(false))        .match(query.field("points.protocol").equal("ALL"))        .project(Projection.projection("userId"),            Projection.projection("points"),            Projection.projection("convertTime",                Projection.expression("$dateToString",                    new BasicDBObject("format", dateToString)                        .append("date", "$time"))            )        )        .project(Projection.projection("userId"),            Projection.projection("points"),            Projection.projection("convertTime",                Projection.expression("$dateFromString",                    new BasicDBObject("format", stringToDate)                        .append("dateString", "$convertTime"))            )        )        .group(            Group.id(Group.grouping("userId"), Group.grouping("convertTime")),            Group.grouping("total", Group.average("points.total")),            Group.grouping("onlineNum", Group.average("points.onlineNum")),            Group.grouping("enableNum", Group.average("points.enableNum"))        );  }  /**   * 獲取聚合結(jié)果   *   * @param pipeline 聚合條件   * @return 聚合結(jié)果   */  private List<AggregationMidDevStatus> getAggregationResult(AggregationPipeline pipeline) {    List<AggregationMidDevStatus> statuses = new ArrayList<>();    Iterator<AggregationMidDevStatus> resultIterator = pipeline.aggregate(        AggregationMidDevStatus.class, AggregationOptions.builder().allowDiskUse(true).build());    while (resultIterator.hasNext()) {      statuses.add(resultIterator.next());    }    return statuses;  }  //......................................................................................  // 獲取聚合結(jié)果(省略若干代碼)  AggregationPipeline pipeline = createAggregationPipeline(pastTime, dateToString, stringToDate);  List<AggregationMidDevStatus> midStatuses = getAggregationResult(pipeline);  if (CollectionUtils.isEmpty(midStatuses)) {    log.warn("Can not get dev status aggregation result.");    return;  }

總結(jié)

以上所述是小編給大家介紹的基于Morphia實(shí)現(xiàn)MongoDB按小時(shí)、按天聚合操作方法,希望對(duì)大家有所幫助,如果大家有任何疑問請(qǐng)給我留言,小編會(huì)及時(shí)回復(fù)大家的。在此也非常感謝大家對(duì)武林網(wǎng)網(wǎng)站的支持!
如果你覺得本文對(duì)你有幫助,歡迎轉(zhuǎn)載,煩請(qǐng)注明出處,謝謝!

發(fā)表評(píng)論 共有條評(píng)論
用戶名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
主站蜘蛛池模板: 国产午夜精品久久久久 | 麻豆91精品91久久久 | 欧美1—12sexvideos| 久久伊人精品热在75 | 国产精品刺激对白麻豆99 | 午夜视频在线免费播放 | h视频在线免费观看 | 国产午夜免费不卡精品理论片 | 色污视频 | free性欧美hd另类 | 国产精品久久久久久久久久10秀 | 国产成人网 | 成人三级电影网 | 国产精选在线 | 国产精品久久久久久久不卡 | 亚洲一区在线观看视频 | 久久久婷婷一区二区三区不卡 | 中文字幕在线观看亚洲 | 欧美另类综合 | 特级黄色影院 | 桥本有菜免费av一区二区三区 | 国产精品高潮视频 | 久久精品亚洲一区 | 性毛片视频 | 国产精品成人久久久久a级 av电影在线免费 | 9191色| 久久国产精品二国产精品中国洋人 | 欧美性受xxxx人人本视频 | 亚洲第一成人在线观看 | 中国毛片在线观看 | 亚洲电影免费观看国语版 | 中文字幕欧美视频 | 久久精品亚洲国产奇米99 | av在线播放网址 | 欧美日韩国产综合网 | 日本在线视频免费 | 国内性爱视频 | 欧美精品久久久久久久多人混战 | 一夜新娘第三季免费观看 | 中文字幕极速在线观看 | 欧洲成人一区 |