我最近写了一篇文章,演示了使用“通道/事件”模型集成Angular 2和SignalR的一种方法:
原文链接:https://www.f2er.com/angularjs/141654.htmlhttps://blog.sstorie.com/integrating-angular-2-and-signalr-part-2-of-2/
我不认为只是链接到另一个站点是合适的,所以这是Angular 2服务的核心,暴露SignalR:
import {Injectable,Inject} from "angular2/core"; import Rx from "rxjs/Rx"; /** * When SignalR runs it will add functions to the global $variable * that you use to create connections to the hub. However,in this * class we won't want to depend on any global variables,so this * class provides an abstraction away from using $directly in here. */ export class SignalrWindow extends Window { $: any; } export enum ConnectionState { Connecting = 1,Connected = 2,Reconnecting = 3,Disconnected = 4 } export class ChannelConfig { url: string; hubName: string; channel: string; } export class ChannelEvent { Name: string; ChannelName: string; Timestamp: Date; Data: any; Json: string; constructor() { this.Timestamp = new Date(); } } class ChannelSubject { channel: string; subject: Rx.Subject<ChannelEvent>; } /** * ChannelService is a wrapper around the functionality that SignalR * provides to expose the ideas of channels and events. With this service * you can subscribe to specific channels (or groups in signalr speak) and * use observables to react to specific events sent out on those channels. */ @Injectable() export class ChannelService { /** * starting$is an observable available to know if the signalr * connection is ready or not. On a successful connection this * stream will emit a value. */ starting$: Rx.Observable<any>; /** * connectionState$provides the current state of the underlying * connection as an observable stream. */ connectionState$: Rx.Observable<ConnectionState>; /** * error$provides a stream of any error messages that occur on the * SignalR connection */ error$: Rx.Observable<string>; // These are used to Feed the public observables // private connectionStateSubject = new Rx.Subject<ConnectionState>(); private startingSubject = new Rx.Subject<any>(); private errorSubject = new Rx.Subject<any>(); // These are used to track the internal SignalR state // private hubConnection: any; private hubProxy: any; // An internal array to track what channel subscriptions exist // private subjects = new Array<ChannelSubject>(); constructor( @Inject(SignalrWindow) private window: SignalrWindow,@Inject("channel.config") private channelConfig: ChannelConfig ) { if (this.window.$=== undefined || this.window.$.hubConnection === undefined) { throw new Error("The variable '$' or the .hubConnection() function are not defined...please check the SignalR scripts have been loaded properly"); } // Set up our observables // this.connectionState$= this.connectionStateSubject.asObservable(); this.error$= this.errorSubject.asObservable(); this.starting$= this.startingSubject.asObservable(); this.hubConnection = this.window.$.hubConnection(); this.hubConnection.url = channelConfig.url; this.hubProxy = this.hubConnection.createHubProxy(channelConfig.hubName); // Define handlers for the connection state events // this.hubConnection.stateChanged((state: any) => { let newState = ConnectionState.Connecting; switch (state.newState) { case this.window.$.signalR.connectionState.connecting: newState = ConnectionState.Connecting; break; case this.window.$.signalR.connectionState.connected: newState = ConnectionState.Connected; break; case this.window.$.signalR.connectionState.reconnecting: newState = ConnectionState.Reconnecting; break; case this.window.$.signalR.connectionState.disconnected: newState = ConnectionState.Disconnected; break; } // Push the new state on our subject // this.connectionStateSubject.next(newState); }); // Define handlers for any errors // this.hubConnection.error((error: any) => { // Push the error on our subject // this.errorSubject.next(error); }); this.hubProxy.on("onEvent",(channel: string,ev: ChannelEvent) => { //console.log(`onEvent - ${channel} channel`,ev); // This method acts like a broker for incoming messages. We // check the interal array of subjects to see if one exists // for the channel this came in on,and then emit the event // on it. Otherwise we ignore the message. // let channelSub = this.subjects.find((x: ChannelSubject) => { return x.channel === channel; }) as ChannelSubject; // If we found a subject then emit the event on it // if (channelSub !== undefined) { return channelSub.subject.next(ev); } }); } /** * Start the SignalR connection. The starting$stream will emit an * event if the connection is established,otherwise it will emit an * error. */ start(): void { // Now we only want the connection started once,so we have a special // starting$observable that clients can subscribe to know know if // if the startup sequence is done. // // If we just mapped the start() promise to an observable,then any time // a client subscried to it the start sequence would be triggered // again since it's a cold observable. // this.hubConnection.start() .done(() => { this.startingSubject.next(); }) .fail((error: any) => { this.startingSubject.error(error); }); } /** * Get an observable that will contain the data associated with a specific * channel * */ sub(channel: string): Rx.Observable<ChannelEvent> { // Try to find an observable that we already created for the requested // channel // let channelSub = this.subjects.find((x: ChannelSubject) => { return x.channel === channel; }) as ChannelSubject; // If we already have one for this event,then just return it // if (channelSub !== undefined) { console.log(`Found existing observable for ${channel} channel`) return channelSub.subject.asObservable(); } // // If we're here then we don't already have the observable to provide the // caller,so we need to call the server method to join the channel // and then create an observable that the caller can use to received // messages. // // Now we just create our internal object so we can track this subject // in case someone else wants it too // channelSub = new ChannelSubject(); channelSub.channel = channel; channelSub.subject = new Rx.Subject<ChannelEvent>(); this.subjects.push(channelSub); // Now SignalR is asynchronous,so we need to ensure the connection is // established before we call any server methods. So we'll subscribe to // the starting$stream since that won't emit a value until the connection // is ready // this.starting$.subscribe(() => { this.hubProxy.invoke("Subscribe",channel) .done(() => { console.log(`Successfully subscribed to ${channel} channel`); }) .fail((error: any) => { channelSub.subject.error(error); }); },(error: any) => { channelSub.subject.error(error); }); return channelSub.subject.asObservable(); } // Not quite sure how to handle this (if at all) since there could be // more than 1 caller subscribed to an observable we created // // unsubscribe(channel: string): Rx.Observable<any> { // this.observables = this.observables.filter((x: ChannelObservable) => { // return x.channel === channel; // }); // } /** publish provides a way for calles to emit events on any channel. In a * production app the server would ensure that only authorized clients can * actually emit the message,but here we're not concerned about that. */ publish(ev: ChannelEvent): void { this.hubProxy.invoke("Publish",ev); } }
然后,组件可以通过订阅(不是在rxjs意义上……)到特定通道,并对发出的特定事件做出反应来使用此服务:
import {Component,OnInit,Input} from "angular2/core"; import {Http,Response} from "angular2/http"; import Rx from "rxjs/Rx"; import {ChannelService,ChannelEvent} from "./services/channel.service"; class StatusEvent { State: string; PercentComplete: number; } @Component({ selector: 'task',template: ` <div> <h4>Task component bound to '{{eventName}}'</h4> </div> <div class="commands"> <textarea class="console" cols="50" rows="15" disabled [value]="messages"></textarea> <div class="commands__input"> <button (click)="callApi()">Call API</button> </div> </div> ` }) export class TaskComponent implements OnInit { @Input() eventName: string; @Input() apiUrl: string; messages = ""; private channel = "tasks"; constructor( private http: Http,private channelService: ChannelService ) { } ngOnInit() { // Get an observable for events emitted on this channel // this.channelService.sub(this.channel).subscribe( (x: ChannelEvent) => { switch (x.Name) { case this.eventName: { this.appendStatusUpdate(x); } } },(error: any) => { console.warn("Attempt to join channel Failed!",error); } ) } private appendStatusUpdate(ev: ChannelEvent): void { // Just prepend this to the messages string shown in the textarea // let date = new Date(); switch (ev.Data.State) { case "starting": { this.messages = `${date.toLocaleTimeString()} : starting\n` + this.messages; break; } case "complete": { this.messages = `${date.toLocaleTimeString()} : complete\n` + this.messages; break; } default: { this.messages = `${date.toLocaleTimeString()} : ${ev.Data.State} : ${ev.Data.PercentComplete} % complete\n` + this.messages; } } } callApi() { this.http.get(this.apiUrl) .map((res: Response) => res.json()) .subscribe((message: string) => { console.log(message); }); } }
我试图将SignalR概念映射到observables,但我仍然在学习如何有效地使用RxJS.在任何情况下,我希望有助于说明这可能在Angular 2应用程序的上下文中起作用.