mirror of
https://github.com/NixOS/nixpkgs.git
synced 2026-06-05 21:03:40 +00:00
77 lines
2.3 KiB
Python
77 lines
2.3 KiB
Python
import itertools
|
|
import json
|
|
import os
|
|
import platform
|
|
import shutil
|
|
import sys
|
|
import urllib.parse
|
|
import urllib.request
|
|
|
|
# Load the build attributes from the JSON file whose path is given by the
# NIX_ATTRS_JSON_FILE environment variable.  The encoding is pinned to UTF-8
# so that decoding does not depend on the locale of the build environment.
with open(os.environ['NIX_ATTRS_JSON_FILE'], encoding='utf-8') as env_file:
    ENV = json.load(env_file)

# Value sent as the User-Agent header on API requests.
USER_AGENT = f'Python/{platform.python_version()} Nixpkgs/{ENV["nixpkgsVersion"]}'
# Timeout in seconds applied to every HTTP request.  NIX_CONNECT_TIMEOUT
# overrides the default of 15; an unset or empty variable falls back to it.
TIMEOUT = float(os.environ.get('NIX_CONNECT_TIMEOUT') or '15')

# Settings taken verbatim from the build attributes.
ENDPOINT = ENV['endpoint']  # prefix of every API URL
GAME_URL = ENV['gameUrl']  # base URL of the game page
UPLOAD_ID = ENV['upload']  # identifier of the upload to download
|
|
|
|
def abort(message):
    """Print an error message to stderr and exit with status 1.

    When the build attributes contain ``extraMessage``, its value is appended
    to ``message``, separated by a single space.
    """
    extra = (ENV['extraMessage'],) if 'extraMessage' in ENV else ()
    # print() joins its arguments with one space, giving "<message> <extra>".
    print(message, *extra, file=sys.stderr)
    raise SystemExit(1)
|
|
|
|
# The name of the environment variable that holds the API key is itself
# taken from the build attributes.
API_KEY = os.getenv(ENV['apiKeyVar'])

# An unset or empty key cannot be used: describe both ways to proceed, then
# stop.
if not API_KEY:
    abort(
        ' '.join((
            f'Error: Either set the environment variable {ENV["apiKeyVar"]}',
            '(for the nix-daemon in multi user mode)',
            f'or manually download {ENV.get("uploadName", "the required file")}',
            f'from {GAME_URL} and add it to the nix store.',
        ))
    )
|
|
|
|
def urlopen(url_or_request):
    """Open a URL string or a ``urllib.request.Request`` using ``TIMEOUT``.

    Returns whatever ``urllib.request.urlopen`` returns; the callers in this
    file use it as a context manager.
    """
    response = urllib.request.urlopen(url_or_request, timeout=TIMEOUT)
    return response
|
|
|
|
# Fetch the game's data.json.  This request is made with a plain URL, so it
# carries neither the API key nor the custom User-Agent header.
with urlopen(f'{GAME_URL}/data.json') as response:
    data = json.load(response)

# Identifier that is later compared against the game_id of each owned key.
GAME_ID = data['id']
# A game counts as free when no price is listed or the price is exactly
# "$0.00".
IS_FREE = data.get('price', '$0.00') == '$0.00'
|
|
|
|
def api(path, params=None, download=False):
    """Call an API endpoint with the API key attached as a query parameter.

    Args:
        path: Path appended to ``ENDPOINT``.
        params: Optional mapping of additional query parameters.  Defaults to
            no extra parameters.
        download: When true, the result is written to the file named by the
            ``out`` environment variable instead of being parsed as JSON.

    Returns:
        The decoded JSON body when ``download`` is false, otherwise ``None``.
    """
    # A None default avoids sharing one mutable dict between calls.
    query = urllib.parse.urlencode({'api_key': API_KEY, **(params or {})})
    url = f'{ENDPOINT}{path}?{query}'

    if download and ENV['showUrl']:
        # Only the URL is recorded as the output; nothing is fetched.
        with open(os.environ['out'], 'w') as output_file:
            print(url, file=output_file)
        return None

    request = urllib.request.Request(url, headers={'User-Agent': USER_AGENT})
    with urlopen(request) as response:
        if download:
            # Stream the response body straight into the output file.
            with open(os.environ['out'], 'wb') as output_file:
                shutil.copyfileobj(response, output_file)
            return None
        return json.load(response)
|
|
|
|
# A free game is downloaded without passing a download key; the script then
# ends successfully.
if IS_FREE:
    api(path=f'/uploads/{UPLOAD_ID}/download', download=True)
    raise SystemExit
|
|
|
|
# Walk the paginated list of owned keys, looking for one that belongs to
# this game.
KEY_ID = None
for page in itertools.count(1):
    data = api('/profile/owned-keys', {'page': page})
    if 'owned_keys' not in data:
        break
    owned_keys = data['owned_keys']
    # First key on this page whose game matches, or None when there is none.
    KEY_ID = next(
        (key['id'] for key in owned_keys if key['game_id'] == GAME_ID),
        None,
    )
    # Stop as soon as a key is found, so no further pages are requested, or
    # when a short page shows that this was the last one.
    if KEY_ID is not None or len(owned_keys) < data['per_page']:
        break

if KEY_ID is None:
    abort(f'Cannot find a key associated with {GAME_URL}. Did you buy the game?')

# Download the upload, authorised by the key that was found.
api(f'/uploads/{UPLOAD_ID}/download', {'download_key_id': KEY_ID}, download=True)
|