lightq

A simple, lightweight, Redis-backed job queue library

8
0
TypeScript

LightQ ✨

codecov
NPM Version
License: MIT

A simple, lightweight, Redis-backed job queue library designed for TypeScript
applications. LightQ provides essential queueing features with a focus on
reliability and ease of use, leveraging atomic Redis operations via Lua scripts.

Features

  • βœ… Simple API: Easy-to-understand interface for adding and processing
    jobs.
  • πŸš€ Redis Backend: Uses Redis for persistence, ensuring jobs are not lost.
  • πŸ”’ Atomic Operations: Leverages Lua scripts for safe, atomic state
    transitions in Redis.
  • πŸ”· TypeScript Native: Written entirely in TypeScript with types included.
  • πŸ•’ Delayed Jobs: Schedule jobs to run after a specific delay.
  • πŸ—“οΈ Scheduled Jobs: Define recurring jobs using cron patterns or fixed
    intervals via the integrated Job Scheduler.
  • πŸ”„ Automatic Retries: Failed jobs can be automatically retried with
    configurable backoff strategies (fixed, exponential).
  • βš–οΈ Concurrency Control: Limit the number of jobs a worker processes
    concurrently.
  • 🧹 Job Lifecycle Events: Emit events for active, completed, failed,
    retrying, etc. (including scheduler-specific events).
  • πŸ—‘οΈ Automatic Cleanup: Optionally remove job data automatically upon
    completion or failure.

Installation

# Using npm
npm install @jlucaso/lightq

# Using yarn
yarn add @jlucaso/lightq

# Using Bun
bun add @jlucaso/lightq

Prerequisite: Requires a running Redis server (version 4.0 or later
recommended due to Lua script usage). ioredis is used as the Redis client.

Basic Usage

Check the example/index.ts file for a comprehensive demonstration, including:

  • Creating Queues and Workers.
  • Adding immediate, delayed, and failing jobs.
  • Setting up and removing scheduled jobs.
  • Handling various job lifecycle events.

API Overview

Queue<TData, TResult, TName>

  • constructor(name: string, opts: QueueOptions): Creates a new queue
    instance.
  • add(name: TName, data: TData, opts?: JobOptions): Promise<Job<TData, TResult, TName>>:
    Adds a single job to the queue.
  • addBulk(jobs: { name: TName; data: TData; opts?: JobOptions }[]): Promise<Job<TData, TResult, TName>[]>:
    Adds multiple jobs efficiently.
  • upsertJobScheduler(schedulerId: string, repeat: SchedulerRepeatOptions, template: JobTemplate): Promise<void>:
    Creates or updates a recurring job schedule associated with this queue. Starts
    the scheduler polling process if needed.
  • removeJobScheduler(schedulerId: string): Promise<boolean>: Removes a
    specific job schedule associated with this queue.
  • getJob(jobId: string): Promise<Job<TData, TResult, TName> | null>:
    Retrieves job details by ID.
  • getJobCounts(): Promise<{ wait: number; active: number; ... }>: Gets the
    number of jobs in different states.
  • close(): Promise<void>: Closes the Redis connection used by the queue
    (and its associated scheduler, if active).
  • Events: error, ready, waiting, closing, closed,
    scheduler_error, scheduler_job_added.

Worker<TData, TResult, TName>

  • constructor(name: string, processor: Processor<TData, TResult, TName>, opts: WorkerOptions):
    Creates a worker instance to process jobs from the specified queue.
    • The processor function takes a Job object and should return a Promise
      resolving with the job’s result or throwing an error if it fails.
  • close(force?: boolean): Promise<void>: Closes the worker. By default, it
    waits for active jobs to complete. force = true attempts a faster shutdown.
  • Events:
    • active: Job started processing. (job: Job) => void
    • completed: Job finished successfully.
      (job: Job, result: TResult) => void
    • failed: Job failed (possibly after retries).
      (job: Job | undefined, error: Error) => void
    • error: An error occurred within the worker itself (e.g., Redis connection
      issue, lock renewal failure). (error: Error, job?: Job) => void
    • retrying: Job failed but will be retried.
      (job: Job, error: Error) => void
    • movedDelayed: Delayed jobs were moved to the wait list.
      (count: number) => void
    • ready: Worker connected to Redis. () => void
    • closing: Worker is starting the closing process. () => void
    • closed: Worker has finished closing. () => void

Job<TData, TResult, TName>

  • Represents a single job instance. Contains properties like:
    • id: Unique job ID.
    • name: Job name provided during add.
    • data: The payload provided during add.
    • opts: Job-specific options.
    • attemptsMade: How many times this job has been attempted.
    • progress: (Note: updateProgress is currently not fully implemented).
    • returnValue: The result returned by the processor upon completion.
    • failedReason: The error message if the job failed permanently.
    • stacktrace: Stack trace if the job failed.

JobScheduler

  • While not typically instantiated directly, the JobScheduler class manages
    recurring job schedules. It’s accessed via the Queue’s upsertJobScheduler
    and removeJobScheduler methods. It runs a background process to check for
    due schedules and adds corresponding jobs to the queue.
  • Events (Forwarded via Queue): error, job_added, upsert, remove,
    start, stop, closing, closed.

Configuration

QueueOptions

  • connection: IORedis connection options or an existing IORedis instance.
  • prefix: Redis key prefix (default: lightq).
  • defaultJobOptions: Default JobOptions applied to all jobs added to this
    queue.
  • (Implicitly used by Scheduler): prefix is used as the base for scheduler
    keys unless schedulerPrefix is provided in JobSchedulerOptions.

WorkerOptions

  • Extends QueueOptions.
  • concurrency: Max number of jobs to process concurrently (default: 1).
  • lockDuration: Time (ms) a job is locked during processing (default:
    30000).
  • lockRenewTime: Interval (ms) before lock expiration to attempt renewal
    (default: lockDuration / 2).
  • removeOnComplete: true to delete job data on success, or a number to
    keep only the latest N completed jobs’ data (Note: Trimming by count requires
    uncommenting Lua script logic). Default: false.
  • removeOnFail: true to delete job data on final failure, or a number to
    keep only the latest N failed jobs’ data (Note: Trimming by count requires
    uncommenting Lua script logic). Default: false.

JobOptions

  • jobId: Assign a custom job ID (cannot start with scheduler:).
  • delay: Delay (ms) before the job should be processed.
  • attempts: Max number of times to attempt the job (default: 1).
  • backoff: Backoff strategy for retries: number (fixed delay ms) or
    { type: 'fixed' | 'exponential', delay: number }.
  • removeOnComplete: Overrides worker/queue default.
  • removeOnFail: Overrides worker/queue default.

Scheduler Configuration

  • JobSchedulerOptions (extends QueueOptions): Options for the scheduler
    instance (usually configured internally by the Queue).
    • checkInterval: How often (ms) the scheduler checks for due jobs (default:
      5000).
    • schedulerPrefix: Specific Redis key prefix for scheduler data (defaults to
      prefix from QueueOptions).
  • SchedulerRepeatOptions: Defines when scheduled jobs run.
    • { pattern: string, tz?: string }: A cron pattern string and optional
      timezone (e.g., pattern: '0 * * * *' for hourly).
    • { every: number }: A repeat interval in milliseconds (e.g., every: 60000
      for every minute).
  • JobTemplate: Defines the job created by the scheduler.
    • name: The name of the job to be added to the queue.
    • data?: The data payload for the job.
    • opts?: JobOptions for the created job (cannot include jobId or
      delay).

Contributing

Contributions are welcome! Please feel free to open an issue or submit a pull
request.

License

Licensed under the MIT License.