Micro-service for file storage and processing written in Go
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

159 lines
3.2 KiB

  1. package pipelines
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io/fs"
  8. "log"
  9. "os"
  10. "path/filepath"
  11. "github.com/disintegration/imaging"
  12. "github.com/geplauder/lithium/storage"
  13. )
  14. // Pipelines
  15. const (
  16. Image PipelineType = iota
  17. Video
  18. )
  19. type PipelineType int
  20. type IPipeline interface {
  21. GetName() string
  22. GetSlug() string
  23. GetType() PipelineType
  24. GetSteps() []Step
  25. Run(string, string, storage.IStorageProvider) (string, error)
  26. }
  27. type Pipeline struct {
  28. Name string `json:"name" faker:"name"`
  29. Slug string `json:"slug" faker:"word"`
  30. Type PipelineType `json:"type" faker:"-"`
  31. RemoveMetadata bool `json:"remove_metadata" faker:"-"`
  32. Steps []Step `json:"steps" faker:"-"`
  33. Output struct {
  34. Format string `json:"format"`
  35. Quality int `json:"quality"`
  36. } `json:"output" faker:"-"`
  37. }
  38. func (p Pipeline) Run(srcPath, bucketName string, storageProvider storage.IStorageProvider) (string, error) {
  39. fmt.Println("path: ", storageProvider.GetPath(bucketName, srcPath))
  40. src, err := imaging.Open(storageProvider.GetPath(bucketName, srcPath))
  41. if err != nil {
  42. return "", errors.New(fmt.Sprintf("error opening file for processing: %s", err))
  43. }
  44. for _, step := range p.GetSteps() {
  45. runner, err := step.GetExecutable()
  46. if err != nil {
  47. return "", err
  48. }
  49. src, err = runner.Execute(src)
  50. if err != nil {
  51. return "", err
  52. }
  53. }
  54. outputFormat := p.Output.Format
  55. if outputFormat == "" {
  56. outputFormat = "jpg"
  57. }
  58. format, err := imaging.FormatFromExtension(outputFormat)
  59. if err != nil {
  60. return "", errors.New(fmt.Sprintf("output format '%s' is not supported", outputFormat))
  61. }
  62. var options []imaging.EncodeOption
  63. if p.Output.Quality != 0 {
  64. options = append(options, imaging.JPEGQuality(p.Output.Quality))
  65. }
  66. // encode image to io buffer
  67. buffer := new(bytes.Buffer)
  68. if err := imaging.Encode(buffer, src, format, options...); err != nil {
  69. return "", err
  70. }
  71. const fileName = "output.jpg" // TODO make variable
  72. _, err = storageProvider.StoreRaw(bucketName, fileName, buffer.Bytes())
  73. if err != nil {
  74. return "", err
  75. }
  76. return fileName, nil
  77. }
  78. func (p Pipeline) GetName() string {
  79. return p.Name
  80. }
  81. func (p Pipeline) GetSlug() string {
  82. return p.Slug
  83. }
  84. func (p Pipeline) GetType() PipelineType {
  85. return p.Type
  86. }
  87. func (p Pipeline) GetSteps() []Step {
  88. return p.Steps
  89. }
  90. type ImagePipeline struct {
  91. Pipeline
  92. }
  93. type VideoPipeline struct {
  94. Pipeline
  95. }
  96. // Deserialization
  97. func DeserializePipelines(pipelines [][]byte) []IPipeline {
  98. var values []IPipeline
  99. for _, pipeline := range pipelines {
  100. var deserializedObject Pipeline
  101. err := json.Unmarshal(pipeline, &deserializedObject)
  102. if err != nil {
  103. log.Fatalf("Could not deserialize pipelines config: %s", err)
  104. }
  105. values = append(values, deserializedObject)
  106. }
  107. return values
  108. }
  109. func LoadPipelines() []IPipeline {
  110. var files [][]byte
  111. path, _ := os.Getwd()
  112. err := filepath.Walk(path+"/config", func(path string, info fs.FileInfo, err error) error {
  113. if err == nil && info.IsDir() == false {
  114. fmt.Println(path)
  115. data, _ := os.ReadFile(path)
  116. files = append(files, data)
  117. }
  118. return nil
  119. })
  120. if err != nil {
  121. panic(err)
  122. }
  123. return DeserializePipelines(files)
  124. }