Skip to content

Commit 076ca2a

Browse files
committed
[EventHubs] Replacing scaling logs to WebJobs extension methods
1 parent 724366b commit 076ca2a

File tree

4 files changed

+61
-58
lines changed

4 files changed

+61
-58
lines changed

NuGet.Config

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
<clear />
55
<!-- Do not add any additional feeds if new packages are needed they need to come from our azure-sdk-for-net DevOps feed which has an upstream set to nuget.org -->
66
<add key="azure-sdk-for-net" value="https://pkgs.dev.azure.com/azure-sdk/public/_packaging/azure-sdk-for-net/nuget/v3/index.json" />
7+
<add key="local" value="Q:\nuget" />
78
</packageSources>
89
<disabledPackageSources>
910
<clear />

eng/Packages.Data.props

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -249,8 +249,8 @@
249249
<PackageReference Update="Microsoft.Azure.SignalR.Management" Version="1.29.0" />
250250
<PackageReference Update="Microsoft.Azure.SignalR.Protocols" Version="1.29.0" />
251251
<PackageReference Update="Microsoft.Azure.SignalR.Serverless.Protocols" Version="1.10.0" />
252-
<PackageReference Update="Microsoft.Azure.WebJobs" Version="3.0.41" />
253-
<PackageReference Update="Microsoft.Azure.WebJobs.Sources" Version="3.0.41" PrivateAssets="All"/>
252+
<PackageReference Update="Microsoft.Azure.WebJobs" Version="3.0.42-dev" />
253+
<PackageReference Update="Microsoft.Azure.WebJobs.Sources" Version="3.0.42-dev" PrivateAssets="All"/>
254254
<PackageReference Update="Microsoft.Azure.WebJobs.Extensions.Rpc" Version="3.0.41" />
255255
<PackageReference Update="Microsoft.Azure.WebJobs.Host.Storage" Version="5.0.1" />
256256
<PackageReference Update="Microsoft.Spatial" Version="7.5.3" />

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubMetricsProvider.cs

Lines changed: 55 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
using Azure.Messaging.EventHubs;
1010
using Azure.Messaging.EventHubs.Primitives;
1111
using Microsoft.Azure.WebJobs.EventHubs.Listeners;
12+
using Microsoft.Azure.WebJobs.Host.Scale;
1213
using Microsoft.Extensions.Logging;
1314

1415
namespace Microsoft.Azure.WebJobs.Extensions.EventHubs.Listeners
@@ -50,7 +51,7 @@ public async Task<EventHubsTriggerMetrics> GetMetricsAsync()
5051
}
5152
catch (Exception e)
5253
{
53-
_logger.LogWarning($"Encountered an exception while checking Event Hub '{_client.EventHubName}'. Error: {e.Message}");
54+
_logger.LogFunctionScaleError($"Encountered an exception while checking Event Hub '{_client.EventHubName}'.", _functionId, e);
5455
return metrics;
5556
}
5657

@@ -69,87 +70,87 @@ public async Task<EventHubsTriggerMetrics> GetMetricsAsync()
6970
try
7071
{
7172
partitionPropertiesTasks = partitions.Select(async partition =>
73+
{
74+
bool acquired = false;
75+
try
7276
{
73-
bool acquired = false;
74-
try
77+
acquired = await semaphore.WaitAsync(PartitionPropertiesWaitTimeoutMs, cts.Token).ConfigureAwait(false);
78+
if (!acquired)
7579
{
76-
acquired = await semaphore.WaitAsync(PartitionPropertiesWaitTimeoutMs, cts.Token).ConfigureAwait(false);
77-
if (!acquired)
78-
{
79-
throw new TimeoutException(
80-
$"Failed to acquire EH client concurrency slot within {PartitionPropertiesWaitTimeoutMs}ms for Event Hub '{_client.EventHubName}', partition '{partition}'.");
81-
}
82-
return await _client.GetPartitionPropertiesAsync(partition).ConfigureAwait(false);
80+
throw new TimeoutException(
81+
$"Failed to acquire EH client concurrency slot within {PartitionPropertiesWaitTimeoutMs}ms for Event Hub '{_client.EventHubName}', partition '{partition}'.");
8382
}
84-
catch (Exception e)
83+
return await _client.GetPartitionPropertiesAsync(partition).ConfigureAwait(false);
84+
}
85+
catch (Exception e)
86+
{
87+
if (!cts.Token.IsCancellationRequested)
8588
{
86-
if (!cts.Token.IsCancellationRequested)
87-
{
88-
_logger.LogDebug($"Requesting cancellation of other partition info tasks. Error while getting partition info for eventhub '{_client.EventHubName}', partition '{partition}': {e.Message}");
89-
cts.Cancel();
90-
}
91-
throw;
89+
_logger.LogDebug($"Requesting cancellation of other partition info tasks. Error while getting partition info for eventhub '{_client.EventHubName}', partition '{partition}': {e.Message}");
90+
cts.Cancel();
9291
}
93-
finally
92+
throw;
93+
}
94+
finally
95+
{
96+
if (acquired)
9497
{
95-
if (acquired)
96-
{
97-
semaphore.Release();
98-
}
98+
semaphore.Release();
9999
}
100-
}).ToArray();
100+
}
101+
}).ToArray();
101102
await Task.WhenAll(partitionPropertiesTasks).ConfigureAwait(false);
102103
}
103104
catch (Exception e)
104105
{
105-
_logger.LogWarning($"Encountered an exception while getting partition information for Event Hub '{_client.EventHubName}' used for scaling. Error: {e.Message}");
106+
_logger.LogFunctionScaleError($"Encountered an exception while getting partition information for Event Hub '{_client.EventHubName}' used for scaling.", _functionId, e);
106107
}
107108

108109
// Get checkpoints
109110
EventProcessorCheckpoint[] checkpoints = null;
110111
try
111112
{
112113
var checkpointTasks = partitions.Select(async partition =>
114+
{
115+
bool acquired = false;
116+
try
113117
{
114-
bool acquired = false;
115-
try
118+
acquired = await semaphore.WaitAsync(CheckpointWaitTimeoutMs, cts.Token).ConfigureAwait(false);
119+
if (!acquired)
116120
{
117-
acquired = await semaphore.WaitAsync(CheckpointWaitTimeoutMs, cts.Token).ConfigureAwait(false);
118-
if (!acquired)
119-
{
120-
throw new TimeoutException(
121-
$"Failed to acquire checkpoint concurrency slot within {CheckpointWaitTimeoutMs}ms for Event Hub '{_client.EventHubName}', partition '{partition}'.");
122-
}
123-
124-
return await _checkpointStore.GetCheckpointAsync(
125-
_client.FullyQualifiedNamespace,
126-
_client.EventHubName,
127-
_client.ConsumerGroup,
128-
partition,
129-
cts.Token).ConfigureAwait(false);
121+
throw new TimeoutException(
122+
$"Failed to acquire checkpoint concurrency slot within {CheckpointWaitTimeoutMs}ms for Event Hub '{_client.EventHubName}', partition '{partition}'.");
130123
}
131-
catch (Exception e)
124+
125+
return await _checkpointStore.GetCheckpointAsync(
126+
_client.FullyQualifiedNamespace,
127+
_client.EventHubName,
128+
_client.ConsumerGroup,
129+
partition,
130+
cts.Token).ConfigureAwait(false);
131+
}
132+
catch (Exception e)
133+
{
134+
if (!cts.Token.IsCancellationRequested)
132135
{
133-
if (!cts.Token.IsCancellationRequested)
134-
{
135-
_logger.LogDebug($"Requesting cancellation of other checkpoint tasks. Error while getting checkpoint for eventhub '{_client.EventHubName}', partition '{partition}': {e.Message}");
136-
cts.Cancel();
137-
}
138-
throw;
136+
_logger.LogDebug($"Requesting cancellation of other checkpoint tasks. Error while getting checkpoint for eventhub '{_client.EventHubName}', partition '{partition}': {e.Message}");
137+
cts.Cancel();
139138
}
140-
finally
139+
throw;
140+
}
141+
finally
142+
{
143+
if (acquired)
141144
{
142-
if (acquired)
143-
{
144-
semaphore.Release();
145-
}
145+
semaphore.Release();
146146
}
147-
});
148-
checkpoints = await Task.WhenAll(checkpointTasks).ConfigureAwait(false);
147+
}
148+
});
149+
checkpoints = await Task.WhenAll(checkpointTasks).ConfigureAwait(false);
149150
}
150151
catch (Exception e)
151152
{
152-
_logger.LogWarning($"Encountered an exception while getting checkpoints for Event Hub '{_client.EventHubName}' used for scaling. Error: {e.Message}");
153+
_logger.LogFunctionScaleError($"Encountered an exception while getting checkpoints for Event Hub '{_client.EventHubName}' used for scaling.", _functionId, e);
153154
}
154155

155156
return CreateTriggerMetrics(partitionPropertiesTasks.Select(t => t.Result).ToList(), checkpoints);

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubsTargetScaler.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,13 @@ internal TargetScalerResult GetScaleResultInternal(TargetScalerContext context,
103103
int[] sortedValidWorkerCounts = GetSortedValidWorkerCountsForPartitionCount(partitionCount);
104104
int validatedTargetWorkerCount = GetValidWorkerCount(desiredWorkerCount, sortedValidWorkerCounts);
105105

106+
string details = $"Target worker count for function '{_functionId}' is '{validatedTargetWorkerCount}' (EventHubName='{_client.EventHubName}', EventCount ='{eventCount}', Concurrency='{desiredConcurrency}', PartitionCount='{partitionCount}').";
106107
if (validatedTargetWorkerCount != desiredWorkerCount)
107108
{
108-
_logger.LogInformation($"Desired target worker count of '{desiredWorkerCount}' is not in list of valid sorted workers: '{string.Join(",", sortedValidWorkerCounts)}'. Using next largest valid worker as target worker count.");
109+
details += $" Desired target worker count of '{desiredWorkerCount}' is not in list of valid sorted workers: '{string.Join(",", sortedValidWorkerCounts)}'. Using next largest valid worker as target worker count.";
109110
}
110111

111-
_logger.LogInformation($"Target worker count for function '{_functionId}' is '{validatedTargetWorkerCount}' (EventHubName='{_client.EventHubName}', EventCount ='{eventCount}', Concurrency='{desiredConcurrency}', PartitionCount='{partitionCount}').");
112+
_logger.LogFunctionScaleVote(_functionId, validatedTargetWorkerCount, (int)eventCount, desiredConcurrency, details);
112113

113114
return new TargetScalerResult
114115
{

0 commit comments

Comments
 (0)