Azure Cosmos DBにストアドプロシージャからパーティションキーを指定してドキュメントを追加する(C#)

Azure Cosmos DBをSQL(document)で作成すれば普通のSQL文でCRUDできるのではないかと思いましたが、どうやらそうではなく使えるのはSELECT、しかもGROUP BYもできない、トランザクションスコープも適用されないという非常に癖のある代物でした。オールドタイプにはなかなか馴染めないですね。

特にトランザクションスコープは面倒で、ストアドプロシージャを経由すればパーティション単位でトランザクションが保証されるというものです。ちなみにストアドプロシージャの実装方法ははAzureポータル上にてJavaScriptにより定義する必要があります。

仕方がないと諦め、ベタ書きでJavaScriptを書いてストアドプロシージャを作成します。テストデータでは”uniqid”でパーティション化されたコレクションを想定しています。

// SAMPLE STORED PROCEDURE
// ID「InsertDocuments」としてポータル上から登録します
//
// 引数 documents : JSON配列形式の文字列
// 戻値 作成したドキュメントの配列
function InsertDocuments(documents) {
    const collection = getContext().getCollection();
    const collLink = collection.getSelfLink();
    console.log("documents=" + documents +"\n");
    let jsonList;
    try {
        jsonList = JSON.parse(documents);
    } catch(e) {
        console.log(e);
        return null;
    }

    console.log("length=" + jsonList.length +"\n");
    let createdList = [];
    for(let i=0 ; i<jsonList.length ; i++){
        let doc = jsonList[i];

        // Query documents and tolist processed documents.
        let isAccepted = 
            collection.createDocument(
                    collLink, 
                    doc, 
                    function (err, document) {
                if (err) {
                    console.log(err);

                } else {
                    console.log("created " + document.id + "\n");
                    createdList.push(document);
                    if(i==jsonList.length-1){
                        getContext().getResponse().setBody(JSON.stringify(createdList));
                    }
                }
            });
        if (!isAccepted) throw new Error('The query was not accepted by the server.');
    }

    return;
}

次に利用側のコードを書きます。今回、Functions V2(.NET Core)からC#を利用してCosmos DBにアクセスします。

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Newtonsoft.Json.Converters;
using System.Collections.Generic;

namespace FunctionsTest
{
    public static class FuncInsertDocument
    {
        private static readonly string EndpointUrl = "<your endpoint URL>";
        private static readonly string PrimaryKey = "<your primary key>";  //[Read-only Keys]で問題ない
        private static readonly string DataBaseId = "<your DataBase ID>";
        private static readonly string CollectionId = "<your Collection ID>";

        [FunctionName("FuncInsertDocument")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequest req,
            ILogger log)
        {
            log.LogInformation("C# HTTP trigger function processed a request.");
            DocumentClient client = new DocumentClient(new Uri(EndpointUrl), PrimaryKey);

            // 書込データ作成
            var partitionKeyValue = "U001";
            var dataList = new List<TestData>();
            dataList.Add(new TestData{
                UniqId = partitionKeyValue,
                SessionId = "SES001",
                Time = new DateTimeOffset(new DateTime(2018,1,15,0,0,0,DateTimeKind.Local))
            });

            // ストアドプロシージャ"InsertDocuments"を実行する
            Uri spUri = UriFactory.CreateStoredProcedureUri(
                DataBaseId,
                CollectionId,
                "InsertDocuments");
            string spParam = JsonConvert.SerializeObject(dataList);
            RequestOptions options = new RequestOptions() {
                EnableScriptLogging = true,
                PartitionKey = new PartitionKey(partitionKeyValue)
            };
            var result = await client.ExecuteStoredProcedureAsync<string>(
              spUri,
              options,
              spParam);

            // クエリ成功時にはスクリプトログを出力
            ActionResult ret = null;
            if (result.Response != null) {
                log.LogInformation(result.ScriptLog);
                ret = new OkObjectResult($"ret=" + result.ScriptLog);
            } else {
                ret = new BadRequestObjectResult("Please pass a name on the query string or in the request body");
            }
            return ret;
        }
    }

    public class TestData {
        [JsonProperty("uniqid")]
        public string UniqId {get;set;}

        [JsonProperty("sessionid")]
        public string SessionId { get; set; }

        [JsonProperty("time")]
        [JsonConverter(typeof(MyCustomDateTimeConverter))]
        public DateTimeOffset Time { get; set; }
    }

    public class MyCustomDateTimeConverter : IsoDateTimeConverter {
        public MyCustomDateTimeConverter() {
            base.DateTimeFormat = "yyyy-MM-dd'Z'HH:mm:ss'Z'";
        }
    }
}

指定したパーティションキー値と投入データの値が一致しない場合はストアドプロシージャの呼び出しは失敗します。よって実際の利用にあたっては投入前にLINQからパーティションキー単位でグループ化を行い、またコレクションに主キー(Primary Keys)設定を行い、ストアドプロシージャ側に主キー重複時の例外処理などを実装する必要があると思います。

なおストアドプロシージャを経由する追加処理は非常に遅く、あくまでもトランザクションを必要とした少量データの更新などにしか使えません。大量データの取込にはBulkExecuterライブラリを利用して一気に追加/更新をやるのが本来望まれる処理のようです(ただしV1限定)。

コメントを残す

メールアドレスが公開されることはありません。