public async Task<IDictionary<string, object>> ReadAsync(string[] keys, CancellationToken cancellationToken)
public async Task<IDictionary<string, object>> ReadAsync(string[] keys, CancellationToken cancellationToken)
{
{
FeedOptions options = null;
FeedOptions options = null;
if (keys == null)
if (keys == null)
{
{
throw new ArgumentNullException(nameof(keys));
throw new ArgumentNullException(nameof(keys));
}
}
if (keys.Length == 0)
if (keys.Length == 0)
{
{
// No keys passed in, no result to return.
// No keys passed in, no result to return.
return new Dictionary<string, object>();
return new Dictionary<string, object>();
}
}
// Ensure Initialization has been run
// Ensure Initialization has been run
await InitializeAsync().ConfigureAwait(false);
await InitializeAsync().ConfigureAwait(false);
if (!string.IsNullOrEmpty(this._partitionKey))
{
options = new FeedOptions() { PartitionKey = new PartitionKey(this._partitionKey) };
}
var storeItems = new Dictionary<string, object>(keys.Length);
var storeItems = new Dictionary<string, object>(keys.Length);
var parameterSequence = string.Join(",", Enumerable.Range(0, keys.Length).Select(i => $"@id{i}"));
var parameterSequence = string.Join(",", Enumerable.Range(0, keys.Length).Select(i => $"@id{i}"));
var parameterValues = keys.Select((key, ix) => new SqlParameter($"@id{ix}", CosmosDbKeyEscape.EscapeKey(key)));
var parameterValues = keys.Select((key, ix) => new SqlParameter($"@id{ix}", CosmosDbKeyEscape.EscapeKey(key)));
var querySpec = new SqlQuerySpec
var querySpec = new SqlQuerySpec
{
{
QueryText = $"SELECT c.id, c.realId, c.document, c._etag FROM c WHERE c.id in ({parameterSequence})",
QueryText = $"SELECT c.id, c.realId, c.document, c._etag FROM c WHERE c.id in ({parameterSequence})",
Parameters = new SqlParameterCollection(parameterValues),
Parameters = new SqlParameterCollection(parameterValues),
};
};
var query = _client.CreateDocumentQuery<DocumentStoreItem>(_collectionLink, querySpec, options).AsDocumentQuery();
var query = _client.CreateDocumentQuery<DocumentStoreItem>(_collectionLink, querySpec, new FeedOptions() { PartitionKey = new PartitionKey(keys[0]) }).AsDocumentQuery();
while (query.HasMoreResults)
while (query.HasMoreResults)
{
{
foreach (var doc in await query.ExecuteNextAsync<DocumentStoreItem>(cancellationToken).ConfigureAwait(false))
foreach (var doc in await query.ExecuteNextAsync<DocumentStoreItem>(cancellationToken).ConfigureAwait(false))
{
{
var item = doc.Document.ToObject(typeof(object), _jsonSerializer);
var item = doc.Document.ToObject(typeof(object), _jsonSerializer);
if (item is IStoreItem storeItem)
if (item is IStoreItem storeItem)
{
{
storeItem.ETag = doc.ETag;
storeItem.ETag = doc.ETag;
}
}
// doc.Id cannot be used since it is escaped, read it from RealId property instead
// doc.Id cannot be used since it is escaped, read it from RealId property instead
storeItems.Add(doc.ReadlId, item);
storeItems.Add(doc.ReadlId, item);
}
}
}
}
return storeItems;
return storeItems;
}
}
/// <summary>
/// <summary>
/// Writes storage items to storage.
/// Writes storage items to storage.
/// </summary>
/// </summary>
/// <param name="changes">The items to write to storage, indexed by key.</param>
/// <param name="changes">The items to write to storage, indexed by key.</param>
/// <param name="cancellationToken">A cancellation token that can be used by other objects
/// <param name="cancellationToken">A cancellation token that can be used by other objects
/// or threads to receive notice of cancellation.</param>
/// or threads to receive notice of cancellation.</param>
/// <returns>A task that represents the work queued to execute.</returns>
/// <returns>A task that represents the work queued to execute.</returns>