1: <?php
2: 3: 4: 5: 6: 7: 8: 9: 10:
11:
12: namespace ManiaLive\Threading;
13:
14: use ManiaLive\Database\Connection;
15: use ManiaLive\Event\Dispatcher;
16: use ManiaLive\Features\Tick\Listener as TickListener;
17: use ManiaLive\Features\Tick\Event as TickEvent;
18: use ManiaLive\Utilities\Console;
19: use ManiaLive\Utilities\Logger;
20:
/**
 * Manages worker processes ("threads") that execute Runnable tasks outside of
 * the main process.
 *
 * Communication with the workers goes through three MySQL tables created in
 * setUpDatabase(): ThreadingProcesses (liveness of parent processes),
 * ThreadingData (values published via setData()) and ThreadingCommands (tasks
 * and their results). All rows are keyed by the PID of this process.
 *
 * When threading is disabled, tasks queued on the pseudo thread #0 are run
 * sequentially inside onTick() instead.
 */
final class ThreadHandler extends \ManiaLib\Utils\Singleton implements TickListener
{
	/** @var int number of thread ids handed out so far (also the last id assigned) */
	private $threadsCount = 0;
	/** @var array threadId => proc_open() handle, or false if the last respawn failed */
	private $threads = array();
	/** @var array threadId => tick at which the thread was last launched, restarted, answered or seen running */
	private $lastTick = array();
	/** @var array threadId => list of Command objects queued but not yet written to the database */
	private $buffers = array();
	/** @var array threadId => (commandId => Command) written to the database and awaiting a result */
	private $pendings = array();
	/** @var array commandId => number of attempts made for that pending command */
	private $tries = array();

	/** @var mixed database connection; only assigned when threading is enabled */
	private $database;
	/** @var mixed log obtained from Logger::getLog('threading') */
	private $logger;
	/** @var int number of onTick() calls processed while enabled */
	private $tick = 0;

	/** @var bool whether tasks are dispatched to worker processes */
	private $enabled = false;
	/** @var int number of times a thread process had to be terminated by restartThread() */
	private $deadThreadsCount = 0;
	/** @var int number of responses received from workers */
	private $commandsCount = 0;
	/** @var float sum of the timeTaken values of all received responses */
	private $commandsTotalTime = 0;
	/** @var float $commandsTotalTime / $commandsCount */
	private $commandsAverageTime = 0;

	/**
	 * Threading is enabled only if the mysql extension is loaded and the
	 * threading Config says so. When enabled, the database is prepared and
	 * several configuration singletons are published through setData();
	 * otherwise the sequential buffer #0 is created. In both cases the handler
	 * registers itself for tick events.
	 */
	protected function __construct()
	{
		$this->enabled = extension_loaded('mysql') && Config::getInstance()->enabled;
		$this->logger = Logger::getLog('threading');

		if($this->enabled)
		{
			$this->setUpDatabase();
			$this->setData('config', \ManiaLive\Config\Config::getInstance());
			$this->setData('wsapi', \ManiaLive\Features\WebServices\Config::getInstance());
			$this->setData('manialive', \ManiaLive\Application\Config::getInstance());
			$this->setData('server', \ManiaLive\DedicatedApi\Config::getInstance());
			$this->setData('threading', \ManiaLive\Threading\Config::getInstance());
		}
		else
		{
			$this->logger->write('Application started with threading disabled!', array('Process #'.getmypid()));
			// Buffer used by onTick() to run tasks sequentially in this process
			$this->buffers[0] = array();
		}

		Dispatcher::register(TickEvent::getClass(), $this);
	}

	/**
	 * Opens the database connection, creates the three threading tables if
	 * needed, purges rows left by this PID or by parents whose lastLive is
	 * older than two minutes, and registers this process as alive.
	 */
	private function setUpDatabase()
	{
		$dbConfig = \ManiaLive\Database\Config::getInstance();
		$this->database = Connection::getConnection(
			$dbConfig->host,
			$dbConfig->username,
			$dbConfig->password,
			$dbConfig->database,
			'MySQL',
			$dbConfig->port
		);

		$this->database->execute(
			'CREATE TABLE IF NOT EXISTS `ThreadingProcesses` ('.
			'`parentId` INT(10) UNSIGNED NOT NULL,'.
			'`lastLive` DATETIME NOT NULL,'.
			'PRIMARY KEY (`parentId`)'.
			')'.
			'COLLATE=\'utf8_general_ci\''
		);

		$this->database->execute(
			'CREATE TABLE IF NOT EXISTS `ThreadingData` ('.
			'`parentId` INT(10) UNSIGNED NOT NULL,'.
			'`name` VARCHAR(255) NOT NULL,'.
			'`value` TEXT NOT NULL,'.
			'PRIMARY KEY (`parentId`, `name`)'.
			')'.
			'COLLATE=\'utf8_general_ci\''
		);

		$this->database->execute(
			'CREATE TABLE IF NOT EXISTS `ThreadingCommands` ('.
			'`parentId` INT(10) UNSIGNED NOT NULL,'.
			'`commandId` INT(10) UNSIGNED NOT NULL,'.
			'`threadId` INT(10) UNSIGNED NOT NULL,'.
			'`task` TEXT NOT NULL,'.
			'`result` TEXT NULL DEFAULT NULL,'.
			'`timeTaken` FLOAT UNSIGNED NULL DEFAULT NULL,'.
			'PRIMARY KEY (`parentId`, `commandId`),'.
			'INDEX `threadId` (`threadId`)'.
			')'.
			'COLLATE=\'utf8_general_ci\''
		);

		// Our own PID is included so that leftovers of a previous process
		// that happened to have the same PID are removed as well
		$deadPids = $this->database->execute(
			'SELECT parentId FROM ThreadingProcesses WHERE parentId=%d OR DATE_ADD(lastLive, INTERVAL 2 MINUTE) < NOW()',
			getmypid()
		)->fetchArrayOfSingleValues();
		if($deadPids)
		{
			// Every value is forced to an integer, so the list is safe to inline
			$deadPids = implode(',', array_map('intval', $deadPids));
			$this->database->execute('DELETE FROM ThreadingProcesses WHERE parentId IN (%s)', $deadPids);
			$this->database->execute('DELETE FROM ThreadingData WHERE parentId IN (%s)', $deadPids);
			$this->database->execute('DELETE FROM ThreadingCommands WHERE parentId IN (%s)', $deadPids);
		}
		$this->database->execute('INSERT INTO ThreadingProcesses(parentId, lastLive) VALUES(%d, NOW())', getmypid());
	}

	/**
	 * Stores a value in ThreadingData under the given name for this process.
	 * The value is serialized and base64-encoded. An existing value with the
	 * same name is replaced.
	 *
	 * @param string $key
	 * @param mixed $value
	 * @return bool true if a row was written, false otherwise (always false
	 *              when threading is disabled, as there is no database then)
	 */
	public function setData($key, $value)
	{
		if(!$this->enabled)
			return false;

		// REPLACE instead of INSERT: (parentId, name) is the primary key, so
		// setting the same key twice would otherwise fail
		$this->database->execute(
			'REPLACE INTO ThreadingData(parentId, name, value) VALUES (%d, %s, %s)',
			getmypid(),
			$this->database->quote($key),
			$this->database->quote(base64_encode(serialize($value)))
		);

		return $this->database->affectedRows() > 0;
	}

	/**
	 * Reads back a value stored with setData() by this process.
	 *
	 * @param string $key
	 * @param mixed $default returned when no value is stored under $key or
	 *                       when threading is disabled
	 * @return mixed
	 */
	public function getData($key, $default = null)
	{
		if(!$this->enabled)
			return $default;

		$result = $this->database->execute(
			'SELECT value FROM ThreadingData WHERE name=%s AND parentId=%d',
			$this->database->quote($key),
			getmypid()
		);

		return $result->recordAvailable() ? unserialize(base64_decode($result->fetchSingleValue())) : $default;
	}

	/**
	 * Starts a new worker process and dispatches Event::ON_THREAD_START.
	 *
	 * @return int id of the new thread, or 0 (the sequential buffer) when
	 *             threading is disabled
	 * @throws Exception if the process could not be started
	 */
	public function launchThread()
	{
		if(!$this->enabled) return 0;

		$threadId = ++$this->threadsCount;
		$threadHandle = $this->spawnThread($threadId);
		if($threadHandle === false)
			throw new Exception('Thread #'.$threadId.' could not be started!');

		$this->threads[$threadId] = $threadHandle;
		$this->lastTick[$threadId] = $this->tick;
		$this->buffers[$threadId] = array();
		$this->pendings[$threadId] = array();
		Dispatcher::dispatch(new Event(Event::ON_THREAD_START, $threadId));
		$this->logger->write('Thread #'.$threadId.' started!', array('Process #'.getmypid()));

		return $threadId;
	}

	/**
	 * Runs thread_ignitor.php with the configured PHP binary, passing the
	 * thread id, this process' PID and the database credentials as arguments.
	 * stdout and stderr of the child are appended to the threading-error log.
	 *
	 * @param int $threadId
	 * @return resource|bool handle returned by proc_open(), false on failure
	 */
	private function spawnThread($threadId)
	{
		$config = \ManiaLive\Config\Config::getInstance();
		$dbConfig = \ManiaLive\Database\Config::getInstance();
		$outputFile = $config->logsPath.'/'.($config->logsPrefix ? $config->logsPrefix.'-' : '').'threading-error.txt';
		$descriptors = array(
			1 => array('file', $outputFile, 'a'),
			2 => array('file', $outputFile, 'a')
		);

		$args = array(
			'threadId' => $threadId,
			'parentId' => getmypid(),
			'dbHost' => $dbConfig->host,
			'dbPort' => $dbConfig->port,
			'dbUsername' => $dbConfig->username,
			'dbPassword' => $dbConfig->password,
			'dbDatabase' => $dbConfig->database
		);

		$command = '"'.Config::getInstance()->phpPath.'" "'.__DIR__.DIRECTORY_SEPARATOR.'thread_ignitor.php"';
		// Same command, but with the password masked, for the debug output
		$printableCommand = $command;
		foreach($args as $key => $value)
		{
			$argument = ' --'.$key.'='.escapeshellarg($value);
			$command .= $argument;
			$printableCommand .= $key === 'dbPassword' ? ' --'.$key.'=***' : $argument;
		}

		Console::printDebug('Trying to spawn Thread #'.$threadId.' using command: '.PHP_EOL.$printableCommand);
		return proc_open($command, $descriptors, $pipes, null, null, array('bypass_shell' => true));
	}

	/**
	 * Terminates a thread, dispatches Event::ON_THREAD_KILLED, deletes its
	 * commands from the database and forgets everything known about it.
	 * Does nothing if threading is disabled or the thread is unknown.
	 *
	 * @param int $threadId
	 */
	public function killThread($threadId)
	{
		if(!$this->enabled || !isset($this->threads[$threadId]))
			return;

		$threadHandle = $this->threads[$threadId];
		// The handle is false if an earlier respawn of this thread failed
		if(is_resource($threadHandle))
		{
			proc_terminate($threadHandle);
			proc_close($threadHandle);
		}
		Dispatcher::dispatch(new Event(Event::ON_THREAD_KILLED, $threadId));
		$this->logger->write('Thread #'.$threadId.' stopped', array('Process #'.getmypid()));

		$this->database->execute('DELETE FROM ThreadingCommands WHERE threadId=%d AND parentId=%d', $threadId, getmypid());

		unset($this->threads[$threadId]);
		unset($this->lastTick[$threadId]);
		unset($this->buffers[$threadId]);
		foreach($this->pendings[$threadId] as $commandId => $command)
			unset($this->tries[$commandId]);
		unset($this->pendings[$threadId]);
	}

	/**
	 * Replaces the process of a thread that died (no pending command) or
	 * timed out (pending commands). In the latter case the oldest pending
	 * command gets one more try; once it exceeds Config::maxTries it is
	 * removed from the database and failed after the restart.
	 *
	 * If the new process cannot be started, the failure is logged and the
	 * handle is left as false; the next timeout will attempt another restart.
	 *
	 * @param int $threadId
	 */
	private function restartThread($threadId)
	{
		if(!$this->enabled || !isset($this->threads[$threadId]))
			return;

		$commandDiscarded = false;
		if(empty($this->pendings[$threadId]))
		{
			$this->logger->write('Thread #'.$threadId.' died...', array('Process #'.getmypid()));
			Dispatcher::dispatch(new Event(Event::ON_THREAD_DIES, $threadId));
		}
		else
		{
			$this->logger->write('Thread #'.$threadId.' timed out...', array('Process #'.getmypid()));
			Dispatcher::dispatch(new Event(Event::ON_THREAD_TIMES_OUT, $threadId));

			// The first pending command is the one charged with the time out
			$command = reset($this->pendings[$threadId]);
			$lastCommandId = $command->getId();
			if(++$this->tries[$lastCommandId] > Config::getInstance()->maxTries)
			{
				$this->database->execute(
					'DELETE FROM ThreadingCommands WHERE commandId=%d AND parentId=%d',
					$lastCommandId,
					getmypid()
				);
				unset($this->pendings[$threadId][$lastCommandId]);
				unset($this->tries[$lastCommandId]);
				$this->logger->write('Command #'.$lastCommandId.' has been discarded after '.Config::getInstance()->maxTries.' unsuccessful tries...', array('Process #'.getmypid()));
				$commandDiscarded = true;
			}
		}

		$threadHandle = $this->threads[$threadId];
		// Nothing to terminate (or to count) if the previous respawn failed
		if(is_resource($threadHandle))
		{
			proc_terminate($threadHandle);
			proc_close($threadHandle);
			++$this->deadThreadsCount;
		}

		$this->threads[$threadId] = $this->spawnThread($threadId);
		$this->lastTick[$threadId] = $this->tick;
		if($this->threads[$threadId] === false)
		{
			$this->logger->write('Thread #'.$threadId.' could not be restarted!', array('Process #'.getmypid()));
		}
		else
		{
			Dispatcher::dispatch(new Event(Event::ON_THREAD_RESTART, $threadId));
			$this->logger->write('Thread #'.$threadId.' restarted!', array('Process #'.getmypid()));
		}

		if($commandDiscarded)
			$command->fail();
	}

	/**
	 * @return int number of threads currently managed
	 */
	public function countThreads()
	{
		return count($this->threads);
	}

	/**
	 * @return int number of thread processes that had to be terminated and replaced
	 */
	public function countRestartedThreads()
	{
		return $this->deadThreadsCount;
	}

	/**
	 * Queues a task for the given thread. The task is silently ignored if no
	 * buffer exists for that thread id.
	 *
	 * @param int $threadId id returned by launchThread()
	 * @param Runnable $task
	 * @param mixed $callback passed as is to the Command constructor
	 */
	public function addTask($threadId, Runnable $task, $callback = null)
	{
		if(isset($this->buffers[$threadId])) $this->buffers[$threadId][] = new Command($task, $callback);
	}

	/**
	 * @return bool whether tasks are dispatched to worker processes
	 */
	public function isEnabled()
	{
		return $this->enabled;
	}

	/**
	 * @return int number of responses received from workers
	 */
	public function countFinishedCommands()
	{
		return $this->commandsCount;
	}

	/**
	 * @return float average of the timeTaken values reported by workers
	 */
	public function getAverageResponseTime()
	{
		return $this->commandsAverageTime;
	}

	/**
	 * Tick handler. When enabled: collects responses, restarts dead or timed
	 * out threads, sends queued tasks, and refreshes this process' lastLive
	 * every 60 ticks. When disabled: runs tasks queued on buffer #0 in this
	 * process until the buffer is empty or Config::sequentialTimeout (added
	 * to a microtime(true) value, hence in seconds) has elapsed.
	 */
	public function onTick()
	{
		if($this->enabled)
		{
			++$this->tick;
			$this->receiveResponses();
			$this->handleTimeOuts();
			$this->sendTasks();
			if($this->tick % 60 == 0)
				$this->database->execute('UPDATE ThreadingProcesses SET lastLive=NOW() WHERE parentId=%d', getmypid());
		}
		else
		{
			$startTime = microtime(true);
			$stopTime = $startTime + Config::getInstance()->sequentialTimeout;

			while($command = array_shift($this->buffers[0]))
			{
				$result = $command->getTask()->run();
				$endTime = microtime(true);
				$command->setResult($result, $endTime - $startTime);

				// The deadline is only checked between tasks: a running task
				// is never interrupted
				if($endTime > $stopTime) break;

				$startTime = microtime(true);
			}
		}
	}

	/**
	 * Fetches every command of this process that has a result, hands the
	 * result to the matching pending Command, updates the statistics and
	 * deletes the processed rows.
	 */
	private function receiveResponses()
	{
		$results = $this->database->execute(
			'SELECT commandId, threadId, result, timeTaken FROM ThreadingCommands WHERE result IS NOT NULL AND parentId=%d',
			getmypid()
		);

		if(!$results->recordAvailable()) return;

		$ids = array();
		while($result = $results->fetchArray())
		{
			$commandId = (int) $result['commandId'];
			$threadId = (int) $result['threadId'];
			$timeTaken = (float) $result['timeTaken'];

			Console::printDebug('Got response for Command #'.$commandId.' finished by Thread #'.$threadId.' in '.round($timeTaken, 3).' ms!');

			if(isset($this->pendings[$threadId][$commandId]))
			{
				$command = $this->pendings[$threadId][$commandId];
				// NOTE(review): unserialize() trusts whatever is stored in the
				// result column - make sure only our own workers can write it
				$command->setResult(unserialize(base64_decode($result['result'])), $timeTaken);
				unset($this->pendings[$threadId][$commandId]);
				unset($this->tries[$commandId]);
			}
			// An answer proves that the thread is alive
			if(isset($this->lastTick[$threadId])) $this->lastTick[$threadId] = $this->tick;
			++$this->commandsCount;
			$this->commandsTotalTime += $timeTaken;
			$this->commandsAverageTime = $this->commandsTotalTime / $this->commandsCount;

			$ids[] = $commandId;
		}

		if(empty($ids)) return;

		// Ids were cast to int above, so the list is safe to inline
		$this->database->execute(
			'DELETE FROM ThreadingCommands WHERE commandId IN (%s) AND parentId=%d',
			implode(',', $ids),
			getmypid()
		);
	}

	/**
	 * Restarts threads that stayed silent for too long. A thread without
	 * pending commands is checked after half of Config::busyTimeout ticks and
	 * only restarted if its process is not running anymore; a thread with
	 * pending commands is restarted after Config::busyTimeout ticks.
	 */
	private function handleTimeOuts()
	{
		$busyTimeout = Config::getInstance()->busyTimeout;
		foreach($this->lastTick as $threadId => $tick)
		{
			$silentTicks = $this->tick - $tick;
			if(empty($this->pendings[$threadId]) && $silentTicks > $busyTimeout / 2)
			{
				$threadHandle = $this->threads[$threadId];
				// The handle is false if an earlier respawn failed: treat the
				// thread as not running so that another restart is attempted
				$threadStatus = is_resource($threadHandle) ? proc_get_status($threadHandle) : false;
				if($threadStatus && $threadStatus['running'])
					$this->lastTick[$threadId] = $this->tick;
				else
					$this->restartThread($threadId);
			}
			else if($silentTicks > $busyTimeout)
				$this->restartThread($threadId);
		}
	}

	/**
	 * Moves every buffered command to the pending list of its thread and
	 * writes all of them to ThreadingCommands with a single INSERT.
	 */
	private function sendTasks()
	{
		$lines = array();
		foreach($this->buffers as $threadId => &$buffer)
			while($command = array_shift($buffer))
			{
				$commandId = $command->getId();
				$lines[] = sprintf(
					'(%d, %d, %d, %s)',
					$commandId,
					$threadId,
					getmypid(),
					$this->database->quote(base64_encode(serialize($command->getTask())))
				);
				$this->pendings[$threadId][$commandId] = $command;
				$this->tries[$commandId] = 1;
			}
		// Break the reference left behind by the by-reference foreach
		unset($buffer);

		if(!empty($lines))
			$this->database->execute('INSERT INTO ThreadingCommands(commandId, threadId, parentId, task) VALUES '.implode(',', $lines));
	}

	/**
	 * Asks every thread process that is still open to terminate.
	 */
	function __destruct()
	{
		foreach($this->threads as $thread)
			if(is_resource($thread))
				proc_terminate($thread);
	}

}
405:
406: ?>
407: