Mosec is a high-performance and flexible model serving framework for building ML model-enabled backends and microservices. It bridges the gap between the machine learning models you just trained and an efficient online service API.
- Highly performant: web layer and task coordination built with Rust 🦀, which offers blazing speed in addition to efficient CPU utilization powered by async I/O
- Dynamic batching: aggregate requests from different users for batched inference and distribute results back
- Pipelined stages: spawn multiple processes for pipelined stages to handle CPU/GPU/IO mixed workloads
We provide a set of templates to help you quickly get started with Mosec. You can find them below.
Mosec requires Python 3.7 or above. Install the latest PyPI package with:
pip install -U mosec
We demonstrate how Mosec can help you easily host a pre-trained stable diffusion model as a service. You need to install diffusers and transformers as prerequisites:
pip install --upgrade diffusers[torch] transformers
We build an API for clients to query a text prompt and obtain an image based on the stable-diffusion-v1-5 model in just 3 steps.
from io import BytesIO
from typing import List

import torch
from diffusers import StableDiffusionPipeline

from mosec import Server, Worker, get_logger
from mosec.mixin import MsgpackMixin

logger = get_logger()


class StableDiffusion(MsgpackMixin, Worker):
    def __init__(self):
        self.pipe = StableDiffusionPipeline.from_pretrained(
            "runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16
        )
        device = "cuda" if torch.cuda.is_available() else "cpu"
        self.pipe = self.pipe.to(device)
        self.example = ["useless example prompt"] * 4  # warmup (bs=4)

    def forward(self, data: List[str]) -> List[memoryview]:
        logger.debug("generate images for %s", data)
        res = self.pipe(data)
        logger.debug("NSFW: %s", res[1])  # per-image NSFW flags
        images = []
        for img in res[0]:  # generated PIL images
            dummy_file = BytesIO()
            img.save(dummy_file, format="JPEG")
            images.append(dummy_file.getbuffer())
        return images
- Define your service as a class which inherits `mosec.Worker`. Here we also inherit `MsgpackMixin` to employ the msgpack serialization format(a).
- Inside the `__init__` method, initialize your model and put it onto the corresponding device. Optionally, you can assign `self.example` with some data to warm up(b) the model. Note that the data should be compatible with your handler's input format, which we detail next.
- Override the `forward` method to write your service handler(c), with the signature `forward(self, data: Any | List[Any]) -> Any | List[Any]`. Receiving/returning a single item or a list depends on whether dynamic batching(d) is configured.
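The two handler shapes described above can be illustrated without Mosec installed. The sketch below is a dependency-free stand-in, not Mosec's implementation: the class names and the `warm_up` helper are hypothetical, and it assumes that a batched handler returns one output per input in the same order.

```python
from typing import List


class SingleItemHandler:
    """Stand-in for a handler without batching: one request per call."""

    example = "warm-up prompt"  # a single item, matching forward's input

    def forward(self, data: str) -> str:
        return data.upper()


class BatchedHandler:
    """Stand-in for a handler with batching: a list of requests per call."""

    example = ["warm-up prompt"] * 4  # a batch, matching forward's input

    def forward(self, data: List[str]) -> List[str]:
        # Assumption: one output per input, in the same order.
        return [item.upper() for item in data]


def warm_up(handler):
    """Hypothetical helper: run the handler once on its own example."""
    return handler.forward(handler.example)


single_out = warm_up(SingleItemHandler())
batch_out = warm_up(BatchedHandler())
print(single_out)
print(batch_out)
```

The point of the sketch is that `example` must have the same shape as the argument `forward` receives: a single item in the first class, a list in the second.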
(a) In this example we return an image in binary format, which JSON does not support (unless it is base64-encoded, which makes the payload longer). Hence, msgpack suits our needs better. If we do not inherit `MsgpackMixin`, JSON is used by default. In other words, the protocol of the service request and response can be either msgpack or JSON.
(b) Warm-up usually helps to allocate GPU memory in advance. If a warm-up example is specified, the service becomes ready only after the example has been forwarded through the handler. If no example is given, the first request's latency is expected to be longer. The `example` should be set as a single item or a list, depending on what `forward` expects to receive. Moreover, if you want to warm up with multiple different examples, you may set `multi_examples` (demo here).
(c) This example shows a single-stage service, where the `StableDiffusion` worker directly takes in the client's prompt request and responds with the image. Thus `forward` can be considered a complete service handler. However, we can also design a multi-stage service with workers doing different jobs (e.g., downloading images, running the model, post-processing) in a pipeline. In this case, the whole pipeline is considered the service handler, with the first worker taking in the request and the last worker sending out the response. Data flows between workers through inter-process communication.
(d) Since dynamic batching is enabled in this example, the `forward` method is expected to receive a list of strings, e.g., `['a cute cat playing with a red ball', 'a man sitting in front of a computer', ...]`, aggregated from different clients for batch inference, which improves system throughput.
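The multi-stage idea in note (c) can be sketched with the standard library alone. This is a toy illustration, not how Mosec is implemented: Mosec runs stages as separate processes connected by inter-process communication, while the sketch uses threads and in-memory queues for brevity. The stage functions and the `run_pipeline` helper are hypothetical.

```python
import queue
import threading
from typing import Callable, List

_STOP = object()  # sentinel that tells a stage to shut down


def run_stage(fn: Callable, inbox: queue.Queue, outbox: queue.Queue) -> None:
    """Apply fn to every item from inbox and pass the result downstream."""
    while True:
        item = inbox.get()
        if item is _STOP:
            outbox.put(_STOP)  # propagate shutdown to the next stage
            break
        outbox.put(fn(item))


def run_pipeline(stages: List[Callable], requests: list) -> list:
    """Chain the stages with queues; the first takes requests, the last emits responses."""
    queues = [queue.Queue() for _ in range(len(stages) + 1)]
    threads = [
        threading.Thread(target=run_stage, args=(fn, queues[i], queues[i + 1]))
        for i, fn in enumerate(stages)
    ]
    for t in threads:
        t.start()
    for request in requests:
        queues[0].put(request)
    queues[0].put(_STOP)

    results = []
    while True:
        item = queues[-1].get()
        if item is _STOP:
            break
        results.append(item)
    for t in threads:
        t.join()
    return results


def preprocess(text: str) -> str:
    return text.strip().lower()


def infer(text: str) -> int:
    return len(text.split())  # placeholder for a model call


def postprocess(count: int) -> dict:
    return {"word_count": count}


responses = run_pipeline(
    [preprocess, infer, postprocess],
    ["  A cute cat  ", "A man at a computer"],
)
print(responses)
```

Because each stage runs concurrently, a slow stage (for example, model inference) does not block the earlier stage from preparing the next request.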
Finally, we append the worker to the server to construct a single-stage workflow (multiple stages can be pipelined to further boost the throughput, see this example), specifying the number of processes we want to run in parallel (`num=1`) and the maximum batch size (`max_batch_size=4`). The maximum batch size is the largest number of requests dynamic batching will accumulate before the timeout. The timeout is set with the `--wait` flag in milliseconds and is the longest time Mosec waits before sending the batch to the worker.
if __name__ == "__main__":
    server = Server()
    # 1) `num` specifies the number of processes that will be spawned to run in parallel.
    # 2) By configuring `max_batch_size` with a value > 1, the input data in your
    #    `forward` function will be a list (batch); otherwise, it's a single item.
    server.append_worker(StableDiffusion, num=1, max_batch_size=4)
    server.run()
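To make the interaction between the maximum batch size and the wait timeout concrete, here is a minimal sketch of a batch collector using only the standard library. It is not Mosec's coordinator (which is written in Rust). The `collect_batch` function and its parameters are hypothetical, and the sketch assumes the timer starts when the first request of a batch arrives.

```python
import queue
import time
from typing import List


def collect_batch(
    requests: "queue.Queue[str]", max_batch_size: int, max_wait_s: float
) -> List[str]:
    """Block for one request, then gather more until the batch is full or the wait expires."""
    batch = [requests.get()]  # wait for at least one request
    deadline = time.monotonic() + max_wait_s
    while len(batch) < max_batch_size:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # timeout reached: send a partial batch
        try:
            batch.append(requests.get(timeout=remaining))
        except queue.Empty:
            break  # no more requests arrived in time
    return batch


pending: "queue.Queue[str]" = queue.Queue()
for prompt in ["cat", "dog", "bird", "fish", "frog", "newt"]:
    pending.put(prompt)

first = collect_batch(pending, max_batch_size=4, max_wait_s=0.05)
second = collect_batch(pending, max_batch_size=4, max_wait_s=0.05)
print(first)   # full batch, returned without waiting for the timeout
print(second)  # partial batch, returned once the wait expires
```

A larger wait gives batches more time to fill, which tends to raise throughput at the cost of added latency for the first request in each batch.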
Please check out the Mosec examples page for more examples.