Learn How To Build Your Own Proof Of Work Blockchain With Python

Robert McMenemy
9 min readJun 16, 2023

--

Introduction

Since 2010 the idea of decentralised compute, the mind boggling mathematics, the very unique problems and the new found ways to solve them made me eager to find out everything about this new tech and truly dive in about the weeds to see what makes it tick.

Thanks to pythons ease of access to low level c stuff without the overhead its the perfect entry point for anyone wanting to learn the basics and build their own blockchain.

Pre-Requisites

You’re going to need at least python v3 and pip v3 for this tutorial to work as expected, the install instructions for them both can be found here.

Once installed you will need to grab these libraries using pip before proceeding.

pip install cryptography
pip install asyncio
pip install threading
pip install socketserver
pip install random

Structure

We will be laying out classes for the blockchain, the transactions, the blocks, the miners, the nodes and most importantly the p2p network so they can all talk to each other.

There is much more work to be done to make this a fully workable and secure chain but for a learning point it gives you so much to build from it is close to being a production level system it just needs you to spend the time to learn it which is the best part !

Blockchain Class Structure

Building Our Classes

Blockchain Class

The Blockchain class will track base reward, base year, halving frequency in years and a starting difficulty number. We will initialise this with its initial chain, a transaction array, the initialised node validator, blank nodes array and the current miner node.

Our class will calculate rewards for the block, create a genesis block at initialisation, have the ability to add transactions, add nodes, query / update balances, resolve conflicts and check validity of blocks.



class Blockchain:
BASE_REWARD = 50
BASE_YEAR = 2023
HALVING_FREQUENCY = 4
DIFFICULTY = 1

def __init__(self):
self.chain = [self.create_genesis_block()]
self.transactions = []
self.nodes = []
self.miner_node = Node(self.add_node())

def calculate_reward(self):
current_year = datetime.now().year
elapsed_years = current_year - self.BASE_YEAR
return self.BASE_REWARD / (2 ** (elapsed_years // self.HALVING_FREQUENCY))

def create_genesis_block(self):
return Block(0, time.time(), [], "0", 0)

def add_transaction(self, transaction):
self.transactions.append(transaction)

sender = transaction.sender
recipient = transaction.recipient
amount = transaction.amount

sender_balance = getattr(self.get_node_by_address(sender), 'balance', 0)
recipient_balance = getattr(self.get_node_by_address(recipient), 'balance', 0)

setattr(self.get_node_by_address(sender), 'balance', sender_balance - amount)
setattr(self.get_node_by_address(recipient), 'balance', recipient_balance + amount)

return len(self.chain) + 1

def new_transaction(self, transaction):
self.transactions.append(transaction)
return len(self.chain) - 1

def add_node(self, address=None):
if address is None:
node = Node(str(uuid.uuid4()))
self.miner_node = node
self.nodes.append(node)
self.update_balances() # Update balances after adding a new node
return node.address
else:
node = self.get_node_by_address(address)
if node is None:
node = Node(address)
self.miner_node = node
self.nodes.append(node)
self.update_balances() # Update balances after adding a new node
return node.address
else:
return node.address

def get_balances(self):
balances = {}
for block in self.chain:
for transaction in block.transactions:
sender = transaction.sender
recipient = transaction.recipient
amount = transaction.amount

balances[sender] = balances.get(sender, 0) - amount
balances[recipient] = balances.get(recipient, 0) + amount

# Update balances based on pending transactions
for transaction in self.transactions:
sender = transaction.sender
recipient = transaction.recipient
amount = transaction.amount

balances[sender] = balances.get(sender, 0) - amount
balances[recipient] = balances.get(recipient, 0) + amount

return balances

def update_balances(self):
balances = self.get_balances()
for node in self.nodes:
address = node.address
balance = balances.get(address, 0)
setattr(node, 'balance', balance)

def mine_block(self):
try:
last_block = self.chain[-1]
index = last_block.index + 1
timestamp = time.time()
transactions = self.transactions.copy()
reward = self.calculate_reward()

transactions.append(Transaction(self.miner_node.address, self.miner_node.address, reward, "reward", miner_node=self.miner_node))
previous_hash = last_block.hash_block()

max_attempts = 1000
nonce = 0
while nonce < max_attempts:
block = Block(index, timestamp, transactions, previous_hash, nonce)
block_hash = block.hash_block()
if block_hash[:self.DIFFICULTY] == '0' * self.DIFFICULTY and self.node_validator.validate_block(block, self.nodes):
self.chain.append(block)
self.transactions = []
return block
nonce += 1

return None

except Exception as e:
print("An error occurred while mining a new block:")
print(str(e))
traceback.print_exc()
return None

def get_node_by_address(self, address):
for node in self.nodes:
if node.address == address:
return node
return None

def resolve_conflicts(self):
longest_chain = None
max_length = len(self.chain)

for node in self.nodes:
response = requests.get(f'http://{node.address}/blocks')

if response.status_code == 200:
length = response.json()['length']
chain = response.json()['chain']

if length > max_length and self.is_valid(chain):
max_length = length
longest_chain = chain

if longest_chain:
self.chain = longest_chain
return True

return False

def is_valid(self, chain=None):
if chain is None:
chain = self.chain

for i in range(1, len(chain)):
current_block = chain[i]
previous_block = chain[i - 1]
if current_block.previous_hash != previous_block.hash_block():
return False
return True

def __str__(self):
return json.dumps([block.to_dict() for block in self.chain], indent=2)

The secret of our POW blockchain is the difficulty and the mine_block function. As blocks are mined difficulty is increased over time. We resolve any conflicts in block state across nodes using the longest chain rule and add an extra layer of validation in the is_valid() method to double check block integrity.

Block Class

We will place this in the blockchain.py file since blocks are simple containers and don't require much heavy logic. Time based data, transactions, previous and next block hashes, a nonce and a verification signature is all we need.

We have a hash_block function which generates a SHA256 hash of the data dictionary and imprints it to the block before adding it to the chain. Add this class above your blockchain class in the same file named blockchain.py

class Block:
def __init__(self, index, timestamp, transactions, previous_hash, nonce, validations=None, signature=None):
self.index = index
self.timestamp = timestamp
self.transactions = transactions
self.previous_hash = previous_hash
self.nonce = nonce
self.validations = validations if validations else []
self.signature = signature if signature else b''

def hash_block(self):
block_str = json.dumps(self.to_dict(), sort_keys=True)
digest = hashes.Hash(hashes.SHA256(), backend=default_backend())
digest.update(block_str.encode())
return digest.finalize().hex()

def to_dict(self):
return {
"index": self.index,
"timestamp": self.timestamp,
"transactions": [transaction.to_dict() for transaction in self.transactions],
"previous_hash": self.previous_hash,
"nonce": self.nonce,
"validations": self.validations,
"signature": self.signature.hex() if self.signature else None
}

Node Classes

These classes handle our signing and verification of transactions in the network. Create a new file called node.py and paste the following code this will activate our transaction propagation. But now we need the transactions to broadcast.

from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization, hashes

class Node:
def __init__(self, address=None):
self.address = address if address else str(uuid.uuid4())
self.private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
self.public_key = self.private_key.public_key()

def sign(self, message):
digest = hashes.Hash(hashes.SHA256(), backend=default_backend())
digest.update(message)
signature = self.private_key.sign(
digest.finalize(),
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256()
)
return signature

def verify(self, message, signature):
digest = hashes.Hash(hashes.SHA256(), backend=default_backend())
digest.update(message)
try:
self.public_key.verify(
signature,
digest.finalize(),
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH,
),
hashes.SHA256(),
)
return True
except Exception:
return False

class NodeValidator:
def __init__(self):
self.confirmations_needed = 20

def validate_block(self, block, nodes):
confirmations = 0
for node in nodes:
if self.is_block_approved_by_node(block, node):
confirmations += 1
if confirmations >= self.confirmations_needed:
return True
return False

def is_block_approved_by_node(self, block, node):
try:
node.verify(block.hash_block().encode(), block.signature)
return True
except Exception:
return False

Transaction Class

Each txn in the network will be a representation of this classes data. Create a new file transaction.py and paste the following. This class is the functional controller that other methods call in order to validate transaction integrity and health.

class Transaction:
def __init__(self, sender, recipient, amount, data, signature=None, miner_node=None):
if sender is None or recipient is None or amount is None or data is None:
raise ValueError("Invalid transaction parameters")
self.sender = sender
self.recipient = recipient
self.amount = float(amount)
self.data = data
self.signature = signature if signature else b''
self.miner_node = miner_node if miner_node else None

def to_dict(self):
return {
"sender": self.sender,
"recipient": self.recipient,
"amount": self.amount,
"data": self.data,
"signature": self.signature.hex() if self.signature else None,
}

def sign(self):
sender_bytes = self.sender if isinstance(self.sender, bytes) else self.sender.encode()
recipient_bytes = self.recipient if isinstance(self.recipient, bytes) else self.recipient.encode()
amount_bytes = str(self.amount).encode()
data_bytes = self.data if isinstance(self.data, bytes) else self.data.encode()

message = sender_bytes + recipient_bytes + amount_bytes + data_bytes
self.signature = self.miner_node.sign(message)

def verify(self):
sender_bytes = self.sender if isinstance(self.sender, bytes) else self.sender.encode()
recipient_bytes = self.recipient if isinstance(self.recipient, bytes) else self.recipient.encode()
amount_bytes = str(self.amount).encode()
data_bytes = self.data if isinstance(self.data, bytes) else self.data.encode()

message = sender_bytes + recipient_bytes + amount_bytes + data_bytes
return self.miner_node.verify(message, self.signature)

P2P Class

The magic of this whole thing is the ability for each node and miner in the network to communicate so long as they have TCP/IP access. Create the p2p.py file and paste the following code to activate the network.

This will need more to be useable in a real world cross network scenario so do look into peer discovery across the internet and the challenges you will need to overcome to make this a real world useable class.


import asyncio
import time
import threading
from socketserver import BaseRequestHandler, ThreadingTCPServer
from transaction import Transaction

class P2PRequestHandler(BaseRequestHandler):
MAX_REQUESTS_PER_MINUTE = 1000

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.requests = 0
self.reset_time = time.time()

def handle(self):
try:
self.requests += 1
current_time = time.time()
if current_time - self.reset_time > 60:
self.requests = 0
self.reset_time = current_time

if self.requests > self.MAX_REQUESTS_PER_MINUTE:
raise Exception("Too many requests")

request = self.request.recv(1024).decode()
response = asyncio.run(self.node.handle_request(request))
return response.encode()

except Exception as e:
error_message = f"An error occurred: {str(e)}"
traceback_str = traceback.format_exc()
print(error_message)
print(traceback_str)
return f"HTTP/1.1 500 Internal Server Error\r\n\r\n{error_message}\n{traceback_str}".encode()

class P2PNode:
def __init__(self, blockchain):
self.peers = []
self.server_address = ('', 5000)
self.running = False
self.blockchain = blockchain
self.server = None

async def handle_request(self, request):
try:
request_str = request.decode()
print("Received request:", request_str)

response = await self.process_request(request_str)
return response.encode()

except Exception as e:
print("An error occurred in the request handler:")
print(str(e))
traceback.print_exc()
return "HTTP/1.1 500 Internal Server Error\r\n\r\n".encode()

async def process_request(self, request):
method, *headers_and_body = request.split('\r\n\r\n')
headers = headers_and_body[0]
body = headers_and_body[1] if len(headers_and_body) > 1 else ''
path = headers.split(' ')[1]

if method == 'POST':
if path == '/transactions/new':
return await self.new_transaction(body)
elif path == '/mine':
return await self.mine()
elif path == '/blocks':
return await self.full_chain()
elif path == '/peers/new':
return await self.add_peer(body)
elif method == 'GET' and path == '/blocks':
return await self.full_chain()

return "HTTP/1.1 404 Not Found\r\n\r\n"

async def start(self):
try:
self.running = True
print("Server started.")

self.server = ThreadingTCPServer(self.server_address, P2PRequestHandler)
self.server.node = self

server_thread = threading.Thread(target=self.server.serve_forever)
server_thread.start()

except Exception as e:
print("An error occurred in the server:", str(e))

Main

Create a file called main.py we will use this to test the chain and its various functions.

import threading
import asyncio
import time
import random
from blockchain import Blockchain
from transaction import Transaction
from p2p import P2PNode

try:
blockchain = Blockchain()
print("Blockchain created")

# Start 25 nodes and fund them with 100 coins each
nodes = []
miner_address = blockchain.add_node() # Add the miner's address
for _ in range(25):
address = blockchain.add_node()
nodes.append(address)
transaction = Transaction(miner_address, address, 100, "Initial funds", miner_node=blockchain.miner_node)
transaction.sign()
blockchain.new_transaction(transaction)

print("25 nodes created and funded.")

# Print balances of all nodes
balances = blockchain.get_balances()
print("Balances after funding:")
for address, balance in balances.items():
print(f"{address}: {balance}")

async def start_server():
node = P2PNode(blockchain)
await node.start()

thread = threading.Thread(target=lambda: asyncio.run(start_server()))
thread.start()

time.sleep(1) # Wait for nodes to synchronize

# Each node sends a transaction to three random recipients
for sender in nodes:
recipient = random.choice(nodes)
if recipient != sender:
sender_address = sender
recipient_address = recipient
sender_balance = blockchain.get_balances().get(sender_address, 0)
print("Sender balance:", sender_balance)
if sender_balance >= 1: # Check if sender has sufficient funds
print("Sender:", sender_address)
print("Recipient:", recipient_address)
transaction = Transaction(sender_address, recipient_address, 1, "Transaction", miner_node=blockchain.miner_node)
transaction.sign()
try:
blockchain.new_transaction(transaction)
except Exception as e:
print("An error occurred:", str(e))
traceback.print_exc()
else:
print("Sender does not have sufficient funds to send the transaction.")

# Start mining until three blocks are mined
for _ in range(3):
block = blockchain.mine_block()
if block:
print("New block mined successfully.")
print(f"Block hash: {block.hash_block()}")
else:
print("Failed to mine a new block.")
break

print("Three blocks mined successfully.")

# Print balances of all nodes
balances = blockchain.get_balances()
print("Balances after all transactions:")
for address, balance in balances.items():
print(f"{address}: {balance}")

print("Printing the blockchain:")
print(blockchain)

# Start mining until miner rewards reach 20 coins
while blockchain.get_balances()[blockchain.miner_node.address] < 20:
block = blockchain.mine_block()
if block:
print("New block mined successfully.")
print(f"Block hash: {block.hash_block()}")
else:
print("Failed to mine a new block.")
break

print("Miner rewards reached 20 coins.")

# Send 0.5 coins to every node from the miner
for recipient in nodes:
transaction = Transaction(blockchain.miner_node.address, str(recipient), 0.5, "Reward", miner_node=blockchain.miner_node)
transaction.sign()
blockchain.new_transaction(transaction)

print("Reward transactions sent to nodes.")

time.sleep(1) # Wait for transactions to be processed

# Print balances of all nodes
balances = blockchain.get_balances()
print("Balances after all transactions:")
for address, balance in balances.items():
print(f"{address}: {balance}")

print("Printing the blockchain:")
print(blockchain)

except Exception as e:
print(f"An error occurred: {e}")

Testing

Load a terminal window as an admin user and type then hit return this will spin up the chain and let you see it in action !

python main.py
Blockchain sending transactions

Conclusion

This is a basic starting point for a blockchain and lacks the extra layers of security and validation to be production ready but its a fantastic starting point for anyone to learn !

Full source can be found here

--

--

Robert McMenemy
Robert McMenemy

Written by Robert McMenemy

Full stack developer with a penchant for cryptography.

No responses yet