-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprocess.go
More file actions
139 lines (124 loc) · 3.12 KB
/
process.go
File metadata and controls
139 lines (124 loc) · 3.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package process
import (
"bufio"
"fmt"
"io"
"log"
"os"
"os/exec"
"strings"
"sync"
"syscall"
"github.com/fatih/color"
)
// Process holds the bookkeeping for a single execution of a command together
// with the underlying *exec.Cmd once Run has created it.
type Process struct {
	// RWMutex is taken by the methods of this type around accesses to Pid
	// and cmd.
	sync.RWMutex
	// Pid is the operating-system process id; it stays 0 until Run has
	// started the command.
	Pid int
	// wg tracks the two output-printing goroutines started by Run.
	wg sync.WaitGroup
	// ID and JobID are rendered by String as "R<ID>" and "ID<JobID>"; JobID
	// is also the value Run sends on its done channel.
	ID, JobID int
	// Cmd is the command line to execute.
	Cmd string
	// shell, when non-empty, makes Run execute Cmd as `shell -c Cmd`.
	shell string
	// cmd is the command handle built by Run; nil before Run is called.
	cmd *exec.Cmd
}
// New returns a Process describing the command cmd. It only records its
// arguments: shell is stored as the optional interpreter, jobID and id are
// stored as JobID and ID. Nothing is launched until Run is called.
func New(cmd, shell string, jobID, id int) *Process {
	return &Process{
		Cmd:   cmd,
		shell: shell,
		JobID: jobID,
		ID:    id,
	}
}
// setPid stores pid in p.Pid while holding the write lock.
func (p *Process) setPid(pid int) {
	p.Lock()
	p.Pid = pid
	p.Unlock()
}
// GetPid returns the stored process id, read under the read lock. The value
// is 0 as long as no pid has been recorded.
func (p *Process) GetPid() int {
	p.RLock()
	pid := p.Pid
	p.RUnlock()
	return pid
}
// String renders the process as "[ID<JobID>/R<ID>/<pid>]". While the pid is
// still 0 (nothing has been started yet) a "-" is printed in its place.
//
// String takes the read lock through GetPid, so it must not be called by a
// goroutine that already holds the write lock.
func (p *Process) String() string {
	// Read the pid exactly once and under the lock: the test and the printed
	// value then always agree, and the read cannot race with setPid.
	if pid := p.GetPid(); pid != 0 {
		return fmt.Sprintf("[ID%d/R%d/%d]", p.JobID, p.ID, pid)
	}
	return fmt.Sprintf("[ID%d/R%d/-]", p.JobID, p.ID)
}
// Run starts the process and prints its stdout and stderr in parallel.
//
// When a shell is configured, Cmd is handed to it verbatim as
// `<shell> -c Cmd`. Otherwise Cmd is split on whitespace and executed
// directly, so quoting inside Cmd is not interpreted.
//
// Run returns once the process has been started. A background goroutine then
// waits for both output printers and for the process itself, logs the result
// and finally sends p.JobID on done; that send blocks until it is received.
//
// A non-nil error is returned, and nothing is ever sent on done, when the
// command is empty, when a pipe cannot be created or when the process fails
// to start.
func (p *Process) Run(done chan int) error {
	log.Printf("Process: starting %s: %s\n", p, p.Cmd)

	var cmd *exec.Cmd
	if p.shell != "" {
		cmd = exec.Command(p.shell, "-c", p.Cmd)
	} else {
		// strings.Fields, unlike strings.Split(s, " "), ignores repeated and
		// surrounding whitespace, so no empty arguments (or an empty program
		// name) are passed on.
		args := strings.Fields(p.Cmd)
		if len(args) == 0 {
			return fmt.Errorf("process: empty command for %s", p)
		}
		cmd = exec.Command(args[0], args[1:]...)
	}

	// Publish the handle under the lock; everything below works on the local
	// variable so the field is never read unsynchronized.
	p.Lock()
	p.cmd = cmd
	p.Unlock()

	stdout, err := cmd.StdoutPipe()
	if err != nil {
		log.Printf("Error reading stout for %s: %v\n", p.Cmd, err)
		return err
	}
	stderr, err := cmd.StderrPipe()
	if err != nil {
		log.Printf("Error reading stderr for %s: %v\n", p.Cmd, err)
		return err
	}
	if err := cmd.Start(); err != nil {
		return err
	}
	p.setPid(cmd.Process.Pid)
	log.Printf("Process: %s Started\n", p)

	p.wg.Add(2)
	go p.outPrinter(stdout, "[II]", color.FgBlue)
	go p.outPrinter(stderr, "[EE]", color.FgRed)

	go func() {
		// The pipes must be fully read before Wait is called, because Wait
		// closes them.
		p.wg.Wait()
		// Use a goroutine-local error instead of writing to a variable that
		// belongs to the already-returned Run call.
		if err := cmd.Wait(); err != nil {
			log.Printf("process: Exec Error: %s\n", err)
		}
		log.Printf("Process: %s Finished \n", p)
		done <- p.JobID
	}()
	return nil
}
// GetMaxRss returns the maximum resident set size the operating system
// reported for the process, in the platform's own unit (see getrusage(2)).
//
// It returns 0 when no figure is available: Run has not created the command
// yet, the process has not been waited on (no ProcessState), or the system
// usage information is not a *syscall.Rusage.
func (p *Process) GetMaxRss() int64 {
	p.RLock()
	defer p.RUnlock()
	// p.cmd is nil until Run has been called; guard against that as well as
	// against a process that has not finished yet.
	if p.cmd == nil || p.cmd.ProcessState == nil {
		return 0
	}
	// Checked assertion: SysUsage returns an interface value whose dynamic
	// type is not guaranteed, and an unchecked assertion would panic.
	usage, ok := p.cmd.ProcessState.SysUsage().(*syscall.Rusage)
	if !ok || usage == nil {
		return 0
	}
	// Maxrss is not int64 on every platform, hence the explicit conversion.
	return int64(usage.Maxrss)
}
// outPrinter copies r to the standard logger one line at a time, prepending
// the process description and prefix rendered with the colour attribute c.
// It signals p.wg when it returns, which happens when r reaches EOF or a read
// error occurs; read errors other than EOF are reported on os.Stderr.
//
// Lines are read with bufio.Reader rather than bufio.Scanner: a Scanner stops
// with ErrTooLong on a line longer than its 64 KiB token limit, after which
// the pipe would no longer be drained and the child could block writing to it.
func (p *Process) outPrinter(r io.Reader, prefix string, c color.Attribute) {
	defer p.wg.Done()

	paint := color.New(c).SprintFunc()
	prefix = paint(prefix)

	reader := bufio.NewReader(r)
	for {
		line, err := reader.ReadString('\n')
		if line != "" {
			// Same trimming as bufio.ScanLines: drop the newline and one
			// optional carriage return before it. A final line without a
			// trailing newline is still printed.
			line = strings.TrimSuffix(line, "\n")
			line = strings.TrimSuffix(line, "\r")
			log.Printf("%s %s %s\n", p, prefix, line)
		}
		if err != nil {
			if err != io.EOF {
				fmt.Fprintf(os.Stderr, "process.outPrinter: Error scanning out for %d: %v\n", p.ID, err)
			}
			return
		}
	}
}
// func sentryLog(cmd string, out []byte, err error) {
// if cfg.SentryDSN != "" {
// message := fmt.Sprintln("Error executing command", cmd)
// client, err := raven.NewWithTags(cfg.SentryDSN, map[string]string{"program": "go-cron", "error": err.Error(), "command": cmd})
// if err == nil {
// packet := &raven.Packet{Message: message, Extra: map[string]interface{}{"command": cmd, "output": string(out)}}
// client.Capture(packet, nil)
// }
// }
// }