Moving averages with MongoDB aggregation framework?

If you have, say, 50 years of daily temperature data, how would you calculate moving averages over 3-month windows across that time period? Can you do this with one query, or do you need multiple queries?

Example Data

01/01/2014 = 40 degrees
12/31/2013 = 38 degrees
12/30/2013 = 29 degrees
12/29/2013 = 31 degrees
12/28/2013 = 34 degrees
12/27/2013 = 36 degrees
12/26/2013 = 38 degrees
.....

      

+5




5 answers


The aggregation framework now has $map, $reduce, and $range, which make array handling much more straightforward. Below is an example of calculating a moving average over a dataset filtered by some predicate. The basic setup is that each document contains the filter criteria and a value, for example:

{sym: "A", d: ISODate("2018-01-01"), val: 10}
{sym: "A", d: ISODate("2018-01-02"), val: 30}

      

Here is the pipeline:



// This controls the number of observations in the moving average:
days = 4;

c=db.foo.aggregate([

// Filter down to what you want.  This can be anything or nothing at all.
{$match: {"sym": "S1"}}

// Ensure dates are going earliest to latest:
,{$sort: {d:1}}

// Turn docs into a single doc with a big vector of observations, e.g.
//     {sym: "A", d: d1, val: 10}
//     {sym: "A", d: d2, val: 11}
//     {sym: "A", d: d3, val: 13}
// becomes
//     {_id: "A", prx: [ {v:10,d:d1}, {v:11,d:d2},  {v:13,d:d3} ] }
//
// This will set us up to take advantage of array processing functions!
,{$group: {_id: "$sym", prx: {$push: {v:"$val",d:"$d"}} }}

// Nice additional info.  Note use of dot notation on array to get
// just scalar date at elem 0, not the object {v:val,d:date}:
,{$addFields: {numDays: days, startDate: {$arrayElemAt: [ "$prx.d", 0 ]}} }

// The Juice!  Assume we have a variable "days" which is the desired number
// of days of moving average.
// The complex expression below does this in python pseudocode:
//
// for z in range(0, len(vector) - (days - 1)):
//    seg = vector[z:z+days]
//    avg = sum(e.v for e in seg) / len(seg)
//    asOf = seg[-1].d
//
// Note that it is possible to overrun the segment at the end of the "walk"
// along the vector, i.e. not have enough date-values, so we only run the
// index up to (len(vector) - (days - 1)).
// For extra info, we also carry the as-of date, which is the tail date of
// each segment!
//
// Again we take advantage of dot notation to turn the vector of
// object {v:val, d:date} into two vectors of simple scalars [v1,v2,...]
// and [d1,d2,...] with $prx.v and $prx.d
//
,{$addFields: {"prx": {$map: {
    input: {$range:[0,{$subtract:[{$size:"$prx"}, (days-1)]}]} ,
    as: "z",
    in: {
       avg: {$avg: {$slice: [ "$prx.v", "$$z", days ] } },
       d: {$arrayElemAt: [ "$prx.d", {$add: ["$$z", (days-1)] } ]}
        }
        }}
    }}

            ]);

      

This produces a result like:

{
    "_id" : "S1",
    "prx" : [
        {
            "avg" : 11.738793632512115,
            "d" : ISODate("2018-09-05T16:10:30.259Z")
        },
        {
            "avg" : 12.420766702631376,
            "d" : ISODate("2018-09-06T16:10:30.259Z")
        },
        ...

    ],
    "numDays" : 4,
    "startDate" : ISODate("2018-09-02T16:10:30.259Z")
}
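For reference, the $range/$slice/$avg walk in the pipeline above can be mirrored in plain JavaScript. This is only a sketch to show the indexing logic; movingAverage and the sample data here are made up, not part of the pipeline:

```javascript
// Plain-JS equivalent of the $map/$range/$slice/$avg stage:
// walk the vector, average each `days`-wide slice, and tag each
// result with the slice's tail (as-of) date.
function movingAverage(prx, days) {
  const out = [];
  for (let z = 0; z <= prx.length - days; z++) {
    const seg = prx.slice(z, z + days);
    const avg = seg.reduce((tot, e) => tot + e.v, 0) / days;
    out.push({ avg: avg, d: seg[seg.length - 1].d });
  }
  return out;
}

// Example: 4-day moving average over six observations
// (dates stand in as simple integers 1..6).
const prx = [10, 11, 13, 12, 14, 16].map((v, i) => ({ v: v, d: i + 1 }));
console.log(movingAverage(prx, 4));
// [ { avg: 11.5, d: 4 }, { avg: 12.5, d: 5 }, { avg: 13.75, d: 6 } ]
```

Note that, like the pipeline, it stops the walk at len(vector) - (days - 1) so the last slice is still a full window.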

      

+4




The way I would do this in MongoDB is to maintain a running sum of the last 90 days in each day's document, e.g.:

{"day": 1, "tempMax": 40, "tempMaxSum90": 2232}
{"day": 2, "tempMax": 38, "tempMaxSum90": 2230}
{"day": 3, "tempMax": 36, "tempMaxSum90": 2231}
{"day": 4, "tempMax": 37, "tempMaxSum90": 2233}

      

Whenever a new data point needs to be added to the collection, instead of reading and summing 90 values, you can compute the new sum efficiently with two simple queries, one addition, and one subtraction, like this (pseudo-code):

tempMaxSum90(day) = tempMaxSum90(day-1) + tempMax(day) - tempMax(day-90)

      



The 90-day moving average for each day is then just that day's 90-day sum divided by 90.

If you also want to offer moving averages across different time scales (e.g. 1 week, 30 days, 90 days, 1 year), you can simply maintain an array of sums in each document instead of a single sum, one sum for each window size you require.

This approach costs additional storage space and extra processing on insert, but it suits most time-series scenarios where new data arrives relatively slowly and fast retrieval is desirable.
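As a sketch of that bookkeeping (in-memory JavaScript; the tempMaxSum90 field name follows the example above, and a real implementation would issue the two lookups against the collection rather than an array):

```javascript
// Sketch of the running-sum idea: each new day's sum is derived
// from the previous day's sum in O(1):
//   sum(day) = sum(day - 1) + temp(day) - temp(day - window)
const WINDOW = 90;
const docs = [];  // stand-in for the collection, ordered by day

function insertDay(day, tempMax) {
  // "two simple queries": yesterday's sum and the value falling
  // out of the back of the window.
  const prev = docs.length ? docs[docs.length - 1].tempMaxSum90 : 0;
  const dropped = docs.length >= WINDOW
    ? docs[docs.length - WINDOW].tempMax
    : 0;
  docs.push({ day, tempMax, tempMaxSum90: prev + tempMax - dropped });
}

// Feed in 100 days of synthetic data, then read off a moving average.
for (let d = 1; d <= 100; d++) insertDay(d, 30 + (d % 10));
const last = docs[docs.length - 1];
console.log(last.tempMaxSum90 / WINDOW);  // 90-day moving average for day 100
```

Until the window has filled (the first 89 days), the stored sum covers fewer than 90 values; a real implementation would either skip those days or divide by the actual count.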

+5




I don't believe the aggregation framework can do this across multiple dates in the current version (2.6), or at least not without some serious gymnastics. The reason is that the aggregation pipeline processes exactly one document at a time, so you would need to somehow produce, for each day, a document containing the 3 months of information relevant to it. That would feed a $group step that calculates the average, which means the preceding stage would have to emit roughly 90 copies of each day's record, each with a distinguishing key that $group can use.

So I don't see a way to do this for more than one date at a time in a single aggregation. I would be happy to be proven wrong, and will edit / delete this answer if someone finds a way, even one so convoluted as to be impractical. A PostgreSQL window function with PARTITION would do the job here; perhaps that feature will be added someday.

0




I think I may have an answer to my own question: map-reduce would do it. First use map to emit each document to its neighbors so it can be averaged in with them, then use reduce to average each resulting array. The new array of averages is the moving-average series over time, for whatever date interval you care about.

I think I needed to better understand map-reduce first...

:)

For example, if we want to do this in memory (later we can output to collections):

GIST https://gist.github.com/mrgcohen/3f67c597a397132c46f7

Does it look right?
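One way to read that idea as plain JavaScript (a sketch only; this is not MongoDB's mapReduce API and not the linked gist): map emits each observation into every window it belongs to, keyed by the window's last day, and reduce averages each complete bucket.

```javascript
// Map-reduce sketch of the "emit to neighbors" idea, in memory.
const WINDOW = 3;
const data = [
  { day: 1, temp: 40 }, { day: 2, temp: 38 },
  { day: 3, temp: 29 }, { day: 4, temp: 31 },
];

// map: emit (windowEndDay, temp) for each window this day falls into
const buckets = new Map();
for (const doc of data) {
  for (let end = doc.day; end < doc.day + WINDOW; end++) {
    if (!buckets.has(end)) buckets.set(end, []);
    buckets.get(end).push(doc.temp);
  }
}

// reduce: average each bucket, keeping only complete windows
const averages = [];
for (const [end, temps] of buckets) {
  if (temps.length === WINDOW) {
    averages.push({ day: end, avg: temps.reduce((a, b) => a + b) / WINDOW });
  }
}
console.log(averages);
// e.g. day 3 averages days 1-3, day 4 averages days 2-4
```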

0




The accepted answer helped me, but it took me a while to figure out how it works, so I thought I would explain my method to help others. In particular, I think my answer will help in your context.

This works best on small datasets.

First, group the data by day, then push all of the days into an array attached to each day:

{
  "$sort": {
    "Date": -1
  }
},
{
  "$group": {
    "_id": {
      "Day": "$Date",
      "Temperature": "$Temperature"
    },
    "Previous Values": {
      "$push": {
        "Date": "$Date",
        "Temperature": "$Temperature"
      }
    }
  }
},
      

This will leave you with an entry that looks like this (it will be ordered correctly):

{"_id.Day": "2017-02-01", 
"Temperature": 40, 
"Previous Values": [
    {"Day": "2017-03-01", "Temperature": 20},
    {"Day": "2017-02-11", "Temperature": 22},
    {"Day": "2017-01-18", "Temperature": 3},
    ...
    ]},

      

Now that all the days have been added to each day, we need to remove the elements of the Previous Values array that are more recent than the _id.Day field, since a moving average looks backward:

{
  "$project": {
    "_id": 0,
    "Date": "$_id.Day",
    "Temperature": "$_id.Temperature",
    "Previous Values": 1
  }
},
{
  "$project": {
    "_id": 0,
    "Date": 1,
    "Temperature": 1,
    "Previous Values": {
      "$filter": {
        "input": "$Previous Values",
        "as": "pv",
        "cond": {
          "$lte": ["$$pv.Date", "$Date"]
        }
      }
    }
  }
},

      

Each entry's Previous Values array will now only contain dates that are less than or equal to that entry's date:

{"Day": "2017-02-01", 
"Temperature": 40, 
"Previous Values": [
    {"Day": "2017-01-31", "Temperature": 33},
    {"Day": "2017-01-30", "Temperature": 36},
    {"Day": "2017-01-29", "Temperature": 33},
    {"Day": "2017-01-28", "Temperature": 32},
    ...
    ]}

      

Now we can choose the averaging window size. Since the data is daily, for a week we take the first 7 array entries; for a month, 30; for 3 months, 90:

{
  "$project": {
    "_id": 0,
    "Date": 1,
    "Temperature": 1,
    "Previous Values": {
      "$slice": ["$Previous Values", 0, 90]
    }
  }
},

      

To average the previous temperatures, we unwind the Previous Values array, then group by the date field. The unwind operation does this:

{"Day": "2017-02-01", 
"Temperature": 40, 
"Previous Values": {
        "Day": "2017-01-31", 
        "Temperature": 33}
},

{"Day": "2017-02-01", 
"Temperature": 40, 
"Previous Values": {
        "Day": "2017-01-30", 
        "Temperature": 36}
},

{"Day": "2017-02-01", 
"Temperature": 40, 
"Previous Values": {
        "Day": "2017-01-29", 
        "Temperature": 33}
},
...
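The $unwind stage itself is not shown in the answer; presumably it is just:

```
{ "$unwind": "$Previous Values" },
```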

      

Note that the Day field is the same, but we now have one document for each of the previous dates from the Previous Values array. We can now group by day again, then average Previous Values.Temperature to get the moving average:

{"$group": {
    "_id": {
      "Day": "$Date",
      "Temperature": "$Temperature"
    },
    "3 Month Moving Average": {
      "$avg": "$Previous Values.Temperature"
    }
  }
}

      

That's it! I know that joining every record with every record is not ideal, but it works fine on small datasets.

0



