Why stored procedures?

The Cosmos SQL is very flexible in returning objects and view projections, but it cannot merge documents via its SQL dialect.

Note:

This could be done with a User Defined Function (UDF) as well.

Use Case

Imagine we have documents in a collection that need to be ‘joined’. In a simple case, we may have a template that is a master record and a user-level record with more details. We need a query that will load both objects and then merge them into one combined document.

┌──────────┐       ┌──────────┐
│ TEMPLATE │-|────<│ USER_DOC │
└──────────┘       └──────────┘

The Code

We will build on the previous but add some simple assertion testing.

To set up our test, we will need 2 new functions, one to add a document and one to remove it.

async function createDoc(doc) {
  const result = await container.items.create(doc);
  return result.resource;
}

async function deleteDoc(doc) {
  await container.item(doc.id, doc.user_id).delete();
}

and for a fixture, let us generate a simplistic guid for an id

function guid() {
  return ([1e7] + -1e3 + -4e3 + -8e3 + -1e11).replace(/[018]/g, (a) =>
    (a ^ ((Math.random() * 16) >> (a / 4))).toString(16)
  );
}

and a little generator

function getFixtures(guid) {
  return {
    template:{
      id: "template-" + guid,
      coupon_name: "sproc_testing",
      master_details: "I am the template",
      detail: "master detail"
    },
    userDoc: {
      id: "user-" + guid,
      user_id: "test-user",
      coupon_name: "sproc_testing",
      user_details: "I am the child",
      detail: "child detail"
    },
    expected: {
      user_id: "test-user",
      coupon_name: "sproc_testing",
      master_details: "I am the template",
      user_details: "I am the child",
      detail: "child detail"
    }
  }
}

And then modify our primary function to result in a failing test that you can run over and over as we develop our merge sproc. This will fail because we have not uploaded the sproc to Azure yet.

async function run() {
  console.log("\n*****\n* Starting test case\n")
  const sproc = mergeSproc
  await createOrUpdateSproc(sproc)

  // begin test
  const id = 'test' //guid()
  const fixtures = getFixtures(id)

  await createDoc(fixtures.template)
  await createDoc(fixtures.userDoc)

  //user_id is the partition key for this particular collection
  const partitionKey = fixtures.userDoc.user_id
  const res = await runSproc(sproc.id, partitionKey, id)

  await deleteDoc(fixtures.template)
  await deleteDoc(fixtures.userDoc)

  assert.strictEqual(res.detail, fixtures.expected.detail, "detail does not match")
  assert.strictEqual(res.master_details, fixtures.expected.master_details, "master details don't match")
  console.log("\n*Assertions passed")
  return res
}

run().then(console.log).catch(console.error)

All you need now is the stored procedure to do the merge.

const mergeSproc = {
    id: "mergeSproc_001",
    body: function (guid) {
      var collection = getContext().getCollection()
      console.log("Sproc called with " + guid)
      var filterQuery = 
      {     
          'query' :  'SELECT * FROM root r where r.id = @id1 or r.id = @id2',
          'parameters' : [{'name':'@id1', 'value':'template-' + guid},
                          {'name':'@id2', 'value':'user-' + guid}]
      }
      var isAccepted = collection.queryDocuments(
          collection.getSelfLink(), 
          filterQuery,
          {}, 
          function (err, feed, options) { if (err) throw err
            if (!feed || !feed.length) {
              var response = getContext().getResponse()
              response.setBody('no docs found....!.')
            }
            else {
              // Do the merge!
              var body = {merged: "yes"}
              var response = getContext().getResponse()
              response.setBody(body)
          }
      })
      if (!isAccepted) throw new Error('The query was not accepted by the server.')
  }
}

let us break this down:

SQL injection is bad, so lets use parameter substituions:

  var filterQuery = 
  {     
      'query' :  'SELECT * FROM root r where r.id = @id1 or r.id = @id2',
      'parameters' : [{'name':'@id1', 'value':'template-' + guid},
                      {'name':'@id2', 'value':'user-' + guid}]
  }

This is not a model for proper NoSQL Schema, I wanted to keep the SQL simple, for now, to focus on the sproc and get a specific set of documents back.

This object complies with the well documented SqlQuerySpec Interface and will handle the proper formatting of strings vs numbers. The type of the ‘value’ is any JSONValue. boolean | number | string | null | JSONArray | JSONObject, but it doesn’t always do the right thing for complex types.

Start the async query

      var collection = getContext().getCollection()
      var isAccepted = collection.queryDocuments(
          collection.getSelfLink(), 
          filterQuery,
          {}, 
          function (err, feed, options) { if (err) throw err
  1. Get the current collection object from the content
  2. Call queryDocuments, passing in the collection link, SQL, feed options and a callback.
  3. The queryDocuments will return False if the server rejects it. if you enter bad SQL it may also be accepted and then throw an error later.
  4. throw errors result in proper HTTP codes being returned to the caller

Since you are already running in a partition, you don’t need to reference the partition key in the filter query or request options.

:::tip pro tip This seems to be similar to the V1 DocumentDB Syntax but you should read the current documentation :::

The callback

          function (err, feed, responseOptions) { if (err) throw err
            if (!feed || !feed.length) {
              var response = getContext().getResponse()
              response.setBody('no docs found....!.')
            }
            else {
              // Do the merge!
              var body = {merged: "yes"}
              var response = getContext().getResponse()
              response.setBody(body)
          }
  1. First check for an error and early return
  2. feed will be the array of results from the queue
  3. responseOptions will contain a continuation token if the query needs to be called again to gather more records
  4. if you have no error, and you have records, so something interesting and set the body of the response appropriately.

Finish the code

At this point you should have simple script that represents a failing test. It should be fast and you can iterate on until yet get the merge right.

You can extend the example to join records based on coupon_name by passing in just user_id.

Completed Script

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
const assert = require('assert')

const { CosmosClient } = require('@azure/cosmos')

const endpoint = 'https://xxxxxxxxxxxxxx.documents.azure.com:443/'
const key = process.env.cosmos_key
const client = new CosmosClient({ endpoint, key });

const container = client.database('dbname').container('coupons')

const mergeSproc = {
    id: "mergeSproc_001",
    body: function (guid) {
      var collection = getContext().getCollection()
      console.log("Sproc called with " + guid)
      var filterQuery = 
      {     
          'query' :  'SELECT * FROM root r where r.id = @id1 or r.id = @id2',
          'parameters' : [{'name':'@id1', 'value':'template-' + guid},
                          {'name':'@id2', 'value':'user-' + guid}]
      }
      var isAccepted = collection.queryDocuments(
          collection.getSelfLink(), 
          filterQuery,
          {}, 
          function (err, feed, options) { if (err) throw err
            if (!feed || !feed.length) {
              var response = getContext().getResponse()
              response.setBody('no docs found....!.')
            }
            else {
              var master = {}
              var child = {}
              feed.forEach(doc => {
                if (doc.master_details) master = doc
                else child = doc
              })
              var body = Object.assign({}, master, child)
              var response = getContext().getResponse()
              response.setBody(body)
          }
      })
      if (!isAccepted) throw new Error('The query was not accepted by the server.')
  }
}

async function createDoc(doc) {
  const result = await container.items.create(doc);
  return result.resource;
}

async function deleteDoc(doc) {
  await container.item(doc.id, doc.user_id).delete();
}

function guid() {
  return ([1e7] + -1e3 + -4e3 + -8e3 + -1e11).replace(/[018]/g, (a) =>
    (a ^ ((Math.random() * 16) >> (a / 4))).toString(16)
  );
}

function getFixtures(guid) {
  return {
    template:{
      id: "template-" + guid,
      user_id: "test-user",
      coupon_name: "sproc_testing",
      master_details: "I am the template",
      detail: "master detail"
    },
    userDoc: {
      id: "user-" + guid,
      user_id: "test-user",
      coupon_name: "sproc_testing",
      user_details: "I am the child",
      detail: "child detail"
    },
    expected: {
      user_id: "test-user",
      coupon_name: "sproc_testing",
      master_details: "I am the template",
      user_details: "I am the child",
      detail: "child detail"
    }
  }
}
async function createOrUpdateSproc(sproc) {
  try {
    await container.scripts.storedProcedure(sproc.id).replace(sproc)
  } catch (e) {
    if (e.code === 404) {
      console.log('REPLACE failed, try to add ', sproc.id)
      await container.scripts.storedProcedures.create(sproc)
    } else {
      throw(e)
    }
  }
}

async function runSproc(sprocname, partition_id, args) {
    const result = await container
                            .scripts
                            .storedProcedure(sprocname)
                            .execute(partition_id, args, { enableScriptLogging: true })
    console.log("Sproc Log: ", decodeURIComponent(result.headers['x-ms-documentdb-script-log-results']))
    console.log("Sproc RU cost: ", result.headers['x-ms-request-charge'])
    return result.resource
}

async function run() {
  console.log("\n*****\n* Starting test case\n")
  const sproc = mergeSproc
  await createOrUpdateSproc(sproc)

  // setup test
  const id = guid()
  const fixtures = getFixtures(id)

  await createDoc(fixtures.template)
  await createDoc(fixtures.userDoc)

  // execute test
  // user_id is the partition key for this particular collection
  const partitionKey = fixtures.userDoc.user_id
  const res = await runSproc(sproc.id, partitionKey, id)

  // cleanup test
  await deleteDoc(fixtures.template)
  await deleteDoc(fixtures.userDoc)

  // assert satisfaction
  assert.strictEqual(res.detail, fixtures.expected.detail, "detail does not match")
  assert.strictEqual(res.master_details, fixtures.expected.master_details, "master details don't match")
  console.log("\n*Assertions passed")
  return res
}

run().then(console.log).catch(console.error)