Azure Cosmos DBをSQL(document)で作成すれば普通のSQL文でCRUDできるのではないかと思いましたが、どうやらそうではなく使えるのはSELECT、しかもGROUP BYもできない、トランザクションスコープも適用されないという非常に癖のある代物でした。オールドタイプにはなかなか馴染めないですね。
特にトランザクションスコープは面倒で、ストアドプロシージャを経由すればパーティション単位でトランザクションが保証されるというものです。ちなみにストアドプロシージャの実装方法ははAzureポータル上にてJavaScriptにより定義する必要があります。
仕方がないと諦め、ベタ書きでJavaScriptを書いてストアドプロシージャを作成します。テストデータでは”uniqid”でパーティション化されたコレクションを想定しています。
// SAMPLE STORED PROCEDURE
// ID「InsertDocuments」としてポータル上から登録します
//
// 引数 documents : JSON配列形式の文字列
// 戻値 作成したドキュメントの配列
function InsertDocuments(documents) {
const collection = getContext().getCollection();
const collLink = collection.getSelfLink();
console.log("documents=" + documents +"\n");
let jsonList;
try {
jsonList = JSON.parse(documents);
} catch(e) {
console.log(e);
return null;
}
console.log("length=" + jsonList.length +"\n");
let createdList = [];
for(let i=0 ; i<jsonList.length ; i++){
let doc = jsonList[i];
// Query documents and tolist processed documents.
let isAccepted =
collection.createDocument(
collLink,
doc,
function (err, document) {
if (err) {
console.log(err);
} else {
console.log("created " + document.id + "\n");
createdList.push(document);
if(i==jsonList.length-1){
getContext().getResponse().setBody(JSON.stringify(createdList));
}
}
});
if (!isAccepted) throw new Error('The query was not accepted by the server.');
}
return;
}
次に利用側のコードを書きます。今回、Functions V2(.NET Core)からC#を利用してCosmos DBにアクセスします。
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Newtonsoft.Json.Converters;
using System.Collections.Generic;
namespace FunctionsTest
{
public static class FuncInsertDocument
{
private static readonly string EndpointUrl = "<your endpoint URL>";
private static readonly string PrimaryKey = "<your primary key>"; //[Read-only Keys]で問題ない
private static readonly string DataBaseId = "<your DataBase ID>";
private static readonly string CollectionId = "<your Collection ID>";
[FunctionName("FuncInsertDocument")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequest req,
ILogger log)
{
log.LogInformation("C# HTTP trigger function processed a request.");
DocumentClient client = new DocumentClient(new Uri(EndpointUrl), PrimaryKey);
// 書込データ作成
var partitionKeyValue = "U001";
var dataList = new List<TestData>();
dataList.Add(new TestData{
UniqId = partitionKeyValue,
SessionId = "SES001",
Time = new DateTimeOffset(new DateTime(2018,1,15,0,0,0,DateTimeKind.Local))
});
// ストアドプロシージャ"InsertDocuments"を実行する
Uri spUri = UriFactory.CreateStoredProcedureUri(
DataBaseId,
CollectionId,
"InsertDocuments");
string spParam = JsonConvert.SerializeObject(dataList);
RequestOptions options = new RequestOptions() {
EnableScriptLogging = true,
PartitionKey = new PartitionKey(partitionKeyValue)
};
var result = await client.ExecuteStoredProcedureAsync<string>(
spUri,
options,
spParam);
// クエリ成功時にはスクリプトログを出力
ActionResult ret = null;
if (result.Response != null) {
log.LogInformation(result.ScriptLog);
ret = new OkObjectResult($"ret=" + result.ScriptLog);
} else {
ret = new BadRequestObjectResult("Please pass a name on the query string or in the request body");
}
return ret;
}
}
public class TestData {
[JsonProperty("uniqid")]
public string UniqId {get;set;}
[JsonProperty("sessionid")]
public string SessionId { get; set; }
[JsonProperty("time")]
[JsonConverter(typeof(MyCustomDateTimeConverter))]
public DateTimeOffset Time { get; set; }
}
public class MyCustomDateTimeConverter : IsoDateTimeConverter {
public MyCustomDateTimeConverter() {
base.DateTimeFormat = "yyyy-MM-dd'Z'HH:mm:ss'Z'";
}
}
}
指定したパーティションキー値と投入データの値が一致しない場合はストアドプロシージャの呼び出しは失敗します。よって実際の利用にあたっては投入前にLINQからパーティションキー単位でグループ化を行い、またコレクションに主キー(Primary Keys)設定を行い、ストアドプロシージャ側に主キー重複時の例外処理などを実装する必要があると思います。
なおストアドプロシージャを経由する追加処理は非常に遅く、あくまでもトランザクションを必要とした少量データの更新などにしか使えません。大量データの取込にはBulkExecuterライブラリを利用して一気に追加/更新をやるのが本来望まれる処理のようです(ただしV1限定)。