Micro-service for file storage and processing written in Go

package pipelines

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"

	"github.com/disintegration/imaging"
	"github.com/geplauder/lithium/storage"
	"github.com/sirupsen/logrus"
)
// Pipelines

// PipelineType distinguishes the supported kinds of processing pipelines.
type PipelineType int

const (
	Image PipelineType = iota
	Video
)

// IPipeline is the interface every pipeline implementation must satisfy.
type IPipeline interface {
	GetName() string
	GetSlug() string
	GetType() PipelineType
	GetSteps() []Step
	Run(string, string, storage.IStorageProvider) (string, error)
}
// Pipeline is the base implementation of IPipeline, deserialized from a JSON
// config file.
type Pipeline struct {
	Name           string       `json:"name" faker:"name"`
	Slug           string       `json:"slug" faker:"word"`
	Type           PipelineType `json:"type" faker:"-"`
	RemoveMetadata bool         `json:"remove_metadata" faker:"-"`
	Steps          []Step       `json:"steps" faker:"-"`
	Output         struct {
		Format  string `json:"format"`
		Quality int    `json:"quality"`
	} `json:"output" faker:"-"`
}
// Run executes all steps of the pipeline on the given source file and stores
// the encoded result via the supplied storage provider. It returns the name
// of the stored output file.
func (p Pipeline) Run(srcPath, bucketName string, storageProvider storage.IStorageProvider) (string, error) {
	src, err := imaging.Open(storageProvider.GetPath(bucketName, srcPath))
	if err != nil {
		return "", fmt.Errorf("error opening file for processing: %s", err)
	}

	// Apply every step of the pipeline to the image in sequence.
	for _, step := range p.GetSteps() {
		runner, err := step.GetExecutable()
		if err != nil {
			return "", err
		}

		src, err = runner.Execute(src)
		if err != nil {
			return "", err
		}
	}

	// Fall back to jpg when no output format is configured.
	outputFormat := p.Output.Format
	if outputFormat == "" {
		outputFormat = "jpg"
	}

	format, err := imaging.FormatFromExtension(outputFormat)
	if err != nil {
		return "", fmt.Errorf("output format '%s' is not supported", outputFormat)
	}

	// The quality option only takes effect for JPEG output.
	var options []imaging.EncodeOption
	if p.Output.Quality != 0 {
		options = append(options, imaging.JPEGQuality(p.Output.Quality))
	}

	// Encode the processed image into an in-memory buffer.
	buffer := new(bytes.Buffer)
	if err := imaging.Encode(buffer, src, format, options...); err != nil {
		return "", err
	}

	const fileName = "output.jpg" // TODO make variable
	_, err = storageProvider.StoreRaw(bucketName, fileName, buffer.Bytes())
	if err != nil {
		return "", err
	}

	return fileName, nil
}
func (p Pipeline) GetName() string {
	return p.Name
}

func (p Pipeline) GetSlug() string {
	return p.Slug
}

func (p Pipeline) GetType() PipelineType {
	return p.Type
}

func (p Pipeline) GetSteps() []Step {
	return p.Steps
}

type ImagePipeline struct {
	Pipeline
}

type VideoPipeline struct {
	Pipeline
}
// Deserialization

// DeserializePipelines unmarshals the raw pipeline configs into IPipeline
// values and aborts the application if a config cannot be parsed.
func DeserializePipelines(pipelines [][]byte) []IPipeline {
	var values []IPipeline

	for _, pipeline := range pipelines {
		var deserializedObject Pipeline
		if err := json.Unmarshal(pipeline, &deserializedObject); err != nil {
			logrus.Fatalf("Could not deserialize pipelines config: %s", err)
		}

		values = append(values, deserializedObject)
	}

	return values
}
// LoadPipelines reads every pipeline config file from the ./config directory
// below the working directory and deserializes it.
func LoadPipelines() []IPipeline {
	var files [][]byte
	path, _ := os.Getwd()

	err := filepath.Walk(filepath.Join(path, "config"), func(path string, info fs.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if !info.IsDir() {
			data, err := os.ReadFile(path)
			if err != nil {
				return err
			}
			files = append(files, data)
		}
		return nil
	})
	if err != nil {
		logrus.Fatal("Unexpected error while loading pipelines: ", err)
	}

	return DeserializePipelines(files)
}
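
// Illustrative usage of the functions above, written as a sketch: the
// storage.NewLocalStorageProvider constructor and the bucket/file names are
// assumptions for demonstration, not APIs confirmed by this package.
//
//	provider := storage.NewLocalStorageProvider("./data") // hypothetical constructor
//	for _, pipeline := range LoadPipelines() {
//		if pipeline.GetType() != Image {
//			continue
//		}
//		output, err := pipeline.Run("input.jpg", "uploads", provider)
//		if err != nil {
//			logrus.Errorf("pipeline %s failed: %s", pipeline.GetSlug(), err)
//			continue
//		}
//		logrus.Infof("pipeline %s produced %s", pipeline.GetSlug(), output)
//	}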