Merge Sproc

Merge Sproc

Overview

Why stored procedures?

Cosmos DB SQL is very flexible at returning objects and view projections, but its SQL dialect cannot merge documents.

Use Case

Imagine we have documents with a many to one relationship. For example we have a master coupon template that is the master and then a user level coupon. We need a query that will load both objects and then merge them into one combined document.

The Code

We will build on the previous example, but let's add some simple assertion testing.

In order to set up our test we will need 2 new functions: one to add a document, and one to remove it.

1async function createDoc(doc) {
2  const result = await container.items.create(doc);
3  return result.resource;
4}
5
6async function deleteDoc(doc) {
7  await container.item(doc.id, doc.user_id).delete();
8}

and for a fixture, let's generate a simplistic guid for an id

1function guid() {
2  return ([1e7] + -1e3 + -4e3 + -8e3 + -1e11).replace(/[018]/g, (a) =>
3    (a ^ ((Math.random() * 16) >> (a / 4))).toString(16)
4  );
5}

and a little generator

 1function getFixtures(guid) {
 2  return {
 3    template:{
 4      id: "template-" + guid,
 5      coupon_name: "sproc_testing",
 6      master_details: "I am the template",
 7      detail: "master detail"
 8    },
 9    userDoc: {
10      id: "user-" + guid,
11      user_id: "test-user",
12      coupon_name: "sproc_testing",
13      user_details: "I am the child",
14      detail: "child detail"
15    },
16    expected: {
17      user_id: "test-user",
18      coupon_name: "sproc_testing",
19      master_details: "I am the template",
20      user_details: "I am the child",
21      detail: "child detail"
22    }
23  }
24}

and then modify our main function to result in a failing test that you can run over and over as we develop our merge sproc. This will fail because we have not uploaded the sproc to Azure yet.

 1async function run() {
 2  console.log("\n*****\n* Starting test case\n")
 3  const sproc = mergeSproc
 4  await createOrUpdateSproc(sproc)
 5
 6  // begin test
 7  const id = 'test' //guid()
 8  const fixtures = getFixtures(id)
 9
10  await createDoc(fixtures.template)
11  await createDoc(fixtures.userDoc)
12
13  //user_id is the partition key for this particular collection
14  const partitionKey = fixtures.userDoc.user_id
15  const res = await runSproc(sproc.id, partitionKey, id)
16
17  await deleteDoc(fixtures.template)
18  await deleteDoc(fixtures.userDoc)
19
20  assert.strictEqual(res.detail, fixtures.expected.detail, "detail does not match")
21  assert.strictEqual(res.master_details, fixtures.expected.master_details, "master details don't match")
22  console.log("\n*Assertions passed")
23  return res
24}
25
26run().then(console.log).catch(console.error)

all you need now is the stored procedure to do the merge.

 1const mergeSproc = {
 2    id: "mergeSproc_001",
 3    body: function (guid) {
 4      var collection = getContext().getCollection()
 5      console.log("Sproc called with " + guid)
 6      var filterQuery = 
 7      {     
 8          'query' :  'SELECT * FROM root r where r.id = @id1 or r.id = @id2',
 9          'parameters' : [{'name':'@id1', 'value':'template-' + guid},
10                          {'name':'@id2', 'value':'user-' + guid}]
11      }
12      var isAccepted = collection.queryDocuments(
13          collection.getSelfLink(), 
14          filterQuery,
15          {}, 
16          function (err, feed, options) { if (err) throw err
17            if (!feed || !feed.length) {
18              var response = getContext().getResponse()
19              response.setBody('no docs found....!.')
20            }
21            else {
22              // Do the merge!
23              var body = {merged: "yes"}
24              var response = getContext().getResponse()
25              response.setBody(body)
26          }
27      })
28      if (!isAccepted) throw new Error('The query was not accepted by the server.')
29  }
30}

lets break this down:

SQL injection is bad, so let's use parameter substitutions:

1  var filterQuery = 
2  {     
3      'query' :  'SELECT * FROM root r where r.id = @id1 or r.id = @id2',
4      'parameters' : [{'name':'@id1', 'value':'template-' + guid},
5                      {'name':'@id2', 'value':'user-' + guid}]
6  }

This is not a model for proper NoSQL Schema, I wanted to keep the SQL simple for now so we could focus on the sproc and get a specific set of documents back.

This object complies with the well documented SqlQuerySpec Interface and will handle the proper formatting of strings vs numbers. The type of the 'value' is any JSONValue. boolean | number | string | null | JSONArray | JSONObject, but it doesn't always do the right thing for complex types.

Start the async query

1      var collection = getContext().getCollection()
2      var isAccepted = collection.queryDocuments(
3          collection.getSelfLink(), 
4          filterQuery,
5          {}, 
6          function (err, feed, options) { if (err) throw err
  1. Get the current collection object from the context
  2. Call queryDocuments, passing in the collection link, SQL, feed options and a callback.
  3. queryDocuments returns false if the server rejected the query. Bad SQL can still be accepted, in which case the error is delivered to the callback and thrown from there
  4. Thrown errors result in proper HTTP codes being returned to the caller

since you are already running in a partition, you don't need to reference the partition key in the filter query or request options.

:::tip pro tip This seems to be similar to the V1 DocumentDB Syntax but you should read the current documentation :::

The callback

 1          function (err, feed, responseOptions) { if (err) throw err
 2            if (!feed || !feed.length) {
 3              var response = getContext().getResponse()
 4              response.setBody('no docs found....!.')
 5            }
 6            else {
 7              // Do the merge!
 8              var body = {merged: "yes"}
 9              var response = getContext().getResponse()
10              response.setBody(body)
11          }
  1. First check for an error and throw early
  2. feed will be the array of results from the query
  3. responseOptions will contain a continuation token if the query needs to be called again to gather more records
  4. If you have no error and you have records, do something interesting and set the body of the response appropriately.

Finish the code

At this point you should have a simple script that represents a failing test. It should be fast, and you can iterate on it until you get the merge right.

You can extend the example to join records based on coupon_name by passing in just user_id.

Completed Script

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
const assert = require('assert')

const { CosmosClient } = require('@azure/cosmos')

// Cosmos DB account endpoint, hard-coded (the hostname suggests a stage account — confirm before reuse).
const endpoint = 'https://gopuff-cosmos-v1-stage.documents.azure.com:443/'
// The account key is read from the `cosmos_key` environment variable.
const key = process.env.cosmos_key
const client = new CosmosClient({ endpoint, key });

// Handle to the `coupons` container in the `cart` database; the helper functions in this file go through it.
const container = client.database('cart').container('coupons')

/**
 * Stored procedure definition: looks up the template document
 * (`template-<guid>`) and the user document (`user-<guid>`) and responds with
 * one merged document in which the user document's fields win on conflict.
 *
 * `body` executes inside Cosmos DB, not in Node: `getContext()` is supplied by
 * the server-side runtime. Keep `body` a self-contained `function` expression
 * (no references to names outside it), because it is the function itself that
 * is handed to the SDK for upload.
 *
 * Response body:
 *   - the string 'no docs found....!.' when the query returns nothing
 *   - otherwise `Object.assign({}, template, userDoc)`
 *
 * Throws when the query callback receives an error or when the server does
 * not accept the query.
 */
const mergeSproc = {
    id: "mergeSproc_001",
    body: function (guid) {
      var collection = getContext().getCollection()
      console.log("Sproc called with " + guid)
      // The template is recognised by the id we query for, not by the
      // presence of a data field, so an empty `master_details` cannot make
      // the template masquerade as (and overwrite) the user document.
      var templateId = 'template-' + guid
      var filterQuery =
      {
          'query' :  'SELECT * FROM root r where r.id = @id1 or r.id = @id2',
          'parameters' : [{'name':'@id1', 'value': templateId},
                          {'name':'@id2', 'value':'user-' + guid}]
      }
      var isAccepted = collection.queryDocuments(
          collection.getSelfLink(),
          filterQuery,
          {},
          function (err, feed, options) {
            if (err) throw err
            var response = getContext().getResponse()
            if (!feed || !feed.length) {
              response.setBody('no docs found....!.')
              return
            }
            var master = {}
            var child = {}
            feed.forEach(doc => {
              if (doc.id === templateId) master = doc
              else child = doc
            })
            // Later sources win: user-level fields override template fields.
            response.setBody(Object.assign({}, master, child))
          })
      if (!isAccepted) throw new Error('The query was not accepted by the server.')
  }
}

/**
 * Creates `doc` as a new item in the module-level `container`.
 *
 * @param {object} doc - the document to insert
 * @returns {Promise<object>} the `resource` property of the create response
 */
async function createDoc(doc) {
  const created = await container.items.create(doc);
  const { resource } = created;
  return resource;
}

/**
 * Deletes the item identified by `doc.id` from the module-level `container`,
 * passing `doc.user_id` as the second argument to `container.item`.
 *
 * @param {{id: string, user_id: string}} doc - the document to remove
 * @returns {Promise<void>}
 */
async function deleteDoc(doc) {
  const target = container.item(doc.id, doc.user_id);
  await target.delete();
}

/**
 * Builds a random identifier shaped like a version-4 UUID
 * (`xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx`, lowercase hex).
 *
 * Every `0`, `1` and `8` in the pattern is replaced using `Math.random()`;
 * the `4` and the dashes are left as they are. Not cryptographically
 * secure — intended for test fixture ids only.
 *
 * @returns {string} the generated identifier
 */
function guid() {
  const pattern = "10000000-1000-4000-8000-100000000000";
  return pattern.replace(/[018]/g, (ch) => {
    const digit = Number(ch);
    // For 0 and 1 the shift is 0 (any hex digit); for 8 the shift is 2,
    // which confines the result to 8, 9, a or b.
    const noise = (Math.random() * 16) >> (digit / 4);
    return (digit ^ noise).toString(16);
  });
}

/**
 * Produces the test fixtures for one run.
 *
 * @param {string} guid - suffix appended to the `template-` and `user-` ids
 * @returns {{template: object, userDoc: object, expected: object}}
 *   `template` and `userDoc` are the two documents to insert; `expected`
 *   holds the field values the merged result is compared against.
 */
function getFixtures(guid) {
  const userId = "test-user"
  const couponName = "sproc_testing"
  const masterDetails = "I am the template"
  const userDetails = "I am the child"
  const childDetail = "child detail"

  const template = {
    id: "template-" + guid,
    user_id: userId,
    coupon_name: couponName,
    master_details: masterDetails,
    detail: "master detail"
  }

  const userDoc = {
    id: "user-" + guid,
    user_id: userId,
    coupon_name: couponName,
    user_details: userDetails,
    detail: childDetail
  }

  const expected = {
    user_id: userId,
    coupon_name: couponName,
    master_details: masterDetails,
    user_details: userDetails,
    detail: childDetail
  }

  return { template, userDoc, expected }
}
/**
 * Uploads `sproc` to the module-level `container`: first tries to replace the
 * stored procedure with the same id, and creates it when the replace fails
 * with an error whose `code` is 404.
 *
 * @param {{id: string, body: Function}} sproc - stored procedure definition
 * @returns {Promise<void>}
 * @throws rethrows any replace error whose `code` is not 404, and any error
 *   from the create call
 */
async function createOrUpdateSproc(sproc) {
  try {
    await container.scripts.storedProcedure(sproc.id).replace(sproc)
    return
  } catch (err) {
    // Anything other than "not found" is a real failure.
    if (err.code !== 404) {
      throw err
    }
  }

  console.log('REPLACE failed, try to add ', sproc.id)
  await container.scripts.storedProcedures.create(sproc)
}

/**
 * Executes a stored procedure on the module-level `container` with script
 * logging enabled, prints the script log and the request charge taken from
 * the response headers, and returns the procedure's result.
 *
 * @param {string} sprocname - id of the stored procedure to execute
 * @param {*} partition_id - forwarded as the first argument of `execute`
 * @param {*} args - forwarded as the second argument of `execute`
 * @returns {Promise<*>} the `resource` property of the execute response
 */
async function runSproc(sprocname, partition_id, args) {
    const procedure = container.scripts.storedProcedure(sprocname)
    const response = await procedure.execute(partition_id, args, { enableScriptLogging: true })

    // The script log is decoded with decodeURIComponent before printing.
    const scriptLog = decodeURIComponent(response.headers['x-ms-documentdb-script-log-results'])
    console.log("Sproc Log: ", scriptLog)

    const requestCharge = response.headers['x-ms-request-charge']
    console.log("Sproc RU cost: ", requestCharge)

    return response.resource
}

/**
 * End-to-end test case for the merge stored procedure.
 *
 * Steps: upload `mergeSproc`, insert the template and user fixture documents,
 * execute the procedure, remove the fixtures, then assert on the merged
 * result.
 *
 * Cleanup runs in a `finally` block and only touches documents that were
 * actually created, so a failure while creating the second document or while
 * executing the procedure no longer leaves fixture documents behind.
 *
 * @returns {Promise<object>} the merged document returned by the procedure
 * @throws when any Cosmos call fails or when an assertion does not hold
 */
async function run() {
  console.log("\n*****\n* Starting test case\n")
  const sproc = mergeSproc
  await createOrUpdateSproc(sproc)

  // setup test
  const id = guid()
  const fixtures = getFixtures(id)
  const created = []
  let res

  try {
    for (const doc of [fixtures.template, fixtures.userDoc]) {
      await createDoc(doc)
      created.push(doc)
    }

    // execute test
    // user_id is the partition key for this particular collection
    const partitionKey = fixtures.userDoc.user_id
    res = await runSproc(sproc.id, partitionKey, id)
  } finally {
    // cleanup test: same order as creation (template first, then user doc)
    for (const doc of created) {
      await deleteDoc(doc)
    }
  }

  // assert satisfaction
  assert.strictEqual(res.detail, fixtures.expected.detail, "detail does not match")
  assert.strictEqual(res.master_details, fixtures.expected.master_details, "master details don't match")
  console.log("\n*Assertions passed")
  return res
}

// Script entry point: start the test case, print the merged document on success, or print the error on failure.
// NOTE(review): the failure path only logs, so the rejection does not propagate further from here.
run().then(console.log).catch(console.error)