
Library Integration

You build a library with a Promise API. Your users call await client.query() and move on. Internally, though, you need stronger lifecycle guarantees than promises can give you. You need requests, sockets, subscriptions, and cleanup tied to one owner. You need work to stop when the library shuts down.

This is where Effection fits behind a Promise API.

The Integration Boundary

The core of the pattern is a scope that lives for the lifetime of your library. You create it once, initialize your stateful resources inside it, and keep it alive until shutdown. withResolvers() gives you a readiness gate that prevents race conditions between "pool is ready" and "first query arrives."

import { createScope, suspend, withResolvers } from "effection";

const [scope, destroy] = createScope();
const { operation: poolReady, resolve, reject } = withResolvers<PoolState>();

let closed = false;

scope.run(function* () {
  try {
    const pool = yield* connectionPool(options.maxConnections);
    resolve(pool);      // signal: ready for use
    yield* suspend();   // keep scope alive until destroy()
  } catch (e) {
    reject(e instanceof Error ? e : new Error(String(e)));
  }
});

suspend() is what keeps the scope resident between calls. Without it, the scope would exit after initialization and tear down your resources.
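The mechanics can be seen in a minimal plain-generator sketch (ordinary TypeScript, not Effection's internals, and `makeScope` is a made-up name): the driver advances the generator to its first yield and holds it there, so the `finally` block, which stands in for teardown, only runs when `destroy()` resumes it.

```typescript
// Plain-generator sketch of "stay resident until destroyed".
function makeScope() {
  const log: string[] = [];
  function* main() {
    log.push("init");        // resources would be created here
    try {
      yield;                 // analogous to `yield* suspend()`
    } finally {
      log.push("teardown");  // runs only when destroy() resumes us
    }
  }
  const it = main();
  it.next();                 // run up to the suspend point; now resident
  return { log, destroy: () => void it.return(undefined) };
}
```

Calling `destroy()` resumes the generator through its `finally` block, which is exactly the shape of the scope teardown above.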

Resources for Lifecycle

Use resource() for anything with setup and cleanup. The consumer receives the value when you call provide(), which suspends until the enclosing scope exits; at that point the finally block runs cleanup.

function connection(id: string): Operation<Connection> {
  return resource(function* (provide) {
    const socket = yield* until(connectToDatabase());

    const conn: Connection = {
      id,
      *execute(query: string) {
        // ... query implementation
        return { rows: [] };
      },
    };

    try {
      yield* provide(conn);
    } finally {
      socket.close();
    }
  });
}

Resources can depend on other resources. A connection pool that creates multiple connections will see all of them cleaned up automatically when the pool's scope exits—in reverse order of creation.
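The reverse order falls out of nested try/finally: each resource's cleanup wraps everything created after it. A plain-TypeScript sketch of the unwind (not Effection code, just the control-flow shape):

```typescript
// Nested try/finally models nested resource scopes:
// cleanup runs LIFO, the reverse of acquisition order.
const log: string[] = [];

function acquire(name: string, body: () => void): void {
  log.push(`open ${name}`);
  try {
    body();
  } finally {
    log.push(`close ${name}`);
  }
}

acquire("conn-0", () =>
  acquire("conn-1", () =>
    acquire("conn-2", () => log.push("work"))));
```

After this runs, `log` shows the three opens, the work, then the closes in reverse: conn-2, conn-1, conn-0.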

The Promise Bridge

Expose plain methods that wrap each operation in scope.run(). scope.run() returns a promise, so callers stay in ordinary async/await while your internal work stays under structured concurrency.

function* executeQuery(sql: string): Operation<QueryResult> {
  if (closed) {
    throw new Error("Pool is shut down");
  }

  const pool = yield* poolReady;  // blocks until init complete
  const conn = yield* pool.acquire();
  try {
    return yield* conn.execute(sql);
  } finally {
    pool.release(conn);
  }
}

return {
  query(sql: string): Promise<QueryResult> {
    return scope.run(() => executeQuery(sql));
  },

  queryAll(queries: string[]): Promise<QueryResult[]> {
    return scope.run(function* () {
      return yield* all(queries.map((sql) => executeQuery(sql)));
    });
  },

  shutdown,

  [Symbol.asyncDispose](): Promise<void> {
    return shutdown();
  },
};

Adding Symbol.asyncDispose lets callers use await using for automatic cleanup:

// Manual cleanup
const pool = createPool({ maxConnections: 10 });
const result = await pool.query("SELECT * FROM users");
await pool.shutdown();

// Automatic cleanup with await using
await using pool = createPool({ maxConnections: 10 });
const result = await pool.query("SELECT * FROM users");
// cleanup runs automatically

Gotchas

Await destruction. destroy() returns a future. If you don't await it, teardown may not complete before your process exits. Always await shutdown() or use await using.

In-flight work during shutdown. When destroy() is called, any operations currently running via scope.run() are halted. Their finally blocks still execute, so cleanup happens, but the promises returned to callers will reject with a "halted" error.
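That contract, cleanup runs but the caller's promise rejects, can be sketched with plain Promises. The `"halted"` message and the `startQuery` name here are illustrative, not Effection's actual error type, so match on it loosely in real code:

```typescript
// Plain-Promise sketch of halting in-flight work: cleanup still runs,
// but the pending promise rejects instead of resolving.
function startQuery() {
  const cleanup: string[] = [];
  let halt!: () => void;
  const result = new Promise<string>((_, reject) => {
    halt = () => {
      cleanup.push("pool.release(conn)"); // finally-style cleanup still runs
      reject(new Error("halted"));        // caller sees a rejection
    };
  });
  return { result, halt, cleanup };
}

const q = startQuery();
q.halt(); // simulate destroy() arriving mid-query
```

The important property is the ordering: cleanup happens before the rejection reaches the caller, so the pool is never left holding a leaked connection.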

Startup failures propagate. If initialization throws, reject() is called once, and every caller waiting on poolReady receives the same error. This is deterministic and usually what you want.
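The fan-out behavior can be sketched with plain Promises (hypothetical `demo` function; Effection's withResolvers behaves analogously for operations): one rejection settles the shared gate, and every waiter, past or future, observes the same error.

```typescript
// One shared gate, many waiters, one reject: all callers get the same error.
async function demo(): Promise<string[]> {
  let rejectReady!: (e: Error) => void;
  const ready = new Promise<never>((_, rej) => { rejectReady = rej; });

  const callers = [1, 2, 3].map((n) =>
    ready.catch((e: Error) => `caller ${n}: ${e.message}`),
  );

  rejectReady(new Error("connect failed")); // init throws exactly once
  return Promise.all(callers);
}
```

Because the gate settles once and stays settled, callers that arrive after the failure get the same rejection immediately rather than hanging.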

Complete Example

import {
  all,
  createScope,
  resource,
  sleep,
  suspend,
  until,
  withResolvers,
  type Operation,
} from "effection";

interface QueryResult {
  rows: unknown[];
}

interface Connection {
  id: string;
  execute(query: string): Operation<QueryResult>;
}

interface PoolState {
  connections: Connection[];
  available: Connection[];
  acquire(): Operation<Connection>;
  release(conn: Connection): void;
}

function connectToDatabase(): Promise<{ close(): void }> {
  return Promise.resolve({ close() {} });
}

function connection(id: string): Operation<Connection> {
  return resource(function* (provide) {
    const socket = yield* until(connectToDatabase());

    const conn: Connection = {
      id,
      *execute(_query: string) {
        return { rows: [] };
      },
    };

    try {
      yield* provide(conn);
    } finally {
      socket.close();
    }
  });
}

function connectionPool(maxConnections: number): Operation<PoolState> {
  return resource(function* (provide) {
    const connections: Connection[] = [];
    const available: Connection[] = [];

    for (let i = 0; i < maxConnections; i++) {
      const conn = yield* connection(`conn-${i}`);
      connections.push(conn);
      available.push(conn);
    }

    const state: PoolState = {
      connections,
      available,
      *acquire() {
        while (available.length === 0) {
          yield* sleep(50);
        }
        return available.pop()!;
      },
      release(conn) {
        available.push(conn);
      },
    };

    yield* provide(state);
  });
}

export interface ConnectionPool extends AsyncDisposable {
  query(sql: string): Promise<QueryResult>;
  queryAll(queries: string[]): Promise<QueryResult[]>;
  shutdown(): Promise<void>;
}

export function createPool(
  options: { maxConnections: number },
): ConnectionPool {
  const [scope, destroy] = createScope();
  const { operation: poolReady, resolve, reject } = withResolvers<PoolState>();

  let closed = false;

  scope.run(function* () {
    try {
      const pool = yield* connectionPool(options.maxConnections);
      resolve(pool);
      yield* suspend();
    } catch (e) {
      reject(e instanceof Error ? e : new Error(String(e)));
    }
  });

  function* executeQuery(sql: string): Operation<QueryResult> {
    if (closed) {
      throw new Error("Pool is shut down");
    }

    const pool = yield* poolReady;
    const conn = yield* pool.acquire();
    try {
      return yield* conn.execute(sql);
    } finally {
      pool.release(conn);
    }
  }

  async function shutdown(): Promise<void> {
    if (closed) return;
    closed = true;
    await destroy();
  }

  return {
    query(sql: string): Promise<QueryResult> {
      return scope.run(() => executeQuery(sql));
    },

    queryAll(queries: string[]): Promise<QueryResult[]> {
      return scope.run(function* () {
        return yield* all(queries.map((sql) => executeQuery(sql)));
      });
    },

    shutdown,

    [Symbol.asyncDispose](): Promise<void> {
      return shutdown();
    },
  };
}

See also: GitHub Gist


That is the whole pattern: one integration scope, one readiness gate, internal resources as operations, external methods as promises. Your users write await, you get structured shutdown.
