-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathnodePlugin.js
More file actions
68 lines (59 loc) · 1.87 KB
/
nodePlugin.js
File metadata and controls
68 lines (59 loc) · 1.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
const { Subject } = require('rxjs');
const deserializeArgs = require('./deserializeArgs');
const uuidv1 = require('uuid/v1');
/**
 * Requests a new identifier through the `role:id,cmd:generate` action and
 * exposes the callback-style result as a Promise.
 *
 * @param {object} that - Object exposing `act(pattern, callback)`; callers in
 *   this file pass the plugin instance (`this`).
 * @returns {Promise<*>} Resolves with the `id` field of the action's response.
 *   Rejects with the error reported by the action, or with an `Error` when the
 *   action yields no response object.
 */
function generateId(that) {
  return new Promise((resolve, reject) => {
    that.act("role:id,cmd:generate", (err, response) => {
      // Surface the action's own failure instead of crashing on `response.id`.
      if (err) {
        reject(err);
        return;
      }
      if (response == null) {
        reject(new Error("role:id,cmd:generate returned no response"));
        return;
      }
      resolve(response.id);
    });
  });
}
/**
 * Plugin definition that registers three action handlers on `this`
 * (the `add`/`act`/`init` usage looks like Seneca plugin conventions —
 * NOTE(review): confirm against the host framework):
 *
 * - `role:fw,cmd:createActor`: delegates to `role:fw,cmd:newActor` and replies
 *   with the delegate's `payload`.
 * - `role:fw,cmd:newActor`: generates an id, pushes the message's values
 *   through an rxjs pipeline built from the message's operations, and replies
 *   with `{ payload: { id, data } }` where `data` holds every emitted value.
 * - `{ init: 'nodePlugin' }`: generates and stores an id for this plugin
 *   instance before signalling that initialisation is done.
 *
 * Must be invoked with `this` bound to an object exposing `add` and `act`.
 */
function nodePlugin() {
  // Id of this plugin instance; assigned by the init handler below.
  let nodePluginID;

  this.add('role:fw,cmd:createActor', (msg, response) => {
    console.log("source actor id " + nodePluginID);
    this.act('role:fw,cmd:newActor', msg, (err, resp) => {
      if (err) {
        console.log(err);
        // Forward the failure; `resp` cannot be trusted when `err` is set.
        response(err, null);
        return;
      }
      response({ payload: resp.payload });
    });
  });

  this.add('role:fw,cmd:newActor', async (msg, response) => {
    // The reply may be triggered from the stream callbacks or from the catch
    // block below; make sure the caller is answered exactly once.
    let replied = false;
    const replyOnce = (err, out) => {
      if (replied) {
        return;
      }
      replied = true;
      response(err, out);
    };

    try {
      const actorId = await generateId(this);
      console.log("new id for new actor generated", actorId);

      const m = deserializeArgs(msg);
      const operators = require('rxjs/operators');

      // Turn each { operator, args } description into an rxjs operator function.
      const pipeline = m.operations.map((o) => {
        const op = operators[o.operator];
        if (typeof op !== 'function') {
          throw new Error(`Unknown rxjs operator: ${o.operator}`);
        }
        return op(...(o.args || []));
      });

      const result = [];
      const s = new Subject();
      s.pipe(...pipeline).subscribe(
        (data) => {
          result.push(data);
          console.log(data);
        },
        (err) => {
          replyOnce(err, null);
        },
        () => {
          console.log("completed");
          replyOnce(null, {
            payload: {
              id: actorId,
              data: result,
            },
          });
        }
      );

      // Subscribe first, then feed the values: a plain Subject does not replay.
      m.values.forEach((i) => s.next(i));
      s.complete();
    } catch (err) {
      // Id generation, deserialisation or pipeline construction failed.
      replyOnce(err, null);
    }
  });

  this.add({ init: 'nodePlugin' }, (msg, respond) => {
    generateId(this).then(
      (newId) => {
        nodePluginID = newId;
        respond();
      },
      (err) => {
        // Report the failure so initialisation does not hang silently.
        respond(err);
      }
    );
  });
}
module.exports = nodePlugin;