-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathbalancer
More file actions
executable file
·119 lines (103 loc) · 3.36 KB
/
balancer
File metadata and controls
executable file
·119 lines (103 loc) · 3.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#!/usr/bin/env php
<?php
use React\EventLoop\Factory as LoopFactory;
use React\Socket\ConnectionInterface;
use React\Socket\Connector;
use React\Socket\Server;
use React\Stream\Util;
use Symfony\Component\Console\Input\ArgvInput;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputDefinition;
require __DIR__ . "/vendor/autoload.php";
/**
 * Build the parsed command-line input for the balancer.
 *
 * The accepted command line is described by three entries:
 *  - "port":      required argument
 *  - "balance":   required argument that collects one or more values
 *  - "--silence": option that takes no value
 *
 * @param array $argv Raw argument vector, as provided by PHP in $argv
 *
 * @return InputInterface Input bound to the definition above
 */
function createInputFromARGV(array $argv) : InputInterface
{
    $portArgument = new InputArgument('port', InputArgument::REQUIRED);
    $balanceArgument = new InputArgument(
        'balance',
        InputArgument::REQUIRED | InputArgument::IS_ARRAY
    );
    $silenceOption = new InputOption('silence', '', InputOption::VALUE_NONE);

    // Argument order matters: "port" is positional first, "balance" takes the rest.
    $definition = new InputDefinition([
        $portArgument,
        $balanceArgument,
        $silenceOption,
    ]);

    return new ArgvInput($argv, $definition);
}
/**
 * Expand a list of balance targets into the set of backend addresses.
 *
 * Accepted target formats:
 *
 *   myhost:8000         one port on a named host
 *   1.1.1.1:8000        one port on an IP address
 *   [::1]:8000          one port on a bracketed IPv6 literal
 *   8000                one port on 127.0.0.1
 *   :8000               one port on 127.0.0.1
 *   8000-8100           inclusive port range on 127.0.0.1
 *   myhost:8000-8100    inclusive port range on a host
 *
 * The host is everything before the LAST colon, so bracketed IPv6 literals
 * keep their inner colons. A missing or empty host falls back to 127.0.0.1.
 * Ports are normalised to integers, so "08000" and "8000" yield the same key.
 *
 * @param array $array Raw target strings in any of the formats above
 *
 * @return array Map of "host:port" => true; duplicate targets collapse into one key
 *
 * @throws \InvalidArgumentException When a port is not a decimal number, lies
 *                                   outside 1-65535, or a range is reversed
 */
function getBalancedUrls(array $array) : array
{
    $finalArray = [];

    foreach ($array as $address) {
        $address = (string) $address;

        // Split on the last colon only: everything after it is the port spec.
        $separator = strrpos($address, ':');
        if ($separator === false) {
            $host = '';
            $portSpec = $address;
        } else {
            $host = substr($address, 0, $separator);
            $portSpec = substr($address, $separator + 1);
        }

        if ($host === '') {
            $host = '127.0.0.1';
        }

        // Either "N" or "N-M", digits only. Anything else used to slip through
        // and produce unusable addresses (or nothing at all) without any error.
        if (preg_match('/^(\d+)(?:-(\d+))?\z/', $portSpec, $matches) !== 1) {
            throw new \InvalidArgumentException(
                sprintf('Invalid port or port range "%s" in balance target "%s"', $portSpec, $address)
            );
        }

        $firstPort = (int) $matches[1];
        $lastPort = isset($matches[2]) ? (int) $matches[2] : $firstPort;

        if ($firstPort < 1 || $lastPort > 65535 || $firstPort > $lastPort) {
            throw new \InvalidArgumentException(
                sprintf('Port range "%s" in balance target "%s" must be ascending and within 1-65535', $portSpec, $address)
            );
        }

        for ($port = $firstPort; $port <= $lastPort; $port++) {
            $finalArray["{$host}:{$port}"] = true;
        }
    }

    return $finalArray;
}
// Read the listening port, the backend set and the verbosity from the command line.
$input = createInputFromARGV($argv);
$proxyPort = $input->getArgument('port');
$balancedUris = getBalancedUrls($input->getArgument('balance'));
$debug = !$input->getOption('silence');

// Unless --silence was given, print the listening address followed by every backend.
if ($debug) {
    $bannerLines = ['Listening => 0.0.0.0:' . $proxyPort];
    foreach (array_keys($balancedUris) as $backendAddress) {
        $bannerLines[] = 'Balancing => ' . $backendAddress;
    }
    echo implode(PHP_EOL, $bannerLines) . PHP_EOL;
}
// One event loop drives both the listening socket and the outgoing connections.
$loop = LoopFactory::create();
$connector = new Connector($loop);
$uri = "tcp://0.0.0.0:$proxyPort";
$proxy = new Server($uri, $loop);

// Handles one incoming client: pick a backend at random and wire both streams together.
$onClientConnection = function (ConnectionInterface $proxyConnection) use ($connector, $balancedUris, $debug) {
    // Pause the client side while the backend connection is being established.
    $proxyConnection->pause();
    $randomUri = array_rand($balancedUris);

    // Backend reachable: forward data in both directions, ending each side with the other.
    $pipeBothWays = function (ConnectionInterface $connection) use ($proxyConnection, $randomUri, $debug) {
        if ($debug) {
            echo 'Piped to ' . $randomUri . PHP_EOL;
        }

        Util::pipe($connection, $proxyConnection, ['end' => true]);
        Util::pipe($proxyConnection, $connection, ['end' => true]);
    };

    // Backend unreachable (or piping failed): report it and end the client connection.
    $endClientOnFailure = function (\Throwable $throwable) use ($proxyConnection, $debug) {
        if ($debug) {
            echo 'Error thrown : ' . $throwable->getMessage() . PHP_EOL;
        }

        $proxyConnection->end();
    };

    // Runs on both outcomes: undo the pause applied above.
    $resumeClient = function () use ($proxyConnection) {
        $proxyConnection->resume();
    };

    $connector
        ->connect($randomUri)
        ->then($pipeBothWays)
        ->otherwise($endClientOnFailure)
        ->always($resumeClient);
};

$proxy->on('connection', $onClientConnection);
// Shutdown on SIGINT / SIGTERM: unregister this handler for both signals,
// close the listening socket and stop the loop. The closure captures itself
// by reference so it can pass itself to removeSignal().
$signalHandler = function () use (&$signalHandler, $proxy, $loop, $debug) {
    if ($debug) {
        echo 'Stopping balancer...' . PHP_EOL;
    }

    foreach ([SIGINT, SIGTERM] as $registeredSignal) {
        $loop->removeSignal($registeredSignal, $signalHandler);
    }

    $proxy->close();
    $loop->stop();
};

foreach ([SIGINT, SIGTERM] as $shutdownSignal) {
    $loop->addSignal($shutdownSignal, $signalHandler);
}

// Runs until the signal handler stops the loop.
$loop->run();