237 lines
4.6 KiB
Python
Executable File
237 lines
4.6 KiB
Python
Executable File
import subprocess
|
|
import time
|
|
import threading
|
|
import math
|
|
from tqdm import tqdm
|
|
|
|
class CelesteError(Exception):
    """Error raised by the Celeste wrapper, e.g. when status data is missing."""
|
|
|
|
class Celeste:
    """Control a PICO-8 Celeste cartridge through xdotool and track progress.

    The constructor launches PICO-8 as a child process, finds its X11 window
    with ``xdotool search`` and types the commands that load and run the
    ``hackcel.p8`` cartridge. The child's stdout (with stderr merged in) is
    read line by line: each line is treated as one frame and parsed as
    ``key:value`` entries separated by ``;``. The literal line ``!RESTART``
    marks a (re)start of the game and carries no status.
    """

    # Actions accepted by act(). Any other value only releases the held keys.
    action_space = [
        "left",     # move left
        "right",    # move right
        "jump",     # jump

        "dash-u",   # dash up
        "dash-r",   # dash right
        "dash-l",   # dash left
        "dash-ru",  # dash right-up
        "dash-lu",  # dash left-up
    ]

    # Keys pressed, in order, for each action ("c" jumps, "x" dashes).
    _ACTION_KEYS = {
        "left": ("Left",),
        "right": ("Right",),
        "jump": ("c",),
        "dash-u": ("Up", "x"),
        "dash-r": ("Right", "x"),
        "dash-l": ("Left", "x"),
        "dash-ru": ("Up", "Right", "x"),
        "dash-lu": ("Up", "Left", "x"),
    }

    # Every key act() may leave held; all are released before a new action.
    _HELD_KEYS = ("x", "c", "Left", "Right", "Down", "Up")

    def __init__(self):
        """Start PICO-8, locate its window and run the cartridge.

        Raises:
            CelesteError: if xdotool does not report exactly one window of
                class ``pico8``.
            subprocess.CalledProcessError: if ``xdotool search`` exits with a
                non-zero status.
            FileNotFoundError: if the PICO-8 binary or xdotool is missing.
        """
        # Start pico-8. An argument list (no shell) makes a missing binary
        # fail here instead of as a shell message inside the stdout pipe.
        self.process = subprocess.Popen(
            ["bin/pico-8/linux/pico8"],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )

        # Wait for window to open and get window id
        time.sleep(2)
        search_output = subprocess.check_output(
            ["xdotool", "search", "--class", "pico8"]
        ).decode("utf-8")
        # splitlines() yields [] for empty output, so "no window" is rejected
        # instead of being accepted as a single empty id.
        window_ids = search_output.strip().splitlines()
        if len(window_ids) != 1:
            raise CelesteError("Could not find unique PICO-8 window id")
        self.winid = window_ids[0]

        # Load cartridge
        self.keystring("load hackcel.p8")
        self.keypress("Enter")
        self.keystring("run")
        self.keypress("Enter", post=1000)

        # Latest raw (string) values reported by the cartridge, by key
        self.internal_status = {}

        # Score system
        self.frame_counter = 0
        self.next_point = 0
        self.dist = 0  # distance to next point
        self.target_points = [
            [  # Stage 1
                (28, 88),   # Start pillar
                (60, 80),   # Middle pillar
                (105, 64),  # Right ledge
                (25, 40),   # Left ledge
                (110, 16),  # End ledge
                (110, -2),  # Next stage
            ]
        ]

    def act(self, action):
        """Release all held keys, then hold the keys for ``action``.

        Args:
            action: one of ``action_space``, or ``None`` to only release keys.
                Unknown actions press nothing.
        """
        for key in self._HELD_KEYS:
            self.keyup(key)

        if action is None:
            return

        for key in self._ACTION_KEYS.get(action, ()):
            self.keydown(key)

    @property
    def status(self):
        """Return the current game state as a dict of parsed values.

        Keys: ``stage``, ``xpos``, ``ypos``, ``xvel``, ``yvel``, ``deaths``
        (parsed from the cartridge's ``rx``/``ry``, ``px``, ``py``, ``vx``,
        ``vy``, ``dc`` entries) plus ``dist``, ``next_point`` and
        ``frame_count`` from this object's own bookkeeping.

        Raises:
            CelesteError: if a required entry has not been received yet.
        """
        raw = self.internal_status
        # Stage number indexed by [ry][rx]; only row 0 is mapped, so other
        # coordinates raise IndexError.
        stage_grid = [
            [0, 1, 2, 3, 4]
        ]
        try:
            return {
                "stage": stage_grid[int(raw["ry"])][int(raw["rx"])],

                "xpos": int(raw["px"]),
                "ypos": int(raw["py"]),
                "xvel": float(raw["vx"]),
                "yvel": float(raw["vy"]),
                "deaths": int(raw["dc"]),

                "dist": self.dist,
                "next_point": self.next_point,
                "frame_count": self.frame_counter,
            }
        except KeyError as err:
            raise CelesteError("Not enough data to get status.") from err

    def _xdotool(self, command, *args):
        """Run ``xdotool <command> --window <winid> <args...>`` and wait for it."""
        subprocess.run(["xdotool", command, "--window", self.winid, *args])

    def keypress(self, key: str, *, post=200):
        """Press and release ``key``, then sleep ``post`` milliseconds."""
        self._xdotool("key", key)
        time.sleep(post / 1000)

    def keydown(self, key: str):
        """Hold ``key`` down until a matching keyup."""
        self._xdotool("keydown", key)

    def keyup(self, key: str):
        """Release ``key``."""
        self._xdotool("keyup", key)

    def keystring(self, string, *, delay=100, post=200):
        """Type ``string`` with ``delay`` ms between keys, then sleep ``post`` ms."""
        self._xdotool("type", "--delay", str(delay), string)
        time.sleep(post / 1000)

    def reset(self):
        """Restart the cartridge and clear status and checkpoint progress."""
        self.internal_status = {}
        self.next_point = 0
        self.frame_counter = 0

        self.keypress("Escape")
        self.keystring("run")
        self.keypress("Enter", post=1000)

        self.flush_reader()

    def flush_reader(self):
        """Discard output up to and including the next ``!RESTART`` line.

        Returns when the marker is seen or when the output stream ends.
        """
        # The pipe yields bytes, so end-of-stream is b"" (never the str "").
        for raw_line in iter(self.process.stdout.readline, b""):
            if raw_line.decode("utf-8").strip() == "!RESTART":
                break

    def update_loop(self, before, after):
        """Process frames until the output stream ends.

        For every status line: update ``internal_status``, advance the
        checkpoint bookkeeping, then call ``after(self, previous)`` (skipped on
        the first frame) followed by ``previous = before(self)``.

        Args:
            before: callable taking this object; its return value is handed to
                ``after`` on the following frame.
            after: callable taking this object and the previous ``before``
                result.
        """
        # Get state, call callback, wait for state
        # One line => one frame.
        previous = None

        # The pipe yields bytes, so end-of-stream is b"" (never the str "").
        for raw_line in iter(self.process.stdout.readline, b""):
            # strip() drops the newline without eating a real character when
            # the last line has none.
            line = raw_line.decode("utf-8").strip()

            # This should only occur at game start
            if line == "!RESTART":
                continue

            self.frame_counter += 1
            self._parse_status_line(line)
            self._update_checkpoint()

            # Call step callback
            if previous is not None:
                after(self, previous)
            previous = before(self)

    def _parse_status_line(self, line):
        """Store each ``key:value`` entry of a ``;``-separated status line."""
        for entry in line.split(";"):
            if entry == "":
                continue
            key, val = entry.split(":")
            self.internal_status[key] = val

    def _target_for(self, stage):
        """Return the checkpoint currently aimed for, or None if there is none."""
        if not 0 <= stage < len(self.target_points):
            return None
        points = self.target_points[stage]
        if self.next_point >= len(points):
            return None
        return points[self.next_point]

    def _update_checkpoint(self):
        """Advance ``next_point`` when the target is reached and refresh ``dist``.

        A checkpoint counts as reached when the player is within 4 units of it
        and at exactly its y position. When no checkpoint is left (all points
        collected, or the stage has no list) ``dist`` is set to 0 instead of
        raising IndexError.
        """
        status = self.status  # build the dict once per frame
        x = status["xpos"]
        y = status["ypos"]
        stage = status["stage"]

        target = self._target_for(stage)
        if target is None:
            self.dist = 0
            return

        tx, ty = target
        dist = math.hypot(x - tx, y - ty)

        if dist <= 4 and y == ty:
            print(f"Got point {self.next_point}")
            self.next_point += 1

            # Recalculate distance to new point
            target = self._target_for(stage)
            if target is None:
                dist = 0
            else:
                tx, ty = target
                dist = math.hypot(x - tx, y - ty)

        self.dist = dist