Error: Cannot start a HubConnection that is not in the 'Disconnected' state

Describe the bug After I wake up my PC from sleep, the WebSocket connection gets terminated (which is OK). Sometimes it cannot be restarted and then HubConnection.state becomes Disconnecting foreve...

Explanation

I opened 3 identical browser tabs and put the PC to sleep overnight. Today I woke it up and the connection was successfully restored in all tabs. Then I put the PC to sleep again, after waking up the issue reproduced in all 3 tabs.

1st sleep: reconnected normally


2nd sleep: stuck in «Disconnecting»


  • SignalRClient is the sink for your library’s log messages.

  • SignalRService is our service that starts the connection when a user logs in and stops it when the user logs out.

    export class SignalRService {
      private readonly _connection: HubConnection;
      private _accessToken: string | undefined;
    
      constructor() {
        this._connection = new HubConnectionBuilder()
          .withUrl(
            config.serviceUrl,
            {
              withCredentials: false,
              accessTokenFactory: () =>
                this._accessToken ?? Promise.reject("No user is logged in.")
            })
          .withAutomaticReconnect()
          .configureLogging(new TelemetryLoggerAdapter())
          .build();
    
        this._connection.on("jobCancelled", () => { /* irrelevant */});
        this._connection.on("walletBalanceUpdated", () => { /* irrelevant */ });
    
        eventHub.loggedIn.subscribe(this.start.bind(this));
        eventHub.loggedOut.subscribe(this.stop.bind(this));
      }
    
      private async start(userId: string, accessToken: string): Promise<void> {
        this._accessToken = accessToken;
        if (this._connection.connectionId) {
          telemetry.verbose(`SignalRService: connection is already established.`);
          return;
        }
        try {
          telemetry.verbose(`SignalRService: starting the connection.`);
          await this._connection.start();
          telemetry.info(`SignalRService: connected as user '${userId}'.`, { userId });
        } catch (err) {
          telemetry.error(
            `SignalRService: failed to connect as user '${userId}'. ${err}`, { userId, err });
        }
      }
    
      private async stop(): Promise<void> {
        this._accessToken = undefined;
        try {
          telemetry.verbose(`SignalRService: stopping the connection.`);
          await this._connection.stop();
          telemetry.info("SignalRService: disconnected, user logged out.");
        } catch (err) {
          telemetry.error(`SignalRService: failed to disconnect. ${err}`, { err });
        }
      }
    }
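
A note on the `start` method above: it decides whether to skip `start()` by looking at `connectionId`, which may not reflect the connection state in every case. Below is a minimal sketch of checking the state instead. `ConnectionLike`, `startIfDisconnected`, and `FakeConnection` are hypothetical names introduced for illustration; only the `state` property and the `start()` method mirror the SignalR JavaScript client. This would not repair a connection stuck in «Disconnecting», but it would avoid calling `start()` in a state where it is expected to throw.

```typescript
// Stand-in for the state names exposed by the SignalR JavaScript client.
type HubState = "Disconnected" | "Connecting" | "Connected" | "Disconnecting" | "Reconnecting";

// Minimal shape this sketch needs (assumption: a real HubConnection offers both members).
interface ConnectionLike {
  readonly state: HubState;
  start(): Promise<void>;
}

// Hypothetical helper: call start() only when the connection reports Disconnected.
// Resolves to true if start() was called, false if it was skipped.
async function startIfDisconnected(connection: ConnectionLike): Promise<boolean> {
  if (connection.state !== "Disconnected") {
    return false;
  }
  await connection.start();
  return true;
}

// Fake connection used only to exercise the helper; it is not the real client.
class FakeConnection implements ConnectionLike {
  state: HubState = "Disconnected";
  startCalls = 0;

  async start(): Promise<void> {
    if (this.state !== "Disconnected") {
      throw new Error("Cannot start a HubConnection that is not in the 'Disconnected' state.");
    }
    this.startCalls++;
    this.state = "Connected";
  }
}
```

With this helper, a caller can log the skipped case together with the current state, which makes a connection that never leaves «Disconnecting» visible in telemetry instead of surfacing only as a thrown error.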

I am having trouble getting my ASP.NET Core SignalR app working.

I have this server-side code:

public class PopcornHub : Hub
{
    private int Users;

    public async Task BroadcastNumberOfUsers(int nbUser)
    {
        await Clients.All.InvokeAsync("OnUserConnected", nbUser);
    }

    public override async Task OnConnectedAsync()
    {
        Users++;
        await BroadcastNumberOfUsers(Users);
        await base.OnConnectedAsync();
    }

    public override async Task OnDisconnectedAsync(Exception exception)
    {
        Users--;
        await BroadcastNumberOfUsers(Users);
        await base.OnDisconnectedAsync(exception);
    }
}

The SignalR hub service is configured as follows:

public void ConfigureServices(IServiceCollection services)
{
    ...
    services.AddSignalR();
    ...
}

public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
{
    ...

    app.UseSignalR(routes =>
    {
        routes.MapHub<PopcornHub>("popcorn");
    });

    ...
}

On the client side (a WPF app), I have a service:

public class PopcornHubService : IPopcornHubService
{
    private readonly HubConnection _connection;

    public PopcornHubService()
    {
        _connection = new HubConnectionBuilder()
            .WithUrl($"{Utils.Constants.PopcornApi.Replace("/api", "/popcorn")}")
            .Build();
        _connection.On<int>("OnUserConnected", (message) =>
        {

        });
    }

    public async Task Start()
    {
        await _connection.StartAsync();
    }
}

My issue is that when I call the Start() method, I get the exception «Cannot start a connection that is not in the Initial state».
The issue occurs both locally and in Azure.

The SignalR endpoint is fine, but no connection can be established. What am I missing?

I use SignalR in my project to provide instant notifications to clients. When I send a message to a client, the notification does not update. If I refresh the client page, the notification is updated and no further refresh is needed. I checked the console on the client page and found this error:

Error: Cannot start a HubConnection that is not in the 'Disconnected' state.
    at L.j (signalr.js:1)
    at L.start (signalr.js:1)
    at pmPage:513

After that, the following message was written to the console:

[2022-01-08T07:37:35.208Z] Information: WebSocket connected to wss://localhost:7166/Home/Messages?id=hPCczwx4iIzIUdirBjH9ig.

The JavaScript code that I use is as follows (message.js):

var connection = new signalR.HubConnectionBuilder().withUrl("/Home/Messages").withAutomaticReconnect().build();
connection.start();
if (document.getElementById("sendBtn") != null) {
    document.getElementById("sendBtn").addEventListener("click", function () {
        var costCenter = $("#cost-center").val();
        connection.invoke("SendMessage", costCenter).catch(function (err) {
            return console.error(err);
        });
    });
}

The code for the hub is:

public async Task SendMessage(string costCenter)
{
    var HttpContext = _httpContext.HttpContext;
    string userId = _userRepository.GetUserIdByCostCenter(costCenter).ToString();
    string sender = HttpContext.Session.GetString("department");
    await Clients.User(userId).SendAsync("ReceiveMessage", sender);
}

The JavaScript code for the client page:

<script>
connection.on("ReceiveMessage", function (param) {
    var currentMessageNum = parseInt($('#badge-count').text());
    var messageBadge = @Model.MessagesList.Where(x => x.receiverUserId == userId).Count();
    if($('#badge-count').length){
        $('#badge-count').text(currentMessageNum + 1);
        // Parenthesize the sum so the id gets the incremented number rather than string concatenation.
        $('.main-msg-list').prepend('<li><a class="dropdown-item message-item" asp-controller="Messages" asp-action="Index" id="msg-'+ (currentMessageNum + 1) +'">New message from '+ param +'</a></li>');
    }else{
        $('#badge-count').text('1');
    }
});
connection.start().catch(function (err) {
    return console.log(err);
});
</script>

How can I fix this problem?
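
One thing visible in the code above is that `connection.start()` is called twice on the same object: once in message.js and once in the page script. That would be consistent with the error, since the second call arrives while the connection is no longer in the 'Disconnected' state. Below is a minimal sketch of one way to make repeated calls harmless by reusing the first start attempt. `Startable`, `ensureStarted`, and `FakePageConnection` are hypothetical names for illustration, and the fake only imitates the "start once" behaviour; it is not the SignalR client.

```typescript
// Minimal shape this sketch needs (assumption: the real connection has start()).
interface Startable {
  start(): Promise<void>;
}

// Remembers the first start() promise per connection object, so later callers
// await the same attempt instead of calling start() a second time.
const pendingStarts = new WeakMap<Startable, Promise<void>>();

function ensureStarted(connection: Startable): Promise<void> {
  let pending = pendingStarts.get(connection);
  if (pending === undefined) {
    pending = connection.start();
    // Forget a failed attempt so that a later call can try again.
    pending.catch(() => pendingStarts.delete(connection));
    pendingStarts.set(connection, pending);
  }
  return pending;
}

// Fake connection that rejects a second start(), imitating the reported error.
class FakePageConnection implements Startable {
  started = false;
  startCalls = 0;

  async start(): Promise<void> {
    this.startCalls++;
    if (this.started) {
      throw new Error("Cannot start a HubConnection that is not in the 'Disconnected' state.");
    }
    this.started = true;
  }
}

// message.js would call:        ensureStarted(connection);
// the page script would call:   ensureStarted(connection).catch(console.log);
```

The simpler alternative is to keep only one of the two `start()` calls. The sketch covers the page-load case only; a connection that is stopped and started again later would need the cached promise to be cleared.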

I am struggling to set up and use SignalR with .NET Core MVC 6. The purpose of the SignalR hub is to send messages to JS clients after a method is called in a C# controller (the JS client is a React app configured in MVC as ClientApp).

I enabled debug logging for both the client SignalR instance and ASP.NET.

Here are the logs from ASP.NET:

      SPA proxy is ready. Redirecting to https://localhost:44440.
dbug: Microsoft.AspNetCore.Http.Connections.Internal.HttpConnectionManager[1]
      New connection ctA6QHwS4fvVGcufYvtlAA created.
dbug: Microsoft.AspNetCore.Http.Connections.Internal.HttpConnectionDispatcher[10]
      Sending negotiation response.
dbug: Microsoft.AspNetCore.Http.Connections.Internal.HttpConnectionDispatcher[4]
      Establishing new connection.
dbug: Microsoft.AspNetCore.SignalR.HubConnectionHandler[5]
      OnConnectedAsync started.
dbug: Microsoft.AspNetCore.SignalR.Internal.DefaultHubProtocolResolver[2]
      Found protocol implementation for requested protocol: json.
dbug: Microsoft.AspNetCore.SignalR.HubConnectionContext[1]
      Completed connection handshake. Using HubProtocol 'json'.
connected!! ctA6QHwS4fvVGcufYvtlAA

And the corresponding logs from the JS client:

Utils.ts:194 [2022-02-03T18:40:17.568Z] Debug: Starting HubConnection.
Utils.ts:194 [2022-02-03T18:40:17.568Z] Debug: Starting connection with transfer format 'Text'.
Utils.ts:194 [2022-02-03T18:40:17.576Z] Debug: Sending negotiation request: https://localhost:44440/hubs/order/negotiate?negotiateVersion=1.
Utils.ts:194 [2022-02-03T18:40:21.741Z] Debug: Skipping transport 'WebSockets' because it was disabled by the client.
Utils.ts:194 [2022-02-03T18:40:21.742Z] Debug: Selecting transport 'ServerSentEvents'.
Utils.ts:194 [2022-02-03T18:40:21.742Z] Trace: (SSE transport) Connecting.
Utils.ts:190 [2022-02-03T18:40:25.857Z] Information: SSE connected to https://localhost:44440/hubs/order?id=fxqgKpJnF5Dq5MX-RCfXcg
Utils.ts:194 [2022-02-03T18:40:25.857Z] Debug: The HttpConnection connected successfully.
Utils.ts:194 [2022-02-03T18:40:25.857Z] Debug: Sending handshake request.
Utils.ts:194 [2022-02-03T18:40:25.858Z] Trace: (SSE transport) sending data. String data of length 32.
Utils.ts:194 [2022-02-03T18:40:29.969Z] Trace: (SSE transport) request complete. Response status: 200.
Utils.ts:190 [2022-02-03T18:40:29.978Z] Information: Using HubProtocol 'json'.
Utils.ts:194 [2022-02-03T18:40:59.997Z] Debug: HttpConnection.stopConnection(undefined) called while in state Disconnecting.
index.js:1 [2022-02-03T18:40:59.997Z] Error: Connection disconnected with error 'Error: Server timeout elapsed without receiving a message from the server.'.
console.<computed> @ index.js:1
Utils.ts:194 [2022-02-03T18:40:59.997Z] Debug: HubConnection.connectionClosed(Error: Server timeout elapsed without receiving a message from the server.) called while in state Connecting.
Utils.ts:194 [2022-02-03T18:40:59.997Z] Debug: Hub handshake failed with error 'Error: Server timeout elapsed without receiving a message from the server.' during start(). Stopping HubConnection.
Utils.ts:194 [2022-02-03T18:40:59.997Z] Debug: Call to HttpConnection.stop(Error: Server timeout elapsed without receiving a message from the server.) ignored because the connection is already in the disconnected state.
Utils.ts:194 [2022-02-03T18:40:59.997Z] Debug: HubConnection failed to start successfully because of error 'Error: Server timeout elapsed without receiving a message from the server.'.
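
A quick check of the timestamps in the client log above: the last activity before the failure is logged at 18:40:29.978 and the timeout error at 18:40:59.997. The sketch below computes that gap and compares it with the client's default server timeout of 30 seconds, assuming the client in this question did not override it. The helper `timeoutCoversKeepAlive` is a hypothetical name; it encodes the rule of thumb that the server timeout should be at least twice the server's ping interval. This only shows that the client waited its full timeout without hearing from the server; it does not explain why nothing arrived.

```typescript
// Timestamps copied from the client log above.
const lastActivity = Date.parse("2022-02-03T18:40:29.978Z");
const timeoutRaised = Date.parse("2022-02-03T18:40:59.997Z");

// Default server timeout of the client: 30 seconds (assumed not overridden here).
const defaultServerTimeoutMs = 30_000;

// How long the client heard nothing before giving up.
const silentForMs = timeoutRaised - lastActivity;

// Hypothetical helper: the server timeout should be at least twice the ping interval.
function timeoutCoversKeepAlive(serverTimeoutMs: number, keepAliveMs: number): boolean {
  return serverTimeoutMs >= 2 * keepAliveMs;
}

console.log(silentForMs); // 30019
console.log(timeoutCoversKeepAlive(defaultServerTimeoutMs, 15_000)); // true
// An effectively infinite ping interval can never satisfy the rule.
console.log(timeoutCoversKeepAlive(defaultServerTimeoutMs, Number.POSITIVE_INFINITY)); // false
```

The last line is relevant because the Program.cs shown later in this question sets `KeepAliveInterval` to `TimeSpan.MaxValue`, which would presumably stop the server from sending keep-alive pings at all, so a client using the default timeout could time out even on a healthy connection.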

Here is sample code from the JS client app:

    console.log("hub attached");

    const hubConnection = new HubConnectionBuilder()
      .withUrl(OrderHubUrl, {
        transport: HttpTransportType.ServerSentEvents,
        accessTokenFactory: () => user.accessToken ?? "",
      })
      .withAutomaticReconnect()
      .configureLogging(LogLevel.Trace)
      .build();

    this.dispatcher.state.saveState("hubConnection", hubConnection);

    const startConnection = async () => {
      try {
        await hubConnection.start();
        console.log("connected");
      } catch (e) {
        this.dispatcher.dispatch(Event.ShowModal, {
          actionName: "OK",
          header: "Error",
          content: e,
        });
      }
    };
    hubConnection.on("ReceiveMessage", (user, message) => {
      console.log("message received");
      console.log(user);
      console.log(message);
    });

    hubConnection.onreconnecting((e) => console.log("reconnecting", e));
    hubConnection.onreconnected((e) => console.log("reconnected", e));
    startConnection();
  } 

As you can see from the logs above, the SignalR JS client cannot get through the start method. Instead, it throws an error after some time.

Below is my Program.cs file, where I configured SignalR (I suspect something is wrong here):

using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.IdentityModel.Tokens;
using OrderMaker.Authentication.Helpers;
using OrderMaker.Authentication.Services;
using OrderMaker.Entities;
using OrderMaker.Modules.Common.Services;
using OrderMaker.Modules.Order.Hubs;
using System.Text;

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.

builder.Services.AddControllersWithViews();
builder.Services.AddSignalR(c =>
{
    c.EnableDetailedErrors = true;
    c.ClientTimeoutInterval = TimeSpan.MaxValue;
    c.KeepAliveInterval = TimeSpan.MaxValue;
});

ConfigureConfiguration(builder.Configuration);
ConfigureServices(builder.Services, builder.Configuration);

var app = builder.Build();

app.UseAuthentication();
// Configure the HTTP request pipeline.
if (!app.Environment.IsDevelopment())
{
    // The default HSTS value is 30 days. You may want to change this for production scenarios, see https://aka.ms/aspnetcore-hsts.
    app.UseHsts();
}
app.UseHttpsRedirection();
app.UseStaticFiles();
app.UseCors(
          x => x
          .AllowAnyMethod()
          .AllowAnyHeader()
          .AllowCredentials());

app.UseRouting();
app.UseAuthorization();

app.MapControllerRoute(
    name: "default",
    pattern: "{controller}/{action=Index}/{id?}");
app.MapFallbackToFile("index.html");

app.MapHub<OrderHub>("/hubs/order");

app.Run();


void ConfigureConfiguration(ConfigurationManager configuration)
{
    using var client = new OrderMakerContext();
    client.Database.EnsureCreated();
}

void ConfigureServices(IServiceCollection services, IConfiguration configuration)
{
    services.AddControllers();
    // configure strongly typed settings objects
    var appSettingsSection = configuration.GetSection("AppSettings");
    services.Configure<AppSettings>(appSettingsSection);
    // configure jwt authentication
    var appSettings = appSettingsSection.Get<AppSettings>();
    var key = Encoding.ASCII.GetBytes(appSettings.Secret);
    services.AddAuthentication(x =>
    {
        x.DefaultAuthenticateScheme = JwtBearerDefaults.AuthenticationScheme;
        x.DefaultChallengeScheme = JwtBearerDefaults.AuthenticationScheme;
    })
    .AddJwtBearer(x =>
    {
        x.RequireHttpsMetadata = false;
        x.SaveToken = true;
        x.TokenValidationParameters = new TokenValidationParameters
        {
            ValidateIssuerSigningKey = true,
            IssuerSigningKey = new SymmetricSecurityKey(key),
            ValidateIssuer = false,
            ValidateAudience = false
        };
        x.Events = new JwtBearerEvents
        {
            OnMessageReceived = context =>
            {
                context.Token = context.Request.Cookies["order_maker_token"];
                // If the request is for our hub...
                var path = context.HttpContext.Request.Path;
                var accessToken = context.Request.Query["access_token"];
                if (!string.IsNullOrEmpty(accessToken) &&
                    (path.StartsWithSegments("/hubs/")))
                {
                    // Read the token out of the query string
                    context.Token = accessToken;
                }
                return Task.CompletedTask;
            }
        };
    }
    );

    services.AddScoped<IUserService, UserService>();
    services.AddSingleton<ILogService, LogService>();
}


My hub definition:

public class OrderHub : Hub
    {
        public async Task SendNotification(List<OrderModel> message)
        {
            await Clients.All.SendAsync("ReceiveMessage", message);
        }
        public override async Task OnConnectedAsync()
        {
            await base.OnConnectedAsync();
            Console.WriteLine("connected!! " + Context.ConnectionId);
        }
        public override async Task OnDisconnectedAsync(Exception exception)
        {
            await base.OnDisconnectedAsync(exception);
            Console.WriteLine("disconnected!!");
        }
    }

And a sample controller from which I want to send a message to the clients:

        private readonly IHubContext<OrderHub> _orderHubContext;
        public OrdersController(IHubContext<OrderHub> orderHubContext)
        {
            _orderHubContext = orderHubContext;
        }

        [Route("api/[controller]/")]
        [HttpPost]
        //[Authorize(Roles = $"{Role.Admin},{Role.Manager},{Role.Employee}")]
        public async Task<IActionResult> Post([FromBody] List<OrderModel> model)
        {
            // some code for creating entities etc.
            db.SaveChanges();
            // notify all clients that something new was added
            Console.Write(_orderHubContext.Clients.All.ToString());
            await _orderHubContext.Clients.All.SendAsync("ReceiveMessage", "hi there");
            return Ok(new MessageResponse("Order added successfully."));
        }

Basically I'm lost. I have already spent two days trying to figure out what could be causing the problem, but I just can't get this thing to work. I would really appreciate any suggestions or help. I tried disabling the firewall, using a different browser, and so on, without success. The connection to the hub is made and the C# app sees the new connection, but the JS client just gets stuck in the start() method for a while and then throws the error «Error: Server timeout elapsed without receiving a message from the server.»

Update: when I explicitly set the transport type in the JS client to LongPolling, it works as intended, but that is not an ideal solution.

Update: all of these problems occur only on my local machine. I tried my luck and deployed the app to production with the transport fixed to SSE, and it works without any problems, as does the WebSocket transport. The only clue I have is that on localhost the app runs on Kestrel, whereas on the server where I host it the app runs on IIS.


// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Net;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Connections.Features;
using Microsoft.AspNetCore.Internal;
using Microsoft.AspNetCore.Shared;
using Microsoft.AspNetCore.SignalR.Client.Internal;
using Microsoft.AspNetCore.SignalR.Internal;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
 
namespace Microsoft.AspNetCore.SignalR.Client;
 
/// <summary>
/// A connection used to invoke hub methods on a SignalR Server.
/// </summary>
/// <remarks>
/// A <see cref="HubConnection"/> should be created using <see cref="HubConnectionBuilder"/>.
/// Before hub methods can be invoked the connection must be started using <see cref="StartAsync"/>.
/// Clean up a connection using <see cref="StopAsync"/> or <see cref="DisposeAsync"/>.
/// </remarks>
public partial class HubConnection : IAsyncDisposable
{
    /// <summary>
    /// The default timeout which specifies how long to wait for a message before closing the connection. Default is 30 seconds.
    /// </summary>
    public static readonly TimeSpan DefaultServerTimeout = TimeSpan.FromSeconds(30); // Server ping rate is 15 sec, this is 2 times that.
 
    /// <summary>
    /// The default timeout which specifies how long to wait for the handshake to respond before closing the connection. Default is 15 seconds.
    /// </summary>
    public static readonly TimeSpan DefaultHandshakeTimeout = TimeSpan.FromSeconds(15);
 
    /// <summary>
    /// The default interval that the client will send keep alive messages to let the server know to not close the connection. Default is 15 second interval.
    /// </summary>
    public static readonly TimeSpan DefaultKeepAliveInterval = TimeSpan.FromSeconds(15);
 
    // The receive loop has a single reader and single writer at a time so optimize the channel for that
    private static readonly UnboundedChannelOptions _receiveLoopOptions = new UnboundedChannelOptions
    {
        SingleReader = true,
        SingleWriter = true
    };
 
    private static readonly MethodInfo _sendStreamItemsMethod = typeof(HubConnection).GetMethods(BindingFlags.NonPublic | BindingFlags.Instance).Single(m => m.Name.Equals(nameof(SendStreamItems)));
    private static readonly MethodInfo _sendIAsyncStreamItemsMethod = typeof(HubConnection).GetMethods(BindingFlags.NonPublic | BindingFlags.Instance).Single(m => m.Name.Equals(nameof(SendIAsyncEnumerableStreamItems)));
 
    // Persistent across all connections
    private readonly ILoggerFactory _loggerFactory;
    private readonly ILogger _logger;
    private readonly ConnectionLogScope _logScope;
    private readonly IHubProtocol _protocol;
    private readonly IServiceProvider _serviceProvider;
    private readonly IConnectionFactory _connectionFactory;
    private readonly IRetryPolicy? _reconnectPolicy;
    private readonly EndPoint _endPoint;
    private readonly ConcurrentDictionary<string, InvocationHandlerList> _handlers = new ConcurrentDictionary<string, InvocationHandlerList>(StringComparer.Ordinal);
 
    // Holds all mutable state other than user-defined handlers and settable properties.
    private readonly ReconnectingConnectionState _state;
 
    private bool _disposed;
 
    /// <summary>
    /// Occurs when the connection is closed. The connection could be closed due to an error or due to either the server or client intentionally
    /// closing the connection without error.
    /// </summary>
    /// <remarks>
    /// If this event was triggered from a connection error, the <see cref="Exception"/> that occurred will be passed in as the
    /// sole argument to this handler. If this event was triggered intentionally by either the client or server, then
    /// the argument will be <see langword="null"/>.
    /// </remarks>
    /// <example>
    /// The following example attaches a handler to the <see cref="Closed"/> event, and checks the provided argument to determine
    /// if there was an error:
    ///
    /// <code>
    /// connection.Closed += (exception) =>
    /// {
    ///     if (exception == null)
    ///     {
    ///         Console.WriteLine("Connection closed without error.");
    ///     }
    ///     else
    ///     {
    ///         Console.WriteLine($"Connection closed due to an error: {exception}");
    ///     }
    /// };
    /// </code>
    /// </example>
    public event Func<Exception?, Task>? Closed;
 
    /// <summary>
    /// Occurs when the <see cref="HubConnection"/> starts reconnecting after losing its underlying connection.
    /// </summary>
    /// <remarks>
    /// The <see cref="Exception"/> that occurred will be passed in as the sole argument to this handler.
    /// </remarks>
    /// <example>
    /// The following example attaches a handler to the <see cref="Reconnecting"/> event, and checks the provided argument to log the error.
    ///
    /// <code>
    /// connection.Reconnecting += (exception) =>
    /// {
    ///     Console.WriteLine($"Connection started reconnecting due to an error: {exception}");
    /// };
    /// </code>
    /// </example>
    public event Func<Exception?, Task>? Reconnecting;
 
    /// <summary>
    /// Occurs when the <see cref="HubConnection"/> successfully reconnects after losing its underlying connection.
    /// </summary>
    /// <remarks>
    /// The <see cref="string"/> parameter will be the <see cref="HubConnection"/>'s new ConnectionId or null if negotiation was skipped.
    /// </remarks>
    /// <example>
    /// The following example attaches a handler to the <see cref="Reconnected"/> event, and checks the provided argument to log the ConnectionId.
    ///
    /// <code>
    /// connection.Reconnected += (connectionId) =>
    /// {
    ///     Console.WriteLine($"Connection successfully reconnected. The ConnectionId is now: {connectionId}");
    /// };
    /// </code>
    /// </example>
    public event Func<string?, Task>? Reconnected;
 
    // internal for testing purposes
    internal TimeSpan TickRate { get; set; } = TimeSpan.FromSeconds(1);
 
    /// <summary>
    /// Gets or sets the server timeout interval for the connection.
    /// </summary>
    /// <remarks>
    /// The client times out if it hasn't heard from the server for `this` long.
    /// </remarks>
    public TimeSpan ServerTimeout { get; set; }
 
    /// <summary>
    /// Gets or sets the interval at which the client sends ping messages.
    /// </summary>
    /// <remarks>
    /// Sending any message resets the timer to the start of the interval.
    /// </remarks>
    public TimeSpan KeepAliveInterval { get; set; }
 
    /// <summary>
    /// Gets or sets the timeout for the initial handshake.
    /// </summary>
    public TimeSpan HandshakeTimeout { get; set; } = DefaultHandshakeTimeout;
 
    /// <summary>
    /// Gets the connection's current Id. This value will be cleared when the connection is stopped and will have a new value every time the connection is (re)established.
    /// This value will be null if the negotiation step is skipped via HttpConnectionOptions or if the WebSockets transport is explicitly specified because the
    /// client skips negotiation in that case as well.
    /// </summary>
    public string? ConnectionId => _state.CurrentConnectionStateUnsynchronized?.Connection.ConnectionId;
 
    /// <summary>
    /// Indicates the state of the <see cref="HubConnection"/> to the server.
    /// </summary>
    public HubConnectionState State => _state.OverallState;
 
    /// <summary>
    /// Initializes a new instance of the <see cref="HubConnection"/> class.
    /// </summary>
    /// <param name="connectionFactory">The <see cref="IConnectionFactory" /> used to create a connection each time <see cref="StartAsync" /> is called.</param>
    /// <param name="protocol">The <see cref="IHubProtocol" /> used by the connection.</param>
    /// <param name="endPoint">The <see cref="EndPoint"/> to connect to.</param>
    /// <param name="serviceProvider">An <see cref="IServiceProvider"/> containing the services provided to this <see cref="HubConnection"/> instance.</param>
    /// <param name="loggerFactory">The logger factory.</param>
    /// <param name="reconnectPolicy">
    /// The <see cref="IRetryPolicy"/> that controls the timing and number of reconnect attempts.
    /// The <see cref="HubConnection"/> will not reconnect if the <paramref name="reconnectPolicy"/> is null.
    /// </param>
    /// <remarks>
    /// The <see cref="IServiceProvider"/> used to initialize the connection will be disposed when the connection is disposed.
    /// </remarks>
    public HubConnection(IConnectionFactory connectionFactory, IHubProtocol protocol, EndPoint endPoint, IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IRetryPolicy reconnectPolicy)
        : this(connectionFactory, protocol, endPoint, serviceProvider, loggerFactory)
    {
        _reconnectPolicy = reconnectPolicy;
    }
 
    /// <summary>
    /// Initializes a new instance of the <see cref="HubConnection"/> class.
    /// </summary>
    /// <param name="connectionFactory">The <see cref="IConnectionFactory" /> used to create a connection each time <see cref="StartAsync" /> is called.</param>
    /// <param name="protocol">The <see cref="IHubProtocol" /> used by the connection.</param>
    /// <param name="endPoint">The <see cref="EndPoint"/> to connect to.</param>
    /// <param name="serviceProvider">An <see cref="IServiceProvider"/> containing the services provided to this <see cref="HubConnection"/> instance.</param>
    /// <param name="loggerFactory">The logger factory.</param>
    /// <remarks>
    /// The <see cref="IServiceProvider"/> used to initialize the connection will be disposed when the connection is disposed.
    /// </remarks>
    public HubConnection(IConnectionFactory connectionFactory,
                         IHubProtocol protocol,
                         EndPoint endPoint,
                         IServiceProvider serviceProvider,
                         ILoggerFactory loggerFactory)
    {
        _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
        _protocol = protocol ?? throw new ArgumentNullException(nameof(protocol));
        _endPoint = endPoint ?? throw new ArgumentNullException(nameof(endPoint));
        _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
 
        _loggerFactory = loggerFactory ?? NullLoggerFactory.Instance;
        _logger = _loggerFactory.CreateLogger<HubConnection>();
        _state = new ReconnectingConnectionState(_logger);
 
        _logScope = new ConnectionLogScope();
 
        var options = serviceProvider.GetService<IOptions<HubConnectionOptions>>();
 
        ServerTimeout = options?.Value.ServerTimeout ?? DefaultServerTimeout;
 
        KeepAliveInterval = options?.Value.KeepAliveInterval ?? DefaultKeepAliveInterval;
    }
 
    /// <summary>
    /// Starts a connection to the server.
    /// </summary>
    /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None" />.</param>
    /// <returns>A <see cref="Task"/> that represents the asynchronous start.</returns>
    public virtual async Task StartAsync(CancellationToken cancellationToken = default)
    {
        CheckDisposed();
        using (_logger.BeginScope(_logScope))
        {
            await StartAsyncInner(cancellationToken).ConfigureAwait(false);
        }
    }
 
    private async Task StartAsyncInner(CancellationToken cancellationToken = default)
    {
        await _state.WaitConnectionLockAsync(token: cancellationToken).ConfigureAwait(false);
        try
        {
            if (!_state.TryChangeState(HubConnectionState.Disconnected, HubConnectionState.Connecting))
            {
                throw new InvalidOperationException($"The {nameof(HubConnection)} cannot be started if it is not in the {nameof(HubConnectionState.Disconnected)} state.");
            }
 
            // The StopCts is canceled at the start of StopAsync and should be reset every time the connection finishes stopping.
            // If this token is currently canceled, it means that StartAsync was called while StopAsync was still running.
            if (_state.StopCts.Token.IsCancellationRequested)
            {
                throw new InvalidOperationException($"The {nameof(HubConnection)} cannot be started while {nameof(StopAsync)} is running.");
            }
 
            using (CancellationTokenUtils.CreateLinkedToken(cancellationToken, _state.StopCts.Token, out var linkedToken))
            {
                await StartAsyncCore(linkedToken).ConfigureAwait(false);
            }
 
            _state.ChangeState(HubConnectionState.Connecting, HubConnectionState.Connected);
        }
        catch
        {
            if (_state.TryChangeState(HubConnectionState.Connecting, HubConnectionState.Disconnected))
            {
                _state.StopCts = new CancellationTokenSource();
            }
 
            throw;
        }
        finally
        {
            _state.ReleaseConnectionLock();
        }
    }
 
    /// <summary>
    /// Stops a connection to the server.
    /// </summary>
    /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None" />.</param>
    /// <returns>A <see cref="Task"/> that represents the asynchronous stop.</returns>
    public virtual async Task StopAsync(CancellationToken cancellationToken = default)
    {
        CheckDisposed();
        using (_logger.BeginScope(_logScope))
        {
            await StopAsyncCore(disposing: false).ConfigureAwait(false);
        }
    }
 
    // Current plan for IAsyncDisposable is that DisposeAsync will NOT take a CancellationToken
    // https://github.com/dotnet/csharplang/blob/195efa07806284d7b57550e7447dc8bd39c156bf/proposals/async-streams.md#iasyncdisposable
    /// <summary>
    /// Disposes the <see cref="HubConnection"/>.
    /// </summary>
    /// <returns>A <see cref="ValueTask"/> that represents the asynchronous dispose.</returns>
    public virtual async ValueTask DisposeAsync()
    {
        if (!_disposed)
        {
            using (_logger.BeginScope(_logScope))
            {
                await StopAsyncCore(disposing: true).ConfigureAwait(false);
            }
        }
    }
 
    /// <summary>
    /// Registers a handler that will be invoked when the hub method with the specified method name is invoked.
    /// Returns the value returned by the handler to the server if the server requests a result.
    /// </summary>
    /// <param name="methodName">The name of the hub method to define.</param>
    /// <param name="parameterTypes">The parameter types expected by the hub method.</param>
    /// <param name="handler">The handler that will be raised when the hub method is invoked.</param>
    /// <param name="state">A state object that will be passed to the handler.</param>
    /// <returns>A subscription that can be disposed to unsubscribe from the hub method.</returns>
    /// <remarks>
    /// This is a low-level method for registering a handler. Using a <see cref="HubConnectionExtensions"/> <c>On</c> extension method is recommended.
    /// </remarks>
    public virtual IDisposable On(string methodName, Type[] parameterTypes, Func<object?[], object, Task<object?>> handler, object state)
    {
        Log.RegisteringHandler(_logger, methodName);
 
        CheckDisposed();
 
        // It's OK to be disposed while registering a callback, we'll just never call the callback anyway (as with all the callbacks registered before disposal).
        var invocationHandler = new InvocationHandler(parameterTypes, handler, state);
        var invocationList = _handlers.AddOrUpdate(methodName, _ => new InvocationHandlerList(invocationHandler),
            (_, invocations) =>
            {
                lock (invocations)
                {
                    invocations.Add(methodName, invocationHandler);
                }
                return invocations;
            });
 
        return new Subscription(invocationHandler, invocationList);
    }
 
    // If the registered callback blocks it can cause the client to stop receiving messages. If you need to block, get off the current thread first.
    /// <summary>
    /// Registers a handler that will be invoked when the hub method with the specified method name is invoked.
    /// </summary>
    /// <param name="methodName">The name of the hub method to define.</param>
    /// <param name="parameterTypes">The parameter types expected by the hub method.</param>
    /// <param name="handler">The handler that will be raised when the hub method is invoked.</param>
    /// <param name="state">A state object that will be passed to the handler.</param>
    /// <returns>A subscription that can be disposed to unsubscribe from the hub method.</returns>
    /// <remarks>
    /// This is a low-level method for registering a handler. Using a <see cref="HubConnectionExtensions"/> <c>On</c> extension method is recommended.
    /// </remarks>
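    /// <example>
    /// A minimal sketch of calling this overload directly. It assumes <c>connection</c> is an existing
    /// <see cref="HubConnection"/>; the method name <c>"jobCancelled"</c>, the single string parameter, and the
    /// handler body are hypothetical and only illustrate the shape of the call.
    /// <code>
    /// // 'connection' is an assumed HubConnection; "jobCancelled" is a hypothetical hub method name.
    /// IDisposable subscription = connection.On(
    ///     "jobCancelled",
    ///     new[] { typeof(string) },
    ///     (args, state) =>
    ///     {
    ///         // args holds the invocation arguments; avoid blocking here.
    ///         Console.WriteLine(args[0]);
    ///         return Task.CompletedTask;
    ///     },
    ///     state: new object());
    ///
    /// // Disposing the subscription unsubscribes the handler.
    /// subscription.Dispose();
    /// </code>
    /// </example>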
    public virtual IDisposable On(string methodName, Type[] parameterTypes, Func<object?[], object, Task> handler, object state)
    {
        Log.RegisteringHandler(_logger, methodName);
 
        CheckDisposed();
 
        // It's OK to be disposed while registering a callback, we'll just never call the callback anyway (as with all the callbacks registered before disposal).
        var invocationHandler = new InvocationHandler(parameterTypes, handler, state);
        var invocationList = _handlers.AddOrUpdate(methodName, _ => new InvocationHandlerList(invocationHandler),
            (_, invocations) =>
            {
                lock (invocations)
                {
                    invocations.Add(methodName, invocationHandler);
                }
                return invocations;
            });
 
        return new Subscription(invocationHandler, invocationList);
    }
 
    /// <summary>
    /// Removes all handlers associated with the method with the specified method name.
    /// </summary>
    /// <param name="methodName">The name of the hub method from which handlers are being removed.</param>
    public virtual void Remove(string methodName)
    {
        CheckDisposed();
        Log.RemovingHandlers(_logger, methodName);
        _handlers.TryRemove(methodName, out _);
    }
 
    /// <summary>
    /// Invokes a streaming hub method on the server using the specified method name, return type and arguments.
    /// </summary>
    /// <param name="methodName">The name of the server method to invoke.</param>
    /// <param name="returnType">The return type of the server method.</param>
    /// <param name="args">The arguments used to invoke the server method.</param>
    /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None" />.</param>
    /// <returns>
    /// A <see cref="Task{TResult}"/> that represents the asynchronous invoke.
    /// The <see cref="Task{TResult}.Result"/> property returns a <see cref="ChannelReader{T}"/> for the streamed hub method values.
    /// </returns>
    /// <remarks>
    /// This is a low-level method for invoking a streaming hub method on the server. Using a <see cref="HubConnectionExtensions"/> <c>StreamAsChannelAsync</c> extension method is recommended.
    /// </remarks>
    public virtual async Task<ChannelReader<object?>> StreamAsChannelCoreAsync(string methodName, Type returnType, object?[] args, CancellationToken cancellationToken = default)
    {
        using (_logger.BeginScope(_logScope))
        {
            return await StreamAsChannelCoreAsyncCore(methodName, returnType, args, cancellationToken).ConfigureAwait(false);
        }
    }
 
    /// <summary>
    /// Invokes a hub method on the server using the specified method name, return type and arguments.
    /// </summary>
    /// <param name="methodName">The name of the server method to invoke.</param>
    /// <param name="returnType">The return type of the server method.</param>
    /// <param name="args">The arguments used to invoke the server method.</param>
    /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None" />.</param>
    /// <returns>
    /// A <see cref="Task{TResult}"/> that represents the asynchronous invoke.
    /// The <see cref="Task{TResult}.Result"/> property returns an <see cref="object"/> for the hub method return value.
    /// </returns>
    /// <remarks>
    /// This is a low-level method for invoking a hub method on the server. Using a <see cref="HubConnectionExtensions"/> <c>InvokeAsync</c> extension method is recommended.
    /// </remarks>
    public virtual async Task<object?> InvokeCoreAsync(string methodName, Type returnType, object?[] args, CancellationToken cancellationToken = default)
    {
        using (_logger.BeginScope(_logScope))
        {
            return await InvokeCoreAsyncCore(methodName, returnType, args, cancellationToken).ConfigureAwait(false);
        }
    }
 
    /// <summary>
    /// Invokes a hub method on the server using the specified method name and arguments.
    /// Does not wait for a response from the receiver.
    /// </summary>
    /// <param name="methodName">The name of the server method to invoke.</param>
    /// <param name="args">The arguments used to invoke the server method.</param>
    /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None" />.</param>
    /// <returns>A <see cref="Task"/> that represents the asynchronous invoke.</returns>
    /// <remarks>
    /// This is a low-level method for invoking a hub method on the server. Using a <see cref="HubConnectionExtensions"/> <c>SendAsync</c> extension method is recommended.
    /// </remarks>
    public virtual async Task SendCoreAsync(string methodName, object?[] args, CancellationToken cancellationToken = default)
    {
        using (_logger.BeginScope(_logScope))
        {
            await SendCoreAsyncCore(methodName, args, cancellationToken).ConfigureAwait(false);
        }
    }
 
    private async Task StartAsyncCore(CancellationToken cancellationToken)
    {
        _state.AssertInConnectionLock();
        SafeAssert(_state.CurrentConnectionStateUnsynchronized == null, "We already have a connection!");
 
        cancellationToken.ThrowIfCancellationRequested();
 
        CheckDisposed();
 
        Log.Starting(_logger);
 
        // Start the connection
        var connection = await _connectionFactory.ConnectAsync(_endPoint, cancellationToken).ConfigureAwait(false);
        var startingConnectionState = new ConnectionState(connection, this);
 
        // From here on, if an error occurs we need to shut down the connection because
        // we still own it.
        try
        {
            Log.HubProtocol(_logger, _protocol.Name, _protocol.Version);
            await HandshakeAsync(startingConnectionState, cancellationToken).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            Log.ErrorStartingConnection(_logger, ex);
 
            // Can't have any invocations to cancel, we're in the lock.
            await CloseAsync(startingConnectionState.Connection).ConfigureAwait(false);
            throw;
        }
 
        // Set this at the end to avoid setting internal state until the connection is real
        _state.CurrentConnectionStateUnsynchronized = startingConnectionState;
        // Tell the server we intend to ping.
        // Old clients never ping, and shouldn't be timed out, so ping to tell the server that we should be timed out if we stop.
        // StartAsyncCore is invoked and awaited by StartAsyncInner and ReconnectAsync with the connection lock still acquired.
        if (!(connection.Features.Get<IConnectionInherentKeepAliveFeature>()?.HasInherentKeepAlive ?? false))
        {
            await SendHubMessage(startingConnectionState, PingMessage.Instance, cancellationToken).ConfigureAwait(false);
        }
        startingConnectionState.ReceiveTask = ReceiveLoop(startingConnectionState);
 
        Log.Started(_logger);
    }
 
    private static ValueTask CloseAsync(ConnectionContext connection)
    {
        return connection.DisposeAsync();
    }
 
    // This method does both Dispose and Stop; the 'disposing' flag indicates which.
    // The behaviors are nearly identical, except that the _disposed flag is set in the lock
    // if we're disposing.
    private async Task StopAsyncCore(bool disposing)
    {
        // StartAsync acquires the connection lock for the duration of the handshake.
        // ReconnectAsync also acquires the connection lock for reconnect attempts and handshakes.
        // Cancel the StopCts without acquiring the lock so we can short-circuit a pending StartAsync or ReconnectAsync.
        _state.StopCts.Cancel();
 
        // Potentially wait for StartAsync to finish, and block a new StartAsync from
        // starting until we've finished stopping.
        await _state.WaitConnectionLockAsync(token: default).ConfigureAwait(false);
 
        // Ensure that ReconnectingState.ReconnectTask is not accessed outside of the lock.
        var reconnectTask = _state.ReconnectTask;
 
        if (reconnectTask.Status != TaskStatus.RanToCompletion)
        {
            // Let the current reconnect attempts finish if necessary without the lock.
            // Otherwise, ReconnectAsync will stall forever acquiring the lock.
            // It should never throw, even if the reconnect attempts fail.
            // The StopCts should prevent the HubConnection from restarting until it is reset.
            _state.ReleaseConnectionLock();
            await reconnectTask.ConfigureAwait(false);
            await _state.WaitConnectionLockAsync(token: default).ConfigureAwait(false);
        }
 
        ConnectionState? connectionState;
 
        try
        {
            if (disposing && _disposed)
            {
                // DisposeAsync should be idempotent.
                return;
            }
 
            CheckDisposed();
            connectionState = _state.CurrentConnectionStateUnsynchronized;
 
            // Set the stopping flag so that any invocations after this get a useful error message instead of
            // silently failing or throwing an error about the pipe being completed.
            if (connectionState != null)
            {
                connectionState.Stopping = true;
            }
            else
            {
                // Reset StopCts if there isn't an active connection so that the next StartAsync won't immediately fail due to the token being canceled.
                _state.StopCts = new CancellationTokenSource();
            }
 
            if (disposing)
            {
                // Must set this before calling DisposeAsync because the service provider has a reference to the HubConnection and will try to dispose it again
                _disposed = true;
                if (_serviceProvider is IAsyncDisposable asyncDispose)
                {
                    await asyncDispose.DisposeAsync().ConfigureAwait(false);
                }
                else
                {
                    (_serviceProvider as IDisposable)?.Dispose();
                }
            }
        }
        finally
        {
            _state.ReleaseConnectionLock();
        }
 
        // Now stop the connection we captured
        if (connectionState != null)
        {
            await connectionState.StopAsync().ConfigureAwait(false);
        }
    }
 
    /// <summary>
    /// Invokes a streaming hub method on the server using the specified method name, return type and arguments.
    /// </summary>
    /// <typeparam name="TResult">The return type of the streaming server method.</typeparam>
    /// <param name="methodName">The name of the server method to invoke.</param>
    /// <param name="args">The arguments used to invoke the server method.</param>
    /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None" />.</param>
    /// <returns>
    /// A <see cref="IAsyncEnumerable{TResult}"/> that represents the stream.
    /// </returns>
    public virtual IAsyncEnumerable<TResult> StreamAsyncCore<TResult>(string methodName, object?[] args, CancellationToken cancellationToken = default)
    {
        var cts = cancellationToken.CanBeCanceled ? CancellationTokenSource.CreateLinkedTokenSource(cancellationToken) : new CancellationTokenSource();
        var stream = CastIAsyncEnumerable<TResult>(methodName, args, cts);
        var cancelableStream = AsyncEnumerableAdapters.MakeCancelableTypedAsyncEnumerable(stream, cts);
        return cancelableStream;
    }
 
    private async IAsyncEnumerable<T> CastIAsyncEnumerable<T>(string methodName, object?[] args, CancellationTokenSource cts)
    {
        var reader = await StreamAsChannelCoreAsync(methodName, typeof(T), args, cts.Token).ConfigureAwait(false);
 
        try
        {
            while (await reader.WaitToReadAsync(cts.Token).ConfigureAwait(false))
            {
                while (reader.TryRead(out var item))
                {
                    yield return (T)item!;
                }
            }
        }
        finally
        {
            // Needed to avoid UnobservedTaskExceptions
            _ = reader.Completion.Exception;
        }
    }
 
    private async Task<ChannelReader<object?>> StreamAsChannelCoreAsyncCore(string methodName, Type returnType, object?[] args, CancellationToken cancellationToken)
    {
        async Task OnStreamCanceled(InvocationRequest irq)
        {
            // We need to take the connection lock in order to ensure we a) have a connection and b) are the only one accessing the write end of the pipe.
            await _state.WaitConnectionLockAsync(token: default).ConfigureAwait(false);
            try
            {
                if (_state.CurrentConnectionStateUnsynchronized != null)
                {
                    Log.SendingCancellation(_logger, irq.InvocationId);
 
                    // Fire and forget, if it fails that means we aren't connected anymore.
                    _ = SendHubMessage(_state.CurrentConnectionStateUnsynchronized, new CancelInvocationMessage(irq.InvocationId), irq.CancellationToken);
                }
                else
                {
                    Log.UnableToSendCancellation(_logger, irq.InvocationId);
                }
            }
            finally
            {
                _state.ReleaseConnectionLock();
            }
 
            // Cancel the invocation
            irq.Dispose();
        }
 
        var readers = default(Dictionary<string, object>);
 
        CheckDisposed();
        var connectionState = await _state.WaitForActiveConnectionAsync(nameof(StreamAsChannelCoreAsync), token: cancellationToken).ConfigureAwait(false);
 
        ChannelReader<object?> channel;
        try
        {
            CheckDisposed();
            cancellationToken.ThrowIfCancellationRequested();
 
            readers = PackageStreamingParams(connectionState, ref args, out var streamIds);
 
            // I just want an excuse to use 'irq' as a variable name...
            var irq = InvocationRequest.Stream(cancellationToken, returnType, connectionState.GetNextId(), _loggerFactory, this, out channel);
            await InvokeStreamCore(connectionState, methodName, irq, args, streamIds?.ToArray(), cancellationToken).ConfigureAwait(false);
 
            if (cancellationToken.CanBeCanceled)
            {
                cancellationToken.Register(state => _ = OnStreamCanceled((InvocationRequest)state!), irq);
            }
 
            LaunchStreams(connectionState, readers, cancellationToken);
        }
        finally
        {
            _state.ReleaseConnectionLock();
        }
 
        return channel;
    }
 
    private Dictionary<string, object>? PackageStreamingParams(ConnectionState connectionState, ref object?[] args, out List<string>? streamIds)
    {
        Dictionary<string, object>? readers = null;
        streamIds = null;
        var newArgsCount = args.Length;
        const int MaxStackSize = 256;
        Span<bool> isStreaming = args.Length <= MaxStackSize
            ? stackalloc bool[MaxStackSize].Slice(0, args.Length)
            : new bool[args.Length];
        for (var i = 0; i < args.Length; i++)
        {
            var arg = args[i];
            if (arg is not null && ReflectionHelper.IsStreamingType(arg.GetType()))
            {
                isStreaming[i] = true;
                newArgsCount--;
 
                if (readers is null)
                {
                    readers = new Dictionary<string, object>();
                }
                if (streamIds is null)
                {
                    streamIds = new List<string>();
                }
 
                var id = connectionState.GetNextId();
                readers[id] = arg;
                streamIds.Add(id);
 
                Log.StartingStream(_logger, id);
            }
        }
 
        if (newArgsCount == args.Length)
        {
            return null;
        }
 
        var newArgs = newArgsCount > 0
            ? new object?[newArgsCount]
            : Array.Empty<object?>();
        int newArgsIndex = 0;
 
        for (var i = 0; i < args.Length; i++)
        {
            if (!isStreaming[i])
            {
                newArgs[newArgsIndex] = args[i];
                newArgsIndex++;
            }
        }
 
        args = newArgs;
        return readers;
    }
 
    private void LaunchStreams(ConnectionState connectionState, Dictionary<string, object>? readers, CancellationToken cancellationToken)
    {
        if (readers == null)
        {
            // if there were no streaming parameters then readers is never initialized
            return;
        }
 
        _state.AssertInConnectionLock();
        // It's safe to access connectionState.UploadStreamToken as we still have the connection lock
        var cts = CancellationTokenSource.CreateLinkedTokenSource(connectionState.UploadStreamToken, cancellationToken);
 
        foreach (var kvp in readers)
        {
            var reader = kvp.Value;
 
            // For each stream that needs to be sent, run a "send items" task in the background.
            // This reads from the channel, attaches streamId, and sends to server.
            // A single background thread here quickly gets messy.
            if (ReflectionHelper.IsIAsyncEnumerable(reader.GetType()))
            {
                _ = _sendIAsyncStreamItemsMethod
                    .MakeGenericMethod(reader.GetType().GetInterface("IAsyncEnumerable`1")!.GetGenericArguments())
                    .Invoke(this, new object[] { connectionState, kvp.Key.ToString(), reader, cts });
                continue;
            }
            _ = _sendStreamItemsMethod
                .MakeGenericMethod(reader.GetType().GetGenericArguments())
                .Invoke(this, new object[] { connectionState, kvp.Key.ToString(), reader, cts });
        }
    }
 
    // this is called via reflection using the `_sendStreamItemsMethod` field
    private Task SendStreamItems<T>(ConnectionState connectionState, string streamId, ChannelReader<T> reader, CancellationTokenSource tokenSource)
    {
        async Task ReadChannelStream()
        {
            while (await reader.WaitToReadAsync(tokenSource.Token).ConfigureAwait(false))
            {
                while (!tokenSource.Token.IsCancellationRequested && reader.TryRead(out var item))
                {
                    await SendWithLock(connectionState, new StreamItemMessage(streamId, item), tokenSource.Token).ConfigureAwait(false);
                    Log.SendingStreamItem(_logger, streamId);
                }
            }
        }
 
        return CommonStreaming(connectionState, streamId, ReadChannelStream);
    }
 
    // this is called via reflection using the `_sendIAsyncStreamItemsMethod` field
    private Task SendIAsyncEnumerableStreamItems<T>(ConnectionState connectionState, string streamId, IAsyncEnumerable<T> stream, CancellationTokenSource tokenSource)
    {
        async Task ReadAsyncEnumerableStream()
        {
            var streamValues = AsyncEnumerableAdapters.MakeCancelableTypedAsyncEnumerable(stream, tokenSource);
 
            await foreach (var streamValue in streamValues)
            {
                await SendWithLock(connectionState, new StreamItemMessage(streamId, streamValue), tokenSource.Token).ConfigureAwait(false);
                Log.SendingStreamItem(_logger, streamId);
            }
        }
 
        return CommonStreaming(connectionState, streamId, ReadAsyncEnumerableStream);
    }
 
    private async Task CommonStreaming(ConnectionState connectionState, string streamId, Func<Task> createAndConsumeStream)
    {
        Log.StartingStream(_logger, streamId);
        string? responseError = null;
        try
        {
            await createAndConsumeStream().ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            Log.CancelingStream(_logger, streamId);
            responseError = "Stream canceled by client.";
        }
        catch (Exception ex)
        {
            Log.ErroredStream(_logger, streamId, ex);
            responseError = $"Stream errored by client: '{ex}'";
        }
 
        // Don't use the cancellation token here: this path is itself triggered by cancellation,
        // and the completion must still be sent to tell the server that the client is done streaming.
        await _state.WaitConnectionLockAsync(token: default).ConfigureAwait(false);
        try
        {
            // Avoid sending when the connection isn't active. This likely happens if a stream is still active when the connection closes.
            if (_state.IsConnectionActive())
            {
                Log.CompletingStream(_logger, streamId);
                await SendHubMessage(connectionState, CompletionMessage.WithError(streamId, responseError), cancellationToken: default).ConfigureAwait(false);
            }
            else
            {
                Log.CompletingStreamNotSent(_logger, streamId);
            }
        }
        finally
        {
            _state.ReleaseConnectionLock();
        }
    }
 
    private async Task<object?> InvokeCoreAsyncCore(string methodName, Type returnType, object?[] args, CancellationToken cancellationToken)
    {
        var readers = default(Dictionary<string, object>);
 
        CheckDisposed();
        var connectionState = await _state.WaitForActiveConnectionAsync(nameof(InvokeCoreAsync), token: cancellationToken).ConfigureAwait(false);
 
        Task<object?> invocationTask;
        try
        {
            CheckDisposed();
 
            readers = PackageStreamingParams(connectionState, ref args, out var streamIds);
 
            var irq = InvocationRequest.Invoke(cancellationToken, returnType, connectionState.GetNextId(), _loggerFactory, this, out invocationTask);
            await InvokeCore(connectionState, methodName, irq, args, streamIds?.ToArray(), cancellationToken).ConfigureAwait(false);
 
            LaunchStreams(connectionState, readers, cancellationToken);
        }
        finally
        {
            _state.ReleaseConnectionLock();
        }
 
        // Wait for this outside the lock, because it won't complete until the server responds
        return await invocationTask.ConfigureAwait(false);
    }
 
    private async Task InvokeCore(ConnectionState connectionState, string methodName, InvocationRequest irq, object?[] args, string[]? streams, CancellationToken cancellationToken)
    {
        Log.PreparingBlockingInvocation(_logger, irq.InvocationId, methodName, irq.ResultType.FullName!, args.Length);
 
        // Client invocations are always blocking
        var invocationMessage = new InvocationMessage(irq.InvocationId, methodName, args, streams);
 
        Log.RegisteringInvocation(_logger, irq.InvocationId);
        connectionState.AddInvocation(irq);
 
        // Trace the full invocation
        Log.IssuingInvocation(_logger, irq.InvocationId, irq.ResultType.FullName!, methodName, args);
 
        try
        {
            await SendHubMessage(connectionState, invocationMessage, cancellationToken).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            Log.FailedToSendInvocation(_logger, irq.InvocationId, ex);
            connectionState.TryRemoveInvocation(irq.InvocationId, out _);
            irq.Fail(ex);
        }
    }
 
    private async Task InvokeStreamCore(ConnectionState connectionState, string methodName, InvocationRequest irq, object?[] args, string[]? streams, CancellationToken cancellationToken)
    {
        _state.AssertConnectionValid();
 
        Log.PreparingStreamingInvocation(_logger, irq.InvocationId, methodName, irq.ResultType.FullName!, args.Length);
 
        var invocationMessage = new StreamInvocationMessage(irq.InvocationId, methodName, args, streams);
 
        Log.RegisteringInvocation(_logger, irq.InvocationId);
 
        connectionState.AddInvocation(irq);
 
        // Trace the full invocation
        Log.IssuingInvocation(_logger, irq.InvocationId, irq.ResultType.FullName!, methodName, args);
 
        try
        {
            await SendHubMessage(connectionState, invocationMessage, cancellationToken).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            Log.FailedToSendInvocation(_logger, irq.InvocationId, ex);
            connectionState.TryRemoveInvocation(irq.InvocationId, out _);
            irq.Fail(ex);
        }
    }
 
    private async Task SendHubMessage(ConnectionState connectionState, HubMessage hubMessage, CancellationToken cancellationToken = default)
    {
        _state.AssertConnectionValid();
        _protocol.WriteMessage(hubMessage, connectionState.Connection.Transport.Output);
 
        Log.SendingMessage(_logger, hubMessage);
 
        await connectionState.Connection.Transport.Output.FlushAsync(cancellationToken).ConfigureAwait(false);
        Log.MessageSent(_logger, hubMessage);
 
        // We've sent a message, so don't ping for a while
        connectionState.ResetSendPing();
    }
 
    private async Task SendCoreAsyncCore(string methodName, object?[] args, CancellationToken cancellationToken)
    {
        var readers = default(Dictionary<string, object>);
 
        CheckDisposed();
        var connectionState = await _state.WaitForActiveConnectionAsync(nameof(SendCoreAsync), token: cancellationToken).ConfigureAwait(false);
        try
        {
            CheckDisposed();
 
            readers = PackageStreamingParams(connectionState, ref args, out var streamIds);
 
            Log.PreparingNonBlockingInvocation(_logger, methodName, args.Length);
            var invocationMessage = new InvocationMessage(null, methodName, args, streamIds?.ToArray());
            await SendHubMessage(connectionState, invocationMessage, cancellationToken).ConfigureAwait(false);
 
            LaunchStreams(connectionState, readers, cancellationToken);
        }
        finally
        {
            _state.ReleaseConnectionLock();
        }
    }
 
    private async Task SendWithLock(ConnectionState expectedConnectionState, HubMessage message, CancellationToken cancellationToken, [CallerMemberName] string callerName = "")
    {
        CheckDisposed();
        var connectionState = await _state.WaitForActiveConnectionAsync(callerName, token: cancellationToken).ConfigureAwait(false);
        try
        {
            CheckDisposed();
 
            SafeAssert(ReferenceEquals(expectedConnectionState, connectionState), "The connection state changed unexpectedly!");
 
            await SendHubMessage(connectionState, message, cancellationToken).ConfigureAwait(false);
        }
        finally
        {
            _state.ReleaseConnectionLock();
        }
    }
 
    private async Task<CloseMessage?> ProcessMessagesAsync(HubMessage message, ConnectionState connectionState, ChannelWriter<InvocationMessage> invocationMessageWriter)
    {
        Log.ResettingKeepAliveTimer(_logger);
        connectionState.ResetTimeout();
 
        InvocationRequest? irq;
        switch (message)
        {
            case InvocationBindingFailureMessage bindingFailure:
                // The server can't receive a response, so we just drop the message and log
                // REVIEW: Is this the right approach?
                Log.ArgumentBindingFailure(_logger, bindingFailure.InvocationId, bindingFailure.Target, bindingFailure.BindingFailure.SourceException);
                break;
            case InvocationMessage invocation:
                Log.ReceivedInvocation(_logger, invocation.InvocationId, invocation.Target, invocation.Arguments);
                await invocationMessageWriter.WriteAsync(invocation).ConfigureAwait(false);
                break;
            case CompletionMessage completion:
                if (!connectionState.TryRemoveInvocation(completion.InvocationId!, out irq))
                {
                    Log.DroppedCompletionMessage(_logger, completion.InvocationId!);
                    break;
                }
 
                DispatchInvocationCompletion(completion, irq);
                irq.Dispose();
 
                break;
            case StreamItemMessage streamItem:
                // If there's no open StreamInvocation with the given id, log and drop the stream item.
                if (!connectionState.TryGetInvocation(streamItem.InvocationId!, out irq))
                {
                    Log.DroppedStreamMessage(_logger, streamItem.InvocationId!);
                    break;
                }
                await DispatchInvocationStreamItemAsync(streamItem, irq).ConfigureAwait(false);
                break;
            case CloseMessage close:
                if (string.IsNullOrEmpty(close.Error))
                {
                    Log.ReceivedClose(_logger);
                }
                else
                {
                    Log.ReceivedCloseWithError(_logger, close.Error);
                }
                return close;
            case PingMessage _:
                Log.ReceivedPing(_logger);
                // timeout is reset above, on receiving any message
                break;
            default:
                throw new InvalidOperationException($"Unexpected message type: {message.GetType().FullName}");
        }
 
        return null;
    }
 
    private async Task DispatchInvocationAsync(InvocationMessage invocation, ConnectionState connectionState)
    {
        var expectsResult = !string.IsNullOrEmpty(invocation.InvocationId);
        // Find the handler
        if (!_handlers.TryGetValue(invocation.Target, out var invocationHandlerList))
        {
            if (expectsResult)
            {
                Log.MissingResultHandler(_logger, invocation.Target);
                try
                {
                    await SendWithLock(connectionState, CompletionMessage.WithError(invocation.InvocationId!, "Client didn't provide a result."), cancellationToken: default).ConfigureAwait(false);
                }
                catch (Exception ex)
                {
                    Log.ErrorSendingInvocationResult(_logger, invocation.InvocationId!, invocation.Target, ex);
                }
            }
            else
            {
                Log.MissingHandler(_logger, invocation.Target);
            }
            return;
        }
 
        // Grabbing the current handlers
        var copiedHandlers = invocationHandlerList.GetHandlers();
        object? result = null;
        Exception? resultException = null;
        var hasResult = false;
        foreach (var handler in copiedHandlers)
        {
            try
            {
                var task = handler.InvokeAsync(invocation.Arguments);
                if (handler.HasResult && task is Task<object?> resultTask)
                {
                    result = await resultTask.ConfigureAwait(false);
                    hasResult = true;
                }
                else
                {
                    await task.ConfigureAwait(false);
                }
            }
            catch (Exception ex)
            {
                Log.ErrorInvokingClientSideMethod(_logger, invocation.Target, ex);
                if (handler.HasResult)
                {
                    resultException = ex;
                }
            }
        }
 
        if (expectsResult)
        {
            try
            {
                if (resultException is not null)
                {
                    await SendWithLock(connectionState, CompletionMessage.WithError(invocation.InvocationId!, resultException.Message), cancellationToken: default).ConfigureAwait(false);
                }
                else if (hasResult)
                {
                    await SendWithLock(connectionState, CompletionMessage.WithResult(invocation.InvocationId!, result), cancellationToken: default).ConfigureAwait(false);
                }
                else
                {
                    Log.MissingResultHandler(_logger, invocation.Target);
                    await SendWithLock(connectionState, CompletionMessage.WithError(invocation.InvocationId!, "Client didn't provide a result."), cancellationToken: default).ConfigureAwait(false);
                }
            }
            catch (Exception ex)
            {
                Log.ErrorSendingInvocationResult(_logger, invocation.InvocationId!, invocation.Target, ex);
            }
        }
        else if (hasResult)
        {
            Log.ResultNotExpected(_logger, invocation.Target);
        }
    }
 
    private async Task DispatchInvocationStreamItemAsync(StreamItemMessage streamItem, InvocationRequest irq)
    {
        Log.ReceivedStreamItem(_logger, irq.InvocationId);
 
        if (irq.CancellationToken.IsCancellationRequested)
        {
            Log.CancelingStreamItem(_logger, irq.InvocationId);
        }
        else if (!await irq.StreamItem(streamItem.Item).ConfigureAwait(false))
        {
            Log.ReceivedStreamItemAfterClose(_logger, irq.InvocationId);
        }
    }
 
    private void DispatchInvocationCompletion(CompletionMessage completion, InvocationRequest irq)
    {
        Log.ReceivedInvocationCompletion(_logger, irq.InvocationId);
 
        if (irq.CancellationToken.IsCancellationRequested)
        {
            Log.CancelingInvocationCompletion(_logger, irq.InvocationId);
        }
        else
        {
            irq.Complete(completion);
        }
    }
 
    private void CheckDisposed()
    {
        ObjectDisposedThrowHelper.ThrowIf(_disposed, this);
    }
 
    private async Task HandshakeAsync(ConnectionState startingConnectionState, CancellationToken cancellationToken)
    {
        // Send the Handshake request
        Log.SendingHubHandshake(_logger);
 
        var handshakeRequest = new HandshakeRequestMessage(_protocol.Name, _protocol.Version);
        HandshakeProtocol.WriteRequestMessage(handshakeRequest, startingConnectionState.Connection.Transport.Output);
 
        var sendHandshakeResult = await startingConnectionState.Connection.Transport.Output.FlushAsync(CancellationToken.None).ConfigureAwait(false);
 
        if (sendHandshakeResult.IsCompleted)
        {
            // The other side disconnected
            var ex = new IOException("The server disconnected before the handshake could be started.");
            Log.ErrorReceivingHandshakeResponse(_logger, ex);
            throw ex;
        }
 
        var input = startingConnectionState.Connection.Transport.Input;
 
        using var handshakeCts = new CancellationTokenSource(HandshakeTimeout);
 
        try
        {
            // cancellationToken already contains _state.StopCts.Token, so we don't have to link it again
            using (CancellationTokenUtils.CreateLinkedToken(cancellationToken, handshakeCts.Token, out var linkedToken))
            {
                while (true)
                {
                    var result = await input.ReadAsync(linkedToken).ConfigureAwait(false);
 
                    var buffer = result.Buffer;
                    var consumed = buffer.Start;
                    var examined = buffer.End;
 
                    try
                    {
                        // Read first message out of the incoming data
                        if (!buffer.IsEmpty)
                        {
                            if (HandshakeProtocol.TryParseResponseMessage(ref buffer, out var message))
                            {
                                // Adjust consumed and examined to point to the end of the handshake
                                // response. This handles the case where invocations are sent in the same payload
                                // as the handshake response.
                                consumed = buffer.Start;
                                examined = consumed;
 
                                if (message.Error != null)
                                {
                                    Log.HandshakeServerError(_logger, message.Error);
                                    throw new HubException(
                                        $"Unable to complete handshake with the server due to an error: {message.Error}");
                                }
 
                                Log.HandshakeComplete(_logger);
                                break;
                            }
                        }
 
                        if (result.IsCompleted)
                        {
                            // Not enough data, and we won't be getting any more data.
                            throw new InvalidOperationException(
                                "The server disconnected before sending a handshake response");
                        }
                    }
                    finally
                    {
                        input.AdvanceTo(consumed, examined);
                    }
                }
            }
        }
        catch (HubException)
        {
            // This was already logged as a HandshakeServerError
            throw;
        }
        catch (InvalidDataException ex)
        {
            Log.ErrorInvalidHandshakeResponse(_logger, ex);
            throw;
        }
        catch (OperationCanceledException ex)
        {
            if (handshakeCts.IsCancellationRequested)
            {
                Log.ErrorHandshakeTimedOut(_logger, HandshakeTimeout, ex);
            }
            else
            {
                Log.ErrorHandshakeCanceled(_logger, ex);
            }
 
            throw;
        }
        catch (Exception ex)
        {
            Log.ErrorReceivingHandshakeResponse(_logger, ex);
            throw;
        }
    }
 
    private async Task ReceiveLoop(ConnectionState connectionState)
    {
        // We hold a local capture of the connection state because StopAsync may dump out the current one.
        // We'll be locking any time we want to check back in to the "active" connection state.
        _state.AssertInConnectionLock();
 
        Log.ReceiveLoopStarting(_logger);
 
        // Performs periodic tasks -- here sending pings and checking timeout
        // Disposed with `timer.Stop()` in the finally block below
        var timer = new TimerAwaitable(TickRate, TickRate);
        var timerTask = connectionState.TimerLoop(timer);
 
        var uploadStreamSource = new CancellationTokenSource();
        connectionState.UploadStreamToken = uploadStreamSource.Token;
        var invocationMessageChannel = Channel.CreateUnbounded<InvocationMessage>(_receiveLoopOptions);
 
        // We can't safely wait for this task when closing without introducing deadlock potential when calling StopAsync in a .On method
        connectionState.InvocationMessageReceiveTask = StartProcessingInvocationMessages(invocationMessageChannel.Reader);
 
        async Task StartProcessingInvocationMessages(ChannelReader<InvocationMessage> invocationMessageChannelReader)
        {
            while (await invocationMessageChannelReader.WaitToReadAsync().ConfigureAwait(false))
            {
                while (invocationMessageChannelReader.TryRead(out var invocationMessage))
                {
                    var invokeTask = DispatchInvocationAsync(invocationMessage, connectionState);
                    // If a client result is expected we shouldn't block on user code as that could potentially permanently block the application
                    // Even if it doesn't permanently block, it would be better if non-client result handlers could still be called while waiting for a result
                    // e.g. chat while waiting for user input for a turn in a game
                    if (string.IsNullOrEmpty(invocationMessage.InvocationId))
                    {
                        await invokeTask.ConfigureAwait(false);
                    }
                }
            }
        }
 
        var input = connectionState.Connection.Transport.Input;
 
        try
        {
            while (true)
            {
                var result = await input.ReadAsync().ConfigureAwait(false);
                var buffer = result.Buffer;
 
                try
                {
                    if (result.IsCanceled)
                    {
                        // We were canceled. Possibly because we were stopped gracefully
                        break;
                    }
                    else if (!buffer.IsEmpty)
                    {
                        Log.ProcessingMessage(_logger, buffer.Length);
 
                        CloseMessage? closeMessage = null;
 
                        while (_protocol.TryParseMessage(ref buffer, connectionState, out var message))
                        {
                            // We have data, process it
                            closeMessage = await ProcessMessagesAsync(message, connectionState, invocationMessageChannel.Writer).ConfigureAwait(false);
 
                            if (closeMessage != null)
                            {
                                // Closing because we got a close frame, possibly with an error in it.
                                if (closeMessage.Error != null)
                                {
                                    connectionState.CloseException = new HubException($"The server closed the connection with the following error: {closeMessage.Error}");
                                }
 
                                // Stopping being true indicates the client shouldn't try to reconnect even if automatic reconnects are enabled.
                                if (!closeMessage.AllowReconnect)
                                {
                                    connectionState.Stopping = true;
                                }
 
                                break;
                            }
                        }
 
                        // If we're closing stop everything
                        if (closeMessage != null)
                        {
                            break;
                        }
                    }
 
                    if (result.IsCompleted)
                    {
                        if (!buffer.IsEmpty)
                        {
                            throw new InvalidDataException("Connection terminated while reading a message.");
                        }
                        break;
                    }
                }
                finally
                {
                    // The buffer was sliced up to where it was consumed, so we can just advance to the start.
                    // We mark examined as `buffer.End` so that if we didn't receive a full frame, we'll wait for more data
                    // before yielding the read again.
                    input.AdvanceTo(buffer.Start, buffer.End);
                }
            }
        }
        catch (Exception ex)
        {
            Log.ServerDisconnectedWithError(_logger, ex);
            connectionState.CloseException = ex;
        }
        finally
        {
            invocationMessageChannel.Writer.TryComplete();
            timer.Stop();
            await timerTask.ConfigureAwait(false);
            uploadStreamSource.Cancel();
            await HandleConnectionClose(connectionState).ConfigureAwait(false);
        }
    }
 
    // Internal for testing
    internal Task RunTimerActions()
    {
        // Don't bother acquiring the connection lock. This is only called from tests.
        return _state.CurrentConnectionStateUnsynchronized!.RunTimerActions();
    }
 
    // Internal for testing
    internal void OnServerTimeout()
    {
        // Don't bother acquiring the connection lock. This is only called from tests.
        _state.CurrentConnectionStateUnsynchronized!.OnServerTimeout();
    }
 
    private async Task HandleConnectionClose(ConnectionState connectionState)
    {
        // Clear the connectionState field
        await _state.WaitConnectionLockAsync(token: default).ConfigureAwait(false);
        try
        {
            SafeAssert(ReferenceEquals(_state.CurrentConnectionStateUnsynchronized, connectionState),
                "Someone other than ReceiveLoop cleared the connection state!");
            _state.CurrentConnectionStateUnsynchronized = null;
 
            // Dispose the connection
            await CloseAsync(connectionState.Connection).ConfigureAwait(false);
 
            // Cancel any outstanding invocations within the connection lock
            connectionState.CancelOutstandingInvocations(connectionState.CloseException);
 
            if (connectionState.Stopping || _reconnectPolicy == null)
            {
                if (connectionState.CloseException != null)
                {
                    Log.ShutdownWithError(_logger, connectionState.CloseException);
                }
                else
                {
                    Log.ShutdownConnection(_logger);
                }
 
                _state.ChangeState(HubConnectionState.Connected, HubConnectionState.Disconnected);
                CompleteClose(connectionState.CloseException);
            }
            else
            {
                _state.ReconnectTask = ReconnectAsync(connectionState.CloseException);
            }
        }
        finally
        {
            _state.ReleaseConnectionLock();
        }
    }
 
    private void CompleteClose(Exception? closeException)
    {
        _state.AssertInConnectionLock();
        _state.StopCts = new CancellationTokenSource();
        RunCloseEvent(closeException);
    }
 
    private void RunCloseEvent(Exception? closeException)
    {
        var closed = Closed;
 
        async Task RunClosedEventAsync()
        {
            // Dispatch to the thread pool before we invoke the user callback
            await AwaitableThreadPool.Yield();
 
            try
            {
                Log.InvokingClosedEventHandler(_logger);
                await closed.Invoke(closeException).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                Log.ErrorDuringClosedEvent(_logger, ex);
            }
        }
 
        // There is no need to start a new task if there is no Closed event registered
        if (closed != null)
        {
            // Fire-and-forget the closed event
            _ = RunClosedEventAsync();
        }
    }
 
    private async Task ReconnectAsync(Exception? closeException)
    {
        var previousReconnectAttempts = 0;
        var reconnectStartTime = DateTime.UtcNow;
        var retryReason = closeException;
        var nextRetryDelay = GetNextRetryDelay(previousReconnectAttempts, TimeSpan.Zero, retryReason);
 
        // We still have the connection lock from the caller, HandleConnectionClose.
        _state.AssertInConnectionLock();
 
        if (nextRetryDelay == null)
        {
            Log.FirstReconnectRetryDelayNull(_logger);
 
            _state.ChangeState(HubConnectionState.Connected, HubConnectionState.Disconnected);
 
            CompleteClose(closeException);
            return;
        }
 
        _state.ChangeState(HubConnectionState.Connected, HubConnectionState.Reconnecting);
 
        if (closeException != null)
        {
            Log.ReconnectingWithError(_logger, closeException);
        }
        else
        {
            Log.Reconnecting(_logger);
        }
 
        RunReconnectingEvent(closeException);
 
        while (nextRetryDelay != null)
        {
            Log.AwaitingReconnectRetryDelay(_logger, previousReconnectAttempts + 1, nextRetryDelay.Value);
 
            try
            {
                await Task.Delay(nextRetryDelay.Value, _state.StopCts.Token).ConfigureAwait(false);
            }
            catch (OperationCanceledException ex)
            {
                Log.ReconnectingStoppedDuringRetryDelay(_logger);
 
                await _state.WaitConnectionLockAsync(token: default).ConfigureAwait(false);
                try
                {
                    _state.ChangeState(HubConnectionState.Reconnecting, HubConnectionState.Disconnected);
 
                    CompleteClose(GetOperationCanceledException("Connection stopped during reconnect delay. Done reconnecting.", ex, _state.StopCts.Token));
                }
                finally
                {
                    _state.ReleaseConnectionLock();
                }
 
                return;
            }
 
            await _state.WaitConnectionLockAsync(token: default).ConfigureAwait(false);
            try
            {
                SafeAssert(ReferenceEquals(_state.CurrentConnectionStateUnsynchronized, null),
                    "Someone other than Reconnect set the connection state!");
 
                await StartAsyncCore(_state.StopCts.Token).ConfigureAwait(false);
 
                Log.Reconnected(_logger, previousReconnectAttempts, DateTime.UtcNow - reconnectStartTime);
 
                _state.ChangeState(HubConnectionState.Reconnecting, HubConnectionState.Connected);
 
                RunReconnectedEvent();
                return;
            }
            catch (Exception ex)
            {
                retryReason = ex;
 
                Log.ReconnectAttemptFailed(_logger, ex);
 
                if (_state.StopCts.IsCancellationRequested)
                {
                    Log.ReconnectingStoppedDuringReconnectAttempt(_logger);
 
                    _state.ChangeState(HubConnectionState.Reconnecting, HubConnectionState.Disconnected);
 
                    CompleteClose(GetOperationCanceledException("Connection stopped during reconnect attempt. Done reconnecting.", ex, _state.StopCts.Token));
                    return;
                }
 
                previousReconnectAttempts++;
            }
            finally
            {
                _state.ReleaseConnectionLock();
            }
 
            nextRetryDelay = GetNextRetryDelay(previousReconnectAttempts, DateTime.UtcNow - reconnectStartTime, retryReason);
        }
 
        await _state.WaitConnectionLockAsync(token: default).ConfigureAwait(false);
        try
        {
            SafeAssert(ReferenceEquals(_state.CurrentConnectionStateUnsynchronized, null),
                "Someone other than Reconnect set the connection state!");
 
            var elapsedTime = DateTime.UtcNow - reconnectStartTime;
            Log.ReconnectAttemptsExhausted(_logger, previousReconnectAttempts, elapsedTime);
 
            _state.ChangeState(HubConnectionState.Reconnecting, HubConnectionState.Disconnected);
 
            var message = $"Reconnect retries have been exhausted after {previousReconnectAttempts} failed attempts and {elapsedTime} elapsed. Disconnecting.";
            CompleteClose(new OperationCanceledException(message));
        }
        finally
        {
            _state.ReleaseConnectionLock();
        }
    }
 
    private TimeSpan? GetNextRetryDelay(long previousRetryCount, TimeSpan elapsedTime, Exception? retryReason)
    {
        try
        {
            return _reconnectPolicy!.NextRetryDelay(new RetryContext
            {
                PreviousRetryCount = previousRetryCount,
                ElapsedTime = elapsedTime,
                RetryReason = retryReason,
            });
        }
        catch (Exception ex)
        {
            Log.ErrorDuringNextRetryDelay(_logger, ex);
            return null;
        }
    }
 
#pragma warning disable CA1822 // Avoid different signatures based on TFM
    private OperationCanceledException GetOperationCanceledException(string message, Exception innerException, CancellationToken cancellationToken)
    {
#pragma warning restore CA1822
#if NETSTANDARD2_1 || NETCOREAPP
        return new OperationCanceledException(message, innerException, cancellationToken);
#else
        return new OperationCanceledException(message, innerException);
#endif
    }
 
    private void RunReconnectingEvent(Exception? closeException)
    {
        var reconnecting = Reconnecting;
 
        async Task RunReconnectingEventAsync()
        {
            // Dispatch to the thread pool before we invoke the user callback
            await AwaitableThreadPool.Yield();
 
            try
            {
                await reconnecting.Invoke(closeException).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                Log.ErrorDuringReconnectingEvent(_logger, ex);
            }
        }
 
        // There is no need to start a new task if there is no Reconnecting event registered
        if (reconnecting != null)
        {
            // Fire-and-forget the reconnecting event
            _ = RunReconnectingEventAsync();
        }
    }
 
    private void RunReconnectedEvent()
    {
        var reconnected = Reconnected;
 
        async Task RunReconnectedEventAsync()
        {
            // Dispatch to the thread pool before we invoke the user callback
            await AwaitableThreadPool.Yield();
 
            try
            {
                await reconnected.Invoke(ConnectionId).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                Log.ErrorDuringReconnectedEvent(_logger, ex);
            }
        }
 
        // There is no need to start a new task if there is no Reconnected event registered
        if (reconnected != null)
        {
            // Fire-and-forget the reconnected event
            _ = RunReconnectedEventAsync();
        }
    }
 
    // Debug.Assert plays havoc with Unit Tests. But I want something that I can "assert" only in Debug builds.
    [Conditional("DEBUG")]
    private static void SafeAssert(bool condition, string message, [CallerMemberName] string? memberName = null, [CallerFilePath] string? fileName = null, [CallerLineNumber] int lineNumber = 0)
    {
        if (!condition)
        {
            throw new Exception($"Assertion failed in {memberName}, at {fileName}:{lineNumber}: {message}");
        }
    }
 
    private sealed class Subscription : IDisposable
    {
        private readonly InvocationHandler _handler;
        private readonly InvocationHandlerList _handlerList;
 
        public Subscription(InvocationHandler handler, InvocationHandlerList handlerList)
        {
            _handler = handler;
            _handlerList = handlerList;
        }
 
        public void Dispose()
        {
            _handlerList.Remove(_handler);
        }
    }
 
    private sealed class InvocationHandlerList
    {
        private readonly List<InvocationHandler> _invocationHandlers;
        // A lazy cached copy of the handlers that doesn't change for thread safety.
        // Adding or removing a handler sets this to null.
        private InvocationHandler[]? _copiedHandlers;
 
        internal InvocationHandlerList(InvocationHandler handler)
        {
            _invocationHandlers = new List<InvocationHandler>() { handler };
        }
 
        internal InvocationHandler[] GetHandlers()
        {
            var handlers = _copiedHandlers;
            if (handlers == null)
            {
                lock (_invocationHandlers)
                {
                    // Check if the handlers are set, if not we'll copy them over.
                    if (_copiedHandlers == null)
                    {
                        _copiedHandlers = _invocationHandlers.ToArray();
                    }
                    handlers = _copiedHandlers;
                }
            }
            return handlers;
        }
 
        internal void Add(string methodName, InvocationHandler handler)
        {
            lock (_invocationHandlers)
            {
                if (handler.HasResult)
                {
                    foreach (var m in _invocationHandlers)
                    {
                        if (m.HasResult)
                        {
                            throw new InvalidOperationException($"'{methodName}' already has a value returning handler. Multiple return values are not supported.");
                        }
                    }
                }
                _invocationHandlers.Add(handler);
                _copiedHandlers = null;
            }
        }
 
        internal void Remove(InvocationHandler handler)
        {
            lock (_invocationHandlers)
            {
                if (_invocationHandlers.Remove(handler))
                {
                    _copiedHandlers = null;
                }
            }
        }
    }
 
    private readonly struct InvocationHandler
    {
        public Type[] ParameterTypes { get; }
        public bool HasResult => _callback.Method.ReturnType == typeof(Task<object>);
        private readonly Func<object?[], object, Task> _callback;
        private readonly object _state;
 
        public InvocationHandler(Type[] parameterTypes, Func<object?[], object, Task> callback, object state)
        {
            _callback = callback;
            ParameterTypes = parameterTypes;
            _state = state;
        }
 
        public Task InvokeAsync(object?[] parameters)
        {
            return _callback(parameters, _state);
        }
    }
 
    private sealed class ConnectionState : IInvocationBinder
    {
        private readonly HubConnection _hubConnection;
        private readonly ILogger _logger;
        private readonly bool _hasInherentKeepAlive;
 
        private readonly object _lock = new object();
        private readonly Dictionary<string, InvocationRequest> _pendingCalls = new Dictionary<string, InvocationRequest>(StringComparer.Ordinal);
        private TaskCompletionSource<object?>? _stopTcs;
 
        private volatile bool _stopping;
 
        private int _nextInvocationId;
 
        private long _nextActivationServerTimeout;
        private long _nextActivationSendPing;
 
        public ConnectionContext Connection { get; }
        public Task? ReceiveTask { get; set; }
        public Exception? CloseException { get; set; }
        public CancellationToken UploadStreamToken { get; set; }
 
        // We store this task so we can view it in a dump file, but never await it
        public Task? InvocationMessageReceiveTask { get; set; }
 
        // Indicates the connection is stopping AND the client should NOT attempt to reconnect even if automatic reconnects are enabled.
        // This means either HubConnection.DisposeAsync/StopAsync was called OR a CloseMessage with AllowReconnect set to false was received.
        public bool Stopping
        {
            get => _stopping;
            set => _stopping = value;
        }
 
        public ConnectionState(ConnectionContext connection, HubConnection hubConnection)
        {
            Connection = connection;
 
            _hubConnection = hubConnection;
            _hubConnection._logScope.ConnectionId = connection.ConnectionId;
 
            _logger = _hubConnection._logger;
            _hasInherentKeepAlive = connection.Features.Get<IConnectionInherentKeepAliveFeature>()?.HasInherentKeepAlive ?? false;
        }
 
        public string GetNextId() => (++_nextInvocationId).ToString(CultureInfo.InvariantCulture);
 
        public void AddInvocation(InvocationRequest irq)
        {
            lock (_lock)
            {
                if (_pendingCalls.ContainsKey(irq.InvocationId))
                {
                    Log.InvocationAlreadyInUse(_logger, irq.InvocationId);
                    throw new InvalidOperationException($"Invocation ID '{irq.InvocationId}' is already in use.");
                }
                else
                {
                    _pendingCalls.Add(irq.InvocationId, irq);
                }
            }
        }
 
        public bool TryGetInvocation(string invocationId, [NotNullWhen(true)] out InvocationRequest? irq)
        {
            lock (_lock)
            {
                return _pendingCalls.TryGetValue(invocationId, out irq);
            }
        }
 
        public bool TryRemoveInvocation(string invocationId, [NotNullWhen(true)] out InvocationRequest? irq)
        {
            lock (_lock)
            {
                if (_pendingCalls.TryGetValue(invocationId, out irq))
                {
                    _pendingCalls.Remove(invocationId);
                    return true;
                }
                else
                {
                    return false;
                }
            }
        }
 
        public void CancelOutstandingInvocations(Exception? exception)
        {
            Log.CancelingOutstandingInvocations(_logger);
 
            lock (_lock)
            {
                foreach (var outstandingCall in _pendingCalls.Values)
                {
                    Log.RemovingInvocation(_logger, outstandingCall.InvocationId);
                    if (exception != null)
                    {
                        outstandingCall.Fail(exception);
                    }
                    outstandingCall.Dispose();
                }
                _pendingCalls.Clear();
            }
        }
 
        public Task StopAsync()
        {
            // We want multiple StopAsync calls on the same connection state
            // to wait for the same "stop" to complete.
            lock (_lock)
            {
                if (_stopTcs != null)
                {
                    return _stopTcs.Task;
                }
                else
                {
                    _stopTcs = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously);
                    return StopAsyncCore();
                }
            }
        }
 
        private async Task StopAsyncCore()
        {
            Log.Stopping(_logger);
 
            // Complete our write pipe, which should cause everything to shut down
            Log.TerminatingReceiveLoop(_logger);
            Connection.Transport.Input.CancelPendingRead();
 
            // Wait ServerTimeout for the server or transport to shut down.
            Log.WaitingForReceiveLoopToTerminate(_logger);
            await ((ReceiveTask ?? Task.CompletedTask).ConfigureAwait(false));
 
            Log.Stopped(_logger);
 
            _hubConnection._logScope.ConnectionId = null;
            _stopTcs!.TrySetResult(null);
        }
 
        public async Task TimerLoop(TimerAwaitable timer)
        {
            // initialize the timers
            timer.Start();
            ResetTimeout();
            ResetSendPing();
 
            using (timer)
            {
                // await returns True until `timer.Stop()` is called in the `finally` block of `ReceiveLoop`
                while (await timer)
                {
                    await RunTimerActions().ConfigureAwait(false);
                }
            }
        }
 
        public void ResetSendPing()
        {
            Volatile.Write(ref _nextActivationSendPing, (DateTime.UtcNow + _hubConnection.KeepAliveInterval).Ticks);
        }
 
        public void ResetTimeout()
        {
            Volatile.Write(ref _nextActivationServerTimeout, (DateTime.UtcNow + _hubConnection.ServerTimeout).Ticks);
        }
 
        // Internal for testing
        internal async Task RunTimerActions()
        {
            if (_hasInherentKeepAlive)
            {
                return;
            }
 
            if (DateTime.UtcNow.Ticks > Volatile.Read(ref _nextActivationServerTimeout))
            {
                OnServerTimeout();
            }
 
            if (DateTime.UtcNow.Ticks > Volatile.Read(ref _nextActivationSendPing) && !Stopping)
            {
                if (!_hubConnection._state.TryAcquireConnectionLock())
                {
                    Log.UnableToAcquireConnectionLockForPing(_logger);
                    return;
                }
 
                Log.AcquiredConnectionLockForPing(_logger);
 
                try
                {
                    if (_hubConnection._state.CurrentConnectionStateUnsynchronized != null)
                    {
                        SafeAssert(ReferenceEquals(_hubConnection._state.CurrentConnectionStateUnsynchronized, this),
                            "Something reset the connection state before the timer loop completed!");
 
                        await _hubConnection.SendHubMessage(this, PingMessage.Instance).ConfigureAwait(false);
                    }
                }
                finally
                {
                    _hubConnection._state.ReleaseConnectionLock();
                }
            }
        }
 
        // Internal for testing
        internal void OnServerTimeout()
        {
            CloseException = new TimeoutException(
                $"Server timeout ({_hubConnection.ServerTimeout.TotalMilliseconds:0.00}ms) elapsed without receiving a message from the server.");
            Connection.Transport.Input.CancelPendingRead();
        }
 
        Type IInvocationBinder.GetReturnType(string invocationId)
        {
            if (!TryGetInvocation(invocationId, out var irq))
            {
                Log.ReceivedUnexpectedResponse(_logger, invocationId);
                throw new KeyNotFoundException($"No invocation with id '{invocationId}' could be found.");
            }
            return irq.ResultType;
        }
 
        Type IInvocationBinder.GetStreamItemType(string invocationId)
        {
            // previously, streaming was only server->client, and used GetReturnType for StreamItems
            // literally the same code as the above method
            if (!TryGetInvocation(invocationId, out var irq))
            {
                Log.ReceivedUnexpectedResponse(_logger, invocationId);
                throw new KeyNotFoundException($"No invocation with id '{invocationId}' could be found.");
            }
            return irq.ResultType;
        }
 
        IReadOnlyList<Type> IInvocationBinder.GetParameterTypes(string methodName)
        {
            if (!_hubConnection._handlers.TryGetValue(methodName, out var invocationHandlerList))
            {
                Log.MissingHandler(_logger, methodName);
                return Type.EmptyTypes;
            }
 
            // We use the parameter types of the first handler
            var handlers = invocationHandlerList.GetHandlers();
            if (handlers.Length > 0)
            {
                return handlers[0].ParameterTypes;
            }
            throw new InvalidOperationException($"There are no callbacks registered for the method '{methodName}'");
        }
    }
 
    private sealed class ReconnectingConnectionState
    {
        // This lock protects the connection state.
        private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(1, 1);
 
        private readonly ILogger _logger;
 
        public ReconnectingConnectionState(ILogger logger)
        {
            _logger = logger;
            StopCts = new CancellationTokenSource();
            ReconnectTask = Task.CompletedTask;
        }
 
        public ConnectionState? CurrentConnectionStateUnsynchronized { get; set; }
 
        public HubConnectionState OverallState { get; private set; }
 
        public CancellationTokenSource StopCts { get; set; } = new CancellationTokenSource();
 
        public Task ReconnectTask { get; set; } = Task.CompletedTask;
 
        public void ChangeState(HubConnectionState expectedState, HubConnectionState newState)
        {
            if (!TryChangeState(expectedState, newState))
            {
                Log.StateTransitionFailed(_logger, expectedState, newState, OverallState);
                throw new InvalidOperationException($"The HubConnection failed to transition from the '{expectedState}' state to the '{newState}' state because it was actually in the '{OverallState}' state.");
            }
        }
 
        public bool TryChangeState(HubConnectionState expectedState, HubConnectionState newState)
        {
            AssertInConnectionLock();
 
            Log.AttemptingStateTransition(_logger, expectedState, newState);
 
            if (OverallState != expectedState)
            {
                return false;
            }
 
            OverallState = newState;
            return true;
        }
 
        [Conditional("DEBUG")]
        public void AssertInConnectionLock([CallerMemberName] string? memberName = null, [CallerFilePath] string? fileName = null, [CallerLineNumber] int lineNumber = 0) => SafeAssert(_connectionLock.CurrentCount == 0, "We're not in the Connection Lock!", memberName, fileName, lineNumber);
 
        [Conditional("DEBUG")]
        public void AssertConnectionValid([CallerMemberName] string? memberName = null, [CallerFilePath] string? fileName = null, [CallerLineNumber] int lineNumber = 0)
        {
            AssertInConnectionLock(memberName, fileName, lineNumber);
            SafeAssert(CurrentConnectionStateUnsynchronized != null, "We don't have a connection!", memberName, fileName, lineNumber);
        }
 
        public Task WaitConnectionLockAsync(CancellationToken token, [CallerMemberName] string? memberName = null, [CallerFilePath] string? filePath = null, [CallerLineNumber] int lineNumber = 0)
        {
            Log.WaitingOnConnectionLock(_logger, memberName, filePath, lineNumber);
            return _connectionLock.WaitAsync(token);
        }
 
        public bool TryAcquireConnectionLock()
        {
            if (OperatingSystem.IsBrowser())
            {
                return _connectionLock.WaitAsync(0).Result;
            }
            return _connectionLock.Wait(0);
        }
 
        // Don't call this method in a try/finally that releases the lock since we're also potentially releasing the connection lock here.
        public async Task<ConnectionState> WaitForActiveConnectionAsync(string methodName, CancellationToken token)
        {
            await WaitConnectionLockAsync(token, methodName).ConfigureAwait(false);
 
            if (!IsConnectionActive())
            {
                ReleaseConnectionLock(methodName);
                throw new InvalidOperationException($"The '{methodName}' method cannot be called if the connection is not active");
            }
 
            return CurrentConnectionStateUnsynchronized;
        }
 
        [MemberNotNullWhen(true, nameof(CurrentConnectionStateUnsynchronized))]
        public bool IsConnectionActive()
        {
            AssertInConnectionLock();
            return CurrentConnectionStateUnsynchronized is not null && !CurrentConnectionStateUnsynchronized.Stopping;
        }
 
        public void ReleaseConnectionLock([CallerMemberName] string? memberName = null,
            [CallerFilePath] string? filePath = null, [CallerLineNumber] int lineNumber = 0)
        {
            Log.ReleasingConnectionLock(_logger, memberName, filePath, lineNumber);
            _connectionLock.Release();
        }
    }
}

I’ve implemented Web Socket reconnection and it works fine for a while (i.e. if I disable Wi-Fi for 5 minutes or hibernate my laptop), but if I leave my MacBook for an hour or so (it goes into sleep mode, I guess), SignalR doesn’t recover: it manages to connect but then gets a handshake error, and this cycle repeats again and again.

I reproduced this behaviour on Chrome on Mac, and my colleague reproduced it on Chrome on Windows, though he was not able to reproduce it on Edge. So it might be a browser-specific problem.

Here’s the console output:

[2019-01-06T11:29:15.846Z] Information: WebSocket connected to ws://api.gateio-demo.cryptolp.net/Liquidity?id=pylLwZAF0045nJxY-MJcTg.
HubConnection.js:552 Uncaught Error: Error parsing handshake response: Error: Expected a handshake response from the server.
at t.processHandshakeResponse (HubConnection.js:552)
at t.processIncomingData (HubConnection.js:489)
at t.connection.onreceive (HubConnection.js:179)
at WebSocket.i.onmessage (WebSocketTransport.js:238)
t.processHandshakeResponse @ HubConnection.js:552
t.processIncomingData @ HubConnection.js:489
connection.onreceive @ HubConnection.js:179
i.onmessage @ WebSocketTransport.js:238

Utils.js:358 [2019-01-06T11:29:31.696Z] Error: Connection disconnected with error 'Error: Error parsing handshake response: Error: Expected a handshake response from the server.'.
t.log @ Utils.js:358
t.stopConnection @ HttpConnection.js:714
transport.onclose @ HttpConnection.js:435
t.close @ WebSocketTransport.js:288
t.stop @ WebSocketTransport.js:274
(anonymous) @ HttpConnection.js:274
(anonymous) @ HttpConnection.js:125
(anonymous) @ HttpConnection.js:55
a @ HttpConnection.js:7
Promise.then (async)
l @ HttpConnection.js:24
(anonymous) @ HttpConnection.js:27
j @ HttpConnection.js:4
t.stop @ HttpConnection.js:234
t.processHandshakeResponse @ HubConnection.js:555
t.processIncomingData @ HubConnection.js:489
connection.onreceive @ HubConnection.js:179
i.onmessage @ WebSocketTransport.js:238

HubConnection.js:552 Uncaught (in promise) Error: Error parsing handshake response: Error: Expected a handshake response from the server.
at t.processHandshakeResponse (HubConnection.js:552)
at t.processIncomingData (HubConnection.js:489)
at t.connection.onreceive (HubConnection.js:179)
at WebSocket.i.onmessage (WebSocketTransport.js:238)

All 27 comments

Could you explain this:

I’ve implemented Web Socket reconnection

Also, are there any server logs when you try to reconnect?

Currently not. Should I enable server logging for SignalR?

That would be helpful.

The logs show that there is a 15 second gap between the websocket being connected and the handshake timing out. It would be nice to see if the server ever got the handshake request. Could you also capture the HAR trace?

Is there any documentation on how to enable logging on .NET Core?

@BrennanConroy, @nielheo and I are having trouble enabling the logging you asked for. Could you please give us a hint?

  • server-side: NuGet v1.1.0
  • client-side: npm "@aspnet/signalr": "^1.1.0", "react": "^16.7.0"
  • IIS Express
  • OS: Windows 10 (client & Server)
  • Browser: Chrome / Edge

server-side.txt
client-side.txt

After my PC has been idle for a long time, it tries to re-establish the SignalR connection but gets a handshake error on the client side. Latest occurrence at [2019-01-19T01:17:18.430Z].

Few things:

  • Why are you spamming the server with hundreds of connections?
  • There are a ton of unrelated issues in your server side logs which make it very difficult to parse what’s going on, could you remove all the InfluxDB stuff from the server so there is a clean environment to see the issue in? Or maybe this could be causing the issue.
  • Could you show the code you’re using for reconnect?

@nielheo could you provide the cleaner logs with disabled Influx? @BrennanConroy here’s the reconnection code we’re using:

class LiquidityContainer extends Container {
  constructor(props) {
    super(props)
    this.state = {
     ...
    }

    this.initiateConnection();
  }

  openConnection = () => {
    let that = this

    let interval = setInterval(async () => {
      await startConnection()
    }, 2000); 

    async function startConnection () {
      if (connection.state === 0) {
        try {
          await connection.start()
          await that.setState({ isConnected: true }) 
          clearInterval(interval)
        } catch {

        }
      }
    }
  }

  initiateConnection = () => {
    connection = new HubConnectionBuilder()
      .withUrl(config.baseHubURL + 'Liquidity')
      .build();

    connection.on("UpdateOrdersState", (message) => {
      console.log('WebSocket update');
    });

    connection.onclose(async () => { 
      this.setState({ isConnected: false, subscribed: '' }) 
      setTimeout(this.openConnection, 1000)
    })

    this.openConnection()
  } 

Can you explain the wall of «starting connection» messages you guys have in your logs?
Each one of those means you’re starting a new connection, and you have 100s of them over the course of a few seconds. I would guess that if you fix that issue you won’t see any more handshake response errors.

Utils.js:371 [2019-01-19T01:17:18.663Z] Debug: Starting HubConnection.
Utils.js:371 [2019-01-19T01:17:18.664Z] Debug: Starting connection with transfer format 'Text'.
Utils.js:371 [2019-01-19T01:17:18.670Z] Debug: Starting HubConnection.
Utils.js:371 [2019-01-19T01:17:18.671Z] Debug: Starting connection with transfer format 'Text'.
Utils.js:371 [2019-01-19T01:17:18.713Z] Debug: Starting HubConnection.
Utils.js:371 [2019-01-19T01:17:18.714Z] Debug: Starting connection with transfer format 'Text'.
Utils.js:371 [2019-01-19T01:17:18.717Z] Debug: Starting HubConnection.
Utils.js:371 [2019-01-19T01:17:18.719Z] Debug: Starting connection with transfer format 'Text'.
Utils.js:371 [2019-01-19T01:17:18.766Z] Debug: Starting HubConnection.
Utils.js:371 [2019-01-19T01:17:18.766Z] Debug: Starting connection with transfer format 'Text'.
Utils.js:371 [2019-01-19T01:17:18.771Z] Debug: Starting HubConnection.
Utils.js:371 [2019-01-19T01:17:18.771Z] Debug: Starting connection with transfer format 'Text'.
Utils.js:371 [2019-01-19T01:17:18.940Z] Debug: Starting HubConnection.
Utils.js:371 [2019-01-19T01:17:18.942Z] Debug: Starting connection with transfer format 'Text'.
Utils.js:371 [2019-01-19T01:17:18.956Z] Debug: Starting HubConnection.
Utils.js:371 [2019-01-19T01:17:18.959Z] Debug: Starting connection with transfer format 'Text'.
Utils.js:371 [2019-01-19T01:17:19.008Z] Debug: Starting HubConnection.
Utils.js:371 [2019-01-19T01:17:19.010Z] Debug: Starting connection with transfer format 'Text'.
Utils.js:371 [2019-01-19T01:17:19.041Z] Debug: Starting HubConnection.
Utils.js:371 [2019-01-19T01:17:19.041Z] Debug: Starting connection with transfer format 'Text'.
Utils.js:371 [2019-01-19T01:17:19.060Z] Debug: Starting HubConnection.
Utils.js:371 [2019-01-19T01:17:19.061Z] Debug: Starting connection with transfer format 'Text'.
Utils.js:371 [2019-01-19T01:17:19.749Z] Debug: Starting HubConnection.
Utils.js:371 [2019-01-19T01:17:19.750Z] Debug: Starting connection with transfer format 'Text'.
Utils.js:371 [2019-01-19T01:17:19.751Z] Debug: Starting HubConnection.
Utils.js:371 [2019-01-19T01:17:19.751Z] Debug: Starting connection with transfer format 'Text'.
Utils.js:371 [2019-01-19T01:17:19.753Z] Debug: Starting HubConnection.
Utils.js:371 [2019-01-19T01:17:19.753Z] Debug: Starting connection with transfer format 'Text'.
Utils.js:371 [2019-01-19T01:17:19.755Z] Debug: Starting HubConnection.

I suppose there’s something wrong in our reconnecting logic that I posted above. After looking through the code the only problem I’ve noticed is that we set await that.setState({ isConnected: true }) right in startConnection instead of doing it in onConnected event but it’s our internal variable, so it should not be a problem. Otherwise we’re trying to reconnect every 2 seconds, so I don’t see how we can trigger so many connection attempts within one second…

If HubConnection.start is called multiple times in a row (while the original connect is still in progress) it can reset the handshakeResolver and handshakeRejecter values which would cause the previous calls to never finish. We should prevent this from happening.
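
Until the library guards against this itself, the overlapping start() calls described above can be avoided on the caller's side by sharing one in-flight attempt. The following is a minimal sketch, not the library's fix: `Startable` and `makeSingleFlightStart` are hypothetical names introduced here, and the only thing assumed about the connection is that `start()` returns a promise.

```typescript
// Hypothetical minimal shape; the real HubConnection has many more members.
interface Startable {
  start(): Promise<void>;
}

// Wraps start() so that overlapping callers share one in-flight attempt
// instead of each triggering a new handshake.
function makeSingleFlightStart(connection: Startable): () => Promise<void> {
  let inFlight: Promise<void> | null = null;

  return function startOnce(): Promise<void> {
    if (inFlight) {
      return inFlight; // a start is already running: reuse it
    }
    const attempt = connection.start().then(
      () => {
        inFlight = null; // settled: a later call may start again
      },
      (err) => {
        inFlight = null; // failed: allow a retry, but still report the error
        throw err;
      },
    );
    inFlight = attempt;
    return attempt;
  };
}
```

A timer-based retry loop could then call `startOnce()` as often as it likes; only one underlying `start()` runs at a time.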

@BrennanConroy We’ve switched to a pattern in the docs (reconnect in catch), it got way better but sometimes this sequence of events happens:

WebSocket connecting
LiquidityContainer.js:34 WebSocket failed to connect:
Cannot start a connection that is not in the ‘Disconnected’ state.
LiquidityContainer.js:29 WebSocket connecting
LiquidityContainer.js:34 WebSocket failed to connect:
Cannot start a connection that is not in the ‘Disconnected’ state.
HubConnection.js:552 Uncaught Error: Error parsing handshake response: Error: Expected a handshake response from the server.
at t.processHandshakeResponse (HubConnection.js:552)
at t.processIncomingData (HubConnection.js:489)
at t.connection.onreceive (HubConnection.js:179)
at WebSocket.i.onmessage (WebSocketTransport.js:238)
t.processHandshakeResponse @ HubConnection.js:552
t.processIncomingData @ HubConnection.js:489
connection.onreceive @ HubConnection.js:179
i.onmessage @ WebSocketTransport.js:238
Utils.js:358 [2019-01-31T20:17:21.370Z] Error: Connection disconnected with error ‘Error: Error parsing handshake response: Error: Expected a handshake response from the server.’.
t.log @ Utils.js:358
t.stopConnection @ HttpConnection.js:714
transport.onclose @ HttpConnection.js:435
t.close @ WebSocketTransport.js:288
t.stop @ WebSocketTransport.js:274
(anonymous) @ HttpConnection.js:274
(anonymous) @ HttpConnection.js:125
(anonymous) @ HttpConnection.js:55
a @ HttpConnection.js:7
Promise.then (async)
l @ HttpConnection.js:24
(anonymous) @ HttpConnection.js:27
j @ HttpConnection.js:4
t.stop @ HttpConnection.js:234
t.processHandshakeResponse @ HubConnection.js:555
t.processIncomingData @ HubConnection.js:489
connection.onreceive @ HubConnection.js:179
i.onmessage @ WebSocketTransport.js:238
LiquidityContainer.js:29 WebSocket connecting
HubConnection.js:552 Uncaught (in promise) Error: Error parsing handshake response: Error: Expected a handshake response from the server.
at t.processHandshakeResponse (HubConnection.js:552)
at t.processIncomingData (HubConnection.js:489)
at t.connection.onreceive (HubConnection.js:179)
at WebSocket.i.onmessage (WebSocketTransport.js:238)
t.processHandshakeResponse @ HubConnection.js:552
t.processIncomingData @ HubConnection.js:489
connection.onreceive @ HubConnection.js:179
i.onmessage @ WebSocketTransport.js:238
Utils.js:366 [2019-01-31T20:17:22.196Z] Information: WebSocket connected to ws://api.karma-stage.cryptolp.net/Liquidity?id=eYN50svoDbzpXBxFMS8hDA.
LiquidityContainer.js:32 WebSocket connected
LiquidityContainer.js:29 WebSocket connecting
LiquidityContainer.js:34 WebSocket failed to connect:
Cannot start a connection that is not in the ‘Disconnected’ state.
LiquidityContainer.js:29 WebSocket connecting

It looks strange: it already connected, then somehow starts to connect again and somehow it causes a reconnect

Do you mind sharing your new LiquidityContainer?

Sure

class LiquidityContainer extends Container {
  constructor(props) {
     this.initiateConnection();
  }

  openConnection = async () => {
      try {
        console.log('WebSocket connecting');
        await connection.start();
        await this.setState({ isConnected: true });
        console.log('WebSocket connected');
      } catch(e) {
          console.log('WebSocket failed to connect:\n' + e.message);

          setTimeout(async () => await this.openConnection(), 5000);
      }
  };

  initiateConnection = async () => {
    connection = new HubConnectionBuilder()
      .withUrl(config.baseHubURL + 'Liquidity')
      .build();

    connection.on("UpdateOrdersState", (message) => {
      console.log('WebSocket update');
      this.updateOrderState(message);
    });

    connection.onclose(async () => { 
      await this.setState({ isConnected: false, subscribed: '' });
      await this.openConnection();
    });

    await this.openConnection();
  };

Looks like there is a bug in how we recommend doing the reconnect. The onclose callback is being called and at the same time connection.start() throws, so you get 2 reconnect attempts. The workaround for now would be to set a flag and check it in the catch and the onclose callback.
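
The flag workaround could look roughly like the sketch below. It is an illustration under assumptions, not the code that ended up in the docs: `ReconnectableConnection`, `installReconnect`, and the injectable `schedule` parameter are hypothetical names, and the connection is assumed to expose only `start()` and `onclose()`. The flag ensures that when both the `catch` path and the `onclose` callback fire for the same failure, only one retry is queued.

```typescript
// Hypothetical minimal shape of the connection used by this sketch.
interface ReconnectableConnection {
  start(): Promise<void>;
  onclose(callback: (error?: Error) => void): void;
}

type Scheduler = (fn: () => void, delayMs: number) => void;

function installReconnect(
  connection: ReconnectableConnection,
  retryDelayMs: number,
  schedule: Scheduler = (fn, delayMs) => { setTimeout(fn, delayMs); },
): () => Promise<void> {
  let retryQueued = false; // the flag: true while a retry is waiting to run

  function requestRetry(): void {
    if (retryQueued) {
      return; // the other code path already queued a retry
    }
    retryQueued = true;
    schedule(() => {
      retryQueued = false;
      void connect();
    }, retryDelayMs);
  }

  async function connect(): Promise<void> {
    try {
      await connection.start();
    } catch (err) {
      console.warn("start() failed, retry requested:", err);
      requestRetry();
    }
  }

  // Both failure paths funnel into requestRetry(), which deduplicates them.
  connection.onclose(() => requestRetry());
  return connect;
}
```

Calling the returned function once (for example from `initiateConnection`) starts the connection; later failures are retried without doubling up.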

Interesting, I somehow assumed onclose is not called if we did not manage to connect. Wouldn’t keeping reconnect only in onclose be enough?

No, depending on when the failure is internally the onclose may not be called but start will throw. We’ll look at fixing this in a future release.

Ok, I understood. I will try to play with the field and get back to you.

Overall though it’s confusing how SignalR after like 10 years of development still not only doesn’t have built-in reconnection logic but has bugs in pretty basic connectivity features. Just imagine how much time is being wasted by .net developers facing this kind of basic problems all over the planet. I’m not trying to bash SignalR, I’m just sincerely confused how it is even possible, it’s not like I’m doing something extraordinary with SignalR

A field did not seem to help. I’ve made a stress test now: connected, disabled wi-fi, waited till disconnection and enabled wi-fi and SignalR can’t connect, being in this loop:

WebSocket connecting
VM1167:1 OPTIONS http://localhost:53938/Liquidity/negotiate net::ERR_CONNECTION_REFUSED
Utils.js:347 Warning: Error from HTTP request. 0: 
index.js:1452 Error: Failed to complete negotiation with the server: Error
index.js:1452 Error: Failed to start the connection: Error
LiquidityContainer.js:34 WebSocket failed to connect:

update:
It was exactly like what @BrennanConroy said. After using a flag, everything works smoothly. Thanks!

I have the same thing here as well. I think we have the same issue as described in @BrennanConroy’s comment https://github.com/aspnet/AspNetCore/issues/6434#issuecomment-459897933.

And I do think @siberianguy has a pretty good point here. Obviously everyone wants a way to make SignalR reconnect automatically. I feel like this should be handled in the library as well.

Btw, it is also annoying that SignalR always disconnects for some reason after an hour or two with an error like «Error: Connection disconnected with error ‘Error: WebSocket closed with status code: 1006’». So it is kind of necessary to make the reconnect feature work as expected.

We’re covering this as part of automatic reconnect https://github.com/aspnet/AspNetCore/issues/8797 . But we’ll try to take a look at this scenario before 3.0, since the manual reconnect process we document should also be able to handle this and we may need to fix something.

Has this fix been released in any stable build?

No, not yet. This will first become available in 3.0.0-preview4 and will be stable with the 3.0 release (expected by the end of this year).

Answer by Zola Reed

Let’s review your ChatComponent. It initializes and tries to set up a connection to the Hub, but its ngOnInit() returns immediately after it binds an event handler for ReceiveMessage, without waiting for start() to complete.

  ngOnInit() {
    ...
    this.hubConnection = new HubConnectionBuilder().withUrl(...).build();
    this.hubConnection
      .start()
      .then(() => console.log('Connection started!'))
      .catch(err => console.log('Error while establishing connection :('));
    this.hubConnection.on('ReceiveMessage', ...);
  }

Create a thenable to ensure the connection has been set up.

  private thenable: Promise<void>

  ngOnInit() {
    ...
    this.hubConnection = new HubConnectionBuilder().withUrl(...).build();
    this.start()
    this.hubConnection.on('ReceiveMessage', ...);
  }

  private start() {
    this.thenable = this.hubConnection.start();
    this.thenable
      .then(() => console.log('Connection started!'))
      .catch(err => console.log('Error while establishing connection :('));
  }

Whenever you want to communicate with the Hub, chain the call onto the thenable in promise style. For example, your sendMessage() should be changed as follows:

public sendMessage(): void {
    this.thenable.then(() => {
        var today = new Date();
        this.messageData = ...
        this.hubConnection
            .invoke('SendToAll', this.author, this.messageData)
            .catch(err => console.error(err));
        this.message = '';
    });
}

There are two other bugs in your code. You registered the SignalR middleware after the SPA middleware. Be aware that UseSpa() should be used as a catch-all middleware, so you should invoke UseSignalR() before UseSpa():

app.UseSignalR(routes =>
{
    routes.MapHub<ChatHub>("/api/chat");
});

app.UseMvc(routes =>
{
    ...
});

app.UseSpa(spa =>
{
    ...
});

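// Remove this registration: UseSpa() is a catch-all, so middleware added after it does not see the request.
// app.UseSignalR(routes =>
// {
//     routes.MapHub<ChatHub>("/api/chat");
// });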

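If you’re connecting to the same server, avoid a hard-coded absolute URL like the one below; a relative path such as "/api/chat" (the route mapped above) keeps working when the host or port changes: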

this.hubConnection = new HubConnectionBuilder().withUrl("http://localhost:5000/api/chat", {
      skipNegotiation: true,
      transport: HttpTransportType.WebSockets
    }).build();

Answer by Vivienne Gonzales

Fast navigating between pages returns an error: Cannot send data if the connection is not in the ‘Connected’ State.

Both of those errors are showing up because you aren’t observing the tasks from connection.start() and connection.stop().

@BrennanConroy your solution will fix this error:

history.back();
console.log('back');
setTimeout(()=> {
	history.forward(); 
	console.log('forward');
}, 4);
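
One way to observe the promises from start() and stop() when navigation triggers them in quick succession is to queue the two operations so that each waits for the previous one to settle. This is a hedged sketch rather than the fix proposed in the thread: `Lifecycle` and `SerializedLifecycle` are hypothetical names, and the connection is assumed to expose promise-returning `start()` and `stop()` methods.

```typescript
// Hypothetical minimal shape of the connection used by this sketch.
interface Lifecycle {
  start(): Promise<void>;
  stop(): Promise<void>;
}

// Runs start() and stop() strictly one after another, in call order.
class SerializedLifecycle {
  private readonly connection: Lifecycle;
  private tail: Promise<void> = Promise.resolve();

  constructor(connection: Lifecycle) {
    this.connection = connection;
  }

  start(): Promise<void> {
    return this.enqueue(() => this.connection.start());
  }

  stop(): Promise<void> {
    return this.enqueue(() => this.connection.stop());
  }

  private enqueue(operation: () => Promise<void>): Promise<void> {
    const result = this.tail.then(operation);
    // Keep the queue usable after a failure; the caller still sees the
    // rejection through `result`.
    this.tail = result.catch(() => { /* swallowed for the queue only */ });
    return result;
  }
}
```

A component would call `start()` on the wrapper when it mounts and `stop()` when it unmounts; a stop requested during a slow start then runs only after that start has finished.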

Answer by Cayden Mendoza

Good blog. On a production server I’m getting the error “Cannot send data if the connection is not in the ‘Connected’ State”. The connection is made successfully, but after some time it is closed and the above error is generated.

The required SignalR NuGet packages and npm packages are at present hosted on MyGet. You need to add the SignalR packages to the csproj file. To use the MyGet feed, add https://dotnet.myget.org/F/aspnetcore-ci-dev/api/v3/index.json to your package sources.

<Project Sdk="Microsoft.NET.Sdk.Web">

  <PropertyGroup>
    <TargetFramework>net5.0</TargetFramework>
  </PropertyGroup>
  <ItemGroup>
    <PackageReference Include="Microsoft.AspNetCore.SignalR" Version="1.1.0" />
    <PackageReference Include="Microsoft.AspNetCore.SignalR.Protocols.MessagePack" Version="5.0.2" />
    <PackageReference Include="Microsoft.VisualStudio.Web.CodeGeneration.Design" Version="5.0.1" />
    <PackageReference Include="Microsoft.EntityFrameworkCore.Sqlite" Version="5.0.2" />
    <PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="5.0.2" PrivateAssets="all">
      <IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
    </PackageReference>
    <PackageReference Include="Remotion.Linq" Version="2.2.0" />
  </ItemGroup>

  <ItemGroup>
    <DotNetCliToolReference Include="Microsoft.VisualStudio.Web.CodeGeneration.Tools" Version="2.0.0" />
  </ItemGroup>

  <ItemGroup>
    <ProjectReference Include="..\Dtos\Dtos.csproj" />
  </ItemGroup>
</Project>

Now create a simple default hub.

using Microsoft.AspNetCore.SignalR;
using System.Threading.Tasks;

namespace AspNetCoreSignalr.SignalRHubs
{
    public class LoopyHub : Hub
    {
        public Task Send(string data)
        {
            return Clients.All.SendAsync("Send", data);
        }
    }
}

Add the SignalR configuration in the Startup class. The hub created above needs to be mapped with MapHub inside the UseEndpoints call.

public void ConfigureServices(IServiceCollection services)
{
	...
	services.AddSignalR();
	...
}

public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
{
	...

	app.UseEndpoints(endpoints =>
	{
		endpoints.MapHub<LoopyHub>("/loopy");
	});

	...
}

You can use the MyGet npm feed if you want to use aspnetcore-ci-dev. You can do this with a .npmrc file in the project root: add the registry path. If you are using the published npm package, do not add this.

@aspnet:registry=https://dotnet.myget.org/f/aspnetcore-ci-dev/npm/

Now add the required SignalR npm packages to the package.json file. Using the published npm package:

"dependencies": {
	"@angular/animations": "~11.1.0",
	"@angular/common": "~11.1.0",
	"@angular/compiler": "~11.1.0",
	"@angular/core": "~11.1.0",
	"@angular/forms": "~11.1.0",
	"@angular/platform-browser": "~11.1.0",
	"@angular/platform-browser-dynamic": "~11.1.0",
	"@angular/router": "~11.1.0",
	"angular-auth-oidc-client": "^11.4.3",
	"bootstrap": "^4.6.0",
	"jquery": "^3.5.1",
	"popper.js": "^1.16.1",
	"rxjs": "~6.6.3",
	"tslib": "^2.0.0",
	"zone.js": "~0.11.3",
	"@microsoft/signalr": "5.0.2",
	"@ngrx/effects": "10.1.2",
	"@ngrx/entity": "10.1.2",
	"@ngrx/router-store": "10.1.2",
	"@ngrx/store": "10.1.2",
	"@ngrx/store-devtools": "10.1.2",
	"msgpack5": "5.0.0"
},

Add the SignalR client code. In this basic example, it is added directly in a component. The sendMessage function sends messages, and the hubConnection.on handler receives all messages, including the ones this client sent.

import { Component, OnInit } from '@angular/core';
import { HubConnection } from '@microsoft/signalr';
import * as signalR from '@microsoft/signalr';

@Component({
    selector: 'app-home-component',
    templateUrl: './home.component.html'
})

export class HomeComponent implements OnInit {
    private _hubConnection: HubConnection | undefined;
    public async: any;
    message = '';
    messages: string[] = [];

    constructor() {
    }

    public sendMessage(): void {
        const data = `Sent: ${this.message}`;

        if (this._hubConnection) {
            this._hubConnection.invoke('Send', data);
        }
        this.messages.push(data);
    }

    ngOnInit() {
        this._hubConnection = new signalR.HubConnectionBuilder()
            .withUrl('https://localhost:44324/loopy')
            .configureLogging(signalR.LogLevel.Information)
            .build();

        // Register handlers before starting so that no early messages are missed.
        this._hubConnection.on('Send', (data: any) => {
            const received = `Received: ${data}`;
            this.messages.push(received);
        });

        this._hubConnection.start().catch(err => console.error(err.toString()));
    }
}

The messages are then displayed in the component template.

<div class="container-fluid">

    <h1>Send some basic messages</h1>


    <div class="row">
        <form class="form-inline" (ngSubmit)="sendMessage()" #messageForm="ngForm">
            <div class="form-group">
                <label class="sr-only" for="message">Message</label>
                <input type="text" class="form-control" id="message" placeholder="your message..." name="message" [(ngModel)]="message" required>
            </div>
            <button type="submit" class="btn btn-primary" [disabled]="!messageForm.valid">Send SignalR Message</button>
        </form>
    </div>
    <div class="row" *ngIf="messages.length > 0">
        <div class="table-responsive">
            <table class="table table-striped">
                <thead>
                    <tr>
                        <th>#</th>
                        <th>Messages</th>
                    </tr>
                </thead>
                <tbody>
                    <tr *ngFor="let message of messages; let i = index">
                        <td>{{i + 1}}</td>
                        <td>{{message}}</td>
                    </tr>
                </tbody>
            </table>
        </div>
    </div>
    <div class="row" *ngIf="messages.length <= 0">
        <span>No messages</span>
    </div>
</div>

Answer by Ishaan Preston

  • Using send doesn’t wait until the server has received the message. Consequently, it’s not possible to return data or errors from the server.
  • In the following example, the method name is ReceiveMessage. The argument names are user and message.
  • As a best practice, call the start method on the HubConnection after on. Doing so ensures your handlers are registered before any messages are received.
  • To receive messages from the hub, define a method using the on method of the HubConnection.

For Visual Studio, run the following commands from Package Manager Console while in the root folder. For Visual Studio Code, run the following commands from the Integrated Terminal.

npm init -y
npm install @microsoft/signalr

Reference the SignalR JavaScript client in the <script> element. For example:

<script src="~/lib/signalr/signalr.js"></script>

To use the client library without the npm prerequisite, reference a CDN-hosted copy of the client library. For example:

<script src="https://cdnjs.cloudflare.com/ajax/libs/microsoft-signalr/3.1.7/signalr.min.js"></script>

The following code creates and starts a connection. The hub’s name is case insensitive:

const connection = new signalR.HubConnectionBuilder()
    .withUrl("/chathub")
    .configureLogging(signalR.LogLevel.Information)
    .build();

async function start() {
    try {
        await connection.start();
        console.log("SignalR Connected.");
    } catch (err) {
        console.log(err);
        setTimeout(start, 5000);
    }
};

connection.onclose(async () => {
    await start();
});

// Start the connection.
start();
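
The error in the title of this page is raised when start() is called while the connection is in any state other than Disconnected. Retry timers and onclose handlers like the ones above can overlap, so it may help to check the state first. The following is a minimal sketch under that assumption: startIfDisconnected and the StartableConnection interface are hypothetical names declared locally so the snippet runs without the SignalR package. The state strings mirror the values of signalR.HubConnectionState.

```typescript
// Minimal structural view of the parts of HubConnection used here, declared
// locally so the sketch compiles without the @microsoft/signalr package.
type ConnectionState =
    "Disconnected" | "Connecting" | "Connected" | "Disconnecting" | "Reconnecting";

interface StartableConnection {
    readonly state: ConnectionState;
    start(): Promise<void>;
}

// Hypothetical helper: calls start() only when the connection is Disconnected.
// Resolves to true when start() was called and succeeded, false when skipped.
async function startIfDisconnected(connection: StartableConnection): Promise<boolean> {
    if (connection.state !== "Disconnected") {
        console.log(`Skipping start(): connection is ${connection.state}.`);
        return false;
    }
    await connection.start();
    return true;
}
```

A real HubConnection should satisfy this interface structurally, so the helper could replace the bare connection.start() call inside the start function above. It does not fix a connection that stays in Disconnecting indefinitely; it only avoids the exception.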

To prevent a malicious site from reading sensitive data from another site, cross-origin connections are disabled by default. To allow a cross-origin request, enable it in the Startup class:

using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using SignalRChat.Hubs;

namespace SignalRChat
{
    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddRazorPages();
            services.AddSignalR();

            services.AddCors(options =>
            {
                options.AddDefaultPolicy(builder =>
                {
                    builder.WithOrigins("https://example.com")
                        .AllowCredentials();
                });
            });
        }

        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }
            else
            {
                app.UseExceptionHandler("/Error");
            }

            app.UseStaticFiles();
            app.UseRouting();

            app.UseCors();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapRazorPages();
                endpoints.MapHub<ChatHub>("/chathub");
            });
        }
    }
}

In the following example, the method name on the hub is SendMessage. The second and third arguments passed to invoke map to the hub method’s user and message arguments:

try {
    await connection.invoke("SendMessage", user, message);
} catch (err) {
    console.error(err);
}

In the following example, the method name is ReceiveMessage. The argument names are user and message:

connection.on("ReceiveMessage", (user, message) => {
    const li = document.createElement("li");
    li.textContent = `${user}: ${message}`;
    document.getElementById("messageList").appendChild(li);
});

The preceding code in connection.on runs when server-side code calls it using the SendAsync method:

public async Task SendMessage(string user, string message)
{
    await Clients.All.SendAsync("ReceiveMessage", user, message);
}

Use try and catch with async and await or the Promise's catch method to handle client-side errors. Use console.error to output errors to the browser’s console:

try {
    await connection.invoke("SendMessage", user, message);
} catch (err) {
    console.error(err);
}

Use the configureLogging method on HubConnectionBuilder to configure the log level. Messages are logged to the browser console:

const connection = new signalR.HubConnectionBuilder()
    .withUrl("/chathub")
    .configureLogging(signalR.LogLevel.Information)
    .build();

The JavaScript client for SignalR can be configured to automatically reconnect using the withAutomaticReconnect method on HubConnectionBuilder. It won’t automatically reconnect by default.

const connection = new signalR.HubConnectionBuilder()
    .withUrl("/chathub")
    .withAutomaticReconnect()
    .build();

Before starting any reconnect attempts, the HubConnection will transition to the HubConnectionState.Reconnecting state and fire its onreconnecting callbacks instead of transitioning to the Disconnected state and triggering its onclose callbacks like a HubConnection without automatic reconnect configured. This provides an opportunity to warn users that the connection has been lost and to disable UI elements.

connection.onreconnecting(error => {
    console.assert(connection.state === signalR.HubConnectionState.Reconnecting);

    document.getElementById("messageInput").disabled = true;

    const li = document.createElement("li");
    li.textContent = `Connection lost due to error "${error}". Reconnecting.`;
    document.getElementById("messagesList").appendChild(li);
});
connection.onreconnected(connectionId => {
    console.assert(connection.state === signalR.HubConnectionState.Connected);

    document.getElementById("messageInput").disabled = false;

    const li = document.createElement("li");
    li.textContent = `Connection reestablished. Connected with connectionId "${connectionId}".`;
    document.getElementById("messagesList").appendChild(li);
});

withAutomaticReconnect() won’t configure the HubConnection to retry initial start failures, so start failures need to be handled manually:

async function start() {
    try {
        await connection.start();
        console.assert(connection.state === signalR.HubConnectionState.Connected);
        console.log("SignalR Connected.");
    } catch (err) {
        console.assert(connection.state === signalR.HubConnectionState.Disconnected);
        console.log(err);
        setTimeout(() => start(), 5000);
    }
};

If the client doesn’t successfully reconnect within its first four attempts, the HubConnection will transition to the Disconnected state and fire its onclose callbacks. This provides an opportunity to inform users the connection has been permanently lost and recommend refreshing the page:

connection.onclose(error => {
    console.assert(connection.state === signalR.HubConnectionState.Disconnected);

    document.getElementById("messageInput").disabled = true;

    const li = document.createElement("li");
    li.textContent = `Connection closed due to error "${error}". Try refreshing this page to restart the connection.`;
    document.getElementById("messagesList").appendChild(li);
});

In order to configure a custom number of reconnect attempts before disconnecting or change the reconnect timing, withAutomaticReconnect accepts an array of numbers representing the delay in milliseconds to wait before starting each reconnect attempt.

const connection = new signalR.HubConnectionBuilder()
    .withUrl("/chathub")
    .withAutomaticReconnect([0, 0, 10000])
    .build();

    // .withAutomaticReconnect([0, 2000, 10000, 30000]) yields the default behavior
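
One way to read the array form: entry n is the delay before reconnect attempt n + 1, and running out of entries ends the reconnect loop. The sketch below models that reading as a retry policy object. policyFromDelays is a hypothetical helper written for illustration, not the library's own implementation, and RetryContext is declared locally with only the fields used here.

```typescript
// Local stand-in for the retry context the client passes to a retry policy.
interface RetryContext {
    previousRetryCount: number;   // reconnect attempts that have already failed
    elapsedMilliseconds: number;  // time spent reconnecting so far
}

// Hypothetical helper: turns a delay array into a policy object.
function policyFromDelays(delays: number[]) {
    return {
        nextRetryDelayInMilliseconds(context: RetryContext): number | null {
            const delay = delays[context.previousRetryCount];
            // No entry left means "stop reconnecting".
            return delay === undefined ? null : delay;
        },
    };
}

const policy = policyFromDelays([0, 0, 10000]);
for (let attempt = 0; attempt < 4; attempt++) {
    const delay = policy.nextRetryDelayInMilliseconds({
        previousRetryCount: attempt,
        elapsedMilliseconds: 0,
    });
    console.log(`after ${attempt} failed attempts:`, delay);
}
```

With [0, 0, 10000], the first two attempts start immediately, the third waits 10 seconds, and a fourth is never made.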

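For more control over the timing and number of reconnect attempts, withAutomaticReconnect also accepts an object implementing the IRetryPolicy interface, which has a single method named nextRetryDelayInMilliseconds. This method receives a retry context and must return either the number of milliseconds to wait before the next reconnect attempt, or null if the HubConnection should stop reconnecting.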

const connection = new signalR.HubConnectionBuilder()
    .withUrl("/chathub")
    .withAutomaticReconnect({
        nextRetryDelayInMilliseconds: retryContext => {
            if (retryContext.elapsedMilliseconds < 60000) {
                // If we've been reconnecting for less than 60 seconds so far,
                // wait between 0 and 10 seconds before the next reconnect attempt.
                return Math.random() * 10000;
            } else {
                // If we've been reconnecting for more than 60 seconds so far, stop reconnecting.
                return null;
            }
        }
    })
    .build();
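
As a variation on the random-delay policy above, the same hook can express capped exponential backoff. This is a sketch only: createBackoffPolicy and its three parameters are hypothetical, and RetryContext is declared locally with just the two fields the policy reads.

```typescript
// Local stand-in for the retry context passed to nextRetryDelayInMilliseconds.
interface RetryContext {
    previousRetryCount: number;
    elapsedMilliseconds: number;
}

// Hypothetical factory: the delay doubles with each failed attempt, is capped
// at maxDelayMs, and the policy gives up once giveUpAfterMs has elapsed.
function createBackoffPolicy(baseMs: number, maxDelayMs: number, giveUpAfterMs: number) {
    return {
        nextRetryDelayInMilliseconds(context: RetryContext): number | null {
            if (context.elapsedMilliseconds >= giveUpAfterMs) {
                return null; // stop reconnecting
            }
            const delay = baseMs * Math.pow(2, context.previousRetryCount);
            return Math.min(delay, maxDelayMs);
        },
    };
}
```

The returned object has the shape withAutomaticReconnect expects for a retry policy, for example createBackoffPolicy(1000, 30000, 120000) for delays of 1 s, 2 s, 4 s and so on up to 30 s, giving up after two minutes.
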
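
The following steps describe a typical manual reconnection approach for a connection that does not use automatic reconnect:

  1. A function (in this case, the start function) is created to start the connection.
  2. Call the start function in the connection’s onclose event handler.
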
async function start() {
    try {
        await connection.start();
        console.log("SignalR Connected.");
    } catch (err) {
        console.log(err);
        setTimeout(start, 5000);
    }
};

connection.onclose(async () => {
    await start();
});

The following code example shows how to use a Web Lock to keep a tab awake and avoid an unexpected connection closure.

var lockResolver;
if (navigator && navigator.locks && navigator.locks.request) {
    const promise = new Promise((res) => {
        lockResolver = res;
    });

    navigator.locks.request('unique_lock_name', { mode: "shared" }, () => {
        return promise;
    });
}
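
The snippet above stores lockResolver but never shows it being called; the lock is held until the promise returned from the callback settles. The following sketch pairs acquisition with an explicit release. holdSharedLock and LockManagerLike are hypothetical names, and the lock manager is passed in as a parameter (an assumption made so the function can also run outside a browser).

```typescript
// Structural subset of the Web Locks API used by this sketch.
interface LockManagerLike {
    request(
        name: string,
        options: { mode: "shared" | "exclusive" },
        callback: () => Promise<void>
    ): Promise<unknown>;
}

// Hypothetical helper: requests a shared lock and returns a function that
// releases it. When no lock manager is available it returns a no-op.
function holdSharedLock(locks: LockManagerLike | undefined, name: string): () => void {
    if (!locks) {
        return () => { /* nothing to release */ };
    }
    let release: () => void = () => { /* replaced synchronously below */ };
    const held = new Promise<void>((resolve) => {
        release = resolve;
    });
    // The lock stays held until `held` resolves.
    locks.request(name, { mode: "shared" }, () => held)
        .catch((err) => console.error(err));
    return () => release();
}
```

In a browser this might be called as holdSharedLock(navigator.locks, "unique_lock_name") when the connection starts, with the returned function invoked after the connection is stopped.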

Answer by Jamison Rice

  • Sending Data via SignalR from the Client to the Server and Back
  • Creating Projects and Basic Configuration
  • How to provide a SignalR implementation on the client and server-side
  • How to install SignalR and prepare a basic configuration

{
  "profiles": {
    "RealTimeCharts_Server": {
      "commandName": "Project",
      "launchBrowser": false,
      "applicationUrl": "https://localhost:5001;http://localhost:5000",
      "environmentVariables": {
        "ASPNETCORE_ENVIRONMENT": "Development"
      }
    }
  }
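}

Then enable CORS for the client origin (http://localhost:4200 here) and add the policy to the request pipeline in the Startup class:

public void ConfigureServices(IServiceCollection services)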
{
    services.AddCors(options => 
    { 
        options.AddPolicy("CorsPolicy", builder => builder
        .WithOrigins("http://localhost:4200")
        .AllowAnyMethod()
        .AllowAnyHeader()
        .AllowCredentials()); 
    });

    services.AddControllers();
}

// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
    if (env.IsDevelopment())
    {
        app.UseDeveloperExceptionPage();
    }

    app.UseHttpsRedirection();

    app.UseRouting();

    app.UseCors("CorsPolicy");

    app.UseAuthorization();

    app.UseEndpoints(endpoints =>
    {
        endpoints.MapControllers();
    });
}

Install the SignalR client package with npm:

npm install @aspnet/signalr --save

Add a model class for the chart data:

public class ChartModel
{
    public List<int> Data { get; set; }
    public string Label { get; set; }

    public ChartModel()
    {
        Data = new List<int>();
    }
}

Finally, create an empty hub class:

public class ChartHub : Hub
{
        
}

Answer by Lana Gilbert

  • ASP.NET Core SignalR was redesigned with a simpler and more extensible scale-out model. It no longer allows a single client to connect to different server-side instances between requests. This means that sticky sessions are required to ensure server affinity for clients using protocols other than WebSockets. ASP.NET Core SignalR currently provides a scale-out plug-in for Redis.
  • As of the RC1 release covered in this answer, ASP.NET Core SignalR doesn’t support automatic reconnection or automatic buffering of messages. Instead, it’s up to the client application to decide when it needs to reconnect, and it’s up to the server to implement message buffering if required. (The withAutomaticReconnect option shown earlier was added to later client versions.)
  • A common way to include a client’s identity on AJAX requests is via bearer tokens in an Authorization header.
  • Last, change the client-side JavaScript to request a token from this endpoint and use it to connect to the SignalR hub. The HubConnection can be configured with an access token factory to include a token when creating the connection.
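
An access token factory is a function that returns a token string or a promise of one. Because the client may call it more than once over the life of a connection, a small cache can avoid refetching a token that is still valid. The sketch below assumes a token endpoint that reports a lifetime alongside the token; createAccessTokenFactory, TokenFetcher, and the expiresInMs field are all hypothetical.

```typescript
// Hypothetical shape of whatever the application uses to obtain a token.
type TokenFetcher = () => Promise<{ token: string; expiresInMs: number }>;

// Hypothetical helper: wraps a fetcher in a factory that reuses the token
// until its reported lifetime has passed. `now` is injectable for testing.
function createAccessTokenFactory(
    fetchToken: TokenFetcher,
    now: () => number = Date.now
): () => Promise<string> {
    let cached: { token: string; expiresAt: number } | undefined;
    return async () => {
        if (cached && now() < cached.expiresAt) {
            return cached.token; // still valid, reuse it
        }
        const fresh = await fetchToken();
        cached = { token: fresh.token, expiresAt: now() + fresh.expiresInMs };
        return fresh.token;
    };
}
```

The returned function could then be supplied as the accessTokenFactory option passed to withUrl.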

This article builds on a new ASP.NET Core Razor Pages application with individual authentication. You can create one in Visual Studio’s new ASP.NET Core project dialog or run the following .NET CLI command:

dotnet new razor --auth Individual

To use ASP.NET Core SignalR, it must be added to the project from NuGet. The latest version at the time of writing this is RC1.

dotnet add package Microsoft.AspNetCore.SignalR --version 1.0.0-rc1-final

A hub is the central point in an ASP.NET Core application through which all SignalR communication is routed. Create a hub for your chat application by adding a class named Chat that inherits from Microsoft.AspNetCore.SignalR.Hub:

using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;
namespace SignalRChat.Hubs
{
    public class Chat : Hub
    {
        public async Task SendMessage(string message)
        {
            await Clients.All.SendAsync("newMessage", "anonymous", message);
        }
    }
}

In ConfigureServices, call the AddSignalR extension method to register the services SignalR requires with the IoC container. Then, in Configure, map the hub to a route with UseSignalR:

public void ConfigureServices(IServiceCollection services)
{
    // ...
    services.AddSignalR();
}

public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
    // ...
    app.UseAuthentication();
    app.UseMvc();

    // Map the Chat hub to the /chat route.
    app.UseSignalR(builder =>
    {
        builder.MapHub<Chat>("/chat");
    });
}

Now it’s time to add some client-side code that will interact with the Chat hub. In Pages/Index.cshtml, reference the SignalR browser JavaScript library. This can be done by using npm and a tool like WebPack to install the package and copy the client-side JavaScript files to the wwwroot folder. You can also reference the script on a CDN (note that the original URL has single @ signs, but @ is a special character in Razor and is escaped with @@):

<script src="https://unpkg.com/@@aspnet/signalr@@1.0.0-rc1-final/dist/browser/signalr.js"></script>
<div class="signalr-demo">
    <form id="message-form">
        <input type="text" id="message-box"/>
    </form>
    <hr />
    <ul id="messages"></ul>
</div>
