// Untitled diff (exported from Diffchecker; the created diff never expires).
// Left side: 402 lines, 12 removals. Right side: 393 lines, 3 additions.
// (Diffchecker Pro upgrade prompts and empty line/word statistics tables omitted.)
// Copyright (c) Microsoft Corporation. All rights reserved.
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
// Licensed under the MIT License.


using System;
using System;
using System.Collections.Generic;
using System.Collections.Generic;
using System.Linq;
using System.Linq;
using System.Threading;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.Documents.Linq;
using Microsoft.Azure.Documents.Linq;
using Newtonsoft.Json;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Newtonsoft.Json.Linq;
using Newtonsoft.Json.Serialization;
using Newtonsoft.Json.Serialization;


namespace Microsoft.Bot.Builder.Azure
{
    /// <summary>
    /// Implements a CosmosDB based storage provider for a bot.
    /// </summary>
    public class CosmosDbStorage : IStorage
    {
        // When setting up the database, calls are made to CosmosDB. If multiple calls are made, we'll end up setting the
        // _collectionLink member variable more than once. The semaphore is for making sure the initialization of the
        // database is done only once. Note that the semaphore is static, so initialization is serialized across
        // all CosmosDbStorage instances in the process, not just per instance.
        private static readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1);

        // NOTE(review): TypeNameHandling.All embeds and honors .NET type names in the stored JSON. This is only
        // safe as long as the collection contents are trusted - confirm that assumption for your deployment.
        private readonly JsonSerializer _jsonSerializer = JsonSerializer.Create(new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All });

        private readonly string _databaseId;
        private readonly string _partitionKey;
        private readonly string _collectionId;
        private readonly RequestOptions _documentCollectionCreationRequestOptions = null;
        private readonly RequestOptions _databaseCreationRequestOptions = null;
        private readonly IDocumentClient _client;

        // Populated lazily by InitializeAsync; null means the database/collection have not been ensured yet.
        private string _collectionLink = null;

        /// <summary>
        /// Initializes a new instance of the <see cref="CosmosDbStorage"/> class.
        /// using the provided CosmosDB credentials, database ID, and collection ID.
        /// </summary>
        /// <param name="cosmosDbStorageOptions">Cosmos DB storage configuration options.</param>
        /// <exception cref="ArgumentNullException"><paramref name="cosmosDbStorageOptions"/> or its
        /// CosmosDBEndpoint is null.</exception>
        /// <exception cref="ArgumentException">The AuthKey, DatabaseId or CollectionId option is null or empty.</exception>
        public CosmosDbStorage(CosmosDbStorageOptions cosmosDbStorageOptions)
        {
            if (cosmosDbStorageOptions == null)
            {
                throw new ArgumentNullException(nameof(cosmosDbStorageOptions));
            }

            if (cosmosDbStorageOptions.CosmosDBEndpoint == null)
            {
                throw new ArgumentNullException(nameof(cosmosDbStorageOptions.CosmosDBEndpoint), "Service EndPoint for CosmosDB is required.");
            }

            if (string.IsNullOrEmpty(cosmosDbStorageOptions.AuthKey))
            {
                throw new ArgumentException("AuthKey for CosmosDB is required.", nameof(cosmosDbStorageOptions.AuthKey));
            }

            if (string.IsNullOrEmpty(cosmosDbStorageOptions.DatabaseId))
            {
                throw new ArgumentException("DatabaseId is required.", nameof(cosmosDbStorageOptions.DatabaseId));
            }

            if (string.IsNullOrEmpty(cosmosDbStorageOptions.CollectionId))
            {
                throw new ArgumentException("CollectionId is required.", nameof(cosmosDbStorageOptions.CollectionId));
            }

            _databaseId = cosmosDbStorageOptions.DatabaseId;
            _collectionId = cosmosDbStorageOptions.CollectionId;
            _partitionKey = cosmosDbStorageOptions.PartitionKey;
            _documentCollectionCreationRequestOptions = cosmosDbStorageOptions.DocumentCollectionRequestOptions;
            _databaseCreationRequestOptions = cosmosDbStorageOptions.DatabaseCreationRequestOptions;

            // Inject BotBuilder version to CosmosDB Requests
            var version = GetType().Assembly.GetName().Version;
            var connectionPolicy = new ConnectionPolicy { UserAgentSuffix = $"Microsoft-BotFramework {version}" };

            // Invoke the ConnectionPolicy delegate (if any) to further customize settings
            cosmosDbStorageOptions.ConnectionPolicyConfigurator?.Invoke(connectionPolicy);
            _client = new DocumentClient(cosmosDbStorageOptions.CosmosDBEndpoint, cosmosDbStorageOptions.AuthKey, connectionPolicy);
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="CosmosDbStorage"/> class.
        /// using the provided CosmosDB credentials, database ID, and collection ID.
        /// </summary>
        /// <param name="cosmosDbStorageOptions">Cosmos DB storage configuration options.</param>
        /// <param name="jsonSerializer">If passing in a custom JsonSerializer, we recommend the following settings:
        /// <para>jsonSerializer.TypeNameHandling = TypeNameHandling.All.</para>
        /// <para>jsonSerializer.NullValueHandling = NullValueHandling.Include.</para>
        /// <para>jsonSerializer.ContractResolver = new DefaultContractResolver().</para>
        /// </param>
        /// <exception cref="ArgumentNullException"><paramref name="jsonSerializer"/> is null, or the options
        /// fail the null checks of <see cref="CosmosDbStorage(CosmosDbStorageOptions)"/>.</exception>
        public CosmosDbStorage(CosmosDbStorageOptions cosmosDbStorageOptions, JsonSerializer jsonSerializer)
            : this(cosmosDbStorageOptions)
        {
            if (jsonSerializer == null)
            {
                throw new ArgumentNullException(nameof(jsonSerializer));
            }

            _jsonSerializer = jsonSerializer;
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="CosmosDbStorage"/> class.
        /// This constructor should only be used if the default behavior of the DocumentClient needs to be changed.
        /// The <see cref="CosmosDbStorage(CosmosDbStorageOptions)"/> constructor is preferred for most cases.
        /// </summary>
        /// <param name="documentClient">The custom implementation of IDocumentClient.</param>
        /// <param name="cosmosDbCustomClientOptions">Custom client configuration options.</param>
        /// <exception cref="ArgumentNullException"><paramref name="cosmosDbCustomClientOptions"/> or
        /// <paramref name="documentClient"/> is null.</exception>
        /// <exception cref="ArgumentException">The DatabaseId or CollectionId option is null or empty.</exception>
        public CosmosDbStorage(IDocumentClient documentClient, CosmosDbCustomClientOptions cosmosDbCustomClientOptions)
        {
            if (cosmosDbCustomClientOptions == null)
            {
                throw new ArgumentNullException(nameof(cosmosDbCustomClientOptions));
            }

            if (string.IsNullOrEmpty(cosmosDbCustomClientOptions.DatabaseId))
            {
                throw new ArgumentException("DatabaseId is required.", nameof(cosmosDbCustomClientOptions.DatabaseId));
            }

            if (string.IsNullOrEmpty(cosmosDbCustomClientOptions.CollectionId))
            {
                throw new ArgumentException("CollectionId is required.", nameof(cosmosDbCustomClientOptions.CollectionId));
            }

            _client = documentClient ?? throw new ArgumentNullException(nameof(documentClient), "An implementation of IDocumentClient for CosmosDB is required.");
            _databaseId = cosmosDbCustomClientOptions.DatabaseId;
            _collectionId = cosmosDbCustomClientOptions.CollectionId;
            _documentCollectionCreationRequestOptions = cosmosDbCustomClientOptions.DocumentCollectionRequestOptions;
            _databaseCreationRequestOptions = cosmosDbCustomClientOptions.DatabaseCreationRequestOptions;

            // Inject BotBuilder version to CosmosDB Requests
            var version = GetType().Assembly.GetName().Version;
            _client.ConnectionPolicy.UserAgentSuffix = $"Microsoft-BotFramework {version}";
        }

        /// <summary>
        /// Escapes a storage key by delegating to <see cref="CosmosDbKeyEscape.EscapeKey(string)"/>.
        /// </summary>
        /// <param name="key">The key to escape.</param>
        /// <returns>The escaped key.</returns>
        [Obsolete("Replaced by CosmosDBKeyEscape.EscapeKey.")]
        public static string SanitizeKey(string key) => CosmosDbKeyEscape.EscapeKey(key);

        /// <summary>
        /// Deletes storage items from storage.
        /// </summary>
        /// <param name="keys">keys of the <see cref="IStoreItem"/> objects to remove from the store.</param>
        /// <param name="cancellationToken">A cancellation token that can be used by other objects
        /// or threads to receive notice of cancellation.</param>
        /// <returns>A task that represents the work queued to execute.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="keys"/> is null.</exception>
        /// <seealso cref="ReadAsync(string[], CancellationToken)"/>
        /// <seealso cref="WriteAsync(IDictionary{string, object}, CancellationToken)"/>
        public async Task DeleteAsync(string[] keys, CancellationToken cancellationToken)
        {
            RequestOptions options = null;

            if (keys == null)
            {
                throw new ArgumentNullException(nameof(keys));
            }

            if (keys.Length == 0)
            {
                return;
            }

            // Ensure Initialization has been run
            await InitializeAsync().ConfigureAwait(false);

            // Only scope the request to a partition when one was configured; otherwise options stays null.
            if (!string.IsNullOrEmpty(this._partitionKey))
            {
                options = new RequestOptions() { PartitionKey = new PartitionKey(this._partitionKey) };
            }

            // Parallelize deletion
            var tasks = keys.Select(key =>
                _client.DeleteDocumentAsync(
                    UriFactory.CreateDocumentUri(
                        _databaseId,
                        _collectionId,
                        CosmosDbKeyEscape.EscapeKey(key)),
                    options,
                    cancellationToken: cancellationToken));

            // Wait for all deletion tasks to complete
            await Task.WhenAll(tasks).ConfigureAwait(false);
        }

        /// <summary>
        /// Reads storage items from storage.
        /// </summary>
        /// <param name="keys">keys of the <see cref="IStoreItem"/> objects to read from the store.</param>
        /// <param name="cancellationToken">A cancellation token that can be used by other objects
        /// or threads to receive notice of cancellation.</param>
        /// <returns>A task that represents the work queued to execute.</returns>
        /// <remarks>The task result contains the items read, indexed by their original (unescaped) key.</remarks>
        /// <exception cref="ArgumentNullException"><paramref name="keys"/> is null.</exception>
        /// <seealso cref="DeleteAsync(string[], CancellationToken)"/>
        /// <seealso cref="WriteAsync(IDictionary{string, object}, CancellationToken)"/>
        public async Task<IDictionary<string, object>> ReadAsync(string[] keys, CancellationToken cancellationToken)
        {
            FeedOptions options = null;

            if (keys == null)
            {
                throw new ArgumentNullException(nameof(keys));
            }

            if (keys.Length == 0)
            {
                // No keys passed in, no result to return.
                return new Dictionary<string, object>();
            }

            // Ensure Initialization has been run
            await InitializeAsync().ConfigureAwait(false);

            // Only scope the query to a partition when one was configured. The query below spans all requested
            // keys, so it must not be pinned to a partition derived from any single key.
            if (!string.IsNullOrEmpty(this._partitionKey))
            {
                options = new FeedOptions() { PartitionKey = new PartitionKey(this._partitionKey) };
            }

            var storeItems = new Dictionary<string, object>(keys.Length);

            // Build a parameterized "IN (@id0,@id1,...)" clause; keys are passed as parameters, never concatenated.
            var parameterSequence = string.Join(",", Enumerable.Range(0, keys.Length).Select(i => $"@id{i}"));
            var parameterValues = keys.Select((key, ix) => new SqlParameter($"@id{ix}", CosmosDbKeyEscape.EscapeKey(key)));
            var querySpec = new SqlQuerySpec
            {
                QueryText = $"SELECT c.id, c.realId, c.document, c._etag FROM c WHERE c.id in ({parameterSequence})",
                Parameters = new SqlParameterCollection(parameterValues),
            };

            var query = _client.CreateDocumentQuery<DocumentStoreItem>(_collectionLink, querySpec, options).AsDocumentQuery();
            while (query.HasMoreResults)
            {
                foreach (var doc in await query.ExecuteNextAsync<DocumentStoreItem>(cancellationToken).ConfigureAwait(false))
                {
                    var item = doc.Document.ToObject(typeof(object), _jsonSerializer);
                    if (item is IStoreItem storeItem)
                    {
                        // Surface the document's _etag so a later WriteAsync can do an optimistic-concurrency replace.
                        storeItem.ETag = doc.ETag;
                    }

                    // doc.Id cannot be used since it is escaped, read it from RealId property instead
                    storeItems.Add(doc.ReadlId, item);
                }
            }

            return storeItems;
        }

        /// <summary>
        /// Writes storage items to storage.
        /// </summary>
        /// <param name="changes">The items to write to storage, indexed by key.</param>
        /// <param name="cancellationToken">A cancellation token that can be used by other objects
        /// or threads to receive notice of cancellation.</param>
        /// <returns>A task that represents the work queued to execute.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="changes"/> is null.</exception>
        /// <exception cref="Exception">An item implements <see cref="IStoreItem"/> and has an empty (zero-length) ETag.</exception>
        /// <seealso cref="DeleteAsync(string[], CancellationToken)"/>
        /// <seealso cref="ReadAsync(string[], CancellationToken)"/>
        public async Task WriteAsync(IDictionary<string, object> changes, CancellationToken cancellationToken)
        {
            if (changes == null)
            {
                throw new ArgumentNullException(nameof(changes));
            }

            if (changes.Count == 0)
            {
                return;
            }

            // Ensure Initialization has been run
            await InitializeAsync().ConfigureAwait(false);

            // Items are written one at a time, in enumeration order; a failure leaves earlier items written.
            foreach (var change in changes)
            {
                var json = JObject.FromObject(change.Value, _jsonSerializer);

                // Remove etag from JSON object that was copied from IStoreItem.
                // The ETag information is updated as an _etag attribute in the document metadata.
                json.Remove("eTag");

                var documentChange = new DocumentStoreItem
                {
                    Id = CosmosDbKeyEscape.EscapeKey(change.Key),
                    ReadlId = change.Key,
                    Document = json,
                };

                var etag = (change.Value as IStoreItem)?.ETag;
                if (etag == null || etag == "*")
                {
                    // if new item or * then insert or replace unconditionally
                    await _client.UpsertDocumentAsync(
                        _collectionLink,
                        documentChange,
                        disableAutomaticIdGeneration: true,
                        cancellationToken: cancellationToken).ConfigureAwait(false);
                }
                else if (etag.Length > 0)
                {
                    // if we have an etag, do opt. concurrency replace
                    var uri = UriFactory.CreateDocumentUri(_databaseId, _collectionId, documentChange.Id);
                    var ac = new AccessCondition { Condition = etag, Type = AccessConditionType.IfMatch };
                    await _client.ReplaceDocumentAsync(
                        uri,
                        documentChange,
                        new RequestOptions { AccessCondition = ac },
                        cancellationToken: cancellationToken).ConfigureAwait(false);
                }
                else
                {
                    // Exception type kept as-is so existing callers/tests that expect it keep working.
                    throw new Exception("etag empty");
                }
            }
        }

        /// <summary>
        /// Creates the CosmosDB Database and Collection if they do not exist, and populates the
        /// _collectionLink member variable.
        /// </summary>
        /// <remarks>
        /// Initialization work runs at most once per instance: once _collectionLink is set, later calls return
        /// immediately. Concurrent callers are serialized by the shared semaphore.
        /// </remarks>
        private async Task InitializeAsync()
        {
            // In the steady-state case, we'll already have the collection link.
            // If so, no need to enter locks or call CosmosDB to get one.
            if (_collectionLink == null)
            {
                // We don't (probably) have a collection link yet. Enter the lock,
                // then check again (aka: Double-Check Lock pattern).
                await _semaphore.WaitAsync().ConfigureAwait(false);
                try
                {
                    if (_collectionLink == null)
                    {
                        // We don't have a collection link. Create one. Note that we're inside a semaphore at this point
                        // so other threads may be blocked on us.
                        await _client.CreateDatabaseIfNotExistsAsync(
                            new Database { Id = _databaseId },
                            _databaseCreationRequestOptions) // pass in any user set database creation flags
                            .ConfigureAwait(false);

                        var documentCollection = new DocumentCollection
                        {
                            Id = _collectionId,
                        };

                        var response = await _client.CreateDocumentCollectionIfNotExistsAsync(
                            UriFactory.CreateDatabaseUri(_databaseId),
                            documentCollection,
                            _documentCollectionCreationRequestOptions) // pass in any user set collection creation flags
                            .ConfigureAwait(false);

                        _collectionLink = response.Resource.SelfLink;
                    }
                }
                finally
                {
                    // Always release, even if database/collection creation throws, so later callers can retry.
                    _semaphore.Release();
                }
            }
        }

        /// <summary>
        /// Internal data structure for storing items in a CosmosDB Collection.
        /// </summary>
        private class DocumentStoreItem
        {
            /// <summary>
            /// Gets or sets the sanitized Id/Key used as PrimaryKey.
            /// </summary>
            [JsonProperty("id")]
            public string Id { get; set; }

            /// <summary>
            /// Gets or sets the un-sanitized Id/Key.
            /// </summary>
            /// <remarks>
            /// Note: There is a Typo in the property name ("ReadlId"), that can't be changed due to compatibility concerns. The
            /// Json is correct due to the JsonProperty field, but the Typo needs to stay.
            /// </remarks>
            [JsonProperty("realId")]
            public string ReadlId { get; internal set; }

            // DO NOT FIX THE TYPO ABOVE (See Remarks on ReadlId).

            /// <summary>
            /// Gets or sets the persisted object.
            /// </summary>
            [JsonProperty("document")]
            public JObject Document { get; set; }

            /// <summary>
            /// Gets or sets the ETag information for handling optimistic concurrency updates.
            /// </summary>
            [JsonProperty("_etag")]
            public string ETag { get; set; }
        }
    }
}