Untitled diff

Created Diff never expires
12 हटाए गए
लाइनें
कुल
हटाया गया
शब्द
कुल
हटाया गया
इस सुविधा का उपयोग जारी रखने के लिए, अपग्रेड करें
Diffchecker logo
Diffchecker Pro
402 लाइनें
3 जोड़े गए
लाइनें
कुल
जोड़ा गया
शब्द
कुल
जोड़ा गया
इस सुविधा का उपयोग जारी रखने के लिए, अपग्रेड करें
Diffchecker logo
Diffchecker Pro
393 लाइनें
// Copyright (c) Microsoft Corporation. All rights reserved.
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
// Licensed under the MIT License.


using System;
using System;
using System.Collections.Generic;
using System.Collections.Generic;
using System.Linq;
using System.Linq;
using System.Threading;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.Documents.Linq;
using Microsoft.Azure.Documents.Linq;
using Newtonsoft.Json;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Newtonsoft.Json.Linq;
using Newtonsoft.Json.Serialization;
using Newtonsoft.Json.Serialization;


namespace Microsoft.Bot.Builder.Azure
namespace Microsoft.Bot.Builder.Azure
{
{
/// <summary>
/// <summary>
/// Implements an CosmosDB based storage provider for a bot.
/// Implements an CosmosDB based storage provider for a bot.
/// </summary>
/// </summary>
public class CosmosDbStorage : IStorage
public class CosmosDbStorage : IStorage
{
{
// When setting up the database, calls are made to CosmosDB. If multiple calls are made, we'll end up setting the
// When setting up the database, calls are made to CosmosDB. If multiple calls are made, we'll end up setting the
// collectionLink member variable more than once. The semaphore is for making sure the initialization of the
// collectionLink member variable more than once. The semaphore is for making sure the initialization of the
// database is done only once.
// database is done only once.
private static SemaphoreSlim _semaphore = new SemaphoreSlim(1);
private static SemaphoreSlim _semaphore = new SemaphoreSlim(1);


private readonly JsonSerializer _jsonSerializer = JsonSerializer.Create(new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All });
private readonly JsonSerializer _jsonSerializer = JsonSerializer.Create(new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All });


private readonly string _databaseId;
private readonly string _databaseId;
private readonly string _partitionKey;
private readonly string _partitionKey;
private readonly string _collectionId;
private readonly string _collectionId;
private readonly RequestOptions _documentCollectionCreationRequestOptions = null;
private readonly RequestOptions _documentCollectionCreationRequestOptions = null;
private readonly RequestOptions _databaseCreationRequestOptions = null;
private readonly RequestOptions _databaseCreationRequestOptions = null;
private readonly IDocumentClient _client;
private readonly IDocumentClient _client;
private string _collectionLink = null;
private string _collectionLink = null;


/// <summary>
/// <summary>
/// Initializes a new instance of the <see cref="CosmosDbStorage"/> class.
/// Initializes a new instance of the <see cref="CosmosDbStorage"/> class.
/// using the provided CosmosDB credentials, database ID, and collection ID.
/// using the provided CosmosDB credentials, database ID, and collection ID.
/// </summary>
/// </summary>
/// <param name="cosmosDbStorageOptions">Cosmos DB storage configuration options.</param>
/// <param name="cosmosDbStorageOptions">Cosmos DB storage configuration options.</param>
public CosmosDbStorage(CosmosDbStorageOptions cosmosDbStorageOptions)
public CosmosDbStorage(CosmosDbStorageOptions cosmosDbStorageOptions)
{
{
if (cosmosDbStorageOptions == null)
if (cosmosDbStorageOptions == null)
{
{
throw new ArgumentNullException(nameof(cosmosDbStorageOptions));
throw new ArgumentNullException(nameof(cosmosDbStorageOptions));
}
}


if (cosmosDbStorageOptions.CosmosDBEndpoint == null)
if (cosmosDbStorageOptions.CosmosDBEndpoint == null)
{
{
throw new ArgumentNullException(nameof(cosmosDbStorageOptions.CosmosDBEndpoint), "Service EndPoint for CosmosDB is required.");
throw new ArgumentNullException(nameof(cosmosDbStorageOptions.CosmosDBEndpoint), "Service EndPoint for CosmosDB is required.");
}
}


if (string.IsNullOrEmpty(cosmosDbStorageOptions.AuthKey))
if (string.IsNullOrEmpty(cosmosDbStorageOptions.AuthKey))
{
{
throw new ArgumentException("AuthKey for CosmosDB is required.", nameof(cosmosDbStorageOptions.AuthKey));
throw new ArgumentException("AuthKey for CosmosDB is required.", nameof(cosmosDbStorageOptions.AuthKey));
}
}


if (string.IsNullOrEmpty(cosmosDbStorageOptions.DatabaseId))
if (string.IsNullOrEmpty(cosmosDbStorageOptions.DatabaseId))
{
{
throw new ArgumentException("DatabaseId is required.", nameof(cosmosDbStorageOptions.DatabaseId));
throw new ArgumentException("DatabaseId is required.", nameof(cosmosDbStorageOptions.DatabaseId));
}
}


if (string.IsNullOrEmpty(cosmosDbStorageOptions.CollectionId))
if (string.IsNullOrEmpty(cosmosDbStorageOptions.CollectionId))
{
{
throw new ArgumentException("CollectionId is required.", nameof(cosmosDbStorageOptions.CollectionId));
throw new ArgumentException("CollectionId is required.", nameof(cosmosDbStorageOptions.CollectionId));
}
}


_databaseId = cosmosDbStorageOptions.DatabaseId;
_databaseId = cosmosDbStorageOptions.DatabaseId;
_collectionId = cosmosDbStorageOptions.CollectionId;
_collectionId = cosmosDbStorageOptions.CollectionId;
_partitionKey = cosmosDbStorageOptions.PartitionKey;
_partitionKey = cosmosDbStorageOptions.PartitionKey;
_documentCollectionCreationRequestOptions = cosmosDbStorageOptions.DocumentCollectionRequestOptions;
_documentCollectionCreationRequestOptions = cosmosDbStorageOptions.DocumentCollectionRequestOptions;
_databaseCreationRequestOptions = cosmosDbStorageOptions.DatabaseCreationRequestOptions;
_databaseCreationRequestOptions = cosmosDbStorageOptions.DatabaseCreationRequestOptions;


// Inject BotBuilder version to CosmosDB Requests
// Inject BotBuilder version to CosmosDB Requests
var version = GetType().Assembly.GetName().Version;
var version = GetType().Assembly.GetName().Version;
var connectionPolicy = new ConnectionPolicy { UserAgentSuffix = $"Microsoft-BotFramework {version}" };
var connectionPolicy = new ConnectionPolicy { UserAgentSuffix = $"Microsoft-BotFramework {version}" };


// Invoke CollectionPolicy delegate to further customize settings
// Invoke CollectionPolicy delegate to further customize settings
cosmosDbStorageOptions.ConnectionPolicyConfigurator?.Invoke(connectionPolicy);
cosmosDbStorageOptions.ConnectionPolicyConfigurator?.Invoke(connectionPolicy);
_client = new DocumentClient(cosmosDbStorageOptions.CosmosDBEndpoint, cosmosDbStorageOptions.AuthKey, connectionPolicy);
_client = new DocumentClient(cosmosDbStorageOptions.CosmosDBEndpoint, cosmosDbStorageOptions.AuthKey, connectionPolicy);
}
}


/// <summary>
/// <summary>
/// Initializes a new instance of the <see cref="CosmosDbStorage"/> class.
/// Initializes a new instance of the <see cref="CosmosDbStorage"/> class.
/// using the provided CosmosDB credentials, database ID, and collection ID.
/// using the provided CosmosDB credentials, database ID, and collection ID.
/// </summary>
/// </summary>
/// <param name="cosmosDbStorageOptions">Cosmos DB storage configuration options.</param>
/// <param name="cosmosDbStorageOptions">Cosmos DB storage configuration options.</param>
/// <param name="jsonSerializer">If passing in a custom JsonSerializer, we recommend the following settings:
/// <param name="jsonSerializer">If passing in a custom JsonSerializer, we recommend the following settings:
/// <para>jsonSerializer.TypeNameHandling = TypeNameHandling.All.</para>
/// <para>jsonSerializer.TypeNameHandling = TypeNameHandling.All.</para>
/// <para>jsonSerializer.NullValueHandling = NullValueHandling.Include.</para>
/// <para>jsonSerializer.NullValueHandling = NullValueHandling.Include.</para>
/// <para>jsonSerializer.ContractResolver = new DefaultContractResolver().</para>
/// <para>jsonSerializer.ContractResolver = new DefaultContractResolver().</para>
/// </param>
/// </param>
public CosmosDbStorage(CosmosDbStorageOptions cosmosDbStorageOptions, JsonSerializer jsonSerializer)
public CosmosDbStorage(CosmosDbStorageOptions cosmosDbStorageOptions, JsonSerializer jsonSerializer)
: this(cosmosDbStorageOptions)
: this(cosmosDbStorageOptions)
{
{
if (jsonSerializer == null)
if (jsonSerializer == null)
{
{
throw new ArgumentNullException(nameof(jsonSerializer));
throw new ArgumentNullException(nameof(jsonSerializer));
}
}


_jsonSerializer = jsonSerializer;
_jsonSerializer = jsonSerializer;
}
}


/// <summary>
/// <summary>
/// Initializes a new instance of the <see cref="CosmosDbStorage"/> class.
/// Initializes a new instance of the <see cref="CosmosDbStorage"/> class.
/// This constructor should only be used if the default behavior of the DocumentClient needs to be changed.
/// This constructor should only be used if the default behavior of the DocumentClient needs to be changed.
/// The <see cref="CosmosDbStorage(CosmosDbStorageOptions)"/> constructor is preferer for most cases.
/// The <see cref="CosmosDbStorage(CosmosDbStorageOptions)"/> constructor is preferer for most cases.
/// </summary>
/// </summary>
/// <param name="documentClient">The custom implementation of IDocumentClient.</param>
/// <param name="documentClient">The custom implementation of IDocumentClient.</param>
/// <param name="cosmosDbCustomClientOptions">Custom client configuration options.</param>
/// <param name="cosmosDbCustomClientOptions">Custom client configuration options.</param>
public CosmosDbStorage(IDocumentClient documentClient, CosmosDbCustomClientOptions cosmosDbCustomClientOptions)
public CosmosDbStorage(IDocumentClient documentClient, CosmosDbCustomClientOptions cosmosDbCustomClientOptions)
{
{
if (cosmosDbCustomClientOptions == null)
if (cosmosDbCustomClientOptions == null)
{
{
throw new ArgumentNullException(nameof(cosmosDbCustomClientOptions));
throw new ArgumentNullException(nameof(cosmosDbCustomClientOptions));
}
}


if (string.IsNullOrEmpty(cosmosDbCustomClientOptions.DatabaseId))
if (string.IsNullOrEmpty(cosmosDbCustomClientOptions.DatabaseId))
{
{
throw new ArgumentException("DatabaseId is required.", nameof(cosmosDbCustomClientOptions.DatabaseId));
throw new ArgumentException("DatabaseId is required.", nameof(cosmosDbCustomClientOptions.DatabaseId));
}
}


if (string.IsNullOrEmpty(cosmosDbCustomClientOptions.CollectionId))
if (string.IsNullOrEmpty(cosmosDbCustomClientOptions.CollectionId))
{
{
throw new ArgumentException("CollectionId is required.", nameof(cosmosDbCustomClientOptions.CollectionId));
throw new ArgumentException("CollectionId is required.", nameof(cosmosDbCustomClientOptions.CollectionId));
}
}


_client = documentClient ?? throw new ArgumentNullException(nameof(documentClient), "An implementation of IDocumentClient for CosmosDB is required.");
_client = documentClient ?? throw new ArgumentNullException(nameof(documentClient), "An implementation of IDocumentClient for CosmosDB is required.");
_databaseId = cosmosDbCustomClientOptions.DatabaseId;
_databaseId = cosmosDbCustomClientOptions.DatabaseId;
_collectionId = cosmosDbCustomClientOptions.CollectionId;
_collectionId = cosmosDbCustomClientOptions.CollectionId;
_documentCollectionCreationRequestOptions = cosmosDbCustomClientOptions.DocumentCollectionRequestOptions;
_documentCollectionCreationRequestOptions = cosmosDbCustomClientOptions.DocumentCollectionRequestOptions;
_databaseCreationRequestOptions = cosmosDbCustomClientOptions.DatabaseCreationRequestOptions;
_databaseCreationRequestOptions = cosmosDbCustomClientOptions.DatabaseCreationRequestOptions;


// Inject BotBuilder version to CosmosDB Requests
// Inject BotBuilder version to CosmosDB Requests
var version = GetType().Assembly.GetName().Version;
var version = GetType().Assembly.GetName().Version;
_client.ConnectionPolicy.UserAgentSuffix = $"Microsoft-BotFramework {version}";
_client.ConnectionPolicy.UserAgentSuffix = $"Microsoft-BotFramework {version}";
}
}


[Obsolete("Replaced by CosmosDBKeyEscape.EscapeKey.")]
[Obsolete("Replaced by CosmosDBKeyEscape.EscapeKey.")]
public static string SanitizeKey(string key) => CosmosDbKeyEscape.EscapeKey(key);
public static string SanitizeKey(string key) => CosmosDbKeyEscape.EscapeKey(key);


/// <summary>
/// <summary>
/// Deletes storage items from storage.
/// Deletes storage items from storage.
/// </summary>
/// </summary>
/// <param name="keys">keys of the <see cref="IStoreItem"/> objects to remove from the store.</param>
/// <param name="keys">keys of the <see cref="IStoreItem"/> objects to remove from the store.</param>
/// <param name="cancellationToken">A cancellation token that can be used by other objects
/// <param name="cancellationToken">A cancellation token that can be used by other objects
/// or threads to receive notice of cancellation.</param>
/// or threads to receive notice of cancellation.</param>
/// <returns>A task that represents the work queued to execute.</returns>
/// <returns>A task that represents the work queued to execute.</returns>
/// <seealso cref="ReadAsync(string[], CancellationToken)"/>
/// <seealso cref="ReadAsync(string[], CancellationToken)"/>
/// <seealso cref="WriteAsync(IDictionary{string, object}, CancellationToken)"/>
/// <seealso cref="WriteAsync(IDictionary{string, object}, CancellationToken)"/>
public async Task DeleteAsync(string[] keys, CancellationToken cancellationToken)
public async Task DeleteAsync(string[] keys, CancellationToken cancellationToken)
{
{
RequestOptions options = null;
RequestOptions options = null;


if (keys == null)
if (keys == null)
{
{
throw new ArgumentNullException(nameof(keys));
throw new ArgumentNullException(nameof(keys));
}
}


if (keys.Length == 0)
if (keys.Length == 0)
{
{
return;
return;
}
}


// Ensure Initialization has been run
// Ensure Initialization has been run
await InitializeAsync().ConfigureAwait(false);
await InitializeAsync().ConfigureAwait(false);


if (!string.IsNullOrEmpty(this._partitionKey))
{
options = new RequestOptions() { PartitionKey = new PartitionKey(this._partitionKey) };
}

// Parallelize deletion
// Parallelize deletion
var tasks = keys.Select(key =>
var tasks = keys.Select(key =>
_client.DeleteDocumentAsync(
_client.DeleteDocumentAsync(
UriFactory.CreateDocumentUri(
UriFactory.CreateDocumentUri(
_databaseId,
_databaseId,
_collectionId,
_collectionId,
CosmosDbKeyEscape.EscapeKey(key)),
CosmosDbKeyEscape.EscapeKey(key)),
options,
new RequestOptions() { PartitionKey = new PartitionKey(key) },
cancellationToken: cancellationToken));
cancellationToken: cancellationToken));


// await to deletion tasks to complete
// await to deletion tasks to complete
await Task.WhenAll(tasks).ConfigureAwait(false);
await Task.WhenAll(tasks).ConfigureAwait(false);
}
}


/// <summary>
/// <summary>
/// Reads storage items from storage.
/// Reads storage items from storage.
/// </summary>
/// </summary>
/// <param name="keys">keys of the <see cref="IStoreItem"/> objects to read from the store.</param>
/// <param name="keys">keys of the <see cref="IStoreItem"/> objects to read from the store.</param>
/// <param name="cancellationToken">A cancellation token that can be used by other objects
/// <param name="cancellationToken">A cancellation token that can be used by other objects
/// or threads to receive notice of cancellation.</param>
/// or threads to receive notice of cancellation.</param>
/// <returns>A task that represents the work queued to execute.</returns>
/// <returns>A task that represents the work queued to execute.</returns>
/// <remarks>If the activities are successfully sent, the task result contains
/// <remarks>If the activities are successfully sent, the task result contains
/// the items read, indexed by key.</remarks>
/// the items read, indexed by key.</remarks>
/// <seealso cref="DeleteAsync(string[], CancellationToken)"/>
/// <seealso cref="DeleteAsync(string[], CancellationToken)"/>
/// <seealso cref="WriteAsync(IDictionary{string, object}, CancellationToken)"/>
/// <seealso cref="WriteAsync(IDictionary{string, object}, CancellationToken)"/>
public async Task<IDictionary<string, object>> ReadAsync(string[] keys, CancellationToken cancellationToken)
public async Task<IDictionary<string, object>> ReadAsync(string[] keys, CancellationToken cancellationToken)
{
{
FeedOptions options = null;
FeedOptions options = null;


if (keys == null)
if (keys == null)
{
{
throw new ArgumentNullException(nameof(keys));
throw new ArgumentNullException(nameof(keys));
}
}


if (keys.Length == 0)
if (keys.Length == 0)
{
{
// No keys passed in, no result to return.
// No keys passed in, no result to return.
return new Dictionary<string, object>();
return new Dictionary<string, object>();
}
}


// Ensure Initialization has been run
// Ensure Initialization has been run
await InitializeAsync().ConfigureAwait(false);
await InitializeAsync().ConfigureAwait(false);


if (!string.IsNullOrEmpty(this._partitionKey))
{
options = new FeedOptions() { PartitionKey = new PartitionKey(this._partitionKey) };
}

var storeItems = new Dictionary<string, object>(keys.Length);
var storeItems = new Dictionary<string, object>(keys.Length);


var parameterSequence = string.Join(",", Enumerable.Range(0, keys.Length).Select(i => $"@id{i}"));
var parameterSequence = string.Join(",", Enumerable.Range(0, keys.Length).Select(i => $"@id{i}"));
var parameterValues = keys.Select((key, ix) => new SqlParameter($"@id{ix}", CosmosDbKeyEscape.EscapeKey(key)));
var parameterValues = keys.Select((key, ix) => new SqlParameter($"@id{ix}", CosmosDbKeyEscape.EscapeKey(key)));
var querySpec = new SqlQuerySpec
var querySpec = new SqlQuerySpec
{
{
QueryText = $"SELECT c.id, c.realId, c.document, c._etag FROM c WHERE c.id in ({parameterSequence})",
QueryText = $"SELECT c.id, c.realId, c.document, c._etag FROM c WHERE c.id in ({parameterSequence})",
Parameters = new SqlParameterCollection(parameterValues),
Parameters = new SqlParameterCollection(parameterValues),
};
};


var query = _client.CreateDocumentQuery<DocumentStoreItem>(_collectionLink, querySpec, options).AsDocumentQuery();
var query = _client.CreateDocumentQuery<DocumentStoreItem>(_collectionLink, querySpec, new FeedOptions() { PartitionKey = new PartitionKey(keys[0]) }).AsDocumentQuery();
while (query.HasMoreResults)
while (query.HasMoreResults)
{
{
foreach (var doc in await query.ExecuteNextAsync<DocumentStoreItem>(cancellationToken).ConfigureAwait(false))
foreach (var doc in await query.ExecuteNextAsync<DocumentStoreItem>(cancellationToken).ConfigureAwait(false))
{
{
var item = doc.Document.ToObject(typeof(object), _jsonSerializer);
var item = doc.Document.ToObject(typeof(object), _jsonSerializer);
if (item is IStoreItem storeItem)
if (item is IStoreItem storeItem)
{
{
storeItem.ETag = doc.ETag;
storeItem.ETag = doc.ETag;
}
}


// doc.Id cannot be used since it is escaped, read it from RealId property instead
// doc.Id cannot be used since it is escaped, read it from RealId property instead
storeItems.Add(doc.ReadlId, item);
storeItems.Add(doc.ReadlId, item);
}
}
}
}


return storeItems;
return storeItems;
}
}


/// <summary>
/// <summary>
/// Writes storage items to storage.
/// Writes storage items to storage.
/// </summary>
/// </summary>
/// <param name="changes">The items to write to storage, indexed by key.</param>
/// <param name="changes">The items to write to storage, indexed by key.</param>
/// <param name="cancellationToken">A cancellation token that can be used by other objects
/// <param name="cancellationToken">A cancellation token that can be used by other objects
/// or threads to receive notice of cancellation.</param>
/// or threads to receive notice of cancellation.</param>
/// <returns>A task that represents the work queued to execute.</returns>
/// <returns>A task that represents the work queued to execute.</returns>
/// <seealso cref="DeleteAsync(string[], CancellationToken)"/>
/// <seealso cref="DeleteAsync(string[], CancellationToken)"/>
/// <seealso cref="ReadAsync(string[], CancellationToken)"/>
/// <seealso cref="ReadAsync(string[], CancellationToken)"/>
public async Task WriteAsync(IDictionary<string, object> changes, CancellationToken cancellationToken)
public async Task WriteAsync(IDictionary<string, object> changes, CancellationToken cancellationToken)
{
{
if (changes == null)
if (changes == null)
{
{
throw new ArgumentNullException(nameof(changes));
throw new ArgumentNullException(nameof(changes));
}
}


if (changes.Count == 0)
if (changes.Count == 0)
{
{
return;
return;
}
}


// Ensure Initialization has been run
// Ensure Initialization has been run
await InitializeAsync().ConfigureAwait(false);
await InitializeAsync().ConfigureAwait(false);


foreach (var change in changes)
foreach (var change in changes)
{
{
var json = JObject.FromObject(change.Value, _jsonSerializer);
var json = JObject.FromObject(change.Value, _jsonSerializer);


// Remove etag from JSON object that was copied from IStoreItem.
// Remove etag from JSON object that was copied from IStoreItem.
// The ETag information is updated as an _etag attribute in the document metadata.
// The ETag information is updated as an _etag attribute in the document metadata.
json.Remove("eTag");
json.Remove("eTag");


var documentChange = new DocumentStoreItem
var documentChange = new DocumentStoreItem
{
{
Id = CosmosDbKeyEscape.EscapeKey(change.Key),
Id = CosmosDbKeyEscape.EscapeKey(change.Key),
ReadlId = change.Key,
ReadlId = change.Key,
Document = json,
Document = json,
};
};


var etag = (change.Value as IStoreItem)?.ETag;
var etag = (change.Value as IStoreItem)?.ETag;
if (etag == null || etag == "*")
if (etag == null || etag == "*")
{
{
// if new item or * then insert or replace unconditionaly
// if new item or * then insert or replace unconditionaly
await _client.UpsertDocumentAsync(
await _client.UpsertDocumentAsync(
_collectionLink,
_collectionLink,
documentChange,
documentChange,
disableAutomaticIdGeneration: true,
disableAutomaticIdGeneration: true,
cancellationToken: cancellationToken).ConfigureAwait(false);
cancellationToken: cancellationToken).ConfigureAwait(false);
}
}
else if (etag.Length > 0)
else if (etag.Length > 0)
{
{
// if we have an etag, do opt. concurrency replace
// if we have an etag, do opt. concurrency replace
var uri = UriFactory.CreateDocumentUri(_databaseId, _collectionId, documentChange.Id);
var uri = UriFactory.CreateDocumentUri(_databaseId, _collectionId, documentChange.Id);
var ac = new AccessCondition { Condition = etag, Type = AccessConditionType.IfMatch };
var ac = new AccessCondition { Condition = etag, Type = AccessConditionType.IfMatch };
await _client.ReplaceDocumentAsync(
await _client.ReplaceDocumentAsync(
uri,
uri,
documentChange,
documentChange,
new RequestOptions { AccessCondition = ac },
new RequestOptions { AccessCondition = ac },
cancellationToken: cancellationToken).ConfigureAwait(false);
cancellationToken: cancellationToken).ConfigureAwait(false);
}
}
else
else
{
{
throw new Exception("etag empty");
throw new Exception("etag empty");
}
}
}
}
}
}


/// <summary>
/// <summary>
/// Creates the CosmosDB Database and populates the _collectionLink member variable.
/// Creates the CosmosDB Database and populates the _collectionLink member variable.
/// </summary>
/// </summary>
/// <remarks>
/// <remarks>
/// This method is idempotent, and thread safe.
/// This method is idempotent, and thread safe.
/// </remarks>
/// </remarks>
private async Task InitializeAsync()
private async Task InitializeAsync()
{
{
// In the steady-state case, we'll already have a connection string to the
// In the steady-state case, we'll already have a connection string to the
// database setup. If so, no need to enter locks or call CosmosDB to get one.
// database setup. If so, no need to enter locks or call CosmosDB to get one.
if (_collectionLink == null)
if (_collectionLink == null)
{
{
// We don't (probably) have a database link yet. Enter the lock,
// We don't (probably) have a database link yet. Enter the lock,
// then check again (aka: Double-Check Lock pattern).
// then check again (aka: Double-Check Lock pattern).
await _semaphore.WaitAsync().ConfigureAwait(false);
await _semaphore.WaitAsync().ConfigureAwait(false);
try
try
{
{
if (_collectionLink == null)
if (_collectionLink == null)
{
{
// We don't have a database link. Create one. Note that we're inside a semaphore at this point
// We don't have a database link. Create one. Note that we're inside a semaphore at this point
// so other threads may be blocked on us.
// so other threads may be blocked on us.
await _client.CreateDatabaseIfNotExistsAsync(
await _client.CreateDatabaseIfNotExistsAsync(
new Database { Id = _databaseId },
new Database { Id = _databaseId },
_databaseCreationRequestOptions) // pass in any user set database creation flags
_databaseCreationRequestOptions) // pass in any user set database creation flags
.ConfigureAwait(false);
.ConfigureAwait(false);


var documentCollection = new DocumentCollection
var documentCollection = new DocumentCollection
{
{
Id = _collectionId,
Id = _collectionId,
};
};


var response = await _client.CreateDocumentCollectionIfNotExistsAsync(
var response = await _client.CreateDocumentCollectionIfNotExistsAsync(
UriFactory.CreateDatabaseUri(_databaseId),
UriFactory.CreateDatabaseUri(_databaseId),
documentCollection,
documentCollection,
_documentCollectionCreationRequestOptions) // pass in any user set collection creation flags
_documentCollectionCreationRequestOptions) // pass in any user set collection creation flags
.ConfigureAwait(false);
.ConfigureAwait(false);


_collectionLink = response.Resource.SelfLink;
_collectionLink = response.Resource.SelfLink;
}
}
}
}
finally
finally
{
{
_semaphore.Release();
_semaphore.Release();
}
}
}
}
}
}


/// <summary>
/// <summary>
/// Internal data structure for storing items in a CosmosDB Collection.
/// Internal data structure for storing items in a CosmosDB Collection.
/// </summary>
/// </summary>
private class DocumentStoreItem
private class DocumentStoreItem
{
{
/// <summary>
/// <summary>
/// Gets or sets the sanitized Id/Key used as PrimaryKey.
/// Gets or sets the sanitized Id/Key used as PrimaryKey.
/// </summary>
/// </summary>
[JsonProperty("id")]
[JsonProperty("id")]
public string Id { get; set; }
public string Id { get; set; }


/// <summary>
/// <summary>
/// Gets or sets the un-sanitized Id/Key.
/// Gets or sets the un-sanitized Id/Key.
/// </summary>
/// </summary>
/// <remarks>
/// <remarks>
/// Note: There is a Typo in the property name ("ReadlId"), that can't be changed due to compatability concerns. The
/// Note: There is a Typo in the property name ("ReadlId"), that can't be changed due to compatability concerns. The
/// Json is correct due to the JsonProperty field, but the Typo needs to stay.
/// Json is correct due to the JsonProperty field, but the Typo needs to stay.
/// </remarks>
/// </remarks>
[JsonProperty("realId")]
[JsonProperty("realId")]
public string ReadlId { get; internal set; }
public string ReadlId { get; internal set; }


// DO NOT FIX THE TYPO BELOW (See Remarks above).
// DO NOT FIX THE TYPO BELOW (See Remarks above).


/// <summary>
/// <summary>
/// Gets or sets the persisted object.
/// Gets or sets the persisted object.
/// </summary>
/// </summary>
[JsonProperty("document")]
[JsonProperty("document")]
public JObject Document { get; set; }
public JObject Document { get; set; }


/// <summary>
/// <summary>
/// Gets or sets the ETag information for handling optimistic concurrency updates.
/// Gets or sets the ETag information for handling optimistic concurrency updates.
/// </summary>
/// </summary>
[JsonProperty("_etag")]
[JsonProperty("_etag")]
public string ETag { get; set; }
public string ETag { get; set; }
}
}
}
}
}
}