Azure Cosmos DBをSQL(document)で作成すれば普通のSQL文でCRUDできるのではないかと思いましたが、どうやらそうではなく使えるのはSELECT、しかもGROUP BYもできない、トランザクションスコープも適用されないという非常に癖のある代物でした。オールドタイプにはなかなか馴染めないですね。
特にトランザクションスコープは面倒で、ストアドプロシージャを経由すればパーティション単位でトランザクションが保証されるというものです。ちなみにストアドプロシージャの実装方法ははAzureポータル上にてJavaScriptにより定義する必要があります。
仕方がないと諦め、ベタ書きでJavaScriptを書いてストアドプロシージャを作成します。テストデータでは”uniqid”でパーティション化されたコレクションを想定しています。
// SAMPLE STORED PROCEDURE // ID「InsertDocuments」としてポータル上から登録します // // 引数 documents : JSON配列形式の文字列 // 戻値 作成したドキュメントの配列 function InsertDocuments(documents) { const collection = getContext().getCollection(); const collLink = collection.getSelfLink(); console.log("documents=" + documents +"\n"); let jsonList; try { jsonList = JSON.parse(documents); } catch(e) { console.log(e); return null; } console.log("length=" + jsonList.length +"\n"); let createdList = []; for(let i=0 ; i<jsonList.length ; i++){ let doc = jsonList[i]; // Query documents and tolist processed documents. let isAccepted = collection.createDocument( collLink, doc, function (err, document) { if (err) { console.log(err); } else { console.log("created " + document.id + "\n"); createdList.push(document); if(i==jsonList.length-1){ getContext().getResponse().setBody(JSON.stringify(createdList)); } } }); if (!isAccepted) throw new Error('The query was not accepted by the server.'); } return; }
次に利用側のコードを書きます。今回、Functions V2(.NET Core)からC#を利用してCosmos DBにアクセスします。
using System; using System.IO; using System.Threading.Tasks; using Microsoft.AspNetCore.Mvc; using Microsoft.Azure.WebJobs; using Microsoft.Azure.WebJobs.Extensions.Http; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; using Newtonsoft.Json; using Microsoft.Azure.Documents; using Microsoft.Azure.Documents.Client; using Newtonsoft.Json.Converters; using System.Collections.Generic; namespace FunctionsTest { public static class FuncInsertDocument { private static readonly string EndpointUrl = "<your endpoint URL>"; private static readonly string PrimaryKey = "<your primary key>"; //[Read-only Keys]で問題ない private static readonly string DataBaseId = "<your DataBase ID>"; private static readonly string CollectionId = "<your Collection ID>"; [FunctionName("FuncInsertDocument")] public static async Task<IActionResult> Run( [HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequest req, ILogger log) { log.LogInformation("C# HTTP trigger function processed a request."); DocumentClient client = new DocumentClient(new Uri(EndpointUrl), PrimaryKey); // 書込データ作成 var partitionKeyValue = "U001"; var dataList = new List<TestData>(); dataList.Add(new TestData{ UniqId = partitionKeyValue, SessionId = "SES001", Time = new DateTimeOffset(new DateTime(2018,1,15,0,0,0,DateTimeKind.Local)) }); // ストアドプロシージャ"InsertDocuments"を実行する Uri spUri = UriFactory.CreateStoredProcedureUri( DataBaseId, CollectionId, "InsertDocuments"); string spParam = JsonConvert.SerializeObject(dataList); RequestOptions options = new RequestOptions() { EnableScriptLogging = true, PartitionKey = new PartitionKey(partitionKeyValue) }; var result = await client.ExecuteStoredProcedureAsync<string>( spUri, options, spParam); // クエリ成功時にはスクリプトログを出力 ActionResult ret = null; if (result.Response != null) { log.LogInformation(result.ScriptLog); ret = new OkObjectResult($"ret=" + result.ScriptLog); } else { ret = new BadRequestObjectResult("Please pass a name on the query string or in the request body"); } return ret; } } public class TestData { [JsonProperty("uniqid")] public string UniqId {get;set;} [JsonProperty("sessionid")] public string SessionId { get; set; } [JsonProperty("time")] [JsonConverter(typeof(MyCustomDateTimeConverter))] public DateTimeOffset Time { get; set; } } public class MyCustomDateTimeConverter : IsoDateTimeConverter { public MyCustomDateTimeConverter() { base.DateTimeFormat = "yyyy-MM-dd'Z'HH:mm:ss'Z'"; } } }
指定したパーティションキー値と投入データの値が一致しない場合はストアドプロシージャの呼び出しは失敗します。よって実際の利用にあたっては投入前にLINQからパーティションキー単位でグループ化を行い、またコレクションに主キー(Primary Keys)設定を行い、ストアドプロシージャ側に主キー重複時の例外処理などを実装する必要があると思います。
なおストアドプロシージャを経由する追加処理は非常に遅く、あくまでもトランザクションを必要とした少量データの更新などにしか使えません。大量データの取込にはBulkExecuterライブラリを利用して一気に追加/更新をやるのが本来望まれる処理のようです(ただしV1限定)。