Options
All
  • Public
  • Public/Protected
  • All
Menu

Wraps an instance of Pubsub and ensures that a pubsub message is generated with some minimum frequency.

Hierarchy

Implements

Index

Constructors

constructor

  • Given a 'maxPubsubPublishInterval' specifying the max amount of time between pubsub messages, starts a background job that runs every maxPubsubPublishInterval/2 and publishes a keepalive message if no other pubsub messages have been sent within maxPubsubPublishInterval/2. Running the check in an interval half as long as the max limit, and publishing a message if we have less than half the max limit interval remaining, guarantees that even in the worst case we never pass 'maxPubsubPublishInterval' without publishing a message.

    Parameters

    • pubsub: ObservableWithNext<PubsubMessage>

      Pubsub instances used to publish messages to the underlying libp2p pubsub topic.

    • maxPubsubPublishInterval: number

      the max amount of time that is allowed to pass without generating a pubsub message.

    Returns PubsubKeepalive

Properties

operator

operator: Operator<any, PubsubMessage> | undefined
deprecated

Internal implementation detail, do not use directly. Will be made internal in v8.

source

source: Observable<any> | undefined
deprecated

Internal implementation detail, do not use directly. Will be made internal in v8.

Static create

create: (...args: any[]) => any

Creates a new Observable by calling the Observable constructor

owner

Observable

method

create

param

the subscriber function to be passed to the Observable constructor

returns

a new observable

nocollapse
deprecated

Use new Observable() instead. Will be removed in v8.

Type declaration

    • (...args: any[]): any
    • Parameters

      • Rest ...args: any[]

      Returns any

Methods

forEach

  • forEach(next: (value: PubsubMessage) => void): Promise<void>
  • forEach(next: (value: PubsubMessage) => void, promiseCtor: PromiseConstructorLike): Promise<void>
  • Used as a NON-CANCELLABLE means of subscribing to an observable, for use with APIs that expect promises, like async/await. You cannot unsubscribe from this.

    WARNING: Only use this with observables you know will complete. If the source observable does not complete, you will end up with a promise that is hung up, and potentially all of the state of an async function hanging out in memory. To avoid this situation, look into adding something like timeout, {@link take}, {@link takeWhile}, or {@link takeUntil} amongst others.

    Example

    import { interval, take } from 'rxjs';
    
    const source$ = interval(1000).pipe(take(4));
    
    async function getTotal() {
      let total = 0;
    
      await source$.forEach(value => {
        total += value;
        console.log('observable -> ' + value);
      });
    
      return total;
    }
    
    getTotal().then(
      total => console.log('Total: ' + total)
    );
    
    // Expected:
    // 'observable -> 0'
    // 'observable -> 1'
    // 'observable -> 2'
    // 'observable -> 3'
    // 'Total: 6'
    

    Parameters

    Returns Promise<void>

    a promise that either resolves on observable completion or rejects with the handled error

  • deprecated

    Passing a Promise constructor will no longer be available in upcoming versions of RxJS. This is because it adds weight to the library, for very little benefit. If you need this functionality, it is recommended that you either polyfill Promise, or you create an adapter to convert the returned native promise to whatever promise implementation you wanted. Will be removed in v8.

    Parameters

    • next: (value: PubsubMessage) => void

      a handler for each value emitted by the observable

    • promiseCtor: PromiseConstructorLike

      a constructor function used to instantiate the Promise

    Returns Promise<void>

    a promise that either resolves on observable completion or rejects with the handled error

lift

  • Creates a new Observable, with this Observable instance as the source, and the passed operator defined as the new observable's operator.

    method

    lift

    deprecated

    Internal implementation detail, do not use directly. Will be made internal in v8. If you have implemented an operator using lift, it is recommended that you create an operator by simply returning new Observable() directly. See "Creating new operators from scratch" section here: https://rxjs.dev/guide/operators

    Type parameters

    • R

    Parameters

    • Optional operator: Operator<PubsubMessage, R>

      the operator defining the operation to take on the observable

    Returns Observable<R>

    a new observable with the Operator applied

next

  • Passes on the message to be published by the underlying Pubsub instance, while keeping track of the fact that we sent a message and thus can reset our idea of when the next keepalive message needs to be sent.

    Parameters

    Returns Subscription

pipe

  • pipe(): Observable<PubsubMessage>
  • pipe<A>(op1: OperatorFunction<PubsubMessage, A>): Observable<A>
  • pipe<A, B>(op1: OperatorFunction<PubsubMessage, A>, op2: OperatorFunction<A, B>): Observable<B>
  • pipe<A, B, C>(op1: OperatorFunction<PubsubMessage, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>): Observable<C>
  • pipe<A, B, C, D>(op1: OperatorFunction<PubsubMessage, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>): Observable<D>
  • pipe<A, B, C, D, E>(op1: OperatorFunction<PubsubMessage, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>): Observable<E>
  • pipe<A, B, C, D, E, F>(op1: OperatorFunction<PubsubMessage, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>): Observable<F>
  • pipe<A, B, C, D, E, F, G>(op1: OperatorFunction<PubsubMessage, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>, op7: OperatorFunction<F, G>): Observable<G>
  • pipe<A, B, C, D, E, F, G, H>(op1: OperatorFunction<PubsubMessage, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>, op7: OperatorFunction<F, G>, op8: OperatorFunction<G, H>): Observable<H>
  • pipe<A, B, C, D, E, F, G, H, I>(op1: OperatorFunction<PubsubMessage, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>, op7: OperatorFunction<F, G>, op8: OperatorFunction<G, H>, op9: OperatorFunction<H, I>): Observable<I>
  • pipe<A, B, C, D, E, F, G, H, I>(op1: OperatorFunction<PubsubMessage, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>, op7: OperatorFunction<F, G>, op8: OperatorFunction<G, H>, op9: OperatorFunction<H, I>, ...operations: OperatorFunction<any, any>[]): Observable<unknown>
  • Returns Observable<PubsubMessage>

  • Type parameters

    • A

    Parameters

    Returns Observable<A>

  • Type parameters

    • A

    • B

    Parameters

    • op1: OperatorFunction<PubsubMessage, A>
    • op2: OperatorFunction<A, B>

    Returns Observable<B>

  • Type parameters

    • A

    • B

    • C

    Parameters

    • op1: OperatorFunction<PubsubMessage, A>
    • op2: OperatorFunction<A, B>
    • op3: OperatorFunction<B, C>

    Returns Observable<C>

  • Type parameters

    • A

    • B

    • C

    • D

    Parameters

    • op1: OperatorFunction<PubsubMessage, A>
    • op2: OperatorFunction<A, B>
    • op3: OperatorFunction<B, C>
    • op4: OperatorFunction<C, D>

    Returns Observable<D>

  • Type parameters

    • A

    • B

    • C

    • D

    • E

    Parameters

    • op1: OperatorFunction<PubsubMessage, A>
    • op2: OperatorFunction<A, B>
    • op3: OperatorFunction<B, C>
    • op4: OperatorFunction<C, D>
    • op5: OperatorFunction<D, E>

    Returns Observable<E>

  • Type parameters

    • A

    • B

    • C

    • D

    • E

    • F

    Parameters

    • op1: OperatorFunction<PubsubMessage, A>
    • op2: OperatorFunction<A, B>
    • op3: OperatorFunction<B, C>
    • op4: OperatorFunction<C, D>
    • op5: OperatorFunction<D, E>
    • op6: OperatorFunction<E, F>

    Returns Observable<F>

  • Type parameters

    • A

    • B

    • C

    • D

    • E

    • F

    • G

    Parameters

    • op1: OperatorFunction<PubsubMessage, A>
    • op2: OperatorFunction<A, B>
    • op3: OperatorFunction<B, C>
    • op4: OperatorFunction<C, D>
    • op5: OperatorFunction<D, E>
    • op6: OperatorFunction<E, F>
    • op7: OperatorFunction<F, G>

    Returns Observable<G>

  • Type parameters

    • A

    • B

    • C

    • D

    • E

    • F

    • G

    • H

    Parameters

    • op1: OperatorFunction<PubsubMessage, A>
    • op2: OperatorFunction<A, B>
    • op3: OperatorFunction<B, C>
    • op4: OperatorFunction<C, D>
    • op5: OperatorFunction<D, E>
    • op6: OperatorFunction<E, F>
    • op7: OperatorFunction<F, G>
    • op8: OperatorFunction<G, H>

    Returns Observable<H>

  • Type parameters

    • A

    • B

    • C

    • D

    • E

    • F

    • G

    • H

    • I

    Parameters

    • op1: OperatorFunction<PubsubMessage, A>
    • op2: OperatorFunction<A, B>
    • op3: OperatorFunction<B, C>
    • op4: OperatorFunction<C, D>
    • op5: OperatorFunction<D, E>
    • op6: OperatorFunction<E, F>
    • op7: OperatorFunction<F, G>
    • op8: OperatorFunction<G, H>
    • op9: OperatorFunction<H, I>

    Returns Observable<I>

  • Type parameters

    • A

    • B

    • C

    • D

    • E

    • F

    • G

    • H

    • I

    Parameters

    • op1: OperatorFunction<PubsubMessage, A>
    • op2: OperatorFunction<A, B>
    • op3: OperatorFunction<B, C>
    • op4: OperatorFunction<C, D>
    • op5: OperatorFunction<D, E>
    • op6: OperatorFunction<E, F>
    • op7: OperatorFunction<F, G>
    • op8: OperatorFunction<G, H>
    • op9: OperatorFunction<H, I>
    • Rest ...operations: OperatorFunction<any, any>[]

    Returns Observable<unknown>

publishPubsubKeepaliveIfNeeded

  • publishPubsubKeepaliveIfNeeded(): void
  • Called periodically and ensures that if we haven't published a pubsub message in too long, we'll publish one so that we never go longer than MAX_PUBSUB_PUBLISH_INTERVAL without publishing a pubsub message. This is to work around a bug in IPFS where peer connections get dropped if they haven't had traffic in too long.

    Returns void

subscribe

  • subscribe(observer?: Partial<Observer<PubsubMessage>>): Subscription
  • subscribe(next: (value: PubsubMessage) => void): Subscription
  • subscribe(next?: ((value: PubsubMessage) => void) | null, error?: ((error: any) => void) | null, complete?: (() => void) | null): Subscription

toPromise

  • toPromise(): Promise<PubsubMessage | undefined>
  • toPromise(PromiseCtor: typeof Promise): Promise<PubsubMessage | undefined>
  • toPromise(PromiseCtor: PromiseConstructorLike): Promise<PubsubMessage | undefined>