schedulerx-worker-go User Guide

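Background

SchedulerX is a distributed job scheduling service provided by Alibaba Cloud (compatible with the open-source XXL-JOB, ElasticJob, K8s Job, and Spring Schedule). It supports Cron scheduling, one-time jobs, job orchestration, and distributed data processing, and offers high availability, visualization, operability, and low latency.

schedulerx-worker-go is the Go SDK for SchedulerX. It was contributed by Amap (高德).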

Features

  • Standalone jobs (supported)
  • Broadcast jobs (supported)
  • Map jobs (supported)
  • MapReduce jobs (supported)

Usage

1. Log in to the SchedulerX console and create an application; the console returns the following configuration

endpoint=xxxx
namespace=xxxx
groupId=xxx
appKey=xxx
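
The four values above are plain key=value pairs. As a minimal sketch, assuming you store them in a text file in exactly this form, they could be loaded as follows. WorkerConfig and ParseWorkerConfig are hypothetical names introduced for illustration and are not part of the SDK; the parsed values would then be copied into the SDK configuration shown in step 4.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// WorkerConfig mirrors the four values returned by the console.
// It is a hypothetical local type, not the SDK's own configuration struct.
type WorkerConfig struct {
	Endpoint  string
	Namespace string
	GroupId   string
	AppKey    string
}

// ParseWorkerConfig reads key=value lines into a WorkerConfig.
// Blank lines and lines starting with '#' are skipped; malformed lines
// and unknown keys are reported as errors.
func ParseWorkerConfig(text string) (WorkerConfig, error) {
	var cfg WorkerConfig
	sc := bufio.NewScanner(strings.NewReader(text))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		key, value, ok := strings.Cut(line, "=")
		if !ok {
			return cfg, fmt.Errorf("malformed line %q", line)
		}
		key, value = strings.TrimSpace(key), strings.TrimSpace(value)
		switch key {
		case "endpoint":
			cfg.Endpoint = value
		case "namespace":
			cfg.Namespace = value
		case "groupId":
			cfg.GroupId = value
		case "appKey":
			cfg.AppKey = value
		default:
			return cfg, fmt.Errorf("unknown key %q", key)
		}
	}
	if err := sc.Err(); err != nil {
		return cfg, err
	}
	return cfg, nil
}

func main() {
	cfg, err := ParseWorkerConfig("endpoint=xxxx\nnamespace=xxxx\ngroupId=xxx\nappKey=xxx\n")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg)
}
```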

2. Fetch the Go SDK

go get github.com/alibaba/schedulerx-worker-go@{latest tag}

3. Implement the interfaces and write your business logic

3.1 Standalone jobs

The interface is:

type Processor interface {
    Process(ctx *jobcontext.JobContext) (*ProcessResult, error)
}

Implement the interface, following this demo:

// HelloWorld.go
package main

import (
    "fmt"
    "time"

    "github.com/alibaba/schedulerx-worker-go/processor"
    "github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
)

var _ processor.Processor = &HelloWorld{}

type HelloWorld struct{}

func (h *HelloWorld) Process(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
    fmt.Println("[Process] Start process my task: Hello world!")
    // mock execute task
    time.Sleep(3 * time.Second)
    ret := new(processor.ProcessResult)
    ret.SetStatus(processor.InstanceStatusSucceed)
    fmt.Println("[Process] End process my task: Hello world!")
    return ret, nil
}

3.2 Broadcast jobs

The interface is:

type BroadcastProcessor interface {
    Processor
    PreProcess(ctx *jobcontext.JobContext) error
    PostProcess(ctx *jobcontext.JobContext) (*ProcessResult, error)
}

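Interface description:

  • PreProcess: runs once on the master node before any worker node runs Process
  • Process: runs on every worker node and may return a result
  • PostProcess: runs once on the master node after all worker nodes have finished Process; it can read the Process results of all nodes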
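
The call order described above can be sketched with the standard library alone. This is a sequential, single-process simulation under stated assumptions, not the SDK's implementation: broadcastJob, sumJob, and RunBroadcast are hypothetical names, and in a real deployment Process runs in parallel on separate worker nodes.

```go
package main

import (
	"fmt"
	"strconv"
)

// broadcastJob is a local stand-in for the three broadcast callbacks.
// It deliberately avoids the SDK types so the sketch runs on its own.
type broadcastJob interface {
	PreProcess() error
	Process(shardingID int) (string, error)
	PostProcess(results map[int]string) (string, error)
}

// sumJob returns shardingID*10 from every worker and sums the values afterwards.
type sumJob struct{}

func (sumJob) PreProcess() error { return nil }

func (sumJob) Process(shardingID int) (string, error) {
	return strconv.Itoa(shardingID * 10), nil
}

func (sumJob) PostProcess(results map[int]string) (string, error) {
	total := 0
	for _, v := range results {
		n, err := strconv.Atoi(v)
		if err != nil {
			return "", err
		}
		total += n
	}
	return strconv.Itoa(total), nil
}

// RunBroadcast simulates the lifecycle: PreProcess once, Process once per
// worker (sharding IDs 1..workers), then PostProcess once with all results.
func RunBroadcast(job broadcastJob, workers int) (string, error) {
	if err := job.PreProcess(); err != nil {
		return "", err
	}
	results := make(map[int]string, workers)
	for id := 1; id <= workers; id++ {
		res, err := job.Process(id)
		if err != nil {
			continue // in this sketch a failed worker contributes no result
		}
		results[id] = res
	}
	return job.PostProcess(results)
}

func main() {
	out, err := RunBroadcast(sumJob{}, 3)
	if err != nil {
		panic(err)
	}
	fmt.Println("PostProcess result:", out)
}
```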

Implement the interface, following this demo:

// TestBroadcast.go
package main

import (
    "fmt"
    "math/rand"
    "strconv"

    "github.com/alibaba/schedulerx-worker-go/processor"
    "github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
    "github.com/alibaba/schedulerx-worker-go/processor/taskstatus"
)

type TestBroadcast struct{}

// Process is executed on every machine.
func (t TestBroadcast) Process(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
    value := rand.Intn(10)
    fmt.Printf("Total sharding num=%d, sharding=%d, value=%d\n", ctx.ShardingNum(), ctx.ShardingId(), value)
    ret := new(processor.ProcessResult)
    ret.SetSucceed()
    ret.SetResult(strconv.Itoa(value))
    return ret, nil
}

// PreProcess is executed on only one machine.
func (t TestBroadcast) PreProcess(ctx *jobcontext.JobContext) error {
    fmt.Println("TestBroadcastJob PreProcess")
    return nil
}

// PostProcess is executed on only one machine.
func (t TestBroadcast) PostProcess(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
    fmt.Println("TestBroadcastJob PostProcess")
    allTaskResults := ctx.TaskResults()
    allTaskStatuses := ctx.TaskStatuses()
    num := 0

    for key, val := range allTaskResults {
        fmt.Printf("%v:%v\n", key, val)
        if allTaskStatuses[key] == taskstatus.TaskStatusSucceed {
            valInt, _ := strconv.Atoi(val)
            num += valInt
        }
    }

    fmt.Printf("TestBroadcastJob PostProcess(), num=%d\n", num)
    ret := new(processor.ProcessResult)
    ret.SetSucceed()
    ret.SetResult(strconv.Itoa(num))
    return ret, nil
}

3.3 Map jobs

The interface is:

type MapJobProcessor interface {
    Processor
    Map(jobCtx *jobcontext.JobContext, taskList []interface{}, taskName string) (*ProcessResult, error)
    Kill(jobCtx *jobcontext.JobContext) error
}

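Interface description:

  • Process: the business logic executed for each sub-task; the first invocation is the root task
  • Map: build a taskList in the root task and call Map to distribute it evenly to other worker nodes for distributed execution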
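
To illustrate what "distribute evenly" can mean, here is a minimal round-robin sketch. The strategy the SDK actually uses is not stated in this document, so treat the round-robin choice, the Distribute function, and the worker names as assumptions made purely for illustration.

```go
package main

import "fmt"

// Distribute assigns tasks to workers in round-robin order, so the number of
// tasks per worker differs by at most one. It returns an empty map when there
// are no workers.
func Distribute(tasks []interface{}, workers []string) map[string][]interface{} {
	assignment := make(map[string][]interface{}, len(workers))
	if len(workers) == 0 {
		return assignment
	}
	for i, task := range tasks {
		w := workers[i%len(workers)]
		assignment[w] = append(assignment[w], task)
	}
	return assignment
}

func main() {
	// Build the same kind of taskList as a root task would: id_1 .. id_10.
	var tasks []interface{}
	for i := 1; i <= 10; i++ {
		tasks = append(tasks, fmt.Sprintf("id_%d", i))
	}
	workers := []string{"worker-a", "worker-b", "worker-c"} // hypothetical node names
	assignment := Distribute(tasks, workers)
	for _, w := range workers {
		fmt.Printf("%s -> %v\n", w, assignment[w])
	}
}
```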

Implement the interface, following this demo:

// TestMapJob.go
package main

import (
    "encoding/json"
    "errors"
    "fmt"
    "strconv"
    "time"

    "github.com/alibaba/schedulerx-worker-go/processor"
    "github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
    "github.com/alibaba/schedulerx-worker-go/processor/mapjob"
)

type TestMapJob struct {
    *mapjob.MapJobProcessor
}

func (mr *TestMapJob) Kill(jobCtx *jobcontext.JobContext) error {
    //TODO implement me
    panic("implement me")
}

// Process uses the Map model to distribute a scan of orders for timeout confirmation.
func (mr *TestMapJob) Process(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) {
    var (
        num = 100
        err error
    )
    taskName := jobCtx.TaskName()

    if jobCtx.JobParameters() != "" {
        num, err = strconv.Atoi(jobCtx.JobParameters())
        if err != nil {
            return nil, err
        }
    }

    if mr.IsRootTask(jobCtx) {
        fmt.Println("start root task")
        var messageList []interface{}
        for i := 1; i <= num; i++ {
            var str = fmt.Sprintf("id_%d", i)
            messageList = append(messageList, str)
        }
        fmt.Println(messageList)
        return mr.Map(jobCtx, messageList, "Level1Dispatch")
    } else if taskName == "Level1Dispatch" {
        var task []byte = jobCtx.Task()
        var str string
        // Stop early if the sub-task payload is not a JSON string.
        if err = json.Unmarshal(task, &str); err != nil {
            return nil, err
        }
        fmt.Printf("str=%s\n", str)
        time.Sleep(100 * time.Millisecond)
        fmt.Println("Finish Process...")
        if str == "id_5" {
            return processor.NewProcessResult(
                processor.WithFailed(),
                processor.WithResult(str),
            ), errors.New("test")
        }
        return processor.NewProcessResult(
            processor.WithSucceed(),
            processor.WithResult(str),
        ), nil
    }
    return processor.NewProcessResult(processor.WithFailed()), nil
}
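
The demo above decodes jobCtx.Task() with json.Unmarshal, which suggests that each element of taskList reaches the sub-task as JSON-encoded bytes. Assuming that is the case (the wire format is not documented here), the round trip can be sketched with the standard library. EncodeTasks, DecodeTask, and the order type are hypothetical helpers, not SDK functions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// order is a sample structured sub-task payload used only in this sketch.
type order struct {
	ID    string `json:"id"`
	Value int    `json:"value"`
}

// EncodeTasks JSON-encodes every element of taskList, the way a dispatcher
// might before sending sub-tasks to workers.
func EncodeTasks(taskList []interface{}) ([][]byte, error) {
	encoded := make([][]byte, 0, len(taskList))
	for i, t := range taskList {
		b, err := json.Marshal(t)
		if err != nil {
			return nil, fmt.Errorf("encode task %d: %w", i, err)
		}
		encoded = append(encoded, b)
	}
	return encoded, nil
}

// DecodeTask decodes one payload into out, which must be a pointer.
func DecodeTask(raw []byte, out interface{}) error {
	return json.Unmarshal(raw, out)
}

func main() {
	encoded, err := EncodeTasks([]interface{}{"id_1", order{ID: "id_2", Value: 2}})
	if err != nil {
		panic(err)
	}

	var s string
	if err := DecodeTask(encoded[0], &s); err != nil {
		panic(err)
	}
	var o order
	if err := DecodeTask(encoded[1], &o); err != nil {
		panic(err)
	}
	fmt.Printf("string task=%s, struct task=%+v\n", s, o)
}
```

Checking the decode error, as done here, avoids silently processing an empty value when a payload has an unexpected shape.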

3.4 MapReduce jobs

The interface is:

type MapReduceJobProcessor interface {
    MapJobProcessor
    Reduce(jobCtx *jobcontext.JobContext) (*ProcessResult, error)
    RunReduceIfFail(jobCtx *jobcontext.JobContext) bool
}

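It extends the Map interface with the following methods:

  • Reduce: runs once after all sub-tasks have finished; inside Reduce you can read the status and result of every sub-task
  • RunReduceIfFail: whether Reduce should still run when a sub-task has failed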
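
The interplay between the two methods can be sketched as a small decision function. This is an illustration of the rule as described above, assuming that Reduce always runs when every sub-task succeeds; subTaskStatus and ShouldRunReduce are hypothetical names and do not reflect the SDK's internal code.

```go
package main

import "fmt"

// subTaskStatus is a local stand-in for the SDK's task status values.
type subTaskStatus int

const (
	statusSucceed subTaskStatus = iota
	statusFailed
)

// ShouldRunReduce reports whether Reduce should be invoked once all sub-tasks
// have finished: always when none failed, otherwise only when the job's
// RunReduceIfFail answer is true.
func ShouldRunReduce(statuses map[int64]subTaskStatus, runReduceIfFail bool) bool {
	for _, s := range statuses {
		if s == statusFailed {
			return runReduceIfFail
		}
	}
	return true
}

func main() {
	statuses := map[int64]subTaskStatus{
		1: statusSucceed,
		2: statusFailed,
		3: statusSucceed,
	}
	fmt.Println("RunReduceIfFail=false:", ShouldRunReduce(statuses, false))
	fmt.Println("RunReduceIfFail=true: ", ShouldRunReduce(statuses, true))
}
```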

Implement the interface, following this demo:

// TestMapReduceJob.go
package main

import (
    "encoding/json"
    "fmt"
    "strconv"
    "time"

    "github.com/alibaba/schedulerx-worker-go/processor"
    "github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
    "github.com/alibaba/schedulerx-worker-go/processor/mapjob"
    "github.com/alibaba/schedulerx-worker-go/processor/taskstatus"
)

type OrderInfo struct {
    Id    string `json:"id"`
    Value int    `json:"value"`
}

func NewOrderInfo(id string, value int) *OrderInfo {
    return &OrderInfo{Id: id, Value: value}
}

type TestMapReduceJob struct {
    *mapjob.MapReduceJobProcessor
}

func (mr *TestMapReduceJob) Kill(jobCtx *jobcontext.JobContext) error {
    //TODO implement me
    panic("implement me")
}

// Process uses the MapReduce model to distribute a scan of orders for timeout confirmation.
func (mr *TestMapReduceJob) Process(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) {
    var (
        num = 1000
        err error
    )
    taskName := jobCtx.TaskName()

    if jobCtx.JobParameters() != "" {
        num, err = strconv.Atoi(jobCtx.JobParameters())
        if err != nil {
            return nil, err
        }
    }

    if mr.IsRootTask(jobCtx) {
        fmt.Printf("start root task, taskId=%d\n", jobCtx.TaskId())
        var orderInfos []interface{}
        for i := 1; i <= num; i++ {
            orderInfos = append(orderInfos, NewOrderInfo(fmt.Sprintf("id_%d", i), i))
        }
        return mr.Map(jobCtx, orderInfos, "OrderInfo")
    } else if taskName == "OrderInfo" {
        orderInfo := new(OrderInfo)
        if err := json.Unmarshal(jobCtx.Task(), orderInfo); err != nil {
            fmt.Printf("task is not OrderInfo, task=%s\n", jobCtx.Task())
            // Do not report success for a payload that could not be decoded.
            return nil, err
        }
        fmt.Printf("taskId=%d, orderInfo=%+v\n", jobCtx.TaskId(), orderInfo)
        time.Sleep(1 * time.Millisecond)
        return processor.NewProcessResult(
            processor.WithSucceed(),
            processor.WithResult(strconv.Itoa(orderInfo.Value)),
        ), nil
    }
    return processor.NewProcessResult(processor.WithFailed()), nil
}

func (mr *TestMapReduceJob) Reduce(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) {
    allTaskResults := jobCtx.TaskResults()
    allTaskStatuses := jobCtx.TaskStatuses()
    count := 0
    fmt.Printf("reduce: all task count=%d\n", len(allTaskResults))
    for key, val := range allTaskResults {
        if key == 0 {
            continue
        }
        if allTaskStatuses[key] == taskstatus.TaskStatusSucceed {
            num, err := strconv.Atoi(val)
            if err != nil {
                return nil, err
            }
            count += num
        }
    }
    fmt.Printf("reduce: succeed task count=%d\n", count)
    return processor.NewProcessResult(
        processor.WithSucceed(),
        processor.WithResult(strconv.Itoa(count)),
    ), nil
}

4. Register the client and jobs

package main

import (
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/alibaba/schedulerx-worker-go"
    "github.com/alibaba/schedulerx-worker-go/processor/mapjob"
)

func main() {
    // This is only an example; obtain the real configuration from the platform.
    cfg := &schedulerx.Config{
        Endpoint:  "acm.aliyun.com",
        Namespace: "433d8b23-06e9-408c-aaaa-90d4d1b9a4af",
        GroupId:   "gojob-test",
        AppKey:    "xxxxxxx",
    }
    client, err := schedulerx.GetClient(cfg)
    if err != nil {
        panic(err)
    }
    task1 := &HelloWorld{}
    task2 := &TestBroadcast{}
    task3 := &TestMapJob{
        mapjob.NewMapJobProcessor(),
    }
    task4 := &TestMapReduceJob{
        mapjob.NewMapReduceJobProcessor(),
    }

    // Each name registered here (e.g. HelloWorld) must match the job name configured on the platform.
    client.RegisterTask("HelloWorld", task1)
    client.RegisterTask("TestBroadcast", task2)
    client.RegisterTask("TestMapJob", task3)
    client.RegisterTask("TestMapReduceJob", task4)
    
    // wait for the stop signal
    c := make(chan os.Signal, 1)
    signal.Notify(c, syscall.SIGINT, syscall.SIGTERM, syscall.SIGUSR1, syscall.SIGUSR2)
    <-c
    time.Sleep(time.Second * 5)
}

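5. Create the job in the console

Select golang as the job type and set the job name to the name registered in step 4, for example HelloWorld.

Examples

See the example directory.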
