2024.05.17 - Factorio Stats

Intro

Wpadł pomysł, żeby wyciągnąć statystyki Factorio do zewnętrznego wizualizatora. Wymagania są dość proste: eksport danych do JSON'a, najlepiej aby NIE wymagał MOD'a do gry (można będzie się łączyć do serwera ze Switcha, który modów nie obsługuje), eksport danych do ElasticStacka (logstash jako transformator danych, elasticsearch jako baza danych), wizualizacja Kibana albo Grafana. Z zajawką projektu pomógł mi ChatGTP od dzisiaj zwany przeze mnie pieszczotliwie Junior.

Było kilka prób odczytu statystyk, co rzutowało na końcowy kształt skryptu. Pomysły, które były testowane:

  • zapis do jednego pliku, logrotate i Filebeat/czytacz_w_pytongu
  • emulacja pliku przez skrypt w pytongu z wykorzystaniem fusepy (Filesystem in USErspace - ciekawe rozwiązanie - nadal rozważam testy)
  • zapis do nowych plików (per tick), odczyt przez Filebeat'a, skrypt sprzątający (bufor na pliki - docelowo wrzucać na tmpfs)

Skrypt statystyk dla Factorio

Forma eksportu danych jest ograniczona przez możliwości LUA w Factorio. W sumie jedynym prostym sposobem jest utworzenie drzewka danych i eksport do pliku w formacie JSON. Aktualnie skrypt jest wstrzyknięty w istniejącego sejwa (można to przerobić na Scenario ale nie bawiłem się tym jeszcze, więc nie wiem jak to zrobić). Próbowałem z różnymi formami skryptu i najbardziej mi odpowiada forma biblioteki.

Ładowanie biblioteki należy wprowadzić do pliku control.lua.

-- control.lua

-- orginal savefile contents
local handler = require("event_handler")
handler.add_lib(require("freeplay"))
handler.add_lib(require("silo-script"))

-- add at the end of file (stas export script)
handler.add_lib(require("json_stats"))

Jak napisać bibliotekę nauczyłem się patrząc na event_handler.lua i silo-script.lua bo nie znalazłem dokumentacji opisującej struktury wymaganej przez event_handler.

Interesujące dla mnie jest okresowe odpalanie funkcji eksportu, więc podpiąłem się pod funkcję on_nth_tick. Wszystkie klasy i funkcje obczajałem w dokumentacji.

-- json_stats.lua

-- unique name that will be added to json and also defines subdirectory for json files
local json_stats_uid = "homelab-vanilla"

-- log stats to file - called on inverval
local function log_stats()
    -- get ticks from start the game
    local ticks_played = game.ticks_played

    -- prepare local variables
    local forces = {}
    local stats = {
        ["json_stats_uid"] = json_stats_uid,
        ["ticks_played"] = ticks_played,
        ["forces"] = forces
    }

    -- iterate through all forces
    for _, force in pairs(game.forces) do
        -- get production statistics
        local production = force.item_production_statistics
        local consumption = force.fluid_production_statistics

        -- get production and consumption values
        local force_stats = {
            name = force.name,
            item_production = production.input_counts,
            item_consumption = production.output_counts,
            fluid_production = consumption.input_counts,
            fluid_consumption = consumption.output_counts
        }

        -- assigne force stats under force name
        forces[force.name] = force_stats
    end

    -- write file to folder (./script-output)
    local stats_file_name = json_stats_uid .. "/" .. ticks_played .. ".json"
    -- last param makes json save only on server
    -- remove it and file will be saved also on all clients
    game.write_file(stats_file_name, game.table_to_json(stats) .. "\n", false, 0)
end

-- tick handler
local function on_log_stats(event)
    log_stats()
end

-- script object
local json_stats = {}

-- register tick handlers
json_stats.on_nth_tick = {
    [600] = on_log_stats
}

-- return script object
return json_stats

Żeby dodać eksport, ściągnąłem sejwa z serwera, rozpakowałem plik, zmodyfikowałem control.lua, dodałem json_stats.lua i spakowałem ZIP'a ponownie. Restart serwera z nowym sejwem dał prawidłowy efekt, na serwerku w katalogu ./script-output zaczeły się pojawiać pliki JSON (trzeba się podłączyć i odpauzować grę, żeby szły ticki). Co ważne do serwera nadal mogłem się podłączyć ze Switcha :)

Czytacz statystyk

Wybrałem opcję z zapisem do nowych plików per tick i odczytem przez Filebeat'a, jako że znam trochę stack Elastica. Statystyki będą odczytywane przez Filebeat i przesyłane do Logstash'a. Tam nastąpi wstępna transformacja danych i przesłanie dalej do Elasticsearch.

Serwer Factorio stoi na Debianie i instalacja Filebeat'a poszła z pakietu zgodnie z instrukcją na stronie elastica.

Konfiguracja Filebeat'a ustawia czytanie z plików json i tymczasowo dump do konsoli.

$ mcedit /etc/filebeat/filebeat.yml
# filebeat.yml

filebeat.inputs:
  - type: log
    paths:
      - /opt/factorio/script-output/map01/*.json
    json:
      keys_under_root: true
    fields_under_root: true
    close_inactive: 5m
    tags: ["map01"]
    index: "factorio-%{+yyyyMMdd}"

output.console:
  pretty: true

Uruchomienie Filebeat'a.

$ systemctl enable filebeat
$ systemctl start filebeat
$ systemctl status filebeat
$ journalctl -u filebeat -f

Następnym krokiem jest czyszczenie starych plików. Dodałem plik factorio_stats do /etc/cron.d/ co daje bufor na 10 minut.

* * * * * root LC_ALL=C find /opt/factorio/script-output/map01/ -name '*.json' -type f -mmin +9 -delete

Po testach i upewnieniu się, że w logach Filebeat'a widać nowe pliki statsów oraz, że skrypt czyszczący spełnia swoje zadanie przeszedłem do instalacji Logstash'a.

Note

FIXME: add logstash install

Baza statystyk

Note

FIXME: add elasticsearch install

Wizualizacja statystyk

Note

FIXME: add kibana install

Bonus: fusepy

Wygenerowane przez ChatGPT. Nietestowane. Jako ciekawostka.

$ sudo apt-get update
$ sudo apt-get install fuse
$ pip install fusepy requests
# file_emulator.py

import os
import stat
import errno
import threading
import requests
from fuse import FUSE, Operations
from time import sleep

class FileEmulator(Operations):
    def __init__(self):
        self.file_content = b''
        self.lock = threading.Lock()
        self.old_content = b''

    def getattr(self, path, fh=None):
        if path == '/file.bin':
            st = {
                'st_mode': (stat.S_IFREG | 0o666),
                'st_nlink': 1,
                'st_size': len(self.file_content),
                'st_ctime': 0,
                'st_mtime': 0,
                'st_atime': 0,
            }
            return st
        else:
            raise OSError(errno.ENOENT, os.strerror(errno.ENOENT))

    def open(self, path, flags):
        if path == '/file.bin':
            return 0
        else:
            raise OSError(errno.ENOENT, os.strerror(errno.ENOENT))

    def read(self, path, size, offset, fh):
        if path == '/file.bin':
            with self.lock:
                return self.file_content[offset:offset + size]
        else:
            raise OSError(errno.ENOENT, os.strerror(errno.ENOENT))

    def write(self, path, data, offset, fh):
        if path == '/file.bin':
            with self.lock:
                self.file_content = self.file_content[:offset] + data + self.file_content[offset + len(data):]
            return len(data)
        else:
            raise OSError(errno.ENOENT, os.strerror(errno.ENOENT))

    def truncate(self, path, length, fh=None):
        if path == '/file.bin':
            with self.lock:
                self.file_content = self.file_content[:length]
            return 0
        else:
            raise OSError(errno.ENOENT, os.strerror(errno.ENOENT))

    def create(self, path, mode, fi=None):
        if path == '/file.bin':
            with self.lock:
                self.file_content = b''
            return 0
        else:
            raise OSError(errno.ENOENT, os.strerror(errno.ENOENT))

    def unlink(self, path):
        if path == '/file.bin':
            with self.lock:
                self.file_content = b''
            return 0
        else:
            raise OSError(errno.ENOENT, os.strerror(errno.ENOENT))

def watch_file_content(emulator):
    while True:
        sleep(1)  # Check every second
        with emulator.lock:
            if emulator.file_content != emulator.old_content:
                new_content = emulator.file_content[len(emulator.old_content):]
                lines = new_content.split(b'\n')
                for line in lines[:-1]:  # All lines except the last (possibly incomplete) one
                    send_post_request(line)
                if new_content.endswith(b'\n'):
                    send_post_request(lines[-1])
                    emulator.old_content = emulator.file_content
                else:
                    emulator.old_content += new_content

def send_post_request(line):
    url = "http://localhost/stats/factorio"
    try:
        response = requests.post(url, data=line)
        print(f"Sent POST request with line: {line.decode('utf-8')} - Response: {response.status_code}")
    except Exception as e:
        print(f"Failed to send POST request: {e}")

if __name__ == '__main__':
    mountpoint = '/opt/fuse'
    emulator = FileEmulator()

    # Start the watcher thread
    watcher_thread = threading.Thread(target=watch_file_content, args=(emulator,))
    watcher_thread.daemon = True
    watcher_thread.start()

    # Mount the FUSE filesystem
    FUSE(emulator, mountpoint, nothreads=True, foreground=True)
sudo python file_emulator.py

Linki